From 92613f5fbfc3ba8f43dbd3e6e6d1e87204c74cce Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Adrien=20B=C3=A9raud?= <adrien.beraud@savoirfairelinux.com>
Date: Tue, 3 May 2016 18:33:38 -0400
Subject: [PATCH] net: improve request state consistency

---
 include/opendht/dht.h            | 14 +++------
 include/opendht/network_engine.h | 53 ++++++++++++++++++--------------
 src/dht.cpp                      | 51 +++++++++++++++---------------
 src/network_engine.cpp           |  4 +--
 4 files changed, 63 insertions(+), 59 deletions(-)

diff --git a/include/opendht/dht.h b/include/opendht/dht.h
index c5839610..5d94a069 100644
--- a/include/opendht/dht.h
+++ b/include/opendht/dht.h
@@ -324,7 +324,7 @@ private:
     static constexpr unsigned SEARCH_NODES {14};
 
     /* Concurrent requests during a search */
-    static constexpr unsigned SEARCH_REQUESTS {3};
+    static constexpr unsigned SEARCH_REQUESTS {4};
 
     /* Number of listening nodes */
     static constexpr unsigned LISTEN_NODES {3};
@@ -389,7 +389,7 @@ private:
                 return true;*/
             return not node->isExpired(now) and
                    (now > last_get_reply + Node::NODE_EXPIRE_TIME or update > last_get_reply)
-                   and (not getStatus or not getStatus->pending(now));
+                   and (not getStatus or not getStatus->pending());
                    // and now > getStatus->last_try + Node::MAX_RESPONSE_TIME;
         }
 
