diff --git a/src/manager.cpp b/src/manager.cpp index c00490a59f563cec3c5a173548cfe51f9e947d87..09b24dab0d2ef913396952e26462156b1c65e4c4 100644 --- a/src/manager.cpp +++ b/src/manager.cpp @@ -366,7 +366,7 @@ struct Manager::ManagerPimpl std::thread ioContextRunner_; /** Main scheduler */ - ScheduledExecutor scheduler_; + ScheduledExecutor scheduler_ {"manager"}; std::atomic_bool autoAnswer_ {false}; diff --git a/src/scheduled_executor.cpp b/src/scheduled_executor.cpp index 146de1737436507209a8e7c19513c6e3fc5ba9a0..9e06f96b91208afa25cefdb844af058e49aaf75c 100644 --- a/src/scheduled_executor.cpp +++ b/src/scheduled_executor.cpp @@ -22,8 +22,11 @@ namespace jami { -ScheduledExecutor::ScheduledExecutor() - : running_(std::make_shared<std::atomic<bool>>(true)) +std::atomic<uint64_t> task_cookie = {0}; + +ScheduledExecutor::ScheduledExecutor(const std::string& name) + : name_(name) + , running_(std::make_shared<std::atomic<bool>>(true)) , thread_([this, is_running = running_] { // The thread needs its own reference of `running_` in case the // scheduler is destroyed within the thread because of a job @@ -61,34 +64,40 @@ ScheduledExecutor::stop() } void -ScheduledExecutor::run(Job&& job) +ScheduledExecutor::run(std::function<void()>&& job, + const char* filename, uint32_t linum) { { std::lock_guard<std::mutex> lock(jobLock_); auto now = clock::now(); - jobs_[now].emplace_back(std::move(job)); + jobs_[now].emplace_back(std::move(job), filename, linum); } cv_.notify_all(); } std::shared_ptr<Task> -ScheduledExecutor::schedule(Job&& job, time_point t) +ScheduledExecutor::schedule(std::function<void()>&& job, time_point t, + const char* filename, uint32_t linum) { - auto ret = std::make_shared<Task>(std::move(job)); + auto ret = std::make_shared<Task>(std::move(job), filename, linum); schedule(ret, t); return ret; } std::shared_ptr<Task> -ScheduledExecutor::scheduleIn(Job&& job, duration dt) +ScheduledExecutor::scheduleIn(std::function<void()>&& job, duration dt, + const char* filename, uint32_t linum) { - return schedule(std::move(job), clock::now() + dt); + return schedule(std::move(job), clock::now() + dt, + filename, linum); } std::shared_ptr<RepeatedTask> -ScheduledExecutor::scheduleAtFixedRate(RepeatedJob&& job, duration dt) +ScheduledExecutor::scheduleAtFixedRate(std::function<bool()>&& job, + duration dt, + const char* filename, uint32_t linum) { - auto ret = std::make_shared<RepeatedTask>(std::move(job)); + auto ret = std::make_shared<RepeatedTask>(std::move(job), filename, linum); reschedule(ret, clock::now(), dt); return ret; } @@ -97,9 +106,9 @@ void ScheduledExecutor::reschedule(std::shared_ptr<RepeatedTask> task, time_point t, duration dt) { schedule(std::make_shared<Task>([this, task = std::move(task), t, dt]() mutable { - if (task->run()) + if (task->run(name_.c_str())) reschedule(std::move(task), t + dt, dt); - }), + }, task->job().filename, task->job().linum), t); } @@ -108,7 +117,8 @@ ScheduledExecutor::schedule(std::shared_ptr<Task> task, time_point t) { { std::lock_guard<std::mutex> lock(jobLock_); - jobs_[t].emplace_back([task = std::move(task)] { task->run(); }); + jobs_[t].emplace_back([task = std::move(task), this] { task->run(name_.c_str()); }, + task->job().filename, task->job().linum); } cv_.notify_all(); } @@ -134,7 +144,7 @@ ScheduledExecutor::loop() } for (auto& job : jobs) { try { - job(); + job.fn(); } catch (const std::exception& e) { JAMI_ERR("Exception running job: %s", e.what()); } diff --git a/src/scheduled_executor.h b/src/scheduled_executor.h index 1a1c7083a59ced5769e49045d3e4a01c8d99c15b..1ff7d6d5b9dd6a7fdad3e1769552909cc4e0278f 100644 --- a/src/scheduled_executor.h +++ b/src/scheduled_executor.h @@ -32,13 +32,53 @@ #include "noncopyable.h" +#include "tracepoint.h" +#include "trace-tools.h" + namespace jami { +extern std::atomic<uint64_t> task_cookie; + /** * A runnable function */ -using Job = std::function<void()>; -using RepeatedJob = std::function<bool()>; +struct Job { + Job(std::function<void()>&& f, const char* file, uint32_t l) + : fn(std::move(f)) + , filename(file) + , linum(l) { } + + std::function<void()> fn; + const char* filename; + uint32_t linum; + + inline operator bool() const { + return static_cast<bool>(fn); + } + + void reset() { + fn = {}; + } +}; + +struct RepeatedJob { + RepeatedJob(std::function<bool()>&& f, const char* file, uint32_t l) + : fn(std::move(f)) + , filename(file) + , linum(l) { } + + std::function<bool()> fn; + const char* filename; + uint32_t linum; + + inline operator bool() { + return static_cast<bool>(fn); + } + + void reset() { + fn = {}; + } +}; /** * A Job that can be disposed @@ -46,19 +86,34 @@ using RepeatedJob = std::function<bool()>; class Task { public: - Task(Job&& j) - : job_(std::move(j)) - {} - void run() + Task(std::function<void()>&& fn, const char* filename, uint32_t linum) + : job_(std::move(fn), filename, linum) + , cookie_(task_cookie++) { } + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-parameter" + void run(const char* executor_name) { - if (job_) - job_(); + if (job_.fn) { + jami_tracepoint(scheduled_executor_task_begin, + executor_name, + job_.filename, job_.linum, + cookie_); + job_.fn(); + jami_tracepoint(scheduled_executor_task_end, + cookie_); + } } - void cancel() { job_ = {}; } +#pragma GCC pop + + void cancel() { job_.reset(); } bool isCancelled() const { return !job_; } + Job& job() { return job_; } + private: Job job_; + uint64_t cookie_; }; /** @@ -67,32 +122,58 @@ private: class RepeatedTask { public: - RepeatedTask(RepeatedJob&& j) - : job_(std::move(j)) - {} - bool run() + RepeatedTask(std::function<bool()>&& fn, const char* filename, + uint32_t linum) + : job_(std::move(fn), filename, linum) + , cookie_(task_cookie++) { } + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-parameter" + bool run(const char* executor_name) { + bool cont; std::lock_guard<std::mutex> l(lock_); - if (cancel_.load() or (job_ and not job_())) { - cancel_.store(true); - job_ = {}; + + if (not cancel_.load() and job_.fn) { + jami_tracepoint(scheduled_executor_task_begin, + executor_name, + job_.filename, job_.linum, + cookie_); + cont = job_.fn(); + jami_tracepoint(scheduled_executor_task_end, + cookie_); + + } else { + cont = false; } - return (bool) job_; + + if (not cont) { + job_.reset(); + } + + return static_cast<bool>(job_); } +#pragma GCC pop + void cancel() { cancel_.store(true); } + void destroy() { cancel(); std::lock_guard<std::mutex> l(lock_); - job_ = {}; + job_.reset(); } + bool isCancelled() const { return cancel_.load(); } + RepeatedJob& job() { return job_; } + private: NON_COPYABLE(RepeatedTask); - mutable std::mutex lock_; RepeatedJob job_; + mutable std::mutex lock_; std::atomic_bool cancel_ {false}; + uint64_t cookie_; }; class ScheduledExecutor @@ -102,28 +183,37 @@ public: using time_point = clock::time_point; using duration = clock::duration; - ScheduledExecutor(); + ScheduledExecutor(const std::string& name_); ~ScheduledExecutor(); /** * Schedule job to be run ASAP */ - void run(Job&& job); + void run(std::function<void()>&& job, + const char* filename=CURRENT_FILENAME(), + uint32_t linum=CURRENT_LINE()); /** * Schedule job to be run at time t */ - std::shared_ptr<Task> schedule(Job&& job, time_point t); + std::shared_ptr<Task> schedule(std::function<void()>&& job, time_point t, + const char* filename=CURRENT_FILENAME(), + uint32_t linum=CURRENT_LINE()); /** * Schedule job to be run after delay dt */ - std::shared_ptr<Task> scheduleIn(Job&& job, duration dt); + std::shared_ptr<Task> scheduleIn(std::function<void()>&& job, duration dt, + const char* filename=CURRENT_FILENAME(), + uint32_t linum=CURRENT_LINE()); /** * Schedule job to be run every dt, starting now. */ - std::shared_ptr<RepeatedTask> scheduleAtFixedRate(RepeatedJob&& job, duration dt); + std::shared_ptr<RepeatedTask> scheduleAtFixedRate(std::function<bool()>&& job, + duration dt, + const char* filename=CURRENT_FILENAME(), + uint32_t linum=CURRENT_LINE()); /** * Stop the scheduler, can't be reversed @@ -137,6 +227,7 @@ private: void schedule(std::shared_ptr<Task>, time_point t); void reschedule(std::shared_ptr<RepeatedTask>, time_point t, duration dt); + std::string name_; std::shared_ptr<std::atomic<bool>> running_; std::map<time_point, std::vector<Job>> jobs_ {}; std::mutex jobLock_ {}; diff --git a/src/upnp/protocol/natpmp/nat_pmp.h b/src/upnp/protocol/natpmp/nat_pmp.h index a8a8261cb07b7dcc370c65f48b9690acf18c20d0..b3f9e33536f213856331fa5cd808375114285007 100644 --- a/src/upnp/protocol/natpmp/nat_pmp.h +++ b/src/upnp/protocol/natpmp/nat_pmp.h @@ -155,7 +155,7 @@ private: // Data members std::shared_ptr<PMPIGD> igd_; natpmp_t natpmpHdl_; - ScheduledExecutor natpmpScheduler_ {}; + ScheduledExecutor natpmpScheduler_ {"natpmp"}; std::shared_ptr<Task> searchForIgdTimer_ {}; unsigned int igdSearchCounter_ {0}; UpnpMappingObserver* observer_ {nullptr}; diff --git a/src/upnp/protocol/pupnp/pupnp.h b/src/upnp/protocol/pupnp/pupnp.h index 5f714d8ecc1149c3be71fac593ade404a92ff992..7d0d3d054cfc7bf1025a656aaacd8acde7820bb8 100644 --- a/src/upnp/protocol/pupnp/pupnp.h +++ b/src/upnp/protocol/pupnp/pupnp.h @@ -236,7 +236,7 @@ private: std::weak_ptr<PUPnP> weak() { return std::static_pointer_cast<PUPnP>(shared_from_this()); } // Execution queue to run lib upnp actions - ScheduledExecutor pupnpScheduler_ {}; + ScheduledExecutor pupnpScheduler_ {"pupnp"}; // Initialization status. std::atomic_bool initialized_ {false};