Skip to content
Snippets Groups Projects
Commit 683570a3 authored by Adrien Béraud's avatar Adrien Béraud Committed by Sébastien Blin
Browse files

add ThreadPool

parent 43559577
No related branches found
No related tags found
No related merge requests found
......@@ -194,6 +194,8 @@ if (OPENDHT_PROXY_SERVER)
include/opendht/dht_proxy_server.h
)
list (APPEND opendht_SOURCES
src/thread_pool.h
src/thread_pool.cpp
src/dht_proxy_server.cpp
)
else ()
......
......@@ -56,7 +56,7 @@ nobase_include_HEADERS = \
../include/opendht/rng.h
if ENABLE_PROXY_SERVER
libopendht_la_SOURCES += dht_proxy_server.cpp
libopendht_la_SOURCES += dht_proxy_server.cpp thread_pool.h thread_pool.cpp
nobase_include_HEADERS += ../include/opendht/dht_proxy_server.h
endif
......
/*
* Copyright (C) 2016-2019 Savoir-faire Linux Inc.
*
* Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "thread_pool.h"
#include <atomic>
#include <thread>
#include <ciso646> // fix windows compiler bug
namespace dht {
struct ThreadPool::ThreadState
{
std::thread thread {};
std::atomic_bool run {true};
};
ThreadPool::ThreadPool(size_t maxThreads) : maxThreads_(maxThreads)
{
threads_.reserve(maxThreads_);
}
ThreadPool::ThreadPool()
: ThreadPool(std::max<size_t>(std::thread::hardware_concurrency(), 4))
{}
ThreadPool::~ThreadPool()
{
join();
}
void
ThreadPool::run(std::function<void()>&& cb)
{
std::unique_lock<std::mutex> l(lock_);
if (not running_) return;
// launch new thread if necessary
if (not readyThreads_ && threads_.size() < maxThreads_) {
threads_.emplace_back(new ThreadState());
auto& t = *threads_.back();
t.thread = std::thread([&]() {
while (t.run) {
std::function<void()> task;
// pick task from queue
{
std::unique_lock<std::mutex> l(lock_);
readyThreads_++;
cv_.wait(l, [&](){
return not t.run or not tasks_.empty();
});
readyThreads_--;
if (not t.run)
break;
task = std::move(tasks_.front());
tasks_.pop();
}
// run task
try {
if (task)
task();
} catch (const std::exception& e) {
// LOG_ERR("Exception running task: %s", e.what());
}
}
});
}
// push task to queue
tasks_.emplace(std::move(cb));
// notify thread
l.unlock();
cv_.notify_one();
}
void
ThreadPool::stop()
{
{
std::lock_guard<std::mutex> l(lock_);
running_ = false;
}
for (auto& t : threads_)
t->run = false;
cv_.notify_all();
}
void
ThreadPool::join()
{
stop();
for (auto& t : threads_)
t->thread.join();
threads_.clear();
}
}
/*
* Copyright (C) 2016-2019 Savoir-faire Linux Inc.
*
* Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <condition_variable>
#include <vector>
#include <queue>
#include <future>
#include <functional>
namespace dht {
class ThreadPool {
public:
static ThreadPool& instance() {
static ThreadPool pool;
return pool;
}
ThreadPool();
ThreadPool(size_t maxThreads);
~ThreadPool();
void run(std::function<void()>&& cb);
template<class T>
std::future<T> get(std::function<T()>&& cb) {
auto ret = std::make_shared<std::promise<T>>();
run(std::bind([=](std::function<T()>& mcb) mutable {
ret->set_value(mcb());
}, std::move(cb)));
return ret->get_future();
}
template<class T>
std::shared_ptr<std::future<T>> getShared(std::function<T()>&& cb) {
return std::make_shared<std::future<T>>(get(std::move(cb)));
}
void stop();
void join();
private:
struct ThreadState;
std::queue<std::function<void()>> tasks_ {};
std::vector<std::unique_ptr<ThreadState>> threads_;
unsigned readyThreads_ {0};
std::mutex lock_ {};
std::condition_variable cv_ {};
const unsigned maxThreads_;
bool running_ {true};
};
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment