diff --git a/include/opendht/dht_proxy_client.h b/include/opendht/dht_proxy_client.h index 1f8075daea946d2fd9d08995b37d3f795ae953bc..60da359c1a57d7d52ad0316ce6556de90ea6d5a7 100644 --- a/include/opendht/dht_proxy_client.h +++ b/include/opendht/dht_proxy_client.h @@ -300,10 +300,12 @@ private: SUBSCRIBE, RESUBSCRIBE, }; + using CacheValueCallback = std::function<bool(const std::vector<std::shared_ptr<Value>>& values, bool expired, system_clock::time_point)>; + /** * Send Listen with httpClient_ */ - void sendListen(const restinio::http_request_header_t& header, const ValueCallback& cb, + void sendListen(const restinio::http_request_header_t& header, const CacheValueCallback& cb, const Sp<OperationState>& opstate, Listener& listener, ListenMethod method = ListenMethod::LISTEN); void handleResubscribe(const asio::error_code& ec, const InfoHash& key, const size_t token, std::shared_ptr<OperationState> opstate); diff --git a/include/opendht/utils.h b/include/opendht/utils.h index a8099547112bc3375d1f1fd6ac721462572b296a..20e585fb8a1565fdc5df1f8c3a20ed7ccab11bc5 100644 --- a/include/opendht/utils.h +++ b/include/opendht/utils.h @@ -77,6 +77,7 @@ public: // Time related definitions and utility functions using clock = std::chrono::steady_clock; +using system_clock = std::chrono::system_clock; using time_point = clock::time_point; using duration = clock::duration; diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index 881d6455678dbd3ae23c1b52062440e1a5869a7b..74ad6a95b30f7190750c72afdfd6cf9c172242ff 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -46,7 +46,7 @@ struct DhtProxyClient::Listener unsigned callbackId; OpValueCache cache; - ValueCallback cb; + CacheValueCallback cb; Sp<OperationState> opstate; std::shared_ptr<http::Request> request; std::unique_ptr<asio::steady_timer> refreshSubscriberTimer; @@ -761,7 +761,7 @@ DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filt // Add cache callback auto opstate = std::make_shared<OperationState>(); l->second.opstate = opstate; - l->second.cb = [this,key,token,opstate](const std::vector<Sp<Value>>& values, bool expired){ + l->second.cb = [this,key,token,opstate](const std::vector<Sp<Value>>& values, bool expired, system_clock::time_point t){ if (opstate->stop) return false; std::lock_guard<std::mutex> lock(searchLock_); @@ -769,7 +769,7 @@ DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filt if (s != searches_.end()) { auto l = s->second.listeners.find(token); if (l != s->second.listeners.end()) { - return l->second.cache.onValue(values, expired); + return l->second.cache.onValue(values, expired, t); } } return false; @@ -941,7 +941,7 @@ DhtProxyClient::handleExpireListener(const asio::error_code &ec, const InfoHash& void DhtProxyClient::sendListen(const restinio::http_request_header_t& header, - const ValueCallback& cb, + const CacheValueCallback& cb, const Sp<OperationState>& opstate, Listener& listener, ListenMethod method) { @@ -968,7 +968,7 @@ DhtProxyClient::sendListen(const restinio::http_request_header_t& header, b.append(at, length); // one value per body line - while (b.getLine('\n') and !opstate->stop){ + while (b.getLine('\n') and !opstate->stop) { std::string err; Json::Value json; const auto& line = b.line(); @@ -986,7 +986,7 @@ DhtProxyClient::sendListen(const restinio::http_request_header_t& header, { std::lock_guard<std::mutex> lock(lockCallbacks_); callbacks_.emplace_back([cb, value, opstate, expired]() { - if (not opstate->stop.load() and not cb({value}, expired)) + if (not opstate->stop.load() and not cb({value}, expired, system_clock::time_point::min())) opstate->stop.store(true); }); } @@ -1155,6 +1155,10 @@ DhtProxyClient::pushNotificationReceived(const std::map<std::string, std::string } } else { auto key = InfoHash(notification.at("key")); + system_clock::time_point sendTime = system_clock::time_point::min(); + try { + sendTime = system_clock::time_point(std::chrono::milliseconds(std::stoull(notification.at("t")))); + } catch (...) {} auto& search = searches_.at(key); for (auto& list : search.listeners) { if (list.second.opstate->stop) @@ -1167,12 +1171,12 @@ DhtProxyClient::pushNotificationReceived(const std::map<std::string, std::string if (expired == notification.end()) { auto cb = list.second.cb; auto oldValues = list.second.cache.getValues(); - get(key, [cb](const std::vector<Sp<Value>>& vals) { - return cb(vals, false); - }, [cb, oldValues](bool /*ok*/) { + get(key, [cb, sendTime](const std::vector<Sp<Value>>& vals) { + return cb(vals, false, sendTime); + }, [cb, oldValues, sendTime](bool /*ok*/) { // Decrement old values refcount to expire values not // present in the new list - cb(oldValues, true); + cb(oldValues, true, sendTime); }); } else { std::stringstream ss(expired->second); @@ -1184,7 +1188,7 @@ DhtProxyClient::pushNotificationReceived(const std::map<std::string, std::string } { std::lock_guard<std::mutex> lock(lockCallbacks_); - callbacks_.emplace_back([this, key, token, opstate, ids]() { + callbacks_.emplace_back([this, key, token, opstate, ids, sendTime]() { if (opstate->stop) return; std::lock_guard<std::mutex> lock(searchLock_); @@ -1194,7 +1198,7 @@ DhtProxyClient::pushNotificationReceived(const std::map<std::string, std::string auto l = s->second.listeners.find(token); if (l == s->second.listeners.end()) return; - if (not opstate->stop and not l->second.cache.onValuesExpired(ids)) + if (not opstate->stop and not l->second.cache.onValuesExpired(ids, sendTime)) opstate->stop = true; }); } diff --git a/src/dht_proxy_server.cpp b/src/dht_proxy_server.cpp index def4068ef68345963905f22a919fddf38cb1ce74..e0251d08c57d7b2c4113cef2087a241285d7fcff 100644 --- a/src/dht_proxy_server.cpp +++ b/src/dht_proxy_server.cpp @@ -705,6 +705,7 @@ DhtProxyServer::subscribe(restinio::request_handle_t request, Json::Value json; json["key"] = infoHash.toString(); json["to"] = clientId; + json["t"] = std::chrono::duration_cast<std::chrono::milliseconds>(system_clock::now().time_since_epoch()).count(); { std::lock_guard<std::mutex> l(sessionCtx->lock); json["s"] = sessionCtx->sessionId; diff --git a/src/op_cache.cpp b/src/op_cache.cpp index d995e46523a4974d0d129c82d89fb7edeb7d7992..386748da6692efaf8e24889e705b3244f188ff2c 100644 --- a/src/op_cache.cpp +++ b/src/op_cache.cpp @@ -22,7 +22,7 @@ namespace dht { constexpr const std::chrono::seconds OpCache::EXPIRATION; bool -OpValueCache::onValuesAdded(const std::vector<Sp<Value>>& vals) { +OpValueCache::onValuesAdded(const std::vector<Sp<Value>>& vals, const system_clock::time_point& t) { std::vector<Sp<Value>> newValues; for (const auto& v : vals) { auto viop = values.emplace(v->id, v); @@ -31,16 +31,21 @@ OpValueCache::onValuesAdded(const std::vector<Sp<Value>>& vals) { } else { viop.first->second.refCount++; } + viop.first->second.updated = t; } return newValues.empty() ? true : callback(newValues, false); } bool -OpValueCache::onValuesExpired(const std::vector<Sp<Value>>& vals) { +OpValueCache::onValuesExpired(const std::vector<Sp<Value>>& vals, const system_clock::time_point& t) { std::vector<Sp<Value>> expiredValues; for (const auto& v : vals) { auto vit = values.find(v->id); if (vit != values.end()) { + if (vit->second.updated > t) + continue; + + vit->second.updated = t; vit->second.refCount--; if (not vit->second.refCount) { expiredValues.emplace_back(std::move(vit->second.data)); @@ -52,12 +57,16 @@ OpValueCache::onValuesExpired(const std::vector<Sp<Value>>& vals) { } bool -OpValueCache::onValuesExpired(const std::vector<Value::Id>& vids) +OpValueCache::onValuesExpired(const std::vector<Value::Id>& vids, const system_clock::time_point& t) { std::vector<Sp<Value>> expiredValues; for (const auto& vid : vids) { auto vit = values.find(vid); if (vit != values.end()) { + if (vit->second.updated > t) + continue; + + vit->second.updated = t; vit->second.refCount--; if (not vit->second.refCount) { expiredValues.emplace_back(std::move(vit->second.data)); diff --git a/src/op_cache.h b/src/op_cache.h index 12e7486a48bbbeb20eb6c5bc4510ff6d27c59efd..9358f3427a25bec5f091f2e88f4bdfb7c9694d96 100644 --- a/src/op_cache.h +++ b/src/op_cache.h @@ -27,6 +27,7 @@ struct OpCacheValueStorage { Sp<Value> data {}; unsigned refCount {1}; + system_clock::time_point updated {system_clock::time_point::min()}; OpCacheValueStorage(Sp<Value> val) : data(val) {} }; @@ -47,16 +48,16 @@ public: }; } - bool onValue(const std::vector<Sp<Value>>& vals, bool expired) { + bool onValue(const std::vector<Sp<Value>>& vals, bool expired, const system_clock::time_point& t = system_clock::time_point::min()) { if (expired) - return onValuesExpired(vals); + return onValuesExpired(vals, t); else - return onValuesAdded(vals); + return onValuesAdded(vals, t); } - bool onValuesAdded(const std::vector<Sp<Value>>& vals); - bool onValuesExpired(const std::vector<Sp<Value>>& vals); - bool onValuesExpired(const std::vector<Value::Id>& vals); + bool onValuesAdded(const std::vector<Sp<Value>>& vals, const system_clock::time_point& t = system_clock::time_point::min()); + bool onValuesExpired(const std::vector<Sp<Value>>& vals, const system_clock::time_point& t = system_clock::time_point::min()); + bool onValuesExpired(const std::vector<Value::Id>& vals, const system_clock::time_point& t = system_clock::time_point::min()); void onNodeChanged(ListenSyncStatus status) { switch (status) {