/* * Copyright (C) 2014-2022 Savoir-faire Linux Inc. * Author(s) : Adrien Béraud <adrien.beraud@savoirfairelinux.com> * Simon Désaulniers <simon.desaulniers@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see <https://www.gnu.org/licenses/>. */ #include "network_engine.h" #include "request.h" #include "default_types.h" #include "log_enable.h" #include "parsed_message.h" #include <msgpack.hpp> namespace dht { namespace net { using namespace std::chrono_literals; const std::string DhtProtocolException::GET_NO_INFOHASH {"Get_values with no info_hash"}; const std::string DhtProtocolException::LISTEN_NO_INFOHASH {"Listen with no info_hash"}; const std::string DhtProtocolException::LISTEN_WRONG_TOKEN {"Listen with wrong token"}; const std::string DhtProtocolException::PUT_NO_INFOHASH {"Put with no info_hash"}; const std::string DhtProtocolException::PUT_WRONG_TOKEN {"Put with wrong token"}; const std::string DhtProtocolException::PUT_INVALID_ID {"Put with invalid id"}; const std::string DhtProtocolException::STORAGE_NOT_FOUND {"Access operation for unknown storage"}; constexpr std::chrono::seconds NetworkEngine::UDP_REPLY_TIME; constexpr std::chrono::seconds NetworkEngine::RX_MAX_PACKET_TIME; constexpr std::chrono::seconds NetworkEngine::RX_TIMEOUT; const std::string NetworkEngine::my_v {"RNG1"}; static constexpr uint8_t v4prefix[16] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xFF, 0xFF, 0, 0, 0, 0}; constexpr unsigned SEND_NODES {8}; struct NetworkEngine::PartialMessage { SockAddr from; time_point start; time_point last_part; std::unique_ptr<ParsedMessage> msg; }; std::vector<Blob> serializeValues(const std::vector<Sp<Value>>& st) { std::vector<Blob> svals; svals.reserve(st.size()); for (const auto& v : st) svals.emplace_back(packMsg(v)); return svals; } void packToken(msgpack::packer<msgpack::sbuffer>& pk, const Blob& token) { pk.pack_bin(token.size()); pk.pack_bin_body((char*)token.data(), token.size()); } RequestAnswer::RequestAnswer(ParsedMessage&& msg) : ntoken(std::move(msg.token)), values(std::move(msg.values)), refreshed_values(std::move(msg.refreshed_values)), expired_values(std::move(msg.expired_values)), fields(std::move(msg.fields)), nodes4(std::move(msg.nodes4)), nodes6(std::move(msg.nodes6)) {} NetworkEngine::NetworkEngine(InfoHash& myid, NetworkConfig c, std::unique_ptr<DatagramSocket>&& sock, const Sp<Logger>& log, std::mt19937_64& rand, Scheduler& scheduler, decltype(NetworkEngine::onError)&& onError, decltype(NetworkEngine::onNewNode)&& onNewNode, decltype(NetworkEngine::onReportedAddr)&& onReportedAddr, decltype(NetworkEngine::onPing)&& onPing, decltype(NetworkEngine::onFindNode)&& onFindNode, decltype(NetworkEngine::onGetValues)&& onGetValues, decltype(NetworkEngine::onListen)&& onListen, decltype(NetworkEngine::onAnnounce)&& onAnnounce, decltype(NetworkEngine::onRefresh)&& onRefresh) : onError(std::move(onError)), onNewNode(std::move(onNewNode)), onReportedAddr(std::move(onReportedAddr)), onPing(std::move(onPing)), onFindNode(std::move(onFindNode)), onGetValues(std::move(onGetValues)), onListen(std::move(onListen)), onAnnounce(std::move(onAnnounce)), onRefresh(std::move(onRefresh)), myid(myid), config(c), dht_socket(std::move(sock)), logger_(log), rd(rand), cache(rd), rate_limiter(config.max_req_per_sec), scheduler(scheduler) {} NetworkEngine::~NetworkEngine() { clear(); } void NetworkEngine::tellListener(const Sp<Node>& node, Tid socket_id, const InfoHash& hash, want_t want, const Blob& ntoken, std::vector<Sp<Node>>&& nodes, std::vector<Sp<Node>>&& nodes6, std::vector<Sp<Value>>&& values, const Query& query, int version) { auto nnodes = bufferNodes(node->getFamily(), hash, want, nodes, nodes6); try { if (version >= 1) { sendUpdateValues(node, hash, values, scheduler.time(), ntoken, socket_id); } else { sendNodesValues(node->getAddr(), socket_id, nnodes.first, nnodes.second, values, query, ntoken); } } catch (const std::overflow_error& e) { if (logger_) logger_->e("Can't send value: buffer not large enough !"); } } void NetworkEngine::tellListenerRefreshed(const Sp<Node>& n, Tid socket_id, const InfoHash&, const Blob& token, const std::vector<Value::Id>& values, int version) { msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); pk.pack_map(4 + (version >= 1 ? 1 : 0) + (config.network?1:0)); pk.pack(version >= 1 ? KEY_A : KEY_U); pk.pack_map(1 + (version >= 1 ? 1 : 0) + (not values.empty()?1:0) + (not token.empty()?1:0)); pk.pack(KEY_REQ_ID); pk.pack(myid); if (version >= 1) { pk.pack(KEY_REQ_SID); pk.pack(socket_id); } if (not token.empty()) { pk.pack(KEY_REQ_TOKEN); packToken(pk, token); } if (not values.empty()) { pk.pack(KEY_REQ_REFRESHED); pk.pack(values); if (logger_) logger_->d(n->id, "[node %s] sending %zu refreshed values", n->toString().c_str(), values.size()); } pk.pack(KEY_Y); pk.pack(version >= 1 ? KEY_Q : KEY_R); pk.pack(KEY_UA); pk.pack(my_v); if (config.network) { pk.pack(KEY_NETID); pk.pack(config.network); } if (version >= 1) { Tid tid (n->getNewTid()); pk.pack(KEY_Q); pk.pack(QUERY_UPDATE); pk.pack(KEY_TID); pk.pack(tid); auto req = std::make_shared<Request>(MessageType::UpdateValue, tid, n, Blob(buffer.data(), buffer.data() + buffer.size()), [=](const Request&, ParsedMessage&&) { /* on done */ }, [=](const Request&, bool) { /* on expired */ } ); sendRequest(req); ++out_stats.updateValue; return; } pk.pack(KEY_TID); pk.pack(socket_id); // send response send(n->getAddr(), buffer.data(), buffer.size()); } void NetworkEngine::tellListenerExpired(const Sp<Node>& n, Tid socket_id, const InfoHash&, const Blob& token, const std::vector<Value::Id>& values, int version) { msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); pk.pack_map(4 + (version >= 1 ? 1 : 0) + (config.network?1:0)); pk.pack(version >= 1 ? KEY_A : KEY_U); pk.pack_map(1 + (version >= 1 ? 1 : 0) + (not values.empty()?1:0) + (not token.empty()?1:0)); pk.pack(KEY_REQ_ID); pk.pack(myid); if (version >= 1) { pk.pack(KEY_REQ_SID); pk.pack(socket_id); } if (not token.empty()) { pk.pack(KEY_REQ_TOKEN); packToken(pk, token); } if (not values.empty()) { pk.pack(KEY_REQ_EXPIRED); pk.pack(values); if (logger_) logger_->d(n->id, "[node %s] sending %zu expired values", n->toString().c_str(), values.size()); } pk.pack(KEY_Y); pk.pack(version >= 1 ? KEY_Q : KEY_R); pk.pack(KEY_UA); pk.pack(my_v); if (config.network) { pk.pack(KEY_NETID); pk.pack(config.network); } if (version >= 1) { Tid tid (n->getNewTid()); pk.pack(KEY_Q); pk.pack(QUERY_UPDATE); pk.pack(KEY_TID); pk.pack(tid); auto req = std::make_shared<Request>(MessageType::UpdateValue, tid, n, Blob(buffer.data(), buffer.data() + buffer.size()), [=](const Request&, ParsedMessage&&) { /* on done */ }, [=](const Request&, bool) { /* on expired */ } ); sendRequest(req); ++out_stats.updateValue; return; } pk.pack(KEY_TID); pk.pack(socket_id); // send response send(n->getAddr(), buffer.data(), buffer.size()); } bool NetworkEngine::isRunning(sa_family_t af) const { switch (af) { case 0: return dht_socket->hasIPv4() or dht_socket->hasIPv6(); case AF_INET: return dht_socket->hasIPv4(); case AF_INET6: return dht_socket->hasIPv6(); default: return false; } } void NetworkEngine::clear() { for (auto& request : requests) { request.second->cancel(); request.second->node->setExpired(); } requests.clear(); } void NetworkEngine::connectivityChanged(sa_family_t af) { cache.clearBadNodes(af); } void NetworkEngine::requestStep(Sp<Request> sreq) { auto& req = *sreq; if (not req.pending()) return; auto now = scheduler.time(); auto& node = *req.node; if (req.isExpired(now)) { // if (logger_) // logger_->d(node.id, "[node %s] expired !", node.toString().c_str()); node.setExpired(); if (not node.id) requests.erase(req.tid); return; } else if (req.attempt_count == 1) { req.on_expired(req, false); } auto err = send(node.getAddr(), (char*)req.msg.data(), req.msg.size(), node.getReplyTime() < now - UDP_REPLY_TIME); if (err == ENETUNREACH || err == EHOSTUNREACH || err == EAFNOSUPPORT || err == EPIPE || err == EPERM) { node.setExpired(); if (not node.id) requests.erase(req.tid); } else { req.last_try = now; if (err != EAGAIN) { ++req.attempt_count; req.attempt_duration += req.attempt_duration + uniform_duration_distribution<>(0ms, ((duration)Node::MAX_RESPONSE_TIME)/4)(rd); if (not req.parts.empty()){ sendValueParts(req.tid, req.parts, node.getAddr()); } } std::weak_ptr<Request> wreq = sreq; scheduler.add(req.last_try + req.attempt_duration, [this,wreq] { if (auto req = wreq.lock()) requestStep(req); }); } } /** * Sends a request to a node. Request::MAX_ATTEMPT_COUNT attempts will * be made before the request expires. */ void NetworkEngine::sendRequest(const Sp<Request>& request) { auto& node = *request->node; if (not node.id) requests.emplace(request->tid, request); request->start = scheduler.time(); node.requested(request); requestStep(request); } /* Rate control for requests we receive. */ bool NetworkEngine::rateLimit(const SockAddr& addr) { const auto& now = scheduler.time(); // occasional IP limiter maintenance (a few times every second at max rate) if (limiter_maintenance++ == config.max_peer_req_per_sec) { for (auto it = address_rate_limiter.begin(); it != address_rate_limiter.end();) { if (it->second.maintain(now) == 0) address_rate_limiter.erase(it++); else ++it; } limiter_maintenance = 0; } // invoke per IP, then global rate limiter return (config.max_peer_req_per_sec < 0 or address_rate_limiter .emplace(addr, config.max_peer_req_per_sec).first->second .limit(now)) and rate_limiter.limit(now); } bool NetworkEngine::isMartian(const SockAddr& addr) { if (addr.getPort() == 0) return true; switch(addr.getFamily()) { case AF_INET: { const auto& sin = addr.getIPv4(); const uint8_t* address = (const uint8_t*)&sin.sin_addr; return (address[0] == 0) || ((address[0] & 0xE0) == 0xE0); } case AF_INET6: { if (addr.getLength() < sizeof(sockaddr_in6)) return true; const auto& sin6 = addr.getIPv6(); const uint8_t* address = (const uint8_t*)&sin6.sin6_addr; return address[0] == 0xFF || (address[0] == 0xFE && (address[1] & 0xC0) == 0x80) || memcmp(address, InfoHash::zero().data(), 16) == 0 || memcmp(address, v4prefix, 12) == 0; } default: return true; } } /* The internal blacklist is an LRU cache of nodes that have sent incorrect messages. */ void NetworkEngine::blacklistNode(const Sp<Node>& n) { n->setExpired(); blacklist.emplace(n->getAddr()); } bool NetworkEngine::isNodeBlacklisted(const SockAddr& addr) const { return blacklist.find(addr) != blacklist.end(); } void NetworkEngine::processMessage(const uint8_t *buf, size_t buflen, SockAddr f) { auto from = f.getMappedIPv4(); if (isMartian(from)) { if (logger_) logger_->w("Received packet from martian node %s", from.toString().c_str()); return; } if (isNodeBlacklisted(from)) { if (logger_) logger_->w("Received packet from blacklisted node %s", from.toString().c_str()); return; } auto msg = std::make_unique<ParsedMessage>(); try { msgpack::unpacked msg_res = msgpack::unpack((const char*)buf, buflen); msg->msgpack_unpack(msg_res.get()); } catch (const std::exception& e) { if (logger_) logger_->w("Can't parse message of size %lu: %s", buflen, e.what()); // if (logger_) // logger_->DBG.logPrintable(buf, buflen); return; } if (msg->network != config.network) { if (logger_) logger_->d("Received message from other config.network %u", msg->network); return; } const auto& now = scheduler.time(); // partial value data if (msg->type == MessageType::ValueData) { auto pmsg_it = partial_messages.find(msg->tid); if (pmsg_it == partial_messages.end()) { if (logIncoming_) if (logger_) logger_->d("Can't find partial message"); rateLimit(from); return; } if (!pmsg_it->second.from.equals(from)) { if (logger_) logger_->d("Received partial message data from unexpected IP address"); rateLimit(from); return; } // append data block if (pmsg_it->second.msg->append(*msg)) { pmsg_it->second.last_part = now; // check data completion if (pmsg_it->second.msg->complete()) { try { // process the full message process(std::move(pmsg_it->second.msg), from); partial_messages.erase(pmsg_it); } catch (...) { return; } } else scheduler.add(now + RX_TIMEOUT, std::bind(&NetworkEngine::maintainRxBuffer, this, msg->tid)); } return; } if (msg->id == myid or not msg->id) { if (logger_) logger_->d("Received message from self"); return; } if (msg->type > MessageType::Reply) { /* Rate limit requests. */ if (!rateLimit(from)) { if (logger_) logger_->w("Dropping request due to rate limiting"); return; } } if (msg->value_parts.empty()) { try { process(std::move(msg), from); } catch(...) { return; } } else { // starting partial message session auto k = msg->tid; auto& pmsg = partial_messages[k]; if (not pmsg.msg) { pmsg.from = from; pmsg.msg = std::move(msg); pmsg.start = now; pmsg.last_part = now; scheduler.add(now + RX_MAX_PACKET_TIME, std::bind(&NetworkEngine::maintainRxBuffer, this, k)); scheduler.add(now + RX_TIMEOUT, std::bind(&NetworkEngine::maintainRxBuffer, this, k)); } else if (logger_) logger_->e("Partial message with given TID already exists"); } } void NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& from) { const auto& now = scheduler.time(); auto node = cache.getNode(msg->id, from, now, true, msg->is_client); if (msg->type == MessageType::ValueUpdate) { auto rsocket = node->getSocket(msg->tid); if (not rsocket) throw DhtProtocolException {DhtProtocolException::UNKNOWN_TID, "Can't find socket", msg->id}; node->received(now, {}); onNewNode(node, 2); deserializeNodes(*msg, from); rsocket->on_receive(node, std::move(*msg)); } else if (msg->type == MessageType::Error or msg->type == MessageType::Reply) { auto rsocket = node->getSocket(msg->tid); auto req = node->getRequest(msg->tid); /* either response for a request or data for an opened socket */ if (not req and not rsocket) { auto req_it = requests.find(msg->tid); if (req_it != requests.end() and not req_it->second->node->id) { req = req_it->second; req->node = node; requests.erase(req_it); } else { node->received(now, req); if (not node->isClient()) onNewNode(node, 1); if (logger_) logger_->d(node->id, "[node %s] can't find transaction with id %u", node->toString().c_str(), msg->tid); return; } } node->received(now, req); if (not node->isClient()) onNewNode(node, 2); onReportedAddr(msg->id, msg->addr); if (req and (req->cancelled() or req->expired() or req->completed())) { if (logger_) logger_->w(node->id, "[node %s] response to expired, cancelled or completed request", node->toString().c_str()); return; } switch (msg->type) { case MessageType::Error: { if (msg->id and req and ( (msg->error_code == DhtProtocolException::NOT_FOUND and req->getType() == MessageType::Refresh) or (msg->error_code == DhtProtocolException::UNAUTHORIZED and (req->getType() == MessageType::AnnounceValue or req->getType() == MessageType::Listen)))) { req->last_try = time_point::min(); req->reply_time = time_point::min(); if (not req->setError(DhtProtocolException {msg->error_code})) onError(req, DhtProtocolException {msg->error_code}); } else { if (logIncoming_) if (logger_) logger_->w(msg->id, "[node %s %s] received unknown error message %u", msg->id.toString().c_str(), from.toString().c_str(), msg->error_code); } break; } case MessageType::Reply: if (req) { /* request reply */ auto& r = *req; if (r.getType() == MessageType::AnnounceValue or r.getType() == MessageType::Listen or r.getType() == MessageType::Refresh) { r.node->authSuccess(); } r.reply_time = scheduler.time(); deserializeNodes(*msg, from); r.setDone(std::move(*msg)); break; } else { /* request socket data */ deserializeNodes(*msg, from); rsocket->on_receive(node, std::move(*msg)); } break; default: break; } } else { node->received(now, {}); if (not node->isClient()) onNewNode(node, 1); try { switch (msg->type) { case MessageType::Ping: ++in_stats.ping; if (logIncoming_) if (logger_) logger_->d(node->id, "[node %s] sending pong", node->toString().c_str()); onPing(node); sendPong(from, msg->tid); break; case MessageType::FindNode: { // if (logger_) // logger_->d(msg->target, node->id, "[node %s] got 'find' request for %s (%d)", node->toString().c_str(), msg->target.toString().c_str(), msg->want); ++in_stats.find; RequestAnswer answer = onFindNode(node, msg->target, msg->want); auto nnodes = bufferNodes(from.getFamily(), msg->target, msg->want, answer.nodes4, answer.nodes6); sendNodesValues(from, msg->tid, nnodes.first, nnodes.second, {}, {}, answer.ntoken); break; } case MessageType::GetValues: { // if (logger_) // logger_->d(msg->info_hash, node->id, "[node %s] got 'get' request for %s", node->toString().c_str(), msg->info_hash.toString().c_str()); ++in_stats.get; RequestAnswer answer = onGetValues(node, msg->info_hash, msg->want, msg->query); auto nnodes = bufferNodes(from.getFamily(), msg->info_hash, msg->want, answer.nodes4, answer.nodes6); sendNodesValues(from, msg->tid, nnodes.first, nnodes.second, answer.values, msg->query, answer.ntoken); break; } case MessageType::AnnounceValue: { if (logIncoming_ and logger_) logger_->d(msg->info_hash, node->id, "[node %s] got 'put' request for %s", node->toString().c_str(), msg->info_hash.toString().c_str()); ++in_stats.put; onAnnounce(node, msg->info_hash, msg->token, msg->values, msg->created); /* Note that if storageStore failed, we lie to the requestor. This is to prevent them from backtracking, and hence polluting the DHT. */ for (auto& v : msg->values) { sendValueAnnounced(from, msg->tid, v->id); } break; } case MessageType::Refresh: if (logIncoming_ and logger_) logger_->d(msg->info_hash, node->id, "[node %s] got 'refresh' request for %s", node->toString().c_str(), msg->info_hash.toString().c_str()); onRefresh(node, msg->info_hash, msg->token, msg->value_id); /* Same note as above in MessageType::AnnounceValue applies. */ sendValueAnnounced(from, msg->tid, msg->value_id); break; case MessageType::Listen: { if (logIncoming_ and logger_) logger_->d(msg->info_hash, node->id, "[node %s] got 'listen' request for %s", node->toString().c_str(), msg->info_hash.toString().c_str()); ++in_stats.listen; RequestAnswer answer = onListen(node, msg->info_hash, msg->token, msg->socket_id, std::move(msg->query), msg->version); auto nnodes = bufferNodes(from.getFamily(), msg->info_hash, msg->want, answer.nodes4, answer.nodes6); sendListenConfirmation(from, msg->tid); break; } case MessageType::UpdateValue: { if (logIncoming_ and logger_) logger_->d(msg->info_hash, node->id, "[node %s] got 'update' request for %s", node->toString().c_str(), msg->info_hash.toString().c_str()); ++in_stats.updateValue; if (auto rsocket = node->getSocket(msg->socket_id)) rsocket->on_receive(node, std::move(*msg)); else if (logger_) logger_->e(msg->info_hash, node->id, "[node %s] 'update' request without socket for %s", node->toString().c_str(), msg->info_hash.toString().c_str()); sendListenConfirmation(from, msg->tid); break; } default: break; } } catch (const std::overflow_error& e) { if (logger_) logger_->e("Can't send value: buffer not large enough !"); } catch (const DhtProtocolException& e) { sendError(from, msg->tid, e.getCode(), e.getMsg().c_str(), true); } } } void insertAddr(msgpack::packer<msgpack::sbuffer>& pk, const SockAddr& addr) { size_t addr_len = std::min<size_t>(addr.getLength(), (addr.getFamily() == AF_INET) ? sizeof(in_addr) : sizeof(in6_addr)); void* addr_ptr = (addr.getFamily() == AF_INET) ? (void*)&addr.getIPv4().sin_addr : (void*)&addr.getIPv6().sin6_addr; pk.pack("sa"); pk.pack_bin(addr_len); pk.pack_bin_body((char*)addr_ptr, addr_len); } int NetworkEngine::send(const SockAddr& addr, const char *buf, size_t len, bool confirmed) { return dht_socket ? dht_socket->sendTo(addr, (const uint8_t*)buf, len, confirmed) : ENOTCONN; } Sp<Request> NetworkEngine::sendPing(const Sp<Node>& node, RequestCb&& on_done, RequestExpiredCb&& on_expired) { Tid tid (node->getNewTid()); msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); pk.pack_map(5+(config.network?1:0)); pk.pack(KEY_A); pk.pack_map(1); pk.pack(KEY_REQ_ID); pk.pack(myid); pk.pack(KEY_Q); pk.pack(QUERY_PING); pk.pack(KEY_TID); pk.pack(tid); pk.pack(KEY_Y); pk.pack(KEY_Q); pk.pack(KEY_UA); pk.pack(my_v); if (config.network) { pk.pack(KEY_NETID); pk.pack(config.network); } auto req = std::make_shared<Request>(MessageType::Ping, tid, node, Blob(buffer.data(), buffer.data() + buffer.size()), [=](const Request& req_status, ParsedMessage&&) { if (logger_) logger_->d(req_status.node->id, "[node %s] got pong !", req_status.node->toString().c_str()); if (on_done) { on_done(req_status, {}); } }, [=](const Request& req_status, bool done) { /* on expired */ if (on_expired) { on_expired(req_status, done); } } ); sendRequest(req); ++out_stats.ping; return req; } void NetworkEngine::sendPong(const SockAddr& addr, Tid tid) { msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); pk.pack_map(4+(config.network?1:0)); pk.pack(KEY_R); pk.pack_map(2); pk.pack(KEY_REQ_ID); pk.pack(myid); insertAddr(pk, addr); pk.pack(KEY_TID); pk.pack(tid); pk.pack(KEY_Y); pk.pack(KEY_R); pk.pack(KEY_UA); pk.pack(my_v); if (config.network) { pk.pack(KEY_NETID); pk.pack(config.network); } send(addr, buffer.data(), buffer.size()); } Sp<Request> NetworkEngine::sendFindNode(const Sp<Node>& n, const InfoHash& target, want_t want, RequestCb&& on_done, RequestExpiredCb&& on_expired) { Tid tid (n->getNewTid()); msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); pk.pack_map(5+(config.network?1:0)); pk.pack(KEY_A); pk.pack_map(2 + (want>0?1:0)); pk.pack(KEY_REQ_ID); pk.pack(myid); pk.pack(KEY_REQ_TARGET); pk.pack(target); if (want > 0) { pk.pack(KEY_REQ_WANT); pk.pack_array(((want & WANT4)?1:0) + ((want & WANT6)?1:0)); if (want & WANT4) pk.pack(AF_INET); if (want & WANT6) pk.pack(AF_INET6); } pk.pack(KEY_Q); pk.pack(QUERY_FIND); pk.pack(KEY_TID); pk.pack(tid); pk.pack(KEY_Y); pk.pack(KEY_Q); pk.pack(KEY_UA); pk.pack(my_v); if (config.network) { pk.pack(KEY_NETID); pk.pack(config.network); } auto req = std::make_shared<Request>(MessageType::FindNode, tid, n, Blob(buffer.data(), buffer.data() + buffer.size()), [=](const Request& req_status, ParsedMessage&& msg) { /* on done */ if (on_done) { on_done(req_status, {std::forward<ParsedMessage>(msg)}); } }, [=](const Request& req_status, bool done) { /* on expired */ if (on_expired) { on_expired(req_status, done); } } ); sendRequest(req); ++out_stats.find; return req; } Sp<Request> NetworkEngine::sendGetValues(const Sp<Node>& n, const InfoHash& info_hash, const Query& query, want_t want, RequestCb&& on_done, RequestExpiredCb&& on_expired) { Tid tid (n->getNewTid()); msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); pk.pack_map(5+(config.network?1:0)); unsigned sendQuery = (not query.where.empty() or not query.select.empty()) ? 1 : 0; unsigned sendWant = (want > 0) ? 1 : 0; pk.pack(KEY_A); pk.pack_map(2 + sendQuery + sendWant); pk.pack(KEY_REQ_ID); pk.pack(myid); pk.pack(KEY_REQ_H); pk.pack(info_hash); if (sendQuery) { pk.pack(KEY_Q); pk.pack(query); } if (sendWant) { pk.pack(KEY_REQ_WANT); unsigned sendWant4 = (want & WANT4) ? 1 : 0; unsigned sendWant6 = (want & WANT6) ? 1 : 0; pk.pack_array(sendWant4 + sendWant6); if (sendWant4) pk.pack(AF_INET); if (sendWant6) pk.pack(AF_INET6); } pk.pack(KEY_Q); pk.pack(QUERY_GET); pk.pack(KEY_TID); pk.pack(tid); pk.pack(KEY_Y); pk.pack(KEY_Q); pk.pack(KEY_UA); pk.pack(my_v); if (config.network) { pk.pack(KEY_NETID); pk.pack(config.network); } auto req = std::make_shared<Request>(MessageType::GetValues, tid, n, Blob(buffer.data(), buffer.data() + buffer.size()), [=](const Request& req_status, ParsedMessage&& msg) { /* on done */ if (on_done) { on_done(req_status, {std::forward<ParsedMessage>(msg)}); } }, [=](const Request& req_status, bool done) { /* on expired */ if (on_expired) { on_expired(req_status, done); } } ); sendRequest(req); ++out_stats.get; return req; } SockAddr deserializeIPv4(const uint8_t* ni) { SockAddr addr; addr.setFamily(AF_INET); auto& sin = addr.getIPv4(); std::memcpy(&sin.sin_addr, ni, 4); std::memcpy(&sin.sin_port, ni + 4, 2); return addr; } SockAddr deserializeIPv6(const uint8_t* ni) { SockAddr addr; addr.setFamily(AF_INET6); auto& sin6 = addr.getIPv6(); std::memcpy(&sin6.sin6_addr, ni, 16); std::memcpy(&sin6.sin6_port, ni + 16, 2); return addr; } void NetworkEngine::deserializeNodes(ParsedMessage& msg, const SockAddr& from) { if (msg.nodes4_raw.size() % NODE4_INFO_BUF_LEN != 0 || msg.nodes6_raw.size() % NODE6_INFO_BUF_LEN != 0) { throw DhtProtocolException {DhtProtocolException::WRONG_NODE_INFO_BUF_LEN}; } // deserialize nodes const auto& now = scheduler.time(); for (unsigned i = 0, n = msg.nodes4_raw.size() / NODE4_INFO_BUF_LEN; i < n; i++) { const uint8_t* ni = msg.nodes4_raw.data() + i * NODE4_INFO_BUF_LEN; const auto& ni_id = *reinterpret_cast<const InfoHash*>(ni); if (ni_id == myid) continue; SockAddr addr = deserializeIPv4(ni + ni_id.size()); if (addr.isLoopback() and from.getFamily() == AF_INET) { auto port = addr.getPort(); addr = from; addr.setPort(port); } if (isMartian(addr) || isNodeBlacklisted(addr)) continue; msg.nodes4.emplace_back(cache.getNode(ni_id, addr, now, false)); onNewNode(msg.nodes4.back(), 0); } for (unsigned i = 0, n = msg.nodes6_raw.size() / NODE6_INFO_BUF_LEN; i < n; i++) { const uint8_t* ni = msg.nodes6_raw.data() + i * NODE6_INFO_BUF_LEN; const auto& ni_id = *reinterpret_cast<const InfoHash*>(ni); if (ni_id == myid) continue; SockAddr addr = deserializeIPv6(ni + ni_id.size()); if (addr.isLoopback() and from.getFamily() == AF_INET6) { auto port = addr.getPort(); addr = from; addr.setPort(port); } if (isMartian(addr) || isNodeBlacklisted(addr)) continue; msg.nodes6.emplace_back(cache.getNode(ni_id, addr, now, false)); onNewNode(msg.nodes6.back(), 0); } } std::vector<Blob> NetworkEngine::packValueHeader(msgpack::sbuffer& buffer, const std::vector<Sp<Value>>& st) { auto svals = serializeValues(st); size_t total_size = 0; for (const auto& v : svals) total_size += v.size(); msgpack::packer<msgpack::sbuffer> pk(&buffer); pk.pack(KEY_REQ_VALUES); pk.pack_array(svals.size()); // try to put everything in a single UDP packet if (svals.size() < 50 && total_size < MAX_PACKET_VALUE_SIZE) { for (const auto& b : svals) buffer.write((const char*)b.data(), b.size()); // if (logger_) // logger_->d("sending %lu bytes of values", total_size); svals.clear(); } else { for (const auto& b : svals) pk.pack(b.size()); } return svals; } void NetworkEngine::sendValueParts(Tid tid, const std::vector<Blob>& svals, const SockAddr& addr) { msgpack::sbuffer buffer; unsigned i=0; for (const auto& v: svals) { size_t start {0}, end; do { end = std::min(start + MTU, v.size()); buffer.clear(); msgpack::packer<msgpack::sbuffer> pk(&buffer); pk.pack_map(3+(config.network?1:0)); if (config.network) { pk.pack(KEY_NETID); pk.pack(config.network); } pk.pack(KEY_Y); pk.pack(KEY_V); pk.pack(KEY_TID); pk.pack(tid); pk.pack(KEY_V); pk.pack_map(1); pk.pack(i); pk.pack_map(2); pk.pack("o"sv); pk.pack(start); pk.pack("d"sv); pk.pack_bin(end-start); pk.pack_bin_body((const char*)v.data()+start, end-start); send(addr, buffer.data(), buffer.size()); start = end; } while (start != v.size()); i++; } } void NetworkEngine::sendNodesValues(const SockAddr& addr, Tid tid, const Blob& nodes, const Blob& nodes6, const std::vector<Sp<Value>>& st, const Query& query, const Blob& token) { msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); pk.pack_map(4+(config.network?1:0)); pk.pack(KEY_R); pk.pack_map(2 + (not st.empty()?1:0) + (nodes.size()>0?1:0) + (nodes6.size()>0?1:0) + (not token.empty()?1:0)); pk.pack(KEY_REQ_ID); pk.pack(myid); insertAddr(pk, addr); if (nodes.size() > 0) { pk.pack(KEY_REQ_NODES4); pk.pack_bin(nodes.size()); pk.pack_bin_body((const char*)nodes.data(), nodes.size()); } if (nodes6.size() > 0) { pk.pack(KEY_REQ_NODES6); pk.pack_bin(nodes6.size()); pk.pack_bin_body((const char*)nodes6.data(), nodes6.size()); } if (not token.empty()) { pk.pack(KEY_REQ_TOKEN); packToken(pk, token); } std::vector<Blob> svals {}; if (not st.empty()) { /* pack complete values */ if (query.select.empty()) { svals = packValueHeader(buffer, st); } else { /* pack fields */ auto fields = query.select.getSelection(); pk.pack(KEY_REQ_FIELDS); pk.pack_map(2); pk.pack("f"sv); pk.pack(fields); pk.pack("v"sv); pk.pack_array(st.size()*fields.size()); for (const auto& v : st) v->msgpack_pack_fields(fields, pk); //DHT_LOG_DBG("sending closest nodes (%d+%d nodes.), %u value headers containing %u fields", // nodes.size(), nodes6.size(), st.size(), fields.size()); } } pk.pack(KEY_TID); pk.pack(tid); pk.pack(KEY_Y); pk.pack(KEY_R); pk.pack(KEY_UA); pk.pack(my_v); if (config.network) { pk.pack(KEY_NETID); pk.pack(config.network); } // send response send(addr, buffer.data(), buffer.size()); // send parts if (not svals.empty()) sendValueParts(tid, svals, addr); } Blob NetworkEngine::bufferNodes(sa_family_t af, const InfoHash& id, std::vector<Sp<Node>>& nodes) { std::sort(nodes.begin(), nodes.end(), [&](const Sp<Node>& a, const Sp<Node>& b){ return id.xorCmp(a->id, b->id) < 0; }); size_t nnode = std::min<size_t>(SEND_NODES, nodes.size()); Blob bnodes; if (af == AF_INET) { bnodes.resize(NODE4_INFO_BUF_LEN * nnode); for (size_t i=0; i<nnode; i++) { const Node& n = *nodes[i]; const auto& sin = n.getAddr().getIPv4(); auto dest = bnodes.data() + NODE4_INFO_BUF_LEN * i; memcpy(dest, n.id.data(), HASH_LEN); memcpy(dest + HASH_LEN, &sin.sin_addr, sizeof(in_addr)); memcpy(dest + HASH_LEN + sizeof(in_addr), &sin.sin_port, sizeof(in_port_t)); } } else if (af == AF_INET6) { bnodes.resize(NODE6_INFO_BUF_LEN * nnode); for (size_t i=0; i<nnode; i++) { const Node& n = *nodes[i]; const auto& sin6 = n.getAddr().getIPv6(); auto dest = bnodes.data() + NODE6_INFO_BUF_LEN * i; memcpy(dest, n.id.data(), HASH_LEN); memcpy(dest + HASH_LEN, &sin6.sin6_addr, sizeof(in6_addr)); memcpy(dest + HASH_LEN + sizeof(in6_addr), &sin6.sin6_port, sizeof(in_port_t)); } } return bnodes; } std::pair<Blob, Blob> NetworkEngine::bufferNodes(sa_family_t af, const InfoHash& id, want_t want, std::vector<Sp<Node>>& nodes4, std::vector<Sp<Node>>& nodes6) { if (want < 0) want = af == AF_INET ? WANT4 : WANT6; Blob bnodes4; if (want & WANT4) bnodes4 = bufferNodes(AF_INET, id, nodes4); Blob bnodes6; if (want & WANT6) bnodes6 = bufferNodes(AF_INET6, id, nodes6); return {std::move(bnodes4), std::move(bnodes6)}; } Sp<Request> NetworkEngine::sendListen(const Sp<Node>& n, const InfoHash& hash, const Query& query, const Blob& token, Tid socketId, RequestCb&& on_done, RequestExpiredCb&& on_expired) { Tid tid (n->getNewTid()); msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); pk.pack_map(5+(config.network?1:0)); auto has_query = not query.where.empty() or not query.select.empty(); pk.pack(KEY_A); pk.pack_map(5 + has_query); pk.pack(KEY_REQ_ID); pk.pack(myid); pk.pack(KEY_VERSION); pk.pack(1); pk.pack(KEY_REQ_H); pk.pack(hash); pk.pack(KEY_REQ_TOKEN); packToken(pk, token); pk.pack(KEY_REQ_SID); pk.pack(socketId); if (has_query) { pk.pack(KEY_REQ_QUERY); pk.pack(query); } pk.pack(KEY_Q); pk.pack(QUERY_LISTEN); pk.pack(KEY_TID); pk.pack(tid); pk.pack(KEY_Y); pk.pack(KEY_Q); pk.pack(KEY_UA); pk.pack(my_v); if (config.network) { pk.pack(KEY_NETID); pk.pack(config.network); } auto req = std::make_shared<Request>(MessageType::Listen, tid, n, Blob(buffer.data(), buffer.data() + buffer.size()), [=](const Request& req_status, ParsedMessage&& msg) { /* on done */ if (on_done) on_done(req_status, {std::forward<ParsedMessage>(msg)}); }, [=](const Request& req_status, bool done) { /* on expired */ if (on_expired) on_expired(req_status, done); } ); sendRequest(req); ++out_stats.listen; return req; } void NetworkEngine::sendListenConfirmation(const SockAddr& addr, Tid tid) { msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); pk.pack_map(4+(config.network?1:0)); pk.pack(KEY_R); pk.pack_map(2); pk.pack(KEY_REQ_ID); pk.pack(myid); insertAddr(pk, addr); pk.pack(KEY_TID); pk.pack(tid); pk.pack(KEY_Y); pk.pack(KEY_R); pk.pack(KEY_UA); pk.pack(my_v); if (config.network) { pk.pack(KEY_NETID); pk.pack(config.network); } send(addr, buffer.data(), buffer.size()); } Sp<Request> NetworkEngine::sendAnnounceValue(const Sp<Node>& n, const InfoHash& infohash, const Sp<Value>& value, time_point created, const Blob& token, RequestCb&& on_done, RequestExpiredCb&& on_expired) { Tid tid (n->getNewTid()); msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); pk.pack_map(5+(config.network?1:0)); bool add_created = created < scheduler.time(); pk.pack(KEY_A); pk.pack_map(add_created ? 5 : 4); pk.pack(KEY_REQ_ID); pk.pack(myid); pk.pack(KEY_REQ_H); pk.pack(infohash); auto v = packValueHeader(buffer, {value}); if (add_created) { pk.pack(KEY_REQ_CREATION); pk.pack(to_time_t(created)); } pk.pack(KEY_REQ_TOKEN); pk.pack(token); pk.pack(KEY_Q); pk.pack(QUERY_PUT); pk.pack(KEY_TID); pk.pack(tid); pk.pack(KEY_Y); pk.pack(KEY_Q); pk.pack(KEY_UA); pk.pack(my_v); if (config.network) { pk.pack(KEY_NETID); pk.pack(config.network); } auto req = std::make_shared<Request>(MessageType::AnnounceValue, tid, n, Blob(buffer.data(), buffer.data() + buffer.size()), [=](const Request& req_status, ParsedMessage&& msg) { /* on done */ if (msg.value_id == Value::INVALID_ID) { if (logger_) logger_->d(infohash, "Unknown search or announce!"); } else { if (on_done) { RequestAnswer answer {}; answer.vid = msg.value_id; on_done(req_status, std::move(answer)); } } }, [=](const Request& req_status, bool done) { /* on expired */ if (on_expired) { on_expired(req_status, done); } } ); req->parts = std::move(v); sendRequest(req); ++out_stats.put; return req; } Sp<Request> NetworkEngine::sendUpdateValues(const Sp<Node>& n, const InfoHash& infohash, const std::vector<Sp<Value>>& values, time_point created, const Blob& token, const size_t& socket_id) { Tid tid (n->getNewTid()); Tid sid (socket_id); msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); pk.pack_map(5+(config.network?1:0)); pk.pack(KEY_A); pk.pack_map((created < scheduler.time() ? 7 : 6)); pk.pack(KEY_REQ_ID); pk.pack(myid); pk.pack(KEY_VERSION); pk.pack(1); pk.pack(KEY_REQ_H); pk.pack(infohash); pk.pack(KEY_REQ_SID); pk.pack(sid); auto v = packValueHeader(buffer, values); if (created < scheduler.time()) { pk.pack(KEY_REQ_CREATION); pk.pack(to_time_t(created)); } pk.pack(KEY_REQ_TOKEN); pk.pack(token); pk.pack(KEY_Q); pk.pack(QUERY_UPDATE); pk.pack(KEY_TID); pk.pack(tid); pk.pack(KEY_Y); pk.pack(KEY_Q); pk.pack(KEY_UA); pk.pack(my_v); if (config.network) { pk.pack(KEY_NETID); pk.pack(config.network); } auto req = std::make_shared<Request>(MessageType::UpdateValue, tid, n, Blob(buffer.data(), buffer.data() + buffer.size()), [=](const Request&, ParsedMessage&&) { /* on done */ }, [=](const Request&, bool) { /* on expired */ } ); req->parts = std::move(v); sendRequest(req); ++out_stats.updateValue; return req; } Sp<Request> NetworkEngine::sendRefreshValue(const Sp<Node>& n, const InfoHash& infohash, const Value::Id& vid, const Blob& token, RequestCb&& on_done, RequestErrorCb&& on_error, RequestExpiredCb&& on_expired) { Tid tid (n->getNewTid()); msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); pk.pack_map(5+(config.network?1:0)); pk.pack(KEY_A); pk.pack_map(4); pk.pack(KEY_REQ_ID); pk.pack(myid); pk.pack(KEY_REQ_H); pk.pack(infohash); pk.pack(KEY_REQ_VALUE_ID); pk.pack(vid); pk.pack(KEY_REQ_TOKEN); pk.pack(token); pk.pack(KEY_Q); pk.pack(QUERY_REFRESH); pk.pack(KEY_TID); pk.pack(tid); pk.pack(KEY_Y); pk.pack(KEY_Q); pk.pack(KEY_UA); pk.pack(my_v); if (config.network) { pk.pack(KEY_NETID); pk.pack(config.network); } auto req = std::make_shared<Request>(MessageType::Refresh, tid, n, Blob(buffer.data(), buffer.data() + buffer.size()), [=](const Request& req_status, ParsedMessage&& msg) { /* on done */ if (msg.value_id == Value::INVALID_ID) { if (logger_) logger_->d(infohash, "Unknown search or announce!"); } else { if (on_done) { RequestAnswer answer {}; answer.vid = msg.value_id; on_done(req_status, std::move(answer)); } } }, on_error, [=](const Request& req_status, bool done) { /* on expired */ if (on_expired) { on_expired(req_status, done); } } ); sendRequest(req); ++out_stats.refresh; return req; } void NetworkEngine::sendValueAnnounced(const SockAddr& addr, Tid tid, Value::Id vid) { msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); pk.pack_map(4+(config.network?1:0)); pk.pack(KEY_R); pk.pack_map(3); pk.pack(KEY_REQ_ID); pk.pack(myid); pk.pack(KEY_REQ_VALUE_ID); pk.pack(vid); insertAddr(pk, addr); pk.pack(KEY_TID); pk.pack(tid); pk.pack(KEY_Y); pk.pack(KEY_R); pk.pack(KEY_UA); pk.pack(my_v); if (config.network) { pk.pack(KEY_NETID); pk.pack(config.network); } send(addr, buffer.data(), buffer.size()); } void NetworkEngine::sendError(const SockAddr& addr, Tid tid, uint16_t code, const std::string& message, bool include_id) { msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); pk.pack_map(4 + (include_id?1:0) + (config.network?1:0)); pk.pack(KEY_E); pk.pack_array(2); pk.pack(code); pk.pack(message); if (include_id) { pk.pack(KEY_R); pk.pack_map(1); pk.pack(KEY_REQ_ID); pk.pack(myid); } pk.pack(KEY_TID); pk.pack(tid); pk.pack(KEY_Y); pk.pack(KEY_E); pk.pack(KEY_UA); pk.pack(my_v); if (config.network) { pk.pack(KEY_NETID); pk.pack(config.network); } send(addr, buffer.data(), buffer.size()); } void NetworkEngine::maintainRxBuffer(Tid tid) { auto msg = partial_messages.find(tid); if (msg != partial_messages.end()) { const auto& now = scheduler.time(); if (msg->second.start + RX_MAX_PACKET_TIME < now || msg->second.last_part + RX_TIMEOUT < now) { if (logger_) logger_->w("Dropping expired partial message from %s", msg->second.from.toString().c_str()); partial_messages.erase(msg); } } } } /* namespace net */ } /* namespace dht */