diff --git a/include/opendht/callbacks.h b/include/opendht/callbacks.h index 556f0de9a49159b6048e69bc6062017b5d283986..8303c410ed47f3c56a036e871ffffca2e4ef4268 100644 --- a/include/opendht/callbacks.h +++ b/include/opendht/callbacks.h @@ -163,6 +163,20 @@ struct OPENDHT_PUBLIC SecureDhtConfig bool cert_cache_all {false}; }; +enum class OPENDHT_PUBLIC PushNotificationResult: uint8_t { + /* success codes */ + PutRefresh, + ListenRefresh, + Values, + ValuesExpired, + /* ignored/error codes */ + IgnoredWrongSession = 0x80, + IgnoredNoOp, + IgnoredStopped, + IgnoredDisabled, + Error +}; + static constexpr size_t DEFAULT_STORAGE_LIMIT {1024 * 1024 * 64}; using ValuesExport = std::pair<InfoHash, Blob>; diff --git a/include/opendht/dht.h b/include/opendht/dht.h index 6b51a210619c689443b6c6d8c9de7a45ba682dd4..b45fbab675a65400780060a0456dfd7ada8687e4 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -315,7 +315,9 @@ public: std::vector<SockAddr> getPublicAddress(sa_family_t family = 0) override; - void pushNotificationReceived(const std::map<std::string, std::string>&) override {} + PushNotificationResult pushNotificationReceived(const std::map<std::string, std::string>&) override { + return PushNotificationResult::IgnoredDisabled; + } void resubscribe(unsigned) {} private: diff --git a/include/opendht/dht_interface.h b/include/opendht/dht_interface.h index 57b946049e6a91e57587c78c29fb376b5058f6ee..783b0371db1db2932c0f03c0a95d74b029c6c6cf 100644 --- a/include/opendht/dht_interface.h +++ b/include/opendht/dht_interface.h @@ -262,7 +262,7 @@ public: * Call linked callback with a push notification * @param notification to process */ - virtual void pushNotificationReceived(const std::map<std::string, std::string>& data) = 0; + virtual PushNotificationResult pushNotificationReceived(const std::map<std::string, std::string>& data) = 0; protected: std::shared_ptr<Logger> logger_ {}; diff --git a/include/opendht/dht_proxy_client.h b/include/opendht/dht_proxy_client.h index f7dd0feadfdd9c23f58a0a488337b13251f73c58..21a9d81ded8f43a39f9721fb982eb778d81a6bf1 100644 --- a/include/opendht/dht_proxy_client.h +++ b/include/opendht/dht_proxy_client.h @@ -216,7 +216,7 @@ public: * Call linked callback with a push notification * @param notification to process */ - void pushNotificationReceived(const std::map<std::string, std::string>& notification) override; + PushNotificationResult pushNotificationReceived(const std::map<std::string, std::string>& notification) override; time_point periodic(const uint8_t*, size_t, SockAddr, const time_point& now) override; time_point periodic(const uint8_t* buf, size_t buflen, const sockaddr* from, socklen_t fromlen, const time_point& now) override { diff --git a/include/opendht/dhtrunner.h b/include/opendht/dhtrunner.h index abfec2dbbdcd84760c440ac8e39715d25f3662b4..aa2d05d93e2c90a45bd53956810dc09c01e919d4 100644 --- a/include/opendht/dhtrunner.h +++ b/include/opendht/dhtrunner.h @@ -466,7 +466,7 @@ public: /** * Insert a push notification to process for OpenDHT */ - void pushNotificationReceived(const std::map<std::string, std::string>& data); + std::future<PushNotificationResult> pushNotificationReceived(const std::map<std::string, std::string>& data); /* Proxy server mothods */ void forwardAllMessages(bool forward); diff --git a/include/opendht/securedht.h b/include/opendht/securedht.h index 094878d73e01913fab9bf4f2de0693f8fe768b3c..e057a72dd35a82af65f731e88dfdd440cdf2e156 100644 --- a/include/opendht/securedht.h +++ b/include/opendht/securedht.h @@ -328,8 +328,8 @@ public: * Call linked callback with push_notification * @param notification to process */ - void pushNotificationReceived(const std::map<std::string, std::string>& notification) override { - dht_->pushNotificationReceived(notification); + PushNotificationResult pushNotificationReceived(const std::map<std::string, std::string>& notification) override { + return dht_->pushNotificationReceived(notification); } void setLogger(const Logger& logger) override { diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index 9d8849961d2a0e15860bc72485ab4e67d59f9d92..af579201a7b82b4af8cec21856d06f7390fb55cc 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -1233,9 +1233,10 @@ DhtProxyClient::restartListeners(const asio::error_code &ec) } } -void +PushNotificationResult DhtProxyClient::pushNotificationReceived([[maybe_unused]] const std::map<std::string, std::string>& notification) { + auto ret = PushNotificationResult::IgnoredNoOp; #ifdef OPENDHT_PUSH_NOTIFICATIONS { // If a push notification is received, the proxy is up and running @@ -1252,7 +1253,7 @@ DhtProxyClient::pushNotificationReceived([[maybe_unused]] const std::map<std::st if (sessionId != notification.end() and sessionId->second != pushSessionId_) { if (logger_) logger_->d("[proxy:client] [push] ignoring push for other session"); - return; + return PushNotificationResult::IgnoredWrongSession; } std::lock_guard<std::mutex> lock(searchLock_); auto timeout = notification.find("timeout"); @@ -1269,10 +1270,12 @@ DhtProxyClient::pushNotificationReceived([[maybe_unused]] const std::map<std::st else put.refreshPutTimer->expires_at(std::chrono::steady_clock::now()); put.refreshPutTimer->async_wait(std::bind(&DhtProxyClient::handleRefreshPut, this, std::placeholders::_1, key, vid)); + ret = PushNotificationResult::PutRefresh; } else { // Refresh listen for (auto& list : search.listeners) resubscribe(key, list.first, list.second); + ret = search.listeners.empty() ? PushNotificationResult::IgnoredNoOp : PushNotificationResult::ListenRefresh; } } else { auto key = InfoHash(notification.at("key")); @@ -1281,12 +1284,12 @@ DhtProxyClient::pushNotificationReceived([[maybe_unused]] const std::map<std::st sendTime = system_clock::time_point(std::chrono::milliseconds(std::stoull(notification.at("t")))); } catch (...) {} auto& search = searches_.at(key); + auto expired = notification.find("exp"); for (auto& list : search.listeners) { if (list.second.opstate->stop) continue; if (logger_) logger_->d("[proxy:client] [push] [search %s] received", key.to_c_str()); - auto expired = notification.find("exp"); auto token = list.first; auto opstate = list.second.opstate; if (expired == notification.end()) { @@ -1299,6 +1302,7 @@ DhtProxyClient::pushNotificationReceived([[maybe_unused]] const std::map<std::st // present in the new list cb(oldValues, true, sendTime); }); + ret = PushNotificationResult::Values; } else { std::stringstream ss(expired->second); std::vector<Value::Id> ids; @@ -1324,16 +1328,21 @@ DhtProxyClient::pushNotificationReceived([[maybe_unused]] const std::map<std::st }); } launchLoop = true; + ret = PushNotificationResult::ValuesExpired; } } } + } catch (const std::out_of_range& e) { + ret = PushNotificationResult::IgnoredNoOp; } catch (const std::exception& e) { if (logger_) logger_->e("[proxy:client] [push] receive error: %s", e.what()); + ret = PushNotificationResult::Error; } if (launchLoop) loopSignal_(); #endif + return ret; } void diff --git a/src/dhtrunner.cpp b/src/dhtrunner.cpp index e1965d273fb70250faa1b6cb458b79fb96cddb90..4c1e30401352bb8efb6b050e0a2e2437d92670e8 100644 --- a/src/dhtrunner.cpp +++ b/src/dhtrunner.cpp @@ -1186,18 +1186,25 @@ DhtRunner::setPushNotificationPlatform(const std::string& platform) { #endif } -void -DhtRunner::pushNotificationReceived(const std::map<std::string, std::string>& data) +std::future<PushNotificationResult> +DhtRunner::pushNotificationReceived([[maybe_unused]] const std::map<std::string, std::string>& data) { #if defined(OPENDHT_PROXY_CLIENT) && defined(OPENDHT_PUSH_NOTIFICATIONS) + auto ret_token = std::make_shared<std::promise<PushNotificationResult>>(); + auto future = ret_token->get_future(); std::lock_guard<std::mutex> lck(storage_mtx); - pending_ops_prio.emplace([=](SecureDht&) { + pending_ops_prio.emplace([ret_token, this, data](SecureDht&) { if (dht_) - dht_->pushNotificationReceived(data); + ret_token->set_value(dht_->pushNotificationReceived(data)); + else + ret_token->set_value(PushNotificationResult::IgnoredStopped); }); cv.notify_all(); + return future; #else - (void) data; + std::promise<PushNotificationResult> p {}; + p.set_value(PushNotificationResult::IgnoredDisabled); + return p.get_future(); #endif }