From 56db1bba1901d1e5acf6ca8ea7ba827f600239b5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Adrien=20B=C3=A9raud?= <adrien.beraud@savoirfairelinux.com>
Date: Wed, 22 Jan 2020 15:47:38 -0500
Subject: [PATCH] proxy server: merge identical put requests

---
 include/opendht/dht_proxy_server.h |  1 +
 include/opendht/value.h            | 12 +++++++++---
 src/dht_proxy_server.cpp           | 26 +++++++++++++++++++++++---
 3 files changed, 33 insertions(+), 6 deletions(-)

diff --git a/include/opendht/dht_proxy_server.h b/include/opendht/dht_proxy_server.h
index 4108411c..346a7c84 100644
--- a/include/opendht/dht_proxy_server.h
+++ b/include/opendht/dht_proxy_server.h
@@ -352,6 +352,7 @@ private:
         std::shared_ptr<PushSessionContext> sessionCtx;
         std::unique_ptr<asio::steady_timer> expireTimer;
         std::unique_ptr<asio::steady_timer> expireNotifyTimer;
+        Sp<Value> value;
     };
     struct SearchPuts {
         std::map<dht::Value::Id, PermanentPut> puts;
diff --git a/include/opendht/value.h b/include/opendht/value.h
index 5cfe69fa..d6b928c2 100644
--- a/include/opendht/value.h
+++ b/include/opendht/value.h
@@ -436,10 +436,16 @@ struct OPENDHT_PUBLIC Value
         msgpack_unpack(o);
     }
 
+    /**
+     * Returns true if value contents are equals (not considering the value ID)
+     */
+    inline bool contentEquals(const Value& o) {
+        return isEncrypted() ? cypher == o.cypher :
+            ((owner == o.owner || *owner == *o.owner) && type == o.type && data == o.data && user_type == o.user_type && signature == o.signature);
+    }
+
     inline bool operator== (const Value& o) {
-        return id == o.id &&
-        (isEncrypted() ? cypher == o.cypher :
-        ((owner == o.owner || *owner == *o.owner) && type == o.type && data == o.data && user_type == o.user_type && signature == o.signature));
+        return id == o.id and contentEquals(o);
     }
 
     void setRecipient(const InfoHash& r) {
diff --git a/src/dht_proxy_server.cpp b/src/dht_proxy_server.cpp
index 915830b8..04c71847 100644
--- a/src/dht_proxy_server.cpp
+++ b/src/dht_proxy_server.cpp
@@ -941,15 +941,33 @@ DhtProxyServer::put(restinio::request_handle_t request,
                 }
                 std::unique_lock<std::mutex> lock(lockSearchPuts_);
                 auto timeout = std::chrono::steady_clock::now() + proxy::OP_TIMEOUT;
-                auto vid = value->id;
                 auto& sPuts = puts_[infoHash];
+                if (value->id == Value::INVALID_ID) {
+                    for (auto& pp : sPuts.puts) {
+                        if (pp.second.pushToken == pushToken and pp.second.value->contentEquals(*value)) {
+                            pp.second.expireTimer->expires_at(timeout);
+                            pp.second.expireTimer->async_wait(std::bind(&DhtProxyServer::handleCancelPermamentPut, this,
+                                                        std::placeholders::_1, infoHash, pp.second.value->id));
+                            if (not sessionId.empty()) {
+                                if (not pp.second.sessionCtx)
+                                    pp.second.sessionCtx = std::make_shared<PushSessionContext>();
+                                std::lock_guard<std::mutex> l(pp.second.sessionCtx->lock);
+                                pp.second.sessionCtx->sessionId = sessionId;
+                            }
+                            auto response = initHttpResponse(request->create_response());
+                            response.append_body(Json::writeString(jsonBuilder_, value->toJson()) + "\n");
+                            return response.done();
+                        }
+                    }
+                }
+
+                auto vid = value->id;
                 auto& pput = sPuts.puts[vid];
+                pput.value = value;
                 if (not pput.expireTimer) {
                     auto &ctx = io_context();
                     // cancel permanent put
                     pput.expireTimer = std::make_unique<asio::steady_timer>(ctx, timeout);
-                    pput.expireTimer->async_wait(std::bind(&DhtProxyServer::handleCancelPermamentPut, this,
-                                                 std::placeholders::_1, infoHash, vid));
 #ifdef OPENDHT_PUSH_NOTIFICATIONS
                     if (not pushToken.empty()){
                         pput.sessionCtx = std::make_shared<PushSessionContext>();
@@ -986,6 +1004,8 @@ DhtProxyServer::put(restinio::request_handle_t request,
                     if (pput.expireNotifyTimer)
                         pput.expireNotifyTimer->expires_at(timeout - proxy::OP_MARGIN);
                 }
+                pput.expireTimer->async_wait(std::bind(&DhtProxyServer::handleCancelPermamentPut, this,
+                                                std::placeholders::_1, infoHash, vid));
             }
             dht_->put(infoHash, value, [this, request, value](bool ok){
                 if (ok){
-- 
GitLab