diff --git a/configure.ac b/configure.ac index aa279477826cafb8bea40b02e8f7272ef9cc27a9..85d848d81f571ac6029c3bb5cceac271706826c5 100644 --- a/configure.ac +++ b/configure.ac @@ -140,7 +140,7 @@ AM_CONDITIONAL(ENABLE_PROXY_CLIENT, test x$proxy_client == xyes) AM_COND_IF([ENABLE_PROXY_SERVER], [ AC_CHECK_LIB(restbed, exit,, AC_MSG_ERROR([Missing restbed files])) - PKG_CHECK_MODULES([Jsoncpp], [jsoncpp >= 1.7.4]) + PKG_CHECK_MODULES([Jsoncpp], [jsoncpp >= 1.7.2]) CPPFLAGS+=" -DOPENDHT_PROXY_SERVER=true -ljsoncpp -lrestbed" ], [ CPPFLAGS+=" -DOPENDHT_PROXY_SERVER=false" diff --git a/include/opendht/dht.h b/include/opendht/dht.h index 2bc18b5bb7162c7a4151d4d15d511ff0c34ed368..ab04113421245a5327e2685e9d3462a24fdd44d3 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -73,7 +73,7 @@ public: #if OPENDHT_PROXY_CLIENT - void startProxy(const std::string&) {}; + void startProxy(const std::string&, const std::string&) {}; #endif /** @@ -295,6 +295,17 @@ public: std::vector<SockAddr> getPublicAddress(sa_family_t family = 0); +#if OPENDHT_PUSH_NOTIFICATIONS + /** + * Call linked callback with a push notification + * @param notification to process + */ + void pushNotificationReceived(const Json::Value&) { + // Ignore this + } + void resubscribe(const unsigned) {} +#endif // OPENDHT_PUSH_NOTIFICATIONS + private: /* When performing a search, we search for up to SEARCH_NODES closest nodes diff --git a/include/opendht/dht_interface.h b/include/opendht/dht_interface.h index d981745326c6fa51f5861fb5132cf27fe9ca0a4f..d6243769028610082e224bc3eab0d49dbc4a5b00 100644 --- a/include/opendht/dht_interface.h +++ b/include/opendht/dht_interface.h @@ -29,7 +29,7 @@ public: virtual ~DhtInterface() = default; #if OPENDHT_PROXY_CLIENT - virtual void startProxy(const std::string& host) = 0; + virtual void startProxy(const std::string& host, const std::string& deviceKey = "") = 0; #endif // [[deprecated]] @@ -231,6 +231,19 @@ public: virtual void setLogFilter(const InfoHash& f) { DHT_LOG.setFilter(f); } + +#if OPENDHT_PUSH_NOTIFICATIONS + /** + * Call linked callback with a push notification + * @param notification to process + */ + virtual void pushNotificationReceived(const Json::Value& notification) = 0; + /** + * Refresh a listen via a token + * @param token + */ + virtual void resubscribe(const unsigned token) = 0; +#endif // OPENDHT_PUSH_NOTIFICATIONS protected: bool logFilerEnable_ {}; InfoHash logFiler_ {}; diff --git a/include/opendht/dht_proxy_client.h b/include/opendht/dht_proxy_client.h index adeff41077d0e5c2b6661ccbd8eef38d688b57bc..0cd9bafca513c09c9e894ec9161722e9a0a7b784 100644 --- a/include/opendht/dht_proxy_client.h +++ b/include/opendht/dht_proxy_client.h @@ -44,13 +44,15 @@ public: /** * Initialise the DhtProxyClient with two open sockets (for IPv4 and IP6) * and an ID for the node. + * @param serverHost the proxy address */ explicit DhtProxyClient(const std::string& serverHost); /** * Start the connection with a server. * @param serverHost the server address + * @param deviceKey if we use push notifications */ - void startProxy(const std::string& serverHost); + void startProxy(const std::string& serverHost, const std::string& deviceKey = ""); virtual ~DhtProxyClient(); @@ -101,6 +103,8 @@ public: get(key, bindGetCb(cb), bindDoneCb(donecb), std::forward<Value::Filter>(f), std::forward<Where>(w)); } + void get(const InfoHash& key, const GetCallback& cb, DoneCallback donecb, const Value::Filter& filterChain); + /** * Announce a value on all available protocols (IPv4, IPv6). * @@ -162,18 +166,26 @@ public: virtual size_t listen(const InfoHash& key, GetCallbackSimple cb, Value::Filter f={}, Where w = {}) { return listen(key, bindGetCb(cb), std::forward<Value::Filter>(f), std::forward<Where>(w)); } + virtual bool cancelListen(const InfoHash&, size_t token); + +#if OPENDHT_PUSH_NOTIFICATIONS + /** + * Call linked callback with a push notification + * @param notification to process + */ + void pushNotificationReceived(const Json::Value& notification); + /** + * Refresh a listen via a token + * @param token + */ + void resubscribe(const unsigned token); +#endif // OPENDHT_PUSH_NOTIFICATIONS time_point periodic(const uint8_t*, size_t, const SockAddr&); time_point periodic(const uint8_t *buf, size_t buflen, const sockaddr* from, socklen_t fromlen) { return periodic(buf, buflen, SockAddr(from, fromlen)); } - /** - * TODO - * NOTE: For now, there is no endpoint in the DhtProxyServer to do the following methods. - * It will come in another version. (with push_notifications support) - */ - virtual bool cancelListen(const InfoHash&, size_t token); /** * Similar to Dht::get, but sends a Query to filter data remotely. @@ -270,6 +282,7 @@ private: GetCallback cb; Value::Filter filterChain; std::thread thread; + std::shared_ptr<unsigned> pushNotifToken; // NOTE: unused if not using push notifications }; std::vector<Listener> listeners_; size_t listener_token_ {0}; @@ -313,6 +326,19 @@ private: * Store the current proxy status */ std::unique_ptr<Json::Value> currentProxyInfos_; + + /** + * If we want to use push notifications by default. + * NOTE: empty by default to avoid to use services like FCM or APN. + */ + std::string deviceKey_ {}; + unsigned callbackId_ {0}; + std::mutex lockCallback_; + +#if OPENDHT_PUSH_NOTIFICATIONS + void fillBodyToGetToken(std::shared_ptr<restbed::Request> request); +#endif // OPENDHT_PUSH_NOTIFICATIONS + }; } diff --git a/include/opendht/dhtrunner.h b/include/opendht/dhtrunner.h index d7f573e56b1a7f2f7dfe3fe9ef56dc5236751ffa..f65a69856419c4904c25df0e3f51ffcbdeeea852 100644 --- a/include/opendht/dhtrunner.h +++ b/include/opendht/dhtrunner.h @@ -381,12 +381,30 @@ public: void setProxyServer(const std::string& url = "127.0.0.1:8000") { config_.proxy_server = url; } - void enableProxy(bool proxify); + /** + * Start or stop the proxy + * @param proxify if we want to use the proxy + * @param deviceKey non empty to enable push notifications + */ + void enableProxy(bool proxify, const std::string& deviceKey = ""); #endif // OPENDHT_PROXY_CLIENT #if OPENDHT_PROXY_SERVER void forwardAllMessages(bool forward); #endif // OPENDHT_PROXY_SERVER +#if OPENDHT_PUSH_NOTIFICATIONS + /** + * Insert a push notification to process for OpenDHT + */ + void pushNotificationReceived(const std::string& notification) const; + void pushNotificationReceived(const Json::Value& notification) const; + /** + * Refresh a listen via a token + * @param token + */ + void resubscribe(const unsigned token); +#endif // OPENDHT_PUSH_NOTIFICATIONS + private: static constexpr std::chrono::seconds BOOTSTRAP_PERIOD {10}; diff --git a/include/opendht/securedht.h b/include/opendht/securedht.h index a4ff6ec4a0b0342bdb66560f9312817c704031e8..ea7332373541b90a45c9df45510227895bb143b5 100644 --- a/include/opendht/securedht.h +++ b/include/opendht/securedht.h @@ -294,8 +294,8 @@ public: } #if OPENDHT_PROXY_CLIENT - void startProxy(const std::string& host) { - dht_->startProxy(host); + void startProxy(const std::string& host, const std::string& deviceKey = "") { + dht_->startProxy(host, deviceKey); } #endif @@ -305,6 +305,23 @@ public: } #endif //OPENDHT_PROXY_SERVER +#if OPENDHT_PUSH_NOTIFICATIONS + /** + * Call linked callback with push_notification + * @param notification to process + */ + void pushNotificationReceived(const Json::Value& notification) { + dht_->pushNotificationReceived(notification); + } + /** + * Refresh a listen via a token + * @param token + */ + void resubscribe(const unsigned token) { + dht_->resubscribe(token); + } +#endif // OPENDHT_PUSH_NOTIFICATIONS + private: std::unique_ptr<DhtInterface> dht_; // prevent copy diff --git a/include/opendht/sockaddr.h b/include/opendht/sockaddr.h index 37efa2d92c7d5e24a6a51469007cc42809f69994..3aa273d3f72ea82f0e605270f45141e3155b18d7 100644 --- a/include/opendht/sockaddr.h +++ b/include/opendht/sockaddr.h @@ -28,6 +28,8 @@ typedef uint16_t in_port_t; #endif #else #include <iso646.h> +#include <stdint.h> +#include <winsock2.h> #include <ws2def.h> #include <ws2tcpip.h> typedef uint16_t sa_family_t; @@ -37,6 +39,7 @@ typedef uint16_t in_port_t; #include <string> #include <memory> #include <vector> +#include <stdlib.h> #include <cstring> #include <cstddef> @@ -123,7 +126,7 @@ public: } if (new_length != len) { len = new_length; - if (len) addr.reset((sockaddr*)std::calloc(len, 1)); + if (len) addr.reset((sockaddr*)::calloc(len, 1)); else addr.reset(); } if (len > sizeof(sa_family_t)) @@ -239,13 +242,13 @@ public: }; private: socklen_t len {0}; - struct free_delete { void operator()(void* p) { std::free(p); } }; + struct free_delete { void operator()(void* p) { ::free(p); } }; std::unique_ptr<sockaddr, free_delete> addr {}; void set(const sockaddr* sa, socklen_t length) { if (len != length) { len = length; - if (len) addr.reset((sockaddr*)std::malloc(len)); + if (len) addr.reset((sockaddr*)::malloc(len)); else addr.reset(); } if (len) diff --git a/src/Makefile.am b/src/Makefile.am index 4ff14741cfdbfd909b1966869eb8cb623d09a111..d21212a352de914925bf8c921753c0e09b93e63b 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -59,6 +59,11 @@ libopendht_la_SOURCES += base64.h base64.cpp dht_proxy_server.cpp nobase_include_HEADERS += ../include/opendht/dht_proxy_server.h endif +if ENABLE_PROXY_CLIENT +libopendht_la_SOURCES += dht_proxy_client.cpp +nobase_include_HEADERS += ../include/opendht/dht_proxy_client.h ../include/opendht/dht_interface.h +endif + clean-local: rm -rf libargon2.la diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index 31db722bc248fd732615a688f8a760a5d249367b..884cac9eb83794aaec2f81c7b7916eec888252bc 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -56,10 +56,11 @@ DhtProxyClient::confirmProxy() } void -DhtProxyClient::startProxy(const std::string& serverHost) +DhtProxyClient::startProxy(const std::string& serverHost, const std::string& deviceKey) { serverHost_ = serverHost; if (serverHost_.empty()) return; + deviceKey_ = deviceKey; auto confirm_proxy_time = scheduler.time() + std::chrono::seconds(5); nextProxyConfirmation = scheduler.add(confirm_proxy_time, std::bind(&DhtProxyClient::confirmProxy, this)); auto confirm_connectivity = scheduler.time() + std::chrono::seconds(5); @@ -182,13 +183,10 @@ DhtProxyClient::periodic(const uint8_t*, size_t, const SockAddr&) } void -DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, - Value::Filter&& filter, Where&& where) +DhtProxyClient::get(const InfoHash& key, const GetCallback& cb, DoneCallback donecb, const Value::Filter& filterChain) { restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + key.toString()); auto req = std::make_shared<restbed::Request>(uri); - Query query {{}, where}; - auto filterChain = filter.chain(query.where.getFilter()); auto finished = std::make_shared<std::atomic_bool>(false); Operation o; @@ -223,7 +221,7 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, if ((not filterChain or filterChain(*value)) && cb) { std::lock_guard<std::mutex> lock(lockCallbacks); callbacks_.emplace_back([cb, value, finished]() { - if (not *finished and not cb({value})) + if (not cb({value})) *finished = true; }); } @@ -254,6 +252,15 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, } } +void +DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, + Value::Filter&& filter, Where&& where) +{ + Query query {{}, where}; + auto filterChain = filter.chain(query.where.getFilter()); + get(key, cb, donecb, filterChain); +} + void DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_point, bool permanent) { @@ -391,22 +398,37 @@ DhtProxyClient::getPublicAddress(sa_family_t family) if (public_ip.length() > endIp + 2) { port = public_ip.substr(endIp + 2); auto ips = public_ip.substr(1, endIp - 1); - auto ipv4And6Separator = ips.find_last_of(':'); - ipv4Address = ips.substr(ipv4And6Separator + 1); - ipv6Address = ips.substr(0, ipv4And6Separator - 1); + if (ips.find(".") != std::string::npos) { + // Format: [ipv6:ipv4]:service + auto ipv4And6Separator = ips.find_last_of(':'); + ipv4Address = ips.substr(ipv4And6Separator + 1); + ipv6Address = ips.substr(0, ipv4And6Separator - 1); + } else { + // Format: [ipv6]:service + ipv6Address = ips; + } } } else { + // Format: ipv4:service auto endIp = public_ip.find_last_of(':'); port = public_ip.substr(endIp + 1); ipv4Address = public_ip.substr(0, endIp - 1); } - switch (family) - { - case AF_INET: - return SockAddr::resolve(ipv4Address, port); - case AF_INET6: - return SockAddr::resolve(ipv6Address, port); - default: + try { + switch (family) + { + case AF_INET: { + auto result = SockAddr::resolve(ipv4Address, port); + return result; + } + case AF_INET6: { + auto result = SockAddr::resolve(ipv6Address, port); + return result; + } + default: + return {}; + } + } catch (std::invalid_argument& e) { return {}; } } @@ -416,10 +438,11 @@ DhtProxyClient::listen(const InfoHash& key, GetCallback cb, Value::Filter&& filt { restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + key.toString()); auto req = std::make_shared<restbed::Request>(uri); - req->set_method("LISTEN"); + req->set_method(deviceKey_.empty() ? "LISTEN" : "SUBSCRIBE"); Query query {{}, where}; auto filterChain = filter.chain(query.where.getFilter()); + auto pushNotifToken = std::make_shared<unsigned>(0); Listener l; ++listener_token_; @@ -427,12 +450,16 @@ DhtProxyClient::listen(const InfoHash& key, GetCallback cb, Value::Filter&& filt l.token = listener_token_; l.req = req; l.cb = cb; + l.pushNotifToken = pushNotifToken; l.filterChain = std::move(filterChain); l.thread = std::thread([=]() { auto settings = std::make_shared<restbed::Settings>(); - std::chrono::milliseconds timeout(std::numeric_limits<int>::max()); - settings->set_connection_timeout(timeout); // Avoid the client to close the socket after 5 seconds. + if (deviceKey_.empty()) { + std::chrono::milliseconds timeout(std::numeric_limits<int>::max()); + settings->set_connection_timeout(timeout); // Avoid the client to close the socket after 5 seconds. + } else + fillBodyToGetToken(req); struct State { std::atomic_bool ok {true}; @@ -440,38 +467,53 @@ DhtProxyClient::listen(const InfoHash& key, GetCallback cb, Value::Filter&& filt }; auto state = std::make_shared<State>(); restbed::Http::async(req, - [this, filterChain, cb, state](const std::shared_ptr<restbed::Request>& req, - const std::shared_ptr<restbed::Response>& reply) { + [this, filterChain, cb, pushNotifToken, state](const std::shared_ptr<restbed::Request>& req, + const std::shared_ptr<restbed::Response>& reply) { auto code = reply->get_status_code(); - if (code == 200) { try { - while (restbed::Http::is_open(req) and not state->cancel) { + std::string err; + Json::Value json; + Json::CharReaderBuilder rbuilder; + auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader()); + if (!deviceKey_.empty()) { restbed::Http::fetch("\n", reply); if (state->cancel) - break; + return; std::string body; reply->get_body(body); - reply->set_body(""); // Reset the body for the next fetch - Json::Value json; - std::string err; - Json::CharReaderBuilder rbuilder; auto* char_data = reinterpret_cast<const char*>(&body[0]); - auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader()); if (reader->parse(char_data, char_data + body.size(), &json, &err)) { - auto value = std::make_shared<Value>(json); - if ((not filterChain or filterChain(*value)) && cb) { - std::lock_guard<std::mutex> lock(lockCallbacks); - callbacks_.emplace_back([cb, value, state]() { - if (not state->cancel and not cb({value})) { - state->cancel = true; - } - }); - } + if (!json.isMember("token")) return; + *pushNotifToken = json["token"].asLargestUInt(); } else { state->ok = false; } + } else { + while (restbed::Http::is_open(req) and not state->cancel) { + restbed::Http::fetch("\n", reply); + if (state->cancel) + break; + std::string body; + reply->get_body(body); + reply->set_body(""); // Reset the body for the next fetch + + auto* char_data = reinterpret_cast<const char*>(&body[0]); + if (reader->parse(char_data, char_data + body.size(), &json, &err)) { + auto value = std::make_shared<Value>(json); + if ((not filterChain or filterChain(*value)) && cb) { + std::lock_guard<std::mutex> lock(lockCallbacks); + callbacks_.emplace_back([cb, value, state]() { + if (not state->cancel and not cb({value})) { + state->cancel = true; + } + }); + } + } else { + state->ok = false; + } + } } } catch (std::runtime_error&) { state->ok = false; @@ -499,13 +541,32 @@ DhtProxyClient::cancelListen(const InfoHash&, size_t token) for (auto it = listeners_.begin(); it != listeners_.end(); ++it) { auto& listener = *it; if (listener.token == token) { - if (listener.thread.joinable()) { - // Close connection to stop listener? - if (listener.req) - restbed::Http::close(listener.req); - listener.thread.join(); + if (!deviceKey_.empty()) { + // First, be sure to have a token + if (listener.thread.joinable()) { + listener.thread.join(); + } + // UNSUBSCRIBE + restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + listener.key); + auto req = std::make_shared<restbed::Request>(uri); + req->set_method("UNSUBSCRIBE"); + restbed::Http::async(req, + [](const std::shared_ptr<restbed::Request>&, + const std::shared_ptr<restbed::Response>&){} + ); + // And remove listeners_.erase(it); return true; + } else { + // Just stop the request + if (listener.thread.joinable()) { + // Close connection to stop listener? + if (listener.req) + restbed::Http::close(listener.req); + listener.thread.join(); + listeners_.erase(it); + return true; + } } } } @@ -559,9 +620,10 @@ DhtProxyClient::restartListeners() std::chrono::milliseconds timeout(std::numeric_limits<int>::max()); settings->set_connection_timeout(timeout); // Avoid the client to close the socket after 5 seconds. + auto ok = std::make_shared<std::atomic_bool>(true); restbed::Http::async(req, - [this, filterChain, cb](const std::shared_ptr<restbed::Request>& req, - const std::shared_ptr<restbed::Response>& reply) { + [this, filterChain, cb, ok](const std::shared_ptr<restbed::Request>& req, + const std::shared_ptr<restbed::Response>& reply) { auto code = reply->get_status_code(); if (code == 200) { @@ -598,18 +660,119 @@ DhtProxyClient::restartListeners() } catch (std::runtime_error&) { // NOTE: Http::close() can occurs here. Ignore this. } - } else { - this->statusIpv4_ = NodeStatus::Disconnected; - this->statusIpv6_ = NodeStatus::Disconnected; + *ok = false; } }, settings).get(); - getConnectivityStatus(); + if (!ok) getConnectivityStatus(); } ); } } +#if OPENDHT_PUSH_NOTIFICATIONS +void +DhtProxyClient::pushNotificationReceived(const Json::Value& notification) +{ + if (!notification.isMember("token")) return; + auto token = notification["token"].asLargestUInt(); + // Find listener + for (const auto& listener: listeners_) + if (*(listener.pushNotifToken) == token) { + if (notification.isMember("timeout")) { + // A timeout has occured, we need to relaunch the listener + resubscribe(token); + } else { + // Wake up daemon and get values + get(InfoHash(listener.key), listener.cb, {}, listener.filterChain); + } + } +} + +void +DhtProxyClient::resubscribe(const unsigned token) +{ + if (deviceKey_.empty()) return; + for (auto& listener: listeners_) { + if (*(listener.pushNotifToken) == token) { + // Subscribe + restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + listener.key); + auto req = std::make_shared<restbed::Request>(uri); + req->set_method("SUBSCRIBE"); + + auto pushNotifToken = std::make_shared<unsigned>(0); + + if (listener.thread.joinable()) + listener.thread.join(); + listener.req = req; + listener.pushNotifToken = pushNotifToken; + listener.thread = std::thread([=]() + { + fillBodyToGetToken(req); + auto settings = std::make_shared<restbed::Settings>(); + auto ok = std::make_shared<std::atomic_bool>(true); + restbed::Http::async(req, + [this, pushNotifToken, ok](const std::shared_ptr<restbed::Request>&, + const std::shared_ptr<restbed::Response>& reply) { + auto code = reply->get_status_code(); + if (code == 200) { + try { + restbed::Http::fetch("\n", reply); + std::string body; + reply->get_body(body); + + std::string err; + Json::Value json; + Json::CharReaderBuilder rbuilder; + auto* char_data = reinterpret_cast<const char*>(&body[0]); + auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader()); + if (reader->parse(char_data, char_data + body.size(), &json, &err)) { + if (!json.isMember("token")) return; + *pushNotifToken = json["token"].asLargestUInt(); + } + } catch (std::runtime_error&) { + // NOTE: Http::close() can occurs here. Ignore this. + } + } else { + *ok = false; + } + }, settings).get(); + if (!ok) getConnectivityStatus(); + }); + } + } +} + +void +DhtProxyClient::fillBodyToGetToken(std::shared_ptr<restbed::Request> req) +{ + // Fill body with + // { + // "key":"device_key", + // "callback_id": xxx + // } + Json::Value body; + body["key"] = deviceKey_; + { + std::lock_guard<std::mutex> lock(lockCallback_); + callbackId_ += 1; + body["callback_id"] = callbackId_; + } +#ifdef __ANDROID__ + body["isAndroid"] = true; +#endif +#ifdef __APPLE__ + body["isAndroid"] = false; +#endif + Json::StreamWriterBuilder wbuilder; + wbuilder["commentStyle"] = "None"; + wbuilder["indentation"] = ""; + auto content = Json::writeString(wbuilder, body) + "\n"; + std::replace(content.begin(), content.end(), '\n', ' '); + req->set_body(content); + req->set_header("Content-Length", std::to_string(content.size())); +} +#endif // OPENDHT_PUSH_NOTIFICATIONS } // namespace dht diff --git a/src/dht_proxy_server.cpp b/src/dht_proxy_server.cpp index a1aeff9e595d06c17a4f597dba939897b2f4c936..b09f4e27b7d1e39983053d715f2b3bb346d1b8d8 100644 --- a/src/dht_proxy_server.cpp +++ b/src/dht_proxy_server.cpp @@ -420,9 +420,9 @@ DhtProxyServer::handlePushListeners() auto callbackId = pushListener->callbackId; auto isAndroid = pushListener->isAndroid; auto internalToken = std::move(dht_->listen(pushListener->hash, - [this, key, callbackId, token, isAndroid](std::shared_ptr<Value> value) { + [this, key, callbackId, token, isAndroid](std::shared_ptr<Value> /*value*/) { // Build message content. - auto json = value->toJson(); + Json::Value json; if (callbackId > 0) { json["callback_id"] = callbackId; } diff --git a/src/dhtrunner.cpp b/src/dhtrunner.cpp index 6a1fab5fa9ec1e3be8dad92df4cf917024bb2b07..e13c759b6739d2c8db665bbdaeb8b638d3c89174 100644 --- a/src/dhtrunner.cpp +++ b/src/dhtrunner.cpp @@ -861,7 +861,7 @@ DhtRunner::activeDht() const #if OPENDHT_PROXY_CLIENT void -DhtRunner::enableProxy(bool proxify) { +DhtRunner::enableProxy(bool proxify, const std::string& deviceKey) { if (!dht_via_proxy_) { auto dht_via_proxy = std::unique_ptr<DhtInterface>( new DhtProxyClient(config_.proxy_server) @@ -870,7 +870,7 @@ DhtRunner::enableProxy(bool proxify) { } if (proxify) { // Init the proxy client - dht_via_proxy_->startProxy(config_.proxy_server); + dht_via_proxy_->startProxy(config_.proxy_server, deviceKey); // add current listeners for (auto& listener: listeners_) { auto tokenProxy = dht_via_proxy_->listen(listener->hash, listener->gcb, std::move(listener->f), std::move(listener->w)); @@ -911,4 +911,33 @@ DhtRunner::forwardAllMessages(bool forward) dht_->forwardAllMessages(forward); } #endif // OPENDHT_PROXY_SERVER + +#if OPENDHT_PUSH_NOTIFICATIONS && OPENDHT_PROXY_CLIENT +void +DhtRunner::pushNotificationReceived(const std::string& notification) const +{ + try { + std::string err; + Json::Value root; + Json::CharReaderBuilder rbuilder; + auto* char_data = reinterpret_cast<const char*>(¬ification[0]); + auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader()); + if (reader->parse(char_data, char_data + notification.size(), &root, &err)) + pushNotificationReceived(root); + } catch (...) { } +} + +void +DhtRunner::pushNotificationReceived(const Json::Value& notification) const +{ + dht_via_proxy_->pushNotificationReceived(notification); +} + +void +DhtRunner::resubscribe(const unsigned token) +{ + dht_via_proxy_->resubscribe(token); +} + +#endif // OPENDHT_PUSH_NOTIFICATIONS && OPENDHT_PROXY_CLIENT } diff --git a/tools/dhtnode.cpp b/tools/dhtnode.cpp index e74cad78f0b6a7b26301503406a074b8ec956e24..348e625b00a0961c51136e2ef1ab05b7919302fd 100644 --- a/tools/dhtnode.cpp +++ b/tools/dhtnode.cpp @@ -70,8 +70,16 @@ void print_help() { #if OPENDHT_PROXY_CLIENT std::cout << std::endl << "Operations with the proxy:" << std::endl - << " stt [server_address] Start the proxy client." << std::endl - << " stp Stop the proxy client." << std::endl; +#if OPENDHT_PUSH_NOTIFICATIONS + << " stt [server_address] <device_key> Start the proxy client." << std::endl +#else + << " stt [server_address] Start the proxy client." << std::endl +#endif // OPENDHT_PUSH_NOTIFICATIONS +#if OPENDHT_PUSH_NOTIFICATIONS + << " rs [token] Resubscribe to opendht." << std::endl + << " rp [push_notification] Inject a push notification in Opendht." << std::endl +#endif // OPENDHT_PUSH_NOTIFICATIONS + << " stp Stop the proxy client." << std::endl; #endif //OPENDHT_PROXY_CLIENT std::cout << std::endl << "Operations on the DHT:" << std::endl @@ -91,7 +99,11 @@ void print_help() { << std::endl; } -void cmd_loop(std::shared_ptr<DhtRunner>& dht, dht_params& params) +void cmd_loop(std::shared_ptr<DhtRunner>& dht, dht_params& params +#if OPENDHT_PROXY_SERVER + , std::map<in_port_t, std::unique_ptr<DhtProxyServer>>& proxies +#endif +) { print_node_info(dht, params); std::cout << " (type 'h' or 'help' for a list of possible commands)" << std::endl << std::endl; @@ -102,22 +114,6 @@ void cmd_loop(std::shared_ptr<DhtRunner>& dht, dht_params& params) #endif std::map<std::string, indexation::Pht> indexes; -#if OPENDHT_PROXY_SERVER - std::map<in_port_t, std::unique_ptr<DhtProxyServer>> proxies; - if (params.proxyserver != 0) { - proxies.emplace(params.proxyserver, new DhtProxyServer(dht, params.proxyserver -#if OPENDHT_PUSH_NOTIFICATIONS - , params.pushserver -#endif // OPENDHT_PUSH_NOTIFICATIONS - )); - } -#endif //OPENDHT_PROXY_SERVER -#if OPENDHT_PROXY_CLIENT - if (!params.proxyclient.empty()) { - dht->setProxyServer(params.proxyclient); - dht->enableProxy(true); - } -#endif //OPENDHT_PROXY_CLIENT while (true) { @@ -127,7 +123,7 @@ void cmd_loop(std::shared_ptr<DhtRunner>& dht, dht_params& params) break; std::istringstream iss(line); - std::string op, idstr, value, index, keystr, pushServer; + std::string op, idstr, value, index, keystr, pushServer, deviceKey; iss >> op; if (op == "x" || op == "exit" || op == "quit") { @@ -223,14 +219,28 @@ void cmd_loop(std::shared_ptr<DhtRunner>& dht, dht_params& params) #endif //OPENDHT_PROXY_SERVER #if OPENDHT_PROXY_CLIENT else if (op == "stt") { - iss >> idstr; + iss >> idstr >> deviceKey; dht->setProxyServer(idstr); - dht->enableProxy(true); + dht->enableProxy(true, deviceKey); continue; } else if (op == "stp") { dht->enableProxy(false); continue; } +#if OPENDHT_PUSH_NOTIFICATIONS + else if (op == "rp") { + iss >> value; + dht->pushNotificationReceived(value); + continue; + } else if (op == "re") { + iss >> value; + try { + unsigned token = std::stoul(value); + dht->resubscribe(token); + } catch (...) { } + continue; + } +#endif // OPENDHT_PUSH_NOTIFICATIONS #endif //OPENDHT_PROXY_CLIENT if (op.empty()) @@ -459,6 +469,7 @@ main(int argc, char **argv) print_usage(); return 0; } + if (params.daemonize) { daemonize(); } else if (params.service) { @@ -487,11 +498,31 @@ main(int argc, char **argv) dht->bootstrap(params.bootstrap.first.c_str(), params.bootstrap.second.c_str()); } - if (params.daemonize or params.service) { + #if OPENDHT_PROXY_SERVER + std::map<in_port_t, std::unique_ptr<DhtProxyServer>> proxies; + if (params.proxyserver != 0) { + proxies.emplace(params.proxyserver, new DhtProxyServer(dht, params.proxyserver + #if OPENDHT_PUSH_NOTIFICATIONS + , params.pushserver + #endif // OPENDHT_PUSH_NOTIFICATIONS + )); + } + #endif //OPENDHT_PROXY_SERVER + #if OPENDHT_PROXY_CLIENT + if (!params.proxyclient.empty()) { + dht->setProxyServer(params.proxyclient); + dht->enableProxy(true, params.devicekey); + } + #endif //OPENDHT_PROXY_CLIENT + + if (params.daemonize or params.service) while (runner.wait()); - } else { - cmd_loop(dht, params); - } + else + cmd_loop(dht, params +#if OPENDHT_PROXY_SERVER + , proxies +#endif + ); } catch(const std::exception&e) { std::cerr << std::endl << e.what() << std::endl; diff --git a/tools/tools_common.h b/tools/tools_common.h index ba864294e018596e45364396d999b2a5aa38da48..791a8f11a71110f142b200e2d037c31e61917af6 100644 --- a/tools/tools_common.h +++ b/tools/tools_common.h @@ -123,6 +123,7 @@ struct dht_params { in_port_t proxyserver {0}; std::string proxyclient {}; std::string pushserver {}; + std::string devicekey {}; }; static const constexpr struct option long_options[] = { @@ -139,6 +140,7 @@ static const constexpr struct option long_options[] = { {"proxyserver",required_argument, nullptr, 'S'}, {"proxyclient",required_argument, nullptr, 'C'}, {"pushserver", required_argument, nullptr, 'P'}, + {"devicekey", required_argument, nullptr, 'D'}, {nullptr, 0 , nullptr, 0} }; @@ -170,6 +172,9 @@ parseArgs(int argc, char **argv) { case 'C': params.proxyclient = optarg; break; + case 'D': + params.devicekey = optarg; + break; case 'n': params.network = strtoul(optarg, nullptr, 0); break;