diff --git a/include/opendht/dht.h b/include/opendht/dht.h index d0dee30133c9aa748cbffef06b5f0751384d0fad..60fcff277ca549408e4f7c3411084ebb171feb0b 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -382,7 +382,7 @@ private: size_t listener_token {1}; // timing - Scheduler scheduler {}; + Scheduler scheduler; std::shared_ptr<Scheduler::Job> nextNodesConfirmation {}; std::shared_ptr<Scheduler::Job> nextStorageMaintenance {}; time_point mybucket_grow_time {time_point::min()}, mybucket6_grow_time {time_point::min()}; diff --git a/include/opendht/scheduler.h b/include/opendht/scheduler.h index f377fd23f6d33059f12054e0c4d09a8382126172..8f5ca6ff2d4f388c3eee4a4f1007e213980a8251 100644 --- a/include/opendht/scheduler.h +++ b/include/opendht/scheduler.h @@ -36,8 +36,10 @@ namespace dht { */ class Scheduler { public: + Scheduler(const Logger& l) : DHT_LOG(l) {} + struct Job { - Job(std::function<void()>&& f) : do_(std::forward<std::function<void()>>(f)) {} + Job(std::function<void()>&& f) : do_(std::move(f)) {} std::function<void()> do_; }; @@ -50,7 +52,7 @@ public: * @return pointer to the newly scheduled job. */ std::shared_ptr<Scheduler::Job> add(time_point t, std::function<void()>&& job_func) { - auto job = std::make_shared<Job>(std::forward<std::function<void()>>(job_func)); + auto job = std::make_shared<Job>(std::move(job_func)); if (t != time_point::max()) timers.emplace(std::move(t), job); return job; @@ -66,10 +68,14 @@ public: */ void edit(std::shared_ptr<Scheduler::Job>& job, time_point t) { if (not job) { - std::cerr << "editing an empty job" << std::endl; + DHT_LOG.ERR("editing an empty job"); return; } - job = add(t, std::move(job->do_)); + // std::function move doesn't garantee to leave the object empty. + // Force clearing old value. + auto task = std::move(job->do_); + job->do_ = {}; + job = add(t, std::move(task)); } /** @@ -89,17 +95,16 @@ public: if (timer->first > now) break; - const auto& job = *timer->second; - if (job.do_) - job.do_(); + auto job = std::move(timer->second); timers.erase(timer); + + if (job->do_) + job->do_(); } return getNextJobTime(); } inline time_point getNextJobTime() const { - //if (not timers.empty()) - // std::cout << "Next job in " << print_dt(timers.begin()->first - clock::now()) << std::endl; return timers.empty() ? time_point::max() : timers.begin()->first; } @@ -113,6 +118,7 @@ public: private: time_point now {clock::now()}; std::multimap<time_point, std::shared_ptr<Job>> timers {}; /* the jobs ordered by time */ + const Logger& DHT_LOG; }; } diff --git a/src/dht.cpp b/src/dht.cpp index 5c1eab8b3a2ee5c254ad728cd3ee81491512de8d..e2e2d12be4a41ce2889f7ca8904a8881e9108192 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -1386,14 +1386,14 @@ Dht::searchStep(std::shared_ptr<Search> sr) network_engine.cancelRequest(last_req); if (auto sr = ws.lock()) { onListenDone(req, answer, sr, query); - searchStep(sr); + scheduler.edit(sr->nextSearchStep, scheduler.time()); } }, [this,ws,last_req,query](const Request& req, bool over) mutable { /* on expired */ network_engine.cancelRequest(last_req); if (auto sr = ws.lock()) { - searchStep(sr); + scheduler.edit(sr->nextSearchStep, scheduler.time()); if (over) if (auto sn = sr->getNode(req.node)) sn->listenStatus.erase(query); @@ -2652,11 +2652,11 @@ Dht::~Dht() s.second->clear(); } -Dht::Dht() : store(), network_engine(DHT_LOG, scheduler) {} +Dht::Dht() : store(), scheduler(DHT_LOG), network_engine(DHT_LOG, scheduler) {} Dht::Dht(int s, int s6, Config config) : myid(config.node_id), is_bootstrap(config.is_bootstrap), store(), - network_engine(myid, config.network, s, s6, DHT_LOG, scheduler, + scheduler(DHT_LOG), network_engine(myid, config.network, s, s6, DHT_LOG, scheduler, std::bind(&Dht::onError, this, _1, _2), std::bind(&Dht::onNewNode, this, _1, _2), std::bind(&Dht::onReportedAddr, this, _1, _2), @@ -2779,9 +2779,13 @@ Dht::bucketMaintenance(RoutingTable& list) } DHT_LOG_DEBUG("[node %s] sending find %s for bucket maintenance.", n->toString().c_str(), id.toString().c_str()); - network_engine.sendFindNode(n, id, want, nullptr, [&](const Request&, bool ok){ - if (not ok) - scheduler.edit(nextNodesConfirmation, scheduler.time()); + auto start = scheduler.time(); + network_engine.sendFindNode(n, id, want, nullptr, [this,start,n](const Request&, bool over) { + if (over) { + auto end = scheduler.time(); + DHT_LOG_DEBUG("[node %s] bucket maintenance op expired after %llu", n->toString().c_str(), std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count()); + scheduler.edit(nextNodesConfirmation, end + 3 * Node::NODE_EXPIRE_TIME); + } }); /* In order to avoid sending queries back-to-back, give up for now and reschedule us soon. */ @@ -2938,7 +2942,10 @@ Dht::confirmNodes() : uniform_duration_distribution<> {seconds(60), seconds(180)}; auto confirm_nodes_time = now + time_dis(rd); - nextNodesConfirmation = scheduler.add(confirm_nodes_time, std::bind(&Dht::confirmNodes, this)); + if (nextNodesConfirmation) + scheduler.edit(nextNodesConfirmation, confirm_nodes_time); + else + nextNodesConfirmation = scheduler.add(confirm_nodes_time, std::bind(&Dht::confirmNodes, this)); } std::vector<ValuesExport>