diff --git a/src/dht.cpp b/src/dht.cpp index d647f80b8c7f6d42b198a9577b9dc4e6b738b710..4aa6ec329815dd8ddab0a6a07d49edc70cfda43c 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -237,7 +237,6 @@ struct Dht::Search { uint16_t tid; time_point refill_time {time_point::min()}; time_point step_time {time_point::min()}; /* the time of the last search step */ - unsigned current_get_requests {0}; /* number of concurrent sync requests */ std::shared_ptr<Scheduler::Job> nextSearchStep {}; bool expired {false}; /* no node, or all nodes expired */ @@ -267,6 +266,15 @@ struct Dht::Search { return (srn == nodes.end()) ? nullptr : &(*srn); } + /* number of concurrent sync requests */ + unsigned currentGetRequests() const { + unsigned count = 0; + for (const auto& n : nodes) + if (n.getStatus and n.getStatus->pending()) + count++; + return count; + } + /** * Can we use this search to announce ? */ @@ -596,7 +604,6 @@ Dht::Search::insertNode(std::shared_ptr<Node> node, time_point now, const Blob& // Reset search timer if the search is empty else if (nodes.empty()) { step_time = TIME_INVALID; - current_get_requests = 0; } n = nodes.insert(n, SearchNode(node)); @@ -608,7 +615,6 @@ Dht::Search::insertNode(std::shared_ptr<Node> node, time_point now, const Blob& auto to_remove = std::prev(nodes.end()); if (to_remove->getStatus and to_remove->getStatus->pending()) { to_remove->getStatus->cancel(); - current_get_requests--; } nodes.erase(to_remove); } @@ -653,7 +659,7 @@ Dht::expireSearches() Dht::SearchNode* Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update) { - if (sr->done or sr->current_get_requests >= SEARCH_REQUESTS) + if (sr->done or sr->currentGetRequests() >= SEARCH_REQUESTS) return nullptr; const auto& now = scheduler.time(); @@ -674,17 +680,14 @@ Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update return nullptr; } - DHT_LOG.DEBUG("[search %s IPv%c] [node %s] sending 'get'", + /*DHT_LOG.DEBUG("[search %s IPv%c] [node %s] sending 'get'", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', - n->node->toString().c_str()); + n->node->toString().c_str());*/ std::weak_ptr<Search> ws = sr; auto onDone = [this,ws](const Request& status, NetworkEngine::RequestAnswer&& answer) mutable { if (auto sr = ws.lock()) { - auto srn = sr->getNode(status.node); - if (srn and not srn->candidate) - sr->current_get_requests--; sr->insertNode(status.node, scheduler.time(), answer.ntoken); onGetValuesDone(status, answer, sr); } @@ -694,20 +697,15 @@ Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update if (auto sr = ws.lock()) { auto srn = sr->getNode(status.node); if (srn and not srn->candidate) { - DHT_LOG.DEBUG("[search %s IPv%c] [node %s] 'get' expired", + /*DHT_LOG.DEBUG("[search %s IPv%c] [node %s] 'get' expired", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', - srn->node->toString().c_str()); - if (not over) { + srn->node->toString().c_str());*/ + if (not over) srn->candidate = true; - //DHT_LOG.DEBUG("[search %s] sn %s now candidate... %d", - // sr->id.toString().c_str(), srn->node->id.toString().c_str(), sr->current_get_requests); - } - sr->current_get_requests--; } scheduler.edit(sr->nextSearchStep, scheduler.time()); } }; - sr->current_get_requests++; std::shared_ptr<Request> rstatus; if (sr->callbacks.empty() and sr->listeners.empty()) rstatus = network_engine.sendFindNode(n->node, sr->id, -1, onDone, onExpired); @@ -725,7 +723,7 @@ Dht::searchStep(std::shared_ptr<Search> sr) if (not sr or sr->expired or sr->done) return; const auto& now = scheduler.time(); - DHT_LOG.DEBUG("[search %s IPv%c] step (%d requests)", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', sr->current_get_requests); + DHT_LOG.DEBUG("[search %s IPv%c] step (%d requests)", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', sr->currentGetRequests()); sr->step_time = now; /* @@ -809,8 +807,8 @@ Dht::searchStep(std::shared_ptr<Search> sr) auto vid = a.value->id; const auto& type = getType(a.value->type); if (in) { - DHT_LOG.WARN("[search %s IPv%c] [value %lu] storing locally", - sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', vid); + /*DHT_LOG.WARN("[search %s IPv%c] [value %lu] storing locally", + sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', vid);*/ storageStore(sr->id, a.value, a.created); } for (auto& n : sr->nodes) { @@ -844,48 +842,48 @@ Dht::searchStep(std::shared_ptr<Search> sr) } if (sr->callbacks.empty() && sr->announce.empty() && sr->listeners.empty()) sr->done = true; - } else { - if (sr->current_get_requests < SEARCH_REQUESTS) { - unsigned i = 0; - SearchNode* sent; - do { - sent = searchSendGetValues(sr); - if (sent and not sent->candidate) - i++; + } + + if (sr->currentGetRequests() < SEARCH_REQUESTS) { + unsigned i = 0; + SearchNode* sent; + do { + sent = searchSendGetValues(sr); + if (sent and not sent->candidate) + i++; + } + while (sent and sr->currentGetRequests() < SEARCH_REQUESTS); + /*DHT_LOG.DEBUG("[search %s IPv%c] step: sent %u requests (total %u).", + sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', i, sr->currentGetRequests());*/ + + auto expiredn = (size_t)std::count_if(sr->nodes.begin(), sr->nodes.end(), [&](const SearchNode& sn) { + return sn.candidate or sn.node->isExpired(); + }); + if (i == 0 && expiredn == sr->nodes.size()) + { + DHT_LOG.WARN("[search %s IPv%c] expired", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6'); + // no nodes or all expired nodes + sr->expired = true; + if (sr->announce.empty() && sr->listeners.empty()) { + // Listening or announcing requires keeping the cluster up to date. + sr->done = true; } - while (sent and sr->current_get_requests < SEARCH_REQUESTS); - DHT_LOG.DEBUG("[search %s IPv%c] step: sent %u requests (total %u).", - sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', i, sr->current_get_requests); - - auto expiredn = (size_t)std::count_if(sr->nodes.begin(), sr->nodes.end(), [&](const SearchNode& sn) { - return sn.candidate or sn.node->isExpired(); - }); - if (i == 0 && expiredn == sr->nodes.size()) { - DHT_LOG.ERROR("[search %s IPv%c] expired", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6'); - // no nodes or all expired nodes - sr->expired = true; - if (sr->announce.empty() && sr->listeners.empty()) { - // Listening or announcing requires keeping the cluster up to date. - sr->done = true; - } - { - auto get_cbs = std::move(sr->callbacks); - for (const auto& g : get_cbs) { - if (g.done_cb) - g.done_cb(false, {}); - } - } - { - std::vector<DoneCallback> a_cbs; - a_cbs.reserve(sr->announce.size()); - for (const auto& a : sr->announce) - if (a.callback) - a_cbs.emplace_back(std::move(a.callback)); - for (const auto& a : a_cbs) - a(false, {}); + auto get_cbs = std::move(sr->callbacks); + for (const auto& g : get_cbs) { + if (g.done_cb) + g.done_cb(false, {}); } } + { + std::vector<DoneCallback> a_cbs; + a_cbs.reserve(sr->announce.size()); + for (const auto& a : sr->announce) + if (a.callback) + a_cbs.emplace_back(std::move(a.callback)); + for (const auto& a : a_cbs) + a(false, {}); + } } } @@ -989,13 +987,14 @@ Dht::Search::getUpdateTime(time_point now) const time_point ut = time_point::max(); const auto last_get = getLastGetTime(); unsigned i = 0, t = 0, d = 0; + const auto reqs = currentGetRequests(); for (const auto& sn : nodes) { if (sn.node->isExpired() or (sn.candidate and t >= TARGET_NODES)) continue; bool pending = sn.getStatus and sn.getStatus->pending(); if (sn.last_get_reply < std::max(now - Node::NODE_EXPIRE_TIME, last_get) or pending) { // not isSynced - if (not pending and current_get_requests < SEARCH_REQUESTS) + if (not pending and reqs < SEARCH_REQUESTS) ut = std::min(ut, now); if (not sn.candidate) d++; @@ -1195,7 +1194,6 @@ Dht::search(const InfoHash& id, sa_family_t af, GetCallback callback, DoneCallba sr->af = af; sr->tid = search_id++; sr->step_time = TIME_INVALID; - sr->current_get_requests = 0; sr->id = id; sr->done = false; sr->expired = false; @@ -1899,7 +1897,7 @@ Dht::dumpSearch(const Search& sr, std::ostream& out) const const auto& now = scheduler.time(); using namespace std::chrono; out << std::endl << "Search IPv" << (sr.af == AF_INET6 ? '6' : '4') << ' ' << sr.id << " G" << sr.callbacks.size(); - out << " age " << duration_cast<seconds>(now - sr.step_time).count() << "s sync ops: " << sr.current_get_requests; + out << " age " << duration_cast<seconds>(now - sr.step_time).count() << "s sync ops: " << sr.currentGetRequests(); if (sr.done) out << " [done]"; bool synced = sr.isSynced(now);