diff --git a/include/opendht/dht.h b/include/opendht/dht.h index 90c6b83f45605fa553bec24cce06b2de34c47819..2f10a0ce5eadae956e8b28c86dc0ac71a497f6cb 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -452,7 +452,7 @@ private: InfoHash middle(const RoutingTable::const_iterator&) const; - std::vector<std::shared_ptr<Node>> findClosestNodes(const InfoHash id) const; + std::vector<std::shared_ptr<Node>> findClosestNodes(const InfoHash id, size_t count = TARGET_NODES) const; RoutingTable::iterator findBucket(const InfoHash& id); RoutingTable::const_iterator findBucket(const InfoHash& id) const; diff --git a/src/dht.cpp b/src/dht.cpp index 9700d401edfa1eca079756567c8df00c42ba223d..e366cc81b3d4ea21c1d040e0a2069bf3fb88a991 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -303,37 +303,42 @@ Dht::RoutingTable::depth(const RoutingTable::const_iterator& it) const } std::vector<std::shared_ptr<Node>> -Dht::RoutingTable::findClosestNodes(const InfoHash id) const { +Dht::RoutingTable::findClosestNodes(const InfoHash id, size_t count) const { std::vector<std::shared_ptr<Node>> nodes {}; auto bucket = findBucket(id); if (bucket == end()) { return nodes; } auto sortedBucketInsert = [&](const Bucket &b) { - for (auto n : b.nodes) { - auto here = std::find_if(nodes.begin(), nodes.end(), [&id,&n](std::shared_ptr<Node> &node) { - return id.xorCmp(node->id, n->id) < 0; - }); - nodes.insert(here, n); - } - }; - - // Inserting very closest nodes - sortedBucketInsert(*bucket); - - // adjacent buckets contain remaining closest candidates - if (std::next(bucket) != this->end() && nodes.size() < TARGET_NODES) { - sortedBucketInsert(*std::next(bucket)); - } - if (std::prev(bucket) != this->end() && nodes.size() < TARGET_NODES) { - sortedBucketInsert(*std::prev(bucket)); - } + for (auto n : b.nodes) { + auto here = std::find_if(nodes.begin(), nodes.end(), + [&id,&n](std::shared_ptr<Node> &node) { + return id.xorCmp(node->id, n->id) < 0; + } + ); + nodes.insert(here, n); + } + }; - // shrink to the TARGET_NODES closest nodes. - if (nodes.size() > TARGET_NODES) { - nodes.resize(TARGET_NODES); + auto itn = bucket; + auto itp = std::prev(bucket); + while (nodes.size() < count && (itn != end() || itp != end())) { + if (itn != end()) { + sortedBucketInsert(*itn); + itn = std::next(itn); + } + if (itp != end()) { + sortedBucketInsert(*itp); + if (itp == begin()) { + itp = end(); + continue; + } + itp = std::prev(itp); + } } + // shrink to the count closest nodes. + nodes.resize(count); return nodes; } @@ -2376,6 +2381,8 @@ Dht::processMessage(const uint8_t *buf, size_t buflen, const sockaddr *from, soc uint16_t ttid = 0; switch (msg.type) { + //TODO: handle case where put was made to node claiming to be outside of a + // valid range. case MessageType::Error: if (msg.tid.length != 4) return; if (msg.error_code == 401 && msg.id != zeroes && (msg.tid.matches(TransPrefix::ANNOUNCE_VALUES, &ttid) || msg.tid.matches(TransPrefix::LISTEN, &ttid))) { @@ -2613,6 +2620,21 @@ Dht::processMessage(const uint8_t *buf, size_t buflen, const sockaddr *from, soc sendError(from, fromlen, msg.tid, 401, "Put with wrong token", true); break; } + { + // We store a value only if we think we're part of the + // 2*TARGET_NODES nodes around the target id. + auto closest_nodes = (from->sa_family == AF_INET ? buckets : buckets6).findClosestNodes(msg.info_hash, 2*TARGET_NODES); + if (msg.info_hash.xorCmp(closest_nodes.back()->id, myid) < 0) { + std::cerr << "my id [" << myid << "] is too far from " << msg.info_hash << ". Dropping value." << std::endl; + DHT_WARN("[node %s %s] announce too far from the target id. Dropping value.", + msg.id.toString().c_str(), print_addr(from, fromlen).c_str()); + for (auto& v : msg.values) { + sendValueAnnounced(from, fromlen, msg.tid, v->id); + } + break; + } + } + for (const auto& v : msg.values) { if (v->id == Value::INVALID_ID) { DHT_WARN("[value %s %s] incorrect value id", msg.info_hash.toString().c_str(), v->id); @@ -2640,7 +2662,7 @@ Dht::processMessage(const uint8_t *buf, size_t buflen, const sockaddr *from, soc DHT_DEBUG("[value %s %lu] storing %s.", msg.info_hash.toString().c_str(), vc->id, vc->toString().c_str()); storageStore(msg.info_hash, vc, msg.created); } else { - DHT_DEBUG("[value %s %lu] rejecting storage of %s.", msg.info_hash.toString().c_str(), vc->id, vc->toString().c_str()); + DHT_DEBUG("[value %s %lu] rejecting storage of %s.", msg.info_hash.toString().c_str(), vc->id, vc->toString().c_str()); } }