From 25844a8f3d67a77414b97eef30cf1a53818a78ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20B=C3=A9raud?= <adrien.beraud@savoirfairelinux.com> Date: Fri, 26 Apr 2019 13:33:43 -0400 Subject: [PATCH] use OpenDHT thread pool Change-Id: Id6dbc59b48a61f9d27f01dff22ca93ad3eebc4f2 --- MSVC/ring-daemon.vcxproj | 2 - src/Makefile.am | 2 - src/im/message_engine.cpp | 4 +- src/jamidht/jamiaccount.cpp | 44 +++++++------- src/jamidht/namedirectory.cpp | 10 ++-- src/manager.cpp | 8 ++- src/media/media_recorder.cpp | 5 +- src/security/certstore.cpp | 5 +- src/sip/sipcall.cpp | 4 +- src/thread_pool.cpp | 104 ---------------------------------- src/thread_pool.h | 68 ---------------------- 11 files changed, 43 insertions(+), 213 deletions(-) delete mode 100644 src/thread_pool.cpp delete mode 100644 src/thread_pool.h diff --git a/MSVC/ring-daemon.vcxproj b/MSVC/ring-daemon.vcxproj index e29eb765ff..89c807d78b 100644 --- a/MSVC/ring-daemon.vcxproj +++ b/MSVC/ring-daemon.vcxproj @@ -830,7 +830,6 @@ <ClCompile Include="..\src\smartools.cpp" /> <ClCompile Include="..\src\string_utils.cpp" /> <ClCompile Include="..\src\threadloop.cpp" /> - <ClCompile Include="..\src\thread_pool.cpp" /> <ClCompile Include="..\src\turn_transport.cpp" /> <ClCompile Include="..\src\upnp\upnp_context.cpp" /> <ClCompile Include="..\src\upnp\upnp_control.cpp" /> @@ -993,7 +992,6 @@ <ClInclude Include="..\src\smartools.h" /> <ClInclude Include="..\src\string_utils.h" /> <ClInclude Include="..\src\threadloop.h" /> - <ClInclude Include="..\src\thread_pool.h" /> <ClInclude Include="..\src\turn_transport.h" /> <ClInclude Include="..\src\upnp\upnp_context.h" /> <ClInclude Include="..\src\upnp\upnp_control.h" /> diff --git a/src/Makefile.am b/src/Makefile.am index 73cca67a26..0076e08805 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -101,14 +101,12 @@ libring_la_SOURCES = \ fileutils.cpp \ archiver.cpp \ threadloop.cpp \ - thread_pool.cpp \ ip_utils.h \ ip_utils.cpp \ utf8_utils.cpp \ ice_transport.cpp \ ice_transport.h \ threadloop.h \ - thread_pool.h \ conference.h \ account_factory.h \ call_factory.h \ diff --git a/src/im/message_engine.cpp b/src/im/message_engine.cpp index c0cfea9af6..5301e34e9b 100644 --- a/src/im/message_engine.cpp +++ b/src/im/message_engine.cpp @@ -20,12 +20,12 @@ #include "message_engine.h" #include "sip/sipaccountbase.h" #include "manager.h" -#include "thread_pool.h" #include "fileutils.h" #include "client/ring_signal.h" #include "dring/account_const.h" +#include <opendht/thread_pool.h> #include <json/json.h> #include <fstream> @@ -252,7 +252,7 @@ MessageEngine::save_() const root[c.first] = std::move(peerRoot); } // Save asynchronously - ThreadPool::instance().run([path = savePath_, + dht::ThreadPool::computation().run([path = savePath_, root = std::move(root), accountID = account_.getAccountID(), messageNum = messages_.size()] diff --git a/src/jamidht/jamiaccount.cpp b/src/jamidht/jamiaccount.cpp index 1c695b24c7..24b87a70f6 100644 --- a/src/jamidht/jamiaccount.cpp +++ b/src/jamidht/jamiaccount.cpp @@ -32,7 +32,6 @@ #include "accountarchive.h" #include "ringcontact.h" #include "configkeys.h" -#include "thread_pool.h" #include "sip/sdp.h" #include "sip/sipvoiplink.h" @@ -69,6 +68,7 @@ #include "libdevcrypto/Common.h" #include "base64.h" +#include <opendht/thread_pool.h> #include <yaml-cpp/yaml.h> #include <json/json.h> @@ -1064,7 +1064,7 @@ generatePIN(size_t length = 8) void JamiAccount::addDevice(const std::string& password) { - ThreadPool::instance().run([this_=shared(), password]() { + dht::ThreadPool::computation().run([this_=shared(), password]() { std::vector<uint8_t> key; dht::InfoHash loc; std::string pin_str; @@ -1137,7 +1137,7 @@ bool JamiAccount::revokeDevice(const std::string& password, const std::string& device) { // shared_ptr of future - auto fa = ThreadPool::instance().getShared<AccountArchive>( + auto fa = dht::ThreadPool::computation().getShared<AccountArchive>( [this, password] { return readArchive(password); }); findCertificate(dht::InfoHash(device), [this,fa=std::move(fa),password,device](const std::shared_ptr<dht::crypto::Certificate>& crt) mutable @@ -1199,7 +1199,7 @@ JamiAccount::loadAccountFromFile(const std::string& archive_path, const std::str { setRegistrationState(RegistrationState::INITIALIZING); auto accountId = getAccountID(); - ThreadPool::instance().run([w=weak(), archive_password, archive_path, accountId]{ + dht::ThreadPool::computation().run([w=weak(), archive_password, archive_path, accountId]{ AccountArchive archive; try { archive = AccountArchive(archive_path, archive_password); @@ -1304,8 +1304,8 @@ JamiAccount::loadAccountFromDHT(const std::string& archive_password, const std:: } }; - ThreadPool::instance().run(std::bind(search, true, state_old)); - ThreadPool::instance().run(std::bind(search, false, state_new)); + dht::ThreadPool::computation().run(std::bind(search, true, state_old)); + dht::ThreadPool::computation().run(std::bind(search, false, state_new)); } void @@ -1313,11 +1313,11 @@ JamiAccount::createAccount(const std::string& archive_password, dht::crypto::Ide { JAMI_WARN("[Account %s] creating new account", getAccountID().c_str()); setRegistrationState(RegistrationState::INITIALIZING); - ThreadPool::instance().run([sthis=shared(), archive_password, migrate]() mutable { + dht::ThreadPool::computation().run([sthis=shared(), archive_password, migrate]() mutable { AccountArchive a; auto& this_ = *sthis; - auto future_keypair = ThreadPool::instance().get<dev::KeyPair>(std::bind(&dev::KeyPair::create)); + auto future_keypair = dht::ThreadPool::computation().get<dev::KeyPair>(std::bind(&dev::KeyPair::create)); try { if (migrate.first and migrate.second) { JAMI_WARN("[Account %s] converting certificate from old ring account %s", @@ -2733,7 +2733,7 @@ JamiAccount::loadTreatedMessages() void JamiAccount::saveTreatedMessages() const { - ThreadPool::instance().run([w = weak()](){ + dht::ThreadPool::io().run([w = weak()](){ if (auto sthis = w.lock()) { auto& this_ = *sthis; std::lock_guard<std::mutex> lock(this_.messageMutex_); @@ -2963,7 +2963,7 @@ JamiAccount::generateDhParams() { //make sure cachePath_ is writable fileutils::check_dir(cachePath_.c_str(), 0700); - dhParams_ = ThreadPool::instance().get<tls::DhParams>(std::bind(loadDhParams, cachePath_ + DIR_SEPARATOR_STR "dhParams")); + dhParams_ = dht::ThreadPool::computation().get<tls::DhParams>(std::bind(loadDhParams, cachePath_ + DIR_SEPARATOR_STR "dhParams")); } MatchRank @@ -3353,17 +3353,19 @@ JamiAccount::igdChanged() if (not dht_.isRunning()) return; if (upnp_) { - std::thread{[s = shared(), oldPort = static_cast<in_port_t>(dhtPortUsed_)] { - auto& this_ = *s; - if (not this_.mapPortUPnP()) - JAMI_WARN("UPnP: Could not map DHT port"); - auto newPort = static_cast<in_port_t>(this_.dhtPortUsed_); - if (oldPort != newPort) { - JAMI_WARN("DHT port changed: restarting network"); - this_.doRegister_(); - } else - this_.dht_.connectivityChanged(); - }}.detach(); + dht::ThreadPool::io().run([w = weak(), oldPort = static_cast<in_port_t>(dhtPortUsed_)] { + if (auto s = w.lock()) { + auto& this_ = *s; + if (not this_.mapPortUPnP()) + JAMI_WARN("UPnP: Could not map DHT port"); + auto newPort = static_cast<in_port_t>(this_.dhtPortUsed_); + if (oldPort != newPort) { + JAMI_WARN("DHT port changed: restarting network"); + this_.doRegister_(); + } else + this_.dht_.connectivityChanged(); + } + }); } else dht_.connectivityChanged(); } diff --git a/src/jamidht/namedirectory.cpp b/src/jamidht/namedirectory.cpp index 5344e0bd1a..e392e46954 100644 --- a/src/jamidht/namedirectory.cpp +++ b/src/jamidht/namedirectory.cpp @@ -20,10 +20,10 @@ #include "logger.h" #include "string_utils.h" -#include "thread_pool.h" #include "fileutils.h" #include "base64.h" +#include <opendht/thread_pool.h> #include <opendht/crypto.h> #include <msgpack.hpp> #include <json/json.h> @@ -166,7 +166,7 @@ void NameDirectory::lookupAddress(const std::string& addr, LookupCallback cb) }).share(); // avoid blocking on future destruction - ThreadPool::instance().run([ret](){ ret.get(); }); + dht::ThreadPool::io().run([ret](){ ret.get(); }); } catch (const std::exception& e) { JAMI_ERR("Error when performing address lookup: %s", e.what()); cb("", Response::error); @@ -265,7 +265,7 @@ void NameDirectory::lookupName(const std::string& n, LookupCallback cb) }).share(); // avoid blocking on future destruction - ThreadPool::instance().run([ret](){ ret.get(); }); + dht::ThreadPool::io().run([ret](){ ret.get(); }); } catch (const std::exception& e) { JAMI_ERR("Error when performing name lookup: %s", e.what()); cb("", Response::error); @@ -367,8 +367,8 @@ void NameDirectory::registerName(const std::string& addr, const std::string& n, } }, params).share(); - // avoid blocking on future destruction - ThreadPool::instance().run([ret](){ ret.get(); }); + // avoid blocking on future destruction + dht::ThreadPool::io().run([ret](){ ret.get(); }); } catch (const std::exception& e) { JAMI_ERR("Error when performing name registration: %s", e.what()); cb(RegistrationResponse::error); diff --git a/src/manager.cpp b/src/manager.cpp index 6f5a1c6202..5d9cb51ea7 100644 --- a/src/manager.cpp +++ b/src/manager.cpp @@ -34,7 +34,6 @@ #include "logger.h" #include "account_schema.h" -#include "thread_pool.h" #include "fileutils.h" #include "map_utils.h" @@ -81,6 +80,8 @@ using random_device = dht::crypto::random_device; #include "data_transfer.h" +#include <opendht/thread_pool.h> + #ifndef WIN32 #include <sys/time.h> #include <sys/resource.h> @@ -823,9 +824,10 @@ Manager::finish() noexcept // Flush remaining tasks (free lambda' with capture) pimpl_->scheduler_.stop(); + dht::ThreadPool::io().join(); + dht::ThreadPool::computation().join(); pj_shutdown(); - ThreadPool::instance().join(); } catch (const VoipLinkException &err) { JAMI_ERR("%s", err.what()); } @@ -2835,7 +2837,7 @@ Manager::loadAccountMap(const YAML::Node& node) continue; } remaining++; - ThreadPool::instance().run([ + dht::ThreadPool::computation().run([ this, dir, &cv, &remaining, &lock, configFile = accountBaseDir + DIR_SEPARATOR_STR + dir + DIR_SEPARATOR_STR + "config.yml" diff --git a/src/media/media_recorder.cpp b/src/media/media_recorder.cpp index 72627eac6e..f275fb3438 100644 --- a/src/media/media_recorder.cpp +++ b/src/media/media_recorder.cpp @@ -25,12 +25,13 @@ #include "media_io_handle.h" #include "media_recorder.h" #include "system_codec_container.h" -#include "thread_pool.h" #include "video/filter_transpose.h" #ifdef RING_ACCEL #include "video/accel.h" #endif +#include <opendht/thread_pool.h> + #include <algorithm> #include <iomanip> #include <sstream> @@ -153,7 +154,7 @@ MediaRecorder::startRecording() if (initRecord() >= 0) { isRecording_ = true; // start thread after isRecording_ is set to true - ThreadPool::instance().run([rec = shared_from_this()] { + dht::ThreadPool::computation().run([rec = shared_from_this()] { while (rec->isRecording()) { rec->filterAndEncode(rec->videoFilter_.get(), rec->videoIdx_); rec->filterAndEncode(rec->audioFilter_.get(), rec->audioIdx_); diff --git a/src/security/certstore.cpp b/src/security/certstore.cpp index 291e6afd2a..05d5efbf61 100644 --- a/src/security/certstore.cpp +++ b/src/security/certstore.cpp @@ -22,10 +22,11 @@ #include "client/ring_signal.h" -#include "thread_pool.h" #include "fileutils.h" #include "logger.h" +#include <opendht/thread_pool.h> + #include <thread> #include <sstream> @@ -202,7 +203,7 @@ readCertificates(const std::string& path, const std::string& crl_path) void CertificateStore::pinCertificatePath(const std::string& path, std::function<void(const std::vector<std::string>&)> cb) { - ThreadPool::instance().run([&, path, cb]() { + dht::ThreadPool::computation().run([&, path, cb]() { auto certs = readCertificates(path, crlPath_); std::vector<std::string> ids; std::vector<std::weak_ptr<crypto::Certificate>> scerts; diff --git a/src/sip/sipcall.cpp b/src/sip/sipcall.cpp index b2bdb8b38b..7a24820a8d 100644 --- a/src/sip/sipcall.cpp +++ b/src/sip/sipcall.cpp @@ -40,7 +40,6 @@ #include "dring/media_const.h" #include "client/ring_signal.h" #include "ice_transport.h" -#include "thread_pool.h" #ifdef ENABLE_VIDEO #include "client/videomanager.h" @@ -52,6 +51,7 @@ #include "errno.h" #include <opendht/crypto.h> +#include <opendht/thread_pool.h> namespace jami { @@ -800,7 +800,7 @@ void SIPCall::sendKeyframe() { #ifdef ENABLE_VIDEO - ThreadPool::instance().run([w = weak()] { + dht::ThreadPool::computation().run([w = weak()] { if (auto sthis = w.lock()) { JAMI_DBG("handling picture fast update request"); sthis->getVideoRtp().forceKeyFrame(); diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp deleted file mode 100644 index ead9435c6b..0000000000 --- a/src/thread_pool.cpp +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Copyright (C) 2016-2019 Savoir-faire Linux Inc. - * - * Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#include "thread_pool.h" -#include "logger.h" - -#include <atomic> -#include <thread> - -#include <ciso646> // fix windows compiler bug - -namespace jami { - -struct ThreadPool::ThreadState -{ - std::thread thread {}; - std::atomic_bool run {true}; -}; - -ThreadPool::ThreadPool() - : maxThreads_(std::max<size_t>(std::thread::hardware_concurrency(), 4)) -{ - threads_.reserve(maxThreads_); -} - -ThreadPool::~ThreadPool() -{ - join(); -} - -void -ThreadPool::run(std::function<void()>&& cb) -{ - std::unique_lock<std::mutex> l(lock_); - - // launch new thread if necessary - if (not readyThreads_ && threads_.size() < maxThreads_) { - threads_.emplace_back(new ThreadState()); - auto& t = *threads_.back(); - t.thread = std::thread([&]() { - while (t.run) { - std::function<void()> task; - - // pick task from queue - { - std::unique_lock<std::mutex> l(lock_); - readyThreads_++; - cv_.wait(l, [&](){ - return not t.run or not tasks_.empty(); - }); - readyThreads_--; - if (not t.run) - break; - task = std::move(tasks_.front()); - tasks_.pop(); - } - - // run task - try { - if (task) - task(); - } catch (const std::exception& e) { - JAMI_ERR("Exception running task: %s", e.what()); - } - } - }); - } - - // push task to queue - tasks_.emplace(std::move(cb)); - - // notify thread - l.unlock(); - cv_.notify_one(); -} - -void -ThreadPool::join() -{ - for (auto& t : threads_) - t->run = false; - cv_.notify_all(); - for (auto& t : threads_) - t->thread.join(); - threads_.clear(); -} - -} diff --git a/src/thread_pool.h b/src/thread_pool.h deleted file mode 100644 index 701231212e..0000000000 --- a/src/thread_pool.h +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright (C) 2016-2019 Savoir-faire Linux Inc. - * - * Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#pragma once - -#include <condition_variable> -#include <vector> -#include <queue> -#include <future> -#include <functional> - -namespace jami { - -class ThreadPool { -public: - static ThreadPool& instance() { - static ThreadPool pool; - return pool; - } - - ThreadPool(); - ~ThreadPool(); - - void run(std::function<void()>&& cb); - - template<class T> - std::future<T> get(std::function<T()>&& cb) { - auto ret = std::make_shared<std::promise<T>>(); - run(std::bind([=](std::function<T()>& mcb) mutable { - ret->set_value(mcb()); - }, std::move(cb))); - return ret->get_future(); - } - template<class T> - std::shared_ptr<std::future<T>> getShared(std::function<T()>&& cb) { - return std::make_shared<std::future<T>>(get(std::move(cb))); - } - - void join(); - -private: - struct ThreadState; - std::queue<std::function<void()>> tasks_ {}; - std::vector<std::unique_ptr<ThreadState>> threads_; - unsigned readyThreads_ {0}; - std::mutex lock_ {}; - std::condition_variable cv_ {}; - - const unsigned maxThreads_; -}; - -} -- GitLab