diff --git a/include/opendht/dht.h b/include/opendht/dht.h index 3d32e946df585c77a36a625e7588270abeab8b89..ebbdc56515b5e426aaed851da69e4eebbbdc271b 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -243,13 +243,6 @@ public: int pingNode(const sockaddr*, socklen_t); - /** - * Maintains the store. For each storage, if values don't belong there - * anymore because this node is too far from the target, values are sent to - * the appropriate nodes. - */ - void maintainStore(bool force=false); - time_point periodic(const uint8_t *buf, size_t buflen, const sockaddr *from, socklen_t fromlen); /** @@ -459,7 +452,7 @@ private: InfoHash middle(const RoutingTable::const_iterator&) const; - std::vector<std::shared_ptr<Node>> findClosestNodes(const InfoHash id) const; + std::vector<std::shared_ptr<Node>> findClosestNodes(const InfoHash id, size_t count = TARGET_NODES) const; RoutingTable::iterator findBucket(const InfoHash& id); RoutingTable::const_iterator findBucket(const InfoHash& id) const; @@ -616,6 +609,7 @@ private: bool expired {false}; /* no node, or all nodes expired */ bool done {false}; /* search is over, cached for later */ + bool refilled {false}; std::vector<SearchNode> nodes {}; std::vector<Announce> announce {}; std::vector<Get> callbacks {}; @@ -886,6 +880,11 @@ private: void expireStorage(); void storageChanged(Storage& st, ValueStorage&); + /** + * For a given storage, if values don't belong there anymore because this + * node is too far from the target, values are sent to the appropriate + * nodes. + */ size_t maintainStorage(InfoHash id, bool force=false, DoneCallback donecb=nullptr); // Buckets diff --git a/python/tools/benchmark.py b/python/tools/benchmark.py index f671bb257caadf46337576fb3502642790c6a073..5bdfc321df9656d9e23c3313aa6098be8d6ae050 100755 --- a/python/tools/benchmark.py +++ b/python/tools/benchmark.py @@ -134,7 +134,7 @@ class DhtNetworkSubProcess(NSPopen): SHUTDOWN_CLUSTER_REQ = b"sdc" DUMP_STORAGE_REQ = b"strl" MESSAGE_STATS = b"gms" - + # tokens NOTIFY_TOKEN = 'notify' @@ -515,13 +515,13 @@ class PersistenceTest(FeatureTest): DhtNetwork.log("Waiting 15 seconds for packets to work their way effectively.") time.sleep(15) ops_count.append(cluster_ops_count/self.wb.node_num) - + # checking if values were transfered to new nodes foreign_nodes_before_delete = PersistenceTest.foreign_nodes DhtNetwork.log('[GET]: trying to fetch persistent values') self._dhtGet(consumer, myhash) new_nodes = set(PersistenceTest.foreign_nodes) - set(foreign_nodes_before_delete) - + self._result(local_values, new_nodes) if self._plot: @@ -602,7 +602,7 @@ class PersistenceTest(FeatureTest): nodes = set([]) # prevents garbage collecting of unused flood nodes during the test. - flood_nodes = [] + flood_nodes = [] def gottaGetThemAllPokeNodes(nodes=None): nonlocal consumer, hashes diff --git a/src/dht.cpp b/src/dht.cpp index 9700d401edfa1eca079756567c8df00c42ba223d..1efc9bef361717aa8695f583e9189d32128361c3 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -303,37 +303,44 @@ Dht::RoutingTable::depth(const RoutingTable::const_iterator& it) const } std::vector<std::shared_ptr<Node>> -Dht::RoutingTable::findClosestNodes(const InfoHash id) const { +Dht::RoutingTable::findClosestNodes(const InfoHash id, size_t count) const { std::vector<std::shared_ptr<Node>> nodes {}; auto bucket = findBucket(id); if (bucket == end()) { return nodes; } auto sortedBucketInsert = [&](const Bucket &b) { - for (auto n : b.nodes) { - auto here = std::find_if(nodes.begin(), nodes.end(), [&id,&n](std::shared_ptr<Node> &node) { - return id.xorCmp(node->id, n->id) < 0; - }); - nodes.insert(here, n); - } - }; - - // Inserting very closest nodes - sortedBucketInsert(*bucket); + for (auto n : b.nodes) { + auto here = std::find_if(nodes.begin(), nodes.end(), + [&id,&n](std::shared_ptr<Node> &node) { + return id.xorCmp(n->id, node->id) < 0; + } + ); + nodes.insert(here, n); + } + }; - // adjacent buckets contain remaining closest candidates - if (std::next(bucket) != this->end() && nodes.size() < TARGET_NODES) { - sortedBucketInsert(*std::next(bucket)); - } - if (std::prev(bucket) != this->end() && nodes.size() < TARGET_NODES) { - sortedBucketInsert(*std::prev(bucket)); + auto itn = bucket; + auto itp = std::prev(bucket); + while (nodes.size() < count && (itn != end() || itp != end())) { + if (itn != end()) { + sortedBucketInsert(*itn); + itn = std::next(itn); + } + if (itp != end()) { + sortedBucketInsert(*itp); + if (itp == begin()) { + itp = end(); + continue; + } + itp = std::prev(itp); + } } - // shrink to the TARGET_NODES closest nodes. - if (nodes.size() > TARGET_NODES) { - nodes.resize(TARGET_NODES); + // shrink to the count closest nodes. + if (nodes.size() > count) { + nodes.resize(count); } - return nodes; } @@ -1041,13 +1048,19 @@ Dht::searchStep(Search& sr) return sn.candidate or sn.node->isExpired(now); }) == sr.nodes.size()) { - unsigned added = sr.refill(sr.af == AF_INET ? buckets : buckets6, now); + unsigned added = 0; + if (not sr.refilled) { + added = sr.refill(sr.af == AF_INET ? buckets : buckets6, now); + sr.refilled = true; + } if (added) { DHT_WARN("[search %s IPv%c] refilled with %u nodes", sr.id.toString().c_str(), (sr.af == AF_INET) ? '4' : '6', added); } else { DHT_ERROR("[search %s IPv%c] expired", sr.id.toString().c_str(), sr.af == AF_INET ? '4' : '6'); // no nodes or all expired nodes sr.expired = true; + // reset refilled since the search is now expired. + sr.refilled = false; if (sr.announce.empty() && sr.listeners.empty()) { // Listening or announcing requires keeping the cluster up to date. sr.done = true; @@ -2613,6 +2626,20 @@ Dht::processMessage(const uint8_t *buf, size_t buflen, const sockaddr *from, soc sendError(from, fromlen, msg.tid, 401, "Put with wrong token", true); break; } + { + // We store a value only if we think we're part of the + // SEARCH_NODES nodes around the target id. + auto closest_nodes = (from->sa_family == AF_INET ? buckets : buckets6).findClosestNodes(msg.info_hash, SEARCH_NODES); + if (msg.info_hash.xorCmp(closest_nodes.back()->id, myid) < 0) { + DHT_WARN("[node %s %s] announce too far from the target id. Dropping value.", + msg.id.toString().c_str(), print_addr(from, fromlen).c_str()); + for (auto& v : msg.values) { + sendValueAnnounced(from, fromlen, msg.tid, v->id); + } + break; + } + } + for (const auto& v : msg.values) { if (v->id == Value::INVALID_ID) { DHT_WARN("[value %s %s] incorrect value id", msg.info_hash.toString().c_str(), v->id); @@ -2640,7 +2667,7 @@ Dht::processMessage(const uint8_t *buf, size_t buflen, const sockaddr *from, soc DHT_DEBUG("[value %s %lu] storing %s.", msg.info_hash.toString().c_str(), vc->id, vc->toString().c_str()); storageStore(msg.info_hash, vc, msg.created); } else { - DHT_DEBUG("[value %s %lu] rejecting storage of %s.", msg.info_hash.toString().c_str(), vc->id, vc->toString().c_str()); + DHT_DEBUG("[value %s %lu] rejecting storage of %s.", msg.info_hash.toString().c_str(), vc->id, vc->toString().c_str()); } }