Skip to content
Snippets Groups Projects
Commit cb349d9c authored by Seva's avatar Seva
Browse files

dhtproxy: stop server eventloop on cmd

parent 6266f88c
No related branches found
No related tags found
No related merge requests found
...@@ -41,6 +41,7 @@ ...@@ -41,6 +41,7 @@
class opendht_logger_t class opendht_logger_t
{ {
// TODO control dynamicaly from node
public: public:
opendht_logger_t(std::shared_ptr<dht::Logger> logger = nullptr){ opendht_logger_t(std::shared_ptr<dht::Logger> logger = nullptr){
if (logger) if (logger)
...@@ -80,6 +81,7 @@ using RestRouterTraits = restinio::traits_t< ...@@ -80,6 +81,7 @@ using RestRouterTraits = restinio::traits_t<
restinio::asio_timer_manager_t, restinio::asio_timer_manager_t,
opendht_logger_t, opendht_logger_t,
RestRouter>; RestRouter>;
using ServerSettings = restinio::run_on_thread_pool_settings_t<RestRouterTraits>;
using RequestStatus = restinio::request_handling_status_t; using RequestStatus = restinio::request_handling_status_t;
using ResponseByParts = restinio::chunked_output_t; using ResponseByParts = restinio::chunked_output_t;
using ResponseByPartsBuilder = restinio::response_builder_t<ResponseByParts>; using ResponseByPartsBuilder = restinio::response_builder_t<ResponseByParts>;
...@@ -334,10 +336,12 @@ private: ...@@ -334,10 +336,12 @@ private:
using clock = std::chrono::steady_clock; using clock = std::chrono::steady_clock;
using time_point = clock::time_point; using time_point = clock::time_point;
std::thread server_thread {};
std::shared_ptr<DhtRunner> dht_; std::shared_ptr<DhtRunner> dht_;
Json::StreamWriterBuilder jsonBuilder_; Json::StreamWriterBuilder jsonBuilder_;
restinio::http_server_t<RestRouterTraits> httpServer_;
std::thread httpServerThread_ {};
std::mutex schedulerLock_; std::mutex schedulerLock_;
std::condition_variable schedulerCv_; std::condition_variable schedulerCv_;
Scheduler scheduler_; Scheduler scheduler_;
......
...@@ -52,7 +52,8 @@ constexpr const size_t IO_THREADS_MAX {64}; ...@@ -52,7 +52,8 @@ constexpr const size_t IO_THREADS_MAX {64};
DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port ,const std::string& pushServer, std::shared_ptr<dht::Logger> logger) DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port ,const std::string& pushServer, std::shared_ptr<dht::Logger> logger)
: dht_(dht), threadPool_(new ThreadPool(IO_THREADS_MAX)), pushServer_(pushServer) : dht_(dht), threadPool_(new ThreadPool(IO_THREADS_MAX)), pushServer_(pushServer),
httpServer_(restinio::own_io_context(), []( auto & settings ){})
{ {
if (not dht_) if (not dht_)
throw std::invalid_argument("A DHT instance must be provided"); throw std::invalid_argument("A DHT instance must be provided");
...@@ -69,11 +70,11 @@ DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port ,c ...@@ -69,11 +70,11 @@ DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port ,c
jsonBuilder_["commentStyle"] = "None"; jsonBuilder_["commentStyle"] = "None";
jsonBuilder_["indentation"] = ""; jsonBuilder_["indentation"] = "";
server_thread = std::thread([this, port, logger](){ httpServerThread_ = std::thread([this, port, logger](){
using namespace std::chrono; using namespace std::chrono;
auto maxThreads = std::thread::hardware_concurrency() - 1; auto maxThreads = std::thread::hardware_concurrency() - 1;
auto restThreads = maxThreads > 1 ? maxThreads : 1; auto restThreads = maxThreads > 1 ? maxThreads : 1;
auto settings = restinio::on_thread_pool<RestRouterTraits>(restThreads); auto settings = ServerSettings(restThreads);
settings.logger(logger); settings.logger(logger);
settings.port(port); settings.port(port);
settings.protocol(restinio::asio_ns::ip::tcp::v6()); settings.protocol(restinio::asio_ns::ip::tcp::v6());
...@@ -87,18 +88,25 @@ DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port ,c ...@@ -87,18 +88,25 @@ DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port ,c
settings.socket_options_setter([](auto & options){ settings.socket_options_setter([](auto & options){
options.set_option(asio::ip::tcp::no_delay{true}); options.set_option(asio::ip::tcp::no_delay{true});
}); });
httpServer_ = restinio::http_server_t<RestRouterTraits>(
restinio::own_io_context(), // requirement: each thread has its own
std::forward<ServerSettings>(settings)
);
restinio::asio_ns::post(httpServer_.io_context(), [&]{
httpServer_.open_sync();
});
try { try {
restinio::run(std::move(settings)); httpServer_.io_context().run();
} }
catch(const std::exception &ex){ catch(const std::exception &ex){
std::cerr << "Error starting RESTinio: " << ex.what() << std::endl; std::cerr << "Error starting RESTinio: " << ex.what() << std::endl;
} }
}); });
listenThread_ = std::thread([this](){ listenThread_ = std::thread([this](){
while (not server_thread.joinable() and not stopListeners){ while (not httpServerThread_.joinable() and not stopListeners){
std::this_thread::sleep_for(std::chrono::seconds(1)); std::this_thread::sleep_for(std::chrono::seconds(1));
} }
while (server_thread.joinable() and not stopListeners){ while (httpServerThread_.joinable() and not stopListeners){
removeClosedListeners(); removeClosedListeners();
std::this_thread::sleep_for(std::chrono::seconds(1)); std::this_thread::sleep_for(std::chrono::seconds(1));
} }
...@@ -106,10 +114,10 @@ DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port ,c ...@@ -106,10 +114,10 @@ DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port ,c
removeClosedListeners(false); removeClosedListeners(false);
}); });
schedulerThread_ = std::thread([this](){ schedulerThread_ = std::thread([this](){
while (not server_thread.joinable() and not stopListeners){ while (not httpServerThread_.joinable() and not stopListeners){
std::this_thread::sleep_for(std::chrono::seconds(1)); std::this_thread::sleep_for(std::chrono::seconds(1));
} }
while (server_thread.joinable() and not stopListeners){ while (httpServerThread_.joinable() and not stopListeners){
std::unique_lock<std::mutex> lock(schedulerLock_); std::unique_lock<std::mutex> lock(schedulerLock_);
auto next = scheduler_.run(); auto next = scheduler_.run();
if (next == time_point::max()) if (next == time_point::max())
...@@ -122,7 +130,7 @@ DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port ,c ...@@ -122,7 +130,7 @@ DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port ,c
printStatsJob_ = scheduler_.add(scheduler_.time() + PRINT_STATS_PERIOD, [this] { printStatsJob_ = scheduler_.add(scheduler_.time() + PRINT_STATS_PERIOD, [this] {
if (stopListeners) if (stopListeners)
return; return;
if (server_thread.joinable()) if (httpServerThread_.joinable())
updateStats(); updateStats();
// Refresh stats cache // Refresh stats cache
auto newInfo = dht_->getNodeInfo(); auto newInfo = dht_->getNodeInfo();
...@@ -152,13 +160,19 @@ DhtProxyServer::stop() ...@@ -152,13 +160,19 @@ DhtProxyServer::stop()
} }
stopListeners = true; stopListeners = true;
schedulerCv_.notify_all(); schedulerCv_.notify_all();
// stop its own io_context loop
httpServer_.close_sync();
// listenThreads_ will stop because there is no more sessions // listenThreads_ will stop because there is no more sessions
if (listenThread_.joinable()) if (listenThread_.joinable())
listenThread_.join(); listenThread_.join();
if (schedulerThread_.joinable()) if (schedulerThread_.joinable())
schedulerThread_.join(); schedulerThread_.join();
if (server_thread.joinable())
server_thread.join(); if (httpServerThread_.joinable())
httpServerThread_.join();
threadPool_->stop(); threadPool_->stop();
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment