diff --git a/include/opendht/dht.h b/include/opendht/dht.h index ab04113421245a5327e2685e9d3462a24fdd44d3..3cbd885a772368cc511cce603c8cad73d8bab20e 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -71,11 +71,6 @@ public: Dht(int s, int s6, Config config); virtual ~Dht(); - -#if OPENDHT_PROXY_CLIENT - void startProxy(const std::string&, const std::string&) {}; -#endif - /** * Get the ID of the node. */ diff --git a/include/opendht/dht_interface.h b/include/opendht/dht_interface.h index bf74b39700e338b2a1e909c4df4590770747a694..c56476fe17963b6c17f85b125248eb6ffadb57ad 100644 --- a/include/opendht/dht_interface.h +++ b/include/opendht/dht_interface.h @@ -29,7 +29,10 @@ public: virtual ~DhtInterface() = default; #if OPENDHT_PROXY_CLIENT - virtual void startProxy(const std::string& host, const std::string& deviceKey = "") = 0; + //virtual void startProxy() {}; +#if OPENDHT_PUSH_NOTIFICATIONS + virtual void setPushNotificationToken(const std::string& token) {}; +#endif #endif // [[deprecated]] diff --git a/include/opendht/dht_proxy_client.h b/include/opendht/dht_proxy_client.h index 0cd9bafca513c09c9e894ec9161722e9a0a7b784..bd4705601dcf1b2de40d5e5938f4365bcffd7029 100644 --- a/include/opendht/dht_proxy_client.h +++ b/include/opendht/dht_proxy_client.h @@ -46,13 +46,13 @@ public: * and an ID for the node. * @param serverHost the proxy address */ - explicit DhtProxyClient(const std::string& serverHost); - /** - * Start the connection with a server. - * @param serverHost the server address - * @param deviceKey if we use push notifications - */ - void startProxy(const std::string& serverHost, const std::string& deviceKey = ""); + explicit DhtProxyClient(const std::string& serverHost, const std::string& pushClientId = ""); + +#if OPENDHT_PUSH_NOTIFICATIONS + virtual void setPushNotificationToken(const std::string& token) { + deviceKey_ = token; + } +#endif virtual ~DhtProxyClient(); @@ -248,6 +248,12 @@ public: private: const ValueType NO_VALUE; + + /** + * Start the connection with a server. + */ + void startProxy(); + /** * Get informations from the proxy node * @return the JSON returned by the proxy @@ -266,6 +272,7 @@ private: */ void cancelAllOperations(); std::string serverHost_; + std::string pushClientId_; NodeStatus statusIpv4_ {NodeStatus::Disconnected}; NodeStatus statusIpv6_ {NodeStatus::Disconnected}; diff --git a/include/opendht/dht_proxy_server.h b/include/opendht/dht_proxy_server.h index c8f09dde875f12080ae35c5cf698293b8c5b81cc..5ab1581221d61c07fed657fb3fa25e43fc734e75 100644 --- a/include/opendht/dht_proxy_server.h +++ b/include/opendht/dht_proxy_server.h @@ -210,13 +210,14 @@ private: #if OPENDHT_PUSH_NOTIFICATIONS struct PushListener { - std::string key; + std::string pushToken; InfoHash hash; unsigned token; std::future<size_t> internalToken; std::chrono::steady_clock::time_point deadline; bool started {false}; unsigned callbackId {0}; + std::string clientId {}; bool isAndroid {true}; }; mutable std::mutex lockPushListeners_; diff --git a/include/opendht/dhtrunner.h b/include/opendht/dhtrunner.h index f65a69856419c4904c25df0e3f51ffcbdeeea852..67c0cbf06f3312e42b150ef309bcb2ac18f03837 100644 --- a/include/opendht/dhtrunner.h +++ b/include/opendht/dhtrunner.h @@ -304,6 +304,7 @@ public: bool threaded; #if OPENDHT_PROXY_CLIENT std::string proxy_server; + std::string push_node_id; #endif //OPENDHT_PROXY_CLIENT }; @@ -313,11 +314,7 @@ public: * @param threaded: If false, ::loop() must be called periodically. Otherwise a thread is launched. * @param cb: Optional callback to receive general state information. */ - void run(in_port_t port, const crypto::Identity identity, bool threaded = false, NetId network = 0 -#if OPENDHT_PROXY_CLIENT - , const std::string& proxy_server = "127.0.0.1:8000" -#endif //OPENDHT_PROXY_CLIENT -) { + void run(in_port_t port, const crypto::Identity identity, bool threaded = false, NetId network = 0) { run(port, { /*.dht_config = */{ /*.node_config = */{ @@ -330,7 +327,8 @@ public: }, /*.threaded = */threaded, #if OPENDHT_PROXY_CLIENT - /*.proxy_server = */proxy_server + /*.proxy_server = */"", + /*.push_node_id = */"" #endif //OPENDHT_PROXY_CLIENT }); } @@ -378,21 +376,29 @@ public: void join(); #if OPENDHT_PROXY_CLIENT - void setProxyServer(const std::string& url = "127.0.0.1:8000") { + void setProxyServer(const std::string& url = "127.0.0.1:8000", const std::string& pushNodeId = "") { config_.proxy_server = url; + config_.push_node_id = pushNodeId; } + /** * Start or stop the proxy * @param proxify if we want to use the proxy * @param deviceKey non empty to enable push notifications */ - void enableProxy(bool proxify, const std::string& deviceKey = ""); + void enableProxy(bool proxify); + #endif // OPENDHT_PROXY_CLIENT #if OPENDHT_PROXY_SERVER void forwardAllMessages(bool forward); #endif // OPENDHT_PROXY_SERVER #if OPENDHT_PUSH_NOTIFICATIONS + /** + * Updates the push notification device token + */ + void setPushNotificationToken(const std::string& token); + /** * Insert a push notification to process for OpenDHT */ @@ -442,6 +448,10 @@ private: */ std::unique_ptr<SecureDht> dht_via_proxy_; Config config_; + +#if OPENDHT_PUSH_NOTIFICATIONS + std::string pushToken_; +#endif #endif // OPENDHT_PROXY_CLIENT /** * Store current listeners and translates global tokens for each client. diff --git a/include/opendht/securedht.h b/include/opendht/securedht.h index 10c3b8e11bcbc70020fbe91f06ba02182308f068..0f0b96c2f6677159cf1be9fd98d96173d77ccaef 100644 --- a/include/opendht/securedht.h +++ b/include/opendht/securedht.h @@ -294,10 +294,12 @@ public: } #if OPENDHT_PROXY_CLIENT - void startProxy(const std::string& host, const std::string& deviceKey = "") { - dht_->startProxy(host, deviceKey); +#if OPENDHT_PUSH_NOTIFICATIONS + void setPushNotificationToken(const std::string& token = "") { + dht_->setPushNotificationToken(token); } #endif +#endif #if OPENDHT_PROXY_SERVER void forwardAllMessages(bool forward) { diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index 884cac9eb83794aaec2f81c7b7916eec888252bc..016ea93054ccf713488a052d2d9da97972bd61c1 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -31,12 +31,12 @@ constexpr const char* const HTTP_PROTO {"http://"}; namespace dht { -DhtProxyClient::DhtProxyClient(const std::string& serverHost) -: serverHost_(serverHost), lockCurrentProxyInfos_(new std::mutex()), +DhtProxyClient::DhtProxyClient(const std::string& serverHost, const std::string& pushClientId) +: serverHost_(serverHost), pushClientId_(pushClientId), lockCurrentProxyInfos_(new std::mutex()), scheduler(DHT_LOG), currentProxyInfos_(new Json::Value()) { if (!serverHost_.empty()) - startProxy(serverHost_); + startProxy(); } void @@ -56,11 +56,9 @@ DhtProxyClient::confirmProxy() } void -DhtProxyClient::startProxy(const std::string& serverHost, const std::string& deviceKey) +DhtProxyClient::startProxy() { - serverHost_ = serverHost; if (serverHost_.empty()) return; - deviceKey_ = deviceKey; auto confirm_proxy_time = scheduler.time() + std::chrono::seconds(5); nextProxyConfirmation = scheduler.add(confirm_proxy_time, std::bind(&DhtProxyClient::confirmProxy, this)); auto confirm_connectivity = scheduler.time() + std::chrono::seconds(5); @@ -753,16 +751,17 @@ DhtProxyClient::fillBodyToGetToken(std::shared_ptr<restbed::Request> req) // } Json::Value body; body["key"] = deviceKey_; + body["client_id"] = pushClientId_; { std::lock_guard<std::mutex> lock(lockCallback_); callbackId_ += 1; body["callback_id"] = callbackId_; } #ifdef __ANDROID__ - body["isAndroid"] = true; + body["platform"] = "android"; #endif #ifdef __APPLE__ - body["isAndroid"] = false; + body["platform"] = "apple"; #endif Json::StreamWriterBuilder wbuilder; wbuilder["commentStyle"] = "None"; diff --git a/src/dht_proxy_server.cpp b/src/dht_proxy_server.cpp index b09f4e27b7d1e39983053d715f2b3bb346d1b8d8..7abaca578ddbc717e30ad32e189212d00cd1a300 100644 --- a/src/dht_proxy_server.cpp +++ b/src/dht_proxy_server.cpp @@ -284,17 +284,19 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session) cons s->close(restbed::BAD_REQUEST, "{\"err\":\"Incorrect JSON\"}"); return; } - auto userKey = root["key"].asString(); - if (userKey.empty()) return; + auto pushToken = root["key"].asString(); + if (pushToken.empty()) return; auto callbackId = root.isMember("callback_id") ? root["callback_id"].asLargestUInt() : 0; - auto isAndroid = root.isMember("isAndroid") ? root["isAndroid"].asBool() : true; + auto platform = root["platform"].asString(); + auto isAndroid = platform == "android"; + auto clientId = root.isMember("client_id") ? root["client_id"].asString() : std::string(); auto token = 0; { std::lock_guard<std::mutex> lock(lockListener_); // Check if listener is already present and refresh timeout if launched for(auto& listener: pushListeners_) { - if (listener.key == userKey && listener.hash == infoHash + if (listener.pushToken == pushToken && listener.hash == infoHash && listener.callbackId == callbackId) { if (listener.started) listener.deadline = std::chrono::steady_clock::now() @@ -307,11 +309,12 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session) cons ++tokenPushNotif_; token = tokenPushNotif_; PushListener listener; - listener.key = userKey; + listener.pushToken = pushToken; listener.hash = std::move(infoHash); listener.token = token; listener.started = false; listener.callbackId = callbackId; + listener.clientId = clientId; listener.isAndroid = isAndroid; pushListeners_.emplace_back(std::move(listener)); } @@ -348,8 +351,8 @@ DhtProxyServer::unsubscribe(const std::shared_ptr<restbed::Session>& session) co s->close(restbed::BAD_REQUEST, "{\"err\":\"Incorrect JSON\"}"); return; } - auto userKey = root["key"].asString(); - if (userKey.empty()) return; + auto pushToken = root["key"].asString(); + if (pushToken.empty()) return; auto token = root["token"].asLargestUInt(); if (token == 0) return; auto callbackId = root.isMember("callback_id") ? root["callback_id"].asLargestUInt() : 0; @@ -358,7 +361,7 @@ DhtProxyServer::unsubscribe(const std::shared_ptr<restbed::Session>& session) co // Check if listener is already present and refresh timeout if launched auto listener = pushListeners_.begin(); while (listener != pushListeners_.end()) { - if (listener->key == userKey && listener->token == token + if (listener->pushToken == pushToken && listener->token == token && listener->hash == infoHash && listener->callbackId == callbackId) { if (dht_ && listener->started) dht_->cancelListen(listener->hash, std::move(listener->internalToken.get())); @@ -415,23 +418,24 @@ DhtProxyServer::handlePushListeners() while (pushListener != pushListeners_.end()) { if (dht_ && !pushListener->started) { // Try to start unstarted listeners - auto key = pushListener->key; + auto key = pushListener->pushToken; auto token = pushListener->token; auto callbackId = pushListener->callbackId; auto isAndroid = pushListener->isAndroid; - auto internalToken = std::move(dht_->listen(pushListener->hash, - [this, key, callbackId, token, isAndroid](std::shared_ptr<Value> /*value*/) { + auto clientId = pushListener->clientId; + pushListener->internalToken = dht_->listen(pushListener->hash, + [this, key, callbackId, token, isAndroid, clientId](std::shared_ptr<Value> /*value*/) { // Build message content. Json::Value json; if (callbackId > 0) { json["callback_id"] = callbackId; } + json["to"] = clientId; json["token"] = token; sendPushNotification(key, json, isAndroid); return true; } - )); - pushListener->internalToken = std::move(internalToken); + ); pushListener->deadline = std::chrono::steady_clock::now() + std::chrono::seconds(TIMEOUT); pushListener->started = true; pushListener++; @@ -444,8 +448,9 @@ DhtProxyServer::handlePushListeners() if (pushListener->callbackId > 0) { json["callback_id"] = pushListener->callbackId; } + json["to"] = pushListener->clientId; json["token"] = pushListener->token; - sendPushNotification(pushListener->key, json, pushListener->isAndroid); + sendPushNotification(pushListener->pushToken, json, pushListener->isAndroid); pushListener = pushListeners_.erase(pushListener); } else { pushListener++; diff --git a/src/dhtrunner.cpp b/src/dhtrunner.cpp index e13c759b6739d2c8db665bbdaeb8b638d3c89174..ee38f549ff665085081c8ab2f1ca518328233f3d 100644 --- a/src/dhtrunner.cpp +++ b/src/dhtrunner.cpp @@ -89,8 +89,7 @@ DhtRunner::run(const SockAddr& local4, const SockAddr& local6, DhtRunner::Config rcv_thread.join(); running = true; #if OPENDHT_PROXY_CLIENT - config_.dht_config = config.dht_config; - config_.threaded = config.threaded; + config_ = config; #endif //OPENDHT_PROXY_CLIENT doRun(local4, local6, config.dht_config); if (not config.threaded) @@ -436,7 +435,7 @@ DhtRunner::doRun(const SockAddr& sin4, const SockAddr& sin6, SecureDht::Config c #if OPENDHT_PROXY_CLIENT if (!dht_via_proxy_) { auto dht_via_proxy = std::unique_ptr<DhtInterface>( - new DhtProxyClient(config_.proxy_server) + new DhtProxyClient(config_.proxy_server, config_.push_node_id) ); dht_via_proxy_ = std::unique_ptr<SecureDht>(new SecureDht(std::move(dht_via_proxy), config_.dht_config)); } @@ -861,16 +860,20 @@ DhtRunner::activeDht() const #if OPENDHT_PROXY_CLIENT void -DhtRunner::enableProxy(bool proxify, const std::string& deviceKey) { - if (!dht_via_proxy_) { - auto dht_via_proxy = std::unique_ptr<DhtInterface>( - new DhtProxyClient(config_.proxy_server) - ); - dht_via_proxy_ = std::unique_ptr<SecureDht>(new SecureDht(std::move(dht_via_proxy), config_.dht_config)); +DhtRunner::enableProxy(bool proxify) +{ + if (dht_via_proxy_) { + dht_via_proxy_->shutdown({}); } if (proxify) { // Init the proxy client - dht_via_proxy_->startProxy(config_.proxy_server, deviceKey); + auto dht_via_proxy = std::unique_ptr<DhtInterface>( + new DhtProxyClient(config_.proxy_server, config_.push_node_id) + ); + dht_via_proxy_ = std::unique_ptr<SecureDht>(new SecureDht(std::move(dht_via_proxy), config_.dht_config)); + if (not pushToken_.empty()) + dht_via_proxy_->setPushNotificationToken(pushToken_); + //dht_via_proxy_->startProxy(); // add current listeners for (auto& listener: listeners_) { auto tokenProxy = dht_via_proxy_->listen(listener->hash, listener->gcb, std::move(listener->f), std::move(listener->w)); @@ -881,9 +884,6 @@ DhtRunner::enableProxy(bool proxify, const std::string& deviceKey) { } else { use_proxy = proxify; loop_(); // Restart the classic DHT. - // We doesn't need to maintain the connection with the proxy. - // Delete it - dht_via_proxy_->shutdown({}); // update all proxyToken for all proxyListener auto it = listeners_.begin(); for (; it != listeners_.end(); ++it) { @@ -899,6 +899,17 @@ DhtRunner::enableProxy(bool proxify, const std::string& deviceKey) { } } } + +/** + * Updates the push notification device token + */ +void +DhtRunner::setPushNotificationToken(const std::string& token) { + pushToken_ = token; + if (dht_via_proxy_) + dht_via_proxy_->setPushNotificationToken(token); +} + #endif // OPENDHT_PROXY_CLIENT #if OPENDHT_PROXY_SERVER diff --git a/tools/dhtnode.cpp b/tools/dhtnode.cpp index 348e625b00a0961c51136e2ef1ab05b7919302fd..4b1839815c7b5c2a8e3215dcf4b5011e988f6414 100644 --- a/tools/dhtnode.cpp +++ b/tools/dhtnode.cpp @@ -220,8 +220,10 @@ void cmd_loop(std::shared_ptr<DhtRunner>& dht, dht_params& params #if OPENDHT_PROXY_CLIENT else if (op == "stt") { iss >> idstr >> deviceKey; + if (not deviceKey.empty()) + dht->setPushNotificationToken(deviceKey); dht->setProxyServer(idstr); - dht->enableProxy(true, deviceKey); + dht->enableProxy(true); continue; } else if (op == "stp") { dht->enableProxy(false); @@ -483,6 +485,13 @@ main(int argc, char **argv) } dht->run(params.port, crt, true, params.network); +#if OPENDHT_PROXY_CLIENT + if (!params.proxyclient.empty()) { + dht->setPushNotificationToken(params.devicekey); + dht->setProxyServer(params.proxyclient, "dhtnode"); + dht->enableProxy(true); + } +#endif //OPENDHT_PROXY_CLIENT if (params.log) { if (params.syslog or (params.daemonize and params.logfile.empty())) @@ -498,22 +507,16 @@ main(int argc, char **argv) dht->bootstrap(params.bootstrap.first.c_str(), params.bootstrap.second.c_str()); } - #if OPENDHT_PROXY_SERVER +#if OPENDHT_PROXY_SERVER std::map<in_port_t, std::unique_ptr<DhtProxyServer>> proxies; if (params.proxyserver != 0) { proxies.emplace(params.proxyserver, new DhtProxyServer(dht, params.proxyserver - #if OPENDHT_PUSH_NOTIFICATIONS +#if OPENDHT_PUSH_NOTIFICATIONS , params.pushserver - #endif // OPENDHT_PUSH_NOTIFICATIONS +#endif // OPENDHT_PUSH_NOTIFICATIONS )); } - #endif //OPENDHT_PROXY_SERVER - #if OPENDHT_PROXY_CLIENT - if (!params.proxyclient.empty()) { - dht->setProxyServer(params.proxyclient); - dht->enableProxy(true, params.devicekey); - } - #endif //OPENDHT_PROXY_CLIENT +#endif //OPENDHT_PROXY_SERVER if (params.daemonize or params.service) while (runner.wait());