/* vim:set ts=2 sw=2 sts=2 et: */
/**
* \author Marcus Holland-Moritz (github@mhxnet.de)
* \copyright Copyright (c) Marcus Holland-Moritz
*
* This file is part of dwarfs.
*
* dwarfs is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* dwarfs is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with dwarfs. If not, see <https://www.gnu.org/licenses/>.
*/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "dwarfs/multi_queue_block_merger.h"
using namespace dwarfs;
namespace {
// Tunables for this test file. Their consumers are further down in the
// file; the descriptions below are inferred from the names only.

// Diagnostic verbosity (presumably 0 means no extra output).
constexpr int debuglevel = 0;

// Presumably the number of randomized runs for regular / partial tests.
constexpr size_t max_runs_regular = 250;
constexpr size_t max_runs_partial = 50;

// Presumably the number of threads used to execute runs concurrently.
constexpr size_t num_runner_threads = 16;

// Presumably how often each test configuration is repeated.
constexpr size_t num_repetitions = 4;
/**
 * Test block that carries no size information.
 *
 * A block is identified by the id of the source that produced it and by
 * its index within that source. Comparison is member-wise, in declaration
 * order (`source_id`, then `index`).
 */
struct block {
  // Read by `source` to choose the lower bound of generated block sizes.
  static constexpr bool const kIsSized{false};

  block() = default;

  // The third argument (a size) is accepted so that `block` can be
  // constructed exactly like `sized_block`, but it is ignored.
  block(size_t src_id, size_t idx, size_t /*sz*/)
      : source_id{src_id}
      , index{idx} {}

  bool operator==(block const&) const = default;
  auto operator<=>(block const&) const = default;

  // Member form with the stream on the right-hand side (`blk << os`).
  // Kept so that existing callers of this spelling continue to work.
  std::ostream& operator<<(std::ostream& os) const {
    return os << source_id << "." << index;
  }

  // Conventional stream insertion (`os << blk`); writes
  // "<source_id>.<index>" and returns the stream.
  friend std::ostream& operator<<(std::ostream& os, block const& blk) {
    return blk << os;
  }

  // Zero-initialized so that a default-constructed block has a
  // well-defined value (matches `sized_block`).
  size_t source_id{0};
  size_t index{0};
};
/**
 * Test block that carries a size.
 *
 * A block is identified by the id of the source that produced it and by
 * its index within that source; `size` is additional payload. Comparison
 * is member-wise, in declaration order (`source_id`, `index`, `size`).
 */
struct sized_block {
  // Read by `source` to choose the lower bound of generated block sizes.
  static constexpr bool const kIsSized{true};

  sized_block() = default;

  sized_block(size_t src_id, size_t idx, size_t sz)
      : source_id{src_id}
      , index{idx}
      , size{sz} {}

  bool operator==(sized_block const&) const = default;
  auto operator<=>(sized_block const&) const = default;

  // Member form with the stream on the right-hand side (`blk << os`).
  // Kept so that existing callers of this spelling continue to work.
  std::ostream& operator<<(std::ostream& os) const {
    return os << source_id << "." << index << " (" << size << ")";
  }

  // Conventional stream insertion (`os << blk`); writes
  // "<source_id>.<index> (<size>)" and returns the stream.
  friend std::ostream& operator<<(std::ostream& os, sized_block const& blk) {
    return blk << os;
  }

  size_t source_id{0};
  size_t index{0};
  size_t size{0};
};
/**
 * Size policy for `sized_block`.
 *
 * Reports the size of an individual block and, per source, a worst-case
 * block size supplied at construction. The class name suggests it is
 * handed to the block merger as its policy; the merger is not part of
 * this section of the file.
 */
class sized_block_merger_policy {
 public:
  /**
   * \param worst_case_block_size  worst-case block size for each source,
   *                               indexed by source id; moved into the
   *                               policy
   */
  explicit sized_block_merger_policy(
      std::vector<size_t>&& worst_case_block_size)
      : worst_case_block_size_{std::move(worst_case_block_size)} {}

  /** Size of a single block, i.e. its `size` member. */
  static size_t block_size(sized_block const& blk) { return blk.size; }

