diff --git a/src/dht.cpp b/src/dht.cpp index 720d7eabf2a889300fb168b2bcbc5a19c17748cf..bc50a54d7da3603aef9c06bfb751363fb82ff127 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -265,31 +265,31 @@ struct Dht::SearchNode { * met: * * - The node is not expired; - * - If we have heard from the node, we must have heard from him in the last - * NODE_EXPIRE_TIME minutes; - * - The request must not already have been sent; - * - No other request satisfying the request must be pending; * - The pagination process for this particular 'get' must not have begun; + * - There hasn't been any response for a request, satisfying the initial + * request, anytime following the initial request. + * - No other request satisfying the request must be pending; * * @param now The time reference to now. - * @param update Time of the last "get" op for the search. + * @param update The time of the last 'get' op satisfying this request. * @param q The query defining the "get" operation we're referring to. * * @return true if we can send get, else false. */ bool canGet(time_point now, time_point update, std::shared_ptr<Query> q = {}) const { - /* Find request status for the given query */ - const auto& get_status = getStatus.find(q); - /* Find request status for a query satisfying the initial query */ - const auto& sq_status = std::find_if(getStatus.cbegin(), getStatus.cend(), - [&q](const SyncStatus::value_type& s) { - return s.first and q and q->isSatisfiedBy(*s.first) and s.second and s.second->pending(); + if (node->isExpired() or not (now > last_get_reply + Node::NODE_EXPIRE_TIME or update > last_get_reply)) + return false; + + auto completed_sq_status {false}, pending_sq_status {false}; + for (const auto& s : getStatus) { + if (s.first and q and q->isSatisfiedBy(*s.first) and s.second) { + if (s.second->pending() and not pending_sq_status) + pending_sq_status = true; + if (s.second->reply_time > update and not completed_sq_status) + completed_sq_status = true; } - ); - return not node->isExpired() and (now > last_get_reply + Node::NODE_EXPIRE_TIME or update > last_get_reply) - and not hasStartedPagination(q) - and (get_status == getStatus.cend() or not get_status->second) - and sq_status == getStatus.cend(); + } + return not (hasStartedPagination(q) or completed_sq_status or pending_sq_status); } /** @@ -331,8 +331,7 @@ struct Dht::SearchNode { return req != getStatus.cend() and req->second and req->second->pending(); }) != pqs->second.cend(); return not paginationPending; - } - else { /* no pagination yet */ + } else { /* no pagination yet */ const auto& gs = get.query ? getStatus.find(get.query) : getStatus.cend(); return gs != getStatus.end() and gs->second and not gs->second->pending(); } @@ -505,14 +504,51 @@ struct Dht::Search { /** * Get the time of the last "get" operation performed on this search, * or time_point::min() if no such operation have been performed. + * + * @param query The query identifying a 'get' request. */ - time_point getLastGetTime() const; + time_point getLastGetTime(std::shared_ptr<Query> query = {}) const; /** * Is this get operation done ? */ bool isDone(const Get& get) const; + /** + * Sets a consistent state of the search after a given 'get' operation as + * been completed. + * + * This will also make sure to call the associated 'done callback'. + * + * @param get The 'get' operation which is now over. + */ + void setDone(const Get& get) { + for (auto& n : nodes) { + auto pqs = n.pagination_queries.find(get.query); + if (pqs != n.pagination_queries.cend()) { + for (auto& pq : pqs->second) + n.getStatus.erase(pq); + } + n.getStatus.erase(get.query); + } + if (get.done_cb) + get.done_cb(true, getNodes()); + } + + /** + * Set the search in a consistent state after the search is done. This is + * the opportunity we have to clear some entries in the SearchNodes status + * maps. + */ + void setDone() { + for (auto& n : nodes) { + n.getStatus.clear(); + n.listenStatus.clear(); + n.acked.clear(); + } + done = true; + } + time_point getUpdateTime(time_point now) const; bool isAnnounced(Value::Id id, const ValueType& type, time_point now) const; @@ -944,8 +980,21 @@ Dht::searchNodeGetDone(const Request& status, std::weak_ptr<Search> ws, std::shared_ptr<Query> query) { + const auto& now = scheduler.time(); if (auto sr = ws.lock()) { - sr->insertNode(status.node, scheduler.time(), answer.ntoken); + if (auto srn = sr->getNode(status.node)) { + /* all other get requests which are satisfied by this answer + should not be sent anymore */ + for (auto& g : sr->callbacks) { + auto& q = g.second.query; + if (q->isSatisfiedBy(*query) and q != query) { + auto req = std::make_shared<Request>(); + req->cancel(); + srn->getStatus[q] = std::move(req); + } + } + } + sr->insertNode(status.node, now, answer.ntoken); onGetValuesDone(status, answer, sr, query); } } @@ -1029,18 +1078,17 @@ Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update return nullptr; const auto& now = scheduler.time(); - const time_point up = update ? sr->getLastGetTime() : time_point::min(); std::weak_ptr<Search> ws = sr; - SearchNode* n = nullptr; auto cb = sr->callbacks.begin(); - do { /* for all queries to send */ - - /* cases v 'get' v 'find_node' */ - auto query = cb != sr->callbacks.end() ? cb->second.query : std::make_shared<Query>(); - if (pn) { - if (not pn->canGet(now, up, query)) - return nullptr; + do { /* for all requests to send */ + SearchNode* n = nullptr; + auto query = not sr->callbacks.empty() ? cb->second.query : std::make_shared<Query>(); + const time_point up = not sr->callbacks.empty() and update + ? sr->getLastGetTime(query) + : time_point::min(); + + if (pn and pn->canGet(now, up, query)) { n = pn; } else { for (auto& sn : sr->nodes) { @@ -1049,11 +1097,12 @@ Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update break; } } + } + + if (sr->callbacks.empty()) { /* 'find_node' request */ if (not n) return nullptr; - } - if (sr->callbacks.empty()) { DHT_LOG.WARN("[search %s IPv%c] [node %s] sending 'find_node'", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', n->node->toString().c_str()); @@ -1068,9 +1117,12 @@ Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update } searchNodeGetDone(status, std::forward<NetworkEngine::RequestAnswer>(answer), ws, query); }, - /* std::bind(&Dht::searchNodeGetDone, this, _1, _2, ws, query), */ std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, query)); - } else { + + } else { /* 'get' request */ + if (not n) + continue; + if (query and not query->select.getSelection().empty()) { /* The request contains a select. No need to paginate... */ DHT_LOG.WARN("[search %s IPv%c] [node %s] sending 'get'", @@ -1086,10 +1138,13 @@ Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update paginate(ws, query, n); } - if (not sr->isSynced(now) or cb == sr->callbacks.end()) - break; /* only trying to find nodes, only send the oldest query */ + /* We only try to send one request. return. */ + return n; + } while (++cb != sr->callbacks.end()); - return n; + + /* no request were sent */ + return nullptr; } void Dht::searchSendAnnounceValue(const std::shared_ptr<Search>& sr) { @@ -1214,10 +1269,7 @@ Dht::searchStep(std::shared_ptr<Search> sr) // Call callbacks when done for (auto b = sr->callbacks.begin(); b != sr->callbacks.end();) { if (sr->isDone(b->second)) { - if (b->second.done_cb) - b->second.done_cb(true, sr->getNodes()); - for (auto& n : sr->nodes) - n.getStatus.erase(b->second.query); + sr->setDone(b->second); b = sr->callbacks.erase(b); } else @@ -1228,7 +1280,7 @@ Dht::searchStep(std::shared_ptr<Search> sr) sr->checkAnnounced(types, now); if (sr->callbacks.empty() && sr->announce.empty() && sr->listeners.empty()) - sr->done = true; + sr->setDone(); } // true if this node is part of the target nodes cluter. @@ -1286,7 +1338,7 @@ Dht::searchStep(std::shared_ptr<Search> sr) searchSendAnnounceValue(sr); if (sr->callbacks.empty() && sr->announce.empty() && sr->listeners.empty()) - sr->done = true; + sr->setDone(); } if (sr->currentlySolicitedNodeCount() < MAX_REQUESTED_SEARCH_NODES) { @@ -1311,7 +1363,7 @@ Dht::searchStep(std::shared_ptr<Search> sr) sr->expired = true; if (sr->announce.empty() && sr->listeners.empty()) { // Listening or announcing requires keeping the cluster up to date. - sr->done = true; + sr->setDone(); } { auto get_cbs = std::move(sr->callbacks); @@ -1366,11 +1418,11 @@ unsigned Dht::Search::getNumberOfBadNodes() const { } time_point -Dht::Search::getLastGetTime() const +Dht::Search::getLastGetTime(std::shared_ptr<Query> q) const { time_point last = time_point::min(); for (const auto& g : callbacks) - last = std::max(last, g.second.start); + last = std::max(last, (not q or q->isSatisfiedBy(*g.second.query) ? g.second.start : time_point::min())); return last; }