From 6e12193f7f59eb17a773d6181d895573175db81c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Adrien=20B=C3=A9raud?= <adrien.beraud@savoirfairelinux.com>
Date: Tue, 9 Dec 2014 19:00:53 -0500
Subject: [PATCH] fix put callback

---
 include/opendht/dht.h | 10 ++++++
 src/dht.cpp           | 76 +++++++++++++++++++++++++++++--------------
 2 files changed, 61 insertions(+), 25 deletions(-)

diff --git a/include/opendht/dht.h b/include/opendht/dht.h
index c7bf07b0..c96d4062 100644
--- a/include/opendht/dht.h
+++ b/include/opendht/dht.h
@@ -324,6 +324,14 @@ private:
             return /*pinged < 3 && replied &&*/ reply_time >= now - NODE_EXPIRE_TIME;
         }
 
+        bool isAnnounced(Value::Id vid, const ValueType& type, time_t now) const {
+            auto ack = acked.find(vid);
+            if (ack == acked.end()) {
+
+                return false;
+            }
+            return ack->second.reply_time + type.expiration > now;
+        }
         time_t getAnnounceTime(AnnounceStatusMap::const_iterator ack, const ValueType& type) const {
             if (ack == acked.end())
                 return request_time + 5;
@@ -410,6 +418,8 @@ private:
 
         time_t getUpdateTime(time_t now) const;
 
+        bool isAnnounced(Value::Id id, const ValueType& type, time_t now) const;
+
         /**
          * ret = 0 : no announce required.
          * ret > 0 : (re-)announce required at time ret.
diff --git a/src/dht.cpp b/src/dht.cpp
index 2c622041..8e60a927 100644
--- a/src/dht.cpp
+++ b/src/dht.cpp
@@ -662,11 +662,22 @@ Dht::searchStep(Search& sr)
         if (now.tv_sec - sr.step_time >= SEARCH_TIMEOUT) {
             DHT_WARN("Search IPv%c %s timed out.", sr.af == AF_INET ? '4' : '6', sr.id.toString().c_str());
             sr.step_time = now.tv_sec;
-            for (const auto& g : sr.callbacks) {
-                if (g.done_cb)
-                    g.done_cb(false);
+            {
+                auto get_cbs = std::move(sr.callbacks);
+                for (const auto& g : get_cbs) {
+                    if (g.done_cb)
+                        g.done_cb(false);
+                }
+            }
+            {
+                std::vector<DoneCallback> a_cbs;
+                a_cbs.reserve(sr.announce.size());
+                for (const auto& a : sr.announce)
+                    if (a.callback)
+                        a_cbs.emplace_back(std::move(a.callback));
+                for (const auto& a : a_cbs)
+                    a(false);
             }
-            sr.callbacks.clear();
             if (sr.announce.empty() && sr.listeners.empty())
                 sr.done = true;
         }
@@ -731,7 +742,6 @@ Dht::searchStep(Search& sr)
                     DHT_ERROR("Trying to announce a null value !");
                 }
                 unsigned i = 0;
-                bool all_acked = true;
                 auto vid = a.value->id;
                 const auto& type = getType(a.value->type);
                 if (in) {
@@ -744,7 +754,6 @@ Dht::searchStep(Search& sr)
                     auto a_status = n.acked.find(vid);
                     auto at = n.getAnnounceTime(a_status, type);
                     if ( at <= now.tv_sec ) {
-                        all_acked = false;
                         {
                             char hbuf[NI_MAXHOST];
                             char sbuf[NI_MAXSERV];
@@ -766,10 +775,6 @@ Dht::searchStep(Search& sr)
                     if (++i == 8)
                         break;
                 }
-                if (all_acked && a.callback) {
-                    a.callback(true);
-                    a.callback = nullptr;
-                }
             }
             for (auto& n : sr.nodes) {
                 if (n.pending) {
@@ -780,13 +785,9 @@ Dht::searchStep(Search& sr)
                         pinged(*node);
                 }
             }
-            DHT_DEBUG("Search done.");
-            /*if (sr.done_callback) {
-                sr.done_callback(true);
-                sr.done_callback = nullptr;
-            }*/
             if (sr.announce.empty() && sr.listeners.empty())
                 sr.done = true;
+            DHT_DEBUG("Searchstep done.");
         }
 
     } else {
@@ -915,6 +916,23 @@ Dht::Search::getUpdateTime(time_t now) const
     return ut == STEP_INVALID ? (callbacks.empty() ? 0 : now) : ut;
 }
 
