diff --git a/include/opendht/dht.h b/include/opendht/dht.h index f193984a0103319f98225f5d871a88f5d4ae75a2..038efd55aaa7c1edd1f23e4d63ca8fa744cac49b 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -322,6 +322,11 @@ private: to the destination, and use the additional ones to backtrack if any of the target 8 turn out to be dead. */ static constexpr unsigned SEARCH_NODES {14}; + + /* Concurrent requests during a search */ + static constexpr unsigned SEARCH_REQUESTS {3}; + + /* Number of listening nodes */ static constexpr unsigned LISTEN_NODES {3}; /* The maximum number of values we store for a given hash. */ @@ -333,10 +338,6 @@ private: /* The maximum number of searches we keep data about. */ static constexpr unsigned MAX_SEARCHES {128}; - /* The time after which we can send get requests for - a search in case of no answers. */ - static constexpr std::chrono::seconds SEARCH_GET_STEP {3}; - static constexpr std::chrono::minutes MAX_STORAGE_MAINTENANCE_EXPIRE_TIME {10}; /* The time after which we consider a search to be expirable. */ @@ -486,7 +487,7 @@ private: uint16_t tid; time_point refill_time {time_point::min()}; time_point step_time {time_point::min()}; /* the time of the last search step */ - time_point get_step_time {time_point::min()}; /* the time of the last get step */ + unsigned current_get_requests {0}; /* number of concurrent sync requests */ std::shared_ptr<Scheduler::Job> nextSearchStep {}; bool expired {false}; /* no node, or all nodes expired */ @@ -509,6 +510,13 @@ private: bool insertNode(std::shared_ptr<Node> n, time_point now, const Blob& token={}); unsigned insertBucket(const Bucket&, time_point now); + SearchNode* getNode(std::shared_ptr<Node>& n) { + auto srn = std::find_if(nodes.begin(), nodes.end(), [&](SearchNode& sn) { + return n == sn.node; + }); + return (srn == nodes.end()) ? nullptr : &(*srn); + } + /** * Can we use this search to announce ? */ diff --git a/include/opendht/network_engine.h b/include/opendht/network_engine.h index ace4b6d01fc865d0671057874672856ab6661591..16a9637b5c10039aaeaf7ac89015dc1f5a1f8a17 100644 --- a/include/opendht/network_engine.h +++ b/include/opendht/network_engine.h @@ -478,6 +478,7 @@ private: auto now = scheduler.time(); if (req->node->isExpired(now) or req->expired(now)) { + req->completed = true; req->on_expired(req, true); req->clear(); requests.erase(req->tid); diff --git a/src/dht.cpp b/src/dht.cpp index 6498045715c4a6034f205d83a766f0acda3da262..852907bc338571c301239a8a9c1e3653d43f39f3 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -89,7 +89,6 @@ namespace dht { using namespace std::placeholders; -constexpr std::chrono::seconds Dht::SEARCH_GET_STEP; constexpr std::chrono::minutes Dht::MAX_STORAGE_MAINTENANCE_EXPIRE_TIME; constexpr std::chrono::minutes Dht::SEARCH_EXPIRE_TIME; constexpr std::chrono::seconds Dht::LISTEN_EXPIRE_TIME; @@ -418,7 +417,7 @@ Dht::newNode(const InfoHash& id, const sockaddr *sa, socklen_t salen, int confir if (not n->isGood(now)) { dubious = true; if (n->pinged_time + Node::MAX_RESPONSE_TIME < now) { - DHT_LOG.DEBUG("Sending ping to dubious node."); + DHT_LOG.DEBUG("Sending ping to dubious node %s.", n->toString().c_str()); network_engine.sendPing(n, nullptr, nullptr); break; } @@ -524,7 +523,7 @@ Dht::Search::insertNode(std::shared_ptr<Node> node, time_point now, const Blob& // Reset search timer if the search is empty if (nodes.empty()) { step_time = TIME_INVALID; - get_step_time = TIME_INVALID; + current_get_requests = 0; } //bool synced = isSynced(now); @@ -542,7 +541,7 @@ Dht::Search::insertNode(std::shared_ptr<Node> node, time_point now, const Blob& num_bad_nodes--; auto farthest_not_bad_node = std::find_if(nodes.rbegin(), nodes.rend(), - [&](const SearchNode& n) { return not n.isBad(now); } + [&](const SearchNode& n) { return not n.isBad(now) and not (n.getStatus and n.getStatus->pending(now)); } ); if (farthest_not_bad_node != nodes.rend()) { nodes.erase(std::prev(farthest_not_bad_node.base())); @@ -619,6 +618,9 @@ Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update auto onDone = [this,ws](std::shared_ptr<NetworkEngine::Request> status, NetworkEngine::RequestAnswer&& answer) mutable { if (auto sr = ws.lock()) { + auto srn = sr->getNode(status->node); + if (srn and not srn->candidate) + sr->current_get_requests--; sr->insertNode(status->node, scheduler.time(), answer.ntoken); onGetValuesDone(status, answer, sr); } @@ -626,21 +628,21 @@ Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update auto onExpired = [this,ws](std::shared_ptr<NetworkEngine::Request> status, bool over) mutable { if (auto sr = ws.lock()) { - if (not over) { - auto srn = std::find_if(sr->nodes.begin(), sr->nodes.end(), [&status](SearchNode& sn) { - return status->node == sn.node; - }); - if (srn != sr->nodes.end()) { - DHT_LOG.DEBUG("[search %s] sn %s now candidate...", - sr->id.toString().c_str(), srn->node->id.toString().c_str()); - srn->candidate = true; + auto srn = sr->getNode(status->node); + if (srn and not srn->candidate) { + if (not over) { + srn->candidate = true; + sr->current_get_requests--; + //DHT_LOG.DEBUG("[search %s] sn %s now candidate... %d", + // sr->id.toString().c_str(), srn->node->id.toString().c_str(), sr->current_get_requests); + } else { + sr->current_get_requests--; } } - if (searchSendGetValues(sr)) - sr->get_step_time = scheduler.time(); - searchStep(sr); + scheduler.edit(sr->nextSearchStep, scheduler.time()); } }; + sr->current_get_requests++; std::shared_ptr<NetworkEngine::Request> rstatus; if (sr->callbacks.empty() and sr->listeners.empty()) rstatus = network_engine.sendFindNode(n->node, sr->id, -1, onDone, onExpired); @@ -655,10 +657,10 @@ Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update void Dht::searchStep(std::shared_ptr<Search> sr) { - if (not sr or sr->expired) return; + if (not sr or sr->expired or sr->done) return; const auto& now = scheduler.time(); - DHT_LOG.DEBUG("[search %s IPv%c] step", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6'); + DHT_LOG.DEBUG("[search %s IPv%c] step (%d requests)", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', sr->current_get_requests); sr->step_time = now; /* @@ -778,7 +780,7 @@ Dht::searchStep(std::shared_ptr<Search> sr) sr->done = true; } - if (sr->get_step_time + SEARCH_GET_STEP <= now) { + if (sr->current_get_requests < SEARCH_REQUESTS) { unsigned i = 0; SearchNode* sent; do { @@ -786,13 +788,11 @@ Dht::searchStep(std::shared_ptr<Search> sr) if (sent and not sent->candidate) i++; } - while (sent and i < 3); + while (sent and sr->current_get_requests < SEARCH_REQUESTS); DHT_LOG.DEBUG("[search %s IPv%c] step: sent %u requests.", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', i); - if (i > 0) - sr->get_step_time = now; - else if ((size_t)std::count_if(sr->nodes.begin(), sr->nodes.end(), [&](const SearchNode& sn) { + if (i == 0 && (size_t)std::count_if(sr->nodes.begin(), sr->nodes.end(), [&](const SearchNode& sn) { return sn.candidate or sn.node->isExpired(now); }) == sr->nodes.size()) { @@ -928,8 +928,8 @@ Dht::Search::getUpdateTime(time_point now) const bool pending = sn.getStatus and sn.getStatus->pending(now); if (sn.last_get_reply < std::max(now - Node::NODE_EXPIRE_TIME, last_get) or pending) { // not isSynced - if (not pending) - ut = std::min(ut, get_step_time + SEARCH_GET_STEP); + if (not pending and current_get_requests < SEARCH_REQUESTS) + ut = std::min(ut, now); if (not sn.candidate) d++; } else { @@ -1032,7 +1032,7 @@ Dht::Search::getNextStepTime(const std::map<ValueType::Id, ValueType>& types, ti auto ut = getUpdateTime(now); if (ut != time_point::max()) { - std::cout << id.toString() << " IPv" << (af==AF_INET?"4":"6") << " update time in " << print_dt(ut - now) << " s" << std::endl; + //std::cout << id.toString() << " IPv" << (af==AF_INET?"4":"6") << " update time in " << print_dt(ut - now) << " s" << std::endl; next_step = std::min(next_step, ut); } @@ -1040,13 +1040,13 @@ Dht::Search::getNextStepTime(const std::map<ValueType::Id, ValueType>& types, ti { auto at = getAnnounceTime(types, now); if (at != time_point::max()) { - std::cout << id.toString() << " IPv" << (af==AF_INET?"4":"6") << " announce time in " << print_dt(at - now) << " s" << std::endl; + //std::cout << id.toString() << " IPv" << (af==AF_INET?"4":"6") << " announce time in " << print_dt(at - now) << " s" << std::endl; next_step = std::min(next_step, at); } auto lt = getListenTime(now); if (lt != time_point::max()) { - std::cout << id.toString() << " IPv" << (af==AF_INET?"4":"6") << " listen time in " << print_dt(lt - now) << " s" << std::endl; + //std::cout << id.toString() << " IPv" << (af==AF_INET?"4":"6") << " listen time in " << print_dt(lt - now) << " s" << std::endl; next_step = std::min(next_step, lt); } } @@ -1128,7 +1128,7 @@ Dht::search(const InfoHash& id, sa_family_t af, GetCallback callback, DoneCallba sr->af = af; sr->tid = search_id++; sr->step_time = TIME_INVALID; - sr->get_step_time = TIME_INVALID; + sr->current_get_requests = 0; sr->id = id; sr->done = false; sr->expired = false; @@ -2382,8 +2382,7 @@ Dht::onError(std::shared_ptr<NetworkEngine::Request> req, DhtProtocolException e network_engine.cancelRequest(n.getStatus); n.last_get_reply = time_point::min(); cleared++; - if (searchSendGetValues(sr)) - sr->get_step_time = scheduler.time(); + searchSendGetValues(sr); break; } } @@ -2468,7 +2467,7 @@ Dht::onGetValuesDone(std::shared_ptr<NetworkEngine::Request> status, return; } - DHT_LOG.DEBUG("[search %s IPv%c] got reply to 'get' from %s", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', status->node->id.toString().c_str()); + DHT_LOG.DEBUG("[search %s IPv%c] got reply to 'get' from %s", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', status->node->toString().c_str()); if (not a.ntoken.empty()) { if (!a.values.empty()) { @@ -2509,8 +2508,7 @@ Dht::onGetValuesDone(std::shared_ptr<NetworkEngine::Request> status, if (not sr->done) { const auto& now = scheduler.time(); - if (searchSendGetValues(sr)) /* always keep a 'get' request in progress if possible. */ - sr->get_step_time = now; + searchSendGetValues(sr); // Force to recompute the next step time scheduler.edit(sr->nextSearchStep, now); @@ -2549,8 +2547,7 @@ Dht::onListenDone(std::shared_ptr<NetworkEngine::Request>& status, NetworkEngine if (not sr->done) { const auto& now = scheduler.time(); - if (searchSendGetValues(sr)) - sr->get_step_time = now; + searchSendGetValues(sr); scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(types, now)); } } else @@ -2634,8 +2631,7 @@ Dht::onAnnounceDone(std::shared_ptr<NetworkEngine::Request>&, NetworkEngine::Req DHT_LOG.DEBUG("[search %s IPv%c] got reply to put!", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', answer.values.size()); - if (searchSendGetValues(sr)) - sr->get_step_time = now; + searchSendGetValues(sr); // If the value was just successfully announced, call the callback sr->announce.erase(std::remove_if(sr->announce.begin(), sr->announce.end(),