diff --git a/include/opendht/dht_proxy_client.h b/include/opendht/dht_proxy_client.h index 64a2881fc5219e5b13f87c4d0279b1e3f154c079..c6ac30935c5f2e4b7bf034eff6edc0cae5f54d32 100644 --- a/include/opendht/dht_proxy_client.h +++ b/include/opendht/dht_proxy_client.h @@ -131,16 +131,17 @@ public: put(key, std::forward<Value>(v), bindDoneCb(cb), created, permanent); } + /** + * @param af the socket family + * @return node stats from the proxy + */ NodeStats getNodesStats(sa_family_t af) const; - std::vector<SockAddr> getPublicAddress(sa_family_t family = 0); - - /** - * TODO - * NOTE: For now, there is no endpoint in the DhtProxyServer to do the following methods. - * It will come in another version. + * @param family the socket family + * @return public address */ + std::vector<SockAddr> getPublicAddress(sa_family_t family = 0); /** * Listen on the network for any changes involving a specified hash. @@ -149,11 +150,18 @@ public: * * @return a token to cancel the listener later. */ - virtual size_t listen(const InfoHash&, GetCallback, Value::Filter&&={}, Where&&={}) { return 0; }; + virtual size_t listen(const InfoHash&, GetCallback, Value::Filter&&={}, Where&&={}); virtual size_t listen(const InfoHash& key, GetCallbackSimple cb, Value::Filter f={}, Where w = {}) { return listen(key, bindGetCb(cb), std::forward<Value::Filter>(f), std::forward<Where>(w)); } - virtual bool cancelListen(const InfoHash&, size_t /*token*/) { return false; } + + + /** + * TODO + * NOTE: For now, there is no endpoint in the DhtProxyServer to do the following methods. + * It will come in another version. (with push_notifications support) + */ + virtual bool cancelListen(const InfoHash&, size_t token); /** * Similar to Dht::get, but sends a Query to filter data remotely. @@ -257,6 +265,7 @@ public: } time_point periodic(const uint8_t*, size_t, const SockAddr&) { + // The DhtProxyClient doesn't use NetworkEngine, so here, we have nothing to do for now. scheduler.syncTime(); return scheduler.run(); } @@ -265,11 +274,19 @@ public: } private: + /** + * Get informations from the proxy node + * @return the JSON returned by the proxy + */ Json::Value getProxyInfos() const; /** * Initialize statusIpvX_ */ void getConnectivityStatus(); + /** + * cancel all Listeners + */ + void cancelAllListeners(); /** * cancel all Operations */ @@ -279,6 +296,24 @@ private: NodeStatus statusIpv6_ {NodeStatus::Disconnected}; InfoHash myid {}; + + /** + * Store listen requests. + */ + struct Listener + { + size_t token; + std::shared_ptr<restbed::Request> req; + std::string key; + GetCallback cb; + Value::Filter filterChain; + std::unique_ptr<std::thread> thread; + }; + std::vector<Listener> listeners_; + size_t listener_token_ {0}; + /** + * Store current put and get requests. + */ struct Operation { std::shared_ptr<restbed::Request> req; @@ -287,8 +322,20 @@ private: std::vector<Operation> operations_; Scheduler scheduler; - Sp<Scheduler::Job> nextNodesConfirmation {}; + /** + * Retrieve if we can connect to the proxy (update statusIpvX_) + */ void confirmProxy(); + Sp<Scheduler::Job> nextProxyConfirmation {}; + /** + * Verify if we are still connected. + */ + void confirmConnectivity(); + Sp<Scheduler::Job> nextConnectivityConfirmation {}; + /** + * Relaunch LISTEN requests if the client disconnect/reconnect. + */ + void restartListeners(); }; } diff --git a/include/opendht/dhtrunner.h b/include/opendht/dhtrunner.h index d75e977f8e6b04e35671142eb9a893b8296d5ad0..1d4a645c0539f91d8a70d4ba85a5adbfc8bbf72b 100644 --- a/include/opendht/dhtrunner.h +++ b/include/opendht/dhtrunner.h @@ -403,13 +403,40 @@ private: } std::unique_ptr<SecureDht> dht_; + /** + * reset dht clients + */ void resetDht(); + /** + * @return the current active DHT + */ SecureDht* activeDht() const; #if OPENDHT_PROXY_CLIENT + /** + * true if we are currently using a proxy + */ std::atomic_bool use_proxy {false}; + /** + * The current proxy client + */ std::unique_ptr<SecureDht> dht_via_proxy_; Config config_; #endif // OPENDHT_PROXY_CLIENT + /** + * Store current listeners and translates global tokens for each client. + */ + struct Listener { + size_t globalToken; + size_t tokenClassicDht; + size_t tokenProxyDht; + GetCallback gcb; + InfoHash hash; + Value::Filter f; + Where w; + }; + std::vector<std::unique_ptr<Listener>> listeners_ {}; + size_t listener_token_ {1}; + mutable std::mutex dht_mtx {}; std::thread dht_thread {}; std::condition_variable cv {}; diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index 9f175f5bdd0c916fed6400e36c40b9c1f100a72f..fca754e411c58cdfc1f52d9525047596d631aa47 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -24,21 +24,21 @@ #include <json/json.h> #include <restbed> #include <vector> +#include <signal.h> #include "dhtrunner.h" constexpr const char* const HTTP_PROTO {"http://"}; -// TODO connectivity changed -// TODO follow listen between non proxified and proxified - namespace dht { DhtProxyClient::DhtProxyClient(const std::string& serverHost) : serverHost_(serverHost), scheduler(DHT_LOG) { - auto confirm_nodes_time = scheduler.time() + std::chrono::seconds(5); - nextNodesConfirmation = scheduler.add(confirm_nodes_time, std::bind(&DhtProxyClient::confirmProxy, this)); + auto confirm_proxy_time = scheduler.time() + std::chrono::seconds(5); + nextProxyConfirmation = scheduler.add(confirm_proxy_time, std::bind(&DhtProxyClient::confirmProxy, this)); + auto confirm_connectivity = scheduler.time() + std::chrono::seconds(5); + nextConnectivityConfirmation = scheduler.add(confirm_connectivity, std::bind(&DhtProxyClient::confirmConnectivity, this)); getConnectivityStatus(); } @@ -46,16 +46,30 @@ DhtProxyClient::DhtProxyClient(const std::string& serverHost) void DhtProxyClient::confirmProxy() { + // Retrieve the connectivity each hours if connected, else every 5 seconds. + auto disconnected_old_status = statusIpv4_ == NodeStatus::Disconnected && statusIpv6_ == NodeStatus::Disconnected; getConnectivityStatus(); - auto disconnected = statusIpv4_ == NodeStatus::Disconnected && statusIpv6_ == NodeStatus::Disconnected; - auto time = disconnected ? std::chrono::seconds(5) : std::chrono::seconds(600); - auto confirm_nodes_time = scheduler.time() + time; - scheduler.edit(nextNodesConfirmation, confirm_nodes_time); + auto disconnected_new_status = statusIpv4_ == NodeStatus::Disconnected && statusIpv6_ == NodeStatus::Disconnected; + auto time = disconnected_new_status ? std::chrono::seconds(5) : std::chrono::hours(1); + if (disconnected_old_status && !disconnected_new_status) { + restartListeners(); + } + auto confirm_proxy_time = scheduler.time() + time; + scheduler.edit(nextProxyConfirmation, confirm_proxy_time); +} + +void +DhtProxyClient::confirmConnectivity() +{ + // The scheduler must get if the proxy is disconnected + auto confirm_connectivity = scheduler.time() + std::chrono::seconds(3); + scheduler.edit(nextConnectivityConfirmation, confirm_connectivity); } DhtProxyClient::~DhtProxyClient() { cancelAllOperations(); + cancelAllListeners(); } void @@ -70,10 +84,23 @@ DhtProxyClient::cancelAllOperations() } } +void +DhtProxyClient::cancelAllListeners() +{ + for (auto& listener: listeners_) { + if (listener.thread && listener.thread->joinable()) { + // Close connection to stop listener? + restbed::Http::close(listener.req); + listener.thread->join(); + } + } +} + void DhtProxyClient::shutdown(ShutdownCallback cb) { cancelAllOperations(); + cancelAllListeners(); cb(); } @@ -293,6 +320,84 @@ DhtProxyClient::getPublicAddress(sa_family_t family) } } +size_t +DhtProxyClient::listen(const InfoHash& key, GetCallback cb, Value::Filter&& filter, Where&& where) +{ + restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + key.toString()); + auto req = std::make_shared<restbed::Request>(uri); + req->set_method("LISTEN"); + + Query query {{}, where}; + auto filterChain = filter.chain(query.where.getFilter()); + + Listener l; + ++listener_token_; + l.key = key.toString(); + l.token = listener_token_; + l.req = req; + l.cb = cb; + l.filterChain = std::move(filterChain); + l.thread = std::move(std::unique_ptr<std::thread>(new std::thread([=]() + { + auto settings = std::make_shared<restbed::Settings>(); + std::chrono::milliseconds timeout(std::numeric_limits<int>::max()); + settings->set_connection_timeout(timeout); // Avoid the client to close the socket after 5 seconds. + + restbed::Http::async(req, + [this, filterChain, cb](const std::shared_ptr<restbed::Request>& req, + const std::shared_ptr<restbed::Response>& reply) { + auto code = reply->get_status_code(); + + if (code == 200) { + try { + while (restbed::Http::is_open(req)) { + restbed::Http::fetch("\n", reply); + std::string body; + reply->get_body(body); + reply->set_body(""); // Reset the body for the next fetch + + Json::Value json; + Json::Reader reader; + if (reader.parse(body, json)) { + auto value = std::make_shared<Value>(json); + if (not filterChain or filterChain(*value)) + cb({value}); + } + } + } catch (std::runtime_error&) { + // NOTE: Http::close() can occurs here. Ignore this. + } + + } else { + this->statusIpv4_ = NodeStatus::Disconnected; + this->statusIpv6_ = NodeStatus::Disconnected; + } + }, settings).get(); + getConnectivityStatus(); + }) + )); + listeners_.emplace_back(std::move(l)); + return listener_token_; +} + +bool +DhtProxyClient::cancelListen(const InfoHash&, size_t token) +{ + for (auto it = listeners_.begin(); it != listeners_.end(); ++it) { + auto& listener = *it; + if (listener.token == token) { + if (listener.thread->joinable()) { + // Close connection to stop listener? + restbed::Http::close(listener.req); + listener.thread->join(); + listeners_.erase(it); + return true; + } + } + } + return false; +} + void DhtProxyClient::getConnectivityStatus() { @@ -303,21 +408,84 @@ DhtProxyClient::getConnectivityStatus() auto dubiousIpv4 = static_cast<long>(proxyInfos["ipv4"]["dubious"].asLargestUInt()); if (goodIpv4 + dubiousIpv4 > 0) { statusIpv4_ = NodeStatus::Connected; + } else { + statusIpv4_ = NodeStatus::Disconnected; } auto goodIpv6 = static_cast<long>(proxyInfos["ipv6"]["good"].asLargestUInt()); auto dubiousIpv6 = static_cast<long>(proxyInfos["ipv6"]["dubious"].asLargestUInt()); if (goodIpv6 + dubiousIpv6 > 0) { statusIpv6_ = NodeStatus::Connected; + } else { + statusIpv6_ = NodeStatus::Disconnected; } myid = InfoHash(proxyInfos["node_id"].asString()); + if (statusIpv4_ == NodeStatus::Disconnected && statusIpv6_ == NodeStatus::Disconnected) { + const auto& now = scheduler.time(); + scheduler.edit(nextProxyConfirmation, now); + } } catch (...) { statusIpv4_ = NodeStatus::Disconnected; statusIpv6_ = NodeStatus::Disconnected; + const auto& now = scheduler.time(); + scheduler.edit(nextProxyConfirmation, now); } +} + +void +DhtProxyClient::restartListeners() +{ + for (auto& listener: listeners_) { + if (listener.thread && listener.thread->joinable()) + listener.thread->join(); + // Redo listen + auto filterChain = listener.filterChain; + auto cb = listener.cb; + restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + listener.key); + auto req = std::make_shared<restbed::Request>(uri); + req->set_method("LISTEN"); + listener.thread = std::move(std::unique_ptr<std::thread>(new std::thread([this, filterChain, cb, req]() + { + auto settings = std::make_shared<restbed::Settings>(); + std::chrono::milliseconds timeout(std::numeric_limits<int>::max()); + settings->set_connection_timeout(timeout); // Avoid the client to close the socket after 5 seconds. + + restbed::Http::async(req, + [this, filterChain, cb](const std::shared_ptr<restbed::Request>& req, + const std::shared_ptr<restbed::Response>& reply) { + auto code = reply->get_status_code(); + + if (code == 200) { + try { + while (restbed::Http::is_open(req)) { + restbed::Http::fetch("\n", reply); + std::string body; + reply->get_body(body); + reply->set_body(""); // Reset the body for the next fetch + + Json::Value json; + Json::Reader reader; + if (reader.parse(body, json)) { + auto value = std::make_shared<Value>(json); + if (not filterChain or filterChain(*value)) + cb({value}); + } + } + } catch (std::runtime_error&) { + // NOTE: Http::close() can occurs here. Ignore this. + } - // TODO for now, we don't handle connectivity issues. (when the proxy is down, we don't try to reconnect) + } else { + this->statusIpv4_ = NodeStatus::Disconnected; + this->statusIpv6_ = NodeStatus::Disconnected; + } + }, settings).get(); + getConnectivityStatus(); + }) + )); + } } + } // namespace dht #endif // OPENDHT_PROXY_CLIENT diff --git a/src/dht_proxy_server.cpp b/src/dht_proxy_server.cpp index 57be7d7533384245271a9d23b0d900e9cc69e324..6881025f15994e8be9c3c8f0829640e05b5151bf 100644 --- a/src/dht_proxy_server.cpp +++ b/src/dht_proxy_server.cpp @@ -70,6 +70,8 @@ DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port) std::chrono::milliseconds timeout(std::numeric_limits<int>::max()); settings->set_connection_timeout(timeout); // there is a timeout, but really huge settings->set_port(port); + auto maxThreads = std::thread::hardware_concurrency() - 1; + settings->set_worker_limit(maxThreads > 1 ? maxThreads : 1); try { service_->start(settings); } catch(std::system_error& e) { @@ -78,11 +80,13 @@ DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port) }); listenThread_ = std::thread([this]() { - auto stop = false; - while (!stop) { + while (!service_->is_up() && !stopListeners) { + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + while (service_->is_up() && !stopListeners) { auto listener = currentListeners_.begin(); while (listener != currentListeners_.end()) { - if (listener->session->is_closed() && dht_) { + if (dht_ && listener->session->is_closed()) { dht_->cancelListen(listener->hash, std::move(listener->token)); // Remove listener if unused listener = currentListeners_.erase(listener); @@ -90,10 +94,18 @@ DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port) ++listener; } } - //NOTE: When supports restbed 5.0: service_->is_up() and remove stopListeners - stop = stopListeners; - if (!stop) - std::this_thread::sleep_for(std::chrono::seconds(1)); + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + // Remove last listeners + auto listener = currentListeners_.begin(); + while (listener != currentListeners_.end()) { + if (dht_) { + dht_->cancelListen(listener->hash, std::move(listener->token)); + // Remove listener if unused + listener = currentListeners_.erase(listener); + } else { + ++listener; + } } }); } @@ -107,6 +119,11 @@ void DhtProxyServer::stop() { service_->stop(); + auto listener = currentListeners_.begin(); + while (listener != currentListeners_.end()) { + listener->session->close(); + ++ listener; + } stopListeners = true; // listenThreads_ will stop because there is no more sessions if (listenThread_.joinable()) @@ -193,20 +210,25 @@ DhtProxyServer::listen(const std::shared_ptr<restbed::Session>& session) const infoHash = InfoHash::get(hash); } s->yield(restbed::OK); - // Handle client deconnection - // NOTE: for now, there is no handler, so we test the session in a thread - // will be the case in restbed 5.0 + // Handle client deconnection + // NOTE: for now, there is no handler, so we test the session in a thread + // will be the case in restbed 5.0 SessionToHashToken listener; listener.session = session; listener.hash = infoHash; - listener.token = dht_->listen(infoHash, [s](std::shared_ptr<Value> value) { + // cache the session to avoid an incrementation of the shared_ptr's counter + // else, the session->close() will not close the socket. + auto cacheSession = std::weak_ptr<restbed::Session>(s); + listener.token = std::move(dht_->listen(infoHash, [cacheSession](std::shared_ptr<Value> value) { + auto s = cacheSession.lock(); + if (!s) return false; // Send values as soon as we get them if (!s->is_closed()) { Json::FastWriter writer; - s->yield(writer.write(value->toJson()), [](const std::shared_ptr<restbed::Session> /*session*/){ }); + s->yield(writer.write(value->toJson()), [](const std::shared_ptr<restbed::Session>){ }); } return !s->is_closed(); - }); + })); currentListeners_.emplace_back(std::move(listener)); } else { session->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}"); diff --git a/src/dhtrunner.cpp b/src/dhtrunner.cpp index 2e8b6dc3d6778782b35a4426ef84885df182bbf7..53f573f403256ed10dc124067fd990d3ded366f6 100644 --- a/src/dhtrunner.cpp +++ b/src/dhtrunner.cpp @@ -122,6 +122,10 @@ DhtRunner::run(const SockAddr& local4, const SockAddr& local6, DhtRunner::Config void DhtRunner::shutdown(ShutdownCallback cb) { +#if OPENDHT_PROXY_CLIENT + if (dht_via_proxy_) + dht_via_proxy_->shutdown(cb); +#endif std::lock_guard<std::mutex> lck(storage_mtx); pending_ops_prio.emplace([=](SecureDht& dht) mutable { dht.shutdown(cb); @@ -513,8 +517,29 @@ DhtRunner::listen(InfoHash hash, GetCallback vcb, Value::Filter f, Where w) auto ret_token = std::make_shared<std::promise<size_t>>(); { std::lock_guard<std::mutex> lck(storage_mtx); +#if OPENDHT_PROXY_CLIENT + pending_ops.emplace([=](SecureDht&) mutable { + auto tokenProxy = 0, tokenClassic = 0; + if (!use_proxy) + tokenClassic = dht_->listen(hash, vcb, std::move(f), std::move(w)); + else if (dht_via_proxy_) + tokenProxy = dht_via_proxy_->listen(hash, vcb, std::move(f), std::move(w)); +#else pending_ops.emplace([=](SecureDht& dht) mutable { - ret_token->set_value(dht.listen(hash, vcb, std::move(f), std::move(w))); + auto tokenClassic = dht.listen(hash, vcb, std::move(f), std::move(w)); + auto tokenProxy = 0; +#endif + auto listener = std::unique_ptr<Listener>(new Listener()); + listener->globalToken = listener_token_; + listener->tokenClassicDht = tokenClassic; + listener->tokenProxyDht = tokenProxy; + listener->gcb = vcb; + listener->hash = hash; + listener->f = f; + listener->w = w; + this->listeners_.emplace_back(std::move(listener)); + ret_token->set_value(listener_token_); + listener_token_++; }); } cv.notify_all(); @@ -531,9 +556,30 @@ void DhtRunner::cancelListen(InfoHash h, size_t token) { { +#if OPENDHT_PROXY_CLIENT + auto it = listeners_.begin(); + for (; it != listeners_.end(); ++it) { + auto& listener = *it; + if (listener->globalToken == token) { + break; + } + } + if (it == listeners_.end()) return; +#endif // OPENDHT_PROXY_CLIENT std::lock_guard<std::mutex> lck(storage_mtx); +#if OPENDHT_PROXY_CLIENT + pending_ops.emplace([=](SecureDht&) { + auto& listener = *it; + if (listener->tokenClassicDht != 0) { + dht_->cancelListen(h, listener->tokenClassicDht); + } + if (dht_via_proxy_ && listener->tokenProxyDht > 0) { + dht_via_proxy_->cancelListen(h, listener->tokenProxyDht); + } +#else pending_ops.emplace([=](SecureDht& dht) { dht.cancelListen(h, token); +#endif // OPENDHT_PROXY_CLIENT }); } cv.notify_all(); @@ -702,6 +748,30 @@ DhtRunner::tryBootstrapContinuously() }); } +std::vector<SockAddr> +DhtRunner::getAddrInfo(const std::string& host, const std::string& service) +{ + std::vector<SockAddr> ips {}; + if (host.empty()) + return ips; + + addrinfo hints; + memset(&hints, 0, sizeof(hints)); + hints.ai_socktype = SOCK_DGRAM; + addrinfo* info = nullptr; + int rc = getaddrinfo(host.c_str(), service.c_str(), &hints, &info); + if(rc != 0) + throw std::invalid_argument(std::string("Error: `") + host + ":" + service + "`: " + gai_strerror(rc)); + + addrinfo* infop = info; + while (infop) { + ips.emplace_back(infop->ai_addr, infop->ai_addrlen); + infop = infop->ai_next; + } + freeaddrinfo(info); + return ips; +} + void DhtRunner::bootstrap(const std::string& host, const std::string& service) { @@ -812,15 +882,33 @@ DhtRunner::enableProxy(bool proxify) { new DhtProxyClient(serverHost) ); dht_via_proxy_ = std::unique_ptr<SecureDht>(new SecureDht(std::move(dht_via_proxy), config_.dht_config)); + // add current listeners + for (auto& listener: listeners_) { + auto tokenProxy = dht_via_proxy_->listen(listener->hash, listener->gcb, std::move(listener->f), std::move(listener->w)); + listener->tokenProxyDht = tokenProxy; + } // and use it use_proxy = proxify; } else { use_proxy = proxify; + loop_(); // Restart the classic DHT. // We doesn't need to maintain the connection with the proxy. // Delete it dht_via_proxy_.reset(nullptr); + // update all proxyToken for all proxyListener + auto it = listeners_.begin(); + for (; it != listeners_.end(); ++it) { + auto& listener = *it; + if (listener->tokenClassicDht == 0) { + pending_ops.emplace([it](SecureDht& dht) mutable { + auto& listener = *it; + auto token = dht.listen(listener->hash, listener->gcb, std::move(listener->f), std::move(listener->w)); + listener->tokenClassicDht = token; + }); + } + listener->tokenProxyDht = 0; + } } } #endif // OPENDHT_PROXY_CLIENT - }