diff --git a/include/opendht/dht_proxy_client.h b/include/opendht/dht_proxy_client.h index 6b9f66558ec4cae0c95087ea2486c29c67129c74..5630f78b37090c0915b3ff48cb3181191435da10 100644 --- a/include/opendht/dht_proxy_client.h +++ b/include/opendht/dht_proxy_client.h @@ -303,7 +303,7 @@ private: /** * Send Listen with httpClient_ */ - void sendListen(const restinio::http_request_header_t header, const ValueCallback& cb, + void sendListen(const restinio::http_request_header_t& header, const ValueCallback& cb, const Sp<OperationState>& opstate, Listener& listener, ListenMethod method = ListenMethod::LISTEN); void handleResubscribe(const asio::error_code& ec, const InfoHash& key, const size_t token, std::shared_ptr<OperationState> opstate); @@ -327,6 +327,7 @@ private: std::shared_ptr<dht::crypto::Certificate> serverCertificate_; //std::pair<std::string, std::string> serverHostService_; std::string pushClientId_; + std::string pushSessionId_; mutable std::mutex lockCurrentProxyInfos_; NodeStatus statusIpv4_ {NodeStatus::Disconnected}; diff --git a/include/opendht/dht_proxy_server.h b/include/opendht/dht_proxy_server.h index bcb83405ca3b94e6e6cc2f4432a1b1a0ee4d5b0d..ad03d1eabbc17772918f4574f007b1feedb02112 100644 --- a/include/opendht/dht_proxy_server.h +++ b/include/opendht/dht_proxy_server.h @@ -360,6 +360,7 @@ private: #ifdef OPENDHT_PUSH_NOTIFICATIONS struct Listener { std::string clientId; + std::string sessionId; std::future<size_t> internalToken; std::unique_ptr<asio::steady_timer> expireTimer; std::unique_ptr<asio::steady_timer> expireNotifyTimer; diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index 3cfb68ae8c7f39767bc6ba4f0a85db577630181a..6793ed8a867876ff303d0272a1f0d8efded45f4e 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -93,6 +93,16 @@ private: std::string line_ {}; }; +std::string +getRandomSessionId(size_t length = 8) { + static constexpr const char chars[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789!#$%&()*+,./:;<=>?@[]^_`{|}~"; + std::string str(length, 0); + crypto::random_device rdev; + std::uniform_int_distribution<> dist(0, (sizeof(chars)/sizeof(char)) - 1); + std::generate_n( str.begin(), length, [&]{ return chars[dist(rdev)]; } ); + return str; +} + DhtProxyClient::DhtProxyClient() {} DhtProxyClient::DhtProxyClient( @@ -101,7 +111,8 @@ DhtProxyClient::DhtProxyClient( const std::string& pushClientId, std::shared_ptr<dht::Logger> logger) : proxyUrl_(serverHost) , clientIdentity_(clientIdentity), serverCertificate_(serverCA) - , pushClientId_(pushClientId), loopSignal_(signal) + , pushClientId_(pushClientId), pushSessionId_(getRandomSessionId()) + , loopSignal_(signal) , jsonReader_(Json::CharReaderBuilder{}.newCharReader()) , logger_(logger) { @@ -929,7 +940,7 @@ DhtProxyClient::handleExpireListener(const asio::error_code &ec, const InfoHash& } void -DhtProxyClient::sendListen(const restinio::http_request_header_t header, +DhtProxyClient::sendListen(const restinio::http_request_header_t& header, const ValueCallback& cb, const Sp<OperationState>& opstate, Listener& listener, ListenMethod method) @@ -1118,6 +1129,12 @@ DhtProxyClient::pushNotificationReceived(const std::map<std::string, std::string statusIpv6_ = NodeStatus::Connected; } try { + auto sessionId = notification.find("s"); + if (sessionId != notification.end() and sessionId->second != pushSessionId_) { + if (logger_) + logger_->d("[proxy:client] [push] ignoring push for other session"); + return; + } std::lock_guard<std::mutex> lock(searchLock_); auto timeout = notification.find("timeout"); if (timeout != notification.cend()) { @@ -1162,7 +1179,7 @@ DhtProxyClient::pushNotificationReceived(const std::map<std::string, std::string } else { std::stringstream ss(expired->second); std::vector<Value::Id> ids; - while(ss.good()){ + while(ss.good()) { std::string substr; getline(ss, substr, ','); ids.emplace_back(std::stoull(substr)); @@ -1237,6 +1254,7 @@ DhtProxyClient::getPushRequest(Json::Value& body) const { body["key"] = deviceKey_; body["client_id"] = pushClientId_; + body["session_id"] = pushSessionId_; #ifdef __ANDROID__ body["platform"] = "android"; #endif diff --git a/src/dht_proxy_server.cpp b/src/dht_proxy_server.cpp index 00123093f6734ae603309dac77a79ae45a19805b..776f5d3122a89d5954603ec85fc0065778454b47 100644 --- a/src/dht_proxy_server.cpp +++ b/src/dht_proxy_server.cpp @@ -622,15 +622,16 @@ DhtProxyServer::subscribe(restinio::request_handle_t request, } try { std::string err; - Json::Value root; + Json::Value r; auto* char_data = reinterpret_cast<const char*>(request->body().data()); auto reader = std::unique_ptr<Json::CharReader>(jsonReaderBuilder_.newCharReader()); - if (!reader->parse(char_data, char_data + request->body().size(), &root, &err)){ + if (!reader->parse(char_data, char_data + request->body().size(), &r, &err)){ auto response = initHttpResponse( request->create_response(restinio::status_bad_request())); response.set_body(RESP_MSG_JSON_INCORRECT); return response.done(); } + const Json::Value& root(r); // parse using const Json so [] never creates element auto pushToken = root["key"].asString(); if (pushToken.empty()){ auto response = initHttpResponse( @@ -638,12 +639,12 @@ DhtProxyServer::subscribe(restinio::request_handle_t request, response.set_body(RESP_MSG_NO_TOKEN); return response.done(); } - auto platform = root["platform"].asString(); - auto isAndroid = platform == "android"; - auto clientId = root.isMember("client_id") ? root["client_id"].asString() : std::string(); + auto isAndroid = root["platform"].asString() == "android"; + auto clientId = root["client_id"].asString(); + auto sessionId = root["session_id"].asString(); if (logger_) - logger_->d("[proxy:server] [subscribe %s] [client %s]", infoHash.toString().c_str(), clientId.c_str()); + logger_->d("[proxy:server] [subscribe %s] [client %s] [session %s]", infoHash.toString().c_str(), clientId.c_str(), sessionId.c_str()); // ================ Search for existing listener =================== // start the timer auto timeout = std::chrono::steady_clock::now() + proxy::OP_TIMEOUT; @@ -653,11 +654,11 @@ DhtProxyServer::subscribe(restinio::request_handle_t request, auto pushListener = pushListeners_.emplace(pushToken, PushListener{}).first; auto pushListeners = pushListener->second.listeners.emplace(infoHash, std::vector<Listener>{}).first; - for (auto &listener: pushListeners->second){ - if (logger_) - logger_->d("[proxy:server] [subscribe] found [client %s]", listener.clientId.c_str()); + for (auto &listener: pushListeners->second) { // Found -> Resubscribe - if (listener.clientId == clientId){ + if (listener.clientId == clientId) { + if (logger_) + logger_->d("[proxy:server] [subscribe] found [client %s]", listener.clientId.c_str()); // Reset timers listener.expireTimer->expires_at(timeout); listener.expireNotifyTimer->expires_at(timeout - proxy::OP_MARGIN); @@ -692,12 +693,13 @@ DhtProxyServer::subscribe(restinio::request_handle_t request, // Add listen on dht listener.internalToken = dht_->listen(infoHash, - [this, infoHash, pushToken, isAndroid, clientId] + [this, infoHash, pushToken, isAndroid, clientId, sessionId] (const std::vector<std::shared_ptr<Value>>& values, bool expired){ // Build message content Json::Value json; json["key"] = infoHash.toString(); json["to"] = clientId; + json["s"] = sessionId; if (expired and values.size() < 2){ std::stringstream ss; for(size_t i = 0; i < values.size(); ++i){ @@ -714,7 +716,7 @@ DhtProxyServer::subscribe(restinio::request_handle_t request, } ); // Launch timers - auto &ctx = io_context(); + auto& ctx = io_context(); // expire notify if (!listener.expireNotifyTimer) listener.expireNotifyTimer = std::make_unique<asio::steady_timer>(ctx, timeout - proxy::OP_MARGIN); @@ -723,8 +725,9 @@ DhtProxyServer::subscribe(restinio::request_handle_t request, Json::Value json; json["timeout"] = infoHash.toString(); json["to"] = clientId; + json["s"] = sessionId; listener.expireNotifyTimer->async_wait(std::bind(&DhtProxyServer::handleNotifyPushListenExpire, this, - std::placeholders::_1, pushToken, json, isAndroid)); + std::placeholders::_1, pushToken, std::move(json), isAndroid)); // cancel push listen if (!listener.expireTimer) listener.expireTimer = std::make_unique<asio::steady_timer>(ctx, timeout);