diff --git a/include/opendht/network_engine.h b/include/opendht/network_engine.h index 3d8a4157b8f5b805185769e7c5b0e2078bb3168c..0aa6c9b739e34f2cdfb03996b9edfb28ea3900c4 100644 --- a/include/opendht/network_engine.h +++ b/include/opendht/network_engine.h @@ -332,13 +332,16 @@ public: want_t want, RequestCb on_done, RequestExpiredCb on_expired); + std::shared_ptr<Request> sendListen(std::shared_ptr<Node> n, const InfoHash& infohash, const Query& query, const Blob& token, + std::shared_ptr<Request> previous, RequestCb on_done, RequestExpiredCb on_expired); + std::shared_ptr<Request> sendAnnounceValue(std::shared_ptr<Node> n, const InfoHash& infohash, @@ -534,7 +537,7 @@ private: // requests handling uint16_t transaction_id {1}; - std::map<uint16_t, std::shared_ptr<Request>> requests {}; + std::map<TransId, std::shared_ptr<Request>> requests {}; std::map<TransId, PartialMessage> partial_messages; MessageStats in_stats {}, out_stats {}; std::set<SockAddr> blacklist {}; diff --git a/src/dht.cpp b/src/dht.cpp index 5c15e8c33fe308021ae0bfe5be5cab9966638927..14a03b4957da9d5b8e2adc6aec8fd57b0d0d7d1c 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -116,7 +116,7 @@ struct Dht::ValueStorage { struct Dht::Storage { time_point maintenance_time {}; - std::map<std::shared_ptr<Node>, Listener> listeners {}; + std::map<std::shared_ptr<Node>, std::map<TransId, Listener>> listeners {}; std::map<size_t, LocalListener> local_listeners {}; size_t listener_token {1}; @@ -2294,8 +2294,9 @@ Dht::storageAddListener(const InfoHash& id, const std::shared_ptr<Node>& node, s return; st = store.emplace(id, Storage(now)).first; } - auto l = st->second.listeners.find(node); - if (l == st->second.listeners.end()) { + auto l = st->second.listeners.emplace(node).first; + auto trans = l->second.find(rid); + if (trans == l->second.end()) { auto vals = st->second.get(query.where.getFilter()); if (not vals.empty()) { network_engine.tellListener(node, rid, id, WANT4 | WANT6, makeToken((sockaddr*)&node->addr.first, false), @@ -2305,7 +2306,7 @@ Dht::storageAddListener(const InfoHash& id, const std::shared_ptr<Node>& node, s st->second.listeners.emplace(node, Listener {rid, now, std::forward<Query>(query)}); } else - l->second.refresh(rid, now); + trans->second.refresh(rid, now, query); } void