  /**
   * Worst-case block size recorded for `source_id`.
   *
   * The lookup is unchecked: `source_id` must be a valid index into the
   * vector passed to the constructor.
   */
  size_t worst_case_source_block_size(size_t source_id) const {
    return worst_case_block_size_[source_id];
  }

 private:
  std::vector<size_t> worst_case_block_size_;
};
template
struct timed_release_block {
std::chrono::steady_clock::time_point when;
merged_block_holder holder;
timed_release_block(std::chrono::steady_clock::time_point when,
merged_block_holder&& holder)
: when{when}
, holder{std::move(holder)} {}
bool operator<(timed_release_block const& other) const {
return when > other.when;
}
};
// Use std::shared_mutex because folly::SharedMutex might trigger TSAN
template
using synchronized = folly::Synchronized;
template
using sync_queue = synchronized>;
/**
 * Randomized producer of test blocks.
 *
 * On construction, a source draws a random number of blocks and, for each
 * block, a random size and a random delay. `next()` then hands out the
 * blocks one at a time in index order.
 *
 * `BlockT` must be constructible from `(source id, index, size)` and must
 * provide a boolean constant `kIsSized`.
 */
template <typename BlockT>
class source {
 public:
  /**
   * \param id          id stored in every block produced by this source
   * \param delay_rng   generator used only for the per-block delays
   * \param rng         generator used for the block count and block sizes
   * \param max_blocks  upper bound (inclusive) for the number of blocks;
   *                    the lower bound is 1, so this must be at least 1
   * \param ips         rate parameter of the exponential distribution the
   *                    delays are drawn from (the name presumably stands
   *                    for "items per second")
   * \param max_size    upper bound (inclusive) for the block size
   */
  source(size_t id, std::mt19937& delay_rng, std::mt19937& rng,
         size_t max_blocks = 20, double ips = 5000.0, size_t max_size = 10000)
      : id_{id}
      , blocks_{init_blocks(delay_rng, rng, max_blocks, ips, max_size)} {}

  /**
   * Produce the next block.
   *
   * \returns a tuple of (block, whether this was the last block of the
   *          source, delay drawn for this block in seconds)
   *
   * The access is unchecked: this must not be called more than
   * `num_blocks()` times.
   */
  std::tuple<BlockT, bool, double> next() {
    auto idx = idx_++;
    // idx_ has already been advanced, so this is true exactly when the
    // block being returned is the final one.
    return {BlockT(id_, idx, blocks_[idx].first), idx_ >= blocks_.size(),
            blocks_[idx].second};
  }

  size_t id() const { return id_; }

  size_t num_blocks() const { return blocks_.size(); }

  /** Sum of the delays of all blocks, truncated to whole nanoseconds. */
  std::chrono::nanoseconds total_time() const {
    auto seconds = std::accumulate(
        begin(blocks_), end(blocks_), 0.0,
        [](auto const& a, auto const& b) { return a + b.second; });
    return std::chrono::duration_cast<std::chrono::nanoseconds>(
        std::chrono::duration<double>(seconds));
  }

 private:
  // Draws between 1 and `max_blocks` (size, delay) pairs. Sizes start at 1
  // for sized block types and at 0 otherwise.
  static std::vector<std::pair<size_t, double>>
  init_blocks(std::mt19937& delay_rng, std::mt19937& rng, size_t max_blocks,
              double ips, size_t max_size) {
    std::uniform_int_distribution<size_t> bdist(1, max_blocks);
    std::uniform_int_distribution<size_t> sdist(BlockT::kIsSized ? 1 : 0,
                                                max_size);
    std::exponential_distribution<> edist(ips);
    std::vector<std::pair<size_t, double>> blocks;
    blocks.resize(bdist(rng));
    std::generate(begin(blocks), end(blocks),
                  [&] { return std::make_pair(sdist(rng), edist(delay_rng)); });
    return blocks;
  }

  size_t idx_{0}; // index of the block the next call to next() returns
  size_t id_;
  std::vector<std::pair<size_t, double>> blocks_; // (size, delay) per block
};
template
void emitter(sync_queue