diff --git a/CMakeLists.txt b/CMakeLists.txt index a10ab280f1247fc42e5ac884370ebb83f3b82cb1..f98910f52a4cd3fdb82f7503e0ebd22077e2e02f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -70,6 +70,8 @@ set (CMAKE_CXX_STANDARD_REQUIRED on) set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-return-type -Wall -Wextra -Wnon-virtual-dtor -pedantic-errors -fvisibility=hidden") if (OPENDHT_SANITIZE) set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address -fstack-protector-strong") +else () + set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fno-stack-protector") endif () set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DMSGPACK_DISABLE_LEGACY_NIL -DMSGPACK_DISABLE_LEGACY_CONVERT") if (NOT CMAKE_BUILD_TYPE) @@ -304,7 +306,7 @@ install (EXPORT opendht DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/opendht FILE o install (FILES ${CMAKE_CURRENT_BINARY_DIR}/opendhtConfigVersion.cmake DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/opendht) # Unit tests -IF(OPENDHT_TESTS) +if (OPENDHT_TESTS) FIND_PACKAGE(Cppunit REQUIRED) # unit testing list (APPEND test_FILES @@ -325,16 +327,17 @@ IF(OPENDHT_TESTS) tests/tests_runner.cpp ${test_FILES} ) + target_include_directories(opendht_unit_tests SYSTEM PRIVATE ${CPPUNIT_INCLUDE_DIRS}) if (OPENDHT_SHARED) - TARGET_LINK_LIBRARIES(opendht_unit_tests opendht) + target_link_libraries(opendht_unit_tests opendht) else () - TARGET_LINK_LIBRARIES(opendht_unit_tests opendht-static) + target_link_libraries(opendht_unit_tests opendht-static) endif () - TARGET_LINK_LIBRARIES(opendht_unit_tests + target_link_libraries(opendht_unit_tests ${CMAKE_THREAD_LIBS_INIT} ${CPPUNIT_LIBRARIES} ${GNUTLS_LIBRARIES} ) enable_testing() add_test(TEST opendht_unit_tests) -ENDIF() +endif() diff --git a/include/opendht/dht.h b/include/opendht/dht.h index 7f750795f570b1025382447ab2bf0e7cde64a1bd..46da9994d52a899c7996250566e561e6c7bbc251 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -163,7 +163,7 @@ public: /** * Get locally stored data for the given hash. */ - std::vector<Sp<Value>> getLocal(const InfoHash& key, Value::Filter f = Value::AllFilter()) const; + std::vector<Sp<Value>> getLocal(const InfoHash& key, Value::Filter f = {}) const; /** * Get locally stored data for the given key and value id. @@ -469,7 +469,7 @@ private: Sp<Search> search(const InfoHash& id, sa_family_t af, GetCallback = {}, QueryCallback = {}, DoneCallback = {}, Value::Filter = {}, const Sp<Query>& q = {}); void announce(const InfoHash& id, sa_family_t af, Sp<Value> value, DoneCallback callback, time_point created=time_point::max(), bool permanent = false); - size_t listenTo(const InfoHash& id, sa_family_t af, ValueCallback cb, Value::Filter f = Value::AllFilter(), const Sp<Query>& q = {}); + size_t listenTo(const InfoHash& id, sa_family_t af, ValueCallback cb, Value::Filter f = {}, const Sp<Query>& q = {}); /** * Refill the search with good nodes if possible. diff --git a/include/opendht/dht_interface.h b/include/opendht/dht_interface.h index bf837492c19289564c17c238776fa7043ebae8b9..82b7bf85cc565cc358eaeec9f7703b12f883f843 100644 --- a/include/opendht/dht_interface.h +++ b/include/opendht/dht_interface.h @@ -106,7 +106,7 @@ public: /** * Get locally stored data for the given hash. */ - virtual std::vector<Sp<Value>> getLocal(const InfoHash& key, Value::Filter f = Value::AllFilter()) const = 0; + virtual std::vector<Sp<Value>> getLocal(const InfoHash& key, Value::Filter f = {}) const = 0; /** * Get locally stored data for the given key and value id. diff --git a/include/opendht/dht_proxy_client.h b/include/opendht/dht_proxy_client.h index 59eba71e56e93f40d92aace6e2e726e6138bdabb..972167ca4769d59f9dd2f39faaaf597b6a5db204 100644 --- a/include/opendht/dht_proxy_client.h +++ b/include/opendht/dht_proxy_client.h @@ -41,8 +41,6 @@ namespace Json { namespace dht { -class SearchCache; - class OPENDHT_PUBLIC DhtProxyClient final : public DhtInterface { public: @@ -276,7 +274,6 @@ private: void opFailed(); - size_t doListen(const InfoHash& key, ValueCallback, Value::Filter); bool doCancelListen(const InfoHash& key, size_t token); struct ListenState; diff --git a/include/opendht/dhtrunner.h b/include/opendht/dhtrunner.h index 60a2110fe4a059a23b4f47ab9a95b5ba4bd82e04..e74391313d69d2cde7cddaf161e3558439bd2f49 100644 --- a/include/opendht/dhtrunner.h +++ b/include/opendht/dhtrunner.h @@ -63,20 +63,20 @@ public: DhtRunner(); virtual ~DhtRunner(); - void get(InfoHash id, GetCallbackSimple cb, DoneCallback donecb={}, Value::Filter f = Value::AllFilter(), Where w = {}) { + void get(InfoHash id, GetCallbackSimple cb, DoneCallback donecb={}, Value::Filter f = {}, Where w = {}) { get(id, bindGetCb(cb), donecb, f, w); } - void get(InfoHash id, GetCallbackSimple cb, DoneCallbackSimple donecb={}, Value::Filter f = Value::AllFilter(), Where w = {}) { + void get(InfoHash id, GetCallbackSimple cb, DoneCallbackSimple donecb={}, Value::Filter f = {}, Where w = {}) { get(id, bindGetCb(cb), donecb, f, w); } void get(InfoHash hash, GetCallback vcb, DoneCallback dcb, Value::Filter f={}, Where w = {}); - void get(InfoHash id, GetCallback cb, DoneCallbackSimple donecb={}, Value::Filter f = Value::AllFilter(), Where w = {}) { + void get(InfoHash id, GetCallback cb, DoneCallbackSimple donecb={}, Value::Filter f = {}, Where w = {}) { get(id, cb, bindDoneCb(donecb), f, w); } - void get(const std::string& key, GetCallback vcb, DoneCallbackSimple dcb={}, Value::Filter f = Value::AllFilter(), Where w = {}); + void get(const std::string& key, GetCallback vcb, DoneCallbackSimple dcb={}, Value::Filter f = {}, Where w = {}); template <class T> void get(InfoHash hash, std::function<bool(std::vector<T>&&)> cb, DoneCallbackSimple dcb={}) diff --git a/include/opendht/value.h b/include/opendht/value.h index a4a3348a1c0b019bcccecb465e5295f5c2c55280..34224ff7cd58db9f5e5525f7af8205863236b8aa 100644 --- a/include/opendht/value.h +++ b/include/opendht/value.h @@ -165,8 +165,8 @@ struct OPENDHT_PUBLIC Value return chainOr(std::move(f1), std::move(f2)); } static Filter chain(Filter&& f1, Filter&& f2) { - if (not f1) return f2; - if (not f2) return f1; + if (not f1) return std::move(f1); + if (not f2) return std::move(f2); return [f1,f2](const Value& v) { return f1(v) and f2(v); }; @@ -184,7 +184,7 @@ struct OPENDHT_PUBLIC Value return chainAll(std::vector<Filter>(l.begin(), l.end())); } static Filter chainOr(Filter&& f1, Filter&& f2) { - if (not f1 or not f2) return AllFilter(); + if (not f1 or not f2) return {}; return [f1,f2](const Value& v) { return f1(v) or f2(v); }; @@ -990,7 +990,7 @@ template <typename T, Value::Filter getFilterSet() { - return Value::AllFilter(); + return {}; } template <class T> diff --git a/src/dht.cpp b/src/dht.cpp index 3a97ea60586a243aa8f48d507d23aee4f0b355db..e37c5808b258fca6148ad8f53431e0f605160790 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -437,22 +437,18 @@ void Dht::searchSendAnnounceValue(const Sp<Search>& sr) { if (not hasValue or seq_no < a.value->seq) { DHT_LOG.d(sr->id, sn->node->id, "[search %s] [node %s] sending 'put' (vid: %d)", sr->id.toString().c_str(), sn->node->toString().c_str(), a.value->id); - sn->acked[a.value->id] = std::make_pair(network_engine.sendAnnounceValue(sn->node, - sr->id, - a.value, - a.permanent ? time_point::max() : a.created, - sn->token, - onDone, - onExpired), next_refresh_time); + auto created = a.permanent ? time_point::max() : a.created; + sn->acked[a.value->id] = { + network_engine.sendAnnounceValue(sn->node, sr->id, a.value, created, sn->token, onDone, onExpired), + next_refresh_time + }; } else if (hasValue and a.permanent) { DHT_LOG.w(sr->id, sn->node->id, "[search %s] [node %s] sending 'refresh' (vid: %d)", sr->id.toString().c_str(), sn->node->toString().c_str(), a.value->id); - sn->acked[a.value->id] = std::make_pair(network_engine.sendRefreshValue(sn->node, - sr->id, - a.value->id, - sn->token, - onDone, - onExpired), next_refresh_time); + sn->acked[a.value->id] = { + network_engine.sendRefreshValue(sn->node, sr->id, a.value->id, sn->token, onDone, onExpired), + next_refresh_time + }; } else { DHT_LOG.w(sr->id, sn->node->id, "[search %s] [node %s] already has value (vid: %d). Aborting.", sr->id.toString().c_str(), sn->node->toString().c_str(), a.value->id); @@ -540,6 +536,13 @@ Dht::searchSynchedNodeListen(const Sp<Search>& sr, SearchNode& n) l->second.get_cb(l->second.filter.filter(values), expired); } } + }, [ws,list_token] (ListenSyncStatus status) { + if (auto sr = ws.lock()) { + auto l = sr->listeners.find(list_token); + if (l != sr->listeners.end()) { + l->second.sync_cb(status); + } + } } }).first; auto node = n.node; @@ -557,8 +560,10 @@ Dht::searchSynchedNodeListen(const Sp<Search>& sr, SearchNode& n) { /* on done */ if (auto sr = ws.lock()) { scheduler.edit(sr->nextSearchStep, scheduler.time()); - if (auto sn = sr->getNode(req.node)) + if (auto sn = sr->getNode(req.node)) { scheduler.add(sn->getListenTime(query), std::bind(&Dht::searchStep, this, sr)); + sn->onListenSynced(query); + } onListenDone(req.node, answer, sr); } }, @@ -909,7 +914,7 @@ struct OpStatus { template <typename T> struct GetStatus : public OpStatus { - std::vector<Sp<T>> values; + T values; std::vector<Sp<Node>> nodes; }; @@ -968,20 +973,16 @@ void doneCallbackWrapper(DoneCallback dcb, const std::vector<Sp<Node>>& nodes, G } } -template <typename T, typename Cb> -bool callbackWrapper(Cb get_cb, - DoneCallback done_cb, - const std::vector<Sp<T>>& values, - std::function<std::vector<Sp<T>>(const std::vector<Sp<T>>&)> add_values, - Sp<GetStatus<T>> o) +template <typename T, typename St, typename Cb, typename Av, typename Cv> +bool callbackWrapper(Cb get_cb, DoneCallback done_cb, const std::vector<Sp<T>>& values, + Av add_values, Cv cache_values, GetStatus<St>& op) { - auto& op = *o; if (op.status.done) return false; auto newvals = add_values(values); if (not newvals.empty()) { op.status.ok = !get_cb(newvals); - op.values.insert(op.values.end(), newvals.begin(), newvals.end()); + cache_values(newvals); } doneCallbackWrapper(done_cb, {}, op); return !op.status.ok; @@ -993,23 +994,26 @@ Dht::get(const InfoHash& id, GetCallback getcb, DoneCallback donecb, Value::Filt scheduler.syncTime(); auto q = std::make_shared<Query>(Select {}, std::move(where)); - auto op = std::make_shared<GetStatus<Value>>(); + auto op = std::make_shared<GetStatus<std::map<Value::Id, Sp<Value>>>>(); auto f = filter.chain(q->where.getFilter()); - auto add_values = [op,f](const std::vector<Sp<Value>>& values) { - std::vector<Sp<Value>> newvals {}; - for (const auto& v : values) { - auto it = std::find_if(op->values.cbegin(), op->values.cend(), [&](const Sp<Value>& sv) { - return sv == v or *sv == *v; - }); - if (it == op->values.cend()) { - if (not f or f(*v)) - newvals.push_back(v); + auto gcb = [getcb, donecb, op, f](const std::vector<Sp<Value>>& vals) { + auto& o = *op; + return callbackWrapper(getcb, donecb, vals, [&o,&f](const std::vector<Sp<Value>>& values) { + std::vector<Sp<Value>> newvals {}; + for (const auto& v : values) { + auto it = o.values.find(v->id); + if (it == o.values.cend() or (it->second != v && !(*it->second == *v))) { + if (not f or f(*v)) + newvals.push_back(v); + } } - } - return newvals; + return newvals; + }, [&o](const std::vector<Sp<Value>>& newvals) { + for (const auto& v : newvals) + o.values[v->id] = v; + }, o); }; - auto gcb = std::bind(callbackWrapper<Value, GetCallback>, getcb, donecb, _1, add_values, op); /* Try to answer this search locally. */ gcb(getLocal(id, f)); @@ -1029,36 +1033,39 @@ Dht::get(const InfoHash& id, GetCallback getcb, DoneCallback donecb, Value::Filt void Dht::query(const InfoHash& id, QueryCallback cb, DoneCallback done_cb, Query&& q) { scheduler.syncTime(); - auto op = std::make_shared<GetStatus<FieldValueIndex>>(); - + auto op = std::make_shared<GetStatus<std::vector<Sp<FieldValueIndex>>>>(); auto f = q.where.getFilter(); - auto values = getLocal(id, f); - auto add_fields = [=](const std::vector<Sp<FieldValueIndex>>& fields) { - std::vector<Sp<FieldValueIndex>> newvals {}; - for (const auto& f : fields) { - auto it = std::find_if(op->values.cbegin(), op->values.cend(), - [&](const Sp<FieldValueIndex>& sf) { - return sf == f or f->containedIn(*sf); - }); - if (it == op->values.cend()) { - auto lesser = std::find_if(op->values.begin(), op->values.end(), + auto qcb = [cb, done_cb, op](const std::vector<Sp<FieldValueIndex>>& fields){ + auto& o = *op; + return callbackWrapper(cb, done_cb, fields, [&](const std::vector<Sp<FieldValueIndex>>& fields) { + std::vector<Sp<FieldValueIndex>> newvals {}; + for (const auto& f : fields) { + auto it = std::find_if(o.values.cbegin(), o.values.cend(), [&](const Sp<FieldValueIndex>& sf) { - return sf->containedIn(*f); + return sf == f or f->containedIn(*sf); }); - if (lesser != op->values.end()) - op->values.erase(lesser); - newvals.push_back(f); + if (it == o.values.cend()) { + auto lesser = std::find_if(o.values.begin(), o.values.end(), + [&](const Sp<FieldValueIndex>& sf) { + return sf->containedIn(*f); + }); + if (lesser != o.values.end()) + o.values.erase(lesser); + newvals.push_back(f); + } } - } - return newvals; + return newvals; + }, [&](const std::vector<Sp<FieldValueIndex>>& fields){ + o.values.insert(o.values.end(), fields.begin(), fields.end()); + }, o); }; + + /* Try to answer this search locally. */ + auto values = getLocal(id, f); std::vector<Sp<FieldValueIndex>> local_fields(values.size()); std::transform(values.begin(), values.end(), local_fields.begin(), [&q](const Sp<Value>& v) { return std::make_shared<FieldValueIndex>(*v, q.select); }); - auto qcb = std::bind(callbackWrapper<FieldValueIndex, QueryCallback>, cb, done_cb, _1, add_fields, op); - - /* Try to answer this search locally. */ qcb(local_fields); auto sq = std::make_shared<Query>(std::move(q)); @@ -1166,6 +1173,7 @@ Dht::storageChanged(const InfoHash& id, Storage& st, ValueStorage& v, bool newVa if (not st.local_listeners.empty()) { DHT_LOG.d(id, "[store %s] %lu local listeners", id.toString().c_str(), st.local_listeners.size()); std::vector<std::pair<ValueCallback, std::vector<Sp<Value>>>> cbs; + cbs.reserve(st.local_listeners.size()); for (const auto& l : st.local_listeners) { std::vector<Sp<Value>> vals; if (not l.second.filter or l.second.filter(*v.data)) @@ -2275,9 +2283,7 @@ Dht::onListen(Sp<Node> node, const InfoHash& hash, const Blob& token, size_t soc } void -Dht::onListenDone(const Sp<Node>& node, - net::RequestAnswer& answer, - Sp<Search>& sr) +Dht::onListenDone(const Sp<Node>& node, net::RequestAnswer& answer, Sp<Search>& sr) { // DHT_LOG.d(sr->id, node->id, "[search %s] [node %s] got listen confirmation", // sr->id.toString().c_str(), node->toString().c_str(), answer.values.size()); diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index f36e78071b24a62a4e65205588bdd6656a026ede..38eaf72a97d62393e33a9d3b615096e452c42fc0 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -668,8 +668,93 @@ DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filt it = searches_.emplace(key, ProxySearch{}).first; } auto query = std::make_shared<Query>(Select{}, where); - auto token = it->second.ops.listen(cb, query, filter, [&](Sp<Query> /*q*/, ValueCallback vcb){ - return doListen(key, vcb, filter); + auto token = it->second.ops.listen(cb, query, filter, [this, key, filter](Sp<Query> /*q*/, ValueCallback cb, SyncCallback scb) -> size_t { + scheduler.syncTime(); + restbed::Uri uri(serverHost_ + "/" + key.toString()); + std::lock_guard<std::mutex> lock(searchLock_); + auto search = searches_.find(key); + if (search == searches_.end()) { + DHT_LOG.e(key, "[search %s] listen: search not found", key.to_c_str()); + return 0; + } + DHT_LOG.d(key, "[search %s] sending %s", key.to_c_str(), deviceKey_.empty() ? "listen" : "subscribe"); + + auto req = std::make_shared<restbed::Request>(uri); + auto token = ++listenerToken_; + auto l = search->second.listeners.find(token); + if (l == search->second.listeners.end()) { + auto f = filter; + l = search->second.listeners.emplace(token, Listener { + ValueCache(cb, std::move(scb)), scheduler.add(time_point::max(), [this, key, token]{ + std::lock_guard<std::mutex> lock(searchLock_); + auto s = searches_.find(key); + if (s == searches_.end()) { + return; + } + auto l = s->second.listeners.find(token); + if (l == s->second.listeners.end()) { + return; + } + auto next = l->second.cache.expireValues(scheduler.time()); + scheduler.edit(l->second.cacheExpirationJob, next); + }), req, std::move(f) + }).first; + } else { + if (l->second.state) + l->second.state->cancel = true; + } + + auto state = std::make_shared<ListenState>(); + l->second.state = state; + l->second.cb = [this,key,token,state](const std::vector<Sp<Value>>& values, bool expired) { + if (state->cancel) + return false; + std::lock_guard<std::mutex> lock(searchLock_); + auto s = searches_.find(key); + if (s == searches_.end()) { + return false; + } + auto l = s->second.listeners.find(token); + if (l == s->second.listeners.end()) { + return false; + } + const std::vector<Sp<Value>> new_values_empty; + std::vector<Value::Id> expired_ids; + if (expired) { + expired_ids.reserve(values.size()); + for (const auto& v : values) + expired_ids.emplace_back(v->id); + } + auto next = l->second.cache.onValues(expired ? new_values_empty : values, std::vector<Value::Id>{}, expired_ids, types, scheduler.time()); + scheduler.edit(l->second.cacheExpirationJob, next); + loopSignal_(); + return true; + }; + + auto vcb = l->second.cb; + l->second.req = req; + + if (not deviceKey_.empty()) { + // Relaunch push listeners even if a timeout is not received (if the proxy crash for any reason) + l->second.refreshJob = scheduler.add(scheduler.time() + proxy::OP_TIMEOUT - proxy::OP_MARGIN, [this, key, token, state] { + if (state->cancel) + return; + std::lock_guard<std::mutex> lock(searchLock_); + auto s = searches_.find(key); + if (s != searches_.end()) { + auto l = s->second.listeners.find(token); + if (l != s->second.listeners.end()) { + resubscribe(key, l->second); + } + } + }); + } + l->second.thread = std::thread([this, req, vcb, filter, state]() { + sendListen(req, vcb, filter, state, + deviceKey_.empty() ? ListenMethod::LISTEN + : ListenMethod::SUBSCRIBE); + }); + return token; }); return token; } @@ -745,7 +830,7 @@ void DhtProxyClient::sendListen(const std::shared_ptr<restbed::Request> &req, } auto expired = json.get("expired", Json::Value(false)).asBool(); auto value = std::make_shared<Value>(json); - if ((not filter or filter(*value)) and cb) { + if ((not filter or filter(*value)) and cb) { std::lock_guard<std::mutex> lock(lockCallbacks); callbacks_.emplace_back([cb, value, state, expired]() { if (not state->cancel and not cb({value}, expired)) @@ -773,97 +858,6 @@ void DhtProxyClient::sendListen(const std::shared_ptr<restbed::Request> &req, opFailed(); } -size_t -DhtProxyClient::doListen(const InfoHash& key, ValueCallback cb, Value::Filter filter/*, Where where*/) -{ - scheduler.syncTime(); - restbed::Uri uri(serverHost_ + "/" + key.toString()); - std::lock_guard<std::mutex> lock(searchLock_); - auto search = searches_.find(key); - if (search == searches_.end()) { - DHT_LOG.e(key, "[search %s] listen: search not found", key.to_c_str()); - return 0; - } - DHT_LOG.d(key, "[search %s] sending %s", key.to_c_str(), deviceKey_.empty() ? "listen" : "subscribe"); - - auto req = std::make_shared<restbed::Request>(uri); - auto token = ++listenerToken_; - auto l = search->second.listeners.find(token); - if (l == search->second.listeners.end()) { - auto f = filter; - l = search->second.listeners.emplace(token, Listener { - ValueCache(cb), scheduler.add(time_point::max(), [this, key, token]{ - std::lock_guard<std::mutex> lock(searchLock_); - auto s = searches_.find(key); - if (s == searches_.end()) { - return; - } - auto l = s->second.listeners.find(token); - if (l == s->second.listeners.end()) { - return; - } - auto next = l->second.cache.expireValues(scheduler.time()); - scheduler.edit(l->second.cacheExpirationJob, next); - }), req, std::move(f) - }).first; - } else { - if (l->second.state) - l->second.state->cancel = true; - } - - auto state = std::make_shared<ListenState>(); - l->second.state = state; - l->second.cb = [this,key,token,state](const std::vector<Sp<Value>>& values, bool expired) { - if (state->cancel) - return false; - std::lock_guard<std::mutex> lock(searchLock_); - auto s = searches_.find(key); - if (s == searches_.end()) { - return false; - } - auto l = s->second.listeners.find(token); - if (l == s->second.listeners.end()) { - return false; - } - const std::vector<Sp<Value>> new_values_empty; - std::vector<Value::Id> expired_ids; - if (expired) { - expired_ids.reserve(values.size()); - for (const auto& v : values) - expired_ids.emplace_back(v->id); - } - auto next = l->second.cache.onValues(expired ? new_values_empty : values, std::vector<Value::Id>{}, expired_ids, types, scheduler.time()); - scheduler.edit(l->second.cacheExpirationJob, next); - loopSignal_(); - return true; - }; - - auto vcb = l->second.cb; - l->second.req = req; - - if (not deviceKey_.empty()) { - // Relaunch push listeners even if a timeout is not received (if the proxy crash for any reason) - l->second.refreshJob = scheduler.add(scheduler.time() + proxy::OP_TIMEOUT - proxy::OP_MARGIN, [this, key, token, state] { - if (state->cancel) - return; - std::lock_guard<std::mutex> lock(searchLock_); - auto s = searches_.find(key); - if (s != searches_.end()) { - auto l = s->second.listeners.find(token); - if (l != s->second.listeners.end()) { - resubscribe(key, l->second); - } - } - }); - } - l->second.thread = std::thread([this, req, vcb, filter, state]() { - sendListen(req, vcb, filter, state, - deviceKey_.empty() ? ListenMethod::LISTEN - : ListenMethod::SUBSCRIBE); - }); - return token; -} - bool DhtProxyClient::doCancelListen(const InfoHash& key, size_t ltoken) { diff --git a/src/listener.h b/src/listener.h index 4a057edfadd58839fbdd10d972b1809fdb694a16..bae2f570dbc7f91c42d0f0b65d7455c0e92bb7d2 100644 --- a/src/listener.h +++ b/src/listener.h @@ -48,11 +48,4 @@ struct LocalListener { ValueCallback get_cb; }; - -struct SearchListener { - Sp<Query> query; - Value::Filter filter; - ValueCallback get_cb; -}; - } diff --git a/src/network_engine.cpp b/src/network_engine.cpp index 0ff7d7c4f70ef0a24e0f05f3fc7ca2d2a1b8dcff..4e51e89608fba443f1710152dd32ef6fe153bc19 100644 --- a/src/network_engine.cpp +++ b/src/network_engine.cpp @@ -353,9 +353,8 @@ NetworkEngine::rateLimit(const SockAddr& addr) limiter_maintenance = 0; } - auto it = address_rate_limiter.emplace(addr, IpLimiter{}); // invoke per IP, then global rate limiter - return it.first->second.limit(now) and rate_limiter.limit(now); + return address_rate_limiter[addr].limit(now) and rate_limiter.limit(now); } bool @@ -475,15 +474,15 @@ NetworkEngine::processMessage(const uint8_t *buf, size_t buflen, const SockAddr& process(std::move(msg), from); } else { // starting partial message session - PartialMessage pmsg; - pmsg.from = from; - pmsg.msg = std::move(msg); - pmsg.start = now; - pmsg.last_part = now; - auto wmsg = partial_messages.emplace(pmsg.msg->tid, std::move(pmsg)); - if (wmsg.second) { - scheduler.add(now + RX_MAX_PACKET_TIME, std::bind(&NetworkEngine::maintainRxBuffer, this, wmsg.first->first)); - scheduler.add(now + RX_TIMEOUT, std::bind(&NetworkEngine::maintainRxBuffer, this, wmsg.first->first)); + auto k = msg->tid; + auto& pmsg = partial_messages[k]; + if (not pmsg.msg) { + pmsg.from = from; + pmsg.msg = std::move(msg); + pmsg.start = now; + pmsg.last_part = now; + scheduler.add(now + RX_MAX_PACKET_TIME, std::bind(&NetworkEngine::maintainRxBuffer, this, k)); + scheduler.add(now + RX_TIMEOUT, std::bind(&NetworkEngine::maintainRxBuffer, this, k)); } else DHT_LOG.e("Partial message with given TID already exists"); } diff --git a/src/op_cache.cpp b/src/op_cache.cpp index 772531c3b631d3bf4bf3e0949e2fd37a12a05fc1..b9aa9bcb84febf0cccab3ca7ff230a23278d40a6 100644 --- a/src/op_cache.cpp +++ b/src/op_cache.cpp @@ -25,7 +25,7 @@ bool OpValueCache::onValuesAdded(const std::vector<Sp<Value>>& vals) { std::vector<Sp<Value>> newValues; for (const auto& v : vals) { - auto viop = values.emplace(v->id, OpCacheValueStorage{v}); + auto viop = values.emplace(v->id, v); if (viop.second) { newValues.emplace_back(v); } else { @@ -50,7 +50,7 @@ OpValueCache::onValuesExpired(const std::vector<Sp<Value>>& vals) { return expiredValues.empty() ? true : callback(expiredValues, true); } std::vector<Sp<Value>> -OpValueCache::get(Value::Filter& filter) const { +OpValueCache::get(const Value::Filter& filter) const { std::vector<Sp<Value>> ret; if (not filter) ret.reserve(values.size()); @@ -108,26 +108,51 @@ OpCache::getExpiration() const { return lastRemoved + EXPIRATION; } -size_t -SearchCache::listen(ValueCallback get_cb, Sp<Query> q, Value::Filter filter, std::function<size_t(Sp<Query>, ValueCallback)> onListen) +SearchCache::OpMap::iterator +SearchCache::getOp(const Sp<Query>& q) { // find exact match auto op = ops.find(q); - if (op == ops.end()) { - // find satisfying query - for (auto it = ops.begin(); it != ops.end(); it++) { - if (q->isSatisfiedBy(*it->first)) { - op = it; - break; - } + if (op != ops.end()) + return op; + // find satisfying query + for (auto it = ops.begin(); it != ops.end(); it++) { + if (q->isSatisfiedBy(*it->first)) { + return it; + } + } + return ops.end(); +} + +SearchCache::OpMap::const_iterator +SearchCache::getOp(const Sp<Query>& q) const +{ + // find exact match + auto op = ops.find(q); + if (op != ops.cend()) + return op; + // find satisfying query + for (auto it = ops.begin(); it != ops.end(); it++) { + if (q->isSatisfiedBy(*it->first)) { + return it; } } + return ops.cend(); +} + +size_t +SearchCache::listen(ValueCallback get_cb, Sp<Query> q, Value::Filter filter, OnListen onListen) +{ + // find exact match + auto op = getOp(q); if (op == ops.end()) { // New query op = ops.emplace(q, std::unique_ptr<OpCache>(new OpCache)).first; auto& cache = *op->second; cache.searchToken = onListen(q, [&](const std::vector<Sp<Value>>& values, bool expired){ return cache.onValue(values, expired); + }, [&](ListenSyncStatus status) { + cache.onNodeChanged(status); }); } auto token = nextToken_++; @@ -176,8 +201,22 @@ SearchCache::expire(const time_point& now, std::function<void(size_t)> onCancel) return ret; } +bool +SearchCache::get(const Value::Filter& f, const Sp<Query>& q, const GetCallback& gcb, const DoneCallback& dcb) const +{ + auto op = getOp(q); + if (op != ops.end()) { + auto vals = op->second->get(f); + if ((not vals.empty() and not gcb(vals)) or op->second->isSynced()) { + dcb(true, {}); + return true; + } + } + return false; +} + std::vector<Sp<Value>> -SearchCache::get(Value::Filter& filter) const { +SearchCache::get(const Value::Filter& filter) const { if (ops.size() == 1) return ops.begin()->second->get(filter); std::map<Value::Id, Sp<Value>> c; diff --git a/src/op_cache.h b/src/op_cache.h index 51dd89e92547954ce7fde39ff9425e9559b73562..79cd8f140faac06fe1e21a976fbf02c7fa0b8f18 100644 --- a/src/op_cache.h +++ b/src/op_cache.h @@ -27,7 +27,7 @@ struct OpCacheValueStorage { Sp<Value> data {}; unsigned refCount {1}; - OpCacheValueStorage(Sp<Value> val = {}) : data(val) {} + OpCacheValueStorage(Sp<Value> val) : data(val) {} }; class OpValueCache { @@ -57,7 +57,18 @@ public: bool onValuesAdded(const std::vector<Sp<Value>>& vals); bool onValuesExpired(const std::vector<Sp<Value>>& vals); - std::vector<Sp<Value>> get(Value::Filter& filter) const; + void onNodeChanged(ListenSyncStatus status) { + switch (status) { + case ListenSyncStatus::ADDED: nodes++; break; + case ListenSyncStatus::REMOVED: nodes--; break; + case ListenSyncStatus::SYNCED : syncedNodes++; break; + case ListenSyncStatus::UNSYNCED: syncedNodes--; break; + } + } + + bool isSynced() const { return nodes > 0 and syncedNodes == nodes; } + + std::vector<Sp<Value>> get(const Value::Filter& filter) const; Sp<Value> get(Value::Id id) const; std::vector<Sp<Value>> getValues() const; @@ -65,6 +76,8 @@ private: OpValueCache(const OpValueCache&) = delete; OpValueCache& operator=(const OpValueCache&) = delete; + size_t nodes {0}; + size_t syncedNodes {0}; std::map<Value::Id, OpCacheValueStorage> values {}; ValueCallback callback; }; @@ -83,6 +96,9 @@ public: cache.onValue(vals, expired); return not listeners.empty(); } + void onNodeChanged(ListenSyncStatus status) { + cache.onNodeChanged(status); + } void onValuesAdded(const std::vector<Sp<Value>>& vals); void onValuesExpired(const std::vector<Sp<Value>>& vals); @@ -112,7 +128,7 @@ public: return listeners.empty(); } - std::vector<Sp<Value>> get(Value::Filter& filter) const { + std::vector<Sp<Value>> get(const Value::Filter& filter) const { return cache.get(filter); } @@ -120,6 +136,10 @@ public: return cache.get(id); } + bool isSynced() const { + return cache.isSynced(); + } + bool isExpired(const time_point& now) const { return listeners.empty() and (lastRemoved + EXPIRATION < now); } @@ -140,7 +160,9 @@ class SearchCache { public: SearchCache() {} SearchCache(SearchCache&&) = default; - size_t listen(ValueCallback get_cb, Sp<Query> q, Value::Filter filter, std::function<size_t(Sp<Query>, ValueCallback)> onListen); + + using OnListen = std::function<size_t(Sp<Query>, ValueCallback, SyncCallback)>; + size_t listen(ValueCallback get_cb, Sp<Query> q, Value::Filter filter, OnListen onListen); bool cancelListen(size_t gtoken, const time_point& now); void cancelAll(std::function<void(size_t)> onCancel); @@ -150,14 +172,19 @@ public: return nextExpiration_; } - std::vector<Sp<Value>> get(Value::Filter& filter) const; + bool get(const Value::Filter& f, const Sp<Query>& q, const GetCallback& gcb, const DoneCallback& dcb) const; + std::vector<Sp<Value>> get(const Value::Filter& filter) const; Sp<Value> get(Value::Id id) const; private: SearchCache(const SearchCache&) = delete; SearchCache& operator=(const SearchCache&) = delete; - std::map<Sp<Query>, std::unique_ptr<OpCache>> ops {}; + using OpMap = std::map<Sp<Query>, std::unique_ptr<OpCache>>; + OpMap ops {}; + OpMap::iterator getOp(const Sp<Query>& q); + OpMap::const_iterator getOp(const Sp<Query>& q) const; + size_t nextToken_ {1}; time_point nextExpiration_ {time_point::max()}; }; diff --git a/src/search.h b/src/search.h index 00621809096e53f760f01aa86891fa0c2b166a95..325944b7f1f7a051359cc426073b57d19675f4c3 100644 --- a/src/search.h +++ b/src/search.h @@ -65,7 +65,8 @@ struct Dht::SearchNode { ValueCache cache; Sp<Scheduler::Job> cacheExpirationJob {}; Sp<net::Request> req {}; - CachedListenStatus(ValueStateCallback&& cb) : cache(std::forward<ValueStateCallback>(cb)) {} + CachedListenStatus(ValueStateCallback&& cb, SyncCallback&& scb) + : cache(std::forward<ValueStateCallback>(cb), std::forward<SyncCallback>(scb)) {} CachedListenStatus(CachedListenStatus&&) = default; CachedListenStatus(const CachedListenStatus&) = delete; CachedListenStatus& operator=(const CachedListenStatus&) = delete; @@ -225,6 +226,13 @@ struct Dht::SearchNode { } } + void onListenSynced(const Sp<Query>& q, bool synced = true) { + auto l = listenStatus.find(q); + if (l != listenStatus.end()) { + l->second.cache.onSynced(synced); + } + } + void expireValues(const Sp<Query>& q, Scheduler& scheduler) { auto l = listenStatus.find(q); if (l != listenStatus.end()) { @@ -377,6 +385,12 @@ struct Dht::Search { std::multimap<time_point, Get> callbacks {}; /* listeners */ + struct SearchListener { + Sp<Query> query; + Value::Filter filter; + ValueCallback get_cb; + SyncCallback sync_cb; + }; std::map<size_t, SearchListener> listeners {}; size_t listener_token = 1; @@ -477,21 +491,20 @@ struct Dht::Search { void get(Value::Filter f, const Sp<Query>& q, const QueryCallback& qcb, const GetCallback& gcb, const DoneCallback& dcb, Scheduler& scheduler) { if (gcb or qcb) { - const auto& now = scheduler.time(); - callbacks.emplace(now, Get { now, f, q, qcb, gcb, dcb }); - auto values = cache.get(f); - if (not values.empty()) - gcb(values); - scheduler.edit(nextSearchStep, now); + if (not cache.get(f, q, gcb, dcb)) { + const auto& now = scheduler.time(); + callbacks.emplace(now, Get { now, f, q, qcb, gcb, dcb }); + scheduler.edit(nextSearchStep, now); + } } } size_t listen(ValueCallback cb, Value::Filter f, const Sp<Query>& q, Scheduler& scheduler) { //DHT_LOG.e(id, "[search %s IPv%c] listen", id.toString().c_str(), (af == AF_INET) ? '4' : '6'); - return cache.listen(cb, q, f, [&](const Sp<Query>& q, ValueCallback vcb){ + return cache.listen(cb, q, f, [&](const Sp<Query>& q, ValueCallback vcb, SyncCallback scb){ done = false; auto token = ++listener_token; - listeners.emplace(token, SearchListener{q, f, vcb}); + listeners.emplace(token, SearchListener{q, f, vcb, scb}); scheduler.edit(nextSearchStep, scheduler.time()); return token; }); diff --git a/src/securedht.cpp b/src/securedht.cpp index 636ac6bc84ee015dc8e7fc5d40ed81132e272e97..4be57f74ebb03c61a38be480db1821f90c6d31ed 100644 --- a/src/securedht.cpp +++ b/src/securedht.cpp @@ -284,6 +284,8 @@ SecureDht::getCallbackFilter(ValueCallback cb, Value::Filter&& filter) { return [=](const std::vector<Sp<Value>>& values, bool expired) { std::vector<Sp<Value>> tmpvals {}; + if (not filter) + tmpvals.reserve(values.size()); for (const auto& v : values) { if (auto nv = checkValue(v)) if (not filter or filter(*nv)) @@ -301,6 +303,8 @@ SecureDht::getCallbackFilter(GetCallback cb, Value::Filter&& filter) { return [=](const std::vector<Sp<Value>>& values) { std::vector<Sp<Value>> tmpvals {}; + if (not filter) + tmpvals.reserve(values.size()); for (const auto& v : values) { if (auto nv = checkValue(v)) if (not filter or filter(*nv)) diff --git a/src/value.cpp b/src/value.cpp index 6f9f5306d2796cc0349ced9fb42fa6d317219750..91bf520ddae62b93c0c6d2040f10c4040c615a47 100644 --- a/src/value.cpp +++ b/src/value.cpp @@ -288,7 +288,7 @@ FieldValue::getLocalFilter() const case Value::Field::UserType: return Value::UserTypeFilter(std::string {blobValue.begin(), blobValue.end()}); default: - return Value::AllFilter(); + return {}; } } @@ -497,8 +497,7 @@ template <typename T> bool subset(std::vector<T> fds, std::vector<T> qfds) { for (auto& fd : fds) { - auto correspondance = std::find_if(qfds.begin(), qfds.end(), [&fd](T& _vfd) { return fd == _vfd; }); - if (correspondance == qfds.end()) + if (std::find_if(qfds.begin(), qfds.end(), [&fd](T& _vfd) { return fd == _vfd; }) == qfds.end()) return false; } return true; @@ -506,10 +505,9 @@ bool subset(std::vector<T> fds, std::vector<T> qfds) bool Select::isSatisfiedBy(const Select& os) const { /* empty, means all values are selected. */ - if (fieldSelection_.empty() and not os.fieldSelection_.empty()) - return false; - else - return subset(fieldSelection_, os.fieldSelection_); + return fieldSelection_.empty() ? + os.fieldSelection_.empty() : + subset(fieldSelection_, os.fieldSelection_); } bool Where::isSatisfiedBy(const Where& ow) const { diff --git a/src/value_cache.h b/src/value_cache.h index 7853420510bad0a80b78e00c255bb9f385c20790..74d6ea9a77c8ee760d1ae1468bcdd5bcc520f874 100644 --- a/src/value_cache.h +++ b/src/value_cache.h @@ -22,19 +22,32 @@ namespace dht { using ValueStateCallback = std::function<void(const std::vector<Sp<Value>>&, bool)>; +enum class ListenSyncStatus { ADDED, SYNCED, UNSYNCED, REMOVED }; +using SyncCallback = std::function<void(ListenSyncStatus)>; using CallbackQueue = std::list<std::function<void()>>; class ValueCache { public: - ValueCache(ValueStateCallback&& cb) : callback(std::forward<ValueStateCallback>(cb)) {} - ValueCache(ValueCache&& o) : values(std::move(o.values)), callback(std::move(o.callback)) { + ValueCache(ValueStateCallback&& cb, SyncCallback&& scb = {}) + : callback(std::forward<ValueStateCallback>(cb)), syncCallback(std::move(scb)) + { + if (syncCallback) + syncCallback(ListenSyncStatus::ADDED); + } + ValueCache(ValueCache&& o) : values(std::move(o.values)), callback(std::move(o.callback)), syncCallback(std::move(o.syncCallback)) { o.callback = {}; + o.syncCallback = {}; } ~ValueCache() { auto q = clear(); for (auto& cb: q) cb(); + if (syncCallback) { + if (status == ListenSyncStatus::SYNCED) + syncCallback(ListenSyncStatus::UNSYNCED); + syncCallback(ListenSyncStatus::REMOVED); + } } CallbackQueue clear() { @@ -120,6 +133,15 @@ public: return ret; } + void onSynced(bool synced) { + auto newStatus = synced ? ListenSyncStatus::SYNCED : ListenSyncStatus::UNSYNCED; + if (status != newStatus) { + status = newStatus; + if (syncCallback) + syncCallback(newStatus); + } + } + private: // prevent copy ValueCache(const ValueCache&) = delete; @@ -141,6 +163,8 @@ private: std::map<Value::Id, CacheValueStorage> values; ValueStateCallback callback; + SyncCallback syncCallback; + ListenSyncStatus status {ListenSyncStatus::UNSYNCED}; CallbackQueue addValues(const std::vector<Sp<Value>>& new_values, const TypeStore& types, const time_point& now) { std::vector<Sp<Value>> nvals;