diff --git a/include/opendht/dht.h b/include/opendht/dht.h index de31a3bc7da64fe11f3200545cc6b18074e1c55a..90cced661a9cba29624c7b26b547b665bfb027d5 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -469,9 +469,11 @@ private: ack->second->last_try + Node::MAX_RESPONSE_TIME ); } + time_point getAnnounceTime(Value::Id vid, const ValueType& type) const { return getAnnounceTime(acked.find(vid), type); } + time_point getListenTime() const { if (not listenStatus) return time_point::min(); @@ -544,6 +546,7 @@ private: time_point refill_time {time_point::min()}; time_point step_time {time_point::min()}; /* the time of the last search step */ time_point get_step_time {time_point::min()}; /* the time of the last get step */ + std::shared_ptr<Scheduler::Job> nextSearchStep {}; bool expired {false}; /* no node, or all nodes expired */ bool done {false}; /* search is over, cached for later */ @@ -762,6 +765,7 @@ private: // timing Scheduler scheduler {}; + std::shared_ptr<Scheduler::Job> nextNodesConfirmation {}; time_point mybucket_grow_time {time_point::min()}, mybucket6_grow_time {time_point::min()}; NetworkEngine network_engine; @@ -852,7 +856,7 @@ private: Search *findSearch(unsigned short tid, sa_family_t af); void expireSearches(); - void confirmNodes(bool reschedule = false); + void confirmNodes(); void expire(); /** diff --git a/include/opendht/network_engine.h b/include/opendht/network_engine.h index c803bf3058b1f069ea5eb03fbcc7f6e1824951cd..b657c9d8445ab7b75dae6fc3609986f9b60b61c1 100644 --- a/include/opendht/network_engine.h +++ b/include/opendht/network_engine.h @@ -452,6 +452,7 @@ private: void requestStep(std::shared_ptr<Request> req) { if (req->status->completed or req->status->cancelled) return; + auto now = scheduler.time(); if (req->status->expired(now)) { req->on_expired(req->status, true); diff --git a/include/opendht/scheduler.h b/include/opendht/scheduler.h index 8930ef184d7279a5ab55bb5668ab5f13a1060fea..a5cb6ca74fc3d3358d1569275d03f24a6ca16e79 100644 --- a/include/opendht/scheduler.h +++ b/include/opendht/scheduler.h @@ -42,6 +42,7 @@ namespace dht { class Scheduler { public: struct Job { + bool done; bool cancelled; std::function<void()> do_; }; @@ -51,14 +52,30 @@ public: * * @param time The time upon which the job shall be executed. * @param job_func The job function to execute. + * + * @return pointer to the newly scheduled job. */ - std::weak_ptr<Scheduler::Job> - add(time_point time, std::function<void()> job_func) { - auto job = std::make_shared<Job>(Job {false, std::move(job_func)}); + std::shared_ptr<Scheduler::Job> add(time_point time, std::function<void()> job_func) { + auto job = std::make_shared<Job>(Job {false, false, std::move(job_func)}); timers.emplace(std::move(time), job); return job; } + /** + * Reschedules a job. + * + * @param time The time at which the job shall be rescheduled. + * @param job The job to edit. + * + * @return pointer to the newly scheduled job. + */ + std::shared_ptr<Scheduler::Job> edit(const std::shared_ptr<Scheduler::Job>& job, time_point time) { + if (not job) + return {}; + job->cancelled = true; + return add(time, std::move(job->do_)); + } + /** * Runs the jobs to do up to now. * @@ -69,7 +86,12 @@ public: for (auto t = timers.begin(); t != timers.end(); ) { if (t->first > now) break; - t->second->do_(); + + auto& job = timer->second; + if (not job->cancelled and job->do_) { + job->do_(); + job->done = true; + } t = timers.erase(t); } return getNextJobTime(); diff --git a/src/dht.cpp b/src/dht.cpp index 44012575a79070d877d9132568646d1e264e8b01..37646110a09f4570233f251a68b1bf0681652985 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -518,7 +518,7 @@ Dht::trySearchInsert(const std::shared_ptr<Node>& node) auto& s = srp.second; if (s->insertNode(node, now)) { inserted = true; - scheduler.add(s->getNextStepTime(types, now), std::bind(&Dht::searchStep, this, s)); + s->nextSearchStep = scheduler.edit(s->nextSearchStep, s->getNextStepTime(types, now)); } } return inserted; @@ -576,7 +576,7 @@ Dht::newNode(const InfoHash& id, const sockaddr *sa, socklen_t salen, int confir mybucket_grow_time = now; else mybucket6_grow_time = now; - scheduler.add(now, std::bind(&Dht::confirmNodes, this, false)); + nextNodesConfirmation = scheduler.edit(nextNodesConfirmation, now); } /* First, try to get rid of a known-bad node. */ @@ -1328,12 +1328,14 @@ Dht::search(const InfoHash& id, sa_family_t af, GetCallback callback, DoneCallba if (search_id == 0) search_id++; } + if (sr->nextSearchStep) + sr->nextSearchStep->cancelled = true; + sr->nextSearchStep = scheduler.add(scheduler.time(), std::bind(&Dht::searchStep, this, sr)); if (callback) sr->callbacks.push_back({.start=scheduler.time(), .filter=filter, .get_cb=callback, .done_cb=done_callback}); bootstrapSearch(*sr); - searchStep(sr); return sr; } @@ -1384,7 +1386,7 @@ Dht::announce(const InfoHash& id, sa_family_t af, std::shared_ptr<Value> value, a_sr->callback = callback; } } - scheduler.add(sr->getNextStepTime(types, now), std::bind(&Dht::searchStep, this, sr)); + sr->nextSearchStep = scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(types, now)); //TODO //if (tm < search_time) { // DHT_LOG.ERROR("[search %s IPv%c] search_time is now in %lfs", sr->id.toString().c_str(),l @@ -1411,7 +1413,7 @@ Dht::listenTo(const InfoHash& id, sa_family_t af, GetCallback cb, Value::Filter sr->done = false; auto token = ++sr->listener_token; sr->listeners.emplace(token, LocalListener{f, cb}); - scheduler.add(sr->getNextStepTime(types, now), std::bind(&Dht::searchStep, this, sr)); + sr->nextSearchStep = scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(types, now)); return token; } @@ -1866,7 +1868,7 @@ void Dht::connectivityChanged() { const auto& now = scheduler.time(); - scheduler.add(now, std::bind(&Dht::confirmNodes, this, false)); + nextNodesConfirmation = scheduler.edit(nextNodesConfirmation, now); mybucket_grow_time = now; mybucket6_grow_time = now; reported_addr.clear(); @@ -2200,7 +2202,7 @@ Dht::Dht(int s, int s6, Config config) uniform_duration_distribution<> time_dis {std::chrono::seconds(0), std::chrono::seconds(3)}; auto confirm_nodes_time = scheduler.time() + time_dis(rd); - scheduler.add(confirm_nodes_time, std::bind(&Dht::confirmNodes, this, true)); + nextNodesConfirmation = scheduler.add(confirm_nodes_time, std::bind(&Dht::confirmNodes, this)); // Fill old secret { @@ -2419,7 +2421,7 @@ Dht::expire() } void -Dht::confirmNodes(bool reschedule) +Dht::confirmNodes() { using namespace std::chrono; bool soon = false; @@ -2449,8 +2451,7 @@ Dht::confirmNodes(bool reschedule) : uniform_duration_distribution<> {seconds(60), seconds(180)}; auto confirm_nodes_time = now + time_dis(rd); - if (reschedule) - scheduler.add(confirm_nodes_time, std::bind(&Dht::confirmNodes, this, reschedule)); + nextNodesConfirmation = scheduler.add(confirm_nodes_time, std::bind(&Dht::confirmNodes, this)); } std::vector<Dht::ValuesExport> @@ -2696,7 +2697,7 @@ Dht::onGetValuesDone(std::shared_ptr<NetworkEngine::RequestStatus> status, Netwo l.first(l.second); } // Force to recompute the next step time - scheduler.add(now, std::bind(&Dht::searchStep, this, sr)); + sr->nextSearchStep = scheduler.edit(sr->nextSearchStep, now); } NetworkEngine::RequestAnswer