/* vim:set ts=2 sw=2 sts=2 et: */
/**
* \author Marcus Holland-Moritz (github@mhxnet.de)
* \copyright Copyright (c) Marcus Holland-Moritz
*
* This file is part of dwarfs.
*
* dwarfs is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* dwarfs is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with dwarfs. If not, see <https://www.gnu.org/licenses/>.
*/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
namespace dwarfs::writer {
namespace internal {
using namespace dwarfs::internal;
namespace {
// Copies everything that can still be read from `is` into `os`.
//
// Data is pulled straight from the input's stream buffer in 1 KiB chunks.
// Copying stops when the input is exhausted or as soon as a write to `os`
// fails; a chunk whose write failed is not counted.
//
// Returns the number of bytes that were written to `os` without the stream
// entering a failed state.
size_t copy_stream(std::istream& is, std::ostream& os) {
  std::streambuf* const source = is.rdbuf();
  std::streamsize copied{0};
  char chunk[1024];

  for (;;) {
    std::streamsize const got = source->sgetn(chunk, sizeof(chunk));
    if (got <= 0) {
      break; // input exhausted
    }
    if (!os.write(chunk, got)) {
      // The output stream is broken; continuing would only inflate the
      // returned size with bytes that never reached the output.
      break;
    }
    copied += got;
  }

  return copied;
}
// Returns the short lower-case label used for a section type in log and
// error messages. Section types without a dedicated label here fall back to
// whatever get_section_name() reports.
std::string get_friendly_section_name(section_type type) {
  char const* label = nullptr;

  switch (type) {
  case section_type::METADATA_V2_SCHEMA:
    label = "schema";
    break;
  case section_type::METADATA_V2:
    label = "metadata";
    break;
  case section_type::HISTORY:
    label = "history";
    break;
  case section_type::BLOCK:
    label = "block";
    break;
  case section_type::SECTION_INDEX:
    label = "index";
    break;
  }

  if (label != nullptr) {
    return label;
  }

  return get_section_name(type);
}
// Progress context that reports how many bytes went into and came out of
// compression. The two counters are public atomics that are incremented by
// the compression jobs.
class compression_progress : public progress::context {
 public:
  using status = progress::context::status;

  compression_progress() = default;

  // Builds the status shown for this context. The ratio text is only
  // produced once both counters are non-zero.
  status get_status() const override {
    status result;
    result.color = termcolor::RED;
    result.context = "[compressing] ";

    auto consumed = bytes_in.load();
    auto produced = bytes_out.load();

    if (consumed > 0 && produced > 0) {
      result.status_string =
          fmt::format("compressed {} to {} (ratio {:.2f}%)",
                      size_with_unit(consumed), size_with_unit(produced),
                      100.0 * produced / consumed);
    }

    result.bytes_processed.emplace(bytes_in.load());

    return result;
  }

  int get_priority() const override { return -1000; }

  std::atomic bytes_in{0};
  std::atomic bytes_out{0};
};
// Handle for a single section that is going to be written to the image.
// All public member functions simply forward to a polymorphic `impl` object
// owned through `impl_`; the four constructors accept different kinds of
// input (see their out-of-line definitions for what each one creates).
class fsblock {
public:
// Section from shared data plus a compressor, a progress context and an
// optional callback (defaults to nullptr).
fsblock(section_type type, block_compressor const& bc,
std::shared_ptr&& data,
std::shared_ptr pctx,
folly::Function set_block_cb = nullptr);
// Section from a data span with an explicitly given compression type.
fsblock(section_type type, compression_type compression,
std::span data);
// Section described by an existing `fs_section` plus its data span.
fsblock(fs_section sec, std::span data,
std::shared_ptr pctx);
// Section from a data span whose current compression is `data_comp_type`,
// together with a compressor `bc`.
fsblock(section_type type, block_compressor const& bc,
std::span data, compression_type data_comp_type,
std::shared_ptr pctx);
// Forwards to impl::compress(); `meta` is optional and defaults to nullopt.
void
compress(worker_group& wg, std::optional meta = std::nullopt) {
impl_->compress(wg, std::move(meta));
}
void wait_until_compressed() { impl_->wait_until_compressed(); }
section_type type() const { return impl_->type(); }
compression_type compression() const { return impl_->compression(); }
std::string description() const { return impl_->description(); }
std::span data() const { return impl_->data(); }
size_t uncompressed_size() const { return impl_->uncompressed_size(); }
size_t size() const { return impl_->size(); }
void set_block_no(uint32_t number) { impl_->set_block_no(number); }
uint32_t block_no() const { return impl_->block_no(); }
section_header_v2 const& header() const { return impl_->header(); }
// Interface every concrete block implementation has to provide.
class impl {
public:
virtual ~impl() = default;
virtual void
compress(worker_group& wg, std::optional meta) = 0;
virtual void wait_until_compressed() = 0;
virtual section_type type() const = 0;
virtual compression_type compression() const = 0;
virtual std::string description() const = 0;
virtual std::span data() const = 0;
virtual size_t uncompressed_size() const = 0;
virtual size_t size() const = 0;
virtual void set_block_no(uint32_t number) = 0;
virtual uint32_t block_no() const = 0;
virtual section_header_v2 const& header() const = 0;
};
// Fills `sh` for the block `fsb`; `sec` is optional (defaults to nullopt).
static void
build_section_header(section_header_v2& sh, fsblock::impl const& fsb,
std::optional const& sec = std::nullopt);
private:
std::unique_ptr impl_;
};
// Size policy handed to the block merger: reports the actual size of a
// block and a fixed worst-case size for blocks of any category.
class fsblock_merger_policy {
  size_t worst_case_block_size_;

