From 8da15ad45004777a87e84e535ab7e3baa6cf6609 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Adrien=20B=C3=A9raud?= <adrien.beraud@savoirfairelinux.com>
Date: Sun, 22 May 2016 02:17:13 -0400
Subject: [PATCH] dht: adapt search routine to network engine

---
 src/dht.cpp | 51 ++++++++++++++++++++++++++++-----------------------
 1 file changed, 28 insertions(+), 23 deletions(-)

diff --git a/src/dht.cpp b/src/dht.cpp
index 3d64fc7f..54a4ccf4 100644
--- a/src/dht.cpp
+++ b/src/dht.cpp
@@ -587,7 +587,6 @@ Dht::Search::insertNode(const std::shared_ptr<Node>& snode, time_point now, cons
         return false;
 
     bool found = false;
-    unsigned num_bad_nodes = getNumberOfBadNodes();
     auto n = std::find_if(nodes.begin(), nodes.end(), [&](const SearchNode& sn) {
         if (sn.node == snode) {
             found = true;
@@ -600,6 +599,7 @@ Dht::Search::insertNode(const std::shared_ptr<Node>& snode, time_point now, cons
     if (!found) {
         // Be more restricitve if there are too many
         // good or unknown nodes in this search,
+        unsigned num_bad_nodes = getNumberOfBadNodes();
         if (nodes.size() - num_bad_nodes >= SEARCH_NODES) {
             if (node.isExpired() or n == nodes.end())
                 return false;
@@ -767,9 +767,9 @@ Dht::searchStep(std::shared_ptr<Search> sr)
         DHT_LOG.DEBUG("[search %s IPv%c] synced%s", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', in ? ", in" : "");
 
         if (not sr->listeners.empty()) {
-            unsigned i = 0, t = 0;
+            unsigned i = 0;
             for (auto& n : sr->nodes) {
-                if (not n.isSynced(now) or (n.candidate and t >= LISTEN_NODES))
+                if (not n.isSynced(now))
                     continue;
                 if (n.getListenTime() <= now) {
                     DHT_LOG.WARN("[search %s IPv%c] [node %s] sending 'listen'",
@@ -785,66 +785,71 @@ Dht::searchStep(std::shared_ptr<Search> sr)
                         [this,ws,ls](const Request& status,
                                 NetworkEngine::RequestAnswer&& answer) mutable
                         { /* on done */
+                            // cancel previous request
                             network_engine.cancelRequest(ls);
                             if (auto sr = ws.lock()) {
                                 onListenDone(status, answer, sr);
                                 searchStep(sr);
                             }
                         },
-                        [this,ws,ls](const Request&, bool) mutable
+                        [this,ws,ls](const Request&, bool over) mutable
                         { /* on expired */
-                            network_engine.cancelRequest(ls);
-                            if (auto sr = ws.lock()) {
-                                searchStep(sr);
+                            if (over) {
+                                network_engine.cancelRequest(ls);
+                                if (auto sr = ws.lock())
+                                    scheduler.edit(sr->nextSearchStep, scheduler.time());
                             }
                         }
                     );
                 }
-                t++;
                 if (not n.candidate and ++i == LISTEN_NODES)
                     break;
             }
         }
 
         // Announce requests
-        for (auto& a : sr->announce) {
+        for (auto ait = sr->announce.begin(); ait != sr->announce.end();) {
+            auto& a = *ait;
             if (!a.value) continue;
-            unsigned i = 0, t = 0;
             auto vid = a.value->id;
             const auto& type = getType(a.value->type);
-            if (in) {
-                /*DHT_LOG.WARN("[search %s IPv%c] [value %lu] storing locally",
-                    sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', vid);*/
-                storageStore(sr->id, a.value, a.created);
+            if (sr->isAnnounced(vid, type, now)) {
+                if (a.callback) {
+                    a.callback(true, sr->getNodes());
+                    a.callback = nullptr;
+                }
+                ait = sr->announce.erase(ait);
+                continue;
             }
+            if (in) storageStore(sr->id, a.value, a.created);
+            unsigned i = 0;
             for (auto& n : sr->nodes) {
-                if (not n.isSynced(now) or (n.candidate and t >= TARGET_NODES))
+                if (not n.isSynced(now))
                     continue;
-                auto at = n.getAnnounceTime(vid, type);
-                if ( at <= now ) {
+                if (n.getAnnounceTime(vid, type) <= now) {
                     DHT_LOG.WARN("[search %s IPv%c] [node %s] sending 'put' (vid: %d)",
                         sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', n.node->toString().c_str(), vid);
-                    //std::cout << "Sending announce_value to " << n.node->id << " " << print_addr(n.node->ss, n.node->sslen) << std::endl;
-
                     std::weak_ptr<Search> ws = sr;
                     n.acked[vid] = network_engine.sendAnnounceValue(n.node, sr->id, *a.value, a.created, n.token,
-                        [this,ws](const Request& status, NetworkEngine::RequestAnswer&& answer) mutable
+                        [this,ws](const Request& status, NetworkEngine::RequestAnswer&& answer)
                         { /* on done */
                             if (auto sr = ws.lock()) {
                                 onAnnounceDone(status, answer, sr);
                                 searchStep(sr);
                             }
                         },
-                        [this,ws](const Request&, bool) mutable
+                        [this,ws](const Request&, bool over)
                         { /* on expired */
-                            if (auto sr = ws.lock()) { searchStep(sr); }
+                            if (over)
+                                if (auto sr = ws.lock())
+                                    scheduler.edit(sr->nextSearchStep, scheduler.time());
                         }
                     );
                 }
-                t++;
                 if (not n.candidate and ++i == TARGET_NODES)
                     break;
             }
+            ++ait;
         }
         if (sr->callbacks.empty() && sr->announce.empty() && sr->listeners.empty())
             sr->done = true;
-- 
GitLab