Skip to content
Snippets Groups Projects
Commit c8711080 authored by Adrien Béraud's avatar Adrien Béraud
Browse files

network engine: send confirmation for value update

parent 15fbfdeb
No related branches found
No related tags found
No related merge requests found
...@@ -144,11 +144,14 @@ NetworkEngine::tellListenerRefreshed(Sp<Node> n, Tid socket_id, const InfoHash&, ...@@ -144,11 +144,14 @@ NetworkEngine::tellListenerRefreshed(Sp<Node> n, Tid socket_id, const InfoHash&,
msgpack::sbuffer buffer; msgpack::sbuffer buffer;
msgpack::packer<msgpack::sbuffer> pk(&buffer); msgpack::packer<msgpack::sbuffer> pk(&buffer);
pk.pack_map(4 + (version >= 1 ? 2 : 0) + (config.network?1:0)); pk.pack_map(4 + (version >= 1 ? 1 : 0) + (config.network?1:0));
pk.pack(KEY_U); pk.pack(version >= 1 ? KEY_A : KEY_U);
pk.pack_map(1 + (not values.empty()?1:0) + (not token.empty()?1:0)); pk.pack_map(1 + (version >= 1 ? 1 : 0) + (not values.empty()?1:0) + (not token.empty()?1:0));
pk.pack(KEY_REQ_ID); pk.pack(myid); pk.pack(KEY_REQ_ID); pk.pack(myid);
if (version >= 1) {
pk.pack(KEY_REQ_SID); pk.pack(socket_id);
}
if (not token.empty()) { if (not token.empty()) {
pk.pack(KEY_REQ_TOKEN); packToken(pk, token); pk.pack(KEY_REQ_TOKEN); packToken(pk, token);
} }
...@@ -159,8 +162,7 @@ NetworkEngine::tellListenerRefreshed(Sp<Node> n, Tid socket_id, const InfoHash&, ...@@ -159,8 +162,7 @@ NetworkEngine::tellListenerRefreshed(Sp<Node> n, Tid socket_id, const InfoHash&,
logger_->d(n->id, "[node %s] sending %zu refreshed values", n->toString().c_str(), values.size()); logger_->d(n->id, "[node %s] sending %zu refreshed values", n->toString().c_str(), values.size());
} }
pk.pack(KEY_TID); pk.pack(socket_id); pk.pack(KEY_Y); pk.pack(version >= 1 ? KEY_Q : KEY_R);
pk.pack(KEY_Y); pk.pack(KEY_R);
pk.pack(KEY_UA); pk.pack(my_v); pk.pack(KEY_UA); pk.pack(my_v);
if (config.network) { if (config.network) {
pk.pack(KEY_NETID); pk.pack(config.network); pk.pack(KEY_NETID); pk.pack(config.network);
...@@ -168,9 +170,8 @@ NetworkEngine::tellListenerRefreshed(Sp<Node> n, Tid socket_id, const InfoHash&, ...@@ -168,9 +170,8 @@ NetworkEngine::tellListenerRefreshed(Sp<Node> n, Tid socket_id, const InfoHash&,
if (version >= 1) { if (version >= 1) {
Tid tid (n->getNewTid()); Tid tid (n->getNewTid());
Tid sid (socket_id);
pk.pack(KEY_REQ_SID); pk.pack(sid); pk.pack(KEY_Q); pk.pack(QUERY_UPDATE);
pk.pack(KEY_TID); pk.pack(tid); pk.pack(KEY_TID); pk.pack(tid);
auto req = std::make_shared<Request>(MessageType::UpdateValue, tid, n, auto req = std::make_shared<Request>(MessageType::UpdateValue, tid, n,
...@@ -182,6 +183,7 @@ NetworkEngine::tellListenerRefreshed(Sp<Node> n, Tid socket_id, const InfoHash&, ...@@ -182,6 +183,7 @@ NetworkEngine::tellListenerRefreshed(Sp<Node> n, Tid socket_id, const InfoHash&,
++out_stats.updateValue; ++out_stats.updateValue;
return; return;
} }
pk.pack(KEY_TID); pk.pack(socket_id);
// send response // send response
send(n->getAddr(), buffer.data(), buffer.size()); send(n->getAddr(), buffer.data(), buffer.size());
...@@ -193,11 +195,14 @@ NetworkEngine::tellListenerExpired(Sp<Node> n, Tid socket_id, const InfoHash&, c ...@@ -193,11 +195,14 @@ NetworkEngine::tellListenerExpired(Sp<Node> n, Tid socket_id, const InfoHash&, c
msgpack::sbuffer buffer; msgpack::sbuffer buffer;
msgpack::packer<msgpack::sbuffer> pk(&buffer); msgpack::packer<msgpack::sbuffer> pk(&buffer);
pk.pack_map(4 + (version >= 1 ? 2 : 0) + (config.network?1:0)); pk.pack_map(4 + (version >= 1 ? 1 : 0) + (config.network?1:0));
pk.pack(KEY_U); pk.pack(version >= 1 ? KEY_A : KEY_U);
pk.pack_map(1 + (not values.empty()?1:0) + (not token.empty()?1:0)); pk.pack_map(1 + (version >= 1 ? 1 : 0) + (not values.empty()?1:0) + (not token.empty()?1:0));
pk.pack(KEY_REQ_ID); pk.pack(myid); pk.pack(KEY_REQ_ID); pk.pack(myid);
if (version >= 1) {
pk.pack(KEY_REQ_SID); pk.pack(socket_id);
}
if (not token.empty()) { if (not token.empty()) {
pk.pack(KEY_REQ_TOKEN); packToken(pk, token); pk.pack(KEY_REQ_TOKEN); packToken(pk, token);
} }
...@@ -208,8 +213,7 @@ NetworkEngine::tellListenerExpired(Sp<Node> n, Tid socket_id, const InfoHash&, c ...@@ -208,8 +213,7 @@ NetworkEngine::tellListenerExpired(Sp<Node> n, Tid socket_id, const InfoHash&, c
logger_->d(n->id, "[node %s] sending %zu expired values", n->toString().c_str(), values.size()); logger_->d(n->id, "[node %s] sending %zu expired values", n->toString().c_str(), values.size());
} }
pk.pack(KEY_TID); pk.pack(socket_id); pk.pack(KEY_Y); pk.pack(version >= 1 ? KEY_Q : KEY_R);
pk.pack(KEY_Y); pk.pack(KEY_R);
pk.pack(KEY_UA); pk.pack(my_v); pk.pack(KEY_UA); pk.pack(my_v);
if (config.network) { if (config.network) {
pk.pack(KEY_NETID); pk.pack(config.network); pk.pack(KEY_NETID); pk.pack(config.network);
...@@ -217,9 +221,8 @@ NetworkEngine::tellListenerExpired(Sp<Node> n, Tid socket_id, const InfoHash&, c ...@@ -217,9 +221,8 @@ NetworkEngine::tellListenerExpired(Sp<Node> n, Tid socket_id, const InfoHash&, c
if (version >= 1) { if (version >= 1) {
Tid tid (n->getNewTid()); Tid tid (n->getNewTid());
Tid sid (socket_id);
pk.pack(KEY_REQ_SID); pk.pack(sid); pk.pack(KEY_Q); pk.pack(QUERY_UPDATE);
pk.pack(KEY_TID); pk.pack(tid); pk.pack(KEY_TID); pk.pack(tid);
auto req = std::make_shared<Request>(MessageType::UpdateValue, tid, n, auto req = std::make_shared<Request>(MessageType::UpdateValue, tid, n,
...@@ -231,6 +234,7 @@ NetworkEngine::tellListenerExpired(Sp<Node> n, Tid socket_id, const InfoHash&, c ...@@ -231,6 +234,7 @@ NetworkEngine::tellListenerExpired(Sp<Node> n, Tid socket_id, const InfoHash&, c
++out_stats.updateValue; ++out_stats.updateValue;
return; return;
} }
pk.pack(KEY_TID); pk.pack(socket_id);
// send response // send response
send(n->getAddr(), buffer.data(), buffer.size()); send(n->getAddr(), buffer.data(), buffer.size());
...@@ -653,6 +657,7 @@ NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& fro ...@@ -653,6 +657,7 @@ NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& fro
rsocket->on_receive(node, std::move(*msg)); rsocket->on_receive(node, std::move(*msg));
else if (logger_) else if (logger_)
logger_->e(msg->info_hash, node->id, "[node %s] 'update' request without socket for %s", node->toString().c_str(), msg->info_hash.toString().c_str()); logger_->e(msg->info_hash, node->id, "[node %s] 'update' request without socket for %s", node->toString().c_str(), msg->info_hash.toString().c_str());
sendListenConfirmation(from, msg->tid);
break; break;
} }
default: default:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment