diff --git a/include/opendht/dht.h b/include/opendht/dht.h index 06a490e0993c1a804c9ec2558087f1df24e1dd6d..795370a67ddb12fee6299983cd0345b79c74347b 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -120,7 +120,12 @@ public: * and an ID for the node. */ Dht(int s, int s6, Config config); - virtual ~Dht() {} + virtual ~Dht() { + for (auto& s : searches4) + s.second->clear(); + for (auto& s : searches6) + s.second->clear(); + } /** * Get the ID of the node. @@ -426,19 +431,18 @@ private: * Can we use this node to listen/announce now ? */ bool isSynced(time_point now) const { - if (not getStatus) - return false; - + /*if (not getStatus) + return false;*/ return not node->isExpired(now) and - last_get_reply >= now - Node::NODE_EXPIRE_TIME; + not token.empty() and last_get_reply >= now - Node::NODE_EXPIRE_TIME; } bool canGet(time_point now, time_point update) const { - if (not getStatus) - return true; - + /*if (not getStatus) + return true;*/ return not node->isExpired(now) and - (now > last_get_reply + Node::NODE_EXPIRE_TIME or update > last_get_reply) and - now > getStatus->last_try + Node::MAX_RESPONSE_TIME; + (now > last_get_reply + Node::NODE_EXPIRE_TIME or update > last_get_reply) + and (not getStatus or not getStatus->pending(now)); + // and now > getStatus->last_try + Node::MAX_RESPONSE_TIME; } bool isAnnounced(Value::Id vid, const ValueType& type, time_point now) const { @@ -599,6 +603,14 @@ private: unsigned refill(const RoutingTable&, time_point now); std::vector<std::shared_ptr<Node>> getNodes() const; + + void clear() { + announce.clear(); + callbacks.clear(); + listeners.clear(); + nodes.clear(); + nextSearchStep = {}; + } }; struct ValueStorage { diff --git a/include/opendht/network_engine.h b/include/opendht/network_engine.h index 0448bfa5beec2917ac606a6496a7e0338c618fae..cb541b991171f34bb3a84e62d209a833ff4d161e 100644 --- a/include/opendht/network_engine.h +++ b/include/opendht/network_engine.h @@ -211,7 +211,10 @@ public: } bool pending(time_point now) const { - return reply_time < last_try && now - last_try <= Node::MAX_RESPONSE_TIME; + return not cancelled + and not completed + and reply_time < last_try + and now - last_try <= Node::MAX_RESPONSE_TIME; } Request() {} @@ -222,13 +225,13 @@ public: Blob &&msg, std::function<void(std::shared_ptr<Request> req_status, ParsedMessage&&)> on_done, std::function<void(std::shared_ptr<Request> req_status, bool)> on_expired, bool persistent = false) : - node(node), on_done(on_done), on_expired(on_expired), tid(tid), msg(msg), persistent(persistent) { } + node(node), on_done(on_done), on_expired(on_expired), tid(tid), msg(std::move(msg)), persistent(persistent) { } std::function<void(std::shared_ptr<Request> req_status, ParsedMessage&&)> on_done {}; std::function<void(std::shared_ptr<Request> req_status, bool)> on_expired {}; const uint16_t tid {0}; /* the request id. */ - Blob msg {}; /* the serialized message. */ + const Blob msg {}; /* the serialized message. */ const bool persistent {false}; /* the request is not erased upon completion. */ }; @@ -239,6 +242,8 @@ public: void cancelRequest(std::shared_ptr<Request>& req) { if (req) { req->cancelled = true; + req->on_done = {}; + req->on_expired = {}; requests.erase(req->tid); } } @@ -346,7 +351,17 @@ public: { transaction_id = std::uniform_int_distribution<decltype(transaction_id)>{1}(rd_device); } - virtual ~NetworkEngine() {}; + virtual ~NetworkEngine() { + clear(); + }; + + void clear() { + for (auto& req : requests) { + req.second->on_expired = {}; + req.second->on_done = {}; + } + requests.clear(); + } /** * Sends values (with closest nodes) to a listenner. @@ -455,6 +470,8 @@ private: auto now = scheduler.time(); if (req->expired(now)) { req->on_expired(req, true); + req->on_expired = {}; + req->on_done = {}; requests.erase(req->tid); return; } else if (req->attempt_count == 1) { @@ -481,7 +498,10 @@ private: */ void sendRequest(std::shared_ptr<Request>& request) { request->start = scheduler.time(); - requests.emplace(request->tid, request); + auto e = requests.emplace(request->tid, request); + if (!e.second) { + DHT_LOG.ERROR("Request already existed !"); + } requestStep(request); } diff --git a/include/opendht/scheduler.h b/include/opendht/scheduler.h index 558e3c7000c074e303505cb997688ebbafb14a33..abfa66b55851f21b271085333e5f71fd567d9612 100644 --- a/include/opendht/scheduler.h +++ b/include/opendht/scheduler.h @@ -39,7 +39,6 @@ public: struct Job { bool done; bool cancelled; - time_point time; std::function<void()> do_; }; @@ -51,11 +50,11 @@ public: * * @return pointer to the newly scheduled job. */ - std::shared_ptr<Scheduler::Job> add(time_point time, std::function<void()> job_func) { - auto scheduled_time = std::max(time, now); /* This should prevent running an auto rescheduling job forever - before the Scheduler::run method ends. */ - auto job = std::make_shared<Job>(Job {false, false, scheduled_time, std::move(job_func)}); - timers.emplace(std::move(scheduled_time), job); + std::shared_ptr<Scheduler::Job> add(time_point t, std::function<void()> job_func) { + //std::cout << "Scheduler: adding " << (job_func ? "" : "empty") << " job in " << print_dt(t - clock::now()) << std::endl; + auto job = std::make_shared<Job>(Job {false, false, std::move(job_func)}); + if (t != time_point::max()) + timers.emplace(std::move(t), job); return job; } @@ -67,13 +66,17 @@ public: * * @return pointer to the newly scheduled job. */ - std::shared_ptr<Scheduler::Job> edit(const std::shared_ptr<Scheduler::Job>& job, time_point time) { - if (not job) - return {}; + void edit(std::shared_ptr<Scheduler::Job>& job, time_point t) { + if (not job) { + std::cout << "editing an empty job" << std::endl; + return; + } job->cancelled = true; - return add(time, std::move(job->do_)); + job = add(t, std::move(job->do_)); } + + /** * Runs the jobs to do up to now. * @@ -88,12 +91,13 @@ public: * loops before this method ends. It is garanteed by the fact that a * job will at least be scheduled for "now" and not before. */ - if (not (timer->first < now)) + if (timer->first > now) break; auto& job = timer->second; if (not job->cancelled and job->do_) { job->do_(); + //job->do_ = {}; job->done = true; } timers.erase(timer); @@ -102,6 +106,8 @@ public: } inline time_point getNextJobTime() const { + //if (not timers.empty()) + // std::cout << "Next job in " << print_dt(timers.begin()->first - clock::now()) << std::endl; return not timers.empty() ? timers.begin()->first : time_point::max(); } diff --git a/include/opendht/utils.h b/include/opendht/utils.h index e5f87825dd2ae7557d2e364269d09175cff8ce94..205ac4efabe9520abcd5607feae689af09cfe720 100644 --- a/include/opendht/utils.h +++ b/include/opendht/utils.h @@ -68,6 +68,12 @@ using duration = clock::duration; time_point from_time_t(std::time_t t); std::time_t to_time_t(time_point t); +template <class DT> +static double +print_dt(DT d) { + return std::chrono::duration_cast<std::chrono::duration<double>>(d).count(); +} + static /*constexpr*/ const time_point TIME_INVALID = {time_point::min()}; static /*constexpr*/ const time_point TIME_MAX {time_point::max()}; diff --git a/src/dht.cpp b/src/dht.cpp index 5df3bc3ac1d060d303ac2896b88708b38b649481..bc6997f1d08140f189e8110953be5509493a68ea 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -85,12 +85,6 @@ to_hex(const uint8_t *buf, size_t buflen) return s.str(); } -template <class DT> -static double -print_dt(DT d) { - return std::chrono::duration_cast<std::chrono::duration<double>>(d).count(); -} - namespace dht { using namespace std::placeholders; @@ -491,7 +485,7 @@ Dht::trySearchInsert(const std::shared_ptr<Node>& node) auto& s = srp.second; if (s->insertNode(node, now)) { inserted = true; - s->nextSearchStep = scheduler.edit(s->nextSearchStep, s->getNextStepTime(types, now)); + scheduler.edit(s->nextSearchStep, s->getNextStepTime(types, now)); } } return inserted; @@ -549,7 +543,7 @@ Dht::newNode(const InfoHash& id, const sockaddr *sa, socklen_t salen, int confir mybucket_grow_time = now; else mybucket6_grow_time = now; - nextNodesConfirmation = scheduler.edit(nextNodesConfirmation, now); + //scheduler.edit(nextNodesConfirmation, now); } /* First, try to get rid of a known-bad node. */ @@ -745,6 +739,9 @@ Dht::expireSearches() Dht::SearchNode* Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update) { + if (sr->done) + return nullptr; + const auto& now = scheduler.time(); const time_point up = update ? sr->getLastGetTime() : time_point::min(); SearchNode* n = nullptr; @@ -768,16 +765,17 @@ Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update n->node->id.toString().c_str(), print_addr(n->node->ss, n->node->sslen).c_str()); + std::weak_ptr<Search> ws = sr; auto onDone = - [=](std::shared_ptr<NetworkEngine::Request> status, NetworkEngine::RequestAnswer&& answer) mutable { - if (sr) { + [this,ws](std::shared_ptr<NetworkEngine::Request> status, NetworkEngine::RequestAnswer&& answer) mutable { + if (auto sr = ws.lock()) { sr->insertNode(status->node, scheduler.time(), answer.ntoken); onGetValuesDone(status, answer, sr); } }; auto onExpired = - [=](std::shared_ptr<NetworkEngine::Request> status, bool over) mutable { - if (sr) { + [this,ws](std::shared_ptr<NetworkEngine::Request> status, bool over) mutable { + if (auto sr = ws.lock()) { if (not over) { auto srn = std::find_if(sr->nodes.begin(), sr->nodes.end(), [&status](SearchNode& sn) { return status->node == sn.node; @@ -789,7 +787,7 @@ Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update } } if (searchSendGetValues(sr)) - sr->get_step_time = now; + sr->get_step_time = scheduler.time(); searchStep(sr); } }; @@ -860,18 +858,20 @@ Dht::searchStep(std::shared_ptr<Search> sr) //std::cout << "Sending listen to " << n.node->id << " " << print_addr(n.node->ss, n.node->sslen) << std::endl; network_engine.cancelRequest(n.listenStatus); + + std::weak_ptr<Search> ws = sr; n.listenStatus = network_engine.sendListen(n.node, sr->id, n.token, - [=](std::shared_ptr<NetworkEngine::Request> status, + [this,ws](std::shared_ptr<NetworkEngine::Request> status, NetworkEngine::RequestAnswer&& answer) mutable { /* on done */ - if (sr) { + if (auto sr = ws.lock()) { onListenDone(status, answer, sr); searchStep(sr); } }, - [=](std::shared_ptr<NetworkEngine::Request>, bool) mutable + [this,ws](std::shared_ptr<NetworkEngine::Request>, bool) mutable { /* on expired */ - if (sr) { + if (auto sr = ws.lock()) { searchStep(sr); } } @@ -904,17 +904,18 @@ Dht::searchStep(std::shared_ptr<Search> sr) print_addr(n.node->ss, n.node->sslen).c_str(), vid); //std::cout << "Sending announce_value to " << n.node->id << " " << print_addr(n.node->ss, n.node->sslen) << std::endl; + std::weak_ptr<Search> ws = sr; n.acked[vid] = network_engine.sendAnnounceValue(n.node, sr->id, *a.value, a.created, n.token, - [=](std::shared_ptr<NetworkEngine::Request> status, NetworkEngine::RequestAnswer&& answer) mutable + [this,ws](std::shared_ptr<NetworkEngine::Request> status, NetworkEngine::RequestAnswer&& answer) mutable { /* on done */ - if (sr) { + if (auto sr = ws.lock()) { onAnnounceDone(status, answer, sr); searchStep(sr); } }, - [=](std::shared_ptr<NetworkEngine::Request>, bool) mutable + [this,ws](std::shared_ptr<NetworkEngine::Request>, bool) mutable { /* on expired */ - if (sr) { searchStep(sr); } + if (auto sr = ws.lock()) { searchStep(sr); } } ); } @@ -972,7 +973,8 @@ Dht::searchStep(std::shared_ptr<Search> sr) } /* periodic searchStep scheduling. */ - sr->nextSearchStep = scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(types, now)); + if (not sr->done) + scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(types, now)); } @@ -1073,9 +1075,11 @@ Dht::Search::getUpdateTime(time_point now) const for (const auto& sn : nodes) { if (sn.node->isExpired(now) or (sn.candidate and t >= TARGET_NODES)) continue; - if (sn.last_get_reply < std::max(now - Node::NODE_EXPIRE_TIME, last_get)) { + bool pending = sn.getStatus and sn.getStatus->pending(now); + if (sn.last_get_reply < std::max(now - Node::NODE_EXPIRE_TIME, last_get) or pending) { // not isSynced - ut = std::min(ut, get_step_time + SEARCH_GET_STEP); + if (not pending) + ut = std::min(ut, get_step_time + SEARCH_GET_STEP); if (not sn.candidate) d++; } else { @@ -1086,7 +1090,7 @@ Dht::Search::getUpdateTime(time_point now) const if (not sn.candidate and ++i == TARGET_NODES) break; } - if ((not callbacks.empty() or not announce.empty()) and d == 0) { + if (not callbacks.empty() and d == 0) { // If all synced/updated but some callbacks remain, step now to clear them return now; } @@ -1178,7 +1182,7 @@ Dht::Search::getNextStepTime(const std::map<ValueType::Id, ValueType>& types, ti auto ut = getUpdateTime(now); if (ut != time_point::max()) { - //std::cout << id.toString() << " IPv" << (af==AF_INET?"4":"6") << " update time in " << print_dt(ut - now) << " s" << std::endl; + std::cout << id.toString() << " IPv" << (af==AF_INET?"4":"6") << " update time in " << print_dt(ut - now) << " s" << std::endl; next_step = std::min(next_step, ut); } @@ -1186,13 +1190,13 @@ Dht::Search::getNextStepTime(const std::map<ValueType::Id, ValueType>& types, ti { auto at = getAnnounceTime(types, now); if (at != time_point::max()) { - //std::cout << id.toString() << " IPv" << (af==AF_INET?"4":"6") << " announce time in " << print_dt(at - now) << " s" << std::endl; + std::cout << id.toString() << " IPv" << (af==AF_INET?"4":"6") << " announce time in " << print_dt(at - now) << " s" << std::endl; next_step = std::min(next_step, at); } auto lt = getListenTime(now); if (lt != time_point::max()) { - //std::cout << id.toString() << " IPv" << (af==AF_INET?"4":"6") << " listen time in " << print_dt(lt - now) << " s" << std::endl; + std::cout << id.toString() << " IPv" << (af==AF_INET?"4":"6") << " listen time in " << print_dt(lt - now) << " s" << std::endl; next_step = std::min(next_step, lt); } } @@ -1284,9 +1288,8 @@ Dht::search(const InfoHash& id, sa_family_t af, GetCallback callback, DoneCallba if (search_id == 0) search_id++; } - if (sr->nextSearchStep) - sr->nextSearchStep->cancelled = true; - sr->nextSearchStep = scheduler.add(scheduler.time(), std::bind(&Dht::searchStep, this, sr)); + if (not sr->nextSearchStep) + sr->nextSearchStep = scheduler.add(scheduler.time(), std::bind(&Dht::searchStep, this, sr)); if (callback) sr->callbacks.push_back({.start=scheduler.time(), .filter=filter, .get_cb=callback, .done_cb=done_callback}); @@ -1342,7 +1345,7 @@ Dht::announce(const InfoHash& id, sa_family_t af, std::shared_ptr<Value> value, a_sr->callback = callback; } } - sr->nextSearchStep = scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(types, now)); + scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(types, now)); //TODO //if (tm < search_time) { // DHT_LOG.ERROR("[search %s IPv%c] search_time is now in %lfs", sr->id.toString().c_str(),l @@ -1369,7 +1372,7 @@ Dht::listenTo(const InfoHash& id, sa_family_t af, GetCallback cb, Value::Filter sr->done = false; auto token = ++sr->listener_token; sr->listeners.emplace(token, LocalListener{f, cb}); - sr->nextSearchStep = scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(types, now)); + scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(types, now)); return token; } @@ -1826,7 +1829,7 @@ void Dht::connectivityChanged() { const auto& now = scheduler.time(); - nextNodesConfirmation = scheduler.edit(nextNodesConfirmation, now); + scheduler.edit(nextNodesConfirmation, now); mybucket_grow_time = now; mybucket6_grow_time = now; reported_addr.clear(); @@ -1844,7 +1847,7 @@ void Dht::rotateSecrets() { const auto& now = scheduler.time(); - uniform_duration_distribution<> time_dist(std::chrono::minutes(15), std::chrono::minutes(45)); + uniform_duration_distribution<> time_dist(std::chrono::seconds(45), std::chrono::minutes(1)); auto rotate_secrets_time = now + time_dist(rd); oldsecret = secret; @@ -2151,8 +2154,9 @@ Dht::Dht(int s, int s6, Config config) search_id = std::uniform_int_distribution<decltype(search_id)>{}(rd); - uniform_duration_distribution<> time_dis {std::chrono::seconds(0), std::chrono::seconds(3)}; + uniform_duration_distribution<> time_dis {std::chrono::seconds(3), std::chrono::seconds(5)}; auto confirm_nodes_time = scheduler.time() + time_dis(rd); + DHT_LOG.DEBUG("Scheduling %s", myid.toString().c_str()); nextNodesConfirmation = scheduler.add(confirm_nodes_time, std::bind(&Dht::confirmNodes, this)); // Fill old secret @@ -2374,9 +2378,13 @@ Dht::confirmNodes() bool soon = false; const auto& now = scheduler.time(); - if (searches4.empty() and searches6.empty() and getStatus() != Status::Disconnected) { - DHT_LOG.DEBUG("[confirm nodes] initial 'get' for my id (%s).", myid.toString().c_str()); - get(myid, GetCallbackSimple{}); + if (searches4.empty() and getStatus(AF_INET) != Status::Disconnected) { + DHT_LOG.DEBUG("[confirm nodes] initial IPv4 'get' for my id (%s).", myid.toString().c_str()); + search(myid, AF_INET); + } + if (searches6.empty() and getStatus(AF_INET6) != Status::Disconnected) { + DHT_LOG.DEBUG("[confirm nodes] initial IPv6 'get' for my id (%s).", myid.toString().c_str()); + search(myid, AF_INET6); } soon |= bucketMaintenance(buckets); @@ -2613,8 +2621,8 @@ Dht::onGetValuesDone(std::shared_ptr<NetworkEngine::Request> status, return; } - DHT_LOG.DEBUG("[search %s IPv%c] got reply to 'get'", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6'); - const auto& now = scheduler.time(); + DHT_LOG.DEBUG("[search %s IPv%c] got reply to 'get' from %s", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', status->node->id.toString().c_str()); + if (not a.ntoken.empty()) { if (!a.values.empty()) { DHT_LOG.DEBUG("[search %s IPv%c] found %u values", @@ -2652,11 +2660,14 @@ Dht::onGetValuesDone(std::shared_ptr<NetworkEngine::Request> status, blacklistNode(&status->node->id, (sockaddr*)&status->node->ss, status->node->sslen); } - if (searchSendGetValues(sr)) /* always keep a 'get' request in progress if possible. */ - sr->get_step_time = now; + if (not sr->done) { + const auto& now = scheduler.time(); + if (searchSendGetValues(sr)) /* always keep a 'get' request in progress if possible. */ + sr->get_step_time = now; - // Force to recompute the next step time - sr->nextSearchStep = scheduler.edit(sr->nextSearchStep, now); + // Force to recompute the next step time + scheduler.edit(sr->nextSearchStep, now); + } } NetworkEngine::RequestAnswer @@ -2690,9 +2701,11 @@ Dht::onListenDone(std::shared_ptr<NetworkEngine::Request>& status, NetworkEngine onGetValuesDone(status, answer, sr); } - if (searchSendGetValues(sr)) - sr->get_step_time = now; - sr->nextSearchStep = scheduler.edit(sr->nextSearchStep, now); + if (not sr->done) { + if (searchSendGetValues(sr)) + sr->get_step_time = now; + scheduler.edit(sr->nextSearchStep, now); + } } else DHT_LOG.DEBUG("Unknown search or announce!"); } diff --git a/src/network_engine.cpp b/src/network_engine.cpp index 3f2d703671a1fc91f7a1ed4b57496dd27f85e7c1..c03093a3042cc6406abda9b512f20471190b06ec 100644 --- a/src/network_engine.cpp +++ b/src/network_engine.cpp @@ -163,6 +163,10 @@ NetworkEngine::processMessage(const uint8_t *buf, size_t buflen, const sockaddr req->reply_time = scheduler.time(); req->completed = true; req->on_done(req, std::move(msg)); + if (not req->persistent) { + req->on_done = {}; + req->on_expired = {}; + } break; default: break; @@ -284,7 +288,8 @@ NetworkEngine::sendPing(const sockaddr* sa, socklen_t salen, RequestCb on_done, Blob b {buffer.data(), buffer.data() + buffer.size()}; std::shared_ptr<Request> req(new Request {tid.getTid(), std::make_shared<Node>(InfoHash {}, sa, salen), std::move(b), - [=](std::shared_ptr<Request> req_status, ParsedMessage&&){ + [=](std::shared_ptr<Request> req_status, ParsedMessage&&) { + DHT_LOG.DEBUG("Got pong from %s", print_addr(req_status->node->ss, req_status->node->sslen).c_str()); if (on_done) { on_done(req_status, {}); } diff --git a/tools/tools_common.h b/tools/tools_common.h index 80427292b69c865795040e6ee9ad87efeb6c0490..9c64be2207c8d2363d0df2354ffbc940f3009e2d 100644 --- a/tools/tools_common.h +++ b/tools/tools_common.h @@ -115,11 +115,11 @@ disableLogging(dht::DhtRunner& dht) /** * Converts std::chrono::duration to floating-point seconds. */ -template <class DT> +/*template <class DT> static double print_dt(DT d) { return std::chrono::duration_cast<std::chrono::duration<double>>(d).count(); -} +}*/ /** * Split "[host]:port" or "host:port" to pair<"host", "port">.