 public:
  explicit fsblock_merger_policy(size_t worst_case_block_size)
      : worst_case_block_size_{worst_case_block_size} {}

  // Current size of `fsb`; asserts that it does not exceed the configured
  // worst case.
  size_t block_size(std::unique_ptr const& fsb) const {
    assert(fsb->size() <= worst_case_block_size_);
    return fsb->size();
  }

  // The worst case is the same for every category, so the argument is
  // ignored.
  size_t worst_case_source_block_size(fragment_category /*source_id*/) const {
    return worst_case_block_size_;
  }
};
// Block implementation that owns uncompressed data (via shared pointer) and
// compresses it with `bc_` in a worker job.
class raw_fsblock : public fsblock::impl {
public:
// Records the size of `data` before taking it over, and fails the
// DWARFS_CHECK if `bc` is a null compressor.
raw_fsblock(section_type type, const block_compressor& bc,
std::shared_ptr&& data,
std::shared_ptr pctx,
folly::Function set_block_cb)
: type_{type}
, bc_{bc}
, uncompressed_size_{data->size()}
, data_{std::move(data)}
, comp_type_{bc_.type()}
, pctx_{std::move(pctx)}
, set_block_cb_{std::move(set_block_cb)} {
DWARFS_CHECK(bc_, "block_compressor must not be null");
}
// Queues a job on `wg` that compresses the data (passing `*meta` to the
// compressor when given), adds the input/output sizes to the progress
// counters and swaps the result into `data_` under the mutex.
void compress(worker_group& wg, std::optional meta) override {
std::promise prom;
future_ = prom.get_future();
wg.add_job([this, prom = std::move(prom),
meta = std::move(meta)]() mutable {
try {
std::shared_ptr tmp;
if (meta) {
tmp = std::make_shared(bc_.compress(data_->vec(), *meta));
} else {
tmp = std::make_shared(bc_.compress(data_->vec()));
}
pctx_->bytes_in += data_->vec().size();
pctx_->bytes_out += tmp->vec().size();
{
std::lock_guard lock(mx_);
data_.swap(tmp);
}
} catch (bad_compression_ratio_error const&) {
// Keep the original data and mark the block as uncompressed.
// Note that only this exception type is handled here.
comp_type_ = compression_type::NONE;
}
prom.set_value();
});
}
// Blocks until the job queued by compress() has fulfilled its promise.
void wait_until_compressed() override { future_.wait(); }
section_type type() const override { return type_; }
compression_type compression() const override { return comp_type_; }
std::string description() const override { return bc_.describe(); }
// Note: reads `data_` without taking the mutex.
std::span data() const override { return data_->vec(); }
// Size of the data as it was at construction time.
size_t uncompressed_size() const override { return uncompressed_size_; }
size_t size() const override {
std::lock_guard lock(mx_);
return data_->size();
}
// Assigns the block number exactly once (throws if already set) and then
// invokes the optional callback with it, outside the lock.
void set_block_no(uint32_t number) override {
{
std::lock_guard lock(mx_);
if (number_) {
DWARFS_THROW(runtime_error, "block number already set");
}
number_ = number;
}
if (set_block_cb_) {
set_block_cb_(number);
}
}
// Throws via optional::value() if no number has been set yet.
uint32_t block_no() const override {
std::lock_guard lock(mx_);
return number_.value();
}
// Builds the section header on first use and caches it. The mutex is
// recursive because build_section_header() calls back into this object
// (e.g. block_no()) while the lock is held.
section_header_v2 const& header() const override {
std::lock_guard lock(mx_);
if (!header_) {
header_ = section_header_v2{};
fsblock::build_section_header(*header_, *this);
}
return header_.value();
}
private:
const section_type type_;
block_compressor const& bc_;
const size_t uncompressed_size_;
mutable std::recursive_mutex mx_;
std::shared_ptr data_;
std::future future_;
std::optional number_;
std::optional mutable header_;
compression_type comp_type_;
std::shared_ptr pctx_;
folly::Function set_block_cb_;
};
// Block implementation for data that is written as-is: compress() performs
// no compression, it only builds the section header in a worker job.
class compressed_fsblock : public fsblock::impl {
public:
// Variant without progress context and without an originating section.
compressed_fsblock(section_type type, compression_type compression,
std::span range)
: type_{type}
, compression_{compression}
, range_{range} {}
// Variant that takes type and compression from an existing section `sec`
// and keeps `sec` for header construction.
compressed_fsblock(fs_section sec, std::span range,
std::shared_ptr pctx)
: type_{sec.type()}
, compression_{sec.compression()}
, range_{range}
, pctx_{std::move(pctx)}
, sec_{std::move(sec)} {}
// Queues a job on `wg` that builds the header (passing `sec_` along) and,
// if a progress context is present, adds size() to both byte counters.
// The metadata argument is ignored.
void
compress(worker_group& wg, std::optional /* meta */) override {
std::promise prom;
future_ = prom.get_future();
wg.add_job([this, prom = std::move(prom)]() mutable {
fsblock::build_section_header(header_, *this, sec_);
if (pctx_) {
pctx_->bytes_in += size();
pctx_->bytes_out += size();
}
prom.set_value();
});
}
void wait_until_compressed() override { future_.wait(); }
section_type type() const override { return type_; }
compression_type compression() const override { return compression_; }
// TODO
std::string description() const override { return ""; }
std::span data() const override { return range_; }
// Both sizes report the size of the stored range.
size_t uncompressed_size() const override { return range_.size(); }
size_t size() const override { return range_.size(); }
// Unlike the other implementations visible in this file, this neither
// locks nor rejects a second assignment.
void set_block_no(uint32_t number) override { number_ = number; }
// Throws via optional::value() if no number has been set yet.
uint32_t block_no() const override { return number_.value(); }
// Only meaningful after the job queued by compress() has run.
section_header_v2 const& header() const override { return header_; }
private:
section_type const type_;
compression_type const compression_;
std::span range_;
std::future future_;
std::optional number_;
section_header_v2 header_;
std::shared_ptr pctx_;
std::optional sec_;
};
// Block implementation that re-compresses existing section data: the input
// span is decompressed using `data_comp_type_` and compressed again with
// `bc_` in a worker job.
class rewritten_fsblock : public fsblock::impl {
public:
// Fails the DWARFS_CHECK if `bc` is a null compressor.
rewritten_fsblock(section_type type, block_compressor const& bc,
std::span data,
compression_type data_comp_type,
std::shared_ptr pctx)
: type_{type}
, bc_{bc}
, data_{data}
, comp_type_{bc_.type()}
, pctx_{std::move(pctx)}
, data_comp_type_{data_comp_type} {
DWARFS_CHECK(bc_, "block_compressor must not be null");
}
// Queues the decompress/recompress job on `wg`. If `meta` is not given,
// the metadata reported by the decompressor is used instead. Any exception
// other than a bad compression ratio is stored in the promise.
void compress(worker_group& wg, std::optional meta) override {
std::promise prom;
future_ = prom.get_future();
wg.add_job(
[this, prom = std::move(prom), meta = std::move(meta)]() mutable {
try {
// TODO: we don't have to do this for uncompressed blocks
std::vector block;
block_decompressor bd(data_comp_type_, data_.data(), data_.size(),
block);
bd.decompress_frame(bd.uncompressed_size());
if (!meta) {
meta = bd.metadata();
}
pctx_->bytes_in += block.size(); // TODO: data_.size()?
try {
if (meta) {
block = bc_.compress(block, *meta);
} else {
block = bc_.compress(block);
}
} catch (bad_compression_ratio_error const&) {
// Keep the decompressed data and mark the block as uncompressed.
comp_type_ = compression_type::NONE;
}
pctx_->bytes_out += block.size();
{
std::lock_guard lock(mx_);
block_data_.swap(block);
}
prom.set_value();
} catch (...) {
prom.set_exception(std::current_exception());
}
});
}
// Uses get() rather than wait(), so an exception stored by the job is
// rethrown here.
void wait_until_compressed() override { future_.get(); }
section_type type() const override { return type_; }
compression_type compression() const override { return comp_type_; }
std::string description() const override { return bc_.describe(); }
// Note: reads `block_data_` without taking the mutex.
std::span data() const override { return block_data_; }
// NOTE(review): this returns the size of the input span as passed to the
// constructor, not the size of the decompressed data - confirm intended.
size_t uncompressed_size() const override { return data_.size(); }
size_t size() const override {
std::lock_guard lock(mx_);
return block_data_.size();
}
// Assigns the block number exactly once; throws if already set.
void set_block_no(uint32_t number) override {
{
std::lock_guard lock(mx_);
if (number_) {
DWARFS_THROW(runtime_error, "block number already set");
}
number_ = number;
}
}
// Throws via optional::value() if no number has been set yet.
uint32_t block_no() const override {
std::lock_guard lock(mx_);
return number_.value();
}
// Builds the section header on first use and caches it. The mutex is
// recursive because build_section_header() calls back into this object
// (e.g. block_no()) while the lock is held.
section_header_v2 const& header() const override {
std::lock_guard lock(mx_);
if (!header_) {
header_ = section_header_v2{};
fsblock::build_section_header(*header_, *this);
}
return header_.value();
}
private:
const section_type type_;
block_compressor const& bc_;
mutable std::recursive_mutex mx_;
std::span data_;
std::vector block_data_;
std::future future_;
std::optional number_;
std::optional mutable header_;
compression_type comp_type_;
std::shared_ptr pctx_;
compression_type const data_comp_type_;
};
// Creates the implementation object from shared data, a compressor, a
// progress context and an optional callback; all arguments are forwarded.
fsblock::fsblock(section_type type, block_compressor const& bc,
std::shared_ptr&& data,
std::shared_ptr pctx,
folly::Function set_block_cb)
: impl_(std::make_unique(type, bc, std::move(data),
std::move(pctx),
std::move(set_block_cb))) {}
// Creates the implementation object from a data span with an explicitly
// given compression type.
fsblock::fsblock(section_type type, compression_type compression,
std::span data)
: impl_(std::make_unique(type, compression, data)) {}
// Creates the implementation object from an existing section description,
// its data span and a progress context.
fsblock::fsblock(fs_section sec, std::span data,
std::shared_ptr pctx)
: impl_(std::make_unique(std::move(sec), data,
std::move(pctx))) {}
// Creates the implementation object from a data span whose current
// compression is `data_comp_type`, plus a compressor and progress context.
fsblock::fsblock(section_type type, block_compressor const& bc,
std::span data, compression_type data_comp_type,
std::shared_ptr pctx)
: impl_(std::make_unique(type, bc, data, data_comp_type,
std::move(pctx))) {}
// Fills the section header `sh` for block `fsb`.
//
// Magic, version, block number, type, compression and payload length are
// always set from `fsb`. The two checksums are either copied from `sec`
// (see below) or computed from the header fields and the payload.
void fsblock::build_section_header(section_header_v2& sh,
fsblock::impl const& fsb,
std::optional const& sec) {
auto range = fsb.data();
::memcpy(&sh.magic[0], "DWARFS", 6);
sh.major = MAJOR_VERSION;
sh.minor = MINOR_VERSION;
sh.number = fsb.block_no();
sh.type = static_cast(fsb.type());
sh.compression = static_cast(fsb.compression());
sh.length = range.size();
if (sec) {
// This isn't just an optimization, it is actually a bit of a safety
// feature. If we have an existing section header that we've previously
// validated and we use its checksums, we can be sure that any mistake
// in copying the data will be detected.
auto secnum = sec->section_number();
// Checksums are only reused when the section number is unchanged and both
// values are available with the expected SHA size.
if (secnum && secnum.value() == sh.number) {
auto xxh = sec->xxh3_64_value();
auto sha = sec->sha2_512_256_value();
if (xxh && sha && sha->size() == sizeof(sh.sha2_512_256)) {
sh.xxh3_64 = xxh.value();
std::copy(sha->begin(), sha->end(), &sh.sha2_512_256[0]);
return;
}
}
}
// XXH3-64 covers the header from `number` to its end, then the payload.
checksum xxh(checksum::algorithm::XXH3_64);
xxh.update(&sh.number,
sizeof(section_header_v2) - offsetof(section_header_v2, number));
xxh.update(range.data(), range.size());
DWARFS_CHECK(xxh.finalize(&sh.xxh3_64), "XXH3-64 checksum failed");
// SHA2-512/256 covers the header from `xxh3_64` (just computed above) to
// its end, then the payload - so it must be computed second.
checksum sha(checksum::algorithm::SHA2_512_256);
sha.update(&sh.xxh3_64,
sizeof(section_header_v2) - offsetof(section_header_v2, xxh3_64));
sha.update(range.data(), range.size());
DWARFS_CHECK(sha.finalize(&sh.sha2_512_256), "SHA512/256 checksum failed");
}
} // namespace
// Implementation of filesystem_writer_detail. Sections are queued in
// `queue_` and written to `os_` by a dedicated writer thread; `mx_` and
// `cond_` coordinate the producers and that thread.
template
class filesystem_writer_ final : public filesystem_writer_detail {
public:
using physical_block_cb_type =
filesystem_writer_detail::physical_block_cb_type;
filesystem_writer_(logger& lgr, std::ostream& os, worker_group& wg,
progress& prog, filesystem_writer_options const& options,
std::istream* header);
~filesystem_writer_() noexcept override;
void add_default_compressor(block_compressor bc) override;
void add_category_compressor(fragment_category::value_type cat,
block_compressor bc) override;
void add_section_compressor(section_type type, block_compressor bc) override;
compression_constraints
get_compression_constraints(fragment_category::value_type cat,
std::string const& metadata) const override;
block_compressor const& get_compressor(
section_type type,
std::optional cat) const override;
void configure(std::vector const& expected_categories,
size_t max_active_slots) override;
void configure_rewrite(size_t filesystem_size, size_t block_count) override;
void copy_header(std::span header) override;
void write_block(fragment_category cat, std::shared_ptr&& data,
physical_block_cb_type physical_block_cb,
std::optional meta) override;
void finish_category(fragment_category cat) override;
void write_metadata_v2_schema(std::shared_ptr&& data) override;
void write_metadata_v2(std::shared_ptr&& data) override;
void write_history(std::shared_ptr&& data) override;
void check_block_compression(
compression_type compression, std::span data,
std::optional cat) override;
void write_section(section_type type, compression_type compression,
std::span data,
std::optional cat) override;
void write_compressed_section(fs_section const& sec,
std::span data) override;
void flush() override;
// Number of bytes written to the output so far (including a header).
size_t size() const override { return image_size_; }
private:
using block_merger_type =
multi_queue_block_merger,
fsblock_merger_policy>;
using block_holder_type = block_merger_type::block_holder_type;
block_compressor const&
compressor_for_category(fragment_category::value_type cat) const;
void
write_block_impl(fragment_category cat, std::shared_ptr&& data,
block_compressor const& bc, std::optional meta,
physical_block_cb_type physical_block_cb);
void on_block_merged(block_holder_type holder);
void
write_section_impl(section_type type, std::shared_ptr&& data);
void write(fsblock const& fsb);
void write(const char* data, size_t size);
template
void write(const T& obj);
void write(std::span range);
void writer_thread();
void push_section_index(section_type type);
void write_section_index();
size_t mem_used() const;
std::ostream& os_;
size_t image_size_{0};
std::istream* header_;
worker_group& wg_;
progress& prog_;
std::optional default_bc_;
std::unordered_map
category_bc_;
std::unordered_map section_bc_;
filesystem_writer_options const options_;
LOG_PROXY_DECL(LoggerPolicy);
// Sections waiting to be written by the writer thread; guarded by mx_.
std::deque queue_;
// Progress context, created lazily under mx_ on first use.
std::shared_ptr pctx_;
mutable std::mutex mx_;
std::condition_variable cond_;
// NOTE(review): `volatile` provides no inter-thread synchronization; the
// flag is written under mx_ in flush() but read without the lock in the
// destructor - confirm this is only ever done from the owning thread.
volatile bool flush_;
std::thread writer_thread_;
// Next section number to hand out.
uint32_t section_number_{0};
std::vector section_index_;
std::ostream::pos_type header_size_{0};
// Created by configure(); block writing requires it to be set.
std::unique_ptr merger_;
};
// TODO: Maybe we can factor out the logic to find the right compressor
// into something that gets passed a (section_type, category) pair?
// Sets up the writer and starts the writer thread.
//
// If a header stream is supplied it is copied to the output right away and
// its size becomes both the header size and the initial image size - unless
// options.remove_header is set, in which case a warning is logged and
// nothing is copied.
template
filesystem_writer_::filesystem_writer_(
    logger& lgr, std::ostream& os, worker_group& wg, progress& prog,
    filesystem_writer_options const& options, std::istream* header)
    : os_(os)
    , header_(header)
    , wg_(wg)
    , prog_(prog)
    , options_(options)
    , LOG_PROXY_INIT(lgr)
    , flush_{true} {
  if (header_ && options_.remove_header) {
    LOG_WARN << "header will not be written because remove_header is set";
  } else if (header_) {
    image_size_ = header_size_ = copy_stream(*header_, os_);
  }

  // TODO: the whole flush & thread thing needs to be revisited
  flush_ = false;

  writer_thread_ = std::thread(&filesystem_writer_::writer_thread, this);
}
// Flushes the writer if that has not happened yet. The destructor is
// noexcept, so any exception raised while flushing is discarded.
template
filesystem_writer_::~filesystem_writer_() noexcept {
  try {
    bool const already_flushed = flush_;
    if (!already_flushed) {
      flush();
    }
  } catch (...) {
    // nothing sensible can be done here
  }
}
// Body of the writer thread: takes queued sections one at a time, waits for
// each to finish compressing and writes it to the output. The loop ends
// once a flush has been requested and the queue is empty.
template
void filesystem_writer_::writer_thread() {
folly::setThreadName("writer");
for (;;) {
block_holder_type holder;
{
std::unique_lock lock(mx_);
// A single wait is enough here: after any wake-up (including a spurious
// one) an empty queue without flush just loops around via `continue`.
if (!flush_ and queue_.empty()) {
cond_.wait(lock);
}
if (queue_.empty()) {
if (flush_)
break;
else
continue;
}
std::swap(holder, queue_.front());
queue_.pop_front();
}
// An entry has been removed from the queue; wake one thread waiting on
// the condition variable (e.g. a producer waiting for queue space).
cond_.notify_one();
auto const& fsb = holder.value();
// TODO: this may throw
fsb->wait_until_compressed();
LOG_DEBUG << get_friendly_section_name(fsb->type()) << " ["
<< fsb->block_no() << "] compressed from "
<< size_with_unit(fsb->uncompressed_size()) << " to "
<< size_with_unit(fsb->size()) << " [" << fsb->description()
<< "]";
write(*fsb);
}
}
// Sums the current sizes of all sections in the queue. The queue is read
// without taking the mutex here, so the caller is expected to hold it.
template
size_t filesystem_writer_::mem_used() const {
  size_t total = 0;

  for (auto it = queue_.begin(); it != queue_.end(); ++it) {
    total += it->value()->size();
  }

  return total;
}
// Writes `size` raw bytes to the output stream and adds `size` to both the
// image size and the progress object's compressed size. The stream state is
// not checked after the write.
template
void filesystem_writer_::write(const char* data, size_t size) {
// TODO: error handling :-)
os_.write(data, size);
image_size_ += size;
prog_.compressed_size += size;
}
// Writes the object representation of `obj` (sizeof(T) bytes) to the
// output via the raw-byte overload.
template
template
void filesystem_writer_::write(const T& obj) {
write(reinterpret_cast(&obj), sizeof(T));
}
// Writes all bytes of `range` to the output via the raw-byte overload.
template
void filesystem_writer_::write(std::span range) {
write(reinterpret_cast(range.data()), range.size());
}
// Writes one complete section: header first, then payload.
//
// Every section except the section index itself is recorded in the section
// index before it is written, and BLOCK sections bump the progress
// counter of written blocks afterwards.
template
void filesystem_writer_::write(fsblock const& fsb) {
  bool const is_index = fsb.type() == section_type::SECTION_INDEX;

  if (!is_index) {
    push_section_index(fsb.type());
  }

  write(fsb.header());
  write(fsb.data());

  bool const is_block = fsb.type() == section_type::BLOCK;

  if (is_block) {
    prog_.blocks_written++;
  }
}
// Picks the compressor for a fragment category: the one registered for
// that category if there is one, the default compressor otherwise.
// Accessing the default compressor goes through optional::value(), so this
// throws if no default has been registered.
template
block_compressor const&
filesystem_writer_::compressor_for_category(
    fragment_category::value_type cat) const {
  auto const found = category_bc_.find(cat);

  if (found == category_bc_.end()) {
    LOG_DEBUG << "using default compressor (" << default_bc_.value().describe()
              << ") for category " << cat;
    return default_bc_.value();
  }

  LOG_DEBUG << "using compressor (" << found->second.describe()
            << ") for category " << cat;
  return found->second;
}
// Creates a BLOCK section from `data`, starts its compression on the worker
// group and hands it to the block merger under category `cat`.
// Throws if configure() has not created the merger yet.
template
void filesystem_writer_::write_block_impl(
fragment_category cat, std::shared_ptr&& data,
block_compressor const& bc, std::optional meta,
physical_block_cb_type physical_block_cb) {
if (!merger_) {
DWARFS_THROW(runtime_error, "filesystem_writer not configured");
}
std::shared_ptr pctx;
{
// The shared progress context is created on first use, under the lock.
std::unique_lock lock(mx_);
if (!pctx_) {
pctx_ = prog_.create_context();
}
pctx = pctx_;
}
auto fsb = std::make_unique(section_type::BLOCK, bc, std::move(data),
pctx, std::move(physical_block_cb));
fsb->compress(wg_, meta);
merger_->add(cat, std::move(fsb));
}
// Callback for blocks released by the merger: assigns the next section
// number to the block, appends it to the write queue and wakes a thread
// waiting on the condition variable.
template
void filesystem_writer_::on_block_merged(
    block_holder_type holder) {
  uint32_t number;

  {
    std::unique_lock lock(mx_);
    // TODO: move all section_number_ stuff to writer thread
    //       we probably can't do that if we want to build
    //       metadata in the background as we need to know
    //       the section numbers for that
    auto const& merged = holder.value();
    number = section_number_++;
    merged->set_block_no(number);
    queue_.emplace_back(std::move(holder));
  }

  LOG_DEBUG << "merged block " << number;

  cond_.notify_one();
}
// Tells the block merger that no more blocks will arrive for `cat`.
// Throws if configure() has not created the merger yet.
template
void filesystem_writer_::finish_category(fragment_category cat) {
  if (merger_) {
    merger_->finish(cat);
    return;
  }

  DWARFS_THROW(runtime_error, "filesystem_writer not configured");
}
// Queues a non-block section of the given type: picks the compressor for
// that section type, assigns the next section number, starts compression
// and appends the section to the write queue - all under the mutex.
template
void filesystem_writer_::write_section_impl(
section_type type, std::shared_ptr&& data) {
auto& bc = get_compressor(type, std::nullopt);
uint32_t number;
{
std::unique_lock lock(mx_);
// The shared progress context is created on first use.
if (!pctx_) {
pctx_ = prog_.create_context();
}
auto fsb = std::make_unique(type, bc, std::move(data), pctx_);
number = section_number_;
fsb->set_block_no(section_number_++);
fsb->compress(wg_);
queue_.emplace_back(std::move(fsb));
}
LOG_DEBUG << "write section " << number;
cond_.notify_one();
}
// Verifies that a block currently compressed with `compression` satisfies
// the metadata requirements of the compressor that would be used for it
// (the category compressor if `cat` is given, the default one otherwise).
// Throws a runtime_error describing the unmet requirements; does nothing if
// the compressor has no requirements.
template
void filesystem_writer_::check_block_compression(
compression_type compression, std::span data,
std::optional cat) {
block_compressor const* bc{nullptr};
if (cat) {
bc = &compressor_for_category(*cat);
} else {
bc = &default_bc_.value();
}
if (auto reqstr = bc->metadata_requirements(); !reqstr.empty()) {
auto req = compression_metadata_requirements{reqstr};
// The decompressor is only used to obtain the block's metadata; `tmp` is
// its required output buffer.
std::vector tmp;
block_decompressor bd(compression, data.data(), data.size(), tmp);
try {
req.check(bd.metadata());
} catch (std::exception const& e) {
auto msg = fmt::format(
"cannot compress {} compressed block with compressor '{}' because "
"the following metadata requirements are not met: {}",
get_compression_name(compression), bc->describe(), e.what());
DWARFS_THROW(runtime_error, msg);
}
}
}
// Queues an existing section for writing, using the compressor selected for
// `type` / `cat`. Blocks the caller while the queued sections exceed
// options_.max_queue_size.
template
void filesystem_writer_::write_section(
section_type type, compression_type compression,
std::span data,
std::optional cat) {
{
std::unique_lock lock(mx_);
// The shared progress context is created on first use.
if (!pctx_) {
pctx_ = prog_.create_context();
}
// TODO: do we still need this with the merger in place?
while (mem_used() > options_.max_queue_size) {
cond_.wait(lock);
}
auto& bc = get_compressor(type, cat);
auto fsb = std::make_unique(type, bc, data, compression, pctx_);
fsb->set_block_no(section_number_++);
fsb->compress(wg_);
queue_.emplace_back(std::move(fsb));
}
cond_.notify_one();
}
// Queues the section `sec` with its data span for writing: assigns the next
// section number, schedules its compress() step and appends it to the write
// queue, all under the mutex.
template
void filesystem_writer_::write_compressed_section(
fs_section const& sec, std::span data) {
{
std::lock_guard lock(mx_);
// The shared progress context is created on first use.
if (!pctx_) {
pctx_ = prog_.create_context();
}
auto fsb = std::make_unique(sec, data, pctx_);
fsb->set_block_no(section_number_++);
fsb->compress(wg_);
queue_.emplace_back(std::move(fsb));
}
cond_.notify_one();
}
// Registers the default compressor. A null compressor fails the
// DWARFS_CHECK, and registering a default a second time throws.
template
void filesystem_writer_::add_default_compressor(
    block_compressor bc) {
  DWARFS_CHECK(bc, "block_compressor must not be null");

  LOG_DEBUG << "adding default compressor (" << bc.describe() << ")";

  if (default_bc_.has_value()) {
    DWARFS_THROW(runtime_error, "default compressor registered more than once");
  }

  default_bc_ = std::move(bc);
}
// Registers the compressor for one fragment category. A null compressor
// fails the DWARFS_CHECK, and a second registration for the same category
// throws.
template
void filesystem_writer_::add_category_compressor(
    fragment_category::value_type cat, block_compressor bc) {
  DWARFS_CHECK(bc, "block_compressor must not be null");

  LOG_DEBUG << "adding compressor (" << bc.describe() << ") for category "
            << cat;

  bool const inserted = category_bc_.emplace(cat, std::move(bc)).second;

  if (!inserted) {
    DWARFS_THROW(
        runtime_error,
        fmt::format("compressor registered more than once for category {}",
                    cat));
  }
}
// Registers the compressor for one section type.
//
// SECTION_INDEX cannot get a compressor. If the compressor states metadata
// requirements, they are checked against "no metadata" and a failure is
// reported as a runtime_error. A second registration for the same section
// type throws as well.
template
void filesystem_writer_::add_section_compressor(
    section_type type, block_compressor bc) {
  DWARFS_CHECK(bc, "block_compressor must not be null");

  LOG_DEBUG << "adding compressor (" << bc.describe() << ") for section type "
            << get_friendly_section_name(type);

  DWARFS_CHECK(type != section_type::SECTION_INDEX,
               "SECTION_INDEX is always uncompressed");

  auto reqstr = bc.metadata_requirements();

  if (!reqstr.empty()) {
    try {
      auto req = compression_metadata_requirements{reqstr};
      req.check(std::nullopt);
    } catch (std::exception const& e) {
      auto msg =
          fmt::format("cannot use '{}' for {} compression because compression "
                      "metadata requirements are not met: {}",
                      bc.describe(), get_friendly_section_name(type), e.what());
      DWARFS_THROW(runtime_error, msg);
    }
  }

  bool const inserted = section_bc_.emplace(type, std::move(bc)).second;

  if (!inserted) {
    DWARFS_THROW(
        runtime_error,
        fmt::format("compressor registered more than once for section type {}",
                    get_friendly_section_name(type)));
  }
}
// Asks the compressor selected for category `cat` for its compression
// constraints given `metadata`.
template
auto filesystem_writer_::get_compression_constraints(
fragment_category::value_type cat,
std::string const& metadata) const -> compression_constraints {
return compressor_for_category(cat).get_compression_constraints(metadata);
}
// Selects the compressor for a section.
//
// With a category, the section must be a BLOCK and the category's
// compressor is used. Without one, a compressor registered for the section
// type wins over the default compressor; accessing the default goes through
// optional::value() and throws if none has been registered.
template
block_compressor const& filesystem_writer_::get_compressor(
    section_type type, std::optional cat) const {
  if (!cat) {
    auto const found = section_bc_.find(type);
    return found != section_bc_.end() ? found->second : default_bc_.value();
  }

  DWARFS_CHECK(type == section_type::BLOCK,
               "category-specific compressors are only supported for blocks");

  return compressor_for_category(*cat);
}
// Creates the block merger. May only be called once; a second call throws.
// Blocks released by the merger are passed to on_block_merged().
template
void filesystem_writer_::configure(
std::vector const& expected_categories,
size_t max_active_slots) {
if (merger_) {
DWARFS_THROW(runtime_error, "filesystem_writer already configured");
}
merger_ = std::make_unique(
max_active_slots, options_.max_queue_size, expected_categories,
[this](auto&& holder) { on_block_merged(std::move(holder)); },
fsblock_merger_policy{options_.worst_case_block_size});
}
// Seeds the progress object for a rewrite: both the original size and the
// filesystem size are set to `filesystem_size`, and the block count to
// `block_count`.
template
void filesystem_writer_::configure_rewrite(size_t filesystem_size,
size_t block_count) {
prog_.original_size = filesystem_size;
prog_.filesystem_size = filesystem_size;
prog_.block_count = block_count;
}
// Writes `header` to the output and records the resulting image size as the
// header size.
//
// Nothing happens when options_.remove_header is set. If a header stream
// was supplied at construction, only a warning is logged and `header` is
// not written.
template
void filesystem_writer_::copy_header(
    std::span header) {
  if (options_.remove_header) {
    return;
  }

  if (header_) {
    LOG_WARN << "replacing old header";
    return;
  }

  write(header);
  header_size_ = size();
}
// Public entry point for writing a data block: looks up the compressor for
// the block's category and forwards everything to write_block_impl().
template
void filesystem_writer_::write_block(
    fragment_category cat, std::shared_ptr&& data,
    physical_block_cb_type physical_block_cb, std::optional meta) {
  auto const& category_bc = compressor_for_category(cat.value());

  write_block_impl(cat, std::move(data), category_bc, std::move(meta),
                   std::move(physical_block_cb));
}
// Queues `data` as a METADATA_V2_SCHEMA section.
template
void filesystem_writer_::write_metadata_v2_schema(
std::shared_ptr&& data) {
write_section_impl(section_type::METADATA_V2_SCHEMA, std::move(data));
}
// Queues `data` as a METADATA_V2 section.
template
void filesystem_writer_::write_metadata_v2(
std::shared_ptr&& data) {
write_section_impl(section_type::METADATA_V2, std::move(data));
}
// Queues `data` as a HISTORY section.
template
void filesystem_writer_::write_history(
std::shared_ptr&& data) {
write_section_impl(section_type::HISTORY, std::move(data));
}
// Finishes writing: requests the writer thread to stop once its queue is
// empty, waits for it to exit, and then writes the section index unless
// options_.no_section_index is set. Calling flush() again is a no-op.
template
void filesystem_writer_::flush() {
{
std::lock_guard lock(mx_);
if (flush_) {
return;
}
flush_ = true;
}
cond_.notify_one();
// The section index is written from this thread, so the writer thread must
// have finished all queued sections first.
writer_thread_.join();
if (!options_.no_section_index) {
write_section_index();
}
}
// Appends one entry to the section index: the section type shifted into the
// bits from 48 upwards, combined with the current output position relative
// to the end of the header.
template
void filesystem_writer_::push_section_index(section_type type) {
section_index_.push_back((static_cast(type) << 48) |
static_cast(size() - header_size_));
}
// Writes the section index as the next section, always uncompressed.
// The index first gets an entry for itself, then its raw bytes are wrapped
// in a section that is numbered, processed and written synchronously.
template
void filesystem_writer_::write_section_index() {
push_section_index(section_type::SECTION_INDEX);
auto data = std::span(reinterpret_cast(section_index_.data()),
sizeof(section_index_[0]) * section_index_.size());
auto fsb = fsblock(section_type::SECTION_INDEX, compression_type::NONE, data);
fsb.set_block_no(section_number_++);
fsb.compress(wg_);
fsb.wait_until_compressed();
write(fsb);
}
} // namespace internal
// Convenience constructor: delegates to the full constructor with
// default-constructed options.
filesystem_writer::filesystem_writer(std::ostream& os, logger& lgr,
thread_pool& pool, writer_progress& prog)
: filesystem_writer(os, lgr, pool, prog, {}) {}
// Creates the implementation object, passing it the pool's worker group,
// the internal progress object, the options and the optional header stream.
filesystem_writer::filesystem_writer(std::ostream& os, logger& lgr,
thread_pool& pool, writer_progress& prog,
filesystem_writer_options const& options,
std::istream* header)
: impl_{make_unique_logging_object(
lgr, os, pool.get_worker_group(), prog.get_internal(), options,
header)} {}
// Destructor and move operations are defaulted here, out of line.
filesystem_writer::~filesystem_writer() = default;
filesystem_writer::filesystem_writer(filesystem_writer&&) = default;
filesystem_writer& filesystem_writer::operator=(filesystem_writer&&) = default;
// Forwards to the implementation's add_default_compressor().
void filesystem_writer::add_default_compressor(block_compressor bc) {
impl_->add_default_compressor(std::move(bc));
}
// Forwards to the implementation's add_category_compressor().
void filesystem_writer::add_category_compressor(
fragment_category::value_type cat, block_compressor bc) {
impl_->add_category_compressor(cat, std::move(bc));
}
// Forwards to the implementation's add_section_compressor().
void filesystem_writer::add_section_compressor(section_type type,
block_compressor bc) {
impl_->add_section_compressor(type, std::move(bc));
}
} // namespace dwarfs::writer