From baa2a5d21c5e30d63f33796548f1af9989f3ec5d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20B=C3=A9raud?= <adrien.beraud@savoirfairelinux.com> Date: Tue, 7 Jun 2022 15:59:20 -0400 Subject: [PATCH] dhtrunner: remove support for running both dht and proxy client If configured to run as a proxy client, no port will be bound and the local dht node won't be instantiated. --- include/opendht/dht_proxy_client.h | 2 +- include/opendht/dhtrunner.h | 17 +- src/dhtrunner.cpp | 250 ++++++----------------------- 3 files changed, 55 insertions(+), 214 deletions(-) diff --git a/include/opendht/dht_proxy_client.h b/include/opendht/dht_proxy_client.h index 3c011b43..7be198f9 100644 --- a/include/opendht/dht_proxy_client.h +++ b/include/opendht/dht_proxy_client.h @@ -263,7 +263,7 @@ public: void dumpTables() const override {} std::vector<unsigned> getNodeMessageStats(bool) override { return {}; } void setStorageLimit(size_t) override {} - virtual size_t getStorageLimit() const { return 0; } + virtual size_t getStorageLimit() const override { return 0; } void connectivityChanged(sa_family_t) override { getProxyInfos(); } diff --git a/include/opendht/dhtrunner.h b/include/opendht/dhtrunner.h index 11e3810e..882f3f92 100644 --- a/include/opendht/dhtrunner.h +++ b/include/opendht/dhtrunner.h @@ -437,7 +437,6 @@ public: /** * Start or stop the proxy * @param proxify if we want to use the proxy - * @param deviceKey non empty to enable push notifications */ void enableProxy(bool proxify); @@ -474,12 +473,9 @@ private: DoneCallback bindOpDoneCallback(DoneCallback&& cb); DoneCallbackSimple bindOpDoneCallback(DoneCallbackSimple&& cb); - /** Local DHT instance */ + /** DHT instance */ std::unique_ptr<SecureDht> dht_; - /** Proxy client instance */ - std::unique_ptr<SecureDht> dht_via_proxy_; - /** true if we are currently using a proxy */ std::atomic_bool use_proxy {false}; @@ -491,17 +487,6 @@ private: * reset dht clients */ void resetDht(); - /** - * @return the current active DHT - */ - SecureDht* activeDht() const; - - /** - * Store current listeners and translates global tokens for each client. - */ - struct Listener; - std::map<size_t, Listener> listeners_; - size_t listener_token_ {1}; mutable std::mutex dht_mtx {}; std::thread dht_thread {}; diff --git a/src/dhtrunner.cpp b/src/dhtrunner.cpp index 8e6781e4..c40ea491 100644 --- a/src/dhtrunner.cpp +++ b/src/dhtrunner.cpp @@ -38,15 +38,6 @@ namespace dht { static const std::string PEER_DISCOVERY_DHT_SERVICE = "dht"; -struct DhtRunner::Listener { - size_t tokenClassicDht {0}; - size_t tokenProxyDht {0}; - ValueCallback gcb; - InfoHash hash {}; - Value::Filter f; - Where w; -}; - struct NodeInsertionPack { dht::InfoHash nodeId; in_port_t port; @@ -55,9 +46,6 @@ struct NodeInsertionPack { }; DhtRunner::DhtRunner() : dht_() -#ifdef OPENDHT_PROXY_CLIENT -, dht_via_proxy_() -#endif //OPENDHT_PROXY_CLIENT { #ifdef _WIN32 WSADATA wsd; @@ -183,30 +171,25 @@ DhtRunner::run(const Config& config, Context&& context) config_ = config; identityAnnouncedCb_ = context.identityAnnouncedCb; #endif - auto dht = std::make_unique<Dht>(std::move(context.sock), SecureDht::getConfig(config.dht_config), context.logger); - dht_ = std::make_unique<SecureDht>(std::move(dht), config.dht_config, std::move(context.identityAnnouncedCb), context.logger); - enableProxy(not config.proxy_server.empty()); + if (config.proxy_server.empty()) { + auto dht = std::make_unique<Dht>(std::move(context.sock), SecureDht::getConfig(config.dht_config), context.logger); + dht_ = std::make_unique<SecureDht>(std::move(dht), config.dht_config, std::move(context.identityAnnouncedCb), context.logger); + } else { + enableProxy(true); + } } catch(const std::exception& e) { config_ = {}; identityAnnouncedCb_ = {}; dht_.reset(); -#ifdef OPENDHT_PROXY_CLIENT - dht_via_proxy_.reset(); -#endif running = State::Idle; throw; } - if (context.logger and dht_via_proxy_) { - dht_via_proxy_->setLogger(context.logger); - } if (context.statusChangedCallback) { statusCb = std::move(context.statusChangedCallback); } if (context.certificateStore) { dht_->setLocalCertificateStore(std::move(context.certificateStore)); - if (dht_via_proxy_) - dht_via_proxy_->setLocalCertificateStore(std::move(context.certificateStore)); } if (not config.threaded) @@ -289,7 +272,8 @@ DhtRunner::shutdown(ShutdownCallback cb, bool stop) { auto expected = State::Running; if (not running.compare_exchange_strong(expected, State::Stopping)) { if (expected == State::Stopping and ongoing_ops) { - shutdownCallbacks_.emplace_back(std::move(cb)); + if (cb) + shutdownCallbacks_.emplace_back(std::move(cb)); } else if (cb) { lck.unlock(); @@ -299,20 +283,10 @@ DhtRunner::shutdown(ShutdownCallback cb, bool stop) { } if (logger_) logger_->d("[runner %p] state changed to Stopping, %zu ongoing ops", this, ongoing_ops.load()); -#ifdef OPENDHT_PROXY_CLIENT - ongoing_ops += 2; -#else ongoing_ops++; -#endif shutdownCallbacks_.emplace_back(std::move(cb)); pending_ops.emplace([=](SecureDht&) mutable { auto onShutdown = [this]{ opEnded(); }; -#ifdef OPENDHT_PROXY_CLIENT - if (dht_via_proxy_) - dht_via_proxy_->shutdown(onShutdown, stop); - else - opEnded(); -#endif if (dht_) dht_->shutdown(onShutdown, stop); else @@ -419,31 +393,25 @@ void DhtRunner::dumpTables() const { std::lock_guard<std::mutex> lck(dht_mtx); - activeDht()->dumpTables(); + dht_->dumpTables(); } InfoHash DhtRunner::getId() const { - if (auto dht = activeDht()) - return dht->getId(); - return {}; + return dht_ ? dht_->getId() : InfoHash{}; } std::shared_ptr<crypto::PublicKey> DhtRunner::getPublicKey() const { - if (auto dht = activeDht()) - return dht->getPublicKey(); - return {}; + return dht_ ? dht_->getPublicKey() : std::shared_ptr<crypto::PublicKey>{}; } InfoHash DhtRunner::getNodeId() const { - if (auto dht = activeDht()) - return dht->getNodeId(); - return {}; + return dht_ ? dht_->getNodeId() : InfoHash{}; } @@ -485,10 +453,6 @@ DhtRunner::setLogger(const Sp<Logger>& logger) { logger_ = logger; if (dht_) dht_->setLogger(logger); -#ifdef OPENDHT_PROXY_CLIENT - if (dht_via_proxy_) - dht_via_proxy_->setLogger(logger); -#endif } void @@ -502,16 +466,12 @@ DhtRunner::setLogFilter(const InfoHash& f) { std::lock_guard<std::mutex> lck(dht_mtx); if (dht_) dht_->setLogFilter(f); -#ifdef OPENDHT_PROXY_CLIENT - if (dht_via_proxy_) - dht_via_proxy_->setLogFilter(f); -#endif } void DhtRunner::registerType(const ValueType& type) { std::lock_guard<std::mutex> lck(dht_mtx); - activeDht()->registerType(type); + dht_->registerType(type); } void @@ -524,7 +484,7 @@ unsigned DhtRunner::getNodesStats(sa_family_t af, unsigned *good_return, unsigned *dubious_return, unsigned *cached_return, unsigned *incoming_return) const { std::lock_guard<std::mutex> lck(dht_mtx); - const auto stats = activeDht()->getNodesStats(af); + const auto stats = dht_->getNodesStats(af); if (good_return) *good_return = stats.good_nodes; if (dubious_return) @@ -540,19 +500,19 @@ NodeStats DhtRunner::getNodesStats(sa_family_t af) const { std::lock_guard<std::mutex> lck(dht_mtx); - return activeDht()->getNodesStats(af); + return dht_->getNodesStats(af); } NodeInfo DhtRunner::getNodeInfo() const { std::lock_guard<std::mutex> lck(dht_mtx); NodeInfo info {}; - if (auto dht = activeDht()) { - info.id = dht->getId(); - info.node_id = dht->getNodeId(); - info.ipv4 = dht->getNodesStats(AF_INET); - info.ipv6 = dht->getNodesStats(AF_INET6); - if (auto sock = dht->getSocket()) { + if (dht_) { + info.id = dht_->getId(); + info.node_id = dht_->getNodeId(); + info.ipv4 = dht_->getNodesStats(AF_INET); + info.ipv6 = dht_->getNodesStats(AF_INET6); + if (auto sock = dht_->getSocket()) { info.bound4 = sock->getBoundRef(AF_INET).getPort(); info.bound6 = sock->getBoundRef(AF_INET6).getPort(); } @@ -589,45 +549,45 @@ std::vector<unsigned> DhtRunner::getNodeMessageStats(bool in) const { std::lock_guard<std::mutex> lck(dht_mtx); - return activeDht()->getNodeMessageStats(in); + return dht_->getNodeMessageStats(in); } std::string DhtRunner::getStorageLog() const { std::lock_guard<std::mutex> lck(dht_mtx); - return activeDht()->getStorageLog(); + return dht_->getStorageLog(); } std::string DhtRunner::getStorageLog(const InfoHash& f) const { std::lock_guard<std::mutex> lck(dht_mtx); - return activeDht()->getStorageLog(f); + return dht_->getStorageLog(f); } std::string DhtRunner::getRoutingTablesLog(sa_family_t af) const { std::lock_guard<std::mutex> lck(dht_mtx); - return activeDht()->getRoutingTablesLog(af); + return dht_->getRoutingTablesLog(af); } std::string DhtRunner::getSearchesLog(sa_family_t af) const { std::lock_guard<std::mutex> lck(dht_mtx); - return activeDht()->getSearchesLog(af); + return dht_->getSearchesLog(af); } std::string DhtRunner::getSearchLog(const InfoHash& f, sa_family_t af) const { std::lock_guard<std::mutex> lck(dht_mtx); - return activeDht()->getSearchLog(f, af); + return dht_->getSearchLog(f, af); } std::vector<SockAddr> DhtRunner::getPublicAddress(sa_family_t af) { std::lock_guard<std::mutex> lck(dht_mtx); - if (auto dht = activeDht()) - return dht->getPublicAddress(af); + if (dht_) + return dht_->getPublicAddress(af); return {}; } std::vector<std::string> @@ -654,15 +614,12 @@ DhtRunner::getPublicAddress(std::function<void(std::vector<SockAddr>&&)> cb, sa_ void DhtRunner::registerCertificate(std::shared_ptr<crypto::Certificate> cert) { std::lock_guard<std::mutex> lck(dht_mtx); - activeDht()->registerCertificate(cert); + dht_->registerCertificate(cert); } + void DhtRunner::setLocalCertificateStore(CertificateStoreQuery&& query_method) { std::lock_guard<std::mutex> lck(dht_mtx); -#ifdef OPENDHT_PROXY_CLIENT - if (dht_via_proxy_) - dht_via_proxy_->setLocalCertificateStore(std::forward<CertificateStoreQuery>(query_method)); -#endif if (dht_) dht_->setLocalCertificateStore(std::forward<CertificateStoreQuery>(query_method)); } @@ -670,8 +627,7 @@ DhtRunner::setLocalCertificateStore(CertificateStoreQuery&& query_method) { time_point DhtRunner::loop_() { - auto dht = activeDht(); - if (not dht) + if (not dht_) return {}; decltype(pending_ops) ops {}; @@ -682,7 +638,7 @@ DhtRunner::loop_() std::move(pending_ops) : std::move(pending_ops_prio); } while (not ops.empty()) { - ops.front()(*dht); + ops.front()(*dht_); ops.pop(); } @@ -715,13 +671,13 @@ DhtRunner::loop_() if (now - pkt.received > net::RX_QUEUE_MAX_DELAY) dropped++; else - wakeup = dht->periodic(pkt.data.data(), pkt.data.size(), std::move(pkt.from), now); + wakeup = dht_->periodic(pkt.data.data(), pkt.data.size(), std::move(pkt.from), now); pkt.data.clear(); } received_treated.splice(received_treated.end(), std::move(received)); } else { // Or just run the scheduler - wakeup = dht->periodic(nullptr, 0, nullptr, 0, clock::now()); + wakeup = dht_->periodic(nullptr, 0, nullptr, 0, clock::now()); } if (not received_treated.empty()) { @@ -733,8 +689,8 @@ DhtRunner::loop_() if (dropped && logger_) logger_->e("[runner %p] Dropped %zu packets with high delay.", this, dropped); - NodeStatus nstatus4 = dht->updateStatus(AF_INET); - NodeStatus nstatus6 = dht->updateStatus(AF_INET6); + NodeStatus nstatus4 = dht_->updateStatus(AF_INET); + NodeStatus nstatus6 = dht_->updateStatus(AF_INET6); if (nstatus4 != status4 || nstatus6 != status6) { status4 = nstatus4; status6 = nstatus6; @@ -792,27 +748,7 @@ DhtRunner::listen(InfoHash hash, ValueCallback vcb, Value::Filter f, Where w) return ret_token->get_future(); } pending_ops.emplace([=](SecureDht& dht) mutable { -#ifdef OPENDHT_PROXY_CLIENT - auto tokenbGlobal = listener_token_++; - auto& listener = listeners_[tokenbGlobal]; - listener.hash = hash; - listener.f = std::move(f); - listener.w = std::move(w); - listener.gcb = [hash,vcb,tokenbGlobal,this](const std::vector<Sp<Value>>& vals, bool expired) { - if (not vcb(vals, expired)) { - cancelListen(hash, tokenbGlobal); - return false; - } - return true; - }; - if (auto token = dht.listen(hash, listener.gcb, listener.f, listener.w)) { - if (use_proxy) listener.tokenProxyDht = token; - else listener.tokenClassicDht = token; - } - ret_token->set_value(tokenbGlobal); -#else ret_token->set_value(dht.listen(hash, std::move(vcb), std::move(f), std::move(w))); -#endif }); cv.notify_all(); return ret_token->get_future(); @@ -831,27 +767,10 @@ DhtRunner::cancelListen(InfoHash h, size_t token) if (running != State::Running) return; ongoing_ops++; -#ifdef OPENDHT_PROXY_CLIENT - pending_ops.emplace([=](SecureDht&) { - auto it = listeners_.find(token); - if (it != listeners_.end()) { - if (it->second.tokenClassicDht) - dht_->cancelListen(h, it->second.tokenClassicDht); - if (it->second.tokenProxyDht and dht_via_proxy_) - dht_via_proxy_->cancelListen(h, it->second.tokenProxyDht); - listeners_.erase(it); - } else { - if (logger_) - logger_->w("[runner %p] cancelListen: unknown token %zu.", this, token); - } - opEnded(); - }); -#else pending_ops.emplace([=](SecureDht& dht) { dht.cancelListen(h, token); opEnded(); }); -#endif // OPENDHT_PROXY_CLIENT cv.notify_all(); } @@ -862,28 +781,10 @@ DhtRunner::cancelListen(InfoHash h, std::shared_future<size_t> ftoken) if (running != State::Running) return; ongoing_ops++; -#ifdef OPENDHT_PROXY_CLIENT - pending_ops.emplace([this, h, ftoken = std::move(ftoken)](SecureDht&) { - auto token = ftoken.get(); - auto it = listeners_.find(token); - if (it != listeners_.end()) { - if (it->second.tokenClassicDht) - dht_->cancelListen(h, it->second.tokenClassicDht); - if (it->second.tokenProxyDht and dht_via_proxy_) - dht_via_proxy_->cancelListen(h, it->second.tokenProxyDht); - listeners_.erase(it); - } else { - if (logger_) - logger_->w("[runner %p] cancelListen: unknown token %zu.", this, token); - } - opEnded(); - }); -#else pending_ops.emplace([this, h, ftoken = std::move(ftoken)](SecureDht& dht) { dht.cancelListen(h, ftoken.get()); opEnded(); }); -#endif // OPENDHT_PROXY_CLIENT cv.notify_all(); } @@ -1172,23 +1073,9 @@ void DhtRunner::resetDht() { peerDiscovery_.reset(); -#ifdef OPENDHT_PROXY_CLIENT - listeners_.clear(); - dht_via_proxy_.reset(); -#endif // OPENDHT_PROXY_CLIENT dht_.reset(); } -SecureDht* -DhtRunner::activeDht() const -{ -#ifdef OPENDHT_PROXY_CLIENT - return use_proxy? dht_via_proxy_.get() : dht_.get(); -#else - return dht_.get(); -#endif // OPENDHT_PROXY_CLIENT -} - void DhtRunner::setProxyServer(const std::string& proxy, const std::string& pushNodeId) { @@ -1201,7 +1088,7 @@ DhtRunner::setProxyServer(const std::string& proxy, const std::string& pushNodeI enableProxy(use_proxy and not config_.proxy_server.empty()); #else if (not proxy.empty()) - std::cerr << "DHT proxy requested but OpenDHT built without proxy support." << std::endl; + throw std::runtime_error("DHT proxy requested but OpenDHT built without proxy support."); #endif } @@ -1209,54 +1096,33 @@ void DhtRunner::enableProxy(bool proxify) { #ifdef OPENDHT_PROXY_CLIENT - if (dht_via_proxy_) { - dht_via_proxy_->shutdown({}); + if (dht_) { + dht_->shutdown({}); } if (proxify) { // Init the proxy client - auto dht_via_proxy = std::unique_ptr<DhtInterface>( - new DhtProxyClient( + auto dht_via_proxy = std::make_unique<DhtProxyClient>( config_.server_ca, config_.client_identity, [this]{ if (config_.threaded) { - { - std::lock_guard<std::mutex> lck(storage_mtx); - pending_ops_prio.emplace([=](SecureDht&) mutable {}); - } + std::lock_guard<std::mutex> lck(storage_mtx); + pending_ops_prio.emplace([=](SecureDht&) mutable {}); cv.notify_all(); } }, - config_.proxy_server, config_.push_node_id, logger_) - ); -#ifdef OPENDHT_PUSH_NOTIFICATIONS + config_.proxy_server, config_.push_node_id, logger_); if (not config_.push_token.empty()) dht_via_proxy->setPushNotificationToken(config_.push_token); -#endif - dht_via_proxy_ = std::unique_ptr<SecureDht>(new SecureDht(std::move(dht_via_proxy), config_.dht_config, identityAnnouncedCb_, logger_)); - // add current listeners - for (auto& l: listeners_) - l.second.tokenProxyDht = dht_via_proxy_->listen(l.second.hash, l.second.gcb, l.second.f, l.second.w); + dht_ = std::make_unique<SecureDht>(std::move(dht_via_proxy), config_.dht_config, identityAnnouncedCb_, logger_); // and use it use_proxy = proxify; } else { use_proxy = proxify; - std::lock_guard<std::mutex> lck(storage_mtx); - if (not listeners_.empty()) { - pending_ops.emplace([this](SecureDht& /*dht*/) mutable { - if (not dht_) - return; - for (auto& l : listeners_) { - if (not l.second.tokenClassicDht) { - l.second.tokenClassicDht = dht_->listen(l.second.hash, l.second.gcb, l.second.f, l.second.w); - } - } - }); - } } #else if (proxify) - std::cerr << "DHT proxy requested but OpenDHT built without proxy support." << std::endl; + throw std::runtime_error("DHT proxy requested but OpenDHT built without proxy support."); #endif } @@ -1264,16 +1130,8 @@ void DhtRunner::forwardAllMessages(bool forward) { std::lock_guard<std::mutex> lck(dht_mtx); -#ifdef OPENDHT_PROXY_SERVER -#ifdef OPENDHT_PROXY_CLIENT - if (dht_via_proxy_) - dht_via_proxy_->forwardAllMessages(forward); -#endif // OPENDHT_PROXY_CLIENT if (dht_) dht_->forwardAllMessages(forward); -#else - (void) forward; -#endif // OPENDHT_PROXY_SERVER } /** @@ -1284,8 +1142,8 @@ DhtRunner::setPushNotificationToken(const std::string& token) { std::lock_guard<std::mutex> lck(dht_mtx); #if defined(OPENDHT_PROXY_CLIENT) && defined(OPENDHT_PUSH_NOTIFICATIONS) config_.push_token = token; - if (dht_via_proxy_) - dht_via_proxy_->setPushNotificationToken(token); + if (dht_) + dht_->setPushNotificationToken(token); #else (void) token; #endif @@ -1295,13 +1153,11 @@ void DhtRunner::pushNotificationReceived(const std::map<std::string, std::string>& data) { #if defined(OPENDHT_PROXY_CLIENT) && defined(OPENDHT_PUSH_NOTIFICATIONS) - { - std::lock_guard<std::mutex> lck(storage_mtx); - pending_ops_prio.emplace([=](SecureDht&) { - if (dht_via_proxy_) - dht_via_proxy_->pushNotificationReceived(data); - }); - } + std::lock_guard<std::mutex> lck(storage_mtx); + pending_ops_prio.emplace([=](SecureDht&) { + if (dht_) + dht_->pushNotificationReceived(data); + }); cv.notify_all(); #else (void) data; -- GitLab