diff --git a/include/opendht/dht_proxy_server.h b/include/opendht/dht_proxy_server.h index ad03d1eabbc17772918f4574f007b1feedb02112..47a29003ae4d1c05732d1aebe356a072aa8006c6 100644 --- a/include/opendht/dht_proxy_server.h +++ b/include/opendht/dht_proxy_server.h @@ -291,7 +291,7 @@ private: * @param isAndroid */ void handleNotifyPushListenExpire(const asio::error_code &ec, const std::string pushToken, - Json::Value json, const bool isAndroid); + std::function<Json::Value()> json, const bool isAndroid); /** * Remove a push listener between a client and a hash @@ -339,10 +339,15 @@ private: // Connection Listener observing conn state changes. std::shared_ptr<ConnectionListener> connListener_; + struct PushSessionContext { + std::mutex lock; + std::string sessionId; + }; struct PermanentPut { time_point expiration; std::string pushToken; std::string clientId; + std::shared_ptr<PushSessionContext> sessionCtx; std::unique_ptr<asio::steady_timer> expireTimer; std::unique_ptr<asio::steady_timer> expireNotifyTimer; }; @@ -360,7 +365,7 @@ private: #ifdef OPENDHT_PUSH_NOTIFICATIONS struct Listener { std::string clientId; - std::string sessionId; + std::shared_ptr<PushSessionContext> sessionCtx; std::future<size_t> internalToken; std::unique_ptr<asio::steady_timer> expireTimer; std::unique_ptr<asio::steady_timer> expireNotifyTimer; diff --git a/include/opendht/proxy.h b/include/opendht/proxy.h index 73f8a3357239b5e4f4b8e618f354338b71c78de4..61533d335337e814ac36c455dd202444e14d18a2 100644 --- a/include/opendht/proxy.h +++ b/include/opendht/proxy.h @@ -22,8 +22,8 @@ namespace dht { namespace proxy { -constexpr const std::chrono::seconds OP_TIMEOUT {1 * 60 * 60}; // one hour -constexpr const std::chrono::seconds OP_MARGIN {5 * 60}; // 5 minutes +constexpr const std::chrono::hours OP_TIMEOUT {24}; // one day +constexpr const std::chrono::hours OP_MARGIN {2}; // two hours constexpr const char* const HTTP_PROTO {"http://"}; using ListenToken = uint64_t; diff --git a/src/dht_proxy_server.cpp b/src/dht_proxy_server.cpp index 776f5d3122a89d5954603ec85fc0065778454b47..def4068ef68345963905f22a919fddf38cb1ce74 100644 --- a/src/dht_proxy_server.cpp +++ b/src/dht_proxy_server.cpp @@ -651,10 +651,10 @@ DhtProxyServer::subscribe(restinio::request_handle_t request, std::lock_guard<std::mutex> lock(lockPushListeners_); // Insert new or return existing push listeners of a token - auto pushListener = pushListeners_.emplace(pushToken, PushListener{}).first; - auto pushListeners = pushListener->second.listeners.emplace(infoHash, std::vector<Listener>{}).first; + auto& pushListener = pushListeners_[pushToken]; + auto& pushListeners = pushListener.listeners[infoHash]; - for (auto &listener: pushListeners->second) { + for (auto &listener: pushListeners) { // Found -> Resubscribe if (listener.clientId == clientId) { if (logger_) @@ -662,12 +662,16 @@ DhtProxyServer::subscribe(restinio::request_handle_t request, // Reset timers listener.expireTimer->expires_at(timeout); listener.expireNotifyTimer->expires_at(timeout - proxy::OP_MARGIN); + { + std::lock_guard<std::mutex> l(listener.sessionCtx->lock); + listener.sessionCtx->sessionId = sessionId; + } // Send response header auto response = std::make_shared<ResponseByPartsBuilder>( initHttpResponse(request->create_response<ResponseByParts>())); response->flush(); - // No Refresh - if (!root.isMember("refresh") or !root["refresh"].asBool()){ + if (!root["refresh"].asBool()) { + // No Refresh dht_->get(infoHash, [this, response](const Sp<Value>& value){ auto output = Json::writeString(jsonBuilder_, value->toJson()) + "\n"; response->append_chunk(output); @@ -677,8 +681,8 @@ DhtProxyServer::subscribe(restinio::request_handle_t request, [response] (bool){ response->done(); }); - // Refresh } else { + // Refresh response->append_chunk("{}\n"); response->done(); } @@ -687,19 +691,24 @@ DhtProxyServer::subscribe(restinio::request_handle_t request, } // =========== No existing listener for an infoHash ============ // Add new listener to list of listeners - pushListeners->second.emplace_back(Listener{}); - auto &listener = pushListeners->second.back(); + pushListeners.emplace_back(Listener{}); + auto &listener = pushListeners.back(); listener.clientId = clientId; + listener.sessionCtx = std::make_shared<PushSessionContext>(); + listener.sessionCtx->sessionId = sessionId; // Add listen on dht listener.internalToken = dht_->listen(infoHash, - [this, infoHash, pushToken, isAndroid, clientId, sessionId] + [this, infoHash, pushToken, isAndroid, clientId, sessionCtx = listener.sessionCtx] (const std::vector<std::shared_ptr<Value>>& values, bool expired){ // Build message content Json::Value json; json["key"] = infoHash.toString(); json["to"] = clientId; - json["s"] = sessionId; + { + std::lock_guard<std::mutex> l(sessionCtx->lock); + json["s"] = sessionCtx->sessionId; + } if (expired and values.size() < 2){ std::stringstream ss; for(size_t i = 0; i < values.size(); ++i){ @@ -722,12 +731,16 @@ DhtProxyServer::subscribe(restinio::request_handle_t request, listener.expireNotifyTimer = std::make_unique<asio::steady_timer>(ctx, timeout - proxy::OP_MARGIN); else listener.expireNotifyTimer->expires_at(timeout - proxy::OP_MARGIN); - Json::Value json; - json["timeout"] = infoHash.toString(); - json["to"] = clientId; - json["s"] = sessionId; + auto jsonProvider = [infoHash, clientId, sessionCtx = listener.sessionCtx](){ + Json::Value json; + json["timeout"] = infoHash.toString(); + json["to"] = clientId; + std::lock_guard<std::mutex> l(sessionCtx->lock); + json["s"] = sessionCtx->sessionId; + return json; + }; listener.expireNotifyTimer->async_wait(std::bind(&DhtProxyServer::handleNotifyPushListenExpire, this, - std::placeholders::_1, pushToken, std::move(json), isAndroid)); + std::placeholders::_1, pushToken, std::move(jsonProvider), isAndroid)); // cancel push listen if (!listener.expireTimer) listener.expireTimer = std::make_unique<asio::steady_timer>(ctx, timeout); @@ -798,17 +811,17 @@ DhtProxyServer::unsubscribe(restinio::request_handle_t request, void DhtProxyServer::handleNotifyPushListenExpire(const asio::error_code &ec, const std::string pushToken, - Json::Value json, const bool isAndroid) + std::function<Json::Value()> jsonProvider, const bool isAndroid) { if (ec == asio::error::operation_aborted) return; - else if (ec){ + else if (ec) { if (logger_) logger_->e("[proxy:server] [subscribe] error sending put refresh: %s", ec.message().c_str()); } if (logger_) logger_->d("[proxy:server] [subscribe] sending put refresh to %s token", pushToken.c_str()); - sendPushNotification(pushToken, std::move(json), isAndroid, false); + sendPushNotification(pushToken, jsonProvider(), isAndroid, false); } void @@ -970,36 +983,40 @@ DhtProxyServer::put(restinio::request_handle_t request, logger_->d("[proxy:server] [put %s] %s %s", infoHash.toString().c_str(), value->toString().c_str(), (permanent ? "permanent" : "")); if (permanent) { - std::string pushToken, clientId, platform; + std::string pushToken, clientId, sessionId, platform; auto& pVal = root["permanent"]; if (pVal.isObject()){ pushToken = pVal["key"].asString(); clientId = pVal["client_id"].asString(); platform = pVal["platform"].asString(); + sessionId = pVal["session_id"].asString(); } std::unique_lock<std::mutex> lock(lockSearchPuts_); auto timeout = std::chrono::steady_clock::now() + proxy::OP_TIMEOUT; auto vid = value->id; - auto sPuts = puts_.emplace(infoHash, SearchPuts{}).first; - auto r = sPuts->second.puts.emplace(vid, PermanentPut{}); - auto& pput = r.first->second; - if (r.second){ + auto& sPuts = puts_[infoHash]; + auto& pput = sPuts.puts[vid]; + if (not pput.expireTimer) { auto &ctx = io_context(); // cancel permanent put - if (!pput.expireTimer) - pput.expireTimer = std::make_unique<asio::steady_timer>(ctx, timeout); - else - pput.expireTimer->expires_at(timeout); + pput.expireTimer = std::make_unique<asio::steady_timer>(ctx, timeout); pput.expireTimer->async_wait(std::bind(&DhtProxyServer::handleCancelPermamentPut, this, std::placeholders::_1, infoHash, vid)); #ifdef OPENDHT_PUSH_NOTIFICATIONS if (not pushToken.empty()){ + pput.sessionCtx = std::make_shared<PushSessionContext>(); + pput.sessionCtx->sessionId = sessionId; // notify push listen expire bool isAndroid = platform == "android"; - Json::Value json; - json["timeout"] = infoHash.toString(); - json["to"] = clientId; - json["vid"] = std::to_string(vid); + auto jsonProvider = [infoHash, clientId, vid, sessionCtx = pput.sessionCtx](){ + Json::Value json; + json["timeout"] = infoHash.toString(); + json["to"] = clientId; + json["vid"] = std::to_string(vid); + std::lock_guard<std::mutex> l(sessionCtx->lock); + json["s"] = sessionCtx->sessionId; + return json; + }; if (!pput.expireNotifyTimer) pput.expireNotifyTimer = std::make_unique<asio::steady_timer>(ctx, timeout - proxy::OP_MARGIN); @@ -1007,10 +1024,14 @@ DhtProxyServer::put(restinio::request_handle_t request, pput.expireNotifyTimer->expires_at(timeout - proxy::OP_MARGIN); pput.expireNotifyTimer->async_wait(std::bind( &DhtProxyServer::handleNotifyPushListenExpire, this, - std::placeholders::_1, pushToken, json, isAndroid)); + std::placeholders::_1, pushToken, std::move(jsonProvider), isAndroid)); } #endif } else { + if (not sessionId.empty()) { + std::lock_guard<std::mutex> l(pput.sessionCtx->lock); + pput.sessionCtx->sessionId = sessionId; + } pput.expireTimer->expires_at(timeout); if (pput.expireNotifyTimer) pput.expireNotifyTimer->expires_at(timeout - proxy::OP_MARGIN);