diff --git a/include/opendht/dht.h b/include/opendht/dht.h index 9e6b081b2b515a81e9947b249d92016ce104f144..45a0c601a0db13935ef8e296e80994a97e994cda 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -80,10 +80,14 @@ public: */ inline const InfoHash& getNodeId() const override { return myid; } + NodeStatus updateStatus(sa_family_t af) override; + /** * Get the current status of the node for the given family. */ - NodeStatus getStatus(sa_family_t af) const override; + NodeStatus getStatus(sa_family_t af) const override { + return dht(af).status; + } NodeStatus getStatus() const override { return std::max(getStatus(AF_INET), getStatus(AF_INET6)); @@ -111,6 +115,15 @@ public: return types.getType(type_id); } + void addBootstrap(const std::string& host, const std::string& service) override { + bootstrap_nodes.emplace_back(host, service); + onDisconnected(); + } + + void clearBootstrap() override { + bootstrap_nodes.clear(); + } + /** * Insert a node in the main routing table. * The node is not pinged, so this should be @@ -366,9 +379,23 @@ private: // registred types TypeStore types; - // the stuff - RoutingTable buckets4 {}; - RoutingTable buckets6 {}; + using SearchMap = std::map<InfoHash, Sp<Search>>; + struct Kad { + RoutingTable buckets {}; + SearchMap searches {}; + unsigned pending_pings {0}; + NodeStatus status; + + NodeStatus getStatus(time_point now) const; + NodeStats getNodesStats(time_point now, const InfoHash& myid) const; + }; + + Kad dht4 {}; + Kad dht6 {}; + + std::vector<std::pair<std::string,std::string>> bootstrap_nodes {}; + std::chrono::steady_clock::duration bootstrap_period {std::chrono::seconds(10)}; + Sp<Scheduler::Job> bootstrapJob {}; std::map<InfoHash, Storage> store; std::map<SockAddr, StorageBucket, SockAddr::ipCmp> store_quota; @@ -377,9 +404,6 @@ private: size_t max_store_keys {MAX_HASHES}; size_t max_store_size {DEFAULT_STORAGE_LIMIT}; - using SearchMap = std::map<InfoHash, Sp<Search>>; - SearchMap searches4 {}; - SearchMap searches6 {}; size_t max_searches {MAX_SEARCHES}; size_t search_id {0}; @@ -396,9 +420,6 @@ private: Sp<Scheduler::Job> nextStorageMaintenance {}; net::NetworkEngine network_engine; - unsigned pending_pings4 {0}; - unsigned pending_pings6 {0}; - using ReportedAddr = std::pair<unsigned, SockAddr>; std::vector<ReportedAddr> reported_addr; @@ -444,8 +465,10 @@ private: size_t maintainStorage(decltype(store)::value_type&, bool force=false, const DoneCallback& donecb={}); // Buckets - RoutingTable& buckets(sa_family_t af) { return af == AF_INET ? buckets4 : buckets6; } - const RoutingTable& buckets(sa_family_t af) const { return af == AF_INET ? buckets4 : buckets6; } + Kad& dht(sa_family_t af) { return af == AF_INET ? dht4 : dht6; } + const Kad& dht(sa_family_t af) const { return af == AF_INET ? dht4 : dht6; } + RoutingTable& buckets(sa_family_t af) { return dht(af).buckets; } + const RoutingTable& buckets(sa_family_t af) const { return dht(af).buckets; } Bucket* findBucket(const InfoHash& id, sa_family_t af) { auto& b = buckets(af); auto it = b.findBucket(id); @@ -467,8 +490,8 @@ private: // Searches - inline SearchMap& searches(sa_family_t af) { return af == AF_INET ? searches4 : searches6; } - inline const SearchMap& searches(sa_family_t af) const { return af == AF_INET ? searches4 : searches6; } + inline SearchMap& searches(sa_family_t af) { return dht(af).searches; } + inline const SearchMap& searches(sa_family_t af) const { return dht(af).searches; } /** * Low-level method that will perform a search on the DHT for the specified @@ -491,6 +514,7 @@ private: void confirmNodes(); void expire(); + void onDisconnected(); /** * Generic function to execute when a 'get' request has completed. diff --git a/include/opendht/dht_interface.h b/include/opendht/dht_interface.h index b5bccc5126f945be719489bfa22c37405c2deb1d..7a9f2cda78db0e3e647f57c47d02d19557a4face 100644 --- a/include/opendht/dht_interface.h +++ b/include/opendht/dht_interface.h @@ -42,6 +42,7 @@ public: /** * Get the current status of the node for the given family. */ + virtual NodeStatus updateStatus(sa_family_t af) { return getStatus(af); }; virtual NodeStatus getStatus(sa_family_t af) const = 0; virtual NodeStatus getStatus() const = 0; @@ -69,6 +70,9 @@ public: virtual const ValueType& getType(ValueType::Id type_id) const = 0; + virtual void addBootstrap(const std::string& /*host*/, const std::string& /*service*/) {}; + virtual void clearBootstrap() {}; + /** * Insert a node in the main routing table. * The node is not pinged, so this should be diff --git a/include/opendht/dhtrunner.h b/include/opendht/dhtrunner.h index 848dff08af71db4b1eb34eeb8d9c5e3925fbef02..4324c06b624ac5d6b629f3eac09b22155745a297 100644 --- a/include/opendht/dhtrunner.h +++ b/include/opendht/dhtrunner.h @@ -459,14 +459,6 @@ private: Stopping }; - /** - * Will try to resolve the list of hostnames `bootstrap_nodes` on seperate - * thread and then queue ping requests. This list should contain reliable - * nodes so that the DHT node can recover quickly from losing connection - * with the network. - */ - void tryBootstrapContinuously(); - time_point loop_(); NodeStatus getStatus() const { @@ -513,15 +505,6 @@ private: net::PacketList rcv {}; decltype(rcv) rcv_free {}; - /** true if currently actively boostraping */ - std::atomic_bool bootstraping {false}; - /* bootstrap nodes given as (host, service) pairs */ - std::vector<std::pair<std::string,std::string>> bootstrap_nodes_all {}; - std::vector<std::pair<std::string,std::string>> bootstrap_nodes {}; - std::thread bootstrap_thread {}; - std::mutex bootstrap_mtx {}; - std::condition_variable bootstrap_cv {}; - std::queue<std::function<void(SecureDht&)>> pending_ops_prio {}; std::queue<std::function<void(SecureDht&)>> pending_ops {}; std::mutex storage_mtx {}; diff --git a/include/opendht/network_utils.h b/include/opendht/network_utils.h index 1e1fc19cb71aa09bf2c56639d6539fcef186cfdc..4559ea3036ad5a6e67cbdfc4ca6ee63a944d34c9 100644 --- a/include/opendht/network_utils.h +++ b/include/opendht/network_utils.h @@ -61,6 +61,9 @@ using PacketList = std::list<ReceivedPacket>; class OPENDHT_PUBLIC DatagramSocket { public: + /** A function that takes a list of new received packets and + * optionally returns consumed packets for recycling. + **/ using OnReceive = std::function<PacketList(PacketList&& packets)>; virtual ~DatagramSocket() {}; @@ -85,6 +88,11 @@ public: virtual const SockAddr& getBoundRef(sa_family_t family = AF_UNSPEC) const = 0; + /** Virtual resolver mothod allows to implement custom resolver */ + virtual std::vector<SockAddr> resolve(const std::string& host, const std::string& service = {}) { + return SockAddr::resolve(host, service); + } + virtual void stop() = 0; protected: diff --git a/include/opendht/securedht.h b/include/opendht/securedht.h index 553331bc5723ccd1638b356bfc3a3142b1c85373..9f6ff4a2d773462f6bb8c5d206bd9ea53a5eba54 100644 --- a/include/opendht/securedht.h +++ b/include/opendht/securedht.h @@ -201,6 +201,9 @@ public: time_point periodic(const uint8_t *buf, size_t buflen, const sockaddr* from, socklen_t fromlen, const time_point& now) override { return dht_->periodic(buf, buflen, from, fromlen, now); } + NodeStatus updateStatus(sa_family_t af) override { + return dht_->updateStatus(af); + } NodeStatus getStatus(sa_family_t af) const override { return dht_->getStatus(af); } @@ -216,6 +219,12 @@ public: const ValueType& getType(ValueType::Id type_id) const override { return dht_->getType(type_id); } + void addBootstrap(const std::string& host, const std::string& service) override { + dht_->addBootstrap(host, service); + } + void clearBootstrap() override { + dht_->clearBootstrap(); + } void insertNode(const InfoHash& id, const SockAddr& sa) override { dht_->insertNode(id, sa); } diff --git a/src/dht.cpp b/src/dht.cpp index dc4d35ca40d07a26796e78681128d88e5e0bc66e..2c7f9171b3ea241bb0c331d1f8f1c6419274e44f 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -17,7 +17,6 @@ * along with this program. If not, see <https://www.gnu.org/licenses/>. */ - #include "dht.h" #include "rng.h" #include "search.h" @@ -43,18 +42,40 @@ constexpr duration Dht::REANNOUNCE_MARGIN; static constexpr size_t MAX_REQUESTS_PER_SEC {8 * 1024}; NodeStatus -Dht::getStatus(sa_family_t af) const +Dht::updateStatus(sa_family_t af) +{ + auto& d = dht(af); + auto old = d.status; + d.status = d.getStatus(scheduler.time()); + if (d.status != old) { + auto& other = dht(af == AF_INET ? AF_INET6 : AF_INET); + if (other.status == NodeStatus::Disconnected && d.status == NodeStatus::Disconnected) + onDisconnected(); + else if (other.status == NodeStatus::Connected || d.status == NodeStatus::Connected) { + // On connected + if (bootstrapJob) { + bootstrapJob->cancel(); + bootstrapJob.reset(); + } + bootstrap_period = std::chrono::seconds(10); + } + } + return d.status; +} + +NodeStatus +Dht::Kad::getStatus(time_point now) const { unsigned dubious = 0; - for (const auto& b : buckets(af)) { + for (const auto& b : buckets) { for (auto& n : b.nodes) { - if (n->isGood(scheduler.time())) { + if (n->isGood(now)) { return NodeStatus::Connected; } else if (not n->isExpired()) dubious++; } } - auto& ping = af == AF_INET ? pending_pings4 : pending_pings6; + auto& ping = pending_pings; if (dubious or ping) return NodeStatus::Connecting; return NodeStatus::Disconnected; @@ -224,8 +245,8 @@ Dht::expireSearches() return b; } else { return false; } }; - erase_if(searches4, expired); - erase_if(searches6, expired); + erase_if(dht4.searches, expired); + erase_if(dht6.searches, expired); } void @@ -804,47 +825,12 @@ Dht::announce(const InfoHash& id, { auto& srs = searches(af); auto srp = srs.find(id); - auto sr = srp == srs.end() ? search(id, af) : srp->second; - if (!sr) { - if (callback) - callback(false, {}); - return; - } - sr->done = false; - sr->expired = false; - auto a_sr = std::find_if(sr->announce.begin(), sr->announce.end(), [&](const Announce& a){ - return a.value->id == value->id; - }); - if (a_sr == sr->announce.end()) { - sr->announce.emplace_back(Announce {permanent, value, created, callback}); - for (auto& n : sr->nodes) { - n->probe_query.reset(); - n->acked[value->id].first.reset(); - } - } else { - a_sr->permanent = permanent; - a_sr->created = created; - if (a_sr->value != value) { - a_sr->value = value; - for (auto& n : sr->nodes) { - n->acked[value->id].first.reset(); - n->probe_query.reset(); - } - } - if (sr->isAnnounced(value->id)) { - if (a_sr->callback) - a_sr->callback(true, {}); - a_sr->callback = {}; - if (callback) - callback(true, {}); - return; - } else { - if (a_sr->callback) - a_sr->callback(false, {}); - a_sr->callback = callback; - } + if (auto sr = srp == srs.end() ? search(id, af) : srp->second) { + sr->put(value, callback, created, permanent); + scheduler.edit(sr->nextSearchStep, scheduler.time()); + } else if (callback) { + callback(false, {}); } - scheduler.edit(sr->nextSearchStep, scheduler.time()); } size_t @@ -924,8 +910,8 @@ Dht::cancelListen(const InfoHash& id, size_t token) srp->second->cancelListen(token, scheduler); } }; - searches_cancel_listen(searches4, std::get<1>(it->second)); - searches_cancel_listen(searches6, std::get<2>(it->second)); + searches_cancel_listen(dht4.searches, std::get<1>(it->second)); + searches_cancel_listen(dht6.searches, std::get<2>(it->second)); listeners.erase(it); return true; } @@ -1136,8 +1122,8 @@ Dht::getPut(const InfoHash& id) const auto vals = srp->second->getPut(); ret.insert(ret.end(), vals.begin(), vals.end()); }; - find_values(searches4); - find_values(searches6); + find_values(dht4.searches); + find_values(dht6.searches); return ret; } @@ -1148,9 +1134,9 @@ Dht::getPut(const InfoHash& id, const Value::Id& vid) const auto srp = srs.find(id); return (srp != srs.end()) ? srp->second->getPut(vid) : Sp<Value> {}; }; - if (auto v4 = find_value(searches4)) + if (auto v4 = find_value(dht4.searches)) return v4; - if (auto v6 = find_value(searches6)) + if (auto v6 = find_value(dht6.searches)) return v6; return {}; } @@ -1163,8 +1149,8 @@ Dht::cancelPut(const InfoHash& id, const Value::Id& vid) auto srp = srs.find(id); return (srp != srs.end()) ? srp->second->cancelPut(vid) : false; }; - canceled |= sr_cancel_put(searches4); - canceled |= sr_cancel_put(searches6); + canceled |= sr_cancel_put(dht4.searches); + canceled |= sr_cancel_put(dht6.searches); if (canceled) storageErase(id, vid); return canceled; @@ -1288,7 +1274,7 @@ Dht::storageAddListener(const InfoHash& id, const Sp<Node>& node, size_t socket_ auto vals = st->second.get(query.where.getFilter()); if (not vals.empty()) { network_engine.tellListener(node, socket_id, id, WANT4 | WANT6, makeToken(node->getAddr(), false), - buckets4.findClosestNodes(id, now, TARGET_NODES), buckets6.findClosestNodes(id, now, TARGET_NODES), + dht4.buckets.findClosestNodes(id, now, TARGET_NODES), dht6.buckets.findClosestNodes(id, now, TARGET_NODES), std::move(vals), query); } node_listeners.emplace(socket_id, Listener {now, std::forward<Query>(query)}); @@ -1466,11 +1452,17 @@ Dht::tokenMatch(const Blob& token, const SockAddr& addr) const NodeStats Dht::getNodesStats(sa_family_t af) const +{ + NodeStats stats = dht(af).getNodesStats(scheduler.time(), myid); + stats.node_cache_size = network_engine.getNodeCacheSize(af); + return stats; +} + +NodeStats +Dht::Kad::getNodesStats(time_point now, const InfoHash& myid) const { NodeStats stats {}; - const auto& now = scheduler.time(); - const auto& bcks = buckets(af); - for (const auto& b : bcks) { + for (const auto& b : buckets) { for (auto& n : b.nodes) { if (n->isGood(now)) { stats.good_nodes++; @@ -1482,9 +1474,8 @@ Dht::getNodesStats(sa_family_t af) const if (b.cached) stats.cached_nodes++; } - stats.table_depth = bcks.depth(bcks.findBucket(myid)); - stats.searches = searches(af).size(); - stats.node_cache_size = network_engine.getNodeCacheSize(af); + stats.table_depth = buckets.depth(buckets.findBucket(myid)); + stats.searches = searches.size(); return stats; } @@ -1605,18 +1596,18 @@ Dht::dumpTables() const out << "My id " << myid << std::endl; out << "Buckets IPv4 :" << std::endl; - for (const auto& b : buckets4) + for (const auto& b : dht4.buckets) dumpBucket(b, out); out << "Buckets IPv6 :" << std::endl; - for (const auto& b : buckets6) + for (const auto& b : dht6.buckets) dumpBucket(b, out); auto dump_searches = [&](std::map<InfoHash, Sp<Search>> srs) { for (auto& srp : srs) dumpSearch(*srp.second, out); }; - dump_searches(searches4); - dump_searches(searches6); + dump_searches(dht4.searches); + dump_searches(dht6.searches); out << std::endl; out << getStorageLog() << std::endl; @@ -1692,24 +1683,24 @@ std::string Dht::getSearchesLog(sa_family_t af) const { std::stringstream out; - auto num_searches = searches4.size() + searches6.size(); + auto num_searches = dht4.searches.size() + dht6.searches.size(); if (num_searches > 8) { if (not af or af == AF_INET) - for (const auto& sr : searches4) + for (const auto& sr : dht4.searches) out << "[search " << sr.first << " IPv4]" << std::endl; if (not af or af == AF_INET6) - for (const auto& sr : searches6) + for (const auto& sr : dht6.searches) out << "[search " << sr.first << " IPv6]" << std::endl; } else { out << "s:synched, u:updated, a:announced, c:candidate, f:cur req, x:expired, *:known" << std::endl; if (not af or af == AF_INET) - for (const auto& sr : searches4) + for (const auto& sr : dht4.searches) dumpSearch(*sr.second, out); if (not af or af == AF_INET6) - for (const auto& sr : searches6) + for (const auto& sr : dht6.searches) dumpSearch(*sr.second, out); } - out << "Total: " << num_searches << " searches (" << searches4.size() << " IPv4, " << searches6.size() << " IPv6)." << std::endl; + out << "Total: " << num_searches << " searches (" << dht4.searches.size() << " IPv4, " << dht6.searches.size() << " IPv6)." << std::endl; return out.str(); } @@ -1730,9 +1721,9 @@ Dht::getSearchLog(const InfoHash& id, sa_family_t af) const Dht::~Dht() { - for (auto& s : searches4) + for (auto& s : dht4.searches) s.second->clear(); - for (auto& s : searches6) + for (auto& s : dht6.searches) s.second->clear(); } @@ -1777,12 +1768,12 @@ Dht::Dht(std::unique_ptr<net::DatagramSocket>&& sock, const Config& config, cons if (not s or (not s->hasIPv4() and not s->hasIPv6())) throw DhtException("Opened socket required"); if (s->hasIPv4()) { - buckets4 = {Bucket {AF_INET}}; - buckets4.is_client = config.is_bootstrap; + dht4.buckets = {Bucket {AF_INET}}; + dht4.buckets.is_client = config.is_bootstrap; } if (s->hasIPv6()) { - buckets6 = {Bucket {AF_INET6}}; - buckets6.is_client = config.is_bootstrap; + dht6.buckets = {Bucket {AF_INET6}}; + dht6.buckets.is_client = config.is_bootstrap; } search_id = std::uniform_int_distribution<decltype(search_id)>{}(rd); @@ -1977,13 +1968,34 @@ Dht::expire() uniform_duration_distribution<> time_dis(std::chrono::minutes(2), std::chrono::minutes(6)); auto expire_stuff_time = scheduler.time() + duration(time_dis(rd)); - expireBuckets(buckets4); - expireBuckets(buckets6); + expireBuckets(dht4.buckets); + expireBuckets(dht6.buckets); expireStore(); expireSearches(); scheduler.add(expire_stuff_time, std::bind(&Dht::expire, this)); } +void +Dht::onDisconnected() +{ + if (dht4.status != NodeStatus::Disconnected || dht6.status != NodeStatus::Disconnected) + return; + if (logger_) + logger_->d(myid, "Bootstraping"); + for (const auto& boootstrap : bootstrap_nodes) + try { + for (const auto& ip : network_engine.getSocket()->resolve(boootstrap.first, boootstrap.second)) + pingNode(ip); + } catch (const std::exception& e) { + if (logger_) + logger_->e(myid, "Can't resolve %s:%s: %s", boootstrap.first.c_str(), boootstrap.second.c_str(), e.what()); + } + if (bootstrapJob) + bootstrapJob->cancel(); + bootstrapJob = scheduler.add(scheduler.time() + bootstrap_period, std::bind(&Dht::onDisconnected, this)); + bootstrap_period *= 2; +} + void Dht::confirmNodes() { @@ -1991,25 +2003,25 @@ Dht::confirmNodes() bool soon = false; const auto& now = scheduler.time(); - if (searches4.empty() and getStatus(AF_INET) == NodeStatus::Connected) { + if (dht4.searches.empty() and dht4.status == NodeStatus::Connected) { if (logger_) logger_->d(myid, "[confirm nodes] initial IPv4 'get' for my id (%s)", myid.toString().c_str()); search(myid, AF_INET); } - if (searches6.empty() and getStatus(AF_INET6) == NodeStatus::Connected) { + if (dht6.searches.empty() and dht6.status == NodeStatus::Connected) { if (logger_) logger_->d(myid, "[confirm nodes] initial IPv6 'get' for my id (%s)", myid.toString().c_str()); search(myid, AF_INET6); } - soon |= bucketMaintenance(buckets4); - soon |= bucketMaintenance(buckets6); + soon |= bucketMaintenance(dht4.buckets); + soon |= bucketMaintenance(dht6.buckets); if (!soon) { - if (buckets4.grow_time >= now - seconds(150)) - soon |= neighbourhoodMaintenance(buckets4); - if (buckets6.grow_time >= now - seconds(150)) - soon |= neighbourhoodMaintenance(buckets6); + if (dht4.buckets.grow_time >= now - seconds(150)) + soon |= neighbourhoodMaintenance(dht4.buckets); + if (dht6.buckets.grow_time >= now - seconds(150)) + soon |= neighbourhoodMaintenance(dht6.buckets); } /* In order to maintain all buckets' age within 600 seconds, worst @@ -2094,25 +2106,25 @@ Dht::exportNodes() const { const auto& now = scheduler.time(); std::vector<NodeExport> nodes; - const auto b4 = buckets4.findBucket(myid); - if (b4 != buckets4.end()) { + const auto b4 = dht4.buckets.findBucket(myid); + if (b4 != dht4.buckets.end()) { for (auto& n : b4->nodes) if (n->isGood(now)) nodes.push_back(n->exportNode()); } - const auto b6 = buckets6.findBucket(myid); - if (b6 != buckets6.end()) { + const auto b6 = dht6.buckets.findBucket(myid); + if (b6 != dht6.buckets.end()) { for (auto& n : b6->nodes) if (n->isGood(now)) nodes.push_back(n->exportNode()); } - for (auto b = buckets4.begin(); b != buckets4.end(); ++b) { + for (auto b = dht4.buckets.begin(); b != dht4.buckets.end(); ++b) { if (b == b4) continue; for (auto& n : b->nodes) if (n->isGood(now)) nodes.push_back(n->exportNode()); } - for (auto b = buckets6.begin(); b != buckets6.end(); ++b) { + for (auto b = dht6.buckets.begin(); b != dht6.buckets.end(); ++b) { if (b == b6) continue; for (auto& n : b->nodes) if (n->isGood(now)) @@ -2136,7 +2148,7 @@ Dht::pingNode(SockAddr sa, DoneCallbackSimple&& cb) scheduler.syncTime(); if (logger_) logger_->d("Sending ping to %s", sa.toString().c_str()); - auto& count = sa.getFamily() == AF_INET ? pending_pings4 : pending_pings6; + auto& count = dht(sa.getFamily()).pending_pings; count++; network_engine.sendPing(std::move(sa), [&count,cb](const net::Request&, net::RequestAnswer&&) { count--; @@ -2197,9 +2209,9 @@ Dht::onFindNode(Sp<Node> node, const InfoHash& target, want_t want) net::RequestAnswer answer; answer.ntoken = makeToken(node->getAddr(), false); if (want & WANT4) - answer.nodes4 = buckets4.findClosestNodes(target, now, TARGET_NODES); + answer.nodes4 = dht4.buckets.findClosestNodes(target, now, TARGET_NODES); if (want & WANT6) - answer.nodes6 = buckets6.findClosestNodes(target, now, TARGET_NODES); + answer.nodes6 = dht6.buckets.findClosestNodes(target, now, TARGET_NODES); return answer; } @@ -2218,8 +2230,8 @@ Dht::onGetValues(Sp<Node> node, const InfoHash& hash, want_t, const Query& query net::RequestAnswer answer {}; auto st = store.find(hash); answer.ntoken = makeToken(node->getAddr(), false); - answer.nodes4 = buckets4.findClosestNodes(hash, now, TARGET_NODES); - answer.nodes6 = buckets6.findClosestNodes(hash, now, TARGET_NODES); + answer.nodes4 = dht4.buckets.findClosestNodes(hash, now, TARGET_NODES); + answer.nodes6 = dht6.buckets.findClosestNodes(hash, now, TARGET_NODES); if (st != store.end() && not st->second.empty()) { answer.values = st->second.get(query.where.getFilter()); if (logger_) diff --git a/src/dhtrunner.cpp b/src/dhtrunner.cpp index 3fa79d1c7b6541c3a5a884587c656da8a04d9f41..1d4e8b1de8252c19797a4a87a7c1f18a21f3eff3 100644 --- a/src/dhtrunner.cpp +++ b/src/dhtrunner.cpp @@ -174,7 +174,7 @@ DhtRunner::run(const Config& config, Context&& context) if (not pending_ops_prio.empty()) return true; auto s = getStatus(); - if (not pending_ops.empty() and (s == NodeStatus::Connected or (s == NodeStatus::Disconnected and not bootstraping))) + if (not pending_ops.empty() and (s == NodeStatus::Connected or s == NodeStatus::Disconnected)) return true; } return false; @@ -300,7 +300,6 @@ DhtRunner::join() if (running.exchange(State::Idle) == State::Idle) return; cv.notify_all(); - bootstrap_cv.notify_all(); #ifdef OPENDHT_PEER_DISCOVERY if (peerDiscovery_) peerDiscovery_->stop(); @@ -315,9 +314,6 @@ DhtRunner::join() if (dht_thread.joinable()) dht_thread.join(); - if (bootstrap_thread.joinable()) - bootstrap_thread.join(); - { std::lock_guard<std::mutex> lck(storage_mtx); pending_ops = decltype(pending_ops)(); @@ -584,7 +580,7 @@ DhtRunner::loop_() { std::lock_guard<std::mutex> lck(storage_mtx); auto s = getStatus(); - ops = (pending_ops_prio.empty() && (s == NodeStatus::Connected or (s == NodeStatus::Disconnected and not bootstraping))) ? + ops = (pending_ops_prio.empty() && (s == NodeStatus::Connected or s == NodeStatus::Disconnected)) ? std::move(pending_ops) : std::move(pending_ops_prio); } while (not ops.empty()) { @@ -639,20 +635,11 @@ DhtRunner::loop_() if (dropped) std::cerr << "Dropped " << dropped << " packets with high delay" << std::endl; - NodeStatus nstatus4 = dht->getStatus(AF_INET); - NodeStatus nstatus6 = dht->getStatus(AF_INET6); + NodeStatus nstatus4 = dht->updateStatus(AF_INET); + NodeStatus nstatus6 = dht->updateStatus(AF_INET6); if (nstatus4 != status4 || nstatus6 != status6) { status4 = nstatus4; status6 = nstatus6; - if (status4 == NodeStatus::Disconnected and status6 == NodeStatus::Disconnected) { - // We have lost connection with the DHT. Try to recover using bootstrap nodes. - std::unique_lock<std::mutex> lck(bootstrap_mtx); - bootstrap_nodes = bootstrap_nodes_all; - tryBootstrapContinuously(); - } else { - std::unique_lock<std::mutex> lck(bootstrap_mtx); - bootstrap_nodes.clear(); - } if (statusCb) statusCb(status4, status6); } @@ -899,89 +886,34 @@ DhtRunner::putEncrypted(const std::string& key, InfoHash to, Value&& value, Done putEncrypted(InfoHash::get(key), to, std::forward<Value>(value), std::move(cb), permanent); } -void -DhtRunner::tryBootstrapContinuously() -{ - if (bootstrap_thread.joinable()) { - if (bootstraping) - return; // already running - else - bootstrap_thread.join(); - } - bootstraping = true; - bootstrap_thread = std::thread([this]() { - auto next = clock::now(); - do { - decltype(bootstrap_nodes) nodes; - { - std::lock_guard<std::mutex> lck(bootstrap_mtx); - nodes = bootstrap_nodes; - } - - next += BOOTSTRAP_PERIOD; - { - std::mutex mtx; - std::unique_lock<std::mutex> blck(mtx); - unsigned ping_count(0); - // Reverse: try last inserted bootstrap nodes first - for (auto it = nodes.rbegin(); it != nodes.rend(); it++) { - ++ping_count; - try { - bootstrap(SockAddr::resolve(it->first, it->second), [&](bool) { - if (running != State::Running) - return; - { - std::unique_lock<std::mutex> blck(mtx); - --ping_count; - } - bootstrap_cv.notify_all(); - }); - } catch (std::invalid_argument& e) { - --ping_count; - std::cerr << e.what() << std::endl; - } - } - // wait at least until the next BOOTSTRAP_PERIOD - bootstrap_cv.wait_until(blck, next, [&]() { return running != State::Running; }); - // wait for bootstrap requests to end. - if (running != State::Running) - bootstrap_cv.wait(blck, [&]() { return running != State::Running or ping_count == 0; }); - } - // update state - { - std::lock_guard<std::mutex> lck(dht_mtx); - bootstraping = running == State::Running and - status4 == NodeStatus::Disconnected and - status6 == NodeStatus::Disconnected; - } - } while (bootstraping); - }); -} - void DhtRunner::bootstrap(const std::string& host, const std::string& service) { - std::lock_guard<std::mutex> lck(bootstrap_mtx); - bootstrap_nodes_all.emplace_back(host, service); - bootstrap_nodes.emplace_back(host, service); - tryBootstrapContinuously(); + std::lock_guard<std::mutex> lck(storage_mtx); + pending_ops_prio.emplace([host, service] (SecureDht& dht) mutable { + dht.addBootstrap(host, service); + }); + cv.notify_all(); } void DhtRunner::bootstrap(const std::string& hostService) { - std::lock_guard<std::mutex> lck(bootstrap_mtx); - auto host_service = splitPort(hostService); - bootstrap_nodes_all.emplace_back(host_service.first, host_service.second); - bootstrap_nodes.emplace_back(std::move(host_service.first), std::move(host_service.second)); - tryBootstrapContinuously(); + std::lock_guard<std::mutex> lck(storage_mtx); + pending_ops_prio.emplace([host_service = splitPort(hostService)] (SecureDht& dht) mutable { + dht.addBootstrap(host_service.first, host_service.second); + }); + cv.notify_all(); } void DhtRunner::clearBootstrap() { - std::lock_guard<std::mutex> lck(bootstrap_mtx); - bootstrap_nodes_all.clear(); + std::lock_guard<std::mutex> lck(storage_mtx); + pending_ops_prio.emplace([] (SecureDht& dht) mutable { + dht.clearBootstrap(); + }); + cv.notify_all(); } void diff --git a/src/search.h b/src/search.h index da75b6833d199bea54fed6455e294262afe8ef8d..a9a4194b8650f19286f5537f940a0d49339c5bd6 100644 --- a/src/search.h +++ b/src/search.h @@ -568,6 +568,43 @@ struct Dht::Search { return canceled; } + void put(const Sp<Value>& value, DoneCallback callback, time_point created, bool permanent) { + done = false; + expired = false; + auto a_sr = std::find_if(announce.begin(), announce.end(), [&](const Announce& a){ + return a.value->id == value->id; + }); + if (a_sr == announce.end()) { + announce.emplace_back(Announce {permanent, value, created, callback}); + for (auto& n : nodes) { + n->probe_query.reset(); + n->acked[value->id].first.reset(); + } + } else { + a_sr->permanent = permanent; + a_sr->created = created; + if (a_sr->value != value) { + a_sr->value = value; + for (auto& n : nodes) { + n->acked[value->id].first.reset(); + n->probe_query.reset(); + } + } + if (isAnnounced(value->id)) { + if (a_sr->callback) + a_sr->callback(true, {}); + a_sr->callback = {}; + if (callback) + callback(true, {}); + return; + } else { + if (a_sr->callback) + a_sr->callback(false, {}); + a_sr->callback = callback; + } + } + } + /** * @return The number of non-good search nodes. */ diff --git a/src/utils.cpp b/src/utils.cpp index 1ad5fedf764616971872132e137d4a79f08c9b2a..90efca778a960be549fe1917f30273df2ad967fe 100644 --- a/src/utils.cpp +++ b/src/utils.cpp @@ -76,11 +76,8 @@ SockAddr::resolve(const std::string& host, const std::string& service) if(rc != 0) throw std::invalid_argument(std::string("Error: `") + host + ":" + service + "`: " + gai_strerror(rc)); - addrinfo* infop = info; - while (infop) { + for (addrinfo* infop = info; infop; infop = infop->ai_next) ips.emplace_back(infop->ai_addr, infop->ai_addrlen); - infop = infop->ai_next; - } freeaddrinfo(info); return ips; }