diff --git a/src/dht.cpp b/src/dht.cpp index 3a97ea60586a243aa8f48d507d23aee4f0b355db..8dd9fdb0700e67544b670f480370f665c710109d 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -437,22 +437,18 @@ void Dht::searchSendAnnounceValue(const Sp<Search>& sr) { if (not hasValue or seq_no < a.value->seq) { DHT_LOG.d(sr->id, sn->node->id, "[search %s] [node %s] sending 'put' (vid: %d)", sr->id.toString().c_str(), sn->node->toString().c_str(), a.value->id); - sn->acked[a.value->id] = std::make_pair(network_engine.sendAnnounceValue(sn->node, - sr->id, - a.value, - a.permanent ? time_point::max() : a.created, - sn->token, - onDone, - onExpired), next_refresh_time); + auto created = a.permanent ? time_point::max() : a.created; + sn->acked[a.value->id] = { + network_engine.sendAnnounceValue(sn->node, sr->id, a.value, created, sn->token, onDone, onExpired), + next_refresh_time + }; } else if (hasValue and a.permanent) { DHT_LOG.w(sr->id, sn->node->id, "[search %s] [node %s] sending 'refresh' (vid: %d)", sr->id.toString().c_str(), sn->node->toString().c_str(), a.value->id); - sn->acked[a.value->id] = std::make_pair(network_engine.sendRefreshValue(sn->node, - sr->id, - a.value->id, - sn->token, - onDone, - onExpired), next_refresh_time); + sn->acked[a.value->id] = { + network_engine.sendRefreshValue(sn->node, sr->id, a.value->id, sn->token, onDone, onExpired), + next_refresh_time + }; } else { DHT_LOG.w(sr->id, sn->node->id, "[search %s] [node %s] already has value (vid: %d). Aborting.", sr->id.toString().c_str(), sn->node->toString().c_str(), a.value->id); @@ -540,6 +536,13 @@ Dht::searchSynchedNodeListen(const Sp<Search>& sr, SearchNode& n) l->second.get_cb(l->second.filter.filter(values), expired); } } + }, [ws,list_token] (ListenSyncStatus status) { + if (auto sr = ws.lock()) { + auto l = sr->listeners.find(list_token); + if (l != sr->listeners.end()) { + l->second.sync_cb(status); + } + } } }).first; auto node = n.node; @@ -557,8 +560,10 @@ Dht::searchSynchedNodeListen(const Sp<Search>& sr, SearchNode& n) { /* on done */ if (auto sr = ws.lock()) { scheduler.edit(sr->nextSearchStep, scheduler.time()); - if (auto sn = sr->getNode(req.node)) + if (auto sn = sr->getNode(req.node)) { scheduler.add(sn->getListenTime(query), std::bind(&Dht::searchStep, this, sr)); + sn->onListenSynced(query); + } onListenDone(req.node, answer, sr); } }, @@ -2275,9 +2280,7 @@ Dht::onListen(Sp<Node> node, const InfoHash& hash, const Blob& token, size_t soc } void -Dht::onListenDone(const Sp<Node>& node, - net::RequestAnswer& answer, - Sp<Search>& sr) +Dht::onListenDone(const Sp<Node>& node, net::RequestAnswer& answer, Sp<Search>& sr) { // DHT_LOG.d(sr->id, node->id, "[search %s] [node %s] got listen confirmation", // sr->id.toString().c_str(), node->toString().c_str(), answer.values.size()); diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index f36e78071b24a62a4e65205588bdd6656a026ede..2d112bb445e49d68cd0978aa27a67416bd1d3e91 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -668,7 +668,7 @@ DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filt it = searches_.emplace(key, ProxySearch{}).first; } auto query = std::make_shared<Query>(Select{}, where); - auto token = it->second.ops.listen(cb, query, filter, [&](Sp<Query> /*q*/, ValueCallback vcb){ + auto token = it->second.ops.listen(cb, query, filter, [&](Sp<Query> /*q*/, ValueCallback vcb, SyncCallback /*scb*/){ return doListen(key, vcb, filter); }); return token; diff --git a/src/listener.h b/src/listener.h index 4a057edfadd58839fbdd10d972b1809fdb694a16..bae2f570dbc7f91c42d0f0b65d7455c0e92bb7d2 100644 --- a/src/listener.h +++ b/src/listener.h @@ -48,11 +48,4 @@ struct LocalListener { ValueCallback get_cb; }; - -struct SearchListener { - Sp<Query> query; - Value::Filter filter; - ValueCallback get_cb; -}; - } diff --git a/src/op_cache.cpp b/src/op_cache.cpp index 772531c3b631d3bf4bf3e0949e2fd37a12a05fc1..e1055152051508cc59b1d38f9665d7e25087086a 100644 --- a/src/op_cache.cpp +++ b/src/op_cache.cpp @@ -50,7 +50,7 @@ OpValueCache::onValuesExpired(const std::vector<Sp<Value>>& vals) { return expiredValues.empty() ? true : callback(expiredValues, true); } std::vector<Sp<Value>> -OpValueCache::get(Value::Filter& filter) const { +OpValueCache::get(const Value::Filter& filter) const { std::vector<Sp<Value>> ret; if (not filter) ret.reserve(values.size()); @@ -109,7 +109,7 @@ OpCache::getExpiration() const { } size_t -SearchCache::listen(ValueCallback get_cb, Sp<Query> q, Value::Filter filter, std::function<size_t(Sp<Query>, ValueCallback)> onListen) +SearchCache::listen(ValueCallback get_cb, Sp<Query> q, Value::Filter filter, std::function<size_t(Sp<Query>, ValueCallback, SyncCallback)> onListen) { // find exact match auto op = ops.find(q); @@ -128,6 +128,8 @@ SearchCache::listen(ValueCallback get_cb, Sp<Query> q, Value::Filter filter, std auto& cache = *op->second; cache.searchToken = onListen(q, [&](const std::vector<Sp<Value>>& values, bool expired){ return cache.onValue(values, expired); + }, [&](ListenSyncStatus status) { + cache.onNodeChanged(status); }); } auto token = nextToken_++; @@ -177,7 +179,7 @@ SearchCache::expire(const time_point& now, std::function<void(size_t)> onCancel) } std::vector<Sp<Value>> -SearchCache::get(Value::Filter& filter) const { +SearchCache::get(const Value::Filter& filter) const { if (ops.size() == 1) return ops.begin()->second->get(filter); std::map<Value::Id, Sp<Value>> c; diff --git a/src/op_cache.h b/src/op_cache.h index 51dd89e92547954ce7fde39ff9425e9559b73562..15e134dbbfb46d7d3bee66f8f55239e9b3c74ab3 100644 --- a/src/op_cache.h +++ b/src/op_cache.h @@ -57,7 +57,18 @@ public: bool onValuesAdded(const std::vector<Sp<Value>>& vals); bool onValuesExpired(const std::vector<Sp<Value>>& vals); - std::vector<Sp<Value>> get(Value::Filter& filter) const; + void onNodeChanged(ListenSyncStatus status) { + switch (status) { + case ListenSyncStatus::ADDED: nodes++; break; + case ListenSyncStatus::REMOVED: nodes--; break; + case ListenSyncStatus::SYNCED : syncedNodes++; break; + case ListenSyncStatus::UNSYNCED: syncedNodes--; break; + } + } + + bool isSynced() const { return nodes > 0 and syncedNodes == nodes; } + + std::vector<Sp<Value>> get(const Value::Filter& filter) const; Sp<Value> get(Value::Id id) const; std::vector<Sp<Value>> getValues() const; @@ -65,6 +76,8 @@ private: OpValueCache(const OpValueCache&) = delete; OpValueCache& operator=(const OpValueCache&) = delete; + size_t nodes {0}; + size_t syncedNodes {0}; std::map<Value::Id, OpCacheValueStorage> values {}; ValueCallback callback; }; @@ -83,6 +96,9 @@ public: cache.onValue(vals, expired); return not listeners.empty(); } + void onNodeChanged(ListenSyncStatus status) { + cache.onNodeChanged(status); + } void onValuesAdded(const std::vector<Sp<Value>>& vals); void onValuesExpired(const std::vector<Sp<Value>>& vals); @@ -112,7 +128,7 @@ public: return listeners.empty(); } - std::vector<Sp<Value>> get(Value::Filter& filter) const { + std::vector<Sp<Value>> get(const Value::Filter& filter) const { return cache.get(filter); } @@ -120,6 +136,10 @@ public: return cache.get(id); } + bool isSynced() const { + return cache.isSynced(); + } + bool isExpired(const time_point& now) const { return listeners.empty() and (lastRemoved + EXPIRATION < now); } @@ -140,7 +160,7 @@ class SearchCache { public: SearchCache() {} SearchCache(SearchCache&&) = default; - size_t listen(ValueCallback get_cb, Sp<Query> q, Value::Filter filter, std::function<size_t(Sp<Query>, ValueCallback)> onListen); + size_t listen(ValueCallback get_cb, Sp<Query> q, Value::Filter filter, std::function<size_t(Sp<Query>, ValueCallback, SyncCallback)> onListen); bool cancelListen(size_t gtoken, const time_point& now); void cancelAll(std::function<void(size_t)> onCancel); @@ -150,7 +170,7 @@ public: return nextExpiration_; } - std::vector<Sp<Value>> get(Value::Filter& filter) const; + std::vector<Sp<Value>> get(const Value::Filter& filter) const; Sp<Value> get(Value::Id id) const; private: diff --git a/src/search.h b/src/search.h index 00621809096e53f760f01aa86891fa0c2b166a95..0cbb72e1b3bdb07e8b0f3ad33d4df18ed3263319 100644 --- a/src/search.h +++ b/src/search.h @@ -65,7 +65,8 @@ struct Dht::SearchNode { ValueCache cache; Sp<Scheduler::Job> cacheExpirationJob {}; Sp<net::Request> req {}; - CachedListenStatus(ValueStateCallback&& cb) : cache(std::forward<ValueStateCallback>(cb)) {} + CachedListenStatus(ValueStateCallback&& cb, SyncCallback&& scb) + : cache(std::forward<ValueStateCallback>(cb), std::forward<SyncCallback>(scb)) {} CachedListenStatus(CachedListenStatus&&) = default; CachedListenStatus(const CachedListenStatus&) = delete; CachedListenStatus& operator=(const CachedListenStatus&) = delete; @@ -225,6 +226,13 @@ struct Dht::SearchNode { } } + void onListenSynced(const Sp<Query>& q, bool synced = true) { + auto l = listenStatus.find(q); + if (l != listenStatus.end()) { + l->second.cache.onSynced(synced); + } + } + void expireValues(const Sp<Query>& q, Scheduler& scheduler) { auto l = listenStatus.find(q); if (l != listenStatus.end()) { @@ -377,6 +385,12 @@ struct Dht::Search { std::multimap<time_point, Get> callbacks {}; /* listeners */ + struct SearchListener { + Sp<Query> query; + Value::Filter filter; + ValueCallback get_cb; + SyncCallback sync_cb; + }; std::map<size_t, SearchListener> listeners {}; size_t listener_token = 1; @@ -488,10 +502,10 @@ struct Dht::Search { size_t listen(ValueCallback cb, Value::Filter f, const Sp<Query>& q, Scheduler& scheduler) { //DHT_LOG.e(id, "[search %s IPv%c] listen", id.toString().c_str(), (af == AF_INET) ? '4' : '6'); - return cache.listen(cb, q, f, [&](const Sp<Query>& q, ValueCallback vcb){ + return cache.listen(cb, q, f, [&](const Sp<Query>& q, ValueCallback vcb, SyncCallback scb){ done = false; auto token = ++listener_token; - listeners.emplace(token, SearchListener{q, f, vcb}); + listeners.emplace(token, SearchListener{q, f, vcb, scb}); scheduler.edit(nextSearchStep, scheduler.time()); return token; }); diff --git a/src/value_cache.h b/src/value_cache.h index 7853420510bad0a80b78e00c255bb9f385c20790..74d6ea9a77c8ee760d1ae1468bcdd5bcc520f874 100644 --- a/src/value_cache.h +++ b/src/value_cache.h @@ -22,19 +22,32 @@ namespace dht { using ValueStateCallback = std::function<void(const std::vector<Sp<Value>>&, bool)>; +enum class ListenSyncStatus { ADDED, SYNCED, UNSYNCED, REMOVED }; +using SyncCallback = std::function<void(ListenSyncStatus)>; using CallbackQueue = std::list<std::function<void()>>; class ValueCache { public: - ValueCache(ValueStateCallback&& cb) : callback(std::forward<ValueStateCallback>(cb)) {} - ValueCache(ValueCache&& o) : values(std::move(o.values)), callback(std::move(o.callback)) { + ValueCache(ValueStateCallback&& cb, SyncCallback&& scb = {}) + : callback(std::forward<ValueStateCallback>(cb)), syncCallback(std::move(scb)) + { + if (syncCallback) + syncCallback(ListenSyncStatus::ADDED); + } + ValueCache(ValueCache&& o) : values(std::move(o.values)), callback(std::move(o.callback)), syncCallback(std::move(o.syncCallback)) { o.callback = {}; + o.syncCallback = {}; } ~ValueCache() { auto q = clear(); for (auto& cb: q) cb(); + if (syncCallback) { + if (status == ListenSyncStatus::SYNCED) + syncCallback(ListenSyncStatus::UNSYNCED); + syncCallback(ListenSyncStatus::REMOVED); + } } CallbackQueue clear() { @@ -120,6 +133,15 @@ public: return ret; } + void onSynced(bool synced) { + auto newStatus = synced ? ListenSyncStatus::SYNCED : ListenSyncStatus::UNSYNCED; + if (status != newStatus) { + status = newStatus; + if (syncCallback) + syncCallback(newStatus); + } + } + private: // prevent copy ValueCache(const ValueCache&) = delete; @@ -141,6 +163,8 @@ private: std::map<Value::Id, CacheValueStorage> values; ValueStateCallback callback; + SyncCallback syncCallback; + ListenSyncStatus status {ListenSyncStatus::UNSYNCED}; CallbackQueue addValues(const std::vector<Sp<Value>>& new_values, const TypeStore& types, const time_point& now) { std::vector<Sp<Value>> nvals;