diff --git a/CMakeLists.txt b/CMakeLists.txt index ebf9a6fd7cd04dcd0b3667dd300cbd5f0cfea602..ece0a8a945ab3051d877a95be5312ab6b1d5b2eb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -121,6 +121,7 @@ list (APPEND opendht_SOURCES src/node.cpp src/value.cpp src/dht.cpp + src/op_cache.cpp src/storage.h src/listener.h src/search.h diff --git a/src/Makefile.am b/src/Makefile.am index 5de7e82041ef5a0af47f673ef2fa5aa849376adf..67e4e83322680174dc5aa6a4d994c8f64e5789f6 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -11,6 +11,7 @@ libopendht_la_SOURCES = \ search.h \ value_cache.h \ op_cache.h \ + op_cache.cpp \ net.h \ parsed_message.h \ node_cache.cpp \ diff --git a/src/dht.cpp b/src/dht.cpp index 0febcbab3671ffa19510b0bbc418ef5cfd9020a1..3e6cbec77d1904c5e0c6c9e4b9ed9e6c0cc7d1ff 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -874,10 +874,10 @@ Dht::cancelListen(const InfoHash& id, size_t token) if (st != store.end() && tokenlocal) st->second.local_listeners.erase(tokenlocal); - auto searches_cancel_listen = [&id](std::map<InfoHash, Sp<Search>>& srs, size_t token) { + auto searches_cancel_listen = [this,&id](std::map<InfoHash, Sp<Search>>& srs, size_t token) { auto srp = srs.find(id); if (srp != srs.end() and token) - srp->second->cancelListen(token); + srp->second->cancelListen(token, scheduler); }; searches_cancel_listen(searches4, std::get<1>(it->second)); searches_cancel_listen(searches6, std::get<2>(it->second)); diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index 5d4de69d427a453c693ea01b8cd01148c59a6c57..80612380168e1e518a7fd53a4cf65e5daea1676a 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -64,6 +64,7 @@ struct PermanentPut { struct DhtProxyClient::ProxySearch { SearchCache ops {}; + Sp<Scheduler::Job> opExpirationJob; std::map<size_t, Listener> listeners {}; std::map<Value::Id, PermanentPut> puts {}; }; @@ -626,13 +627,28 @@ DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filt bool DhtProxyClient::cancelListen(const InfoHash& key, size_t gtoken) { + scheduler.syncTime(); DHT_LOG.d(key, "[search %s]: cancelListen %zu", key.to_c_str(), gtoken); auto it = searches_.find(key); if (it == searches_.end()) return false; - return it->second.ops.cancelListen(gtoken, [&](size_t ltoken){ - doCancelListen(key, ltoken); - }); + auto& ops = it->second.ops; + bool canceled = ops.cancelListen(gtoken, scheduler.time()); + if (not it->second.opExpirationJob) { + it->second.opExpirationJob = scheduler.add(time_point::max(), [this,key](){ + auto it = searches_.find(key); + if (it != searches_.end()) { + auto next = it->second.ops.expire(scheduler.time(), [this,key](size_t ltoken){ + doCancelListen(key, ltoken); + }); + if (next != time_point::max()) { + scheduler.edit(it->second.opExpirationJob, next); + } + } + }); + } + scheduler.edit(it->second.opExpirationJob, ops.getExpiration()); + return canceled; } size_t diff --git a/src/dhtrunner.cpp b/src/dhtrunner.cpp index afb17efb041fd1625f1e86f8e54c50a7d82fa968..a6162d67fa2308b831bc190641ae2d464fcdaebb 100644 --- a/src/dhtrunner.cpp +++ b/src/dhtrunner.cpp @@ -600,8 +600,8 @@ void DhtRunner::cancelListen(InfoHash h, size_t token) { std::lock_guard<std::mutex> lck(storage_mtx); + pending_ops.emplace([=](SecureDht& dht) { #if OPENDHT_PROXY_CLIENT - pending_ops.emplace([=](SecureDht&) { auto it = listeners_.find(token); if (it == listeners_.end()) return; if (it->second.tokenClassicDht) @@ -609,23 +609,31 @@ DhtRunner::cancelListen(InfoHash h, size_t token) if (it->second.tokenProxyDht and dht_via_proxy_) dht_via_proxy_->cancelListen(h, it->second.tokenProxyDht); listeners_.erase(it); - }); #else - pending_ops.emplace([=](SecureDht& dht) { dht.cancelListen(h, token); - }); #endif // OPENDHT_PROXY_CLIENT + }); cv.notify_all(); } void -DhtRunner::cancelListen(InfoHash h, std::shared_future<size_t> token) +DhtRunner::cancelListen(InfoHash h, std::shared_future<size_t> ftoken) { { std::lock_guard<std::mutex> lck(storage_mtx); pending_ops.emplace([=](SecureDht& dht) { - auto tk = token.get(); - dht.cancelListen(h, tk); + auto token = ftoken.get(); +#if OPENDHT_PROXY_CLIENT + auto it = listeners_.find(token); + if (it == listeners_.end()) return; + if (it->second.tokenClassicDht) + dht_->cancelListen(h, it->second.tokenClassicDht); + if (it->second.tokenProxyDht and dht_via_proxy_) + dht_via_proxy_->cancelListen(h, it->second.tokenProxyDht); + listeners_.erase(it); +#else + dht.cancelListen(h, token); +#endif // OPENDHT_PROXY_CLIENT }); } cv.notify_all(); diff --git a/src/listener.h b/src/listener.h index f491929e03d2b0bfffbf2a096ce3278183fdb200..4a057edfadd58839fbdd10d972b1809fdb694a16 100644 --- a/src/listener.h +++ b/src/listener.h @@ -20,6 +20,7 @@ #include "value.h" #include "utils.h" +#include "callbacks.h" namespace dht { diff --git a/src/op_cache.cpp b/src/op_cache.cpp new file mode 100644 index 0000000000000000000000000000000000000000..126564c56789d1d2da4c623c7c6e922316f0135e --- /dev/null +++ b/src/op_cache.cpp @@ -0,0 +1,204 @@ +/* + * Copyright (C) 2018 Savoir-faire Linux Inc. + * Author(s) : Adrien Béraud <adrien.beraud@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + */ +#include "op_cache.h" + +namespace dht { + +constexpr const std::chrono::seconds OpCache::EXPIRATION; + +bool +OpValueCache::onValuesAdded(const std::vector<Sp<Value>>& vals) { + std::vector<Sp<Value>> newValues; + for (const auto& v : vals) { + auto viop = values.emplace(v->id, OpCacheValueStorage{v}); + if (viop.second) { + newValues.emplace_back(v); + } else { + viop.first->second.refCount++; + } + } + return newValues.empty() ? true : callback(newValues, false); +} +bool +OpValueCache::onValuesExpired(const std::vector<Sp<Value>>& vals) { + std::vector<Sp<Value>> expiredValues; + for (const auto& v : vals) { + auto vit = values.find(v->id); + if (vit != values.end()) { + vit->second.refCount--; + if (not vit->second.refCount) { + expiredValues.emplace_back(std::move(vit->second.data)); + values.erase(vit); + } + } + } + return expiredValues.empty() ? true : callback(expiredValues, true); +} +std::vector<Sp<Value>> +OpValueCache::get(Value::Filter& filter) const { + std::vector<Sp<Value>> ret; + if (not filter) + ret.reserve(values.size()); + for (const auto& v : values) + if (not filter or filter(*v.second.data)) + ret.emplace_back(v.second.data); + return ret; +} + +Sp<Value> +OpValueCache::get(Value::Id id) const { + auto v = values.find(id); + if (v == values.end()) + return {}; + return v->second.data; +} + +std::vector<Sp<Value>> +OpValueCache::getValues() const { + std::vector<Sp<Value>> ret; + ret.reserve(values.size()); + for (const auto& v : values) + ret.emplace_back(v.second.data); + return ret; +} + +void +OpCache::onValuesAdded(const std::vector<Sp<Value>>& vals) { + if (not listeners.empty()) { + std::vector<LocalListener> list; + list.reserve(listeners.size()); + for (const auto& l : listeners) + list.emplace_back(l.second); + for (auto& l : list) + l.get_cb(l.filter.filter(vals), false); + } +} + +void +OpCache::onValuesExpired(const std::vector<Sp<Value>>& vals) { + if (not listeners.empty()) { + std::vector<LocalListener> list; + list.reserve(listeners.size()); + for (const auto& l : listeners) + list.emplace_back(l.second); + for (auto& l : list) + l.get_cb(l.filter.filter(vals), true); + } +} + +time_point +OpCache::getExpiration() const { + if (not listeners.empty()) + return time_point::max(); + return lastRemoved + EXPIRATION; +} + +size_t +SearchCache::listen(ValueCallback get_cb, Sp<Query> q, Value::Filter filter, std::function<size_t(Sp<Query>, ValueCallback)> onListen) +{ + // find exact match + auto op = ops.find(q); + if (op == ops.end()) { + // find satisfying query + for (auto it = ops.begin(); it != ops.end(); it++) { + if (q->isSatisfiedBy(*it->first)) { + op = it; + break; + } + } + } + if (op == ops.end()) { + // New query + op = ops.emplace(q, std::unique_ptr<OpCache>(new OpCache)).first; + auto& cache = *op->second; + cache.searchToken = onListen(q, [&](const std::vector<Sp<Value>>& values, bool expired){ + return cache.onValue(values, expired); + }); + } + auto token = nextToken_++; + if (nextToken_ == 0) + nextToken_++; + op->second->addListener(token, get_cb, q, filter); + return token; +} + +bool +SearchCache::cancelListen(size_t gtoken, const time_point& now) { + for (auto& op : ops) { + if (op.second->removeListener(gtoken, now)) { + nextExpiration_ = std::min(nextExpiration_, op.second->getExpiration()); + return true; + } + } + return false; +} + +void +SearchCache::cancelAll(std::function<void(size_t)> onCancel) { + for (auto& op : ops) { + auto cache = std::move(op.second); + cache->removeAll(); + onCancel(cache->searchToken); + } + ops.clear(); +} + +time_point +SearchCache::expire(const time_point& now, std::function<void(size_t)> onCancel) { + nextExpiration_ = time_point::max(); + auto ret = nextExpiration_; + for (auto it = ops.begin(); it != ops.end();) { + auto expiration = it->second->getExpiration(); + if (expiration < now) { + auto cache = std::move(it->second); + it = ops.erase(it); + onCancel(cache->searchToken); + } else { + nextExpiration_ = std::min(nextExpiration_, expiration); + ret = nextExpiration_; + ++it; + } + } + return ret; +} + +std::vector<Sp<Value>> +SearchCache::get(Value::Filter& filter) const { + if (ops.size() == 1) + return ops.begin()->second->get(filter); + std::map<Value::Id, Sp<Value>> c; + for (const auto& op : ops) { + for (const auto& v : op.second->get(filter)) + c.emplace(v->id, v); + } + std::vector<Sp<Value>> ret; + ret.reserve(c.size()); + for (auto& v : c) + ret.emplace_back(std::move(v.second)); + return ret; +} + +Sp<Value> +SearchCache::get(Value::Id id) const { + for (const auto& op : ops) + if (auto v = op.second->get(id)) + return v; + return {}; +} + +} diff --git a/src/op_cache.h b/src/op_cache.h index cf56d88136ecbb8e448a483e261e4aad9c6dbaac..fc6c425d731f0825b56899dda1a26e096ed29aa8 100644 --- a/src/op_cache.h +++ b/src/op_cache.h @@ -33,6 +33,9 @@ struct OpCacheValueStorage class OpValueCache { public: OpValueCache(ValueCallback&& cb) : callback(std::forward<ValueCallback>(cb)) {} + OpValueCache(OpValueCache&& o) : values(std::move(o.values)), callback(std::move(o.callback)) { + o.callback = {}; + } static ValueCallback cacheCallback(ValueCallback&& cb) { auto cache = std::make_shared<OpValueCache>(std::forward<ValueCallback>(cb)); @@ -48,58 +51,17 @@ public: return onValuesAdded(vals); } - bool onValuesAdded(const std::vector<Sp<Value>>& vals) { - std::vector<Sp<Value>> newValues; - for (const auto& v : vals) { - auto viop = values.emplace(v->id, OpCacheValueStorage{v}); - if (viop.second) { - newValues.emplace_back(v); - } else { - viop.first->second.refCount++; - } - } - return newValues.empty() ? true : callback(newValues, false); - } - bool onValuesExpired(const std::vector<Sp<Value>>& vals) { - std::vector<Sp<Value>> expiredValues; - for (const auto& v : vals) { - auto vit = values.find(v->id); - if (vit != values.end()) { - vit->second.refCount--; - if (not vit->second.refCount) { - expiredValues.emplace_back(std::move(vit->second.data)); - values.erase(vit); - } - } - } - return expiredValues.empty() ? true : callback(expiredValues, true); - } - std::vector<Sp<Value>> get(Value::Filter& filter) const { - std::vector<Sp<Value>> ret; - if (not filter) - ret.reserve(values.size()); - for (const auto& v : values) - if (not filter or filter(*v.second.data)) - ret.emplace_back(v.second.data); - return ret; - } - - Sp<Value> get(Value::Id id) const { - auto v = values.find(id); - if (v == values.end()) - return {}; - return v->second.data; - } + bool onValuesAdded(const std::vector<Sp<Value>>& vals); + bool onValuesExpired(const std::vector<Sp<Value>>& vals); - std::vector<Sp<Value>> getValues() const { - std::vector<Sp<Value>> ret; - ret.reserve(values.size()); - for (const auto& v : values) - ret.emplace_back(v.second.data); - return ret; - } + std::vector<Sp<Value>> get(Value::Filter& filter) const; + Sp<Value> get(Value::Id id) const; + std::vector<Sp<Value>> getValues() const; private: + OpValueCache(const OpValueCache&) = delete; + OpValueCache& operator=(const OpValueCache&) = delete; + std::map<Value::Id, OpCacheValueStorage> values {}; ValueCallback callback; }; @@ -119,33 +81,16 @@ public: return not listeners.empty(); } - void onValuesAdded(const std::vector<Sp<Value>>& vals) { - if (not listeners.empty()) { - std::vector<LocalListener> list; - list.reserve(listeners.size()); - for (const auto& l : listeners) - list.emplace_back(l.second); - for (auto& l : list) - l.get_cb(l.filter.filter(vals), false); - } - } - void onValuesExpired(const std::vector<Sp<Value>>& vals) { - if (not listeners.empty()) { - std::vector<LocalListener> list; - list.reserve(listeners.size()); - for (const auto& l : listeners) - list.emplace_back(l.second); - for (auto& l : list) - l.get_cb(l.filter.filter(vals), true); - } - } + void onValuesAdded(const std::vector<Sp<Value>>& vals); + void onValuesExpired(const std::vector<Sp<Value>>& vals); void addListener(size_t token, ValueCallback cb, Sp<Query> q, Value::Filter filter) { listeners.emplace(token, LocalListener{q, filter, cb}); cb(cache.get(filter), false); } - bool removeListener(size_t token) { + bool removeListener(size_t token, const time_point& now) { + lastRemoved = now; return listeners.erase(token) > 0; } @@ -165,89 +110,46 @@ public: return cache.get(id); } + bool isExpired(const time_point& now) const { + return listeners.empty() and (lastRemoved + EXPIRATION < now); + } + time_point getExpiration() const; + size_t searchToken; private: + constexpr static const std::chrono::seconds EXPIRATION {60}; + OpCache(const OpCache&) = delete; + OpCache& operator=(const OpCache&) = delete; + OpValueCache cache; std::map<size_t, LocalListener> listeners; + time_point lastRemoved {clock::now()}; }; class SearchCache { public: - size_t listen(ValueCallback get_cb, Sp<Query> q, Value::Filter filter, std::function<size_t(Sp<Query>, ValueCallback)> onListen) { - // find exact match - auto op = ops.find(q); - if (op == ops.end()) { - // find satisfying query - for (auto it = ops.begin(); it != ops.end(); it++) { - if (q->isSatisfiedBy(*it->first)) { - op = it; - break; - } - } - } - if (op == ops.end()) { - // New query - op = ops.emplace(q, std::unique_ptr<OpCache>(new OpCache)).first; - auto& cache = *op->second; - cache.searchToken = onListen(q, [&](const std::vector<Sp<Value>>& values, bool expired){ - return cache.onValue(values, expired); - }); - } - auto token = nextToken_++; - if (nextToken_ == 0) - nextToken_++; - op->second->addListener(token, get_cb, q, filter); - return token; - } + SearchCache() {} + SearchCache(SearchCache&&) = default; + size_t listen(ValueCallback get_cb, Sp<Query> q, Value::Filter filter, std::function<size_t(Sp<Query>, ValueCallback)> onListen); - bool cancelListen(size_t gtoken, std::function<void(size_t)> onCancel) { - for (auto it = ops.begin(); it != ops.end(); it++) { - if (it->second->removeListener(gtoken)) { - if (it->second->isDone()) { - auto cache = std::move(it->second); - ops.erase(it); - onCancel(cache->searchToken); - } - return true; - } - } - return false; - } + bool cancelListen(size_t gtoken, const time_point& now); + void cancelAll(std::function<void(size_t)> onCancel); - void cancelAll(std::function<void(size_t)> onCancel) { - for (auto& op : ops) { - auto cache = std::move(op.second); - cache->removeAll(); - onCancel(cache->searchToken); - } - ops.clear(); + time_point expire(const time_point& now, std::function<void(size_t)> onCancel); + time_point getExpiration() const { + return nextExpiration_; } - std::vector<Sp<Value>> get(Value::Filter& filter) const { - if (ops.size() == 1) - return ops.begin()->second->get(filter); - std::map<Value::Id, Sp<Value>> c; - for (const auto& op : ops) { - for (const auto& v : op.second->get(filter)) - c.emplace(v->id, v); - } - std::vector<Sp<Value>> ret; - ret.reserve(c.size()); - for (auto& v : c) - ret.emplace_back(std::move(v.second)); - return ret; - } - - Sp<Value> get(Value::Id id) const { - for (const auto& op : ops) - if (auto v = op.second->get(id)) - return v; - return {}; - } + std::vector<Sp<Value>> get(Value::Filter& filter) const; + Sp<Value> get(Value::Id id) const; private: + SearchCache(const SearchCache&) = delete; + SearchCache& operator=(const SearchCache&) = delete; + std::map<Sp<Query>, std::unique_ptr<OpCache>> ops {}; size_t nextToken_ {1}; + time_point nextExpiration_ {time_point::max()}; }; } diff --git a/src/search.h b/src/search.h index faefa443c1d6fd4c4d7ca2337e70c134e8cd91ed..c83c06cd2dd9620339afee4d304b1b271739a33b 100644 --- a/src/search.h +++ b/src/search.h @@ -383,8 +383,11 @@ struct Dht::Search { /* Cache */ SearchCache cache; + Sp<Scheduler::Job> opExpirationJob; ~Search() { + if (opExpirationJob) + opExpirationJob->cancel(); for (auto& get : callbacks) { get.second.done_cb(false, {}); get.second.done_cb = {}; @@ -483,21 +486,27 @@ struct Dht::Search { }); } - void cancelListen(size_t token) { - cache.cancelListen(token, [&](size_t t){ - Sp<Query> query; - const auto& ll = listeners.find(t); - if (ll != listeners.cend()) { - query = ll->second.query; - listeners.erase(ll); - } - for (auto& sn : nodes) { - if (listeners.empty()) - sn.cancelListen(); - else if (query) - sn.cancelListen(query); - } - }); + void cancelListen(size_t token, Scheduler& scheduler) { + cache.cancelListen(token, scheduler.time()); + if (not opExpirationJob) + opExpirationJob = scheduler.add(time_point::max(), [this,&scheduler]{ + auto nextExpire = cache.expire(scheduler.time(), [&](size_t t){ + Sp<Query> query; + const auto& ll = listeners.find(t); + if (ll != listeners.cend()) { + query = ll->second.query; + listeners.erase(ll); + } + for (auto& sn : nodes) { + if (listeners.empty()) + sn.cancelListen(); + else if (query) + sn.cancelListen(query); + } + }); + scheduler.edit(opExpirationJob, nextExpire); + }); + scheduler.edit(opExpirationJob, cache.getExpiration()); } /**