diff --git a/src/network_engine.cpp b/src/network_engine.cpp index 8e404717b1186fc3c7b1afee27f627c677ebb4db..2323476b93d65cfbfb38b0ae9739ba04ad1e1387 100644 --- a/src/network_engine.cpp +++ b/src/network_engine.cpp @@ -144,23 +144,25 @@ NetworkEngine::tellListenerRefreshed(Sp<Node> n, Tid socket_id, const InfoHash&, msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); - pk.pack_map(4 + (version >= 1 ? 2 : 0) + (config.network?1:0)); + pk.pack_map(4 + (version >= 1 ? 1 : 0) + (config.network?1:0)); - pk.pack(KEY_U); - pk.pack_map(1 + (not values.empty()?1:0) + (not token.empty()?1:0)); - pk.pack(KEY_REQ_ID); pk.pack(myid); - if (not token.empty()) { - pk.pack(KEY_REQ_TOKEN); packToken(pk, token); - } - if (not values.empty()) { - pk.pack(KEY_REQ_REFRESHED); - pk.pack(values); - if (logger_) - logger_->d(n->id, "[node %s] sending %zu refreshed values", n->toString().c_str(), values.size()); - } + pk.pack(version >= 1 ? KEY_A : KEY_U); + pk.pack_map(1 + (version >= 1 ? 1 : 0) + (not values.empty()?1:0) + (not token.empty()?1:0)); + pk.pack(KEY_REQ_ID); pk.pack(myid); + if (version >= 1) { + pk.pack(KEY_REQ_SID); pk.pack(socket_id); + } + if (not token.empty()) { + pk.pack(KEY_REQ_TOKEN); packToken(pk, token); + } + if (not values.empty()) { + pk.pack(KEY_REQ_REFRESHED); + pk.pack(values); + if (logger_) + logger_->d(n->id, "[node %s] sending %zu refreshed values", n->toString().c_str(), values.size()); + } - pk.pack(KEY_TID); pk.pack(socket_id); - pk.pack(KEY_Y); pk.pack(KEY_R); + pk.pack(KEY_Y); pk.pack(version >= 1 ? KEY_Q : KEY_R); pk.pack(KEY_UA); pk.pack(my_v); if (config.network) { pk.pack(KEY_NETID); pk.pack(config.network); @@ -168,9 +170,8 @@ NetworkEngine::tellListenerRefreshed(Sp<Node> n, Tid socket_id, const InfoHash&, if (version >= 1) { Tid tid (n->getNewTid()); - Tid sid (socket_id); - pk.pack(KEY_REQ_SID); pk.pack(sid); + pk.pack(KEY_Q); pk.pack(QUERY_UPDATE); pk.pack(KEY_TID); pk.pack(tid); auto req = std::make_shared<Request>(MessageType::UpdateValue, tid, n, @@ -182,6 +183,7 @@ NetworkEngine::tellListenerRefreshed(Sp<Node> n, Tid socket_id, const InfoHash&, ++out_stats.updateValue; return; } + pk.pack(KEY_TID); pk.pack(socket_id); // send response send(n->getAddr(), buffer.data(), buffer.size()); @@ -193,23 +195,25 @@ NetworkEngine::tellListenerExpired(Sp<Node> n, Tid socket_id, const InfoHash&, c msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); - pk.pack_map(4 + (version >= 1 ? 2 : 0) + (config.network?1:0)); + pk.pack_map(4 + (version >= 1 ? 1 : 0) + (config.network?1:0)); - pk.pack(KEY_U); - pk.pack_map(1 + (not values.empty()?1:0) + (not token.empty()?1:0)); - pk.pack(KEY_REQ_ID); pk.pack(myid); - if (not token.empty()) { - pk.pack(KEY_REQ_TOKEN); packToken(pk, token); - } - if (not values.empty()) { - pk.pack(KEY_REQ_EXPIRED); - pk.pack(values); - if (logger_) - logger_->d(n->id, "[node %s] sending %zu expired values", n->toString().c_str(), values.size()); - } + pk.pack(version >= 1 ? KEY_A : KEY_U); + pk.pack_map(1 + (version >= 1 ? 1 : 0) + (not values.empty()?1:0) + (not token.empty()?1:0)); + pk.pack(KEY_REQ_ID); pk.pack(myid); + if (version >= 1) { + pk.pack(KEY_REQ_SID); pk.pack(socket_id); + } + if (not token.empty()) { + pk.pack(KEY_REQ_TOKEN); packToken(pk, token); + } + if (not values.empty()) { + pk.pack(KEY_REQ_EXPIRED); + pk.pack(values); + if (logger_) + logger_->d(n->id, "[node %s] sending %zu expired values", n->toString().c_str(), values.size()); + } - pk.pack(KEY_TID); pk.pack(socket_id); - pk.pack(KEY_Y); pk.pack(KEY_R); + pk.pack(KEY_Y); pk.pack(version >= 1 ? KEY_Q : KEY_R); pk.pack(KEY_UA); pk.pack(my_v); if (config.network) { pk.pack(KEY_NETID); pk.pack(config.network); @@ -217,10 +221,9 @@ NetworkEngine::tellListenerExpired(Sp<Node> n, Tid socket_id, const InfoHash&, c if (version >= 1) { Tid tid (n->getNewTid()); - Tid sid (socket_id); - pk.pack(KEY_REQ_SID); pk.pack(sid); - pk.pack(KEY_TID); pk.pack(tid); + pk.pack(KEY_Q); pk.pack(QUERY_UPDATE); + pk.pack(KEY_TID); pk.pack(tid); auto req = std::make_shared<Request>(MessageType::UpdateValue, tid, n, Blob(buffer.data(), buffer.data() + buffer.size()), @@ -231,6 +234,7 @@ NetworkEngine::tellListenerExpired(Sp<Node> n, Tid socket_id, const InfoHash&, c ++out_stats.updateValue; return; } + pk.pack(KEY_TID); pk.pack(socket_id); // send response send(n->getAddr(), buffer.data(), buffer.size()); @@ -653,6 +657,7 @@ NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& fro rsocket->on_receive(node, std::move(*msg)); else if (logger_) logger_->e(msg->info_hash, node->id, "[node %s] 'update' request without socket for %s", node->toString().c_str(), msg->info_hash.toString().c_str()); + sendListenConfirmation(from, msg->tid); break; } default: