diff --git a/include/opendht/dht_proxy_client.h b/include/opendht/dht_proxy_client.h index 7ac306fd56008843911f095997d124fac76befe7..de689cc84193b7b660937a78d76e2c2e44b5057e 100644 --- a/include/opendht/dht_proxy_client.h +++ b/include/opendht/dht_proxy_client.h @@ -354,12 +354,11 @@ private: * NOTE: empty by default to avoid to use services like FCM or APN. */ std::string deviceKey_ {}; - std::atomic_uint callbackId_ {0}; const std::function<void()> loopSignal_; #if OPENDHT_PUSH_NOTIFICATIONS - void fillBodyToGetToken(std::shared_ptr<restbed::Request> request, unsigned callbackId); + void fillBodyToGetToken(std::shared_ptr<restbed::Request> request, unsigned token = 0); #endif // OPENDHT_PUSH_NOTIFICATIONS bool isDestroying_ {false}; diff --git a/include/opendht/dht_proxy_server.h b/include/opendht/dht_proxy_server.h index cb15ba55303aa068c3e3d92b34a38d73ce814eb4..f15e05bcd84aed18374fbbe80be4aa69f0aac0c6 100644 --- a/include/opendht/dht_proxy_server.h +++ b/include/opendht/dht_proxy_server.h @@ -183,24 +183,18 @@ private: /** * Subscribe to push notifications for an iOS or Android device. * Method: SUBSCRIBE "/{InfoHash: .*}" - * Body: {"key": "device_key", (optional)"callback_id":y, - * (optional)"isAndroid":false (default true)}" + * Body: {"key": "device_key", (optional)"isAndroid":false (default true)}" * Return: {"token": x}" where x if a token to save * @note: the listen will timeout after six hours (and send a push notification). * so you need to refresh the operation each six hours. - * @note: callback_id is used to add the possibility to have multiple listen - * on same hash for same device and must be > 0 * @param session */ void subscribe(const std::shared_ptr<restbed::Session>& session); /** * Unsubscribe to push notifications for an iOS or Android device. * Method: UNSUBSCRIBE "/{InfoHash: .*}" - * Body: {"key": "device_key", "token": x, (optional)"callback_id":y" - * where x if the token to cancel + * Body: {"key": "device_key", "token": x} where x if the token to cancel * Return: nothing - * @note: callback id is used to add the possibility to have multiple listen - * on same hash for same device * @param session */ void unsubscribe(const std::shared_ptr<restbed::Session>& session); @@ -247,7 +241,7 @@ private: std::map<std::string, PushListener> pushListeners_; unsigned tokenPushNotif_ {0}; - void cancelPushListen(const std::string& pushToken, const InfoHash& key, unsigned token, unsigned callbackId); + void cancelPushListen(const std::string& pushToken, const InfoHash& key, unsigned token); #endif //OPENDHT_PUSH_NOTIFICATIONS const std::string pushServer_; diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index 41b2d6deba00e0b95791d9afd05b7101661e5e26..8876b22a4c1e07c09371daf70dc3580f2da5af04 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -48,8 +48,8 @@ struct DhtProxyClient::Listener Sp<bool> isCanceledViaClose; Sp<unsigned> pushNotifToken; // NOTE: unused if not using push notifications Sp<Scheduler::Job> refreshJob; - Listener(ValueCache&& c, const Sp<restbed::Request>& r, Value::Filter&& f, unsigned cid) - : cache(std::move(c)), filter(std::move(f)), req(r), callbackId(cid), isCanceledViaClose(std::make_shared<bool>(false)) + Listener(ValueCache&& c, const Sp<restbed::Request>& r, Value::Filter&& f) + : cache(std::move(c)), filter(std::move(f)), req(r), isCanceledViaClose(std::make_shared<bool>(false)) {} }; @@ -599,9 +599,8 @@ DhtProxyClient::doListen(const InfoHash& key, ValueCallback cb, Value::Filter fi } auto token = ++listener_token_; - auto callbackId = ++callbackId_; auto l = search->second.listeners.emplace(token, Listener{ - ValueCache(cb), req, std::move(filter), callbackId + ValueCache(cb), req, std::move(filter) }).first; if (not l->second.cacheExpirationJob) { l->second.cacheExpirationJob = scheduler.add(time_point::max(), [this, key, token]{ @@ -645,7 +644,7 @@ DhtProxyClient::doListen(const InfoHash& key, ValueCallback cb, Value::Filter fi } #if OPENDHT_PUSH_NOTIFICATIONS else - fillBodyToGetToken(req, callbackId); + fillBodyToGetToken(req); #endif struct State { @@ -746,7 +745,6 @@ DhtProxyClient::doCancelListen(const InfoHash& key, size_t ltoken) body["key"] = deviceKey_; body["client_id"] = pushClientId_; body["token"] = std::to_string(ltoken); - body["callback_id"] = listener.callbackId; Json::StreamWriterBuilder wbuilder; wbuilder["commentStyle"] = "None"; wbuilder["indentation"] = ""; @@ -914,10 +912,9 @@ DhtProxyClient::resubscribe(const InfoHash& key, Listener& listener) listener.thread.join(); listener.req = req; listener.pushNotifToken = pushNotifToken; - auto callbackId = listener.callbackId; listener.thread = std::thread([=]() { - fillBodyToGetToken(req, callbackId); + fillBodyToGetToken(req); auto settings = std::make_shared<restbed::Settings>(); auto ok = std::make_shared<std::atomic_bool>(true); restbed::Http::async(req, @@ -953,17 +950,18 @@ DhtProxyClient::resubscribe(const InfoHash& key, Listener& listener) #if OPENDHT_PUSH_NOTIFICATIONS void -DhtProxyClient::fillBodyToGetToken(std::shared_ptr<restbed::Request> req, unsigned callbackId) +DhtProxyClient::fillBodyToGetToken(std::shared_ptr<restbed::Request> req, unsigned token) { // Fill body with // { // "key":"device_key", - // "callback_id": xxx + // "token": xxx // } Json::Value body; body["key"] = deviceKey_; body["client_id"] = pushClientId_; - body["callback_id"] = callbackId; + if (token > 0) + body["token"] = token; #ifdef __ANDROID__ body["platform"] = "android"; #endif diff --git a/src/dht_proxy_server.cpp b/src/dht_proxy_server.cpp index 034dc5241f86daf99ead222c5965522ab02ee429..bd3a877e923a7cf8f2a9ac76846abae4053ebe60 100644 --- a/src/dht_proxy_server.cpp +++ b/src/dht_proxy_server.cpp @@ -305,7 +305,6 @@ DhtProxyServer::listen(const std::shared_ptr<restbed::Session>& session) struct DhtProxyServer::Listener { unsigned token; - unsigned callbackId {0}; std::future<size_t> internalToken; Sp<Scheduler::Job> expireJob; }; @@ -342,7 +341,7 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session) } auto pushToken = root["key"].asString(); if (pushToken.empty()) return; - auto callbackId = root.isMember("callback_id") ? root["callback_id"].asLargestUInt() : 0; + auto tokenFromReq = root.isMember("token") ? root["token"].asLargestUInt() : 0; auto platform = root["platform"].asString(); auto isAndroid = platform == "android"; auto clientId = root.isMember("client_id") ? root["client_id"].asString() : std::string(); @@ -351,10 +350,11 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session) { std::lock_guard<std::mutex> lock(lockListener_); // Check if listener is already present and refresh timeout if launched + auto pushListener = pushListeners_.emplace(pushToken, PushListener{}).first; auto listeners = pushListener->second.listeners.emplace(infoHash, std::vector<Listener>{}).first; for (auto& listener: listeners->second) { - if (listener.callbackId == callbackId) { + if (listener.token == tokenFromReq) { { std::lock_guard<std::mutex> l(schedulerLock_); scheduler_.edit(listener.expireJob, scheduler_.time() + OP_TIMEOUT); @@ -371,14 +371,10 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session) token = tokenPushNotif_; Listener listener; listener.token = token; - listener.callbackId = callbackId; listener.internalToken = dht_->listen(infoHash, - [this, pushToken, callbackId, token, isAndroid, clientId](std::vector<std::shared_ptr<Value>> /*value*/) { + [this, pushToken, token, isAndroid, clientId](std::vector<std::shared_ptr<Value>> /*value*/) { // Build message content. Json::Value json; - if (callbackId > 0) { - json["callback_id"] = callbackId; - } json["to"] = clientId; json["token"] = token; sendPushNotification(pushToken, json, isAndroid); @@ -387,14 +383,11 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session) ); std::lock_guard<std::mutex> l(schedulerLock_); listener.expireJob = scheduler_.add(scheduler_.time() + OP_TIMEOUT, - [this, pushToken, infoHash, token, callbackId, clientId, isAndroid] { - cancelPushListen(pushToken, infoHash, token, callbackId); + [this, pushToken, infoHash, token, clientId, isAndroid] { + cancelPushListen(pushToken, infoHash, token); // Send a push notification to inform the client that this listen has timeout Json::Value json; json["timeout"] = infoHash.toString(); - if (callbackId > 0) { - json["callback_id"] = callbackId; - } json["to"] = clientId; json["token"] = token; sendPushNotification(pushToken, json, isAndroid); @@ -413,6 +406,7 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session) void DhtProxyServer::unsubscribe(const std::shared_ptr<restbed::Session>& session) { + requestNum_++; const auto request = session->get_request(); int content_length = std::stoi(request->get_header("Content-Length", "0")); @@ -439,9 +433,8 @@ DhtProxyServer::unsubscribe(const std::shared_ptr<restbed::Session>& session) if (pushToken.empty()) return; auto token = std::stoull(root["token"].asString()); if (token == 0) return; - auto callbackId = root.isMember("callback_id") ? root["callback_id"].asLargestUInt() : 0; - cancelPushListen(pushToken, infoHash, token, callbackId); + cancelPushListen(pushToken, infoHash, token); } catch (...) { // do nothing } @@ -450,7 +443,7 @@ DhtProxyServer::unsubscribe(const std::shared_ptr<restbed::Session>& session) } void -DhtProxyServer::cancelPushListen(const std::string& pushToken, const dht::InfoHash& key, unsigned token, unsigned callbackId) +DhtProxyServer::cancelPushListen(const std::string& pushToken, const dht::InfoHash& key, unsigned token) { std::lock_guard<std::mutex> lock(lockListener_); auto pushListener = pushListeners_.find(pushToken); @@ -460,8 +453,7 @@ DhtProxyServer::cancelPushListen(const std::string& pushToken, const dht::InfoHa if (listeners == pushListener->second.listeners.end()) return; for (auto listener = listeners->second.begin(); listener != listeners->second.end();) { - if (listener->token == token && listener->callbackId == callbackId) { - std::cout << "cancelPushListen " << key << " token:" << token << " cid:" << callbackId << std::endl; + if (listener->token == token) { if (dht_) dht_->cancelListen(key, std::move(listener->internalToken)); listener = listeners->second.erase(listener); @@ -510,6 +502,7 @@ DhtProxyServer::sendPushNotification(const std::string& token, const Json::Value req->set_header("Host", pushServer_); req->set_header("Content-Length", std::to_string(valueStr.length())); req->set_body(valueStr); + // Send request. restbed::Http::async(req, {}); } diff --git a/tools/dhtnode.cpp b/tools/dhtnode.cpp index d2d81775591f15212f213fff7146f6fd4297de43..22edd402bba2e7d17fe015f3c4c0920893cf91dd 100644 --- a/tools/dhtnode.cpp +++ b/tools/dhtnode.cpp @@ -73,7 +73,7 @@ void print_help() { #if OPENDHT_PUSH_NOTIFICATIONS << " stt [server_address] <device_key> Start the proxy client." << std::endl << " rs [token] Resubscribe to opendht." << std::endl - << " rp [push_notification] Inject a push notification in Opendht." << std::endl + << " rp [token] Inject a push notification in Opendht." << std::endl #else << " stt [server_address] Start the proxy client." << std::endl #endif // OPENDHT_PUSH_NOTIFICATIONS @@ -237,7 +237,7 @@ void cmd_loop(std::shared_ptr<DhtRunner>& dht, dht_params& params #if OPENDHT_PUSH_NOTIFICATIONS else if (op == "rp") { iss >> value; - dht->pushNotificationReceived({{"token", value}}); + dht->pushNotificationReceived({{"to", "dhtnode"}, {"token", value}}); continue; } #endif // OPENDHT_PUSH_NOTIFICATIONS