diff --git a/include/opendht/scheduler.h b/include/opendht/scheduler.h index 01cee921a47b77a1a5e0b8dbc16fee98ef5fa874..030a18caeefc290673d0b9f537c9c10dd34f93d3 100644 --- a/include/opendht/scheduler.h +++ b/include/opendht/scheduler.h @@ -36,8 +36,9 @@ namespace dht { class Scheduler { public: struct Job { - Job(std::function<void()>&& f) : do_(std::move(f)) {} + Job(std::function<void()>&& f, time_point t) : do_(std::move(f)), t_(t) {} std::function<void()> do_; + const time_point t_; void cancel() { do_ = {}; } }; @@ -50,17 +51,12 @@ public: * @return pointer to the newly scheduled job. */ Sp<Scheduler::Job> add(time_point t, std::function<void()>&& job_func) { - auto job = std::make_shared<Job>(std::move(job_func)); + auto job = std::make_shared<Job>(std::move(job_func), t); if (t != time_point::max()) timers.emplace(std::move(t), job); return job; } - void add(const Sp<Scheduler::Job>& job, time_point t) { - if (t != time_point::max()) - timers.emplace(std::move(t), job); - } - /** * Reschedules a job. * @@ -68,16 +64,29 @@ public: * @param t The time at which the job shall be rescheduled. */ void edit(Sp<Scheduler::Job>& job, time_point t) { - if (not job) { + if (not job) return; - } // std::function move doesn't garantee to leave the object empty. // Force clearing old value. auto task = std::move(job->do_); - job->do_ = {}; + cancel(job); job = add(t, std::move(task)); } + bool cancel(Sp<Scheduler::Job>& job) { + if (job) { + job->cancel(); + for (auto r = timers.equal_range(job->t_); r.first != r.second; ++r.first) { + if (r.first->second == job) { + timers.erase(r.first); + job.reset(); + return true; + } + } + } + return false; + } + /** * Runs the jobs to do up to now. * diff --git a/src/dht.cpp b/src/dht.cpp index 7c08225409f180a126b516a6f3e236fd3864dd13..baddc070c29f62fb4a84d9975d6e7659978559cf 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -491,10 +491,7 @@ void Dht::searchSendAnnounceValue(const Sp<Search>& sr) { auto next_refresh_time = now + getType(a.value->type).expiration; auto& acked = sn->acked[a.value->id]; - if (acked.refresh) { - acked.refresh->cancel(); - acked.refresh.reset(); - } + scheduler.cancel(acked.refresh); /* only put the value if the node doesn't already have it */ if (not hasValue or seq_no < a.value->seq) { if (logger_) @@ -1280,8 +1277,7 @@ Dht::storageStore(const InfoHash& id, const Sp<Value>& value, time_point created if (auto vs = store.first) { total_store_size += store.second.size_diff; total_values += store.second.values_diff; - if (vs->expiration_job) - vs->expiration_job->cancel(); + scheduler.cancel(vs->expiration_job); if (not permanent) { vs->expiration_job = scheduler.add(expiration, std::bind(&Dht::expireStorage, this, id)); } @@ -2060,8 +2056,7 @@ Dht::bootstrap() logger_->e(myid, "Can't resolve %s:%s: %s", boootstrap.first.c_str(), boootstrap.second.c_str(), e.what()); } } - if (bootstrapJob) - bootstrapJob->cancel(); + scheduler.cancel(bootstrapJob); bootstrapJob = scheduler.add(scheduler.time() + bootstrap_period, std::bind(&Dht::bootstrap, this)); bootstrap_period = std::min(bootstrap_period * 2, BOOTSTRAP_PERIOD_MAX); } @@ -2076,10 +2071,7 @@ Dht::startBootstrap() void Dht::stopBootstrap() { - if (bootstrapJob) { - bootstrapJob->cancel(); - bootstrapJob.reset(); - } + scheduler.cancel(bootstrapJob); bootstrap_period = BOOTSTRAP_PERIOD; } @@ -2568,10 +2560,7 @@ Dht::storageRefresh(const InfoHash& id, Value::Id vid) auto expiration = s->second.refresh(now, vid, types); if (expiration.first) { - if (expiration.first->expiration_job) { - expiration.first->expiration_job->cancel(); - expiration.first->expiration_job.reset(); - } + scheduler.cancel(expiration.first->expiration_job); if (expiration.second != time_point::max()) { expiration.first->expiration_job = scheduler.add(expiration.second, std::bind(&Dht::expireStorage, this, id)); }