From 424fe61a496e0adf81c8c63ace16c3f5a1f960a2 Mon Sep 17 00:00:00 2001
From: Adrien Beraud <adrien.beraud@savoirfairelinux.com>
Date: Sat, 25 Mar 2023 14:04:37 -0400
Subject: [PATCH] dht: send large amounts of values in multiple messages

---
 include/opendht/network_engine.h | 17 ++++++++--
 src/network_engine.cpp           | 53 ++++++++++++++++++++++++--------
 src/parsed_message.h             |  5 +--
 src/value.cpp                    |  2 +-
 4 files changed, 59 insertions(+), 18 deletions(-)

diff --git a/include/opendht/network_engine.h b/include/opendht/network_engine.h
index 4204060b..94dbdb91 100644
--- a/include/opendht/network_engine.h
+++ b/include/opendht/network_engine.h
@@ -416,12 +416,19 @@ public:
      *
      * @return the request with information concerning its success.
      */
+    void sendUpdateValues(const Sp<Node>& n,
+                                 const InfoHash& infohash,
+                                 std::vector<Sp<Value>>&& values,
+                                 time_point created,
+                                 const Blob& token,
+                                 size_t sid);
     Sp<Request> sendUpdateValues(const Sp<Node>& n,
                                  const InfoHash& infohash,
-                                 const std::vector<Sp<Value>>& values,
+                                 std::vector<Sp<Value>>::iterator begin,
+                                 std::vector<Sp<Value>>::iterator end,
                                  time_point created,
                                  const Blob& token,
-                                 const size_t& sid);
+                                 size_t sid);
 
     /**
      * Parses a message and calls appropriate callbacks.
@@ -492,6 +499,7 @@ private:
 
     static constexpr size_t MTU {1280};
     static constexpr size_t MAX_PACKET_VALUE_SIZE {600};
+    static constexpr size_t MAX_MESSAGE_VALUE_SIZE {56 * 1024};
 
     static const std::string my_v;
 
@@ -525,7 +533,10 @@ private:
     int send(const SockAddr& addr, const char *buf, size_t len, bool confirmed = false);
 
     void sendValueParts(Tid tid, const std::vector<Blob>& svals, const SockAddr& addr);
-    std::vector<Blob> packValueHeader(msgpack::sbuffer&, const std::vector<Sp<Value>>&);
+    std::vector<Blob> packValueHeader(msgpack::sbuffer&, std::vector<Sp<Value>>::const_iterator, std::vector<Sp<Value>>::const_iterator) const;
+    std::vector<Blob> packValueHeader(msgpack::sbuffer& buf, const std::vector<Sp<Value>>& values) const {
+        return packValueHeader(buf, values.cbegin(), values.cend()); // whole-vector form of the iterator overload
+    }
     void maintainRxBuffer(Tid tid);
 
     /*************
diff --git a/src/network_engine.cpp b/src/network_engine.cpp
index b25bd9c4..a92915f8 100644
--- a/src/network_engine.cpp
+++ b/src/network_engine.cpp
@@ -124,9 +124,9 @@ NetworkEngine::tellListener(const Sp<Node>& node, Tid socket_id, const InfoHash&
     auto nnodes = bufferNodes(node->getFamily(), hash, want, nodes, nodes6);
     try {
         if (version >= 1) {
-            sendUpdateValues(node, hash, values, scheduler.time(), ntoken, socket_id);
+            sendUpdateValues(node, hash, std::move(values), scheduler.time(), ntoken, socket_id);
         } else {
-            sendNodesValues(node->getAddr(), socket_id, nnodes.first, nnodes.second, values, query, ntoken);
+            sendNodesValues(node->getAddr(), socket_id, nnodes.first, nnodes.second, std::move(values), query, ntoken);
         }
     } catch (const std::overflow_error& e) {
         if (logger_)
@@ -501,9 +501,8 @@ NetworkEngine::processMessage(const uint8_t *buf, size_t buflen, SockAddr f)
             pmsg.last_part = now;
             scheduler.add(now + RX_MAX_PACKET_TIME, std::bind(&NetworkEngine::maintainRxBuffer, this, k));
             scheduler.add(now + RX_TIMEOUT, std::bind(&NetworkEngine::maintainRxBuffer, this, k));
-        } else
-            if (logger_)
-                logger_->e("Partial message with given TID already exists");
+        } else if (logger_)
+            logger_->e("Partial message with given TID %u already exists", k);
     }
 }
 
@@ -912,12 +911,16 @@ NetworkEngine::deserializeNodes(ParsedMessage& msg, const SockAddr& from) {
 }
 
 std::vector<Blob>
-NetworkEngine::packValueHeader(msgpack::sbuffer& buffer, const std::vector<Sp<Value>>& st)
+NetworkEngine::packValueHeader(msgpack::sbuffer& buffer, std::vector<Sp<Value>>::const_iterator b, std::vector<Sp<Value>>::const_iterator e) const
 {
-    auto svals = serializeValues(st);
+    std::vector<Blob> svals;
     size_t total_size = 0;
-    for (const auto& v : svals)
-        total_size += v.size();
+
+    svals.reserve(std::distance(b, e));
+    for (; b != e; ++b) {
+        svals.emplace_back(packMsg(*b));
+        total_size += svals.back().size();
+    }
 
     msgpack::packer<msgpack::sbuffer> pk(&buffer);
     pk.pack(KEY_REQ_VALUES);
@@ -1200,13 +1203,39 @@ NetworkEngine::sendAnnounceValue(const Sp<Node>& n,
     return req;
 }
 
+void
+NetworkEngine::sendUpdateValues(const Sp<Node>& n,
+                                const InfoHash& infohash,
+                                std::vector<Sp<Value>>&& values,
+                                time_point created,
+                                const Blob& token,
+                                size_t socket_id)
+{
+    // Cut `values` into consecutive runs and send each run as its own message.
+    size_t batch_size = 0;
+    auto first = values.begin();
+    auto last = first;
+    for (; last != values.end(); ++last) {
+        // Flush the run gathered so far once it has reached the size threshold.
+        if (batch_size >= MAX_MESSAGE_VALUE_SIZE) {
+            sendUpdateValues(n, infohash, first, last, created, token, socket_id);
+            first = last;
+            batch_size = 0;
+        }
+        batch_size += (*last)->size();
+    }
+    if (first != last)
+        sendUpdateValues(n, infohash, first, last, created, token, socket_id);
+}
+
 Sp<Request>
 NetworkEngine::sendUpdateValues(const Sp<Node>& n,
                                 const InfoHash& infohash,
-                                const std::vector<Sp<Value>>& values,
+                                std::vector<Sp<Value>>::iterator begin,
+                                std::vector<Sp<Value>>::iterator end,
                                 time_point created,
                                 const Blob& token,
-                                const size_t& socket_id)
+                                size_t socket_id)
 {
     Tid tid (n->getNewTid());
     Tid sid (socket_id);
@@ -1220,7 +1249,7 @@ NetworkEngine::sendUpdateValues(const Sp<Node>& n,
       pk.pack(KEY_VERSION);    pk.pack(1);
       pk.pack(KEY_REQ_H);      pk.pack(infohash);
       pk.pack(KEY_REQ_SID);   pk.pack(sid);
-      auto v = packValueHeader(buffer, values);
+      auto v = packValueHeader(buffer, begin, end);
       if (created < scheduler.time()) {
           pk.pack(KEY_REQ_CREATION);
           pk.pack(to_time_t(created));
diff --git a/src/parsed_message.h b/src/parsed_message.h
index 6f5063c0..a1441028 100644
--- a/src/parsed_message.h
+++ b/src/parsed_message.h
@@ -152,9 +152,10 @@ bool
 ParsedMessage::complete()
 {
     for (auto& e : value_parts) {
-        //std::cout << "part " << e.first << ": " << e.second.second.size() << "/" << e.second.first << std::endl;
-        if (e.second.first > e.second.second.size())
+        if (e.second.first > e.second.second.size()) {
+            //std::cout << "incomplete part " << e.first << ": " << e.second.second.size() << "/" << e.second.first << std::endl;
             return false;
+        }
     }
     for (auto& e : value_parts) {
         msgpack::unpacked msg;
diff --git a/src/value.cpp b/src/value.cpp
index 9529a472..d433978d 100644
--- a/src/value.cpp
+++ b/src/value.cpp
@@ -98,7 +98,7 @@ ValueType::DEFAULT_STORE_POLICY(InfoHash, const std::shared_ptr<Value>& v, const
 size_t
 Value::size() const
 {
-    return cypher.size() + data.size() + signature.size()  + user_type.size();
+    return cypher.size() + data.size() + signature.size() + user_type.size();
 }
 
 void
-- 
GitLab