From 7c5db84f3373d1e12bc7a85edcb7878c48e84152 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Simon=20D=C3=A9saulniers?= <sim.desaulniers@gmail.com>
Date: Tue, 13 Sep 2016 18:59:18 -0400
Subject: [PATCH] dht: take multiple queries into account

The code preceding this would just not take multiple concurrent queries for a
same search into account. This is a bug fix for a bad behavior since the
queries.
---
 src/dht.cpp | 61 +++++++++++++++++++++++++++++++++++------------------
 1 file changed, 41 insertions(+), 20 deletions(-)

diff --git a/src/dht.cpp b/src/dht.cpp
index 31c3851f..8d2cfb60 100644
--- a/src/dht.cpp
+++ b/src/dht.cpp
@@ -504,8 +504,10 @@ struct Dht::Search {
     /**
      * Get the time of the last "get" operation performed on this search,
      * or time_point::min() if no such operation have been performed.
+     *
+     * @param query  The query identifying a 'get' request.
      */
-    time_point getLastGetTime() const;
+    time_point getLastGetTime(std::shared_ptr<Query> query = {}) const;
 
     /**
      * Is this get operation done ?
@@ -943,8 +945,21 @@ Dht::searchNodeGetDone(const Request& status,
         std::weak_ptr<Search> ws,
         std::shared_ptr<Query> query)
 {
+    const auto& now = scheduler.time();
     if (auto sr = ws.lock()) {
-        sr->insertNode(status.node, scheduler.time(), answer.ntoken);
+        if (auto srn = sr->getNode(status.node)) {
+            /* all other get requests which are satisfied by this answer
+               should not be sent anymore */
+            for (auto& g : sr->callbacks) {
+                auto& q = g.second.query;
+                if (q->isSatisfiedBy(*query) and q != query) {
+                    auto req = std::make_shared<Request>();
+                    req->cancel();
+                    srn->getStatus[q] = std::move(req);
+                }
+            }
+        }
+        sr->insertNode(status.node, now, answer.ntoken);
         onGetValuesDone(status, answer, sr, query);
     }
 }
@@ -1028,18 +1043,17 @@ Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update
         return nullptr;
 
     const auto& now = scheduler.time();
-    const time_point up = update ? sr->getLastGetTime() : time_point::min();
 
     std::weak_ptr<Search> ws = sr;
-    SearchNode* n = nullptr;
     auto cb = sr->callbacks.begin();
-    do { /* for all queries to send */
-
-        /* cases                                  v 'get'            v 'find_node' */
-        auto query = cb != sr->callbacks.end() ? cb->second.query : std::make_shared<Query>();
-        if (pn) {
-            if (not pn->canGet(now, up, query))
-                return nullptr;
+    do { /* for all requests to send */
+        SearchNode* n = nullptr;
+        auto query = not sr->callbacks.empty() ? cb->second.query : std::make_shared<Query>();
+        const time_point up = not sr->callbacks.empty() and update
+                                ? sr->getLastGetTime(query)
+                                : time_point::min();
+
+        if (pn and pn->canGet(now, up, query)) {
             n = pn;
         } else {
             for (auto& sn : sr->nodes) {
@@ -1048,11 +1062,12 @@ Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update
                     break;
                 }
             }
+        }
+
+        if (sr->callbacks.empty()) { /* 'find_node' request */
             if (not n)
                 return nullptr;
-        }
 
-        if (sr->callbacks.empty()) {
             DHT_LOG.WARN("[search %s IPv%c] [node %s] sending 'find_node'",
                     sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6',
                     n->node->toString().c_str());
@@ -1067,9 +1082,12 @@ Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update
                         }
                         searchNodeGetDone(status, std::forward<NetworkEngine::RequestAnswer>(answer), ws, query);
                     },
-                    /* std::bind(&Dht::searchNodeGetDone, this, _1, _2, ws, query), */
                     std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, query));
-        } else {
+
+        } else { /* 'get' request */
+            if (not n)
+                continue;
+
             if (query and not query->select.getSelection().empty()) {
                 /* The request contains a select. No need to paginate... */
                 DHT_LOG.WARN("[search %s IPv%c] [node %s] sending 'get'",
@@ -1085,10 +1103,13 @@ Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update
                 paginate(ws, query, n);
         }
 
-        if (not sr->isSynced(now) or cb == sr->callbacks.end())
-            break; /* only trying to find nodes, only send the oldest query */
+        /* We only try to send one request. return. */
+        return n;
+
     } while (++cb != sr->callbacks.end());
-    return n;
+
+    /* no request were sent */
+    return nullptr;
 }
 
 void Dht::searchSendAnnounceValue(const std::shared_ptr<Search>& sr) {
@@ -1365,11 +1386,11 @@ unsigned Dht::Search::getNumberOfBadNodes() const {
 }
 
 time_point
-Dht::Search::getLastGetTime() const
+Dht::Search::getLastGetTime(std::shared_ptr<Query> q) const
 {
     time_point last = time_point::min();
     for (const auto& g : callbacks)
-        last = std::max(last, g.second.start);
+        last = std::max(last, (not q or q->isSatisfiedBy(*g.second.query) ? g.second.start : time_point::min()));
     return last;
 }
 
-- 
GitLab