diff --git a/include/opendht/dhtrunner.h b/include/opendht/dhtrunner.h index 6def61d585c013f83b2fd7cc84de7732e97acdb1..09229eb9e24af2e6eea4bf33e564aa908f555f57 100644 --- a/include/opendht/dhtrunner.h +++ b/include/opendht/dhtrunner.h @@ -197,10 +197,31 @@ public: } void putEncrypted(const std::string& key, InfoHash to, Value&& value, DoneCallback cb={}); - void bootstrap(const char* host, const char* service, DoneCallbackSimple&& cb={}); + /** + * Insert known nodes to the routing table, without necessarly ping them. + * Usefull to restart a node and get things running fast without putting load on the network. + */ void bootstrap(const std::vector<std::pair<sockaddr_storage, socklen_t>>& nodes, DoneCallbackSimple&& cb={}); + + /** + * Insert known nodes to the routing table, without necessarly ping them. + * Usefull to restart a node and get things running fast without putting load on the network. + */ void bootstrap(const std::vector<NodeExport>& nodes); + /** + * Add host:service to bootstrap nodes, and ping this node. + * DNS resolution is performed asynchronously. + * When disconnected, all bootstrap nodes added with this method will be tried regularly until connection + * to the DHT network is established. + */ + void bootstrap(const std::string& host, const std::string& service); + + /** + * Clear the list of bootstrap added using bootstrap(const std::string&, const std::string&). + */ + void clearBootstrap(); + /** * Inform the DHT of lower-layer connectivity changes. * This will cause the DHT to assume an IP address change. @@ -331,11 +352,20 @@ public: void join(); private: + static constexpr std::chrono::seconds BOOTSTRAP_PERIOD {10}; + + /** + * Will try to resolve the list of hostnames `bootstrap_nodes` on seperate + * thread and then queue ping requests. This list should contain reliable + * nodes so that the DHT node can recover quickly from losing connection + * with the network. + */ + void tryBootstrapCoutinuously(); void doRun(const sockaddr_in* sin4, const sockaddr_in6* sin6, SecureDhtConfig config); time_point loop_(); - static std::vector<std::pair<sockaddr_storage, socklen_t>> getAddrInfo(const char* host, const char* service); + static std::vector<std::pair<sockaddr_storage, socklen_t>> getAddrInfo(const std::string& host, const std::string& service); NodeStatus getStatus() const { return std::max(status4, status6); @@ -350,6 +380,15 @@ private: std::mutex sock_mtx {}; std::vector<std::pair<Blob, SockAddr>> rcv {}; + /** true if currently actively boostraping */ + std::atomic_bool bootstraping {false}; + /* bootstrap nodes given as (host, service) pairs */ + std::vector<std::pair<std::string,std::string>> bootstrap_nodes_all {}; + std::vector<std::pair<std::string,std::string>> bootstrap_nodes {}; + std::thread bootstrap_thread {}; + /** protects bootstrap_nodes, bootstrap_thread */ + std::mutex bootstrap_mtx {}; + std::queue<std::function<void(SecureDht&)>> pending_ops_prio {}; std::queue<std::function<void(SecureDht&)>> pending_ops {}; std::mutex storage_mtx {}; diff --git a/src/dht.cpp b/src/dht.cpp index 19477f06c7efe3446f5a1947df2379fd78889611..971424ca2a1d9181c50048b5cb2542a0bff752a7 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -2385,9 +2385,8 @@ Dht::getNodesStats(sa_family_t af, unsigned *good_return, unsigned *dubious_retu good++; if (n->time > n->reply_time) incoming++; - } else { + } else if (not n->isExpired()) dubious++; - } } if (b.cached) cached++; @@ -2669,11 +2668,12 @@ Dht::neighbourhoodMaintenance(RoutingTable& list) q = r; } - /* Since our node-id is the same in both DHTs, it's probably - profitable to query both families. */ auto n = q->randomNode(); if (n) { - DHT_LOG.DEBUG("[find %s IPv%c] sending find for neighborhood maintenance.", id.toString().c_str(), q->af == AF_INET6 ? '6' : '4'); + DHT_LOG.DEBUG("[find %s IPv%c] sending find for neighborhood maintenance.", + id.toString().c_str(), q->af == AF_INET6 ? '6' : '4'); + /* Since our node-id is the same in both DHTs, it's probably + profitable to query both families. */ network_engine.sendFindNode(n, id, network_engine.want(), nullptr, nullptr); } @@ -2876,9 +2876,9 @@ Dht::confirmNodes() case is roughly 27 seconds, assuming the table is 22 bits deep. We want to keep a margin for neighborhood maintenance, so keep this within 25 seconds. */ - auto time_dis = soon ? - uniform_duration_distribution<> {seconds(5) , seconds(25)} - : uniform_duration_distribution<> {seconds(60), seconds(180)}; + auto time_dis = soon + ? uniform_duration_distribution<> {seconds(5) , seconds(25)} + : uniform_duration_distribution<> {seconds(60), seconds(180)}; auto confirm_nodes_time = now + time_dis(rd); nextNodesConfirmation = scheduler.add(confirm_nodes_time, std::bind(&Dht::confirmNodes, this)); diff --git a/src/dhtrunner.cpp b/src/dhtrunner.cpp index ee1a07f43a7f6506ec693e5659b6df75047bcf31..8a39fee256110ed0cf5076fb59e1ad41713d2f90 100644 --- a/src/dhtrunner.cpp +++ b/src/dhtrunner.cpp @@ -37,6 +37,8 @@ namespace dht { +constexpr std::chrono::seconds DhtRunner::BOOTSTRAP_PERIOD; + DhtRunner::DhtRunner() : dht_() { #ifdef _WIN32 @@ -131,6 +133,9 @@ DhtRunner::join() dht_thread.join(); if (rcv_thread.joinable()) rcv_thread.join(); + if (bootstrap_thread.joinable()) + bootstrap_thread.join(); + { std::lock_guard<std::mutex> lck(storage_mtx); pending_ops = decltype(pending_ops)(); @@ -327,6 +332,15 @@ DhtRunner::loop_() if (nstatus4 != status4 || nstatus6 != status6) { status4 = nstatus4; status6 = nstatus6; + if (status4 == NodeStatus::Disconnected and status6 == NodeStatus::Disconnected) { + // We have lost connection with the DHT. Try to recover using bootstrap nodes. + std::unique_lock<std::mutex> lck(bootstrap_mtx); + bootstrap_nodes = bootstrap_nodes_all; + tryBootstrapCoutinuously(); + } else { + std::unique_lock<std::mutex> lck(bootstrap_mtx); + bootstrap_nodes.clear(); + } if (statusCb) statusCb(status4, status6); } @@ -571,18 +585,71 @@ DhtRunner::putEncrypted(const std::string& key, InfoHash to, Value&& value, Done putEncrypted(InfoHash::get(key), to, std::forward<Value>(value), cb); } +void +DhtRunner::tryBootstrapCoutinuously() +{ + if (bootstrap_thread.joinable()) { + if (bootstraping) + return; // already running + else + bootstrap_thread.join(); + } + bootstraping = true; + bootstrap_thread = std::thread([this]() { + auto next = clock::now(); + do { + decltype(bootstrap_nodes) nodes; + { + std::lock_guard<std::mutex> lck(bootstrap_mtx); + nodes = bootstrap_nodes; + } + + next += BOOTSTRAP_PERIOD; + { + std::mutex mtx; // dummy mutex + std::unique_lock<std::mutex> blck(mtx); + std::atomic<unsigned> ping_count(0); + // Reverse: try last inserted bootstrap nodes first + for (auto it = nodes.rbegin(); it != nodes.rend(); it++) { + ++ping_count; + try { + bootstrap(getAddrInfo(it->first, it->second), [&](bool) { + --ping_count; + cv.notify_all(); + }); + } catch (std::invalid_argument& e) { + --ping_count; + std::cerr << e.what() << std::endl; + } + } + // wait at least until the next BOOTSTRAP_PERIOD + cv.wait_until(blck, next, [&]() { return not running; }); + // wait for bootstrap requests to end. + cv.wait(blck, [&]() { return not running or ping_count == 0; }); + } + // update state + { + std::lock_guard<std::mutex> lck(dht_mtx); + bootstraping = running and + status4 == NodeStatus::Disconnected and + status6 == NodeStatus::Disconnected; + } + } while (bootstraping); + }); +} + std::vector<std::pair<sockaddr_storage, socklen_t>> -DhtRunner::getAddrInfo(const char* host, const char* service) +DhtRunner::getAddrInfo(const std::string& host, const std::string& service) { std::vector<std::pair<sockaddr_storage, socklen_t>> ips {}; - if (not host or not service or strlen(host) == 0) + if (host.empty()) return ips; addrinfo hints; memset(&hints, 0, sizeof(hints)); hints.ai_socktype = SOCK_DGRAM; addrinfo* info = nullptr; - int rc = getaddrinfo(host, service, &hints, &info); + int rc = getaddrinfo(host.c_str(), service.c_str(), &hints, &info); if(rc != 0) throw std::invalid_argument(std::string("Error: `") + host + ":" + service + "`: " + gai_strerror(rc)); @@ -597,9 +664,19 @@ DhtRunner::getAddrInfo(const char* host, const char* service) } void -DhtRunner::bootstrap(const char* host, const char* service, DoneCallbackSimple&& cb) +DhtRunner::bootstrap(const std::string& host, const std::string& service) +{ + std::lock_guard<std::mutex> lck(bootstrap_mtx); + bootstrap_nodes_all.emplace_back(host, service); + bootstrap_nodes.emplace_back(host, service); + tryBootstrapCoutinuously(); +} + +void +DhtRunner::clearBootstrap() { - bootstrap(getAddrInfo(host, service), std::forward<DoneCallbackSimple>(cb)); + std::lock_guard<std::mutex> lck(bootstrap_mtx); + bootstrap_nodes_all.clear(); } void