diff --git a/CMakeLists.txt b/CMakeLists.txt index f98910f52a4cd3fdb82f7503e0ebd22077e2e02f..ed0a7685307a072ff1f91669a26da84574f57d5f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -184,40 +184,34 @@ if (OPENDHT_INDEX) endif() if (OPENDHT_PROXY_SERVER) - add_definitions(-DOPENDHT_PROXY_SERVER=true) + add_definitions(-DOPENDHT_PROXY_SERVER) if (OPENDHT_PROXY_SERVER_IDENTITY) - add_definitions(-DOPENDHT_PROXY_SERVER_IDENTITY=true) - else () - add_definitions(-DOPENDHT_PROXY_SERVER_IDENTITY=false) + add_definitions(-DOPENDHT_PROXY_SERVER_IDENTITY) endif() list (APPEND opendht_HEADERS include/opendht/dht_proxy_server.h ) list (APPEND opendht_SOURCES + src/thread_pool.h + src/thread_pool.cpp src/dht_proxy_server.cpp ) -else () - add_definitions(-DENABLE_PROXY_SERVER=false) endif () if (OPENDHT_PROXY_CLIENT) - add_definitions(-DOPENDHT_PROXY_CLIENT=true) + add_definitions(-DOPENDHT_PROXY_CLIENT) list (APPEND opendht_HEADERS include/opendht/dht_proxy_client.h ) list (APPEND opendht_SOURCES src/dht_proxy_client.cpp ) -else () - add_definitions(-DOPENDHT_PROXY_CLIENT=false) endif () if (OPENDHT_PROXY_SERVER OR OPENDHT_PROXY_CLIENT) if (OPENDHT_PUSH_NOTIFICATIONS) message("Using push notification") - add_definitions(-DOPENDHT_PUSH_NOTIFICATIONS=true) - else () - add_definitions(-DOPENDHT_PUSH_NOTIFICATIONS=false) + add_definitions(-DOPENDHT_PUSH_NOTIFICATIONS) endif () list (APPEND opendht_HEADERS include/opendht/proxy.h diff --git a/configure.ac b/configure.ac index 4a8776cc4ecfd459aac0fa927efa05e1aecb7bcf..b653bf9834e5d69055c9ad6509cabf87ee7ca70c 100644 --- a/configure.ac +++ b/configure.ac @@ -177,21 +177,10 @@ AM_COND_IF([ENABLE_TOOLS], [ ]) ]) -AM_COND_IF(ENABLE_PROXY_SERVER, - [CPPFLAGS+=" -DOPENDHT_PROXY_SERVER=true"], - [CPPFLAGS+=" -DOPENDHT_PROXY_SERVER=false"]) - -AM_COND_IF(ENABLE_PROXY_CLIENT, - [CPPFLAGS+=" -DOPENDHT_PROXY_CLIENT=true"], - [CPPFLAGS+=" -DOPENDHT_PROXY_CLIENT=false"]) - -AM_COND_IF(ENABLE_PUSH_NOTIFICATIONS, - [CPPFLAGS+=" -DOPENDHT_PUSH_NOTIFICATIONS=true"], - [CPPFLAGS+=" -DOPENDHT_PUSH_NOTIFICATIONS=false"]) - -AM_COND_IF(ENABLE_PROXY_SERVER_IDENTITY, - [CPPFLAGS+=" -DOPENDHT_PROXY_SERVER_IDENTITY=true"], - [CPPFLAGS+=" -DOPENDHT_PROXY_SERVER_IDENTITY=false"]) +AM_COND_IF(ENABLE_PROXY_SERVER, [CPPFLAGS+=" -DOPENDHT_PROXY_SERVER"], []) +AM_COND_IF(ENABLE_PROXY_CLIENT, [CPPFLAGS+=" -DOPENDHT_PROXY_CLIENT"], []) +AM_COND_IF(ENABLE_PUSH_NOTIFICATIONS, [CPPFLAGS+=" -DOPENDHT_PUSH_NOTIFICATIONS"], []) +AM_COND_IF(ENABLE_PROXY_SERVER_IDENTITY, [CPPFLAGS+=" -DOPENDHT_PROXY_SERVER_IDENTITY"], []) AM_COND_IF([HAVE_DOXYGEN], [ AC_CONFIG_FILES([doc/Doxyfile doc/Makefile]) diff --git a/include/opendht.h b/include/opendht.h index 8d0154d385cd8251dd6ae68fd49204349d2c40d5..2c4b60fbc9971981d3fccb3db4f37e74d89fba19 100644 --- a/include/opendht.h +++ b/include/opendht.h @@ -19,7 +19,7 @@ #pragma once #include "opendht/dhtrunner.h" -#if OPENDHT_PROXY_SERVER +#ifdef OPENDHT_PROXY_SERVER #include "opendht/dht_proxy_server.h" #endif #include "opendht/log.h" diff --git a/include/opendht/dht_proxy_client.h b/include/opendht/dht_proxy_client.h index 972167ca4769d59f9dd2f39faaaf597b6a5db204..2dae2a68a7f8ab71ab30bd04483e6af7948d2002 100644 --- a/include/opendht/dht_proxy_client.h +++ b/include/opendht/dht_proxy_client.h @@ -17,8 +17,6 @@ * along with this program. If not, see <https://www.gnu.org/licenses/>. */ -#if OPENDHT_PROXY_CLIENT - #pragma once #include <functional> @@ -49,7 +47,7 @@ public: explicit DhtProxyClient(std::function<void()> loopSignal, const std::string& serverHost, const std::string& pushClientId = ""); virtual void setPushNotificationToken(const std::string& token) { -#if OPENDHT_PUSH_NOTIFICATIONS +#ifdef OPENDHT_PUSH_NOTIFICATIONS deviceKey_ = token; #endif } @@ -376,7 +374,7 @@ private: const std::function<void()> loopSignal_; -#if OPENDHT_PUSH_NOTIFICATIONS +#ifdef OPENDHT_PUSH_NOTIFICATIONS void fillBody(std::shared_ptr<restbed::Request> request, bool resubscribe); void getPushRequest(Json::Value&) const; #endif // OPENDHT_PUSH_NOTIFICATIONS @@ -385,5 +383,3 @@ private: }; } - -#endif // OPENDHT_PROXY_CLIENT diff --git a/include/opendht/dht_proxy_server.h b/include/opendht/dht_proxy_server.h index 56f0e7fc256c8ab57dc619ef8e7a4c7eefefe7b7..8e1f8be2544ed326130d14b304b8f1a3ca7387bf 100644 --- a/include/opendht/dht_proxy_server.h +++ b/include/opendht/dht_proxy_server.h @@ -17,8 +17,6 @@ * along with this program. If not, see <https://www.gnu.org/licenses/>. */ -#if OPENDHT_PROXY_SERVER - #pragma once #include "callbacks.h" @@ -45,6 +43,7 @@ namespace Json { namespace dht { class DhtRunner; +class ThreadPool; /** * Describes the REST API @@ -178,7 +177,7 @@ private: void cancelPut(const InfoHash& key, Value::Id vid); -#if OPENDHT_PROXY_SERVER_IDENTITY +#ifdef OPENDHT_PROXY_SERVER_IDENTITY /** * Put a value to sign by the proxy on the DHT * Method: SIGN "/{InfoHash: .*}" @@ -229,7 +228,7 @@ private: */ void removeClosedListeners(bool testSession = true); -#if OPENDHT_PUSH_NOTIFICATIONS +#ifdef OPENDHT_PUSH_NOTIFICATIONS /** * Subscribe to push notifications for an iOS or Android device. * Method: SUBSCRIBE "/{InfoHash: .*}" @@ -253,7 +252,7 @@ private: * @param key of the device * @param json, the content to send */ - void sendPushNotification(const std::string& key, const Json::Value& json, bool isAndroid) const; + void sendPushNotification(const std::string& key, Json::Value&& json, bool isAndroid) const; /** * Remove a push listener between a client and a hash @@ -272,11 +271,13 @@ private: std::thread server_thread {}; std::unique_ptr<restbed::Service> service_; std::shared_ptr<DhtRunner> dht_; + Json::StreamWriterBuilder jsonBuilder_; std::mutex schedulerLock_; std::condition_variable schedulerCv_; Scheduler scheduler_; std::thread schedulerThread_; + std::unique_ptr<ThreadPool> threadPool_; Sp<Scheduler::Job> printStatsJob_; mutable std::mutex statsMutex_; @@ -305,7 +306,7 @@ private: mutable ServerStats stats_; -#if OPENDHT_PUSH_NOTIFICATIONS +#ifdef OPENDHT_PUSH_NOTIFICATIONS struct Listener; struct PushListener; std::mutex lockPushListeners_; @@ -315,5 +316,3 @@ private: }; } - -#endif //OPENDHT_PROXY_SERVER diff --git a/src/Makefile.am b/src/Makefile.am index 2a6ce27ac9d8e08e8100b52af87a2b70b19a5f50..aee033e5c09175459fd269db0e6f19d6128b18df 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -56,7 +56,7 @@ nobase_include_HEADERS = \ ../include/opendht/rng.h if ENABLE_PROXY_SERVER -libopendht_la_SOURCES += dht_proxy_server.cpp +libopendht_la_SOURCES += dht_proxy_server.cpp thread_pool.h thread_pool.cpp nobase_include_HEADERS += ../include/opendht/dht_proxy_server.h endif diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index 38eaf72a97d62393e33a9d3b615096e452c42fc0..9a31f77c7ff50ab48985ab7209b22b9f30804b10 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -17,8 +17,6 @@ * along with this program. If not, see <https://www.gnu.org/licenses/>. */ -#if OPENDHT_PROXY_CLIENT - #include "dht_proxy_client.h" #include "dhtrunner.h" @@ -45,8 +43,7 @@ struct DhtProxyClient::ListenState { struct DhtProxyClient::Listener { - ValueCache cache; - Sp<Scheduler::Job> cacheExpirationJob {}; + OpValueCache cache; ValueCallback cb; Value::Filter filter; Sp<restbed::Request> req; @@ -54,23 +51,21 @@ struct DhtProxyClient::Listener unsigned callbackId; Sp<ListenState> state; Sp<Scheduler::Job> refreshJob; - Listener(ValueCache&& c, Sp<Scheduler::Job>&& j, const Sp<restbed::Request>& r, Value::Filter&& f) - : cache(std::move(c)), - cacheExpirationJob(std::move(j)), - filter(std::move(f)), - req(r) - {} + Listener(OpValueCache&& c, const Sp<restbed::Request>& r, Value::Filter&& f) + : cache(std::move(c)), filter(std::move(f)),req(r) {} }; struct PermanentPut { Sp<Value> value; Sp<Scheduler::Job> refreshJob; Sp<std::atomic_bool> ok; + PermanentPut(const Sp<Value>& v, Sp<Scheduler::Job>&& j, const Sp<std::atomic_bool>& o) + : value(v), refreshJob(std::move(j)), ok(o) {} }; struct DhtProxyClient::ProxySearch { SearchCache ops {}; - Sp<Scheduler::Job> opExpirationJob; + Sp<Scheduler::Job> opExpirationJob {}; std::map<size_t, Listener> listeners {}; std::map<Value::Id, PermanentPut> puts {}; }; @@ -338,24 +333,26 @@ DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_po if (permanent) { std::lock_guard<std::mutex> lock(searchLock_); auto id = val->id; - auto search = searches_.emplace(key, ProxySearch{}).first; + auto& search = searches_[key]; auto nextRefresh = scheduler.time() + proxy::OP_TIMEOUT - proxy::OP_MARGIN; auto ok = std::make_shared<std::atomic_bool>(false); - search->second.puts.erase(id); - search->second.puts.emplace(id, PermanentPut {val, scheduler.add(nextRefresh, [this, key, id, ok]{ - std::lock_guard<std::mutex> lock(searchLock_); - auto s = searches_.find(key); - if (s != searches_.end()) { - auto p = s->second.puts.find(id); - if (p != s->second.puts.end()) { - doPut(key, p->second.value, - [ok](bool result, const std::vector<std::shared_ptr<dht::Node> >&){ - *ok = result; - }, time_point::max(), true); - scheduler.edit(p->second.refreshJob, scheduler.time() + proxy::OP_TIMEOUT - proxy::OP_MARGIN); + search.puts.erase(id); + search.puts.emplace(std::piecewise_construct, + std::forward_as_tuple(id), + std::forward_as_tuple(val, scheduler.add(nextRefresh, [this, key, id, ok]{ + std::lock_guard<std::mutex> lock(searchLock_); + auto s = searches_.find(key); + if (s != searches_.end()) { + auto p = s->second.puts.find(id); + if (p != s->second.puts.end()) { + doPut(key, p->second.value, + [ok](bool result, const std::vector<std::shared_ptr<dht::Node> >&){ + *ok = result; + }, time_point::max(), true); + scheduler.edit(p->second.refreshJob, scheduler.time() + proxy::OP_TIMEOUT - proxy::OP_MARGIN); + } } - } - }), ok}); + }), ok)); } doPut(key, val, std::move(cb), created, permanent); } @@ -373,7 +370,7 @@ DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_ if (deviceKey_.empty()) { json["permanent"] = true; } else { -#if OPENDHT_PUSH_NOTIFICATIONS +#ifdef OPENDHT_PUSH_NOTIFICATIONS Json::Value refresh; getPushRequest(refresh); json["permanent"] = refresh; @@ -663,12 +660,9 @@ DhtProxyClient::getPublicAddress(sa_family_t family) size_t DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filter, Where where) { DHT_LOG.d(key, "[search %s]: listen", key.to_c_str()); - auto it = searches_.find(key); - if (it == searches_.end()) { - it = searches_.emplace(key, ProxySearch{}).first; - } + auto& search = searches_[key]; auto query = std::make_shared<Query>(Select{}, where); - auto token = it->second.ops.listen(cb, query, filter, [this, key, filter](Sp<Query> /*q*/, ValueCallback cb, SyncCallback scb) -> size_t { + auto token = search.ops.listen(cb, query, filter, [this, key, filter](Sp<Query> /*q*/, ValueCallback cb, SyncCallback /*scb*/) -> size_t { scheduler.syncTime(); restbed::Uri uri(serverHost_ + "/" + key.toString()); std::lock_guard<std::mutex> lock(searchLock_); @@ -684,21 +678,9 @@ DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filt auto l = search->second.listeners.find(token); if (l == search->second.listeners.end()) { auto f = filter; - l = search->second.listeners.emplace(token, Listener { - ValueCache(cb, std::move(scb)), scheduler.add(time_point::max(), [this, key, token]{ - std::lock_guard<std::mutex> lock(searchLock_); - auto s = searches_.find(key); - if (s == searches_.end()) { - return; - } - auto l = s->second.listeners.find(token); - if (l == s->second.listeners.end()) { - return; - } - auto next = l->second.cache.expireValues(scheduler.time()); - scheduler.edit(l->second.cacheExpirationJob, next); - }), req, std::move(f) - }).first; + l = search->second.listeners.emplace(std::piecewise_construct, + std::forward_as_tuple(token), + std::forward_as_tuple(std::move(cb), req, std::move(f))).first; } else { if (l->second.state) l->second.state->cancel = true; @@ -718,17 +700,7 @@ DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filt if (l == s->second.listeners.end()) { return false; } - const std::vector<Sp<Value>> new_values_empty; - std::vector<Value::Id> expired_ids; - if (expired) { - expired_ids.reserve(values.size()); - for (const auto& v : values) - expired_ids.emplace_back(v->id); - } - auto next = l->second.cache.onValues(expired ? new_values_empty : values, std::vector<Value::Id>{}, expired_ids, types, scheduler.time()); - scheduler.edit(l->second.cacheExpirationJob, next); - loopSignal_(); - return true; + return l->second.cache.onValue(values, expired); }; auto vcb = l->second.cb; @@ -751,8 +723,7 @@ DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filt } l->second.thread = std::thread([this, req, vcb, filter, state]() { sendListen(req, vcb, filter, state, - deviceKey_.empty() ? ListenMethod::LISTEN - : ListenMethod::SUBSCRIBE); + deviceKey_.empty() ? ListenMethod::LISTEN : ListenMethod::SUBSCRIBE); }); return token; }); @@ -791,23 +762,23 @@ void DhtProxyClient::sendListen(const std::shared_ptr<restbed::Request> &req, const Value::Filter &filter, const Sp<ListenState> &state, ListenMethod method) { - auto settings = std::make_shared<restbed::Settings>(); - if (method != ListenMethod::LISTEN) { - req->set_method("SUBSCRIBE"); - } else { - std::chrono::milliseconds timeout(std::numeric_limits<int>::max()); - settings->set_connection_timeout( - timeout); // Avoid the client to close the socket after 5 seconds. - req->set_method("LISTEN"); - } - try { -#if OPENDHT_PUSH_NOTIFICATIONS - if (method != ListenMethod::LISTEN) - fillBody(req, method == ListenMethod::RESUBSCRIBE); -#endif + auto settings = std::make_shared<restbed::Settings>(); + if (method != ListenMethod::LISTEN) { + req->set_method("SUBSCRIBE"); + } else { + std::chrono::milliseconds timeout(std::numeric_limits<int>::max()); + settings->set_connection_timeout(timeout); // Avoid the client to close the socket after 5 seconds. + req->set_method("LISTEN"); + } + try { +#ifdef OPENDHT_PUSH_NOTIFICATIONS + if (method != ListenMethod::LISTEN) + fillBody(req, method == ListenMethod::RESUBSCRIBE); + #endif restbed::Http::async(req, - [this, filter, cb, state](const std::shared_ptr<restbed::Request>& req, - const std::shared_ptr<restbed::Response>& reply) { + [this, filter, cb, state](const std::shared_ptr<restbed::Request>& req, + const std::shared_ptr<restbed::Response>& reply) + { auto code = reply->get_status_code(); if (code == 200) { try { @@ -994,7 +965,7 @@ DhtProxyClient::restartListeners() void DhtProxyClient::pushNotificationReceived(const std::map<std::string, std::string>& notification) { -#if OPENDHT_PUSH_NOTIFICATIONS +#ifdef OPENDHT_PUSH_NOTIFICATIONS scheduler.syncTime(); { // If a push notification is received, the proxy is up and running @@ -1021,20 +992,45 @@ DhtProxyClient::pushNotificationReceived(const std::map<std::string, std::string resubscribe(key, list.second); } } else { - auto keyidx = notification.find("key"); - InfoHash key(keyidx->second); - - for (auto& search: searches_) { - for (auto& list : search.second.listeners) { - if (search.first != key || list.second.state->cancel) - continue; - DHT_LOG.d(search.first, "[search %s] handling push notification", search.first.to_c_str()); + auto key = InfoHash(notification.at("key")); + auto& search = searches_.at(key); + for (auto& list : search.listeners) { + if (list.second.state->cancel) + continue; + DHT_LOG.d(key, "[search %s] handling push notification", key.to_c_str()); + auto expired = notification.find("exp"); + auto token = list.first; + auto state = list.second.state; + if (expired == notification.end()) { auto cb = list.second.cb; auto filter = list.second.filter; - get(search.first, [cb](const std::vector<Sp<Value>>& vals) { - cb(vals, false); - return true; - }, DoneCallbackSimple{}, std::move(filter)); + auto oldValues = list.second.cache.getValues(); + get(key, [cb](const std::vector<Sp<Value>>& vals) { + return cb(vals, false); + }, [cb, oldValues](bool /*ok*/) { + // Decrement old values refcount to expire values not present in the new list + cb(oldValues, true); + }, std::move(filter)); + } else { + std::stringstream ss(expired->second); + std::vector<Value::Id> ids; + while(ss.good()){ + std::string substr; + getline(ss, substr, ','); + ids.emplace_back(std::stoull(substr)); + } + std::lock_guard<std::mutex> lock(lockCallbacks); + callbacks_.emplace_back([this, key, token, state, ids]() { + if (state->cancel) return; + std::lock_guard<std::mutex> lock(searchLock_); + auto s = searches_.find(key); + if (s == searches_.end()) return; + auto l = s->second.listeners.find(token); + if (l == s->second.listeners.end()) return; + if (not state->cancel and not l->second.cache.onValuesExpired(ids)) + state->cancel = true; + }); + loopSignal_(); } } } @@ -1047,20 +1043,22 @@ DhtProxyClient::pushNotificationReceived(const std::map<std::string, std::string void DhtProxyClient::resubscribe(const InfoHash& key, Listener& listener) { -#if OPENDHT_PUSH_NOTIFICATIONS +#ifdef OPENDHT_PUSH_NOTIFICATIONS if (deviceKey_.empty()) return; scheduler.syncTime(); DHT_LOG.d(key, "[search %s] resubscribe push listener", key.to_c_str()); // Subscribe - restbed::Uri uri(serverHost_ + "/" + key.toString()); - auto req = std::make_shared<restbed::Request>(uri); - req->set_method("SUBSCRIBE"); - auto state = listener.state; - if (listener.thread.joinable()) + if (listener.thread.joinable()) { + state->cancel = true; + if (listener.req) + restbed::Http::close(listener.req); listener.thread.join(); + } state->cancel = false; state->ok = true; + auto req = std::make_shared<restbed::Request>(restbed::Uri {serverHost_ + "/" + key.toString()}); + req->set_method("SUBSCRIBE"); listener.req = req; scheduler.edit(listener.refreshJob, scheduler.time() + proxy::OP_TIMEOUT - proxy::OP_MARGIN); auto vcb = listener.cb; @@ -1071,7 +1069,7 @@ DhtProxyClient::resubscribe(const InfoHash& key, Listener& listener) #endif } -#if OPENDHT_PUSH_NOTIFICATIONS +#ifdef OPENDHT_PUSH_NOTIFICATIONS void DhtProxyClient::getPushRequest(Json::Value& body) const { @@ -1094,9 +1092,9 @@ DhtProxyClient::fillBody(std::shared_ptr<restbed::Request> req, bool resubscribe // } Json::Value body; getPushRequest(body); - if (!resubscribe) { + if (resubscribe) { // This is the first listen, we want to retrieve previous values. - body["previous_values"] = true; + body["refresh"] = true; } Json::StreamWriterBuilder wbuilder; wbuilder["commentStyle"] = "None"; @@ -1109,5 +1107,3 @@ DhtProxyClient::fillBody(std::shared_ptr<restbed::Request> req, bool resubscribe #endif // OPENDHT_PUSH_NOTIFICATIONS } // namespace dht - -#endif // OPENDHT_PROXY_CLIENT diff --git a/src/dht_proxy_server.cpp b/src/dht_proxy_server.cpp index 8860db5db714a724c48c90508af13f3eff160191..04b146c55baf3bbd44e53864d09942731bd7c0ec 100644 --- a/src/dht_proxy_server.cpp +++ b/src/dht_proxy_server.cpp @@ -17,9 +17,9 @@ * along with this program. If not, see <https://www.gnu.org/licenses/>. */ -#if OPENDHT_PROXY_SERVER #include "dht_proxy_server.h" +#include "thread_pool.h" #include "default_types.h" #include "dhtrunner.h" @@ -47,9 +47,11 @@ struct DhtProxyServer::SearchPuts { }; constexpr const std::chrono::minutes PRINT_STATS_PERIOD {2}; +constexpr const size_t IO_THREADS_MAX {64}; + DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port , const std::string& pushServer) -: dht_(dht) , pushServer_(pushServer) +: dht_(dht), threadPool_(new ThreadPool(IO_THREADS_MAX)), pushServer_(pushServer) { if (not dht_) throw std::invalid_argument("A DHT instance must be provided"); @@ -58,13 +60,16 @@ DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port , std::cout << "Running DHT proxy server on port " << port << std::endl; if (not pushServer.empty()) { -#if !OPENDHT_PUSH_NOTIFICATIONS - std::cerr << "Push server defined but built OpenDHT built without push notification support" << std::endl; -#else +#ifdef OPENDHT_PUSH_NOTIFICATIONS std::cout << "Using push notification server: " << pushServer << std::endl; +#else + std::cerr << "Push server defined but built OpenDHT built without push notification support" << std::endl; #endif } + jsonBuilder_["commentStyle"] = "None"; + jsonBuilder_["indentation"] = ""; + server_thread = std::thread([this, port]() { // Create endpoints auto resource = std::make_shared<restbed::Resource>(); @@ -76,12 +81,12 @@ DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port , resource->set_path("/{hash: .*}"); resource->set_method_handler("GET", std::bind(&DhtProxyServer::get, this, _1)); resource->set_method_handler("LISTEN", [this](const Sp<restbed::Session>& session) mutable { listen(session); } ); -#if OPENDHT_PUSH_NOTIFICATIONS +#ifdef OPENDHT_PUSH_NOTIFICATIONS resource->set_method_handler("SUBSCRIBE", [this](const Sp<restbed::Session>& session) mutable { subscribe(session); } ); resource->set_method_handler("UNSUBSCRIBE", [this](const Sp<restbed::Session>& session) mutable { unsubscribe(session); } ); #endif //OPENDHT_PUSH_NOTIFICATIONS resource->set_method_handler("POST", [this](const Sp<restbed::Session>& session) mutable { put(session); }); -#if OPENDHT_PROXY_SERVER_IDENTITY +#ifdef OPENDHT_PROXY_SERVER_IDENTITY resource->set_method_handler("SIGN", std::bind(&DhtProxyServer::putSigned, this, _1)); resource->set_method_handler("ENCRYPT", std::bind(&DhtProxyServer::putEncrypted, this, _1)); #endif // OPENDHT_PROXY_SERVER_IDENTITY @@ -177,6 +182,7 @@ DhtProxyServer::stop() schedulerThread_.join(); if (server_thread.joinable()) server_thread.join(); + threadPool_->stop(); } void @@ -187,7 +193,7 @@ DhtProxyServer::updateStats() const auto count = requestNum_.exchange(0); auto dt = std::chrono::duration<double>(now - last); stats_.requestRate = count / dt.count(); -#if OPENDHT_PUSH_NOTIFICATIONS +#ifdef OPENDHT_PUSH_NOTIFICATIONS stats_.pushListenersCount = pushListeners_.size(); #endif stats_.putCount = puts_.size(); @@ -219,10 +225,7 @@ DhtProxyServer::getNodeInfo(const Sp<restbed::Session>& session) const result = nodeInfo_.toJson(); } result["public_ip"] = s->get_origin(); // [ipv6:ipv4]:port or ipv4:port - Json::StreamWriterBuilder wbuilder; - wbuilder["commentStyle"] = "None"; - wbuilder["indentation"] = ""; - auto output = Json::writeString(wbuilder, result) + "\n"; + auto output = Json::writeString(jsonBuilder_, result) + "\n"; s->close(restbed::OK, output); } else @@ -246,10 +249,7 @@ DhtProxyServer::getStats(const Sp<restbed::Session>& session) const try { if (dht_) { #ifdef OPENDHT_JSONCPP - Json::StreamWriterBuilder wbuilder; - wbuilder["commentStyle"] = "None"; - wbuilder["indentation"] = ""; - auto output = Json::writeString(wbuilder, stats_.toJson()) + "\n"; + auto output = Json::writeString(jsonBuilder_, stats_.toJson()) + "\n"; s->close(restbed::OK, output); #else s->close(restbed::NotFound, "{\"err\":\"JSON not enabled on this instance\"}"); @@ -281,13 +281,10 @@ DhtProxyServer::get(const Sp<restbed::Session>& session) const infoHash = InfoHash::get(hash); } s->yield(restbed::OK, "", [=](const Sp<restbed::Session>&) {}); - dht_->get(infoHash, [s](const Sp<Value>& value) { + dht_->get(infoHash, [this,s](const Sp<Value>& value) { if (s->is_closed()) return false; // Send values as soon as we get them - Json::StreamWriterBuilder wbuilder; - wbuilder["commentStyle"] = "None"; - wbuilder["indentation"] = ""; - auto output = Json::writeString(wbuilder, value->toJson()) + "\n"; + auto output = Json::writeString(jsonBuilder_, value->toJson()) + "\n"; s->yield(output, [](const Sp<restbed::Session>& /*session*/){ }); return true; }, [s](bool /*ok* */) { @@ -335,19 +332,16 @@ DhtProxyServer::listen(const Sp<restbed::Session>& session) // cache the session to avoid an incrementation of the shared_ptr's counter // else, the session->close() will not close the socket. auto cacheSession = std::weak_ptr<restbed::Session>(s); - listener.token = dht_->listen(infoHash, [cacheSession](const std::vector<Sp<Value>>& values, bool expired) { + listener.token = dht_->listen(infoHash, [this,cacheSession](const std::vector<Sp<Value>>& values, bool expired) { auto s = cacheSession.lock(); if (!s) return false; // Send values as soon as we get them if (!s->is_closed()) { - Json::StreamWriterBuilder wbuilder; - wbuilder["commentStyle"] = "None"; - wbuilder["indentation"] = ""; for (const auto& value : values) { auto val = value->toJson(); if (expired) val["expired"] = true; - auto output = Json::writeString(wbuilder, val) + "\n"; + auto output = Json::writeString(jsonBuilder_, val) + "\n"; s->yield(output, [](const Sp<restbed::Session>&){ }); } } @@ -367,7 +361,7 @@ DhtProxyServer::listen(const Sp<restbed::Session>& session) ); } -#if OPENDHT_PUSH_NOTIFICATIONS +#ifdef OPENDHT_PUSH_NOTIFICATIONS struct DhtProxyServer::Listener { std::string clientId; @@ -430,20 +424,14 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session) scheduler_.edit(listener.expireNotifyJob, timeout - proxy::OP_MARGIN); s->yield(restbed::OK); - if (root.isMember("previous_values") && - root["previous_values"].asBool()) { + if (!root.isMember("refresh") or !root["refresh"].asBool()) { dht_->get( infoHash, [this, s](const Sp<Value> &value) { if (s->is_closed()) return false; // Send values as soon as we get them - Json::StreamWriterBuilder wbuilder; - wbuilder["commentStyle"] = "None"; - wbuilder["indentation"] = ""; - auto output = Json::writeString( - wbuilder, value->toJson()) + - "\n"; + auto output = Json::writeString(jsonBuilder_, value->toJson()) + "\n"; s->yield(output, [](const Sp<restbed::Session> & /*session*/) {}); return true; @@ -473,12 +461,22 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session) // The listener is not found, so add it. listener.internalToken = dht_->listen(infoHash, - [this, infoHash, pushToken, isAndroid, clientId](std::vector<std::shared_ptr<Value>> /*value*/) { - // Build message content. - Json::Value json; - json["key"] = infoHash.toString(); - json["to"] = clientId; - sendPushNotification(pushToken, json, isAndroid); + [this, infoHash, pushToken, isAndroid, clientId](const std::vector<std::shared_ptr<Value>>& values, bool expired) { + threadPool_->run([this, infoHash, pushToken, isAndroid, clientId, values, expired]() { + // Build message content + Json::Value json; + json["key"] = infoHash.toString(); + json["to"] = clientId; + if (expired and values.size() < 3) { + std::stringstream ss; + for(size_t i = 0; i < values.size(); ++i) { + if(i != 0) ss << ","; + ss << values[i]->id; + } + json["exp"] = ss.str(); + } + sendPushNotification(pushToken, std::move(json), isAndroid); + }); return true; } ); @@ -493,7 +491,7 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session) Json::Value json; json["timeout"] = infoHash.toString(); json["to"] = clientId; - sendPushNotification(pushToken, json, isAndroid); + sendPushNotification(pushToken, std::move(json), isAndroid); } ); } @@ -571,7 +569,7 @@ DhtProxyServer::cancelPushListen(const std::string& pushToken, const dht::InfoHa } void -DhtProxyServer::sendPushNotification(const std::string& token, const Json::Value& json, bool isAndroid) const +DhtProxyServer::sendPushNotification(const std::string& token, Json::Value&& json, bool isAndroid) const { if (pushServer_.empty()) return; @@ -583,9 +581,9 @@ DhtProxyServer::sendPushNotification(const std::string& token, const Json::Value Json::Value notification(Json::objectValue); Json::Value tokens(Json::arrayValue); tokens[0] = token; - notification["tokens"] = tokens; + notification["tokens"] = std::move(tokens); notification["platform"] = isAndroid ? 2 : 1; - notification["data"] = json; + notification["data"] = std::move(json); notification["priority"] = "high"; notification["time_to_live"] = 600; @@ -593,7 +591,7 @@ DhtProxyServer::sendPushNotification(const std::string& token, const Json::Value notifications[0] = notification; Json::Value content; - content["notifications"] = notifications; + content["notifications"] = std::move(notifications); Json::StreamWriterBuilder wbuilder; wbuilder["commentStyle"] = "None"; @@ -684,7 +682,7 @@ DhtProxyServer::put(const std::shared_ptr<restbed::Session>& session) std::cout << "Permanent put expired: " << infoHash << " " << vid << std::endl; cancelPut(infoHash, vid); }); -#if OPENDHT_PUSH_NOTIFICATIONS +#ifdef OPENDHT_PUSH_NOTIFICATIONS if (not pushToken.empty()) { pput.expireNotifyJob = scheduler_.add(timeout - proxy::OP_MARGIN, [this, infoHash, vid, pushToken, clientId, isAndroid] @@ -694,7 +692,7 @@ DhtProxyServer::put(const std::shared_ptr<restbed::Session>& session) json["timeout"] = infoHash.toString(); json["to"] = clientId; json["vid"] = std::to_string(vid); - sendPushNotification(pushToken, json, isAndroid); + sendPushNotification(pushToken, std::move(json), isAndroid); }); } #endif @@ -734,7 +732,7 @@ DhtProxyServer::put(const std::shared_ptr<restbed::Session>& session) ); } -#if OPENDHT_PROXY_SERVER_IDENTITY +#ifdef OPENDHT_PROXY_SERVER_IDENTITY void DhtProxyServer::putSigned(const std::shared_ptr<restbed::Session>& session) const { @@ -840,7 +838,7 @@ void DhtProxyServer::handleOptionsMethod(const std::shared_ptr<restbed::Session>& session) const { requestNum_++; -#if OPENDHT_PROXY_SERVER_IDENTITY +#ifdef OPENDHT_PROXY_SERVER_IDENTITY const auto allowed = "OPTIONS, GET, POST, LISTEN, SIGN, ENCRYPT"; #else const auto allowed = "OPTIONS, GET, POST, LISTEN"; @@ -910,4 +908,3 @@ DhtProxyServer::removeClosedListeners(bool testSession) } } -#endif //OPENDHT_PROXY_SERVER diff --git a/src/dhtrunner.cpp b/src/dhtrunner.cpp index acc455041ad68caea54982f8b0e2b0c9f964bc8a..7ab6f81f42893c7857148fa9c10a0d789a7b74b0 100644 --- a/src/dhtrunner.cpp +++ b/src/dhtrunner.cpp @@ -21,7 +21,7 @@ #include "dhtrunner.h" #include "securedht.h" -#if OPENDHT_PROXY_CLIENT +#ifdef OPENDHT_PROXY_CLIENT #include "dht_proxy_client.h" #endif @@ -55,7 +55,7 @@ struct DhtRunner::Listener { }; DhtRunner::DhtRunner() : dht_() -#if OPENDHT_PROXY_CLIENT +#ifdef OPENDHT_PROXY_CLIENT , dht_via_proxy_() #endif //OPENDHT_PROXY_CLIENT { @@ -105,7 +105,7 @@ DhtRunner::run(const SockAddr& local4, const SockAddr& local6, DhtRunner::Config auto dht = std::unique_ptr<DhtInterface>(new Dht(s4, s6, SecureDht::getConfig(config.dht_config))); dht_ = std::unique_ptr<SecureDht>(new SecureDht(std::move(dht), config.dht_config)); -#if OPENDHT_PROXY_CLIENT +#ifdef OPENDHT_PROXY_CLIENT config_ = config; #endif enableProxy(not config.proxy_server.empty()); @@ -151,7 +151,7 @@ DhtRunner::run(const SockAddr& local4, const SockAddr& local6, DhtRunner::Config void DhtRunner::shutdown(ShutdownCallback cb) { -#if OPENDHT_PROXY_CLIENT +#ifdef OPENDHT_PROXY_CLIENT if (dht_via_proxy_) dht_via_proxy_->shutdown(cb); #endif @@ -250,7 +250,7 @@ DhtRunner::setLoggers(LogMethod error, LogMethod warn, LogMethod debug) { std::lock_guard<std::mutex> lck(dht_mtx); if (dht_) dht_->setLoggers(error, warn, debug); -#if OPENDHT_PROXY_CLIENT +#ifdef OPENDHT_PROXY_CLIENT if (dht_via_proxy_) dht_via_proxy_->setLoggers(error, warn, debug); #endif @@ -262,7 +262,7 @@ DhtRunner::setLogFilter(const InfoHash& f) { activeDht()->setLogFilter(f); if (dht_) dht_->setLogFilter(f); -#if OPENDHT_PROXY_CLIENT +#ifdef OPENDHT_PROXY_CLIENT if (dht_via_proxy_) dht_via_proxy_->setLogFilter(f); #endif @@ -376,7 +376,7 @@ DhtRunner::registerCertificate(std::shared_ptr<crypto::Certificate> cert) { void DhtRunner::setLocalCertificateStore(CertificateStoreQuery&& query_method) { std::lock_guard<std::mutex> lck(dht_mtx); -#if OPENDHT_PROXY_CLIENT +#ifdef OPENDHT_PROXY_CLIENT if (dht_via_proxy_) dht_via_proxy_->setLocalCertificateStore(std::forward<CertificateStoreQuery>(query_method)); #endif @@ -658,7 +658,7 @@ DhtRunner::listen(InfoHash hash, ValueCallback vcb, Value::Filter f, Where w) { std::lock_guard<std::mutex> lck(storage_mtx); pending_ops.emplace([=](SecureDht& dht) mutable { -#if OPENDHT_PROXY_CLIENT +#ifdef OPENDHT_PROXY_CLIENT auto tokenbGlobal = listener_token_++; Listener listener {}; listener.hash = hash; @@ -697,7 +697,7 @@ DhtRunner::cancelListen(InfoHash h, size_t token) { { std::lock_guard<std::mutex> lck(storage_mtx); -#if OPENDHT_PROXY_CLIENT +#ifdef OPENDHT_PROXY_CLIENT pending_ops.emplace([=](SecureDht&) { auto it = listeners_.find(token); if (it == listeners_.end()) return; @@ -721,7 +721,7 @@ DhtRunner::cancelListen(InfoHash h, std::shared_future<size_t> ftoken) { { std::lock_guard<std::mutex> lck(storage_mtx); -#if OPENDHT_PROXY_CLIENT +#ifdef OPENDHT_PROXY_CLIENT pending_ops.emplace([=](SecureDht&) { auto it = listeners_.find(ftoken.get()); if (it == listeners_.end()) return; @@ -973,7 +973,7 @@ DhtRunner::findCertificate(InfoHash hash, std::function<void(const std::shared_p void DhtRunner::resetDht() { -#if OPENDHT_PROXY_CLIENT +#ifdef OPENDHT_PROXY_CLIENT listeners_.clear(); dht_via_proxy_.reset(); #endif // OPENDHT_PROXY_CLIENT @@ -983,7 +983,7 @@ DhtRunner::resetDht() SecureDht* DhtRunner::activeDht() const { -#if OPENDHT_PROXY_CLIENT +#ifdef OPENDHT_PROXY_CLIENT return use_proxy? dht_via_proxy_.get() : dht_.get(); #else return dht_.get(); @@ -993,7 +993,7 @@ DhtRunner::activeDht() const void DhtRunner::setProxyServer(const std::string& proxy, const std::string& pushNodeId) { -#if OPENDHT_PROXY_CLIENT +#ifdef OPENDHT_PROXY_CLIENT if (config_.proxy_server == proxy and config_.push_node_id == pushNodeId) return; config_.proxy_server = proxy; @@ -1008,7 +1008,7 @@ DhtRunner::setProxyServer(const std::string& proxy, const std::string& pushNodeI void DhtRunner::enableProxy(bool proxify) { -#if OPENDHT_PROXY_CLIENT +#ifdef OPENDHT_PROXY_CLIENT if (dht_via_proxy_) { dht_via_proxy_->shutdown({}); } @@ -1026,7 +1026,7 @@ DhtRunner::enableProxy(bool proxify) }, config_.proxy_server, config_.push_node_id) ); dht_via_proxy_ = std::unique_ptr<SecureDht>(new SecureDht(std::move(dht_via_proxy), config_.dht_config)); -#if OPENDHT_PUSH_NOTIFICATIONS +#ifdef OPENDHT_PUSH_NOTIFICATIONS if (not pushToken_.empty()) dht_via_proxy_->setPushNotificationToken(pushToken_); #endif @@ -1059,8 +1059,8 @@ DhtRunner::enableProxy(bool proxify) void DhtRunner::forwardAllMessages(bool forward) { -#if OPENDHT_PROXY_SERVER -#if OPENDHT_PROXY_CLIENT +#ifdef OPENDHT_PROXY_SERVER +#ifdef OPENDHT_PROXY_CLIENT if (dht_via_proxy_) dht_via_proxy_->forwardAllMessages(forward); #endif // OPENDHT_PROXY_CLIENT @@ -1074,7 +1074,7 @@ DhtRunner::forwardAllMessages(bool forward) */ void DhtRunner::setPushNotificationToken(const std::string& token) { -#if OPENDHT_PROXY_CLIENT && OPENDHT_PUSH_NOTIFICATIONS +#if defined(OPENDHT_PROXY_CLIENT) && defined(OPENDHT_PUSH_NOTIFICATIONS) pushToken_ = token; if (dht_via_proxy_) dht_via_proxy_->setPushNotificationToken(token); @@ -1084,7 +1084,7 @@ DhtRunner::setPushNotificationToken(const std::string& token) { void DhtRunner::pushNotificationReceived(const std::map<std::string, std::string>& data) { -#if OPENDHT_PROXY_CLIENT && OPENDHT_PUSH_NOTIFICATIONS +#if defined(OPENDHT_PROXY_CLIENT) && defined(OPENDHT_PUSH_NOTIFICATIONS) { std::lock_guard<std::mutex> lck(storage_mtx); pending_ops_prio.emplace([=](SecureDht&) { diff --git a/src/op_cache.cpp b/src/op_cache.cpp index b9aa9bcb84febf0cccab3ca7ff230a23278d40a6..899ef96394947fde106e3df49fee50a92687368e 100644 --- a/src/op_cache.cpp +++ b/src/op_cache.cpp @@ -34,6 +34,7 @@ OpValueCache::onValuesAdded(const std::vector<Sp<Value>>& vals) { } return newValues.empty() ? true : callback(newValues, false); } + bool OpValueCache::onValuesExpired(const std::vector<Sp<Value>>& vals) { std::vector<Sp<Value>> expiredValues; @@ -49,6 +50,24 @@ OpValueCache::onValuesExpired(const std::vector<Sp<Value>>& vals) { } return expiredValues.empty() ? true : callback(expiredValues, true); } + +bool +OpValueCache::onValuesExpired(const std::vector<Value::Id>& vids) +{ + std::vector<Sp<Value>> expiredValues; + for (const auto& vid : vids) { + auto vit = values.find(vid); + if (vit != values.end()) { + vit->second.refCount--; + if (not vit->second.refCount) { + expiredValues.emplace_back(std::move(vit->second.data)); + values.erase(vit); + } + } + } + return expiredValues.empty() ? true : callback(expiredValues, true); +} + std::vector<Sp<Value>> OpValueCache::get(const Value::Filter& filter) const { std::vector<Sp<Value>> ret; diff --git a/src/op_cache.h b/src/op_cache.h index 79cd8f140faac06fe1e21a976fbf02c7fa0b8f18..ec6b2d1324215b46a4341d337e1b11856951a7b4 100644 --- a/src/op_cache.h +++ b/src/op_cache.h @@ -56,6 +56,7 @@ public: bool onValuesAdded(const std::vector<Sp<Value>>& vals); bool onValuesExpired(const std::vector<Sp<Value>>& vals); + bool onValuesExpired(const std::vector<Value::Id>& vals); void onNodeChanged(ListenSyncStatus status) { switch (status) { diff --git a/src/securedht.cpp b/src/securedht.cpp index 4be57f74ebb03c61a38be480db1821f90c6d31ed..7e6188995c82d1b104627153c8d0ecf4eca1933c 100644 --- a/src/securedht.cpp +++ b/src/securedht.cpp @@ -235,7 +235,7 @@ SecureDht::checkValue(const Sp<Value>& v) // Decrypt encrypted values if (v->isEncrypted()) { if (not key_) { -#if OPENDHT_PROXY_SERVER +#ifdef OPENDHT_PROXY_SERVER if (forward_all_) // We are currently a proxy, send messages to clients. return v; #endif diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp new file mode 100644 index 0000000000000000000000000000000000000000..00f349f604d0b194b5fc43bc7efafdbb1e65f226 --- /dev/null +++ b/src/thread_pool.cpp @@ -0,0 +1,117 @@ +/* + * Copyright (C) 2016-2019 Savoir-faire Linux Inc. + * + * Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "thread_pool.h" + +#include <atomic> +#include <thread> + +#include <ciso646> // fix windows compiler bug + +namespace dht { + +struct ThreadPool::ThreadState +{ + std::thread thread {}; + std::atomic_bool run {true}; +}; + +ThreadPool::ThreadPool(size_t maxThreads) : maxThreads_(maxThreads) +{ + threads_.reserve(maxThreads_); +} + +ThreadPool::ThreadPool() + : ThreadPool(std::max<size_t>(std::thread::hardware_concurrency(), 4)) +{} + +ThreadPool::~ThreadPool() +{ + join(); +} + +void +ThreadPool::run(std::function<void()>&& cb) +{ + std::unique_lock<std::mutex> l(lock_); + if (not running_) return; + + // launch new thread if necessary + if (not readyThreads_ && threads_.size() < maxThreads_) { + threads_.emplace_back(new ThreadState()); + auto& t = *threads_.back(); + t.thread = std::thread([&]() { + while (t.run) { + std::function<void()> task; + + // pick task from queue + { + std::unique_lock<std::mutex> l(lock_); + readyThreads_++; + cv_.wait(l, [&](){ + return not t.run or not tasks_.empty(); + }); + readyThreads_--; + if (not t.run) + break; + task = std::move(tasks_.front()); + tasks_.pop(); + } + + // run task + try { + if (task) + task(); + } catch (const std::exception& e) { + // LOG_ERR("Exception running task: %s", e.what()); + } + } + }); + } + + // push task to queue + tasks_.emplace(std::move(cb)); + + // notify thread + l.unlock(); + cv_.notify_one(); +} + +void +ThreadPool::stop() +{ + { + std::lock_guard<std::mutex> l(lock_); + running_ = false; + } + for (auto& t : threads_) + t->run = false; + cv_.notify_all(); +} + +void +ThreadPool::join() +{ + stop(); + for (auto& t : threads_) + t->thread.join(); + threads_.clear(); +} + +} diff --git a/src/thread_pool.h b/src/thread_pool.h new file mode 100644 index 0000000000000000000000000000000000000000..024cb6bfdcfbc695f3a9a31d644ce779dd703dfe --- /dev/null +++ b/src/thread_pool.h @@ -0,0 +1,71 @@ +/* + * Copyright (C) 2016-2019 Savoir-faire Linux Inc. + * + * Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include <condition_variable> +#include <vector> +#include <queue> +#include <future> +#include <functional> + +namespace dht { + +class ThreadPool { +public: + static ThreadPool& instance() { + static ThreadPool pool; + return pool; + } + + ThreadPool(); + ThreadPool(size_t maxThreads); + ~ThreadPool(); + + void run(std::function<void()>&& cb); + + template<class T> + std::future<T> get(std::function<T()>&& cb) { + auto ret = std::make_shared<std::promise<T>>(); + run(std::bind([=](std::function<T()>& mcb) mutable { + ret->set_value(mcb()); + }, std::move(cb))); + return ret->get_future(); + } + template<class T> + std::shared_ptr<std::future<T>> getShared(std::function<T()>&& cb) { + return std::make_shared<std::future<T>>(get(std::move(cb))); + } + + void stop(); + void join(); + +private: + struct ThreadState; + std::queue<std::function<void()>> tasks_ {}; + std::vector<std::unique_ptr<ThreadState>> threads_; + unsigned readyThreads_ {0}; + std::mutex lock_ {}; + std::condition_variable cv_ {}; + + const unsigned maxThreads_; + bool running_ {true}; +}; + +} diff --git a/tools/dhtchat.cpp b/tools/dhtchat.cpp index 9f1bff6771ba3e206ab534e108257d303f0abb33..03617faae411a72010a836735b4681052411f7e9 100644 --- a/tools/dhtchat.cpp +++ b/tools/dhtchat.cpp @@ -76,7 +76,7 @@ main(int argc, char **argv) if (not params.bootstrap.first.empty()) dht.bootstrap(params.bootstrap.first.c_str(), params.bootstrap.second.c_str()); -#if OPENDHT_PROXY_CLIENT +#ifdef OPENDHT_PROXY_CLIENT if (!params.proxyclient.empty()) { dht.setProxyServer(params.proxyclient); dht.enableProxy(true); diff --git a/tools/dhtnode.cpp b/tools/dhtnode.cpp index 98ae5e6ab93ddd00e4aa0571da22e995f994c1fc..ebab15377ae846107b665c37b1d094a9a43b824a 100644 --- a/tools/dhtnode.cpp +++ b/tools/dhtnode.cpp @@ -58,9 +58,9 @@ void print_help() { << " ld [key] Print basic information about currenty stored values on this node (or key)." << std::endl << " lr Print the full current routing table of this node." << std::endl; -#if OPENDHT_PROXY_SERVER +#ifdef OPENDHT_PROXY_SERVER std::cout << std::endl << "Operations with the proxy:" << std::endl -#if OPENDHT_PUSH_NOTIFICATIONS +#ifdef OPENDHT_PUSH_NOTIFICATIONS << " pst [port] <pushServer> Start the proxy interface on port." << std::endl #else << " pst [port] Start the proxy interface on port." << std::endl @@ -68,9 +68,9 @@ void print_help() { << " psp [port] Stop the proxy interface on port." << std::endl; #endif //OPENDHT_PROXY_SERVER -#if OPENDHT_PROXY_CLIENT +#ifdef OPENDHT_PROXY_CLIENT std::cout << std::endl << "Operations with the proxy:" << std::endl -#if OPENDHT_PUSH_NOTIFICATIONS +#ifdef OPENDHT_PUSH_NOTIFICATIONS << " stt [server_address] <device_key> Start the proxy client." << std::endl << " rs [token] Resubscribe to opendht." << std::endl << " rp [token] Inject a push notification in Opendht." << std::endl @@ -102,7 +102,7 @@ void print_help() { } void cmd_loop(std::shared_ptr<DhtRunner>& dht, dht_params& params -#if OPENDHT_PROXY_SERVER +#ifdef OPENDHT_PROXY_SERVER , std::map<in_port_t, std::unique_ptr<DhtProxyServer>>& proxies #endif ) @@ -141,7 +141,7 @@ void cmd_loop(std::shared_ptr<DhtRunner>& dht, dht_params& params std::cout << dht->getNodesStats(AF_INET).toString() << std::endl; std::cout << "IPv6 stats:" << std::endl; std::cout << dht->getNodesStats(AF_INET6).toString() << std::endl; -#if OPENDHT_PROXY_SERVER +#ifdef OPENDHT_PROXY_SERVER for (const auto& proxy : proxies) { std::cout << "Stats for proxy on port " << proxy.first << std::endl; std::cout << " " << proxy.second->stats().toString() << std::endl; @@ -201,16 +201,16 @@ void cmd_loop(std::shared_ptr<DhtRunner>& dht, dht_params& params dht->connectivityChanged(); continue; } -#if OPENDHT_PROXY_SERVER +#ifdef OPENDHT_PROXY_SERVER else if (op == "pst") { -#if OPENDHT_PUSH_NOTIFICATIONS +#ifdef OPENDHT_PUSH_NOTIFICATIONS iss >> idstr >> pushServer; #else iss >> idstr; #endif // OPENDHT_PUSH_NOTIFICATIONS try { unsigned int port = std::stoi(idstr); -#if OPENDHT_PUSH_NOTIFICATIONS +#ifdef OPENDHT_PUSH_NOTIFICATIONS proxies.emplace(port, std::unique_ptr<DhtProxyServer>(new DhtProxyServer(dht, port, pushServer))); #else proxies.emplace(port, std::unique_ptr<DhtProxyServer>(new DhtProxyServer(dht, port))); @@ -227,7 +227,7 @@ void cmd_loop(std::shared_ptr<DhtRunner>& dht, dht_params& params continue; } #endif //OPENDHT_PROXY_SERVER -#if OPENDHT_PROXY_CLIENT +#ifdef OPENDHT_PROXY_CLIENT else if (op == "stt") { dht->enableProxy(true); continue; @@ -235,7 +235,7 @@ void cmd_loop(std::shared_ptr<DhtRunner>& dht, dht_params& params dht->enableProxy(false); continue; } -#if OPENDHT_PUSH_NOTIFICATIONS +#ifdef OPENDHT_PUSH_NOTIFICATIONS else if (op == "rp") { iss >> value; dht->pushNotificationReceived({{"to", "dhtnode"}, {"token", value}}); @@ -523,11 +523,11 @@ main(int argc, char **argv) dht->bootstrap(params.bootstrap.first.c_str(), params.bootstrap.second.c_str()); } -#if OPENDHT_PROXY_SERVER +#ifdef OPENDHT_PROXY_SERVER std::map<in_port_t, std::unique_ptr<DhtProxyServer>> proxies; #endif if (params.proxyserver != 0) { -#if OPENDHT_PROXY_SERVER +#ifdef OPENDHT_PROXY_SERVER proxies.emplace(params.proxyserver, std::unique_ptr<DhtProxyServer>(new DhtProxyServer(dht, params.proxyserver, params.pushserver))); #else std::cerr << "DHT proxy server requested but OpenDHT built without proxy server support." << std::endl; @@ -539,7 +539,7 @@ main(int argc, char **argv) while (runner.wait()); else cmd_loop(dht, params -#if OPENDHT_PROXY_SERVER +#ifdef OPENDHT_PROXY_SERVER , proxies #endif );