/* vim:set ts=2 sw=2 sts=2 et: */
/**
* \author Marcus Holland-Moritz (github@mhxnet.de)
* \copyright Copyright (c) Marcus Holland-Moritz
*
* This file is part of dwarfs.
*
* dwarfs is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* dwarfs is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with dwarfs. If not, see .
*/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
namespace dwarfs::internal {
namespace {
template
class basic_worker_group final : public worker_group::impl, private Policy {
public:
template
basic_worker_group(logger& lgr, os_access const& os, const char* group_name,
size_t num_workers, size_t max_queue_len,
int niceness [[maybe_unused]], Args&&... args)
: Policy(std::forward(args)...)
, LOG_PROXY_INIT(lgr)
, os_{os}
, running_(true)
, pending_(0)
, max_queue_len_(max_queue_len) {
if (num_workers < 1) {
num_workers = std::max(hardware_concurrency(), 1u);
}
if (!group_name) {
group_name = "worker";
}
for (size_t i = 0; i < num_workers; ++i) {
workers_.emplace_back([this, niceness, group_name, i] {
folly::setThreadName(fmt::format("{}{}", group_name, i + 1));
set_thread_niceness(niceness);
do_work(niceness > 10);
});
}
check_set_affinity_from_enviroment(group_name);
}
basic_worker_group(const basic_worker_group&) = delete;
basic_worker_group& operator=(const basic_worker_group&) = delete;
/**
* Stop and destroy a worker group
*/
~basic_worker_group() noexcept override {
try {
stop();
} catch (...) {
}
}
/**
* Stop a worker group
*/
void stop() override {
if (running_) {
{
std::lock_guard lock(mx_);
running_ = false;
}
cond_.notify_all();
for (auto& w : workers_) {
w.join();
}
}
}
/**
* Wait until all work has been done
*/
void wait() override {
if (running_) {
std::unique_lock lock(mx_);
wait_.wait(lock, [&] { return pending_ == 0; });
}
}
/**
* Check whether the worker group is still running
*/
bool running() const override { return running_; }
/**
* Add a new job to the worker group
*
* The new job will be dispatched to the first available worker thread.
*
* \param job The job to add to the dispatcher.
*/
bool add_job(worker_group::job_t&& job) override {
return add_job_impl(std::move(job));
}
/**
* Add a new move-only job to the worker group
*
* The new job will be dispatched to the first available worker thread.
*
* \param job The job to add to the dispatcher.
*/
bool add_moveonly_job(worker_group::moveonly_job_t&& job) override {
return add_job_impl(std::move(job));
}
/**
* Return the number of worker threads
*
* \returns The number of worker threads.
*/
size_t size() const override { return workers_.size(); }
/**
* Return the number of queued jobs
*
* \returns The number of queued jobs.
*/
size_t queue_size() const override {
std::lock_guard lock(mx_);
return jobs_.size();
}
std::chrono::nanoseconds get_cpu_time(std::error_code& ec) const override {
ec.clear();
std::lock_guard lock(mx_);
std::chrono::nanoseconds t{};
for (auto const& w : workers_) {
t += os_.thread_get_cpu_time(w.get_id(), ec);
if (ec) {
return {};
}
}
return t;
}
std::optional try_get_cpu_time() const override {
std::error_code ec;
auto t = get_cpu_time(ec);
return ec ? std::nullopt : std::make_optional(t);
}
bool set_affinity(std::vector const& cpus) override {
if (cpus.empty()) {
return false;
}
std::lock_guard lock(mx_);
for (size_t i = 0; i < workers_.size(); ++i) {
std::error_code ec;
os_.thread_set_affinity(workers_[i].get_id(), cpus, ec);
if (ec) {
return false;
}
}
return true;
}
private:
using any_job_t =
std::variant;
using jobs_t = std::queue;
bool add_job_impl(any_job_t&& job) {
if (running_) {
{
std::unique_lock lock(mx_);
queue_.wait(lock, [this] { return jobs_.size() < max_queue_len_; });
jobs_.emplace(std::move(job));
++pending_;
}
cond_.notify_one();
return true;
}
return false;
}
void check_set_affinity_from_enviroment(const char* group_name) {
if (auto var = os_.getenv("DWARFS_WORKER_GROUP_AFFINITY")) {
auto groups = split_to>(var.value(), ':');
for (auto& group : groups) {
auto parts = split_to>(group, '=');
if (parts.size() == 2 && parts[0] == group_name) {
auto cpus = split_to>(parts[1], ',');
set_affinity(cpus);
}
}
}
}
// TODO: move out of this class
static void set_thread_niceness(int niceness) {
if (niceness > 0) {
#ifdef _WIN32
auto hthr = ::GetCurrentThread();
int priority =
niceness > 5 ? THREAD_PRIORITY_LOWEST : THREAD_PRIORITY_BELOW_NORMAL;
::SetThreadPriority(hthr, priority);
#else
// XXX:
// According to POSIX, the nice value is a per-process setting. However,
// under the current Linux/NPTL implementation of POSIX threads, the nice
// value is a per-thread attribute: different threads in the same process
// can have different nice values. Portable applications should avoid
// relying on the Linux behavior, which may be made standards conformant
// in the future.
auto rv [[maybe_unused]] = ::nice(niceness);
#endif
}
}
void do_work(bool is_background [[maybe_unused]]) {
#ifdef _WIN32
auto hthr = ::GetCurrentThread();
#endif
for (;;) {
any_job_t job;
{
std::unique_lock lock(mx_);
while (jobs_.empty() && running_) {
cond_.wait(lock);
}
if (jobs_.empty()) {
if (running_) {
continue;
} else {
break;
}
}
job = std::move(jobs_.front());
jobs_.pop();
}
{
typename Policy::task task(this);
#ifdef _WIN32
if (is_background) {
::SetThreadPriority(hthr, THREAD_MODE_BACKGROUND_BEGIN);
}
#endif
try {
std::visit([](auto&& j) { j(); }, job);
} catch (...) {
LOG_FATAL << "exception thrown in worker thread: "
<< exception_str(std::current_exception());
}
#ifdef _WIN32
if (is_background) {
::SetThreadPriority(hthr, THREAD_MODE_BACKGROUND_END);
}
#endif
}
{
std::lock_guard lock(mx_);
pending_--;
}
wait_.notify_one();
queue_.notify_one();
}
}
LOG_PROXY_DECL(LoggerPolicy);
os_access const& os_;
std::vector workers_;
jobs_t jobs_;
std::condition_variable cond_;
std::condition_variable queue_;
std::condition_variable wait_;
mutable std::mutex mx_;
std::atomic running_;
std::atomic pending_;
const size_t max_queue_len_;
};
class no_policy {
public:
class task {
public:
explicit task(no_policy*) {}
};
};
template
using default_worker_group = basic_worker_group;
} // namespace
worker_group::worker_group(logger& lgr, os_access const& os,
const char* group_name, size_t num_workers,
size_t max_queue_len, int niceness)
: impl_{make_unique_logging_object(
lgr, os, group_name, num_workers, max_queue_len, niceness)} {}
} // namespace dwarfs::internal