From 28b5893239b3e2c5c72eb8438706147f27831660 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Blin?= <sebastien.blin@savoirfairelinux.com> Date: Mon, 4 May 2020 12:13:15 -0400 Subject: [PATCH] listener: fully migrate to requests This patch migrates tellListenerRefreshed and tellListenerExpired to the new behavior --- include/opendht/network_engine.h | 6 ++--- src/dht.cpp | 4 +-- src/network_engine.cpp | 46 ++++++++++++++++++++++++++++---- 3 files changed, 46 insertions(+), 10 deletions(-) diff --git a/include/opendht/network_engine.h b/include/opendht/network_engine.h index f0007b09..c4572339 100644 --- a/include/opendht/network_engine.h +++ b/include/opendht/network_engine.h @@ -256,8 +256,8 @@ public: std::vector<Sp<Node>>&& nodes, std::vector<Sp<Node>>&& nodes6, std::vector<Sp<Value>>&& values, const Query& q, int version); - void tellListenerRefreshed(Sp<Node> n, Tid socket_id, const InfoHash& hash, const Blob& ntoken, const std::vector<Value::Id>& values); - void tellListenerExpired(Sp<Node> n, Tid socket_id, const InfoHash& hash, const Blob& ntoken, const std::vector<Value::Id>& values); + void tellListenerRefreshed(Sp<Node> n, Tid socket_id, const InfoHash& hash, const Blob& ntoken, const std::vector<Value::Id>& values, int version); + void tellListenerExpired(Sp<Node> n, Tid socket_id, const InfoHash& hash, const Blob& ntoken, const std::vector<Value::Id>& values, int version); bool isRunning(sa_family_t af) const; inline want_t want () const { return dht_socket->hasIPv4() and dht_socket->hasIPv6() ? (WANT4 | WANT6) : -1; } @@ -516,8 +516,8 @@ private: unsigned get {0}; unsigned put {0}; unsigned listen {0}; - unsigned updateValue {0}; unsigned refresh {0}; + unsigned updateValue {0}; }; diff --git a/src/dht.cpp b/src/dht.cpp index cb970591..49ff12eb 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -1312,7 +1312,7 @@ Dht::expireStore(decltype(store)::iterator i) id.toString().c_str(), node_listeners.first->toString().c_str()); Blob ntoken = makeToken(node_listeners.first->getAddr(), false); - network_engine.tellListenerExpired(node_listeners.first, l.first, id, ntoken, ids); + network_engine.tellListenerExpired(node_listeners.first, l.first, id, ntoken, ids, l.second.version); } } } @@ -2480,7 +2480,7 @@ Dht::storageRefresh(const InfoHash& id, Value::Id vid) id.toString().c_str(), node_listeners.first->toString().c_str()); Blob ntoken = makeToken(node_listeners.first->getAddr(), false); - network_engine.tellListenerRefreshed(node_listeners.first, l.first, id, ntoken, ids); + network_engine.tellListenerRefreshed(node_listeners.first, l.first, id, ntoken, ids, l.second.version); } } } diff --git a/src/network_engine.cpp b/src/network_engine.cpp index 53eea7ab..8e404717 100644 --- a/src/network_engine.cpp +++ b/src/network_engine.cpp @@ -127,7 +127,7 @@ NetworkEngine::tellListener(Sp<Node> node, Tid socket_id, const InfoHash& hash, { auto nnodes = bufferNodes(node->getFamily(), hash, want, nodes, nodes6); try { - if (version == 1) { + if (version >= 1) { sendUpdateValues(node, hash, values, scheduler.time(), ntoken, socket_id); } else { sendNodesValues(node->getAddr(), socket_id, nnodes.first, nnodes.second, values, query, ntoken); @@ -139,11 +139,12 @@ NetworkEngine::tellListener(Sp<Node> node, Tid socket_id, const InfoHash& hash, } void -NetworkEngine::tellListenerRefreshed(Sp<Node> n, Tid socket_id, const InfoHash&, const Blob& token, const std::vector<Value::Id>& values) +NetworkEngine::tellListenerRefreshed(Sp<Node> n, Tid socket_id, const InfoHash&, const Blob& token, const std::vector<Value::Id>& values, int version) { msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); - pk.pack_map(4+(config.network?1:0)); + + pk.pack_map(4 + (version >= 1 ? 2 : 0) + (config.network?1:0)); pk.pack(KEY_U); pk.pack_map(1 + (not values.empty()?1:0) + (not token.empty()?1:0)); @@ -165,16 +166,34 @@ NetworkEngine::tellListenerRefreshed(Sp<Node> n, Tid socket_id, const InfoHash&, pk.pack(KEY_NETID); pk.pack(config.network); } + if (version >= 1) { + Tid tid (n->getNewTid()); + Tid sid (socket_id); + + pk.pack(KEY_REQ_SID); pk.pack(sid); + pk.pack(KEY_TID); pk.pack(tid); + + auto req = std::make_shared<Request>(MessageType::UpdateValue, tid, n, + Blob(buffer.data(), buffer.data() + buffer.size()), + [=](const Request&, ParsedMessage&&) { /* on done */ }, + [=](const Request&, bool) { /* on expired */ } + ); + sendRequest(req); + ++out_stats.updateValue; + return; + } + // send response send(n->getAddr(), buffer.data(), buffer.size()); } void -NetworkEngine::tellListenerExpired(Sp<Node> n, Tid socket_id, const InfoHash&, const Blob& token, const std::vector<Value::Id>& values) +NetworkEngine::tellListenerExpired(Sp<Node> n, Tid socket_id, const InfoHash&, const Blob& token, const std::vector<Value::Id>& values, int version) { msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); - pk.pack_map(4+(config.network?1:0)); + + pk.pack_map(4 + (version >= 1 ? 2 : 0) + (config.network?1:0)); pk.pack(KEY_U); pk.pack_map(1 + (not values.empty()?1:0) + (not token.empty()?1:0)); @@ -196,6 +215,23 @@ NetworkEngine::tellListenerExpired(Sp<Node> n, Tid socket_id, const InfoHash&, c pk.pack(KEY_NETID); pk.pack(config.network); } + if (version >= 1) { + Tid tid (n->getNewTid()); + Tid sid (socket_id); + + pk.pack(KEY_REQ_SID); pk.pack(sid); + pk.pack(KEY_TID); pk.pack(tid); + + auto req = std::make_shared<Request>(MessageType::UpdateValue, tid, n, + Blob(buffer.data(), buffer.data() + buffer.size()), + [=](const Request&, ParsedMessage&&) { /* on done */ }, + [=](const Request&, bool) { /* on expired */ } + ); + sendRequest(req); + ++out_stats.updateValue; + return; + } + // send response send(n->getAddr(), buffer.data(), buffer.size()); } -- GitLab