+bool
+Dht::Search::isAnnounced(Value::Id id, const ValueType& type, time_t now) const
+{
+    if (nodes.empty())
+        return false;
+    unsigned i = 0;
+    for (const auto& n : nodes) {
+        if (n.pinged >= 3)
+            continue;
+        if (!n.isAnnounced(id, type, now))
+            return false;
+        if (++i == 8)
+            break;
+    }
+    return i;
+}
+
 time_t
 Dht::Search::getAnnounceTime(const std::map<ValueType::Id, ValueType>& types) const
 {
@@ -1101,13 +1119,13 @@ Dht::announce(const InfoHash& id, sa_family_t af, const std::shared_ptr<Value>&
             for (auto& n : sr->nodes)
                 n.acked[value->id] = {0, 0};
         }
+        if (a_sr->callback)
+            a_sr->callback(false);
         a_sr->callback = callback;
     }
-    if (not sr->nodes.empty()) {
-        time_t tm = sr->getNextStepTime(types, now.tv_sec);
-        if (tm != 0 && (search_time == 0 || search_time > tm))
-            search_time = tm;
-    }
+    time_t tm = sr->getNextStepTime(types, now.tv_sec);
+    if (tm != 0 && (search_time == 0 || search_time > tm))
+        search_time = tm;
 }
 
 size_t
@@ -1126,10 +1144,8 @@ Dht::listenTo(const InfoHash& id, sa_family_t af, GetCallback cb, Value::Filter
     auto token = ++sr->listener_token;
     sr->listeners.insert({token, {f, cb}});
     time_t tm = sr->getNextStepTime(types, now.tv_sec);
-    if (tm != 0 && (search_time == 0 || search_time > tm)) {
-
+    if (tm != 0 && (search_time == 0 || search_time > tm))
         search_time = tm;
-    }
     return token;
 }
 
@@ -1205,13 +1221,13 @@ Dht::put(const InfoHash& id, Value&& value, DoneCallback callback)
         }
     };
     announce(id, AF_INET, val, [=](bool ok4) {
-        DHT_DEBUG("search done IPv4 %d", ok4);
+        DHT_DEBUG("Announce done IPv4 %d", ok4);
         *done4 = true;
         *ok |= ok4;
         donecb();
     });
     announce(id, AF_INET6, val, [=](bool ok6) {
-        DHT_DEBUG("search done IPv6 %d", ok6);
+        DHT_DEBUG("Announce done IPv6 %d", ok6);
         *done6 = true;
         *ok |= ok6;
         donecb();
@@ -2054,6 +2070,16 @@ Dht::processMessage(const uint8_t *buf, size_t buflen, const sockaddr *from, soc
                     }
                 /* See comment for gp above. */
                 searchSendGetValues(*sr);
+
+                // If the value was just successfully announced, call the callback
+                for (auto& a : sr->announce) {
+                    if (!a.callback || !a.value || a.value->id != value_id)
+                        continue;
+                    if (sr->isAnnounced(value_id, getType(a.value->type), now.tv_sec)) {
+                        a.callback(true);
+                        a.callback = nullptr;
+                    }
+                }
             }
         } else if (tid.matches(TransPrefix::LISTEN, &ttid)) { 
             DHT_DEBUG("Got reply to listen.");
-- 
GitLab