diff --git a/include/opendht/thread_pool.h b/include/opendht/thread_pool.h index 814c14f38e35f071bf251c6383e86abf81cd4b66..3c0f932d8725013f487cc82c5c58c34d9355de85 100644 --- a/include/opendht/thread_pool.h +++ b/include/opendht/thread_pool.h @@ -68,4 +68,23 @@ private: bool running_ {true}; }; +class OPENDHT_PUBLIC Executor : public std::enable_shared_from_this<Executor> { +public: + Executor(ThreadPool& pool, unsigned maxConcurrent = 1) + : threadPool_(pool), maxConcurrent_(maxConcurrent) + {} + + void run(std::function<void()>&& task); + +private: + std::reference_wrapper<ThreadPool> threadPool_; + const unsigned maxConcurrent_ {1}; + std::mutex lock_ {}; + unsigned current_ {0}; + std::queue<std::function<void()>> tasks_ {}; + + void run_(std::function<void()>&& task); + void schedule(); +}; + } diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp index ea8d76cbdb3ddbbf59f33d2e9311f91f657f0eb8..e8c0b23d7a7257be3ccb398ed9df0b7e14b2e1c5 100644 --- a/src/thread_pool.cpp +++ b/src/thread_pool.cpp @@ -21,7 +21,7 @@ #include <atomic> #include <thread> - +#include <iostream> #include <ciso646> // fix windows compiler bug namespace dht { @@ -97,6 +97,7 @@ ThreadPool::run(std::function<void()>&& cb) task(); } catch (const std::exception& e) { // LOG_ERR("Exception running task: %s", e.what()); + std::cerr << "Exception running task: " << e.what() << std::endl; } } }); @@ -131,4 +132,44 @@ ThreadPool::join() threads_.clear(); } +void +Executor::run(std::function<void()>&& task) +{ + std::lock_guard<std::mutex> l(lock_); + if (current_ < maxConcurrent_) { + run_(std::move(task)); + } else { + tasks_.emplace(std::move(task)); + } +} + +void +Executor::run_(std::function<void()>&& task) +{ + current_++; + std::weak_ptr<Executor> w = shared_from_this(); + threadPool_.get().run([w,task] { + try { + task(); + } catch (const std::exception& e) { + std::cerr << "Exception running task: " << e.what() << std::endl; + } + if (auto sthis = w.lock()) { + auto& this_ = *sthis; + std::lock_guard<std::mutex> l(this_.lock_); + this_.current_--; + this_.schedule(); + } + }); +} + +void +Executor::schedule() +{ + if (not tasks_.empty() and current_ < maxConcurrent_) { + run_(std::move(tasks_.front())); + tasks_.pop(); + } +} + }