diff --git a/src/dht.cpp b/src/dht.cpp index 58804df31f0f664632fc175149a8e46eeeb4b2d1..5124f8b09aeb4d8e8f4c0cc54e1e56f6bf169878 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -836,41 +836,28 @@ Dht::listen(const InfoHash& id, ValueCallback cb, Value::Filter f, Where where) { scheduler.syncTime(); - Query q {{}, where}; - auto vals = std::make_shared<std::map<Value::Id, Sp<Value>>>(); auto token = ++listener_token; - auto gcb = OpValueCache::cacheCallback(std::move(cb), [this, id, token]{ cancelListen(id, token); }); - auto query = std::make_shared<Query>(q); - auto filter = f.chain(q.where.getFilter()); - size_t tokenlocal = 0; + auto query = std::make_shared<Query>(Select{}, std::move(where)); + auto filter = f.chain(query->where.getFilter()); auto st = store.find(id); if (st == store.end() && store.size() < MAX_HASHES) st = store.emplace(id, scheduler.time() + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME).first; + + size_t tokenlocal = 0; if (st != store.end()) { - if (not st->second.empty()) { - std::vector<Sp<Value>> newvals = st->second.get(filter); - if (not newvals.empty()) { - if (!gcb(newvals, false)) - return 0; - for (const auto& v : newvals) { - auto it = vals->emplace(v->id, v); - if (not it.second) - it.first->second = v; - } - } - } - tokenlocal = ++st->second.listener_token; - st->second.local_listeners.emplace(tokenlocal, LocalListener{query, filter, gcb}); + tokenlocal = st->second.listen(gcb, filter, query); + if (tokenlocal == 0) + return 0; } auto token4 = Dht::listenTo(id, AF_INET, gcb, filter, query); auto token6 = token4 == 0 ? 0 : Dht::listenTo(id, AF_INET6, gcb, filter, query); if (token6 == 0 && st != store.end()) { - st->second.local_listeners.erase(tokenlocal); + st->second.cancelListen(tokenlocal); return 0; } @@ -889,15 +876,17 @@ Dht::cancelListen(const InfoHash& id, size_t token) return false; } DHT_LOG.d(id, "cancelListen %s with token %d", id.toString().c_str(), token); - auto st = store.find(id); - auto tokenlocal = std::get<0>(it->second); - if (st != store.end() && tokenlocal) - st->second.local_listeners.erase(tokenlocal); - + if (auto tokenlocal = std::get<0>(it->second)) { + auto st = store.find(id); + if (st != store.end()) + st->second.cancelListen(tokenlocal); + } auto searches_cancel_listen = [this,&id](std::map<InfoHash, Sp<Search>>& srs, size_t token) { - auto srp = srs.find(id); - if (srp != srs.end() and token) - srp->second->cancelListen(token, scheduler); + if (token) { + auto srp = srs.find(id); + if (srp != srs.end()) + srp->second->cancelListen(token, scheduler); + } }; searches_cancel_listen(searches4, std::get<1>(it->second)); searches_cancel_listen(searches6, std::get<2>(it->second)); @@ -1276,16 +1265,16 @@ Dht::storageAddListener(const InfoHash& id, const Sp<Node>& node, size_t socket_ return; st = store.emplace(id, now).first; } - auto node_listeners = st->second.listeners.emplace(node, std::map<size_t, Listener> {}).first; - auto l = node_listeners->second.find(socket_id); - if (l == node_listeners->second.end()) { + auto& node_listeners = st->second.listeners[node]; + auto l = node_listeners.find(socket_id); + if (l == node_listeners.end()) { auto vals = st->second.get(query.where.getFilter()); if (not vals.empty()) { network_engine.tellListener(node, socket_id, id, WANT4 | WANT6, makeToken(node->getAddr(), false), buckets4.findClosestNodes(id, now, TARGET_NODES), buckets6.findClosestNodes(id, now, TARGET_NODES), std::move(vals), query); } - node_listeners->second.emplace(socket_id, Listener {now, std::forward<Query>(query)}); + node_listeners.emplace(socket_id, Listener {now, std::forward<Query>(query)}); } else l->second.refresh(now, std::forward<Query>(query)); diff --git a/src/storage.h b/src/storage.h index be554e393099c1b3b8b542197fde31209466ba01..bc4fc076484836e5edc0d0991f58961307e3d275 100644 --- a/src/storage.h +++ b/src/storage.h @@ -153,6 +153,12 @@ struct Storage { return time_point::max(); } + size_t listen(ValueCallback& cb, Value::Filter& f, const Sp<Query>& q); + + void cancelListen(size_t token) { + local_listeners.erase(token); + } + StoreDiff remove(const InfoHash& id, Value::Id); std::pair<ssize_t, std::vector<Sp<Value>>> expire(const InfoHash& id, time_point now); @@ -166,6 +172,22 @@ private: }; +size_t +Storage::listen(ValueCallback& gcb, Value::Filter& filter, const Sp<Query>& query) +{ + if (not empty()) { + std::vector<Sp<Value>> newvals = get(filter); + if (not newvals.empty()) { + if (!gcb(newvals, false)) + return 0; + } + } + auto tokenlocal = ++listener_token; + local_listeners.emplace(tokenlocal, LocalListener{query, filter, gcb}); + return tokenlocal; +} + + std::pair<ValueStorage*, Storage::StoreDiff> Storage::store(const InfoHash& id, const Sp<Value>& value, time_point created, time_point expiration, StorageBucket* sb) {