diff --git a/include/opendht/dht_proxy_server.h b/include/opendht/dht_proxy_server.h index 618bffa5aead8ef2267d1f2e9674651c3d33b408..8ff176db7ade12fa1c3b46588028f7667be6533e 100644 --- a/include/opendht/dht_proxy_server.h +++ b/include/opendht/dht_proxy_server.h @@ -41,6 +41,7 @@ class opendht_logger_t { + // TODO control dynamicaly from node public: opendht_logger_t(std::shared_ptr<dht::Logger> logger = nullptr){ if (logger) @@ -80,6 +81,7 @@ using RestRouterTraits = restinio::traits_t< restinio::asio_timer_manager_t, opendht_logger_t, RestRouter>; +using ServerSettings = restinio::run_on_thread_pool_settings_t<RestRouterTraits>; using RequestStatus = restinio::request_handling_status_t; using ResponseByParts = restinio::chunked_output_t; using ResponseByPartsBuilder = restinio::response_builder_t<ResponseByParts>; @@ -334,10 +336,12 @@ private: using clock = std::chrono::steady_clock; using time_point = clock::time_point; - std::thread server_thread {}; std::shared_ptr<DhtRunner> dht_; Json::StreamWriterBuilder jsonBuilder_; + restinio::http_server_t<RestRouterTraits> httpServer_; + std::thread httpServerThread_ {}; + std::mutex schedulerLock_; std::condition_variable schedulerCv_; Scheduler scheduler_; diff --git a/src/dht_proxy_server.cpp b/src/dht_proxy_server.cpp index 9e36e28586b0d6b8170694f5f406b318b28e8349..9215dbe5d7b80a3362e64d3e1e9dbb93abc08d14 100644 --- a/src/dht_proxy_server.cpp +++ b/src/dht_proxy_server.cpp @@ -52,7 +52,8 @@ constexpr const size_t IO_THREADS_MAX {64}; DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port ,const std::string& pushServer, std::shared_ptr<dht::Logger> logger) -: dht_(dht), threadPool_(new ThreadPool(IO_THREADS_MAX)), pushServer_(pushServer) +: dht_(dht), threadPool_(new ThreadPool(IO_THREADS_MAX)), pushServer_(pushServer), + httpServer_(restinio::own_io_context(), []( auto & settings ){}) { if (not dht_) throw std::invalid_argument("A DHT instance must be provided"); @@ -69,11 +70,11 @@ DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port ,c jsonBuilder_["commentStyle"] = "None"; jsonBuilder_["indentation"] = ""; - server_thread = std::thread([this, port, logger](){ + httpServerThread_ = std::thread([this, port, logger](){ using namespace std::chrono; auto maxThreads = std::thread::hardware_concurrency() - 1; auto restThreads = maxThreads > 1 ? maxThreads : 1; - auto settings = restinio::on_thread_pool<RestRouterTraits>(restThreads); + auto settings = ServerSettings(restThreads); settings.logger(logger); settings.port(port); settings.protocol(restinio::asio_ns::ip::tcp::v6()); @@ -87,18 +88,25 @@ DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port ,c settings.socket_options_setter([](auto & options){ options.set_option(asio::ip::tcp::no_delay{true}); }); + httpServer_ = restinio::http_server_t<RestRouterTraits>( + restinio::own_io_context(), // requirement: each thread has its own + std::forward<ServerSettings>(settings) + ); + restinio::asio_ns::post(httpServer_.io_context(), [&]{ + httpServer_.open_sync(); + }); try { - restinio::run(std::move(settings)); + httpServer_.io_context().run(); } catch(const std::exception &ex){ std::cerr << "Error starting RESTinio: " << ex.what() << std::endl; } }); listenThread_ = std::thread([this](){ - while (not server_thread.joinable() and not stopListeners){ + while (not httpServerThread_.joinable() and not stopListeners){ std::this_thread::sleep_for(std::chrono::seconds(1)); } - while (server_thread.joinable() and not stopListeners){ + while (httpServerThread_.joinable() and not stopListeners){ removeClosedListeners(); std::this_thread::sleep_for(std::chrono::seconds(1)); } @@ -106,10 +114,10 @@ DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port ,c removeClosedListeners(false); }); schedulerThread_ = std::thread([this](){ - while (not server_thread.joinable() and not stopListeners){ + while (not httpServerThread_.joinable() and not stopListeners){ std::this_thread::sleep_for(std::chrono::seconds(1)); } - while (server_thread.joinable() and not stopListeners){ + while (httpServerThread_.joinable() and not stopListeners){ std::unique_lock<std::mutex> lock(schedulerLock_); auto next = scheduler_.run(); if (next == time_point::max()) @@ -122,7 +130,7 @@ DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port ,c printStatsJob_ = scheduler_.add(scheduler_.time() + PRINT_STATS_PERIOD, [this] { if (stopListeners) return; - if (server_thread.joinable()) + if (httpServerThread_.joinable()) updateStats(); // Refresh stats cache auto newInfo = dht_->getNodeInfo(); @@ -152,13 +160,19 @@ DhtProxyServer::stop() } stopListeners = true; schedulerCv_.notify_all(); + + // stop its own io_context loop + httpServer_.close_sync(); + // listenThreads_ will stop because there is no more sessions if (listenThread_.joinable()) listenThread_.join(); if (schedulerThread_.joinable()) schedulerThread_.join(); - if (server_thread.joinable()) - server_thread.join(); + + if (httpServerThread_.joinable()) + httpServerThread_.join(); + threadPool_->stop(); }