@@ -410,10 +410,7 @@ private:
         time_point getAnnounceTime(AnnounceStatusMap::const_iterator ack, const ValueType& type) const {
             if (ack == acked.end() or not ack->second)
                 return time_point::min();
-            return std::max(
-                ack->second->reply_time + type.expiration - REANNOUNCE_MARGIN,
-                ack->second->last_try + Node::MAX_RESPONSE_TIME
-            );
+            return ack->second->pending() ? time_point::max() : ack->second->reply_time + type.expiration - REANNOUNCE_MARGIN;
         }
 
         time_point getAnnounceTime(Value::Id vid, const ValueType& type) const {
@@ -424,10 +421,7 @@ private:
             if (not listenStatus)
                 return time_point::min();
 
-            return std::max(
-                listenStatus->reply_time + LISTEN_EXPIRE_TIME - REANNOUNCE_MARGIN,
-                listenStatus->last_try + Node::MAX_RESPONSE_TIME
-            );
+            return listenStatus->pending() ? time_point::max() : listenStatus->reply_time + LISTEN_EXPIRE_TIME - REANNOUNCE_MARGIN;
         }
         bool isBad(const time_point& now) const {
             return !node || node->isExpired(now) || candidate;
diff --git a/include/opendht/network_engine.h b/include/opendht/network_engine.h
index d5afd2dd..d9e22a86 100644
--- a/include/opendht/network_engine.h
+++ b/include/opendht/network_engine.h
@@ -198,29 +198,17 @@ public:
         static const constexpr size_t MAX_ATTEMPT_COUNT {3};
 
         std::shared_ptr<Node> node {};             /* the node to whom the request is destined. */
-        bool cancelled {false};                    /* whether the request is canceled before done. */
-        bool completed {false};                    /* whether the request is completed. */
-        unsigned attempt_count {0};                /* number of attempt to process the request. */
-        time_point start {time_point::min()};      /* time when the request is created. */
-        time_point last_try {time_point::min()};   /* time of the last attempt to process the request. */
         time_point reply_time {time_point::min()}; /* time when we received the response to the request. */
 
-        bool expired(time_point now) const {
-            return now > last_try + Node::MAX_RESPONSE_TIME and attempt_count >= Request::MAX_ATTEMPT_COUNT
-                and not completed;
-        }
-
-        bool pending(time_point /*now*/) const {
-            return not cancelled
-               and not completed;
-        }
-
-        void cancel() {
-            if (not completed) {
-                cancelled = true;
-                clear();
-            }
+        bool expired() const { return expired_; }
+        bool completed() const { return completed_; }
+        bool cancelled() const { return cancelled_; }
+        bool pending() const {
+            return not cancelled_  
+               and not completed_
+               and not expired_;
         }
+        bool over() const { return not pending(); }
 
         Request() {}
 
@@ -232,12 +220,31 @@ public:
                 std::function<void(std::shared_ptr<Request> req_status, bool)> on_expired, bool persistent = false) :
             node(node), on_done(on_done), on_expired(on_expired), tid(tid), msg(std::move(msg)), persistent(persistent) { }
 
+        bool isExpired(time_point now) const {
+            return now > last_try + Node::MAX_RESPONSE_TIME and attempt_count >= Request::MAX_ATTEMPT_COUNT
+                and not completed_ and not cancelled_;
+        }
+
+        void cancel() {
+            if (not completed_ and not expired_) {
+                cancelled_ = true;
+                clear();
+            }
+        }
+
         void clear() {
             on_done = {};
             on_expired = {};
             msg.clear();
         }
 
+        bool cancelled_ {false};                    /* whether the request is canceled before done. */
+        bool completed_ {false};                    /* whether the request is completed. */
+        bool expired_ {false};
+        unsigned attempt_count {0};                /* number of attempt to process the request. */
+        time_point start {time_point::min()};      /* time when the request is created. */
+        time_point last_try {time_point::min()};   /* time of the last attempt to process the request. */
+
         std::function<void(std::shared_ptr<Request> req_status, ParsedMessage&&)> on_done {};
         std::function<void(std::shared_ptr<Request> req_status, bool)> on_expired {};
 
@@ -471,12 +478,12 @@ private:
     void pinged(Node&);
 
     void requestStep(std::shared_ptr<Request> req) {
-        if (req->completed or req->cancelled)
+        if (req->over())
             return;
 
         auto now = scheduler.time();
-        if (req->node->isExpired(now) or req->expired(now)) {
-            req->completed = true;
+        if (req->node->isExpired(now) or req->isExpired(now)) {
+            req->expired_ = true;
             req->on_expired(req, true);
             req->clear();
             requests.erase(req->tid);
diff --git a/src/dht.cpp b/src/dht.cpp
index 852907bc..1fed7ff7 100644
--- a/src/dht.cpp
+++ b/src/dht.cpp
@@ -540,11 +540,13 @@ Dht::Search::insertNode(std::shared_ptr<Node> node, time_point now, const Blob&
             if (removeExpiredNode(now))
                 num_bad_nodes--;
 
-            auto farthest_not_bad_node = std::find_if(nodes.rbegin(), nodes.rend(),
-                [&](const SearchNode& n) { return not n.isBad(now) and not (n.getStatus and n.getStatus->pending(now)); }
+            auto to_remove = std::find_if(nodes.rbegin(), nodes.rend(),
+                [&](const SearchNode& n) { return not n.isBad(now)/* and not (n.getStatus and n.getStatus->pending(now))*/; }
             );
-            if (farthest_not_bad_node != nodes.rend()) {
-                nodes.erase(std::prev(farthest_not_bad_node.base()));
+            if (to_remove != nodes.rend()) {
+                if (to_remove->getStatus and to_remove->getStatus->pending())
+                    current_get_requests--;
+                nodes.erase(std::prev(to_remove.base()));
             } // else, all nodes are expired.
         }
         expired = false;
@@ -588,7 +590,7 @@ Dht::expireSearches()
 Dht::SearchNode*
 Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update)
 {
-    if (sr->done)
+    if (sr->done or sr->current_get_requests >= SEARCH_REQUESTS)
         return nullptr;
 
     const auto& now = scheduler.time();
@@ -631,13 +633,11 @@ Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update
                 auto srn = sr->getNode(status->node);
                 if (srn and not srn->candidate) {
                     if (not over) {
-                            srn->candidate = true;
-                            sr->current_get_requests--;
-                            //DHT_LOG.DEBUG("[search %s] sn %s now candidate... %d",
-                            //        sr->id.toString().c_str(), srn->node->id.toString().c_str(), sr->current_get_requests);
-                    } else {
-                        sr->current_get_requests--;
+                        srn->candidate = true;
+                        //DHT_LOG.DEBUG("[search %s] sn %s now candidate... %d",
+                        //        sr->id.toString().c_str(), srn->node->id.toString().c_str(), sr->current_get_requests);
                     }
+                    sr->current_get_requests--;
                 }
                 scheduler.edit(sr->nextSearchStep, scheduler.time());
             }
@@ -709,20 +709,23 @@ Dht::searchStep(std::shared_ptr<Search> sr)
                         print_addr(n.node->ss, n.node->sslen).c_str());
                     //std::cout << "Sending listen to " << n.node->id << " " << print_addr(n.node->ss, n.node->sslen) << std::endl;
 
-                    network_engine.cancelRequest(n.listenStatus);
+                    //network_engine.cancelRequest(n.listenStatus);
+                    auto ls = n.listenStatus;
 
                     std::weak_ptr<Search> ws = sr;
                     n.listenStatus = network_engine.sendListen(n.node, sr->id, n.token,
-                        [this,ws](std::shared_ptr<NetworkEngine::Request> status,
+                        [this,ws,ls](std::shared_ptr<NetworkEngine::Request> status,
                                 NetworkEngine::RequestAnswer&& answer) mutable
                         { /* on done */
+                            network_engine.cancelRequest(ls);
                             if (auto sr = ws.lock()) {
                                 onListenDone(status, answer, sr);
                                 searchStep(sr);
                             }
                         },
-                        [this,ws](std::shared_ptr<NetworkEngine::Request>, bool) mutable
+                        [this,ws,ls](std::shared_ptr<NetworkEngine::Request>, bool) mutable
                         { /* on expired */
+                            network_engine.cancelRequest(ls);
                             if (auto sr = ws.lock()) {
                                 searchStep(sr);
                             }
@@ -925,7 +928,7 @@ Dht::Search::getUpdateTime(time_point now) const
     for (const auto& sn : nodes) {
         if (sn.node->isExpired(now) or (sn.candidate and t >= TARGET_NODES))
             continue;
-        bool pending = sn.getStatus and sn.getStatus->pending(now);
+        bool pending = sn.getStatus and sn.getStatus->pending();
         if (sn.last_get_reply < std::max(now - Node::NODE_EXPIRE_TIME, last_get) or pending) {
             // not isSynced
             if (not pending and current_get_requests < SEARCH_REQUESTS)
@@ -1815,7 +1818,7 @@ Dht::dumpSearch(const Search& sr, std::ostream& out) const
     const auto& now = scheduler.time();
     using namespace std::chrono;
     out << std::endl << "Search IPv" << (sr.af == AF_INET6 ? '6' : '4') << ' ' << sr.id << " G" << sr.callbacks.size();
-    out << " age " << duration_cast<seconds>(now - sr.step_time).count() << " s tid " << sr.tid;
+    out << " age " << duration_cast<seconds>(now - sr.step_time).count() << "s sync ops: " << sr.current_get_requests;
     if (sr.done)
         out << " [done]";
     bool synced = sr.isSynced(now);
@@ -1847,8 +1850,8 @@ Dht::dumpSearch(const Search& sr, std::ostream& out) const
         {
             bool pending {false}, expired {false};
             if (n.getStatus) {
-                pending = n.getStatus->pending(now);
-                expired = n.getStatus->expired(now);
+                pending = n.getStatus->pending();
+                expired = n.getStatus->expired();
             }
             out << " ["
                 << (pending ? 'f' : (expired ? 'x' : ' ')) << (n.isSynced(now) ? 's' : '-')
@@ -1858,13 +1861,13 @@ Dht::dumpSearch(const Search& sr, std::ostream& out) const
         {
             bool pending {false}, expired {false};
             if (n.listenStatus) {
-                pending = n.listenStatus->pending(now);
-                expired = n.listenStatus->expired(now);
+                pending = n.listenStatus->pending();
+                expired = n.listenStatus->expired();
             }
             if (not sr.listeners.empty() and n.listenStatus) {
-                if (n.listenStatus->last_try == time_point::min())
+                /*if (!n.listenStatus)
                     out << "     ";
-                else
+                else*/
                     out << "["
                         << (pending ? 'f' : (expired ? 'x' : ' '))
                         << (n.isListening(now) ? 'l' : '-') << "] ";
@@ -1886,9 +1889,9 @@ Dht::dumpSearch(const Search& sr, std::ostream& out) const
                         auto& astatus = ack->second;
                         if (astatus and astatus->reply_time + getType(a.value->type).expiration > now)
                             out << 'a';
-                        else if (astatus and astatus->pending(now))
+                        else if (astatus and astatus->pending())
                             out << 'f';
-                        else if (astatus and astatus->expired(now))
+                        else if (astatus and astatus->expired())
                             out << 'x';
                         else
                             out << ' ';
diff --git a/src/network_engine.cpp b/src/network_engine.cpp
index 09b0ac0f..2979e18e 100644
--- a/src/network_engine.cpp
+++ b/src/network_engine.cpp
@@ -137,7 +137,7 @@ NetworkEngine::processMessage(const uint8_t *buf, size_t buflen, const sockaddr
         if (reqp == requests.end())
             throw DhtProtocolException {DhtProtocolException::UNKNOWN_TID, "Can't find transaction", msg.id};
         auto req = reqp->second;
-        if (req->cancelled)
+        if (req->cancelled())
             return;
 
         auto node = onNewNode(msg.id, from, fromlen, 2);
@@ -164,7 +164,7 @@ NetworkEngine::processMessage(const uint8_t *buf, size_t buflen, const sockaddr
             if (not req->persistent)
                 requests.erase(reqp);
             req->reply_time = scheduler.time();
-            req->completed = true;
+            req->completed_ = true;
             req->on_done(req, std::move(msg));
             if (not req->persistent)
                 req->clear();
-- 
GitLab