diff --git a/include/opendht/dht.h b/include/opendht/dht.h index 0e08e168d14ffdf1aa570b6f483deb567c622f7c..06a490e0993c1a804c9ec2558087f1df24e1dd6d 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -495,10 +495,6 @@ private: * */ bool candidate {false}; - - // Generic temporary flag. - // Must be reset to false after use by the algorithm. - bool pending {false}; }; /** @@ -828,8 +824,6 @@ private: const std::shared_ptr<Node> findNode(const InfoHash& id, sa_family_t af) const; bool trySearchInsert(const std::shared_ptr<Node>& node); - void pinged(Node& n, Bucket *b = nullptr); - void blacklistNode(const InfoHash* id, const sockaddr*, socklen_t); bool isNodeBlacklisted(const sockaddr*, socklen_t) const; static bool isMartian(const sockaddr*, socklen_t); diff --git a/include/opendht/network_engine.h b/include/opendht/network_engine.h index 5eeaab5c91db49b40b507473293508dc0b3e4f7d..5243ed7ecfb00765acb3651ca74e4d3ba2df3101 100644 --- a/include/opendht/network_engine.h +++ b/include/opendht/network_engine.h @@ -446,6 +446,8 @@ private: bool rateLimit(); + void pinged(Node&); + void requestStep(std::shared_ptr<Request> req) { if (req->completed or req->cancelled) return; @@ -462,6 +464,7 @@ private: send((char*)req->msg.data(), req->msg.size(), (req->node->reply_time >= now - UDP_REPLY_TIME) ? 0 : MSG_CONFIRM, (sockaddr*)&req->node->ss, req->node->sslen); + pinged(*req->node); ++req->attempt_count; req->last_try = now; std::weak_ptr<Request> wreq = req; diff --git a/include/opendht/node.h b/include/opendht/node.h index 601a9e1f032e969b60859f81589798f52d1ceecb..7dea83446dda84fb54d6f6237c39bae109228c5d 100644 --- a/include/opendht/node.h +++ b/include/opendht/node.h @@ -35,13 +35,14 @@ struct NodeExport { }; struct Node { + friend class NetworkEngine; + InfoHash id {}; sockaddr_storage ss; socklen_t sslen {0}; time_point time {time_point::min()}; /* last time eared about */ time_point reply_time {time_point::min()}; /* time of last correct reply received */ time_point pinged_time {time_point::min()}; /* time of last message sent */ - unsigned pinged {0}; /* how many requests we sent since last reply */ Node() : ss() { std::fill_n((uint8_t*)&ss, sizeof(ss), 0); @@ -77,6 +78,11 @@ struct Node { Answer should be true if the message was an aswer to a request we made*/ void received(time_point now, bool answer); + /** + * Resets the state of the node so it's not expired anymore. + */ + void reset() { pinged = 0; } + friend std::ostream& operator<< (std::ostream& s, const Node& h); static constexpr const std::chrono::minutes NODE_GOOD_TIME {120}; @@ -86,6 +92,9 @@ struct Node { /* Time for a request to timeout */ static constexpr const std::chrono::seconds MAX_RESPONSE_TIME {3}; + +private: + unsigned pinged {0}; /* how many requests we sent since last reply */ }; } diff --git a/src/dht.cpp b/src/dht.cpp index 7e9dd8c05911b50467c7ca88e643d74e5301497e..98197f7152e4c4d873bf810b532747871c1d8a3d 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -370,7 +370,7 @@ Dht::NodeCache::clearBadNodes(sa_family_t family) auto& list = family == AF_INET ? cache_4 : cache_6; for (auto n = list.begin(); n != list.end();) { if (auto ln = n->lock()) { - ln->pinged = 0; + ln->reset(); ++n; } else { n = list.erase(n); @@ -394,22 +394,6 @@ Dht::sendCachedPing(Bucket& b) return 0; } -/* Called whenever we send a request to a node, increases the ping count - and, if that reaches 3, sends a ping to a new candidate. */ -void -Dht::pinged(Node& n, Bucket* b) -{ - const auto& now = scheduler.time(); - if (not n.isExpired(now)) { - n.requested(now); - if (n.pinged >= 3) { - if (not b) - b = findBucket(n.id, n.ss.ss_family); - if (b) sendCachedPing(*b); - } - } -} - /* The internal blacklist is an LRU cache of nodes that have sent incorrect messages. */ void @@ -418,12 +402,7 @@ Dht::blacklistNode(const InfoHash* id, const sockaddr *sa, socklen_t salen) DHT_LOG.WARN("Blacklisting broken node."); if (id) { - /* Make the node easy to discard. */ auto n = findNode(*id, sa->sa_family); - if (n) { - n->pinged = 3; - pinged(*n); - } /* Discard it from any searches in progress. */ auto black_list_in = [&](std::map<InfoHash, std::shared_ptr<Search>> srs) { for (auto& srp : srs) { @@ -597,9 +576,6 @@ Dht::newNode(const InfoHash& id, const sockaddr *sa, socklen_t salen, int confir if (n->pinged_time + Node::MAX_RESPONSE_TIME < now) { DHT_LOG.DEBUG("Sending ping to dubious node."); network_engine.sendPing(n, nullptr, nullptr); - n->pinged++; - n->pinged_time = now; - //pinged(n, b); break; } } @@ -637,8 +613,8 @@ Dht::expireBuckets(RoutingTable& list) { for (auto& b : list) { bool changed = false; - b.nodes.remove_if([&changed](const std::shared_ptr<Node>& n) { - if (n->pinged >= 4) { + b.nodes.remove_if([this,&changed](const std::shared_ptr<Node>& n) { + if (n->isExpired(scheduler.time())) { changed = true; return true; } @@ -787,11 +763,10 @@ Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update return nullptr; } - DHT_LOG.DEBUG("[search %s IPv%c] [node %s %s] sending 'get' (p %d)", + DHT_LOG.DEBUG("[search %s IPv%c] [node %s %s] sending 'get'", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', n->node->id.toString().c_str(), - print_addr(n->node->ss, n->node->sslen).c_str(), - n->node->pinged); + print_addr(n->node->ss, n->node->sslen).c_str()); auto onDone = [=](std::shared_ptr<NetworkEngine::Request> status, NetworkEngine::RequestAnswer&& answer) mutable { @@ -824,7 +799,6 @@ Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update else rstatus = network_engine.sendGetValues(n->node, sr->id, -1, onDone, onExpired); n->getStatus = rstatus; - pinged(*n->node); return n; } @@ -902,7 +876,6 @@ Dht::searchStep(std::shared_ptr<Search> sr) } } ); - n.pending = true; } t++; if (not n.candidate and ++i == LISTEN_NODES) @@ -944,9 +917,6 @@ Dht::searchStep(std::shared_ptr<Search> sr) if (sr) { searchStep(sr); } } ); - // use the "pending" flag so we update the "pinged" - // fields after sending the announce requests for every value to announce - n.pending = true; } t++; if (not n.candidate and ++i == TARGET_NODES) @@ -962,11 +932,8 @@ Dht::searchStep(std::shared_ptr<Search> sr) SearchNode* sent; do { sent = searchSendGetValues(sr); - if (sent) { - sent->pending = false; - if (not sent->candidate) - i++; - } + if (sent and not sent->candidate) + i++; } while (sent and i < 3); DHT_LOG.DEBUG("[search %s IPv%c] step: sent %u requests.", @@ -1003,14 +970,6 @@ Dht::searchStep(std::shared_ptr<Search> sr) } } } - - for (auto& n : sr->nodes) { - if (n.pending) { - n.pending = false; - pinged(*n.node); - } - } - } @@ -1989,8 +1948,6 @@ Dht::dumpBucket(const Bucket& b, std::ostream& out) const out << " age " << duration_cast<seconds>(now - n->time).count() << ", reply: " << duration_cast<seconds>(now - n->reply_time).count(); else out << " age " << duration_cast<seconds>(now - n->time).count(); - if (n->pinged) - out << " [p " << n->pinged << "]"; if (n->isExpired(now)) out << " [expired]"; else if (n->isGood(now)) @@ -2031,10 +1988,7 @@ Dht::dumpSearch(const Search& sr, std::ostream& out) const out << ' ' << (n.candidate ? 'c' : ' '); out << " [" << (n.node->isMessagePending(now) ? 'f':' '); - if (n.node->pinged) - out << n.node->pinged; - else - out << ' '; + out << ' '; out << (n.node->isExpired(now) ? 'x' : ' ') << "]"; { @@ -2238,7 +2192,6 @@ Dht::neighbourhoodMaintenance(RoutingTable& list) if (n) { DHT_LOG.DEBUG("[find %s IPv%c] sending find for neighborhood maintenance.", id.toString().c_str(), q->af == AF_INET6 ? '6' : '4'); network_engine.sendFindNode(n, id, network_engine.want(), nullptr, nullptr); - pinged(*n, &(*q)); } return true; @@ -2288,7 +2241,6 @@ Dht::bucketMaintenance(RoutingTable& list) DHT_LOG.DEBUG("[find %s IPv%c] sending for bucket maintenance.", id.toString().c_str(), q->af == AF_INET6 ? '6' : '4'); network_engine.sendFindNode(n, id, want, nullptr, nullptr); - pinged(*n, &(*q)); /* In order to avoid sending queries back-to-back, give up for now and reschedule us soon. */ return true; diff --git a/src/network_engine.cpp b/src/network_engine.cpp index ea968eebabd2054d3d551a2200febad2d3e8e258..3f2d703671a1fc91f7a1ed4b57496dd27f85e7c1 100644 --- a/src/network_engine.cpp +++ b/src/network_engine.cpp @@ -44,6 +44,16 @@ const NetworkEngine::TransPrefix NetworkEngine::TransPrefix::ANNOUNCE_VALUES = const NetworkEngine::TransPrefix NetworkEngine::TransPrefix::LISTEN = {"lt"}; constexpr long unsigned NetworkEngine::MAX_REQUESTS_PER_SEC; +/* Called whenever we send a request to a node, increases the ping count + and, if that reaches 3, sends a ping to a new candidate. */ +void +NetworkEngine::pinged(Node& n) +{ + const auto& now = scheduler.time(); + if (not n.isExpired(now)) + n.requested(now); +} + void NetworkEngine::tellListener(const sockaddr *sa, socklen_t salen, uint16_t rid, InfoHash hash, want_t want, Blob ntoken, std::vector<std::shared_ptr<Node>> nodes, std::vector<std::shared_ptr<Node>> nodes6,