From a0d8f4734ac3e46abb3748a1ceb4450c8df4825d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20B=C3=A9raud?= <adrien.beraud@savoirfairelinux.com> Date: Wed, 10 Jan 2018 16:32:57 -0500 Subject: [PATCH] dhtproxy: allow token refresh, set clientID (#6) * add method to set the push token independently from other configuration, since the push token can be updated during runtime. * allow to provide a "clientID" to be sent in push notifications, used to demultiplex push notifications to multiple OpenDHT nodes running on the same host. For instance Ring uses the accountID as the clientID to know which account received the push notification. * use {"platform":"android"/"ios"} to communicate the platform instead of {"isAndroid":true/false}. --- include/opendht/dht.h | 5 ---- include/opendht/dht_interface.h | 5 +++- include/opendht/dht_proxy_client.h | 21 +++++++++++------ include/opendht/dht_proxy_server.h | 3 ++- include/opendht/dhtrunner.h | 26 ++++++++++++++------- include/opendht/securedht.h | 6 +++-- src/dht_proxy_client.cpp | 15 ++++++------ src/dht_proxy_server.cpp | 33 +++++++++++++++----------- src/dhtrunner.cpp | 37 +++++++++++++++++++----------- tools/dhtnode.cpp | 25 +++++++++++--------- 10 files changed, 106 insertions(+), 70 deletions(-) diff --git a/include/opendht/dht.h b/include/opendht/dht.h index ab041134..3cbd885a 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -71,11 +71,6 @@ public: Dht(int s, int s6, Config config); virtual ~Dht(); - -#if OPENDHT_PROXY_CLIENT - void startProxy(const std::string&, const std::string&) {}; -#endif - /** * Get the ID of the node. */ diff --git a/include/opendht/dht_interface.h b/include/opendht/dht_interface.h index bf74b397..c56476fe 100644 --- a/include/opendht/dht_interface.h +++ b/include/opendht/dht_interface.h @@ -29,7 +29,10 @@ public: virtual ~DhtInterface() = default; #if OPENDHT_PROXY_CLIENT - virtual void startProxy(const std::string& host, const std::string& deviceKey = "") = 0; + //virtual void startProxy() {}; +#if OPENDHT_PUSH_NOTIFICATIONS + virtual void setPushNotificationToken(const std::string& token) {}; +#endif #endif // [[deprecated]] diff --git a/include/opendht/dht_proxy_client.h b/include/opendht/dht_proxy_client.h index 0cd9bafc..bd470560 100644 --- a/include/opendht/dht_proxy_client.h +++ b/include/opendht/dht_proxy_client.h @@ -46,13 +46,13 @@ public: * and an ID for the node. * @param serverHost the proxy address */ - explicit DhtProxyClient(const std::string& serverHost); - /** - * Start the connection with a server. - * @param serverHost the server address - * @param deviceKey if we use push notifications - */ - void startProxy(const std::string& serverHost, const std::string& deviceKey = ""); + explicit DhtProxyClient(const std::string& serverHost, const std::string& pushClientId = ""); + +#if OPENDHT_PUSH_NOTIFICATIONS + virtual void setPushNotificationToken(const std::string& token) { + deviceKey_ = token; + } +#endif virtual ~DhtProxyClient(); @@ -248,6 +248,12 @@ public: private: const ValueType NO_VALUE; + + /** + * Start the connection with a server. + */ + void startProxy(); + /** * Get informations from the proxy node * @return the JSON returned by the proxy @@ -266,6 +272,7 @@ private: */ void cancelAllOperations(); std::string serverHost_; + std::string pushClientId_; NodeStatus statusIpv4_ {NodeStatus::Disconnected}; NodeStatus statusIpv6_ {NodeStatus::Disconnected}; diff --git a/include/opendht/dht_proxy_server.h b/include/opendht/dht_proxy_server.h index c8f09dde..5ab15812 100644 --- a/include/opendht/dht_proxy_server.h +++ b/include/opendht/dht_proxy_server.h @@ -210,13 +210,14 @@ private: #if OPENDHT_PUSH_NOTIFICATIONS struct PushListener { - std::string key; + std::string pushToken; InfoHash hash; unsigned token; std::future<size_t> internalToken; std::chrono::steady_clock::time_point deadline; bool started {false}; unsigned callbackId {0}; + std::string clientId {}; bool isAndroid {true}; }; mutable std::mutex lockPushListeners_; diff --git a/include/opendht/dhtrunner.h b/include/opendht/dhtrunner.h index f65a6985..67c0cbf0 100644 --- a/include/opendht/dhtrunner.h +++ b/include/opendht/dhtrunner.h @@ -304,6 +304,7 @@ public: bool threaded; #if OPENDHT_PROXY_CLIENT std::string proxy_server; + std::string push_node_id; #endif //OPENDHT_PROXY_CLIENT }; @@ -313,11 +314,7 @@ public: * @param threaded: If false, ::loop() must be called periodically. Otherwise a thread is launched. * @param cb: Optional callback to receive general state information. */ - void run(in_port_t port, const crypto::Identity identity, bool threaded = false, NetId network = 0 -#if OPENDHT_PROXY_CLIENT - , const std::string& proxy_server = "127.0.0.1:8000" -#endif //OPENDHT_PROXY_CLIENT -) { + void run(in_port_t port, const crypto::Identity identity, bool threaded = false, NetId network = 0) { run(port, { /*.dht_config = */{ /*.node_config = */{ @@ -330,7 +327,8 @@ public: }, /*.threaded = */threaded, #if OPENDHT_PROXY_CLIENT - /*.proxy_server = */proxy_server + /*.proxy_server = */"", + /*.push_node_id = */"" #endif //OPENDHT_PROXY_CLIENT }); } @@ -378,21 +376,29 @@ public: void join(); #if OPENDHT_PROXY_CLIENT - void setProxyServer(const std::string& url = "127.0.0.1:8000") { + void setProxyServer(const std::string& url = "127.0.0.1:8000", const std::string& pushNodeId = "") { config_.proxy_server = url; + config_.push_node_id = pushNodeId; } + /** * Start or stop the proxy * @param proxify if we want to use the proxy * @param deviceKey non empty to enable push notifications */ - void enableProxy(bool proxify, const std::string& deviceKey = ""); + void enableProxy(bool proxify); + #endif // OPENDHT_PROXY_CLIENT #if OPENDHT_PROXY_SERVER void forwardAllMessages(bool forward); #endif // OPENDHT_PROXY_SERVER #if OPENDHT_PUSH_NOTIFICATIONS + /** + * Updates the push notification device token + */ + void setPushNotificationToken(const std::string& token); + /** * Insert a push notification to process for OpenDHT */ @@ -442,6 +448,10 @@ private: */ std::unique_ptr<SecureDht> dht_via_proxy_; Config config_; + +#if OPENDHT_PUSH_NOTIFICATIONS + std::string pushToken_; +#endif #endif // OPENDHT_PROXY_CLIENT /** * Store current listeners and translates global tokens for each client. diff --git a/include/opendht/securedht.h b/include/opendht/securedht.h index 10c3b8e1..0f0b96c2 100644 --- a/include/opendht/securedht.h +++ b/include/opendht/securedht.h @@ -294,10 +294,12 @@ public: } #if OPENDHT_PROXY_CLIENT - void startProxy(const std::string& host, const std::string& deviceKey = "") { - dht_->startProxy(host, deviceKey); +#if OPENDHT_PUSH_NOTIFICATIONS + void setPushNotificationToken(const std::string& token = "") { + dht_->setPushNotificationToken(token); } #endif +#endif #if OPENDHT_PROXY_SERVER void forwardAllMessages(bool forward) { diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index 884cac9e..016ea930 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -31,12 +31,12 @@ constexpr const char* const HTTP_PROTO {"http://"}; namespace dht { -DhtProxyClient::DhtProxyClient(const std::string& serverHost) -: serverHost_(serverHost), lockCurrentProxyInfos_(new std::mutex()), +DhtProxyClient::DhtProxyClient(const std::string& serverHost, const std::string& pushClientId) +: serverHost_(serverHost), pushClientId_(pushClientId), lockCurrentProxyInfos_(new std::mutex()), scheduler(DHT_LOG), currentProxyInfos_(new Json::Value()) { if (!serverHost_.empty()) - startProxy(serverHost_); + startProxy(); } void @@ -56,11 +56,9 @@ DhtProxyClient::confirmProxy() } void -DhtProxyClient::startProxy(const std::string& serverHost, const std::string& deviceKey) +DhtProxyClient::startProxy() { - serverHost_ = serverHost; if (serverHost_.empty()) return; - deviceKey_ = deviceKey; auto confirm_proxy_time = scheduler.time() + std::chrono::seconds(5); nextProxyConfirmation = scheduler.add(confirm_proxy_time, std::bind(&DhtProxyClient::confirmProxy, this)); auto confirm_connectivity = scheduler.time() + std::chrono::seconds(5); @@ -753,16 +751,17 @@ DhtProxyClient::fillBodyToGetToken(std::shared_ptr<restbed::Request> req) // } Json::Value body; body["key"] = deviceKey_; + body["client_id"] = pushClientId_; { std::lock_guard<std::mutex> lock(lockCallback_); callbackId_ += 1; body["callback_id"] = callbackId_; } #ifdef __ANDROID__ - body["isAndroid"] = true; + body["platform"] = "android"; #endif #ifdef __APPLE__ - body["isAndroid"] = false; + body["platform"] = "apple"; #endif Json::StreamWriterBuilder wbuilder; wbuilder["commentStyle"] = "None"; diff --git a/src/dht_proxy_server.cpp b/src/dht_proxy_server.cpp index b09f4e27..7abaca57 100644 --- a/src/dht_proxy_server.cpp +++ b/src/dht_proxy_server.cpp @@ -284,17 +284,19 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session) cons s->close(restbed::BAD_REQUEST, "{\"err\":\"Incorrect JSON\"}"); return; } - auto userKey = root["key"].asString(); - if (userKey.empty()) return; + auto pushToken = root["key"].asString(); + if (pushToken.empty()) return; auto callbackId = root.isMember("callback_id") ? root["callback_id"].asLargestUInt() : 0; - auto isAndroid = root.isMember("isAndroid") ? root["isAndroid"].asBool() : true; + auto platform = root["platform"].asString(); + auto isAndroid = platform == "android"; + auto clientId = root.isMember("client_id") ? root["client_id"].asString() : std::string(); auto token = 0; { std::lock_guard<std::mutex> lock(lockListener_); // Check if listener is already present and refresh timeout if launched for(auto& listener: pushListeners_) { - if (listener.key == userKey && listener.hash == infoHash + if (listener.pushToken == pushToken && listener.hash == infoHash && listener.callbackId == callbackId) { if (listener.started) listener.deadline = std::chrono::steady_clock::now() @@ -307,11 +309,12 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session) cons ++tokenPushNotif_; token = tokenPushNotif_; PushListener listener; - listener.key = userKey; + listener.pushToken = pushToken; listener.hash = std::move(infoHash); listener.token = token; listener.started = false; listener.callbackId = callbackId; + listener.clientId = clientId; listener.isAndroid = isAndroid; pushListeners_.emplace_back(std::move(listener)); } @@ -348,8 +351,8 @@ DhtProxyServer::unsubscribe(const std::shared_ptr<restbed::Session>& session) co s->close(restbed::BAD_REQUEST, "{\"err\":\"Incorrect JSON\"}"); return; } - auto userKey = root["key"].asString(); - if (userKey.empty()) return; + auto pushToken = root["key"].asString(); + if (pushToken.empty()) return; auto token = root["token"].asLargestUInt(); if (token == 0) return; auto callbackId = root.isMember("callback_id") ? root["callback_id"].asLargestUInt() : 0; @@ -358,7 +361,7 @@ DhtProxyServer::unsubscribe(const std::shared_ptr<restbed::Session>& session) co // Check if listener is already present and refresh timeout if launched auto listener = pushListeners_.begin(); while (listener != pushListeners_.end()) { - if (listener->key == userKey && listener->token == token + if (listener->pushToken == pushToken && listener->token == token && listener->hash == infoHash && listener->callbackId == callbackId) { if (dht_ && listener->started) dht_->cancelListen(listener->hash, std::move(listener->internalToken.get())); @@ -415,23 +418,24 @@ DhtProxyServer::handlePushListeners() while (pushListener != pushListeners_.end()) { if (dht_ && !pushListener->started) { // Try to start unstarted listeners - auto key = pushListener->key; + auto key = pushListener->pushToken; auto token = pushListener->token; auto callbackId = pushListener->callbackId; auto isAndroid = pushListener->isAndroid; - auto internalToken = std::move(dht_->listen(pushListener->hash, - [this, key, callbackId, token, isAndroid](std::shared_ptr<Value> /*value*/) { + auto clientId = pushListener->clientId; + pushListener->internalToken = dht_->listen(pushListener->hash, + [this, key, callbackId, token, isAndroid, clientId](std::shared_ptr<Value> /*value*/) { // Build message content. Json::Value json; if (callbackId > 0) { json["callback_id"] = callbackId; } + json["to"] = clientId; json["token"] = token; sendPushNotification(key, json, isAndroid); return true; } - )); - pushListener->internalToken = std::move(internalToken); + ); pushListener->deadline = std::chrono::steady_clock::now() + std::chrono::seconds(TIMEOUT); pushListener->started = true; pushListener++; @@ -444,8 +448,9 @@ DhtProxyServer::handlePushListeners() if (pushListener->callbackId > 0) { json["callback_id"] = pushListener->callbackId; } + json["to"] = pushListener->clientId; json["token"] = pushListener->token; - sendPushNotification(pushListener->key, json, pushListener->isAndroid); + sendPushNotification(pushListener->pushToken, json, pushListener->isAndroid); pushListener = pushListeners_.erase(pushListener); } else { pushListener++; diff --git a/src/dhtrunner.cpp b/src/dhtrunner.cpp index e13c759b..ee38f549 100644 --- a/src/dhtrunner.cpp +++ b/src/dhtrunner.cpp @@ -89,8 +89,7 @@ DhtRunner::run(const SockAddr& local4, const SockAddr& local6, DhtRunner::Config rcv_thread.join(); running = true; #if OPENDHT_PROXY_CLIENT - config_.dht_config = config.dht_config; - config_.threaded = config.threaded; + config_ = config; #endif //OPENDHT_PROXY_CLIENT doRun(local4, local6, config.dht_config); if (not config.threaded) @@ -436,7 +435,7 @@ DhtRunner::doRun(const SockAddr& sin4, const SockAddr& sin6, SecureDht::Config c #if OPENDHT_PROXY_CLIENT if (!dht_via_proxy_) { auto dht_via_proxy = std::unique_ptr<DhtInterface>( - new DhtProxyClient(config_.proxy_server) + new DhtProxyClient(config_.proxy_server, config_.push_node_id) ); dht_via_proxy_ = std::unique_ptr<SecureDht>(new SecureDht(std::move(dht_via_proxy), config_.dht_config)); } @@ -861,16 +860,20 @@ DhtRunner::activeDht() const #if OPENDHT_PROXY_CLIENT void -DhtRunner::enableProxy(bool proxify, const std::string& deviceKey) { - if (!dht_via_proxy_) { - auto dht_via_proxy = std::unique_ptr<DhtInterface>( - new DhtProxyClient(config_.proxy_server) - ); - dht_via_proxy_ = std::unique_ptr<SecureDht>(new SecureDht(std::move(dht_via_proxy), config_.dht_config)); +DhtRunner::enableProxy(bool proxify) +{ + if (dht_via_proxy_) { + dht_via_proxy_->shutdown({}); } if (proxify) { // Init the proxy client - dht_via_proxy_->startProxy(config_.proxy_server, deviceKey); + auto dht_via_proxy = std::unique_ptr<DhtInterface>( + new DhtProxyClient(config_.proxy_server, config_.push_node_id) + ); + dht_via_proxy_ = std::unique_ptr<SecureDht>(new SecureDht(std::move(dht_via_proxy), config_.dht_config)); + if (not pushToken_.empty()) + dht_via_proxy_->setPushNotificationToken(pushToken_); + //dht_via_proxy_->startProxy(); // add current listeners for (auto& listener: listeners_) { auto tokenProxy = dht_via_proxy_->listen(listener->hash, listener->gcb, std::move(listener->f), std::move(listener->w)); @@ -881,9 +884,6 @@ DhtRunner::enableProxy(bool proxify, const std::string& deviceKey) { } else { use_proxy = proxify; loop_(); // Restart the classic DHT. - // We doesn't need to maintain the connection with the proxy. - // Delete it - dht_via_proxy_->shutdown({}); // update all proxyToken for all proxyListener auto it = listeners_.begin(); for (; it != listeners_.end(); ++it) { @@ -899,6 +899,17 @@ DhtRunner::enableProxy(bool proxify, const std::string& deviceKey) { } } } + +/** + * Updates the push notification device token + */ +void +DhtRunner::setPushNotificationToken(const std::string& token) { + pushToken_ = token; + if (dht_via_proxy_) + dht_via_proxy_->setPushNotificationToken(token); +} + #endif // OPENDHT_PROXY_CLIENT #if OPENDHT_PROXY_SERVER diff --git a/tools/dhtnode.cpp b/tools/dhtnode.cpp index 348e625b..4b183981 100644 --- a/tools/dhtnode.cpp +++ b/tools/dhtnode.cpp @@ -220,8 +220,10 @@ void cmd_loop(std::shared_ptr<DhtRunner>& dht, dht_params& params #if OPENDHT_PROXY_CLIENT else if (op == "stt") { iss >> idstr >> deviceKey; + if (not deviceKey.empty()) + dht->setPushNotificationToken(deviceKey); dht->setProxyServer(idstr); - dht->enableProxy(true, deviceKey); + dht->enableProxy(true); continue; } else if (op == "stp") { dht->enableProxy(false); @@ -483,6 +485,13 @@ main(int argc, char **argv) } dht->run(params.port, crt, true, params.network); +#if OPENDHT_PROXY_CLIENT + if (!params.proxyclient.empty()) { + dht->setPushNotificationToken(params.devicekey); + dht->setProxyServer(params.proxyclient, "dhtnode"); + dht->enableProxy(true); + } +#endif //OPENDHT_PROXY_CLIENT if (params.log) { if (params.syslog or (params.daemonize and params.logfile.empty())) @@ -498,22 +507,16 @@ main(int argc, char **argv) dht->bootstrap(params.bootstrap.first.c_str(), params.bootstrap.second.c_str()); } - #if OPENDHT_PROXY_SERVER +#if OPENDHT_PROXY_SERVER std::map<in_port_t, std::unique_ptr<DhtProxyServer>> proxies; if (params.proxyserver != 0) { proxies.emplace(params.proxyserver, new DhtProxyServer(dht, params.proxyserver - #if OPENDHT_PUSH_NOTIFICATIONS +#if OPENDHT_PUSH_NOTIFICATIONS , params.pushserver - #endif // OPENDHT_PUSH_NOTIFICATIONS +#endif // OPENDHT_PUSH_NOTIFICATIONS )); } - #endif //OPENDHT_PROXY_SERVER - #if OPENDHT_PROXY_CLIENT - if (!params.proxyclient.empty()) { - dht->setProxyServer(params.proxyclient); - dht->enableProxy(true, params.devicekey); - } - #endif //OPENDHT_PROXY_CLIENT +#endif //OPENDHT_PROXY_SERVER if (params.daemonize or params.service) while (runner.wait()); -- GitLab