diff --git a/src/Makefile.am b/src/Makefile.am index c77f35c83831f565a5222133f754a994039a26ac..87a4716060e2d6e08b6bb8badea9a59124e5b400 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -93,6 +93,7 @@ libring_la_SOURCES = \ fileutils.cpp \ archiver.cpp \ threadloop.cpp \ + thread_pool.cpp \ ip_utils.h \ ip_utils.cpp \ utf8_utils.cpp \ @@ -104,6 +105,7 @@ libring_la_SOURCES = \ plugin_loader.h \ plugin_manager.h \ threadloop.h \ + thread_pool.h \ conference.h \ account_factory.h \ call_factory.h \ diff --git a/src/ringdht/ringaccount.cpp b/src/ringdht/ringaccount.cpp index 95baebbe63e1830d6131acb6310109c3f12ab9eb..8b6a26ce612d17bcd0beb0f7b19da1f038aa023d 100644 --- a/src/ringdht/ringaccount.cpp +++ b/src/ringdht/ringaccount.cpp @@ -25,6 +25,8 @@ #include "config.h" #endif +#include "thread_pool.h" + #include "sip/sdp.h" #include "sip/sipvoiplink.h" #include "sip/sipcall.h" @@ -1293,11 +1295,7 @@ RingAccount::generateDhParams() { //make sure cachePath_ is writable fileutils::check_dir(cachePath_.c_str(), 0700); - - std::packaged_task<decltype(loadDhParams)> task(loadDhParams); - dhParams_ = task.get_future(); - std::thread task_td(std::move(task), cachePath_ + DIR_SEPARATOR_STR "dhParams"); - task_td.detach(); + dhParams_ = ThreadPool::instance().get<tls::DhParams>(std::bind(loadDhParams, cachePath_ + DIR_SEPARATOR_STR "dhParams")); } MatchRank diff --git a/src/security/certstore.cpp b/src/security/certstore.cpp index 4385786d8a1e38f1b230562d0d64dae9c27b7ecf..1e6bcb83d73e3f0c9d617dfdba72d6a2eb9a8d39 100644 --- a/src/security/certstore.cpp +++ b/src/security/certstore.cpp @@ -22,6 +22,7 @@ #include "client/ring_signal.h" +#include "thread_pool.h" #include "fileutils.h" #include "logger.h" @@ -179,7 +180,7 @@ readCertificates(const std::string& path) void CertificateStore::pinCertificatePath(const std::string& path, std::function<void(const std::vector<std::string>&)> cb) { - std::thread([&, path, cb]() { + ThreadPool::instance().run([&, path, cb]() { auto certs = readCertificates(path); std::vector<std::string> ids; std::vector<std::weak_ptr<crypto::Certificate>> scerts; @@ -201,7 +202,7 @@ CertificateStore::pinCertificatePath(const std::string& path, std::function<void if (cb) cb(ids); emitSignal<DRing::ConfigurationSignal::CertificatePathPinned>(path, ids); - }).detach(); + }); } unsigned diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp new file mode 100644 index 0000000000000000000000000000000000000000..d913ce78d55daf11fa5859749926ef2a97e76a82 --- /dev/null +++ b/src/thread_pool.cpp @@ -0,0 +1,102 @@ +/* + * Copyright (C) 2016 Savoir-faire Linux Inc. + * + * Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "thread_pool.h" +#include "logger.h" + +#include <atomic> +#include <thread> + +namespace ring { + +struct ThreadPool::ThreadState +{ + std::thread thread {}; + std::atomic_bool run {true}; +}; + +ThreadPool::ThreadPool() + : maxThreads_(std::thread::hardware_concurrency()) +{ + threads_.reserve(maxThreads_); +} + +ThreadPool::~ThreadPool() +{ + join(); +} + +void +ThreadPool::run(std::function<void()>&& cb) +{ + std::unique_lock<std::mutex> l(lock_); + + // launch new thread if necessary + if (not readyThreads_ && threads_.size() < maxThreads_) { + threads_.emplace_back(new ThreadState()); + auto& t = *threads_.back(); + t.thread = std::thread([&]() { + while (t.run) { + std::function<void()> task; + + // pick task from queue + { + std::unique_lock<std::mutex> l(lock_); + readyThreads_++; + cv_.wait(l, [&](){ + return not t.run or not tasks_.empty(); + }); + readyThreads_--; + if (not t.run) + break; + task = std::move(tasks_.front()); + tasks_.pop(); + } + + // run task + try { + if (task) + task(); + } catch (const std::exception& e) { + RING_ERR("Exception running task: %s", e.what()); + } + } + }); + } + + // push task to queue + tasks_.emplace(std::move(cb)); + + // notify thread + l.unlock(); + cv_.notify_one(); +} + +void +ThreadPool::join() +{ + for (auto& t : threads_) + t->run = false; + cv_.notify_all(); + for (auto& t : threads_) + t->thread.join(); + threads_.clear(); +} + +} diff --git a/src/thread_pool.h b/src/thread_pool.h new file mode 100644 index 0000000000000000000000000000000000000000..406d7d0db2aff7352681854ff1096e0a2c5b4ba8 --- /dev/null +++ b/src/thread_pool.h @@ -0,0 +1,63 @@ +/* + * Copyright (C) 2016 Savoir-faire Linux Inc. + * + * Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include <condition_variable> +#include <vector> +#include <queue> +#include <future> + +namespace ring { + +class ThreadPool { +public: + static ThreadPool& instance() { + static ThreadPool pool; + return pool; + } + + ThreadPool(); + ~ThreadPool(); + + void run(std::function<void()>&& cb); + + template<class T> + std::future<T> get(std::function<T()>&& cb) { + auto ret = std::make_shared<std::promise<T>>(); + run(std::bind([=](std::function<T()>& mcb) mutable { + ret->set_value(mcb()); + }, std::move(cb))); + return ret->get_future(); + } + + void join(); + +private: + struct ThreadState; + std::queue<std::function<void()>> tasks_ {}; + std::vector<std::unique_ptr<ThreadState>> threads_; + unsigned readyThreads_ {0}; + std::mutex lock_ {}; + std::condition_variable cv_ {}; + + const unsigned maxThreads_; +}; + +}