/* vim:set ts=2 sw=2 sts=2 et: */ /** * \author Marcus Holland-Moritz (github@mhxnet.de) * \copyright Copyright (c) Marcus Holland-Moritz * * This file is part of dwarfs. * * dwarfs is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * dwarfs is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with dwarfs. If not, see . */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace dwarfs::writer { namespace internal { using namespace dwarfs::internal; namespace { size_t copy_stream(std::istream& is, std::ostream& os) { std::streambuf* rdbuf = is.rdbuf(); std::streamsize count{0}; std::streamsize transferred; char buffer[1024]; while ((transferred = rdbuf->sgetn(buffer, sizeof(buffer))) > 0) { os.write(buffer, transferred); count += transferred; } return count; } std::string get_friendly_section_name(section_type type) { switch (type) { case section_type::METADATA_V2_SCHEMA: return "schema"; case section_type::METADATA_V2: return "metadata"; case section_type::HISTORY: return "history"; case section_type::BLOCK: return "block"; case section_type::SECTION_INDEX: return "index"; } return get_section_name(type); } class compression_progress : public progress::context { public: using status = progress::context::status; compression_progress() = default; status get_status() const override { status st; st.color = termcolor::RED; st.context = "[compressing] "; auto bin = bytes_in.load(); auto bout = bytes_out.load(); if (bin > 0 && bout > 0) { st.status_string = fmt::format("compressed {} to {} (ratio {:.2f}%)", size_with_unit(bin), size_with_unit(bout), 100.0 * bout / bin); } st.bytes_processed.emplace(bytes_in.load()); return st; } int get_priority() const override { return -1000; } std::atomic bytes_in{0}; std::atomic bytes_out{0}; }; class fsblock { public: fsblock(section_type type, block_compressor const& bc, std::shared_ptr&& data, std::shared_ptr pctx, folly::Function set_block_cb = nullptr); fsblock(section_type type, compression_type compression, std::span data); fsblock(fs_section sec, std::span data, std::shared_ptr pctx); fsblock(section_type type, block_compressor const& bc, std::span data, compression_type data_comp_type, std::shared_ptr pctx); void compress(worker_group& wg, std::optional meta = std::nullopt) { impl_->compress(wg, std::move(meta)); } void wait_until_compressed() { impl_->wait_until_compressed(); } section_type type() const { return impl_->type(); } compression_type compression() const { return impl_->compression(); } std::string description() const { return impl_->description(); } std::span data() const { return impl_->data(); } size_t uncompressed_size() const { return impl_->uncompressed_size(); } size_t size() const { return impl_->size(); } void set_block_no(uint32_t number) { impl_->set_block_no(number); } uint32_t block_no() const { return impl_->block_no(); } section_header_v2 const& header() const { return impl_->header(); } class impl { public: virtual ~impl() = default; virtual void compress(worker_group& wg, std::optional meta) = 0; virtual void wait_until_compressed() = 0; virtual section_type type() const = 0; virtual compression_type compression() const = 0; virtual std::string description() const = 0; virtual std::span data() const = 0; virtual size_t uncompressed_size() const = 0; virtual size_t size() const = 0; virtual void set_block_no(uint32_t number) = 0; virtual uint32_t block_no() const = 0; virtual section_header_v2 const& header() const = 0; }; static void build_section_header(section_header_v2& sh, fsblock::impl const& fsb, std::optional const& sec = std::nullopt); private: std::unique_ptr impl_; }; class fsblock_merger_policy { public: explicit fsblock_merger_policy(size_t worst_case_block_size) : worst_case_block_size_{worst_case_block_size} {} size_t block_size(std::unique_ptr const& fsb) const { assert(fsb->size() <= worst_case_block_size_); return fsb->size(); } size_t worst_case_source_block_size(fragment_category /*source_id*/) const { return worst_case_block_size_; } private: size_t worst_case_block_size_; }; class raw_fsblock : public fsblock::impl { public: raw_fsblock(section_type type, const block_compressor& bc, std::shared_ptr&& data, std::shared_ptr pctx, folly::Function set_block_cb) : type_{type} , bc_{bc} , uncompressed_size_{data->size()} , data_{std::move(data)} , comp_type_{bc_.type()} , pctx_{std::move(pctx)} , set_block_cb_{std::move(set_block_cb)} { DWARFS_CHECK(bc_, "block_compressor must not be null"); } void compress(worker_group& wg, std::optional meta) override { std::promise prom; future_ = prom.get_future(); wg.add_job([this, prom = std::move(prom), meta = std::move(meta)]() mutable { try { std::shared_ptr tmp; if (meta) { tmp = std::make_shared(bc_.compress(data_->vec(), *meta)); } else { tmp = std::make_shared(bc_.compress(data_->vec())); } pctx_->bytes_in += data_->vec().size(); pctx_->bytes_out += tmp->vec().size(); { std::lock_guard lock(mx_); data_.swap(tmp); } } catch (bad_compression_ratio_error const&) { comp_type_ = compression_type::NONE; } prom.set_value(); }); } void wait_until_compressed() override { future_.wait(); } section_type type() const override { return type_; } compression_type compression() const override { return comp_type_; } std::string description() const override { return bc_.describe(); } std::span data() const override { return data_->vec(); } size_t uncompressed_size() const override { return uncompressed_size_; } size_t size() const override { std::lock_guard lock(mx_); return data_->size(); } void set_block_no(uint32_t number) override { { std::lock_guard lock(mx_); if (number_) { DWARFS_THROW(runtime_error, "block number already set"); } number_ = number; } if (set_block_cb_) { set_block_cb_(number); } } uint32_t block_no() const override { std::lock_guard lock(mx_); return number_.value(); } section_header_v2 const& header() const override { std::lock_guard lock(mx_); if (!header_) { header_ = section_header_v2{}; fsblock::build_section_header(*header_, *this); } return header_.value(); } private: const section_type type_; block_compressor const& bc_; const size_t uncompressed_size_; mutable std::recursive_mutex mx_; std::shared_ptr data_; std::future future_; std::optional number_; std::optional mutable header_; compression_type comp_type_; std::shared_ptr pctx_; folly::Function set_block_cb_; }; class compressed_fsblock : public fsblock::impl { public: compressed_fsblock(section_type type, compression_type compression, std::span range) : type_{type} , compression_{compression} , range_{range} {} compressed_fsblock(fs_section sec, std::span range, std::shared_ptr pctx) : type_{sec.type()} , compression_{sec.compression()} , range_{range} , pctx_{std::move(pctx)} , sec_{std::move(sec)} {} void compress(worker_group& wg, std::optional /* meta */) override { std::promise prom; future_ = prom.get_future(); wg.add_job([this, prom = std::move(prom)]() mutable { fsblock::build_section_header(header_, *this, sec_); if (pctx_) { pctx_->bytes_in += size(); pctx_->bytes_out += size(); } prom.set_value(); }); } void wait_until_compressed() override { future_.wait(); } section_type type() const override { return type_; } compression_type compression() const override { return compression_; } // TODO std::string description() const override { return ""; } std::span data() const override { return range_; } size_t uncompressed_size() const override { return range_.size(); } size_t size() const override { return range_.size(); } void set_block_no(uint32_t number) override { number_ = number; } uint32_t block_no() const override { return number_.value(); } section_header_v2 const& header() const override { return header_; } private: section_type const type_; compression_type const compression_; std::span range_; std::future future_; std::optional number_; section_header_v2 header_; std::shared_ptr pctx_; std::optional sec_; }; class rewritten_fsblock : public fsblock::impl { public: rewritten_fsblock(section_type type, block_compressor const& bc, std::span data, compression_type data_comp_type, std::shared_ptr pctx) : type_{type} , bc_{bc} , data_{data} , comp_type_{bc_.type()} , pctx_{std::move(pctx)} , data_comp_type_{data_comp_type} { DWARFS_CHECK(bc_, "block_compressor must not be null"); } void compress(worker_group& wg, std::optional meta) override { std::promise prom; future_ = prom.get_future(); wg.add_job( [this, prom = std::move(prom), meta = std::move(meta)]() mutable { try { // TODO: we don't have to do this for uncompressed blocks std::vector block; block_decompressor bd(data_comp_type_, data_.data(), data_.size(), block); bd.decompress_frame(bd.uncompressed_size()); if (!meta) { meta = bd.metadata(); } pctx_->bytes_in += block.size(); // TODO: data_.size()? try { if (meta) { block = bc_.compress(block, *meta); } else { block = bc_.compress(block); } } catch (bad_compression_ratio_error const&) { comp_type_ = compression_type::NONE; } pctx_->bytes_out += block.size(); { std::lock_guard lock(mx_); block_data_.swap(block); } prom.set_value(); } catch (...) { prom.set_exception(std::current_exception()); } }); } void wait_until_compressed() override { future_.get(); } section_type type() const override { return type_; } compression_type compression() const override { return comp_type_; } std::string description() const override { return bc_.describe(); } std::span data() const override { return block_data_; } size_t uncompressed_size() const override { return data_.size(); } size_t size() const override { std::lock_guard lock(mx_); return block_data_.size(); } void set_block_no(uint32_t number) override { { std::lock_guard lock(mx_); if (number_) { DWARFS_THROW(runtime_error, "block number already set"); } number_ = number; } } uint32_t block_no() const override { std::lock_guard lock(mx_); return number_.value(); } section_header_v2 const& header() const override { std::lock_guard lock(mx_); if (!header_) { header_ = section_header_v2{}; fsblock::build_section_header(*header_, *this); } return header_.value(); } private: const section_type type_; block_compressor const& bc_; mutable std::recursive_mutex mx_; std::span data_; std::vector block_data_; std::future future_; std::optional number_; std::optional mutable header_; compression_type comp_type_; std::shared_ptr pctx_; compression_type const data_comp_type_; }; fsblock::fsblock(section_type type, block_compressor const& bc, std::shared_ptr&& data, std::shared_ptr pctx, folly::Function set_block_cb) : impl_(std::make_unique(type, bc, std::move(data), std::move(pctx), std::move(set_block_cb))) {} fsblock::fsblock(section_type type, compression_type compression, std::span data) : impl_(std::make_unique(type, compression, data)) {} fsblock::fsblock(fs_section sec, std::span data, std::shared_ptr pctx) : impl_(std::make_unique(std::move(sec), data, std::move(pctx))) {} fsblock::fsblock(section_type type, block_compressor const& bc, std::span data, compression_type data_comp_type, std::shared_ptr pctx) : impl_(std::make_unique(type, bc, data, data_comp_type, std::move(pctx))) {} void fsblock::build_section_header(section_header_v2& sh, fsblock::impl const& fsb, std::optional const& sec) { auto range = fsb.data(); ::memcpy(&sh.magic[0], "DWARFS", 6); sh.major = MAJOR_VERSION; sh.minor = MINOR_VERSION; sh.number = fsb.block_no(); sh.type = static_cast(fsb.type()); sh.compression = static_cast(fsb.compression()); sh.length = range.size(); if (sec) { // This isn't just an optimization, it is actually a bit of a safety // feature. If we have an existing section header that we've previously // validated and we use its checksums, we can be sure that any mistake // in copying the data will be detected. auto secnum = sec->section_number(); if (secnum && secnum.value() == sh.number) { auto xxh = sec->xxh3_64_value(); auto sha = sec->sha2_512_256_value(); if (xxh && sha && sha->size() == sizeof(sh.sha2_512_256)) { sh.xxh3_64 = xxh.value(); std::copy(sha->begin(), sha->end(), &sh.sha2_512_256[0]); return; } } } checksum xxh(checksum::algorithm::XXH3_64); xxh.update(&sh.number, sizeof(section_header_v2) - offsetof(section_header_v2, number)); xxh.update(range.data(), range.size()); DWARFS_CHECK(xxh.finalize(&sh.xxh3_64), "XXH3-64 checksum failed"); checksum sha(checksum::algorithm::SHA2_512_256); sha.update(&sh.xxh3_64, sizeof(section_header_v2) - offsetof(section_header_v2, xxh3_64)); sha.update(range.data(), range.size()); DWARFS_CHECK(sha.finalize(&sh.sha2_512_256), "SHA512/256 checksum failed"); } } // namespace template class filesystem_writer_ final : public filesystem_writer_detail { public: using physical_block_cb_type = filesystem_writer_detail::physical_block_cb_type; filesystem_writer_(logger& lgr, std::ostream& os, worker_group& wg, progress& prog, filesystem_writer_options const& options, std::istream* header); ~filesystem_writer_() noexcept override; void add_default_compressor(block_compressor bc) override; void add_category_compressor(fragment_category::value_type cat, block_compressor bc) override; void add_section_compressor(section_type type, block_compressor bc) override; compression_constraints get_compression_constraints(fragment_category::value_type cat, std::string const& metadata) const override; block_compressor const& get_compressor( section_type type, std::optional cat) const override; void configure(std::vector const& expected_categories, size_t max_active_slots) override; void configure_rewrite(size_t filesystem_size, size_t block_count) override; void copy_header(std::span header) override; void write_block(fragment_category cat, std::shared_ptr&& data, physical_block_cb_type physical_block_cb, std::optional meta) override; void finish_category(fragment_category cat) override; void write_metadata_v2_schema(std::shared_ptr&& data) override; void write_metadata_v2(std::shared_ptr&& data) override; void write_history(std::shared_ptr&& data) override; void check_block_compression( compression_type compression, std::span data, std::optional cat) override; void write_section(section_type type, compression_type compression, std::span data, std::optional cat) override; void write_compressed_section(fs_section const& sec, std::span data) override; void flush() override; size_t size() const override { return image_size_; } private: using block_merger_type = multi_queue_block_merger, fsblock_merger_policy>; using block_holder_type = block_merger_type::block_holder_type; block_compressor const& compressor_for_category(fragment_category::value_type cat) const; void write_block_impl(fragment_category cat, std::shared_ptr&& data, block_compressor const& bc, std::optional meta, physical_block_cb_type physical_block_cb); void on_block_merged(block_holder_type holder); void write_section_impl(section_type type, std::shared_ptr&& data); void write(fsblock const& fsb); void write(const char* data, size_t size); template void write(const T& obj); void write(std::span range); void writer_thread(); void push_section_index(section_type type); void write_section_index(); size_t mem_used() const; std::ostream& os_; size_t image_size_{0}; std::istream* header_; worker_group& wg_; progress& prog_; std::optional default_bc_; std::unordered_map category_bc_; std::unordered_map section_bc_; filesystem_writer_options const options_; LOG_PROXY_DECL(LoggerPolicy); std::deque queue_; std::shared_ptr pctx_; mutable std::mutex mx_; std::condition_variable cond_; volatile bool flush_; std::thread writer_thread_; uint32_t section_number_{0}; std::vector section_index_; std::ostream::pos_type header_size_{0}; std::unique_ptr merger_; }; // TODO: Maybe we can factor out the logic to find the right compressor // into something that gets passed a (section_type, category) pair? template filesystem_writer_::filesystem_writer_( logger& lgr, std::ostream& os, worker_group& wg, progress& prog, filesystem_writer_options const& options, std::istream* header) : os_(os) , header_(header) , wg_(wg) , prog_(prog) , options_(options) , LOG_PROXY_INIT(lgr) , flush_{true} { if (header_) { if (options_.remove_header) { LOG_WARN << "header will not be written because remove_header is set"; } else { image_size_ = header_size_ = copy_stream(*header_, os_); } } // TODO: the whole flush & thread thing needs to be revisited flush_ = false; writer_thread_ = std::thread(&filesystem_writer_::writer_thread, this); } template filesystem_writer_::~filesystem_writer_() noexcept { try { if (!flush_) { flush(); } } catch (...) { } } template void filesystem_writer_::writer_thread() { folly::setThreadName("writer"); for (;;) { block_holder_type holder; { std::unique_lock lock(mx_); if (!flush_ and queue_.empty()) { cond_.wait(lock); } if (queue_.empty()) { if (flush_) break; else continue; } std::swap(holder, queue_.front()); queue_.pop_front(); } cond_.notify_one(); auto const& fsb = holder.value(); // TODO: this may throw fsb->wait_until_compressed(); LOG_DEBUG << get_friendly_section_name(fsb->type()) << " [" << fsb->block_no() << "] compressed from " << size_with_unit(fsb->uncompressed_size()) << " to " << size_with_unit(fsb->size()) << " [" << fsb->description() << "]"; write(*fsb); } } template size_t filesystem_writer_::mem_used() const { size_t s = 0; for (const auto& holder : queue_) { s += holder.value()->size(); } return s; } template void filesystem_writer_::write(const char* data, size_t size) { // TODO: error handling :-) os_.write(data, size); image_size_ += size; prog_.compressed_size += size; } template template void filesystem_writer_::write(const T& obj) { write(reinterpret_cast(&obj), sizeof(T)); } template void filesystem_writer_::write(std::span range) { write(reinterpret_cast(range.data()), range.size()); } template void filesystem_writer_::write(fsblock const& fsb) { if (fsb.type() != section_type::SECTION_INDEX) { push_section_index(fsb.type()); } write(fsb.header()); write(fsb.data()); if (fsb.type() == section_type::BLOCK) { prog_.blocks_written++; } } template block_compressor const& filesystem_writer_::compressor_for_category( fragment_category::value_type cat) const { if (auto it = category_bc_.find(cat); it != category_bc_.end()) { LOG_DEBUG << "using compressor (" << it->second.describe() << ") for category " << cat; return it->second; } LOG_DEBUG << "using default compressor (" << default_bc_.value().describe() << ") for category " << cat; return default_bc_.value(); } template void filesystem_writer_::write_block_impl( fragment_category cat, std::shared_ptr&& data, block_compressor const& bc, std::optional meta, physical_block_cb_type physical_block_cb) { if (!merger_) { DWARFS_THROW(runtime_error, "filesystem_writer not configured"); } std::shared_ptr pctx; { std::unique_lock lock(mx_); if (!pctx_) { pctx_ = prog_.create_context(); } pctx = pctx_; } auto fsb = std::make_unique(section_type::BLOCK, bc, std::move(data), pctx, std::move(physical_block_cb)); fsb->compress(wg_, meta); merger_->add(cat, std::move(fsb)); } template void filesystem_writer_::on_block_merged( block_holder_type holder) { uint32_t number; { std::unique_lock lock(mx_); // TODO: move all section_number_ stuff to writer thread // we probably can't do that if we want to build // metadata in the background as we need to know // the section numbers for that number = section_number_; holder.value()->set_block_no(section_number_++); queue_.emplace_back(std::move(holder)); } LOG_DEBUG << "merged block " << number; cond_.notify_one(); } template void filesystem_writer_::finish_category(fragment_category cat) { if (!merger_) { DWARFS_THROW(runtime_error, "filesystem_writer not configured"); } merger_->finish(cat); } template void filesystem_writer_::write_section_impl( section_type type, std::shared_ptr&& data) { auto& bc = get_compressor(type, std::nullopt); uint32_t number; { std::unique_lock lock(mx_); if (!pctx_) { pctx_ = prog_.create_context(); } auto fsb = std::make_unique(type, bc, std::move(data), pctx_); number = section_number_; fsb->set_block_no(section_number_++); fsb->compress(wg_); queue_.emplace_back(std::move(fsb)); } LOG_DEBUG << "write section " << number; cond_.notify_one(); } template void filesystem_writer_::check_block_compression( compression_type compression, std::span data, std::optional cat) { block_compressor const* bc{nullptr}; if (cat) { bc = &compressor_for_category(*cat); } else { bc = &default_bc_.value(); } if (auto reqstr = bc->metadata_requirements(); !reqstr.empty()) { auto req = compression_metadata_requirements{reqstr}; std::vector tmp; block_decompressor bd(compression, data.data(), data.size(), tmp); try { req.check(bd.metadata()); } catch (std::exception const& e) { auto msg = fmt::format( "cannot compress {} compressed block with compressor '{}' because " "the following metadata requirements are not met: {}", get_compression_name(compression), bc->describe(), e.what()); DWARFS_THROW(runtime_error, msg); } } } template void filesystem_writer_::write_section( section_type type, compression_type compression, std::span data, std::optional cat) { { std::unique_lock lock(mx_); if (!pctx_) { pctx_ = prog_.create_context(); } // TODO: do we still need this with the merger in place? while (mem_used() > options_.max_queue_size) { cond_.wait(lock); } auto& bc = get_compressor(type, cat); auto fsb = std::make_unique(type, bc, data, compression, pctx_); fsb->set_block_no(section_number_++); fsb->compress(wg_); queue_.emplace_back(std::move(fsb)); } cond_.notify_one(); } template void filesystem_writer_::write_compressed_section( fs_section const& sec, std::span data) { { std::lock_guard lock(mx_); if (!pctx_) { pctx_ = prog_.create_context(); } auto fsb = std::make_unique(sec, data, pctx_); fsb->set_block_no(section_number_++); fsb->compress(wg_); queue_.emplace_back(std::move(fsb)); } cond_.notify_one(); } template void filesystem_writer_::add_default_compressor( block_compressor bc) { DWARFS_CHECK(bc, "block_compressor must not be null"); LOG_DEBUG << "adding default compressor (" << bc.describe() << ")"; if (default_bc_) { DWARFS_THROW(runtime_error, "default compressor registered more than once"); } default_bc_ = std::move(bc); } template void filesystem_writer_::add_category_compressor( fragment_category::value_type cat, block_compressor bc) { DWARFS_CHECK(bc, "block_compressor must not be null"); LOG_DEBUG << "adding compressor (" << bc.describe() << ") for category " << cat; if (!category_bc_.emplace(cat, std::move(bc)).second) { DWARFS_THROW( runtime_error, fmt::format("compressor registered more than once for category {}", cat)); } } template void filesystem_writer_::add_section_compressor( section_type type, block_compressor bc) { DWARFS_CHECK(bc, "block_compressor must not be null"); LOG_DEBUG << "adding compressor (" << bc.describe() << ") for section type " << get_friendly_section_name(type); DWARFS_CHECK(type != section_type::SECTION_INDEX, "SECTION_INDEX is always uncompressed"); if (auto reqstr = bc.metadata_requirements(); !reqstr.empty()) { try { auto req = compression_metadata_requirements{reqstr}; req.check(std::nullopt); } catch (std::exception const& e) { auto msg = fmt::format("cannot use '{}' for {} compression because compression " "metadata requirements are not met: {}", bc.describe(), get_friendly_section_name(type), e.what()); DWARFS_THROW(runtime_error, msg); } } if (!section_bc_.emplace(type, std::move(bc)).second) { DWARFS_THROW( runtime_error, fmt::format("compressor registered more than once for section type {}", get_friendly_section_name(type))); } } template auto filesystem_writer_::get_compression_constraints( fragment_category::value_type cat, std::string const& metadata) const -> compression_constraints { return compressor_for_category(cat).get_compression_constraints(metadata); } template block_compressor const& filesystem_writer_::get_compressor( section_type type, std::optional cat) const { if (cat) { DWARFS_CHECK(type == section_type::BLOCK, "category-specific compressors are only supported for blocks"); return compressor_for_category(*cat); } if (auto it = section_bc_.find(type); it != section_bc_.end()) { return it->second; } return default_bc_.value(); } template void filesystem_writer_::configure( std::vector const& expected_categories, size_t max_active_slots) { if (merger_) { DWARFS_THROW(runtime_error, "filesystem_writer already configured"); } merger_ = std::make_unique( max_active_slots, options_.max_queue_size, expected_categories, [this](auto&& holder) { on_block_merged(std::move(holder)); }, fsblock_merger_policy{options_.worst_case_block_size}); } template void filesystem_writer_::configure_rewrite(size_t filesystem_size, size_t block_count) { prog_.original_size = filesystem_size; prog_.filesystem_size = filesystem_size; prog_.block_count = block_count; } template void filesystem_writer_::copy_header( std::span header) { if (!options_.remove_header) { if (header_) { LOG_WARN << "replacing old header"; } else { write(header); header_size_ = size(); } } } template void filesystem_writer_::write_block( fragment_category cat, std::shared_ptr&& data, physical_block_cb_type physical_block_cb, std::optional meta) { write_block_impl(cat, std::move(data), compressor_for_category(cat.value()), std::move(meta), std::move(physical_block_cb)); } template void filesystem_writer_::write_metadata_v2_schema( std::shared_ptr&& data) { write_section_impl(section_type::METADATA_V2_SCHEMA, std::move(data)); } template void filesystem_writer_::write_metadata_v2( std::shared_ptr&& data) { write_section_impl(section_type::METADATA_V2, std::move(data)); } template void filesystem_writer_::write_history( std::shared_ptr&& data) { write_section_impl(section_type::HISTORY, std::move(data)); } template void filesystem_writer_::flush() { { std::lock_guard lock(mx_); if (flush_) { return; } flush_ = true; } cond_.notify_one(); writer_thread_.join(); if (!options_.no_section_index) { write_section_index(); } } template void filesystem_writer_::push_section_index(section_type type) { section_index_.push_back((static_cast(type) << 48) | static_cast(size() - header_size_)); } template void filesystem_writer_::write_section_index() { push_section_index(section_type::SECTION_INDEX); auto data = std::span(reinterpret_cast(section_index_.data()), sizeof(section_index_[0]) * section_index_.size()); auto fsb = fsblock(section_type::SECTION_INDEX, compression_type::NONE, data); fsb.set_block_no(section_number_++); fsb.compress(wg_); fsb.wait_until_compressed(); write(fsb); } } // namespace internal filesystem_writer::filesystem_writer(std::ostream& os, logger& lgr, thread_pool& pool, writer_progress& prog) : filesystem_writer(os, lgr, pool, prog, {}) {} filesystem_writer::filesystem_writer(std::ostream& os, logger& lgr, thread_pool& pool, writer_progress& prog, filesystem_writer_options const& options, std::istream* header) : impl_{make_unique_logging_object( lgr, os, pool.get_worker_group(), prog.get_internal(), options, header)} {} filesystem_writer::~filesystem_writer() = default; filesystem_writer::filesystem_writer(filesystem_writer&&) = default; filesystem_writer& filesystem_writer::operator=(filesystem_writer&&) = default; void filesystem_writer::add_default_compressor(block_compressor bc) { impl_->add_default_compressor(std::move(bc)); } void filesystem_writer::add_category_compressor( fragment_category::value_type cat, block_compressor bc) { impl_->add_category_compressor(cat, std::move(bc)); } void filesystem_writer::add_section_compressor(section_type type, block_compressor bc) { impl_->add_section_compressor(type, std::move(bc)); } } // namespace dwarfs::writer