diff --git a/include/opendht/dht_proxy_server.h b/include/opendht/dht_proxy_server.h index a9552d8e7a4affbebb256498dd28763cb833d24f..14b3d24f1b8fd436ef5149c41f34389c37ffd948 100644 --- a/include/opendht/dht_proxy_server.h +++ b/include/opendht/dht_proxy_server.h @@ -45,6 +45,7 @@ namespace Json { namespace dht { class DhtRunner; +class ThreadPool; /** * Describes the REST API @@ -277,6 +278,7 @@ private: std::condition_variable schedulerCv_; Scheduler scheduler_; std::thread schedulerThread_; + std::unique_ptr<ThreadPool> threadPool_; Sp<Scheduler::Job> printStatsJob_; mutable std::mutex statsMutex_; diff --git a/src/dht_proxy_server.cpp b/src/dht_proxy_server.cpp index ab7b7326a8625b73a1c29d41cd81e6cc39728204..2a241267fbff5e72ea7225e26bc9ec1f8fc14f2a 100644 --- a/src/dht_proxy_server.cpp +++ b/src/dht_proxy_server.cpp @@ -20,6 +20,7 @@ #if OPENDHT_PROXY_SERVER #include "dht_proxy_server.h" +#include "thread_pool.h" #include "default_types.h" #include "dhtrunner.h" @@ -47,9 +48,11 @@ struct DhtProxyServer::SearchPuts { }; constexpr const std::chrono::minutes PRINT_STATS_PERIOD {2}; +constexpr const size_t IO_THREADS_MAX {64}; + DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port , const std::string& pushServer) -: dht_(dht) , pushServer_(pushServer) +: dht_(dht), threadPool_(new ThreadPool(IO_THREADS_MAX)), pushServer_(pushServer) { if (not dht_) throw std::invalid_argument("A DHT instance must be provided"); @@ -177,6 +180,7 @@ DhtProxyServer::stop() schedulerThread_.join(); if (server_thread.joinable()) server_thread.join(); + threadPool_->stop(); } void @@ -441,9 +445,7 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session) Json::StreamWriterBuilder wbuilder; wbuilder["commentStyle"] = "None"; wbuilder["indentation"] = ""; - auto output = Json::writeString( - wbuilder, value->toJson()) + - "\n"; + auto output = Json::writeString(wbuilder, value->toJson()) + "\n"; s->yield(output, [](const Sp<restbed::Session> & /*session*/) {}); return true; @@ -473,12 +475,14 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session) // The listener is not found, so add it. listener.internalToken = dht_->listen(infoHash, - [this, infoHash, pushToken, isAndroid, clientId](std::vector<std::shared_ptr<Value>> /*value*/) { - // Build message content. - Json::Value json; - json["key"] = infoHash.toString(); - json["to"] = clientId; - sendPushNotification(pushToken, std::move(json), isAndroid); + [this, infoHash, pushToken, isAndroid, clientId](const std::vector<std::shared_ptr<Value>>& /*values*/) { + threadPool_->run([this, infoHash, pushToken, isAndroid, clientId](){ + // Build message content + Json::Value json; + json["key"] = infoHash.toString(); + json["to"] = clientId; + sendPushNotification(pushToken, std::move(json), isAndroid); + }); return true; } );