diff --git a/include/opendht/dht.h b/include/opendht/dht.h index d3385fa68861663789864376aa30b38b4150b9c5..4672f68ab5745f2d2429ff6d4ab0d67e67bddc03 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -27,7 +27,6 @@ #include "network_engine.h" #include "scheduler.h" #include "routing_table.h" -#include "node_cache.h" #include "callbacks.h" #include <string> @@ -251,9 +250,6 @@ public: return {total_store_size, total_values}; } - /* This must be provided by the user. */ - static bool isBlacklisted(const sockaddr*, socklen_t) { return false; } - std::vector<Address> getPublicAddress(sa_family_t family = 0); protected: @@ -291,10 +287,6 @@ private: static constexpr std::chrono::seconds REANNOUNCE_MARGIN {5}; - /* The maximum number of nodes that we snub. There is probably little - reason to increase this value. */ - static constexpr unsigned BLACKLISTED_MAX {10}; - static constexpr size_t TOKEN_SIZE {64}; struct SearchNode { @@ -603,9 +595,6 @@ private: // registred types std::map<ValueType::Id, ValueType> types; - // cache of nodes not in the main routing table but used for searches - NodeCache cache; - // are we a bootstrap node ? // note: Any running node can be used as a bootstrap node. // Only nodes running only as bootstrap nodes should @@ -623,7 +612,6 @@ private: std::map<InfoHash, std::shared_ptr<Search>> searches4 {}; std::map<InfoHash, std::shared_ptr<Search>> searches6 {}; - //std::map<std::shared_ptr<NetworkEngine::Request>, std::shared_ptr<Search>> searches {}; /* not used for now */ uint16_t search_id {0}; // map a global listen token to IPv4, IPv6 specific listen tokens. @@ -631,9 +619,6 @@ private: std::map<size_t, std::tuple<size_t, size_t, size_t>> listeners {}; size_t listener_token {1}; - sockaddr_storage blacklist[BLACKLISTED_MAX] {}; - unsigned next_blacklisted = 0; - // timing Scheduler scheduler {}; std::shared_ptr<Scheduler::Job> nextNodesConfirmation {}; @@ -700,15 +685,11 @@ private: void dumpBucket(const Bucket& b, std::ostream& out) const; // Nodes - std::shared_ptr<Node> newNode(const InfoHash& id, const sockaddr*, socklen_t, int confirm); + std::shared_ptr<Node> newNode(const std::shared_ptr<Node>& node, int confirm); std::shared_ptr<Node> findNode(const InfoHash& id, sa_family_t af); const std::shared_ptr<Node> findNode(const InfoHash& id, sa_family_t af) const; bool trySearchInsert(const std::shared_ptr<Node>& node); - void blacklistNode(const InfoHash* id, const sockaddr*, socklen_t); - bool isNodeBlacklisted(const sockaddr*, socklen_t) const; - static bool isMartian(const sockaddr*, socklen_t); - // Searches /** diff --git a/include/opendht/network_engine.h b/include/opendht/network_engine.h index d9e22a865d361d2abbdd07d0a530776a8da2e448..6cb501f656133aa71097528e04ba80250b86e7ae 100644 --- a/include/opendht/network_engine.h +++ b/include/opendht/network_engine.h @@ -21,6 +21,7 @@ #pragma once +#include "node_cache.h" #include "value.h" #include "infohash.h" #include "node.h" @@ -157,8 +158,8 @@ class NetworkEngine final { Blob token; /* security token */ Value::Id value_id; /* the value id */ time_point created { time_point::max() }; /* time when value was first created */ - Blob nodes4; /* IPv4 nodes in response to a 'find' request */ - Blob nodes6; /* IPv6 nodes in response to a 'find' request */ + Blob nodes4_raw, nodes6_raw; /* IPv4 nodes in response to a 'find' request */ + std::vector<std::shared_ptr<Node>> nodes4, nodes6; std::vector<std::shared_ptr<Value>> values; /* values for a 'get' request */ want_t want; /* states if ipv4 or ipv6 request */ uint16_t error_code; /* error code in case of error */ @@ -176,11 +177,14 @@ public: * and looking up the response from a node. */ struct RequestAnswer { - Blob ntoken; - Value::Id vid; - std::vector<std::shared_ptr<Value>> values; - std::vector<std::shared_ptr<Node>> nodes; - std::vector<std::shared_ptr<Node>> nodes6; + Blob ntoken {}; + Value::Id vid {}; + std::vector<std::shared_ptr<Value>> values {}; + std::vector<std::shared_ptr<Node>> nodes4 {}; + std::vector<std::shared_ptr<Node>> nodes6 {}; + RequestAnswer() {} + RequestAnswer(ParsedMessage&& msg) + : ntoken(std::move(msg.token)), values(std::move(msg.values)), nodes4(std::move(msg.nodes4)), nodes6(std::move(msg.nodes6)) {} }; @@ -264,6 +268,8 @@ public: } } + void connectivityChanged(); + private: /** @@ -283,7 +289,7 @@ private: * @param saddr_len (type: socklen_t) lenght of the sockaddr struct. * @param confirm (type: int) 1 if the node sent a message, 2 if it sent us a reply. */ - std::function<std::shared_ptr<Node>(const InfoHash&, const sockaddr*, socklen_t, int)> onNewNode; + std::function<std::shared_ptr<Node>(const std::shared_ptr<Node>&, int)> onNewNode; /** * @brief when an addres is reported from a distant node. * @@ -442,7 +448,11 @@ public: * @param fromlen The length of the corresponding sockaddr structure. * @param now The time to adjust the clock in the network engine. */ - void processMessage(const uint8_t *buf, size_t buflen, const sockaddr *from, socklen_t fromlen); + void processMessage(const uint8_t *buf, size_t buflen, const sockaddr* from, socklen_t fromlen); + + std::shared_ptr<Node> insertNode(const InfoHash& myid, const sockaddr* from, socklen_t fromlen) { + return cache.getNode(myid, from, fromlen, scheduler.time(), 0); + } std::vector<unsigned> getNodeMessageStats(bool in) { auto stats = in ? std::vector<unsigned>{in_stats.ping, in_stats.find, in_stats.get, in_stats.listen, in_stats.put} @@ -453,6 +463,21 @@ public: return stats; } + void blacklistNode(const std::shared_ptr<Node>& n) { + for (auto rit = requests.begin(); rit != requests.end();) { + if (rit->second->node == n) { + rit->second->cancel(); + requests.erase(rit++); + } else { + ++rit; + } + } + //blacklistedNodes.emplace(n); + memcpy(&blacklist[next_blacklisted], &n->ss, n->sslen); + next_blacklisted = (next_blacklisted + 1) % BLACKLISTED_MAX; + //blacklistNode(&n->id, (const sockaddr*)&n->ss, n->sslen); + } + private: /*************** * Constants * @@ -464,6 +489,9 @@ private: static const constexpr size_t NODE6_INFO_BUF_LEN {38}; /* TODO */ static constexpr std::chrono::seconds UDP_REPLY_TIME {15}; + /* The maximum number of nodes that we snub. There is probably little + reason to increase this value. */ + static constexpr unsigned BLACKLISTED_MAX {10}; /* TODO */ static const std::string my_v; @@ -473,8 +501,16 @@ private: const int dht_socket6 {-1}; const Logger& DHT_LOG; + NodeCache cache {}; + sockaddr_storage blacklist[BLACKLISTED_MAX] {}; + unsigned next_blacklisted = 0; + bool rateLimit(); + static bool isMartian(const sockaddr* sa, socklen_t len); + //void blacklistNode(const InfoHash* id, const sockaddr*, socklen_t); + bool isNodeBlacklisted(const sockaddr*, socklen_t) const; + void pinged(Node&); void requestStep(std::shared_ptr<Request> req) { @@ -572,7 +608,7 @@ private: const std::string& message, bool include_id=false); - RequestAnswer deserializeNodesValues(ParsedMessage& msg); + void deserializeNodesValues(ParsedMessage& msg); std::queue<time_point> rate_limit_time {}; static std::mt19937 rd_device; diff --git a/include/opendht/routing_table.h b/include/opendht/routing_table.h index f78babd6870293ceaec992ec44491c3a87367fc9..5823999f42e1f12aa105c52d19ee517ed4998710 100644 --- a/include/opendht/routing_table.h +++ b/include/opendht/routing_table.h @@ -31,8 +31,7 @@ struct Bucket { InfoHash first {}; time_point time {time_point::min()}; /* time of last reply in this bucket */ std::list<std::shared_ptr<Node>> nodes {}; - sockaddr_storage cached; /* the address of a likely candidate */ - socklen_t cachedlen {0}; + std::shared_ptr<Node> cached; /* the address of a likely candidate */ /** Return a random node in a bucket. */ std::shared_ptr<Node> randomNode(); diff --git a/src/dht.cpp b/src/dht.cpp index 426ddcc0d4c00d668f93776355efa25a6487f92e..f250207e36f86215a4c52379ddc682841e05dc3d 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -70,10 +70,6 @@ set_nonblocking(int fd, int nonblocking) static std::mt19937 rd {dht::crypto::random_device{}()}; static std::uniform_int_distribution<uint8_t> rand_byte; -static const uint8_t v4prefix[16] = { - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xFF, 0xFF, 0, 0, 0, 0 -}; - static std::string to_hex(const uint8_t *buf, size_t buflen) { @@ -138,40 +134,6 @@ Dht::shutdown(ShutdownCallback cb) { bool Dht::isRunning(sa_family_t af) const { return network_engine.isRunning(af); } -bool -Dht::isMartian(const sockaddr *sa, socklen_t len) -{ - // Check that sa_family can be accessed safely - if (!sa || len < sizeof(sockaddr_in)) - return true; - - switch(sa->sa_family) { - case AF_INET: { - sockaddr_in *sin = (sockaddr_in*)sa; - const uint8_t *address = (const uint8_t*)&sin->sin_addr; - return sin->sin_port == 0 || - (address[0] == 0) || - (address[0] == 127) || - ((address[0] & 0xE0) == 0xE0); - } - case AF_INET6: { - if (len < sizeof(sockaddr_in6)) - return true; - sockaddr_in6 *sin6 = (sockaddr_in6*)sa; - const uint8_t *address = (const uint8_t*)&sin6->sin6_addr; - return sin6->sin6_port == 0 || - (address[0] == 0xFF) || - (address[0] == 0xFE && (address[1] & 0xC0) == 0x80) || - (memcmp(address, zeroes.data(), 15) == 0 && - (address[15] == 0 || address[15] == 1)) || - (memcmp(address, v4prefix, 12) == 0); - } - - default: - return true; - } -} - /* Every bucket contains an unordered list of nodes. */ std::shared_ptr<Node> Dht::findNode(const InfoHash& id, sa_family_t af) @@ -200,59 +162,15 @@ int Dht::sendCachedPing(Bucket& b) { /* We set family to 0 when there's no cached node. */ - if (b.cached.ss_family == 0) + if (!b.cached) return 0; DHT_LOG.DEBUG("Sending ping to cached node."); - network_engine.sendPing((sockaddr*)&b.cached, b.cachedlen, nullptr, nullptr); - b.cached.ss_family = 0; - b.cachedlen = 0; + network_engine.sendPing(b.cached, nullptr, nullptr); + b.cached = {}; return 0; } -/* The internal blacklist is an LRU cache of nodes that have sent - incorrect messages. */ -void -Dht::blacklistNode(const InfoHash* id, const sockaddr *sa, socklen_t salen) -{ - DHT_LOG.WARN("Blacklisting broken node."); - - if (id) { - auto n = findNode(*id, sa->sa_family); - /* Discard it from any searches in progress. */ - auto black_list_in = [&](std::map<InfoHash, std::shared_ptr<Search>> srs) { - for (auto& srp : srs) { - auto& sr = srp.second; - sr->nodes.erase(std::partition(sr->nodes.begin(), sr->nodes.end(), [&](SearchNode& sn) { - return sn.node != n; - }), sr->nodes.end()); - } - }; - black_list_in(searches4); - black_list_in(searches6); - } - /* And make sure we don't hear from it again. */ - memcpy(&blacklist[next_blacklisted], sa, salen); - next_blacklisted = (next_blacklisted + 1) % BLACKLISTED_MAX; -} - -bool -Dht::isNodeBlacklisted(const sockaddr *sa, socklen_t salen) const -{ - if (salen > sizeof(sockaddr_storage)) - return true; - - if (isBlacklisted(sa, salen)) - return true; - - for (unsigned i = 0; i < BLACKLISTED_MAX; i++) { - if (memcmp(&blacklist[i], sa, salen) == 0) - return true; - } - - return false; -} - std::vector<Address> Dht::getPublicAddress(sa_family_t family) { @@ -302,52 +220,40 @@ Dht::reportedAddr(const sockaddr *sa, socklen_t sa_len) /* We just learnt about a node, not necessarily a new one. Confirm is 1 if the node sent a message, 2 if it sent us a reply. */ std::shared_ptr<Node> -Dht::newNode(const InfoHash& id, const sockaddr *sa, socklen_t salen, int confirm) +Dht::newNode(const std::shared_ptr<Node>& node, int confirm) { - const auto& now = scheduler.time(); - if (id == myid || isMartian(sa, salen) || isNodeBlacklisted(sa, salen)) - return nullptr; - - auto& list = sa->sa_family == AF_INET ? buckets : buckets6; - auto b = list.findBucket(id); + auto& list = node->getFamily() == AF_INET ? buckets : buckets6; + auto b = list.findBucket(node->id); if (b == list.end()) - return nullptr; - - bool mybucket = list.contains(b, myid); + return {}; for (auto& n : b->nodes) { - if (n->id != id) continue; - /* Known node. Update stuff. */ - - if (confirm || n->time + Node::NODE_EXPIRE_TIME < now) { - n->update(sa, salen); - if (confirm) { - n->received(now, confirm >= 2); - /* If this node existed in searches but was expired, give it another chance. */ - trySearchInsert(n); - } + if (n == node) { + if (confirm) + trySearchInsert(node); + return n; } - return n; } /* New node. */ + /* Try adding the node to searches */ + trySearchInsert(node); + const auto& now = scheduler.time(); + bool mybucket = list.contains(b, myid); if (mybucket) { - if (sa->sa_family == AF_INET) + if (node->getFamily() == AF_INET) mybucket_grow_time = now; else mybucket6_grow_time = now; //scheduler.edit(nextNodesConfirmation, now); } - /* First, try to get rid of a known-bad node. */ + /* Try to get rid of an expired node. */ for (auto& n : b->nodes) { if (not n->isExpired(now)) continue; - n = cache.getNode(id, sa, salen, now, confirm); - - /* Try adding the node to searches */ - trySearchInsert(n); + n = node; return n; } @@ -373,24 +279,18 @@ Dht::newNode(const InfoHash& id, const sockaddr *sa, socklen_t salen, int confir DHT_LOG.DEBUG("Splitting from depth %u", list.depth(b)); sendCachedPing(*b); list.split(b); - return newNode(id, sa, salen, confirm); + return newNode(node, 0); } /* No space for this node. Cache it away for later. */ - if (confirm || b->cached.ss_family == 0) { - memcpy(&b->cached, sa, salen); - b->cachedlen = salen; - } - auto cn = cache.getNode(id, sa, salen, now, confirm); - trySearchInsert(cn); - return cn; + if (confirm or not b->cached) + b->cached = node; + } else { + /* Create a new node. */ + b->nodes.emplace_front(node); } - /* Create a new node. */ - auto cn = cache.getNode(id, sa, salen, now, confirm); - b->nodes.emplace_front(cn); - trySearchInsert(cn); - return cn; + return node; } /* Called periodically to purge known-bad nodes. Note that we're very @@ -435,6 +335,9 @@ Dht::Search::removeExpiredNode(time_point now) bool Dht::Search::insertNode(std::shared_ptr<Node> node, time_point now, const Blob& token) { + if (expired and nodes.empty()) + return false; + if (node->ss.ss_family != af) { //DHT_LOG.DEBUG("Attempted to insert node in the wrong family."); return false; @@ -577,6 +480,10 @@ Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update if (auto sr = ws.lock()) { auto srn = sr->getNode(status->node); if (srn and not srn->candidate) { + DHT_LOG.DEBUG("[search %s IPv%c] [node %s %s] 'get' expired", + sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', + srn->node->id.toString().c_str(), + print_addr(srn->node->ss, srn->node->sslen).c_str()); if (not over) { srn->candidate = true; //DHT_LOG.DEBUG("[search %s] sn %s now candidate... %d", @@ -1628,7 +1535,7 @@ Dht::connectivityChanged() mybucket_grow_time = now; mybucket6_grow_time = now; reported_addr.clear(); - cache.clearBadNodes(); + network_engine.connectivityChanged(); auto stop_listen = [&](std::map<InfoHash, std::shared_ptr<Search>> srs) { for (auto& sp : srs) for (auto& sn : sp.second->nodes) @@ -1642,7 +1549,7 @@ void Dht::rotateSecrets() { const auto& now = scheduler.time(); - uniform_duration_distribution<> time_dist(std::chrono::seconds(45), std::chrono::minutes(1)); + uniform_duration_distribution<> time_dist(std::chrono::minutes(15), std::chrono::minutes(45)); auto rotate_secrets_time = now + time_dist(rd); oldsecret = secret; @@ -1720,7 +1627,7 @@ Dht::getNodesStats(sa_family_t af, unsigned *good_return, unsigned *dubious_retu dubious++; } } - if (b.cached.ss_family > 0) + if (b.cached) cached++; } if (good_return) @@ -1740,7 +1647,7 @@ Dht::dumpBucket(const Bucket& b, std::ostream& out) const const auto& now = scheduler.time(); using namespace std::chrono; out << b.first << " count " << b.nodes.size() << " age " << duration_cast<seconds>(now - b.time).count() << " sec"; - if (b.cached.ss_family) + if (b.cached) out << " (cached)"; out << std::endl; for (auto& n : b.nodes) { @@ -1923,7 +1830,7 @@ Dht::Dht(int s, int s6, Config config) : myid(config.node_id), is_bootstrap(config.is_bootstrap), network_engine(myid, s, s6, DHT_LOG, scheduler, std::bind(&Dht::onError, this, _1, _2), - std::bind(&Dht::newNode, this, _1, _2, _3, _4), + std::bind(&Dht::newNode, this, _1, _2), std::bind(&Dht::onReportedAddr, this, _1, _2, _3), std::bind(&Dht::onPing, this, _1), std::bind(&Dht::onFindNode, this, _1, _2, _3), @@ -2122,26 +2029,18 @@ Dht::processMessage(const uint8_t *buf, size_t buflen, const sockaddr *from, soc if (buflen == 0) return; - if (isMartian(from, fromlen)) - return; - - if (isNodeBlacklisted(from, fromlen)) { - DHT_LOG.DEBUG("Received packet from blacklisted node."); - return; - } - try { network_engine.processMessage(buf, buflen, from, fromlen); } catch (DhtProtocolException& e) { DHT_LOG.ERROR("Can't parse message from %s: %s", e.getNodeId().toString().c_str(), e.what()); - auto code = e.getCode(); - if (code == DhtProtocolException::INVALID_TID_SIZE or code == DhtProtocolException::WRONG_NODE_INFO_BUF_LEN) { + //auto code = e.getCode(); + //if (code == DhtProtocolException::INVALID_TID_SIZE or code == DhtProtocolException::WRONG_NODE_INFO_BUF_LEN) { /* This is really annoying, as it means that we will time-out all our searches that go through this node. Kill it. */ - const auto& id = e.getNodeId(); - blacklistNode(&id, from, fromlen); - } + //const auto& id = e.getNodeId(); + //blacklistNode(&id, from, fromlen); + ///} } } @@ -2300,12 +2199,12 @@ Dht::exportNodes() } bool -Dht::insertNode(const InfoHash& id, const sockaddr *sa, socklen_t salen) +Dht::insertNode(const InfoHash& id, const sockaddr* sa, socklen_t salen) { if (sa->sa_family != AF_INET && sa->sa_family != AF_INET6) return false; scheduler.syncTime(); - auto n = newNode(id, sa, salen, 0); + auto n = network_engine.insertNode(id, sa, salen); return !!n; } @@ -2358,52 +2257,42 @@ NetworkEngine::RequestAnswer Dht::onFindNode(std::shared_ptr<Node> node, InfoHash& target, want_t want) { const auto& now = scheduler.time(); - Blob ntoken = makeToken((sockaddr*)&node->ss, false); - std::vector<std::shared_ptr<Node>> nodes, nodes6; - if (want & WANT4) { - auto tmp = buckets.findClosestNodes(target, now, TARGET_NODES); - nodes.insert(nodes.begin(), tmp.begin(), tmp.end()); - } - if (want & WANT6) { - auto tmp = buckets6.findClosestNodes(target, now, TARGET_NODES); - nodes6.insert(nodes6.begin(), tmp.begin(), tmp.end()); - } - return NetworkEngine::RequestAnswer {ntoken, Value::INVALID_ID, {}, std::move(nodes), std::move(nodes6)}; + NetworkEngine::RequestAnswer answer; + answer.ntoken = makeToken((sockaddr*)&node->ss, false); + if (want & WANT4) + answer.nodes4 = buckets.findClosestNodes(target, now, TARGET_NODES); + if (want & WANT6) + answer.nodes6 = buckets6.findClosestNodes(target, now, TARGET_NODES); + return answer; } NetworkEngine::RequestAnswer Dht::onGetValues(std::shared_ptr<Node> node, InfoHash& hash, want_t) { - const auto& now = scheduler.time(); - std::shared_ptr<NetworkEngine::RequestAnswer> answer {}; if (hash == zeroes) { DHT_LOG.WARN("[node %s %s] Eek! Got get_values with no info_hash.", node->id.toString().c_str(), print_addr(node->ss, node->sslen).c_str()); throw DhtProtocolException {DhtProtocolException::NON_AUTHORITATIVE_INFORMATION, DhtProtocolException::GET_NO_INFOHASH}; + } + const auto& now = scheduler.time(); + NetworkEngine::RequestAnswer answer {}; + auto st = findStorage(hash); + answer.ntoken = makeToken((sockaddr*)&node->ss, false); + answer.nodes4 = buckets.findClosestNodes(hash, now, TARGET_NODES); + answer.nodes6 = buckets6.findClosestNodes(hash, now, TARGET_NODES); + if (st != store.end() && not st->empty()) { + auto values = st->getValues(); + answer.values.resize(values.size()); + std::transform(values.begin(), values.end(), answer.values.begin(), [](const ValueStorage& vs) { + return vs.data; + }); + DHT_LOG.DEBUG("[node %s %s] sending %u values.", + node->id.toString().c_str(), print_addr(node->ss, node->sslen).c_str(), answer.values.size()); } else { - auto st = findStorage(hash); - Blob ntoken = makeToken((sockaddr*)&node->ss, false); - answer = std::shared_ptr<NetworkEngine::RequestAnswer> { - new NetworkEngine::RequestAnswer { - ntoken, 0, {}, - buckets.findClosestNodes(hash, now, TARGET_NODES), - buckets6.findClosestNodes(hash, now, TARGET_NODES) - } - }; - if (st != store.end() && not st->empty()) { - auto values = st->getValues(); - answer->values.resize(values.size()); - std::transform(values.begin(), values.end(), answer->values.begin(), [](const ValueStorage& vs) { - return vs.data; - }); - DHT_LOG.DEBUG("[node %s %s] sending %u values.", - node->id.toString().c_str(), print_addr(node->ss, node->sslen).c_str(), answer->values.size()); - } else { - DHT_LOG.DEBUG("[node %s %s] sending nodes.", - node->id.toString().c_str(), print_addr(node->ss, node->sslen).c_str()); - } + DHT_LOG.DEBUG("[node %s %s] sending nodes.", + node->id.toString().c_str(), print_addr(node->ss, node->sslen).c_str()); } - return *answer; + return answer; } void @@ -2415,7 +2304,7 @@ Dht::onGetValuesDone(std::shared_ptr<NetworkEngine::Request> status, return; } - DHT_LOG.DEBUG("[search %s IPv%c] got reply to 'get' from %s", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', status->node->toString().c_str()); + DHT_LOG.DEBUG("[search %s IPv%c] got reply to 'get' from %s with %u nodes", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', status->node->toString().c_str(), a.nodes4.size()); if (not a.ntoken.empty()) { if (!a.values.empty()) { @@ -2451,15 +2340,14 @@ Dht::onGetValuesDone(std::shared_ptr<NetworkEngine::Request> status, } else { DHT_LOG.WARN("[node %s %s] no token provided. Ignoring response content.", status->node->id.toString().c_str(), print_addr(status->node->ss, status->node->sslen).c_str()); - blacklistNode(&status->node->id, (sockaddr*)&status->node->ss, status->node->sslen); + network_engine.blacklistNode(status->node); } if (not sr->done) { - const auto& now = scheduler.time(); searchSendGetValues(sr); // Force to recompute the next step time - scheduler.edit(sr->nextSearchStep, now); + scheduler.edit(sr->nextSearchStep, scheduler.time()); } } diff --git a/src/network_engine.cpp b/src/network_engine.cpp index 2979e18e960cff1dc4a4bacffc57cf6c5b54799b..18e6f79e48fe8c84c96eceb628e59a0db8eb2fbc 100644 --- a/src/network_engine.cpp +++ b/src/network_engine.cpp @@ -44,6 +44,10 @@ const NetworkEngine::TransPrefix NetworkEngine::TransPrefix::ANNOUNCE_VALUES = const NetworkEngine::TransPrefix NetworkEngine::TransPrefix::LISTEN = {"lt"}; constexpr long unsigned NetworkEngine::MAX_REQUESTS_PER_SEC; +static const uint8_t v4prefix[16] = { + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xFF, 0xFF, 0, 0, 0, 0 +}; + /* Called whenever we send a request to a node, increases the ping count and, if that reaches 3, sends a ping to a new candidate. */ void @@ -83,6 +87,12 @@ NetworkEngine::isRunning(sa_family_t af) const } } +void +NetworkEngine::connectivityChanged() +{ + cache.clearBadNodes(); +} + /* Rate control for requests we receive. */ bool NetworkEngine::rateLimit() @@ -98,8 +108,94 @@ NetworkEngine::rateLimit() return true; } +bool +NetworkEngine::isMartian(const sockaddr* sa, socklen_t len) +{ + // Check that sa_family can be accessed safely + if (!sa || len < sizeof(sockaddr_in)) + return true; + + switch(sa->sa_family) { + case AF_INET: { + sockaddr_in *sin = (sockaddr_in*)sa; + const uint8_t *address = (const uint8_t*)&sin->sin_addr; + return sin->sin_port == 0 || + (address[0] == 0) || + (address[0] == 127) || + ((address[0] & 0xE0) == 0xE0); + } + case AF_INET6: { + if (len < sizeof(sockaddr_in6)) + return true; + sockaddr_in6 *sin6 = (sockaddr_in6*)sa; + const uint8_t *address = (const uint8_t*)&sin6->sin6_addr; + return sin6->sin6_port == 0 || + (address[0] == 0xFF) || + (address[0] == 0xFE && (address[1] & 0xC0) == 0x80) || + (memcmp(address, zeroes.data(), 15) == 0 && + (address[15] == 0 || address[15] == 1)) || + (memcmp(address, v4prefix, 12) == 0); + } + + default: + return true; + } +} + +/* The internal blacklist is an LRU cache of nodes that have sent + incorrect messages. */ +/*void +NetworkEngine::blacklistNode(const InfoHash* id, const sockaddr *sa, socklen_t salen) +{ + DHT_LOG.WARN("Blacklisting broken node."); + + if (id) { + auto n = cache.getNode(*id, sa, salen, 0);//findNode(); + for () + // Discard it from any searches in progress. + auto black_list_in = [&](std::map<InfoHash, std::shared_ptr<Search>>& srs) { + for (auto& srp : srs) { + auto& sr = srp.second; + sr->nodes.erase(std::partition(sr->nodes.begin(), sr->nodes.end(), [&](SearchNode& sn) { + return sn.node != n; + }), sr->nodes.end()); + } + }; + black_list_in(searches4); + black_list_in(searches6); + } + // And make sure we don't hear from it again. + +}*/ + +bool +NetworkEngine::isNodeBlacklisted(const sockaddr *sa, socklen_t salen) const +{ + if (salen > sizeof(sockaddr_storage)) + return true; + + /*if (isBlacklisted(sa, salen)) + return true;*/ + + for (unsigned i = 0; i < BLACKLISTED_MAX; i++) { + if (memcmp(&blacklist[i], sa, salen) == 0) + return true; + } + + return false; +} + void -NetworkEngine::processMessage(const uint8_t *buf, size_t buflen, const sockaddr *from, socklen_t fromlen) { +NetworkEngine::processMessage(const uint8_t *buf, size_t buflen, const sockaddr *from, socklen_t fromlen) +{ + if (isMartian(from, fromlen)) + return; + + if (isNodeBlacklisted(from, fromlen)) { + DHT_LOG.DEBUG("Received packet from blacklisted node."); + return; + } + ParsedMessage msg; try { msgpack::unpacked msg_res = msgpack::unpack((const char*)buf, buflen); @@ -125,10 +221,13 @@ NetworkEngine::processMessage(const uint8_t *buf, size_t buflen, const sockaddr } } + const auto& now = scheduler.time(); + if (msg.tid.length != 4) { DHT_LOG.ERROR("Broken node truncates transaction ids (len: %d): ", msg.tid.length); DHT_LOG.ERROR.logPrintable(buf, buflen); - throw DhtProtocolException {DhtProtocolException::INVALID_TID_SIZE}; + blacklistNode(cache.getNode(msg.id, from, fromlen, now, 1)); + return; } uint16_t ttid = 0; @@ -140,7 +239,8 @@ NetworkEngine::processMessage(const uint8_t *buf, size_t buflen, const sockaddr if (req->cancelled()) return; - auto node = onNewNode(msg.id, from, fromlen, 2); + auto node = cache.getNode(msg.id, from, fromlen, now, 2); + onNewNode(node, 2); onReportedAddr(msg.id, (sockaddr*)&msg.addr.first, msg.addr.second); switch (msg.type) { case MessageType::Error: { @@ -164,6 +264,8 @@ NetworkEngine::processMessage(const uint8_t *buf, size_t buflen, const sockaddr if (not req->persistent) requests.erase(reqp); req->reply_time = scheduler.time(); + + deserializeNodesValues(msg); req->completed_ = true; req->on_done(req, std::move(msg)); if (not req->persistent) @@ -173,7 +275,8 @@ NetworkEngine::processMessage(const uint8_t *buf, size_t buflen, const sockaddr break; } } else { - auto node = onNewNode(msg.id, from, fromlen, 1); + auto node = cache.getNode(msg.id, from, fromlen, now, 1); + onNewNode(node, 1); try { switch (msg.type) { case MessageType::Ping: @@ -187,7 +290,7 @@ NetworkEngine::processMessage(const uint8_t *buf, size_t buflen, const sockaddr msg.id.toString().c_str(), print_addr(from, fromlen).c_str(), msg.want); ++in_stats.find; RequestAnswer answer = onFindNode(node, msg.target, msg.want); - auto nnodes = bufferNodes(from->sa_family, msg.target, msg.want, answer.nodes, answer.nodes6); + auto nnodes = bufferNodes(from->sa_family, msg.target, msg.want, answer.nodes4, answer.nodes6); sendNodesValues(from, fromlen, msg.tid, nnodes.first, nnodes.second, {}, answer.ntoken); break; } @@ -196,7 +299,7 @@ NetworkEngine::processMessage(const uint8_t *buf, size_t buflen, const sockaddr msg.id.toString().c_str(), print_addr(from, fromlen).c_str(), msg.info_hash.toString().c_str()); ++in_stats.get; RequestAnswer answer = onGetValues(node, msg.info_hash, msg.want); - auto nnodes = bufferNodes(from->sa_family, msg.info_hash, msg.want, answer.nodes, answer.nodes6); + auto nnodes = bufferNodes(from->sa_family, msg.info_hash, msg.want, answer.nodes4, answer.nodes6); sendNodesValues(from, fromlen, msg.tid, nnodes.first, nnodes.second, answer.values, answer.ntoken); break; } @@ -353,7 +456,7 @@ NetworkEngine::sendFindNode(std::shared_ptr<Node> n, const InfoHash& target, wan std::shared_ptr<Request> req(new Request {tid.getTid(), n, std::move(b), [=](std::shared_ptr<Request> req_status, ParsedMessage&& msg) { /* on done */ if (on_done) { - on_done(req_status, deserializeNodesValues(msg)); + on_done(req_status, {std::forward<ParsedMessage>(msg)}); } }, [=](std::shared_ptr<Request> req_status, bool) { /* on expired */ @@ -396,7 +499,7 @@ NetworkEngine::sendGetValues(std::shared_ptr<Node> n, const InfoHash& info_hash, std::shared_ptr<Request> req(new Request {tid.getTid(), n, std::move(b), [=](std::shared_ptr<Request> req_status, ParsedMessage&& msg) { /* on done */ if (on_done) { - on_done(req_status, deserializeNodesValues(msg)); + on_done(req_status, {std::forward<ParsedMessage>(msg)}); } }, [=](std::shared_ptr<Request> req_status, bool) { /* on expired */ @@ -410,15 +513,15 @@ NetworkEngine::sendGetValues(std::shared_ptr<Node> n, const InfoHash& info_hash, return req; } -NetworkEngine::RequestAnswer +void NetworkEngine::deserializeNodesValues(ParsedMessage& msg) { - RequestAnswer req_a {msg.token, 0, std::move(msg.values), {}, {}}; - if (msg.nodes4.size() % NODE4_INFO_BUF_LEN != 0 || msg.nodes6.size() % NODE6_INFO_BUF_LEN != 0) { + if (msg.nodes4_raw.size() % NODE4_INFO_BUF_LEN != 0 || msg.nodes6_raw.size() % NODE6_INFO_BUF_LEN != 0) { throw DhtProtocolException {DhtProtocolException::WRONG_NODE_INFO_BUF_LEN}; } else { // deserialize nodes - for (unsigned i = 0; i < msg.nodes4.size() / NODE4_INFO_BUF_LEN; i++) { - uint8_t *ni = msg.nodes4.data() + i * NODE4_INFO_BUF_LEN; + const auto& now = scheduler.time(); + for (unsigned i = 0; i < msg.nodes4_raw.size() / NODE4_INFO_BUF_LEN; i++) { + uint8_t *ni = msg.nodes4_raw.data() + i * NODE4_INFO_BUF_LEN; const InfoHash& ni_id = *reinterpret_cast<InfoHash*>(ni); if (ni_id == myid) continue; @@ -427,11 +530,13 @@ NetworkEngine::deserializeNodesValues(ParsedMessage& msg) { sin.sin_family = AF_INET; memcpy(&sin.sin_addr, ni + ni_id.size(), 4); memcpy(&sin.sin_port, ni + ni_id.size() + 4, 2); - if (auto n = onNewNode(ni_id, (sockaddr*)&sin, sizeof(sin), 0)) - req_a.nodes.emplace_back(std::move(n)); + if (isMartian((sockaddr*)&sin, sizeof(sin)) || isNodeBlacklisted((sockaddr*)&sin, sizeof(sin))) + continue; + msg.nodes4.emplace_back(cache.getNode(ni_id, (sockaddr*)&sin, sizeof(sin), now, 0)); + onNewNode(msg.nodes4.back(), 0); } - for (unsigned i = 0; i < msg.nodes6.size() / NODE6_INFO_BUF_LEN; i++) { - uint8_t *ni = msg.nodes6.data() + i * NODE6_INFO_BUF_LEN; + for (unsigned i = 0; i < msg.nodes6_raw.size() / NODE6_INFO_BUF_LEN; i++) { + uint8_t *ni = msg.nodes6_raw.data() + i * NODE6_INFO_BUF_LEN; const InfoHash& ni_id = *reinterpret_cast<InfoHash*>(ni); if (ni_id == myid) continue; @@ -440,11 +545,12 @@ NetworkEngine::deserializeNodesValues(ParsedMessage& msg) { sin6.sin6_family = AF_INET6; memcpy(&sin6.sin6_addr, ni + HASH_LEN, 16); memcpy(&sin6.sin6_port, ni + HASH_LEN + 16, 2); - if (auto n = onNewNode(ni_id, (sockaddr*)&sin6, sizeof(sin6), 0)) - req_a.nodes.emplace_back(std::move(n)); + if (isMartian((sockaddr*)&sin6, sizeof(sin6)) || isNodeBlacklisted((sockaddr*)&sin6, sizeof(sin6))) + continue; + msg.nodes6.emplace_back(cache.getNode(ni_id, (sockaddr*)&sin6, sizeof(sin6), now, 0)); + onNewNode(msg.nodes6.back(), 0); } } - return req_a; } void @@ -605,7 +711,7 @@ NetworkEngine::sendListen(std::shared_ptr<Node> n, const InfoHash& infohash, con std::shared_ptr<Request> req(new Request {tid.getTid(), n, std::move(b), [=](std::shared_ptr<Request> req_status, ParsedMessage&& msg) { /* on done */ if (on_done) - on_done(req_status, deserializeNodesValues(msg)); + on_done(req_status, {std::forward<ParsedMessage>(msg)}); }, [=](std::shared_ptr<Request> req_status, bool) { /* on expired */ if (on_expired) @@ -786,10 +892,10 @@ NetworkEngine::ParsedMessage::msgpack_unpack(msgpack::object msg) value_id = vid->as<Value::Id>(); if (auto rnodes4 = findMapValue(req, "n4")) - nodes4 = unpackBlob(*rnodes4); + nodes4_raw = unpackBlob(*rnodes4); if (auto rnodes6 = findMapValue(req, "n6")) - nodes6 = unpackBlob(*rnodes6); + nodes6_raw = unpackBlob(*rnodes6); if (auto sa = findMapValue(req, "sa")) { if (sa->type != msgpack::type::BIN)