From ba2e179abe10a092b774db543aa1adb2a75e508c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20B=C3=A9raud?= <adrien.beraud@savoirfairelinux.com> Date: Sat, 16 Feb 2019 18:12:26 -0500 Subject: [PATCH] dht: optimize get --- src/dht.cpp | 89 +++++++++++++++++++++++++++-------------------------- 1 file changed, 46 insertions(+), 43 deletions(-) diff --git a/src/dht.cpp b/src/dht.cpp index 8dd9fdb0..e37c5808 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -914,7 +914,7 @@ struct OpStatus { template <typename T> struct GetStatus : public OpStatus { - std::vector<Sp<T>> values; + T values; std::vector<Sp<Node>> nodes; }; @@ -973,20 +973,16 @@ void doneCallbackWrapper(DoneCallback dcb, const std::vector<Sp<Node>>& nodes, G } } -template <typename T, typename Cb> -bool callbackWrapper(Cb get_cb, - DoneCallback done_cb, - const std::vector<Sp<T>>& values, - std::function<std::vector<Sp<T>>(const std::vector<Sp<T>>&)> add_values, - Sp<GetStatus<T>> o) +template <typename T, typename St, typename Cb, typename Av, typename Cv> +bool callbackWrapper(Cb get_cb, DoneCallback done_cb, const std::vector<Sp<T>>& values, + Av add_values, Cv cache_values, GetStatus<St>& op) { - auto& op = *o; if (op.status.done) return false; auto newvals = add_values(values); if (not newvals.empty()) { op.status.ok = !get_cb(newvals); - op.values.insert(op.values.end(), newvals.begin(), newvals.end()); + cache_values(newvals); } doneCallbackWrapper(done_cb, {}, op); return !op.status.ok; @@ -998,23 +994,26 @@ Dht::get(const InfoHash& id, GetCallback getcb, DoneCallback donecb, Value::Filt scheduler.syncTime(); auto q = std::make_shared<Query>(Select {}, std::move(where)); - auto op = std::make_shared<GetStatus<Value>>(); + auto op = std::make_shared<GetStatus<std::map<Value::Id, Sp<Value>>>>(); auto f = filter.chain(q->where.getFilter()); - auto add_values = [op,f](const std::vector<Sp<Value>>& values) { - std::vector<Sp<Value>> newvals {}; - for (const auto& v : values) { - auto it = std::find_if(op->values.cbegin(), op->values.cend(), [&](const Sp<Value>& sv) { - return sv == v or *sv == *v; - }); - if (it == op->values.cend()) { - if (not f or f(*v)) - newvals.push_back(v); + auto gcb = [getcb, donecb, op, f](const std::vector<Sp<Value>>& vals) { + auto& o = *op; + return callbackWrapper(getcb, donecb, vals, [&o,&f](const std::vector<Sp<Value>>& values) { + std::vector<Sp<Value>> newvals {}; + for (const auto& v : values) { + auto it = o.values.find(v->id); + if (it == o.values.cend() or (it->second != v && !(*it->second == *v))) { + if (not f or f(*v)) + newvals.push_back(v); + } } - } - return newvals; + return newvals; + }, [&o](const std::vector<Sp<Value>>& newvals) { + for (const auto& v : newvals) + o.values[v->id] = v; + }, o); }; - auto gcb = std::bind(callbackWrapper<Value, GetCallback>, getcb, donecb, _1, add_values, op); /* Try to answer this search locally. */ gcb(getLocal(id, f)); @@ -1034,36 +1033,39 @@ Dht::get(const InfoHash& id, GetCallback getcb, DoneCallback donecb, Value::Filt void Dht::query(const InfoHash& id, QueryCallback cb, DoneCallback done_cb, Query&& q) { scheduler.syncTime(); - auto op = std::make_shared<GetStatus<FieldValueIndex>>(); - + auto op = std::make_shared<GetStatus<std::vector<Sp<FieldValueIndex>>>>(); auto f = q.where.getFilter(); - auto values = getLocal(id, f); - auto add_fields = [=](const std::vector<Sp<FieldValueIndex>>& fields) { - std::vector<Sp<FieldValueIndex>> newvals {}; - for (const auto& f : fields) { - auto it = std::find_if(op->values.cbegin(), op->values.cend(), - [&](const Sp<FieldValueIndex>& sf) { - return sf == f or f->containedIn(*sf); - }); - if (it == op->values.cend()) { - auto lesser = std::find_if(op->values.begin(), op->values.end(), + auto qcb = [cb, done_cb, op](const std::vector<Sp<FieldValueIndex>>& fields){ + auto& o = *op; + return callbackWrapper(cb, done_cb, fields, [&](const std::vector<Sp<FieldValueIndex>>& fields) { + std::vector<Sp<FieldValueIndex>> newvals {}; + for (const auto& f : fields) { + auto it = std::find_if(o.values.cbegin(), o.values.cend(), [&](const Sp<FieldValueIndex>& sf) { - return sf->containedIn(*f); + return sf == f or f->containedIn(*sf); }); - if (lesser != op->values.end()) - op->values.erase(lesser); - newvals.push_back(f); + if (it == o.values.cend()) { + auto lesser = std::find_if(o.values.begin(), o.values.end(), + [&](const Sp<FieldValueIndex>& sf) { + return sf->containedIn(*f); + }); + if (lesser != o.values.end()) + o.values.erase(lesser); + newvals.push_back(f); + } } - } - return newvals; + return newvals; + }, [&](const std::vector<Sp<FieldValueIndex>>& fields){ + o.values.insert(o.values.end(), fields.begin(), fields.end()); + }, o); }; + + /* Try to answer this search locally. */ + auto values = getLocal(id, f); std::vector<Sp<FieldValueIndex>> local_fields(values.size()); std::transform(values.begin(), values.end(), local_fields.begin(), [&q](const Sp<Value>& v) { return std::make_shared<FieldValueIndex>(*v, q.select); }); - auto qcb = std::bind(callbackWrapper<FieldValueIndex, QueryCallback>, cb, done_cb, _1, add_fields, op); - - /* Try to answer this search locally. */ qcb(local_fields); auto sq = std::make_shared<Query>(std::move(q)); @@ -1171,6 +1173,7 @@ Dht::storageChanged(const InfoHash& id, Storage& st, ValueStorage& v, bool newVa if (not st.local_listeners.empty()) { DHT_LOG.d(id, "[store %s] %lu local listeners", id.toString().c_str(), st.local_listeners.size()); std::vector<std::pair<ValueCallback, std::vector<Sp<Value>>>> cbs; + cbs.reserve(st.local_listeners.size()); for (const auto& l : st.local_listeners) { std::vector<Sp<Value>> vals; if (not l.second.filter or l.second.filter(*v.data)) -- GitLab