diff --git a/CMakeLists.txt b/CMakeLists.txt index f98910f52a4cd3fdb82f7503e0ebd22077e2e02f..39d463a3069dc87c59ec1d855f90030ad707df47 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -194,6 +194,8 @@ if (OPENDHT_PROXY_SERVER) include/opendht/dht_proxy_server.h ) list (APPEND opendht_SOURCES + src/thread_pool.h + src/thread_pool.cpp src/dht_proxy_server.cpp ) else () diff --git a/src/Makefile.am b/src/Makefile.am index 2a6ce27ac9d8e08e8100b52af87a2b70b19a5f50..aee033e5c09175459fd269db0e6f19d6128b18df 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -56,7 +56,7 @@ nobase_include_HEADERS = \ ../include/opendht/rng.h if ENABLE_PROXY_SERVER -libopendht_la_SOURCES += dht_proxy_server.cpp +libopendht_la_SOURCES += dht_proxy_server.cpp thread_pool.h thread_pool.cpp nobase_include_HEADERS += ../include/opendht/dht_proxy_server.h endif diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp new file mode 100644 index 0000000000000000000000000000000000000000..00f349f604d0b194b5fc43bc7efafdbb1e65f226 --- /dev/null +++ b/src/thread_pool.cpp @@ -0,0 +1,117 @@ +/* + * Copyright (C) 2016-2019 Savoir-faire Linux Inc. + * + * Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "thread_pool.h" + +#include <atomic> +#include <thread> + +#include <ciso646> // fix windows compiler bug + +namespace dht { + +struct ThreadPool::ThreadState +{ + std::thread thread {}; + std::atomic_bool run {true}; +}; + +ThreadPool::ThreadPool(size_t maxThreads) : maxThreads_(maxThreads) +{ + threads_.reserve(maxThreads_); +} + +ThreadPool::ThreadPool() + : ThreadPool(std::max<size_t>(std::thread::hardware_concurrency(), 4)) +{} + +ThreadPool::~ThreadPool() +{ + join(); +} + +void +ThreadPool::run(std::function<void()>&& cb) +{ + std::unique_lock<std::mutex> l(lock_); + if (not running_) return; + + // launch new thread if necessary + if (not readyThreads_ && threads_.size() < maxThreads_) { + threads_.emplace_back(new ThreadState()); + auto& t = *threads_.back(); + t.thread = std::thread([&]() { + while (t.run) { + std::function<void()> task; + + // pick task from queue + { + std::unique_lock<std::mutex> l(lock_); + readyThreads_++; + cv_.wait(l, [&](){ + return not t.run or not tasks_.empty(); + }); + readyThreads_--; + if (not t.run) + break; + task = std::move(tasks_.front()); + tasks_.pop(); + } + + // run task + try { + if (task) + task(); + } catch (const std::exception& e) { + // LOG_ERR("Exception running task: %s", e.what()); + } + } + }); + } + + // push task to queue + tasks_.emplace(std::move(cb)); + + // notify thread + l.unlock(); + cv_.notify_one(); +} + +void +ThreadPool::stop() +{ + { + std::lock_guard<std::mutex> l(lock_); + running_ = false; + } + for (auto& t : threads_) + t->run = false; + cv_.notify_all(); +} + +void +ThreadPool::join() +{ + stop(); + for (auto& t : threads_) + t->thread.join(); + threads_.clear(); +} + +} diff --git a/src/thread_pool.h b/src/thread_pool.h new file mode 100644 index 0000000000000000000000000000000000000000..024cb6bfdcfbc695f3a9a31d644ce779dd703dfe --- /dev/null +++ b/src/thread_pool.h @@ -0,0 +1,71 @@ +/* + * Copyright (C) 2016-2019 Savoir-faire Linux Inc. + * + * Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include <condition_variable> +#include <vector> +#include <queue> +#include <future> +#include <functional> + +namespace dht { + +class ThreadPool { +public: + static ThreadPool& instance() { + static ThreadPool pool; + return pool; + } + + ThreadPool(); + ThreadPool(size_t maxThreads); + ~ThreadPool(); + + void run(std::function<void()>&& cb); + + template<class T> + std::future<T> get(std::function<T()>&& cb) { + auto ret = std::make_shared<std::promise<T>>(); + run(std::bind([=](std::function<T()>& mcb) mutable { + ret->set_value(mcb()); + }, std::move(cb))); + return ret->get_future(); + } + template<class T> + std::shared_ptr<std::future<T>> getShared(std::function<T()>&& cb) { + return std::make_shared<std::future<T>>(get(std::move(cb))); + } + + void stop(); + void join(); + +private: + struct ThreadState; + std::queue<std::function<void()>> tasks_ {}; + std::vector<std::unique_ptr<ThreadState>> threads_; + unsigned readyThreads_ {0}; + std::mutex lock_ {}; + std::condition_variable cv_ {}; + + const unsigned maxThreads_; + bool running_ {true}; +}; + +}