/* vim:set ts=2 sw=2 sts=2 et: */
/**
* \author Marcus Holland-Moritz (github@mhxnet.de)
* \copyright Copyright (c) Marcus Holland-Moritz
*
* This file is part of dwarfs.
*
* dwarfs is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* dwarfs is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with dwarfs. If not, see <https://www.gnu.org/licenses/>.
*/
#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <future>
#include <iterator>
#include <limits>
#include <memory>
#include <mutex>
#include <new>
#include <optional>
#include <shared_mutex>
#include <thread>
#include <utility>
#include <vector>

#include <fmt/format.h>

#include <folly/container/EvictingCacheMap.h>
#include <folly/container/F14Map.h>
#include <folly/stats/Histogram.h>
#include <folly/system/ThreadName.h>
#include <dwarfs/error.h>
#include <dwarfs/logger.h>
#include <dwarfs/mmif.h>
#include <dwarfs/os_access.h>
#include <dwarfs/performance_monitor.h>
#include <dwarfs/scope_exit.h>
#include <dwarfs/util.h>

#include <dwarfs/reader/block_cache_options.h>
#include <dwarfs/reader/block_range.h>
#include <dwarfs/reader/cache_tidy_config.h>

#include <dwarfs/internal/fs_section.h>
#include <dwarfs/internal/worker_group.h>

#include <dwarfs/reader/internal/block_cache.h>
#include <dwarfs/reader/internal/cached_block.h>
namespace dwarfs::reader::internal {
using namespace dwarfs::internal;
namespace {
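// Interface for detecting sequential block access patterns. The cache uses
// it to decide when to speculatively prefetch the next block.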
class sequential_access_detector {
public:
virtual ~sequential_access_detector() = default;
virtual void set_block_count(size_t) = 0;
virtual void touch(size_t block_no) = 0;
virtual std::optional<size_t> prefetch() const = 0;
};
class no_sequential_access_detector : public sequential_access_detector {
public:
void set_block_count(size_t) override {}
void touch(size_t) override {}
std::optional<size_t> prefetch() const override { return std::nullopt; }
};
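// Tracks the most recently touched block numbers in a small LRU window of
// seq_blocks_ entries. If the window holds a contiguous range of block
// numbers, the access pattern is considered sequential.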
class lru_sequential_access_detector : public sequential_access_detector {
public:
explicit lru_sequential_access_detector(size_t seq_blocks)
: lru_{seq_blocks}
, seq_blocks_{seq_blocks} {}
void set_block_count(size_t num_blocks) override {
std::lock_guard lock(mx_);
num_blocks_ = num_blocks;
lru_.clear();
is_sequential_.reset();
}
void touch(size_t block_no) override {
std::lock_guard lock(mx_);
lru_.set(block_no, block_no, true,
[this](size_t, size_t&&) { is_sequential_.reset(); });
}
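// Returns the number of the block to prefetch next, or std::nullopt if the
// window is not yet full, the pattern is not sequential, or a prefetch has
// already been suggested for the current window. is_sequential_ caches the
// result until touch() evicts an entry from the window.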
std::optional<size_t> prefetch() const override {
std::lock_guard lock(mx_);
if (lru_.size() < seq_blocks_) {
return std::nullopt;
}
if (is_sequential_.has_value()) {
return std::nullopt;
}
auto minmax = std::minmax_element(
lru_.begin(), lru_.end(),
[](auto const& a, auto const& b) { return a.first < b.first; });
auto min = minmax.first->first;
auto max = minmax.second->first;
is_sequential_ = max - min + 1 == seq_blocks_;
if (*is_sequential_ && max + 1 < num_blocks_) {
return max + 1;
}
return std::nullopt;
}
private:
using lru_type = folly::EvictingCacheMap<size_t, size_t>;
std::mutex mutable mx_;
lru_type lru_;
std::optional<bool> mutable is_sequential_;
size_t num_blocks_{0};
size_t const seq_blocks_;
};
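// A single pending read request for a byte range [begin_, end_) within one
// block, together with the promise that delivers the resulting block_range.
// Requests are ordered by their end offset (see block_request_set).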
class block_request {
public:
block_request() = default;
block_request(size_t begin, size_t end, std::promise<block_range>&& promise)
: begin_(begin)
, end_(end)
, promise_(std::move(promise)) {
DWARFS_CHECK(begin_ < end_, "invalid block_request");
}
block_request(block_request&&) = default;
block_request& operator=(block_request&&) = default;
bool operator<(const block_request& rhs) const { return end_ < rhs.end_; }
size_t end() const { return end_; }
void fulfill(std::shared_ptr<cached_block const> block) {
promise_.set_value(block_range(std::move(block), begin_, end_ - begin_));
}
void error(std::exception_ptr error) { promise_.set_exception(error); }
private:
size_t begin_{0};
size_t end_{0};
std::promise<block_range> promise_;
};
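// All pending requests for one block, kept as a heap ordered by end offset
// so that get() always returns the request reaching furthest into the
// block. A single worker drains the set, decompressing the block as far as
// needed and fulfilling the queued promises.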
class block_request_set {
public:
block_request_set(std::shared_ptr<cached_block> block, size_t block_no)
: range_end_(0)
, block_(std::move(block))
, block_no_(block_no) {}
~block_request_set() { assert(queue_.empty()); }
size_t range_end() const { return range_end_; }
void add(size_t begin, size_t end, std::promise<block_range>&& promise) {
if (end > range_end_) {
range_end_ = end;
}
queue_.emplace_back(begin, end, std::move(promise));
std::push_heap(queue_.begin(), queue_.end());
}
void merge(block_request_set&& other) {
queue_.reserve(queue_.size() + other.queue_.size());
std::move(other.queue_.begin(), other.queue_.end(),
std::back_inserter(queue_));
other.queue_.clear();
std::make_heap(queue_.begin(), queue_.end());
range_end_ = std::max(range_end_, other.range_end_);
}
block_request get() {
std::pop_heap(queue_.begin(), queue_.end());
block_request tmp = std::move(queue_.back());
queue_.pop_back();
return tmp;
}
bool empty() const { return queue_.empty(); }
std::shared_ptr<cached_block> block() const { return block_; }
size_t block_no() const { return block_no_; }
private:
std::vector<block_request> queue_;
size_t range_end_;
std::shared_ptr<cached_block> block_;
const size_t block_no_;
};
// multi-threaded block cache
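//
// Decompressed data is kept in an LRU map (cache_) whose capacity is chosen
// so that roughly max_bytes of uncompressed block data stay resident.
// Blocks that are currently being decompressed are tracked in active_ as
// sets of pending requests; a new request for such a block either joins an
// existing request set or starts a new one for the same block. The actual
// decompression runs on a worker group, and a block is only decompressed as
// far as its outstanding requests require (subject to the decompress_ratio
// heuristic in process_job()).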
template <typename LoggerPolicy>
class block_cache_ final : public block_cache::impl {
public:
block_cache_(logger& lgr, os_access const& os, std::shared_ptr<mmif> mm,
block_cache_options const& options,
std::shared_ptr<performance_monitor const> perfmon
[[maybe_unused]])
: cache_(0)
, mm_(std::move(mm))
, LOG_PROXY_INIT(lgr)
// clang-format off
PERFMON_CLS_PROXY_INIT(perfmon, "block_cache")
PERFMON_CLS_TIMER_INIT(get, "block_no", "offset", "size")
PERFMON_CLS_TIMER_INIT(process, "block_no")
PERFMON_CLS_TIMER_INIT(decompress, "range_end") // clang-format on
, seq_access_detector_{create_seq_access_detector(
options.sequential_access_detector_threshold)}
, os_{os}
, options_(options) {
if (options.init_workers) {
wg_ = worker_group(lgr, os_, "blkcache",
std::max(options.num_workers > 0
? options.num_workers
: hardware_concurrency(),
static_cast<size_t>(1)));
}
}
~block_cache_() noexcept override {
LOG_DEBUG << "stopping cache workers";
if (tidy_running_) {
stop_tidy_thread();
}
if (wg_) {
wg_.stop();
}
if (!blocks_created_.load()) {
return;
}
LOG_DEBUG << "cached blocks:";
for (const auto& cb : cache_) {
LOG_DEBUG << " block " << cb.first << ", decompression ratio = "
<< double(cb.second->range_end()) /
double(cb.second->uncompressed_size());
update_block_stats(*cb.second);
}
double fast_hit_rate =
100.0 * (active_hits_fast_ + cache_hits_fast_) / range_requests_;
double slow_hit_rate =
100.0 * (active_hits_slow_ + cache_hits_slow_) / range_requests_;
double miss_rate = 100.0 - (fast_hit_rate + slow_hit_rate);
double avg_decompression =
100.0 * total_decompressed_bytes_ / total_block_bytes_;
// The same block can be evicted multiple times. Active requests may hold
// on to a block that has been evicted from the cache and re-insert the
// block after the request is complete. So it's not a bug to see the
// number of evicted blocks outgrow the number of created blocks.
LOG_VERBOSE << "blocks created: " << blocks_created_.load();
LOG_VERBOSE << "blocks evicted: " << blocks_evicted_.load();
LOG_VERBOSE << "blocks tidied: " << blocks_tidied_.load();
LOG_VERBOSE << "request sets merged: " << sets_merged_.load();
LOG_VERBOSE << "total requests: " << range_requests_.load();
LOG_VERBOSE << "sequential prefetches: " << sequential_prefetches_.load();
LOG_VERBOSE << "active hits (fast): " << active_hits_fast_.load();
LOG_VERBOSE << "active hits (slow): " << active_hits_slow_.load();
LOG_VERBOSE << "cache hits (fast): " << cache_hits_fast_.load();
LOG_VERBOSE << "cache hits (slow): " << cache_hits_slow_.load();
LOG_VERBOSE << "total bytes decompressed: " << total_decompressed_bytes_;
LOG_VERBOSE << "average block decompression: "
<< fmt::format("{:.1f}", avg_decompression) << "%";
LOG_VERBOSE << "fast hit rate: " << fmt::format("{:.3f}", fast_hit_rate)
<< "%";
LOG_VERBOSE << "slow hit rate: " << fmt::format("{:.3f}", slow_hit_rate)
<< "%";
LOG_VERBOSE << "miss rate: " << fmt::format("{:.3f}", miss_rate) << "%";
LOG_VERBOSE << "expired active requests: " << active_expired_.load();
auto active_pct = [&](double p) {
return active_set_size_.getPercentileEstimate(p);
};
LOG_VERBOSE << "active set size p50: " << active_pct(0.5)
<< ", p75: " << active_pct(0.75) << ", p90: " << active_pct(0.9)
<< ", p95: " << active_pct(0.95)
<< ", p99: " << active_pct(0.99);
}
size_t block_count() const override { return block_.size(); }
void insert(fs_section const& section) override {
block_.emplace_back(section);
seq_access_detector_->set_block_count(block_.size());
}
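// Rebuild the LRU with a capacity of max_bytes / size blocks (capped at the
// total block count, if known) and install a prune hook that records
// eviction statistics.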
void set_block_size(size_t size) override {
// XXX: This currently inevitably clears the cache
if (size == 0) {
DWARFS_THROW(runtime_error, "block size is zero");
}
auto max_blocks = std::max<size_t>(options_.max_bytes / size, 1);
if (!block_.empty() && max_blocks > block_.size()) {
max_blocks = block_.size();
}
std::lock_guard lock(mx_);
cache_.~lru_type();
new (&cache_) lru_type(max_blocks);
cache_.setPruneHook(
[this](size_t block_no, std::shared_ptr<cached_block>&& block) {
LOG_DEBUG << "evicting block " << block_no
<< " from cache, decompression ratio = "
<< double(block->range_end()) /
double(block->uncompressed_size());
blocks_evicted_.fetch_add(1, std::memory_order_relaxed);
update_block_stats(*block);
});
}
void set_num_workers(size_t num) override {
std::unique_lock lock(mx_wg_);
if (wg_) {
wg_.stop();
}
wg_ = worker_group(LOG_GET_LOGGER, os_, "blkcache", num);
}
void set_tidy_config(cache_tidy_config const& cfg) override {
if (cfg.strategy == cache_tidy_strategy::NONE) {
if (tidy_running_) {
stop_tidy_thread();
}
} else {
if (cfg.interval == std::chrono::milliseconds::zero()) {
DWARFS_THROW(runtime_error, "tidy interval is zero");
}
std::lock_guard lock(mx_);
tidy_config_ = cfg;
if (tidy_running_) {
tidy_cond_.notify_all();
} else {
tidy_running_ = true;
tidy_thread_ = std::thread(&block_cache_::tidy_thread, this);
}
}
}
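// Request `size` bytes at `offset` within block `block_no`. Uncompressed
// blocks bypass the cache entirely. Otherwise the lookup order is:
// (1) a request set for a block that is currently being decompressed
//     (active hit, fast if already decompressed far enough),
// (2) the LRU cache (cache hit, fast or slow for the same reason),
// (3) a miss, which creates a new cached block and schedules decompression.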
std::future<block_range>
get(size_t block_no, size_t offset, size_t size) const override {
PERFMON_CLS_SCOPED_SECTION(get)
PERFMON_SET_CONTEXT(block_no, offset, size)
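// Record this access; on scope exit, schedule a prefetch of the following
// block if the detector considers the access pattern sequential.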
seq_access_detector_->touch(block_no);
scope_exit do_prefetch{[&] {
if (auto next = seq_access_detector_->prefetch()) {
sequential_prefetches_.fetch_add(1, std::memory_order_relaxed);
{
std::lock_guard lock(mx_);
create_cached_block(*next, std::promise<block_range>{}, 0,
std::numeric_limits<size_t>::max());
}
}
}};
range_requests_.fetch_add(1, std::memory_order_relaxed);
std::promise<block_range> promise;
auto future = promise.get_future();
// First, let's see if it's an uncompressed block, in which case we
// can completely bypass the cache
try {
if (block_no >= block_.size()) {
DWARFS_THROW(runtime_error,
fmt::format("block number out of range {0} >= {1}",
block_no, block_.size()));
}
auto const& section = DWARFS_NOTHROW(block_.at(block_no));
if (section.compression() == compression_type::NONE) {
LOG_TRACE << "block " << block_no
<< " is uncompressed, bypassing cache";
promise.set_value(block_range(section.data(*mm_).data(), offset, size));
return future;
}
} catch (...) {
promise.set_exception(std::current_exception());
return future;
}
// That is a mighty long lock, let's see how it works...
std::lock_guard lock(mx_);
const auto range_end = offset + size;
// See if the block is currently active (about-to-be decompressed)
auto ia = active_.find(block_no);
std::shared_ptr<block_request_set> brs;
if (ia != active_.end()) {
LOG_TRACE << "active sets found for block " << block_no;
bool add_to_set = false;
// Try to find a suitable request set to hook on to
auto end =
std::remove_if(ia->second.begin(), ia->second.end(),
[&brs, range_end, &add_to_set](
const std::weak_ptr<block_request_set>& wp) {
if (auto rs = wp.lock()) {
bool can_add_to_set = range_end <= rs->range_end();
if (!brs || (can_add_to_set && !add_to_set)) {
brs = std::move(rs);
add_to_set = can_add_to_set;
}
return false;
}
return true;
});
if (end != ia->second.end()) {
active_expired_.fetch_add(std::distance(end, ia->second.end()),
std::memory_order_relaxed);
// Remove all expired weak pointers
ia->second.erase(end, ia->second.end());
}
if (ia->second.empty()) {
// No request sets left at all? M'kay.
assert(!brs);
active_.erase(ia);
} else if (brs) {
// That's the one
// Check if by any chance the block has already
// been decompressed far enough to fulfill the
// promise immediately, otherwise add a new
// request to the request set.
LOG_TRACE << "block " << block_no << " found in active set";
auto block = brs->block();
if (range_end <= block->range_end()) {
// We can immediately satisfy the promise
promise.set_value(block_range(std::move(block), offset, size));
active_hits_fast_.fetch_add(1, std::memory_order_relaxed);
} else {
if (!add_to_set) {
// Make a new set for the same block
brs = std::make_shared<block_request_set>(std::move(block), block_no);
}
// Promise will be fulfilled asynchronously
brs->add(offset, range_end, std::move(promise));
active_hits_slow_.fetch_add(1, std::memory_order_relaxed);
if (!add_to_set) {
ia->second.emplace_back(brs);
active_set_size_.addValue(ia->second.size());
enqueue_job(std::move(brs));
}
}
return future;
}
LOG_TRACE << "block " << block_no << " not found in active set";
}
// See if it's cached (fully or partially decompressed)
auto ic = cache_.find(block_no);
if (ic != cache_.end()) {
// Nice, at least the block is already there.
LOG_TRACE << "block " << block_no << " found in cache";
auto block = ic->second;
if (range_end <= block->range_end()) {
// We can immediately satisfy the promise
promise.set_value(block_range(std::move(block), offset, size));
cache_hits_fast_.fetch_add(1, std::memory_order_relaxed);
} else {
// Make a new set for the block
brs = std::make_shared<block_request_set>(std::move(block), block_no);
// Promise will be fulfilled asynchronously
brs->add(offset, range_end, std::move(promise));
cache_hits_slow_.fetch_add(1, std::memory_order_relaxed);
auto& active = active_[block_no];
active.emplace_back(brs);
active_set_size_.addValue(active.size());
enqueue_job(std::move(brs));
}
return future;
}
// Bummer. We don't know anything about the block.
LOG_TRACE << "block " << block_no << " not found";
create_cached_block(block_no, std::move(promise), offset, range_end);
return future;
}
private:
static std::unique_ptr<sequential_access_detector>
create_seq_access_detector(size_t threshold) {
if (threshold == 0) {
return std::make_unique<no_sequential_access_detector>();
}
return std::make_unique<lru_sequential_access_detector>(threshold);
}
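// Create a new cached_block for block_no, register a request set for it in
// active_ and enqueue a decompression job on the worker group. Must be
// called with mx_ held.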
void create_cached_block(size_t block_no, std::promise<block_range>&& promise,
size_t offset, size_t range_end) const {
try {
std::shared_ptr<cached_block> block = cached_block::create(
LOG_GET_LOGGER, DWARFS_NOTHROW(block_.at(block_no)), mm_,
options_.mm_release, options_.disable_block_integrity_check);
blocks_created_.fetch_add(1, std::memory_order_relaxed);
// Make a new set for the block
auto brs = std::make_shared<block_request_set>(std::move(block), block_no);
// Promise will be fulfilled asynchronously
brs->add(offset, range_end, std::move(promise));
auto& active = active_[block_no];
active.emplace_back(brs);
active_set_size_.addValue(active.size());
enqueue_job(std::move(brs));
} catch (...) {
promise.set_exception(std::current_exception());
}
}
void stop_tidy_thread() {
{
std::lock_guard lock(mx_);
tidy_running_ = false;
}
tidy_cond_.notify_all();
tidy_thread_.join();
}
void update_block_stats(cached_block const& cb) {
if (cb.range_end() < cb.uncompressed_size()) {
partially_decompressed_.fetch_add(1, std::memory_order_relaxed);
}
total_decompressed_bytes_.fetch_add(cb.range_end(),
std::memory_order_relaxed);
total_block_bytes_.fetch_add(cb.uncompressed_size(),
std::memory_order_relaxed);
}
void enqueue_job(std::shared_ptr<block_request_set> brs) const {
std::shared_lock lock(mx_wg_);
// Lambda needs to be mutable so we can actually move out of it
wg_.add_job([this, brs = std::move(brs)]() mutable {
process_job(std::move(brs));
});
}
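// Worker job for one request set. If another worker is already
// decompressing the same block, the sets are merged and this job ends.
// Otherwise the requests are drained in order of decreasing end offset,
// the block is decompressed as far as needed (or fully, if the last
// request already exceeds the configured decompress_ratio), and the block
// is finally inserted into (or promoted within) the LRU cache.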
void process_job(std::shared_ptr<block_request_set> brs) const {
PERFMON_CLS_SCOPED_SECTION(process)
auto block_no = brs->block_no();
PERFMON_SET_CONTEXT(block_no)
LOG_TRACE << "processing block " << block_no;
// Check if another worker is already processing this block
{
std::lock_guard lock_dec(mx_dec_);
auto di = decompressing_.find(block_no);
if (di != decompressing_.end()) {
std::lock_guard lock(mx_);
if (auto other = di->second.lock()) {
LOG_TRACE << "merging sets for block " << block_no;
other->merge(std::move(*brs));
sets_merged_.fetch_add(1, std::memory_order_relaxed);
brs.reset();
return;
}
}
decompressing_[block_no] = brs;
}
auto block = brs->block();
for (;;) {
block_request req;
bool is_last_req = false;
// Fetch the next request, if any
{
std::lock_guard lock(mx_);
if (brs->empty()) {
// This is absolutely crucial! At this point, we can no longer
// allow other code to add to this request set, so we need to
// expire all weak pointers from within this critical section.
brs.reset();
break;
}
req = brs->get();
is_last_req = brs->empty();
}
// Process this request!
size_t range_end = req.end();
auto max_end = block->uncompressed_size();
if (range_end == std::numeric_limits<size_t>::max()) {
range_end = max_end;
}
if (is_last_req) {
double ratio = double(range_end) / double(max_end);
if (ratio > options_.decompress_ratio) {
LOG_TRACE << "block " << block_no << " over ratio: " << ratio << " > "
<< options_.decompress_ratio;
range_end = max_end;
}
}
try {
if (range_end > block->range_end()) {
PERFMON_CLS_SCOPED_SECTION(decompress)
PERFMON_SET_CONTEXT(range_end)
LOG_TRACE << "decompressing block " << block_no << " until position "
<< range_end;
block->decompress_until(range_end);
}
req.fulfill(block);
} catch (...) {
req.error(std::current_exception());
}
}
// Finally, put the block into the cache; it might already be
// in there, in which case we just promote it to the front of
// the LRU queue.
{
std::lock_guard lock(mx_);
if (tidy_config_.strategy == cache_tidy_strategy::EXPIRY_TIME) {
block->touch();
}
cache_.set(block_no, std::move(block));
}
}
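// Remove all cached blocks for which the predicate returns true. Called by
// the tidy thread with mx_ held.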
template <typename Pred>
void remove_block_if(Pred const& predicate) {
auto it = cache_.begin();
while (it != cache_.end()) {
if (predicate(*it->second)) {
it = cache_.erase(it);
blocks_tidied_.fetch_add(1, std::memory_order_relaxed);
} else {
++it;
}
}
}
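// Background thread that wakes up every tidy interval and evicts blocks
// according to the configured strategy: either blocks not used since the
// expiry time, or blocks for which the kernel has swapped out pages.
// Sleeps on tidy_cond_ so that configuration changes and shutdown take
// effect immediately.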
void tidy_thread() {
folly::setThreadName("cache-tidy");
std::unique_lock lock(mx_);
while (tidy_running_) {
if (tidy_cond_.wait_for(lock, tidy_config_.interval) ==
std::cv_status::timeout) {
switch (tidy_config_.strategy) {
case cache_tidy_strategy::EXPIRY_TIME:
remove_block_if(
[tp = std::chrono::steady_clock::now() -
tidy_config_.expiry_time](cached_block const& blk) {
return blk.last_used_before(tp);
});
break;
case cache_tidy_strategy::BLOCK_SWAPPED_OUT: {
std::vector<uint8_t> tmp;
remove_block_if([&tmp](cached_block const& blk) {
return blk.any_pages_swapped_out(tmp);
});
} break;
default:
break;
}
}
}
}
using lru_type =
folly::EvictingCacheMap<size_t, std::shared_ptr<cached_block>>;
mutable std::mutex mx_;
mutable lru_type cache_;
mutable folly::F14FastMap<size_t, std::vector<std::weak_ptr<block_request_set>>>
active_;
std::thread tidy_thread_;
std::condition_variable tidy_cond_;
bool tidy_running_{false};
mutable std::mutex mx_dec_;
mutable folly::F14FastMap<size_t, std::weak_ptr<block_request_set>>
decompressing_;
mutable std::atomic<size_t> blocks_created_{0};
mutable std::atomic<size_t> blocks_evicted_{0};
mutable std::atomic<size_t> sets_merged_{0};
mutable std::atomic<size_t> range_requests_{0};
mutable std::atomic<size_t> active_hits_fast_{0};
mutable std::atomic<size_t> active_hits_slow_{0};
mutable std::atomic<size_t> cache_hits_fast_{0};
mutable std::atomic<size_t> cache_hits_slow_{0};
mutable std::atomic<size_t> partially_decompressed_{0};
mutable std::atomic<size_t> total_block_bytes_{0};
mutable std::atomic<size_t> total_decompressed_bytes_{0};
mutable std::atomic<size_t> blocks_tidied_{0};
mutable std::atomic<size_t> active_expired_{0};
mutable std::atomic<size_t> sequential_prefetches_{0};
mutable folly::Histogram<size_t> active_set_size_{1, 0, 1024};
mutable std::shared_mutex mx_wg_;
mutable worker_group wg_;
std::vector<fs_section> block_;
std::shared_ptr<mmif> mm_;
LOG_PROXY_DECL(LoggerPolicy);
PERFMON_CLS_PROXY_DECL
PERFMON_CLS_TIMER_DECL(get)
PERFMON_CLS_TIMER_DECL(process)
PERFMON_CLS_TIMER_DECL(decompress)
std::unique_ptr<sequential_access_detector> seq_access_detector_;
os_access const& os_;
const block_cache_options options_;
cache_tidy_config tidy_config_;
};
} // namespace
block_cache::block_cache(logger& lgr, os_access const& os,
std::shared_ptr<mmif> mm,
const block_cache_options& options,
std::shared_ptr<performance_monitor const> perfmon)
: impl_(make_unique_logging_object<impl, block_cache_, logger_policies>(
lgr, os, std::move(mm), options, std::move(perfmon))) {}
} // namespace dwarfs::reader::internal