diff --git a/include/opendht/dht.h b/include/opendht/dht.h index f01e2c088da1a9240c9c84ddb61f5c6980466fab..600e5c246c4d397159311338831857da3b7cce20 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -294,6 +294,16 @@ private: the target 8 turn out to be dead. */ static constexpr unsigned SEARCH_NODES {14}; + /* The number of bad nodes is limited in order to help determine + * presence of connectivity changes. See + * https://github.com/savoirfairelinux/opendht/issues/137 for details. + * + * According to the tables, 25 is a good average value for big networks. If + * the network is small, normal search expiration process will handle the + * situation. + * */ + static constexpr unsigned SEARCH_MAX_BAD_NODES {25}; + /* Concurrent search nodes requested count */ static constexpr unsigned MAX_REQUESTED_SEARCH_NODES {4}; diff --git a/src/dht.cpp b/src/dht.cpp index b8fa169828a1a379400697ae470d05002aa61616..a803e66421962128ab653244eff00a4802bd4fcf 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -429,7 +429,7 @@ struct Dht::SearchNode { * Is this node expired or candidate */ bool isBad() const { - return !node || node->isExpired() || candidate; + return not node or node->isExpired() or candidate; } }; @@ -553,7 +553,21 @@ struct Dht::Search { /** * @return The number of non-good search nodes. */ - unsigned getNumberOfBadNodes() const; + unsigned getNumberOfBadNodes() const { + return std::count_if(nodes.begin(), nodes.end(), [](const SearchNode& sn) { + return sn.isBad(); + }); + } + unsigned getNumberOfConsecutiveBadNodes() const { + unsigned count = 0; + std::find_if(nodes.begin(), nodes.end(), [&count](const SearchNode& sn) { + if (not sn.isBad()) + return true; + ++count; + return false; + }); + return count; + } /** * Returns the time of the next "announce" event for this search, @@ -575,7 +589,66 @@ struct Dht::Search { */ time_point getNextStepTime(const std::map<ValueType::Id, ValueType>& types, time_point now) const; - bool removeExpiredNode(time_point now); + /** + * Removes a node which have been expired for at least + * NODE::NODE_EXPIRE_TIME minutes. The search for an expired node starts + * from the end. + * + * @param now The reference to now. + * + * @return true if a node has been removed, else false. + */ + bool removeExpiredNode(time_point now) { + auto e = nodes.end(); + while (e != nodes.cbegin()) { + e = std::prev(e); + const Node& n = *e->node; + if (n.isExpired() and n.time + Node::NODE_EXPIRE_TIME < now) { + //std::cout << "Removing expired node " << n.id << " from IPv" << (af==AF_INET?'4':'6') << " search " << id << std::endl; + nodes.erase(e); + return true; + } + } + return false; + } + + /** + * This method is called when we have discovered that the search is expired. + * We have to + * + * - remove all nodes from the search; + * - clear (non-permanent) callbacks; + */ + void expire() { + // no nodes or all expired nodes. This is most likely a connectivity change event. + expired = true; + + nodes.clear(); + if (announce.empty() && listeners.empty()) + // Listening or announcing requires keeping the cluster up to date. + setDone(); + { + auto get_cbs = std::move(callbacks); + for (const auto& g : get_cbs) { + if (g.second.done_cb) + g.second.done_cb(false, {}); + } + } + { + std::vector<DoneCallback> a_cbs; + a_cbs.reserve(announce.size()); + for (auto ait = announce.begin() ; ait != announce.end(); ) { + if (ait->callback) + a_cbs.emplace_back(std::move(ait->callback)); + if (not ait->permanent) + ait = announce.erase(ait); + else + ait++; + } + for (const auto& a : a_cbs) + a(false, {}); + } + } /** * If the value was just successfully announced, call the callback and erase it if not permanent. @@ -652,8 +725,12 @@ Dht::shutdown(ShutdownCallback cb) { for (const auto& str : store) { *remaining += maintainStorage(str.first, true, str_donecb); } - DHT_LOG.WARN("Shuting down node: %u ops remaining.", *remaining); - if (!*remaining && cb) { cb(); } + + if (!*remaining) { + DHT_LOG.WARN("Shuting down node: %u ops remaining.", *remaining); + if (cb) + cb(); + } } bool @@ -836,22 +913,6 @@ Dht::expireBuckets(RoutingTable& list) } } -bool -Dht::Search::removeExpiredNode(time_point now) -{ - auto e = nodes.end(); - while (e != nodes.cbegin()) { - e = std::prev(e); - const Node& n = *e->node; - if (n.isExpired() and n.time + Node::NODE_EXPIRE_TIME < now) { - //std::cout << "Removing expired node " << n.id << " from IPv" << (af==AF_INET?'4':'6') << " search " << id << std::endl; - nodes.erase(e); - return true; - } - } - return false; -} - /* A search contains a list of nodes, sorted by decreasing distance to the target. We just got a new candidate, insert it at the right spot or discard it. */ @@ -872,6 +933,8 @@ Dht::Search::insertNode(const std::shared_ptr<Node>& snode, time_point now, cons found = true; break; } + + /* Node not found. We could insert it after this one. */ if (id.xorCmp(nid, n->node->id) > 0) { ++n; break; @@ -879,7 +942,7 @@ Dht::Search::insertNode(const std::shared_ptr<Node>& snode, time_point now, cons } bool new_search_node = false; - if (!found) { + if (not found) { // find if and where to trim excessive nodes auto t = nodes.cend(); size_t bad = 0; // number of bad nodes (if search is not expired) @@ -936,9 +999,8 @@ Dht::Search::insertNode(const std::shared_ptr<Node>& snode, time_point now, cons n->token = token; expired = false; } - if (new_search_node) { + if (new_search_node) removeExpiredNode(now); - } return new_search_node; } @@ -1341,44 +1403,17 @@ Dht::searchStep(std::shared_ptr<Search> sr) while (sent and sr->currentlySolicitedNodeCount() < MAX_REQUESTED_SEARCH_NODES); /*DHT_LOG.DEBUG("[search %s IPv%c] step: sent %u requests (total %u).", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', i, sr->currentlySolicitedNodeCount());*/ + } - auto expiredn = (size_t)std::count_if(sr->nodes.begin(), sr->nodes.end(), [&](const SearchNode& sn) { - return sn.candidate or sn.node->isExpired(); - }); - if (i == 0 && expiredn == sr->nodes.size()) - { - DHT_LOG.WARN("[search %s IPv%c] expired", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6'); - // no nodes or all expired nodes - sr->expired = true; - if (sr->announce.empty() && sr->listeners.empty()) { - // Listening or announcing requires keeping the cluster up to date. - sr->setDone(); - } - { - auto get_cbs = std::move(sr->callbacks); - for (const auto& g : get_cbs) { - if (g.second.done_cb) - g.second.done_cb(false, {}); - } - } - { - std::vector<DoneCallback> a_cbs; - a_cbs.reserve(sr->announce.size()); - for (auto ait = sr->announce.begin() ; ait != sr->announce.end(); ) { - if (ait->callback) - a_cbs.emplace_back(std::move(ait->callback)); - if (not ait->permanent) - ait = sr->announce.erase(ait); - else - ait++; - } - for (const auto& a : a_cbs) - a(false, {}); - } - } + if (sr->getNumberOfConsecutiveBadNodes() >= std::min(sr->nodes.size(), + static_cast<size_t>(SEARCH_MAX_BAD_NODES))) + { + DHT_LOG.WARN("[search %s IPv%c] expired", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6'); + sr->expire(); + connectivityChanged(); } - //dumpSearch(*sr, std::cout); + /* dumpSearch(*sr, std::cout); */ /* periodic searchStep scheduling. */ if (not sr->done) @@ -1400,12 +1435,6 @@ Dht::Search::isSynced(time_point now) const return i > 0; } -unsigned Dht::Search::getNumberOfBadNodes() const { - return std::count_if(nodes.begin(), nodes.end(), - [=](const SearchNode& sn) { return sn.isBad(); } - ); -} - time_point Dht::Search::getLastGetTime(std::shared_ptr<Query> q) const {