diff --git a/include/opendht/dht_proxy_server.h b/include/opendht/dht_proxy_server.h index 4108411c57b4b9b49b3d2049bcbc5f690545fc47..346a7c84da39c6285f0328084e03423609c213bc 100644 --- a/include/opendht/dht_proxy_server.h +++ b/include/opendht/dht_proxy_server.h @@ -352,6 +352,7 @@ private: std::shared_ptr<PushSessionContext> sessionCtx; std::unique_ptr<asio::steady_timer> expireTimer; std::unique_ptr<asio::steady_timer> expireNotifyTimer; + Sp<Value> value; }; struct SearchPuts { std::map<dht::Value::Id, PermanentPut> puts; diff --git a/include/opendht/value.h b/include/opendht/value.h index 5cfe69fa359b48232669bd95411c2277ba1a0706..d6b928c292584951854e596febd07cd5daf3bcd8 100644 --- a/include/opendht/value.h +++ b/include/opendht/value.h @@ -436,10 +436,16 @@ struct OPENDHT_PUBLIC Value msgpack_unpack(o); } + /** + * Returns true if value contents are equals (not considering the value ID) + */ + inline bool contentEquals(const Value& o) { + return isEncrypted() ? cypher == o.cypher : + ((owner == o.owner || *owner == *o.owner) && type == o.type && data == o.data && user_type == o.user_type && signature == o.signature); + } + inline bool operator== (const Value& o) { - return id == o.id && - (isEncrypted() ? cypher == o.cypher : - ((owner == o.owner || *owner == *o.owner) && type == o.type && data == o.data && user_type == o.user_type && signature == o.signature)); + return id == o.id and contentEquals(o); } void setRecipient(const InfoHash& r) { diff --git a/src/dht_proxy_server.cpp b/src/dht_proxy_server.cpp index 915830b83b95d4061ca59e3fd6a26cea6b829520..04c718474bd19da08e5285bab09173af8e86cf84 100644 --- a/src/dht_proxy_server.cpp +++ b/src/dht_proxy_server.cpp @@ -941,15 +941,33 @@ DhtProxyServer::put(restinio::request_handle_t request, } std::unique_lock<std::mutex> lock(lockSearchPuts_); auto timeout = std::chrono::steady_clock::now() + proxy::OP_TIMEOUT; - auto vid = value->id; auto& sPuts = puts_[infoHash]; + if (value->id == Value::INVALID_ID) { + for (auto& pp : sPuts.puts) { + if (pp.second.pushToken == pushToken and pp.second.value->contentEquals(*value)) { + pp.second.expireTimer->expires_at(timeout); + pp.second.expireTimer->async_wait(std::bind(&DhtProxyServer::handleCancelPermamentPut, this, + std::placeholders::_1, infoHash, pp.second.value->id)); + if (not sessionId.empty()) { + if (not pp.second.sessionCtx) + pp.second.sessionCtx = std::make_shared<PushSessionContext>(); + std::lock_guard<std::mutex> l(pp.second.sessionCtx->lock); + pp.second.sessionCtx->sessionId = sessionId; + } + auto response = initHttpResponse(request->create_response()); + response.append_body(Json::writeString(jsonBuilder_, value->toJson()) + "\n"); + return response.done(); + } + } + } + + auto vid = value->id; auto& pput = sPuts.puts[vid]; + pput.value = value; if (not pput.expireTimer) { auto &ctx = io_context(); // cancel permanent put pput.expireTimer = std::make_unique<asio::steady_timer>(ctx, timeout); - pput.expireTimer->async_wait(std::bind(&DhtProxyServer::handleCancelPermamentPut, this, - std::placeholders::_1, infoHash, vid)); #ifdef OPENDHT_PUSH_NOTIFICATIONS if (not pushToken.empty()){ pput.sessionCtx = std::make_shared<PushSessionContext>(); @@ -986,6 +1004,8 @@ DhtProxyServer::put(restinio::request_handle_t request, if (pput.expireNotifyTimer) pput.expireNotifyTimer->expires_at(timeout - proxy::OP_MARGIN); } + pput.expireTimer->async_wait(std::bind(&DhtProxyServer::handleCancelPermamentPut, this, + std::placeholders::_1, infoHash, vid)); } dht_->put(infoHash, value, [this, request, value](bool ok){ if (ok){