Commit 25844a8f authored by Adrien Béraud's avatar Adrien Béraud
Browse files

use OpenDHT thread pool

Change-Id: Id6dbc59b48a61f9d27f01dff22ca93ad3eebc4f2
parent 95668855
...@@ -830,7 +830,6 @@ ...@@ -830,7 +830,6 @@
<ClCompile Include="..\src\smartools.cpp" /> <ClCompile Include="..\src\smartools.cpp" />
<ClCompile Include="..\src\string_utils.cpp" /> <ClCompile Include="..\src\string_utils.cpp" />
<ClCompile Include="..\src\threadloop.cpp" /> <ClCompile Include="..\src\threadloop.cpp" />
<ClCompile Include="..\src\thread_pool.cpp" />
<ClCompile Include="..\src\turn_transport.cpp" /> <ClCompile Include="..\src\turn_transport.cpp" />
<ClCompile Include="..\src\upnp\upnp_context.cpp" /> <ClCompile Include="..\src\upnp\upnp_context.cpp" />
<ClCompile Include="..\src\upnp\upnp_control.cpp" /> <ClCompile Include="..\src\upnp\upnp_control.cpp" />
...@@ -993,7 +992,6 @@ ...@@ -993,7 +992,6 @@
<ClInclude Include="..\src\smartools.h" /> <ClInclude Include="..\src\smartools.h" />
<ClInclude Include="..\src\string_utils.h" /> <ClInclude Include="..\src\string_utils.h" />
<ClInclude Include="..\src\threadloop.h" /> <ClInclude Include="..\src\threadloop.h" />
<ClInclude Include="..\src\thread_pool.h" />
<ClInclude Include="..\src\turn_transport.h" /> <ClInclude Include="..\src\turn_transport.h" />
<ClInclude Include="..\src\upnp\upnp_context.h" /> <ClInclude Include="..\src\upnp\upnp_context.h" />
<ClInclude Include="..\src\upnp\upnp_control.h" /> <ClInclude Include="..\src\upnp\upnp_control.h" />
......
...@@ -101,14 +101,12 @@ libring_la_SOURCES = \ ...@@ -101,14 +101,12 @@ libring_la_SOURCES = \
fileutils.cpp \ fileutils.cpp \
archiver.cpp \ archiver.cpp \
threadloop.cpp \ threadloop.cpp \
thread_pool.cpp \
ip_utils.h \ ip_utils.h \
ip_utils.cpp \ ip_utils.cpp \
utf8_utils.cpp \ utf8_utils.cpp \
ice_transport.cpp \ ice_transport.cpp \
ice_transport.h \ ice_transport.h \
threadloop.h \ threadloop.h \
thread_pool.h \
conference.h \ conference.h \
account_factory.h \ account_factory.h \
call_factory.h \ call_factory.h \
......
...@@ -20,12 +20,12 @@ ...@@ -20,12 +20,12 @@
#include "message_engine.h" #include "message_engine.h"
#include "sip/sipaccountbase.h" #include "sip/sipaccountbase.h"
#include "manager.h" #include "manager.h"
#include "thread_pool.h"
#include "fileutils.h" #include "fileutils.h"
#include "client/ring_signal.h" #include "client/ring_signal.h"
#include "dring/account_const.h" #include "dring/account_const.h"
#include <opendht/thread_pool.h>
#include <json/json.h> #include <json/json.h>
#include <fstream> #include <fstream>
...@@ -252,7 +252,7 @@ MessageEngine::save_() const ...@@ -252,7 +252,7 @@ MessageEngine::save_() const
root[c.first] = std::move(peerRoot); root[c.first] = std::move(peerRoot);
} }
// Save asynchronously // Save asynchronously
ThreadPool::instance().run([path = savePath_, dht::ThreadPool::computation().run([path = savePath_,
root = std::move(root), root = std::move(root),
accountID = account_.getAccountID(), accountID = account_.getAccountID(),
messageNum = messages_.size()] messageNum = messages_.size()]
......
...@@ -32,7 +32,6 @@ ...@@ -32,7 +32,6 @@
#include "accountarchive.h" #include "accountarchive.h"
#include "ringcontact.h" #include "ringcontact.h"
#include "configkeys.h" #include "configkeys.h"
#include "thread_pool.h"
#include "sip/sdp.h" #include "sip/sdp.h"
#include "sip/sipvoiplink.h" #include "sip/sipvoiplink.h"
...@@ -69,6 +68,7 @@ ...@@ -69,6 +68,7 @@
#include "libdevcrypto/Common.h" #include "libdevcrypto/Common.h"
#include "base64.h" #include "base64.h"
#include <opendht/thread_pool.h>
#include <yaml-cpp/yaml.h> #include <yaml-cpp/yaml.h>
#include <json/json.h> #include <json/json.h>
...@@ -1064,7 +1064,7 @@ generatePIN(size_t length = 8) ...@@ -1064,7 +1064,7 @@ generatePIN(size_t length = 8)
void void
JamiAccount::addDevice(const std::string& password) JamiAccount::addDevice(const std::string& password)
{ {
ThreadPool::instance().run([this_=shared(), password]() { dht::ThreadPool::computation().run([this_=shared(), password]() {
std::vector<uint8_t> key; std::vector<uint8_t> key;
dht::InfoHash loc; dht::InfoHash loc;
std::string pin_str; std::string pin_str;
...@@ -1137,7 +1137,7 @@ bool ...@@ -1137,7 +1137,7 @@ bool
JamiAccount::revokeDevice(const std::string& password, const std::string& device) JamiAccount::revokeDevice(const std::string& password, const std::string& device)
{ {
// shared_ptr of future // shared_ptr of future
auto fa = ThreadPool::instance().getShared<AccountArchive>( auto fa = dht::ThreadPool::computation().getShared<AccountArchive>(
[this, password] { return readArchive(password); }); [this, password] { return readArchive(password); });
findCertificate(dht::InfoHash(device), findCertificate(dht::InfoHash(device),
[this,fa=std::move(fa),password,device](const std::shared_ptr<dht::crypto::Certificate>& crt) mutable [this,fa=std::move(fa),password,device](const std::shared_ptr<dht::crypto::Certificate>& crt) mutable
...@@ -1199,7 +1199,7 @@ JamiAccount::loadAccountFromFile(const std::string& archive_path, const std::str ...@@ -1199,7 +1199,7 @@ JamiAccount::loadAccountFromFile(const std::string& archive_path, const std::str
{ {
setRegistrationState(RegistrationState::INITIALIZING); setRegistrationState(RegistrationState::INITIALIZING);
auto accountId = getAccountID(); auto accountId = getAccountID();
ThreadPool::instance().run([w=weak(), archive_password, archive_path, accountId]{ dht::ThreadPool::computation().run([w=weak(), archive_password, archive_path, accountId]{
AccountArchive archive; AccountArchive archive;
try { try {
archive = AccountArchive(archive_path, archive_password); archive = AccountArchive(archive_path, archive_password);
...@@ -1304,8 +1304,8 @@ JamiAccount::loadAccountFromDHT(const std::string& archive_password, const std:: ...@@ -1304,8 +1304,8 @@ JamiAccount::loadAccountFromDHT(const std::string& archive_password, const std::
} }
}; };
ThreadPool::instance().run(std::bind(search, true, state_old)); dht::ThreadPool::computation().run(std::bind(search, true, state_old));
ThreadPool::instance().run(std::bind(search, false, state_new)); dht::ThreadPool::computation().run(std::bind(search, false, state_new));
} }
void void
...@@ -1313,11 +1313,11 @@ JamiAccount::createAccount(const std::string& archive_password, dht::crypto::Ide ...@@ -1313,11 +1313,11 @@ JamiAccount::createAccount(const std::string& archive_password, dht::crypto::Ide
{ {
JAMI_WARN("[Account %s] creating new account", getAccountID().c_str()); JAMI_WARN("[Account %s] creating new account", getAccountID().c_str());
setRegistrationState(RegistrationState::INITIALIZING); setRegistrationState(RegistrationState::INITIALIZING);
ThreadPool::instance().run([sthis=shared(), archive_password, migrate]() mutable { dht::ThreadPool::computation().run([sthis=shared(), archive_password, migrate]() mutable {
AccountArchive a; AccountArchive a;
auto& this_ = *sthis; auto& this_ = *sthis;
auto future_keypair = ThreadPool::instance().get<dev::KeyPair>(std::bind(&dev::KeyPair::create)); auto future_keypair = dht::ThreadPool::computation().get<dev::KeyPair>(std::bind(&dev::KeyPair::create));
try { try {
if (migrate.first and migrate.second) { if (migrate.first and migrate.second) {
JAMI_WARN("[Account %s] converting certificate from old ring account %s", JAMI_WARN("[Account %s] converting certificate from old ring account %s",
...@@ -2733,7 +2733,7 @@ JamiAccount::loadTreatedMessages() ...@@ -2733,7 +2733,7 @@ JamiAccount::loadTreatedMessages()
void void
JamiAccount::saveTreatedMessages() const JamiAccount::saveTreatedMessages() const
{ {
ThreadPool::instance().run([w = weak()](){ dht::ThreadPool::io().run([w = weak()](){
if (auto sthis = w.lock()) { if (auto sthis = w.lock()) {
auto& this_ = *sthis; auto& this_ = *sthis;
std::lock_guard<std::mutex> lock(this_.messageMutex_); std::lock_guard<std::mutex> lock(this_.messageMutex_);
...@@ -2963,7 +2963,7 @@ JamiAccount::generateDhParams() ...@@ -2963,7 +2963,7 @@ JamiAccount::generateDhParams()
{ {
//make sure cachePath_ is writable //make sure cachePath_ is writable
fileutils::check_dir(cachePath_.c_str(), 0700); fileutils::check_dir(cachePath_.c_str(), 0700);
dhParams_ = ThreadPool::instance().get<tls::DhParams>(std::bind(loadDhParams, cachePath_ + DIR_SEPARATOR_STR "dhParams")); dhParams_ = dht::ThreadPool::computation().get<tls::DhParams>(std::bind(loadDhParams, cachePath_ + DIR_SEPARATOR_STR "dhParams"));
} }
MatchRank MatchRank
...@@ -3353,17 +3353,19 @@ JamiAccount::igdChanged() ...@@ -3353,17 +3353,19 @@ JamiAccount::igdChanged()
if (not dht_.isRunning()) if (not dht_.isRunning())
return; return;
if (upnp_) { if (upnp_) {
std::thread{[s = shared(), oldPort = static_cast<in_port_t>(dhtPortUsed_)] { dht::ThreadPool::io().run([w = weak(), oldPort = static_cast<in_port_t>(dhtPortUsed_)] {
auto& this_ = *s; if (auto s = w.lock()) {
if (not this_.mapPortUPnP()) auto& this_ = *s;
JAMI_WARN("UPnP: Could not map DHT port"); if (not this_.mapPortUPnP())
auto newPort = static_cast<in_port_t>(this_.dhtPortUsed_); JAMI_WARN("UPnP: Could not map DHT port");
if (oldPort != newPort) { auto newPort = static_cast<in_port_t>(this_.dhtPortUsed_);
JAMI_WARN("DHT port changed: restarting network"); if (oldPort != newPort) {
this_.doRegister_(); JAMI_WARN("DHT port changed: restarting network");
} else this_.doRegister_();
this_.dht_.connectivityChanged(); } else
}}.detach(); this_.dht_.connectivityChanged();
}
});
} else } else
dht_.connectivityChanged(); dht_.connectivityChanged();
} }
......
...@@ -20,10 +20,10 @@ ...@@ -20,10 +20,10 @@
#include "logger.h" #include "logger.h"
#include "string_utils.h" #include "string_utils.h"
#include "thread_pool.h"
#include "fileutils.h" #include "fileutils.h"
#include "base64.h" #include "base64.h"
#include <opendht/thread_pool.h>
#include <opendht/crypto.h> #include <opendht/crypto.h>
#include <msgpack.hpp> #include <msgpack.hpp>
#include <json/json.h> #include <json/json.h>
...@@ -166,7 +166,7 @@ void NameDirectory::lookupAddress(const std::string& addr, LookupCallback cb) ...@@ -166,7 +166,7 @@ void NameDirectory::lookupAddress(const std::string& addr, LookupCallback cb)
}).share(); }).share();
// avoid blocking on future destruction // avoid blocking on future destruction
ThreadPool::instance().run([ret](){ ret.get(); }); dht::ThreadPool::io().run([ret](){ ret.get(); });
} catch (const std::exception& e) { } catch (const std::exception& e) {
JAMI_ERR("Error when performing address lookup: %s", e.what()); JAMI_ERR("Error when performing address lookup: %s", e.what());
cb("", Response::error); cb("", Response::error);
...@@ -265,7 +265,7 @@ void NameDirectory::lookupName(const std::string& n, LookupCallback cb) ...@@ -265,7 +265,7 @@ void NameDirectory::lookupName(const std::string& n, LookupCallback cb)
}).share(); }).share();
// avoid blocking on future destruction // avoid blocking on future destruction
ThreadPool::instance().run([ret](){ ret.get(); }); dht::ThreadPool::io().run([ret](){ ret.get(); });
} catch (const std::exception& e) { } catch (const std::exception& e) {
JAMI_ERR("Error when performing name lookup: %s", e.what()); JAMI_ERR("Error when performing name lookup: %s", e.what());
cb("", Response::error); cb("", Response::error);
...@@ -367,8 +367,8 @@ void NameDirectory::registerName(const std::string& addr, const std::string& n, ...@@ -367,8 +367,8 @@ void NameDirectory::registerName(const std::string& addr, const std::string& n,
} }
}, params).share(); }, params).share();
// avoid blocking on future destruction // avoid blocking on future destruction
ThreadPool::instance().run([ret](){ ret.get(); }); dht::ThreadPool::io().run([ret](){ ret.get(); });
} catch (const std::exception& e) { } catch (const std::exception& e) {
JAMI_ERR("Error when performing name registration: %s", e.what()); JAMI_ERR("Error when performing name registration: %s", e.what());
cb(RegistrationResponse::error); cb(RegistrationResponse::error);
......
...@@ -34,7 +34,6 @@ ...@@ -34,7 +34,6 @@
#include "logger.h" #include "logger.h"
#include "account_schema.h" #include "account_schema.h"
#include "thread_pool.h"
#include "fileutils.h" #include "fileutils.h"
#include "map_utils.h" #include "map_utils.h"
...@@ -81,6 +80,8 @@ using random_device = dht::crypto::random_device; ...@@ -81,6 +80,8 @@ using random_device = dht::crypto::random_device;
#include "data_transfer.h" #include "data_transfer.h"
#include <opendht/thread_pool.h>
#ifndef WIN32 #ifndef WIN32
#include <sys/time.h> #include <sys/time.h>
#include <sys/resource.h> #include <sys/resource.h>
...@@ -823,9 +824,10 @@ Manager::finish() noexcept ...@@ -823,9 +824,10 @@ Manager::finish() noexcept
// Flush remaining tasks (free lambda' with capture) // Flush remaining tasks (free lambda' with capture)
pimpl_->scheduler_.stop(); pimpl_->scheduler_.stop();
dht::ThreadPool::io().join();
dht::ThreadPool::computation().join();
pj_shutdown(); pj_shutdown();
ThreadPool::instance().join();
} catch (const VoipLinkException &err) { } catch (const VoipLinkException &err) {
JAMI_ERR("%s", err.what()); JAMI_ERR("%s", err.what());
} }
...@@ -2835,7 +2837,7 @@ Manager::loadAccountMap(const YAML::Node& node) ...@@ -2835,7 +2837,7 @@ Manager::loadAccountMap(const YAML::Node& node)
continue; continue;
} }
remaining++; remaining++;
ThreadPool::instance().run([ dht::ThreadPool::computation().run([
this, dir, this, dir,
&cv, &remaining, &lock, &cv, &remaining, &lock,
configFile = accountBaseDir + DIR_SEPARATOR_STR + dir + DIR_SEPARATOR_STR + "config.yml" configFile = accountBaseDir + DIR_SEPARATOR_STR + dir + DIR_SEPARATOR_STR + "config.yml"
......
...@@ -25,12 +25,13 @@ ...@@ -25,12 +25,13 @@
#include "media_io_handle.h" #include "media_io_handle.h"
#include "media_recorder.h" #include "media_recorder.h"
#include "system_codec_container.h" #include "system_codec_container.h"
#include "thread_pool.h"
#include "video/filter_transpose.h" #include "video/filter_transpose.h"
#ifdef RING_ACCEL #ifdef RING_ACCEL
#include "video/accel.h" #include "video/accel.h"
#endif #endif
#include <opendht/thread_pool.h>
#include <algorithm> #include <algorithm>
#include <iomanip> #include <iomanip>
#include <sstream> #include <sstream>
...@@ -153,7 +154,7 @@ MediaRecorder::startRecording() ...@@ -153,7 +154,7 @@ MediaRecorder::startRecording()
if (initRecord() >= 0) { if (initRecord() >= 0) {
isRecording_ = true; isRecording_ = true;
// start thread after isRecording_ is set to true // start thread after isRecording_ is set to true
ThreadPool::instance().run([rec = shared_from_this()] { dht::ThreadPool::computation().run([rec = shared_from_this()] {
while (rec->isRecording()) { while (rec->isRecording()) {
rec->filterAndEncode(rec->videoFilter_.get(), rec->videoIdx_); rec->filterAndEncode(rec->videoFilter_.get(), rec->videoIdx_);
rec->filterAndEncode(rec->audioFilter_.get(), rec->audioIdx_); rec->filterAndEncode(rec->audioFilter_.get(), rec->audioIdx_);
......
...@@ -22,10 +22,11 @@ ...@@ -22,10 +22,11 @@
#include "client/ring_signal.h" #include "client/ring_signal.h"
#include "thread_pool.h"
#include "fileutils.h" #include "fileutils.h"
#include "logger.h" #include "logger.h"
#include <opendht/thread_pool.h>
#include <thread> #include <thread>
#include <sstream> #include <sstream>
...@@ -202,7 +203,7 @@ readCertificates(const std::string& path, const std::string& crl_path) ...@@ -202,7 +203,7 @@ readCertificates(const std::string& path, const std::string& crl_path)
void void
CertificateStore::pinCertificatePath(const std::string& path, std::function<void(const std::vector<std::string>&)> cb) CertificateStore::pinCertificatePath(const std::string& path, std::function<void(const std::vector<std::string>&)> cb)
{ {
ThreadPool::instance().run([&, path, cb]() { dht::ThreadPool::computation().run([&, path, cb]() {
auto certs = readCertificates(path, crlPath_); auto certs = readCertificates(path, crlPath_);
std::vector<std::string> ids; std::vector<std::string> ids;
std::vector<std::weak_ptr<crypto::Certificate>> scerts; std::vector<std::weak_ptr<crypto::Certificate>> scerts;
......
...@@ -40,7 +40,6 @@ ...@@ -40,7 +40,6 @@
#include "dring/media_const.h" #include "dring/media_const.h"
#include "client/ring_signal.h" #include "client/ring_signal.h"
#include "ice_transport.h" #include "ice_transport.h"
#include "thread_pool.h"
#ifdef ENABLE_VIDEO #ifdef ENABLE_VIDEO
#include "client/videomanager.h" #include "client/videomanager.h"
...@@ -52,6 +51,7 @@ ...@@ -52,6 +51,7 @@
#include "errno.h" #include "errno.h"
#include <opendht/crypto.h> #include <opendht/crypto.h>
#include <opendht/thread_pool.h>
namespace jami { namespace jami {
...@@ -800,7 +800,7 @@ void ...@@ -800,7 +800,7 @@ void
SIPCall::sendKeyframe() SIPCall::sendKeyframe()
{ {
#ifdef ENABLE_VIDEO #ifdef ENABLE_VIDEO
ThreadPool::instance().run([w = weak()] { dht::ThreadPool::computation().run([w = weak()] {
if (auto sthis = w.lock()) { if (auto sthis = w.lock()) {
JAMI_DBG("handling picture fast update request"); JAMI_DBG("handling picture fast update request");
sthis->getVideoRtp().forceKeyFrame(); sthis->getVideoRtp().forceKeyFrame();
......
/*
* Copyright (C) 2016-2019 Savoir-faire Linux Inc.
*
* Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "thread_pool.h"
#include "logger.h"
#include <atomic>
#include <thread>
#include <ciso646> // fix windows compiler bug
namespace jami {
struct ThreadPool::ThreadState
{
std::thread thread {};
std::atomic_bool run {true};
};
ThreadPool::ThreadPool()
: maxThreads_(std::max<size_t>(std::thread::hardware_concurrency(), 4))
{
threads_.reserve(maxThreads_);
}
ThreadPool::~ThreadPool()
{
join();
}
void
ThreadPool::run(std::function<void()>&& cb)
{
std::unique_lock<std::mutex> l(lock_);
// launch new thread if necessary
if (not readyThreads_ && threads_.size() < maxThreads_) {
threads_.emplace_back(new ThreadState());
auto& t = *threads_.back();
t.thread = std::thread([&]() {
while (t.run) {
std::function<void()> task;
// pick task from queue
{
std::unique_lock<std::mutex> l(lock_);
readyThreads_++;
cv_.wait(l, [&](){
return not t.run or not tasks_.empty();
});
readyThreads_--;
if (not t.run)
break;
task = std::move(tasks_.front());
tasks_.pop();
}
// run task
try {
if (task)
task();
} catch (const std::exception& e) {
JAMI_ERR("Exception running task: %s", e.what());
}
}
});
}
// push task to queue
tasks_.emplace(std::move(cb));
// notify thread
l.unlock();
cv_.notify_one();
}
void
ThreadPool::join()
{
for (auto& t : threads_)
t->run = false;
cv_.notify_all();
for (auto& t : threads_)
t->thread.join();
threads_.clear();
}
}
/*
* Copyright (C) 2016-2019 Savoir-faire Linux Inc.
*
* Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <condition_variable>
#include <vector>
#include <queue>
#include <future>
#include <functional>
namespace jami {
class ThreadPool {
public:
static ThreadPool& instance() {
static ThreadPool pool;
return pool;