diff --git a/include/opendht/dht_proxy_server.h b/include/opendht/dht_proxy_server.h index d0af6a87ec0f27af78e522ca99b0c1a4b2bac97a..b5216ad4935ec6ed65c1b54ecddbe4f1f38b4778 100644 --- a/include/opendht/dht_proxy_server.h +++ b/include/opendht/dht_proxy_server.h @@ -213,7 +213,13 @@ private: */ void sendPushNotification(const std::string& key, const Json::Value& json, bool isAndroid) const; - void cancelPushListen(const std::string& pushToken, const InfoHash& key, proxy::ListenToken token); + /** + * Remove a push listener between a client and a hash + * @param pushToken + * @param key + * @param clientId + */ + void cancelPushListen(const std::string& pushToken, const InfoHash& key, const std::string& clientId); #endif //OPENDHT_PUSH_NOTIFICATIONS diff --git a/src/dht_proxy_server.cpp b/src/dht_proxy_server.cpp index 199150f61c4e2c13f37d5a85ba73dc692048a576..efd4e2c80de8eaa1a5f31b11123a19ff28b90934 100644 --- a/src/dht_proxy_server.cpp +++ b/src/dht_proxy_server.cpp @@ -391,6 +391,7 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session) } listeners->second.emplace_back(Listener{}); auto& listener = listeners->second.back(); + listener.clientId = clientId; // New listener pushListener->second.isAndroid = isAndroid; @@ -410,8 +411,8 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session) } ); listener.expireJob = scheduler_.add(timeout, - [this, token, infoHash, pushToken] { - cancelPushListen(pushToken, infoHash, *token); + [this, clientId, infoHash, pushToken] { + cancelPushListen(pushToken, infoHash, clientId); } ); listener.expireNotifyJob = scheduler_.add(timeout - proxy::OP_MARGIN, @@ -460,10 +461,9 @@ DhtProxyServer::unsubscribe(const std::shared_ptr<restbed::Session>& session) } auto pushToken = root["key"].asString(); if (pushToken.empty()) return; - auto token = unpackId(root, "token"); - if (token == 0) return; + auto clientId = root["client_id"].asString(); - cancelPushListen(pushToken, infoHash, token); + cancelPushListen(pushToken, infoHash, clientId); s->close(restbed::OK); } catch (...) { s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}"); @@ -473,9 +473,9 @@ DhtProxyServer::unsubscribe(const std::shared_ptr<restbed::Session>& session) } void -DhtProxyServer::cancelPushListen(const std::string& pushToken, const dht::InfoHash& key, proxy::ListenToken token) +DhtProxyServer::cancelPushListen(const std::string& pushToken, const dht::InfoHash& key, const std::string& clientId) { - std::cout << "cancelPushListen: " << key << " token:" << token << std::endl; + std::cout << "cancelPushListen: " << key << " clientId:" << clientId << std::endl; std::lock_guard<std::mutex> lock(lockListener_); auto pushListener = pushListeners_.find(pushToken); if (pushListener == pushListeners_.end()) @@ -484,7 +484,7 @@ DhtProxyServer::cancelPushListen(const std::string& pushToken, const dht::InfoHa if (listeners == pushListener->second.listeners.end()) return; for (auto listener = listeners->second.begin(); listener != listeners->second.end();) { - if (*listener->token == token) { + if (listener->clientId == clientId) { if (dht_) dht_->cancelListen(key, std::move(listener->internalToken)); listener = listeners->second.erase(listener);