diff --git a/include/opendht/dht_proxy_client.h b/include/opendht/dht_proxy_client.h index b0edcf532d76b982cfc60bbae7e50d369cfdb3fb..c6dd4c538873af340cf3e90aa3505bd888ef7405 100644 --- a/include/opendht/dht_proxy_client.h +++ b/include/opendht/dht_proxy_client.h @@ -296,6 +296,7 @@ private: GetCallback cb; Value::Filter filterChain; std::thread thread; + unsigned callbackId; std::shared_ptr<unsigned> pushNotifToken; // NOTE: unused if not using push notifications }; std::vector<Listener> listeners_; @@ -344,7 +345,7 @@ private: const std::function<void()> loopSignal_; #if OPENDHT_PUSH_NOTIFICATIONS - void fillBodyToGetToken(std::shared_ptr<restbed::Request> request); + void fillBodyToGetToken(std::shared_ptr<restbed::Request> request, unsigned callbackId); #endif // OPENDHT_PUSH_NOTIFICATIONS }; diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index 5d76389af29ce97dc911a8ddc4b385cc92db8afd..0352d4dd36239bca19c3d3f68a7cbdf931ca50df 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -436,12 +436,20 @@ DhtProxyClient::listen(const InfoHash& key, GetCallback cb, Value::Filter filter auto filterChain = filter.chain(query.where.getFilter()); auto pushNotifToken = std::make_shared<unsigned>(0); + unsigned callbackId = 0; + { + std::lock_guard<std::mutex> lock(lockCallback_); + callbackId_ += 1; + callbackId = callbackId_; + } + Listener l; ++listener_token_; l.key = key.toString(); l.token = listener_token_; l.req = req; l.cb = cb; + l.callbackId = callbackId; l.pushNotifToken = pushNotifToken; l.filterChain = std::move(filterChain); l.thread = std::thread([=]() @@ -453,7 +461,7 @@ DhtProxyClient::listen(const InfoHash& key, GetCallback cb, Value::Filter filter } #if OPENDHT_PUSH_NOTIFICATIONS else - fillBodyToGetToken(req); + fillBodyToGetToken(req, callbackId); #endif struct State { @@ -546,6 +554,20 @@ DhtProxyClient::cancelListen(const InfoHash&, size_t token) restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + listener.key); auto req = std::make_shared<restbed::Request>(uri); req->set_method("UNSUBSCRIBE"); + // fill request body + Json::Value body; + body["key"] = deviceKey_; + body["client_id"] = pushClientId_; + body["token"] = std::to_string(token); + body["callback_id"] = listener.callbackId; + Json::StreamWriterBuilder wbuilder; + wbuilder["commentStyle"] = "None"; + wbuilder["indentation"] = ""; + auto content = Json::writeString(wbuilder, body) + "\n"; + std::replace(content.begin(), content.end(), '\n', ' '); + req->set_body(content); + req->set_header("Content-Length", std::to_string(content.size())); + restbed::Http::async(req, [](const std::shared_ptr<restbed::Request>&, const std::shared_ptr<restbed::Response>&){} @@ -701,9 +723,10 @@ DhtProxyClient::resubscribe(const unsigned token) listener.thread.join(); listener.req = req; listener.pushNotifToken = pushNotifToken; + auto callbackId = listener.callbackId; listener.thread = std::thread([=]() { - fillBodyToGetToken(req); + fillBodyToGetToken(req, callbackId); auto settings = std::make_shared<restbed::Settings>(); auto ok = std::make_shared<std::atomic_bool>(true); restbed::Http::async(req, @@ -741,7 +764,7 @@ DhtProxyClient::resubscribe(const unsigned token) #if OPENDHT_PUSH_NOTIFICATIONS void -DhtProxyClient::fillBodyToGetToken(std::shared_ptr<restbed::Request> req) +DhtProxyClient::fillBodyToGetToken(std::shared_ptr<restbed::Request> req, unsigned callbackId) { // Fill body with // { @@ -751,11 +774,7 @@ DhtProxyClient::fillBodyToGetToken(std::shared_ptr<restbed::Request> req) Json::Value body; body["key"] = deviceKey_; body["client_id"] = pushClientId_; - { - std::lock_guard<std::mutex> lock(lockCallback_); - callbackId_ += 1; - body["callback_id"] = callbackId_; - } + body["callback_id"] = callbackId; #ifdef __ANDROID__ body["platform"] = "android"; #endif diff --git a/tools/dhtnode.cpp b/tools/dhtnode.cpp index 2b5909106ceab9f33541396f640f2682008fc164..f486756a8ce0d8051d481733f8aa53a03d606eec 100644 --- a/tools/dhtnode.cpp +++ b/tools/dhtnode.cpp @@ -274,9 +274,9 @@ void cmd_loop(std::shared_ptr<DhtRunner>& dht, dht_params& params } catch (std::invalid_argument& e) { std::cout << e.what() << std::endl; } } } else if (op == "cl") { - std::string rem; - iss >> rem; - dht->cancelListen(id, std::stoul(rem)); + std::string hash, rem; + iss >> hash >> rem; + dht->cancelListen(dht::InfoHash(hash), std::stoul(rem)); } else { // Dht syntax