diff --git a/src/manager.cpp b/src/manager.cpp index 48af7c3dff5100b9ceeb21933f17b95b38410f21..e710f3cd644b9772fe8714e201a952cfc0be2907 100644 --- a/src/manager.cpp +++ b/src/manager.cpp @@ -299,6 +299,9 @@ struct Manager::ManagerPimpl Manager& base_; // pimpl back-pointer + /** Main scheduler */ + ScheduledExecutor scheduler_; + std::atomic_bool autoAnswer_ {false}; /** Application wide tone controller */ @@ -353,14 +356,6 @@ struct Manager::ManagerPimpl */ std::unique_ptr<RingBufferPool> ringbufferpool_; - std::map<uintptr_t, Manager::EventHandler> eventHandlerMap_; - - decltype(eventHandlerMap_)::iterator nextEventHandler_; - - std::list<std::function<bool()>> pendingTaskList_; - std::multimap<std::chrono::steady_clock::time_point, std::shared_ptr<Manager::Runnable>> scheduledTasks_; - std::mutex scheduledTasksMutex_; - // Map containing conference pointers ConferenceMap conferenceMap_; @@ -795,9 +790,7 @@ Manager::finish() noexcept pimpl_->ice_tf_.reset(); // Flush remaining tasks (free lambda' with capture) - pimpl_->pendingTaskList_.clear(); - pimpl_->scheduledTasks_.clear(); - pimpl_->eventHandlerMap_.clear(); + pimpl_->scheduler_.stop(); pj_shutdown(); ThreadPool::instance().join(); @@ -1658,117 +1651,27 @@ Manager::removeAudio(Call& call) getRingBufferPool().unBindAll(call_id); } -// Not thread-safe, SHOULD be called in same thread that run pollEvents() -void -Manager::registerEventHandler(uintptr_t handlerId, EventHandler handler) -{ - pimpl_->eventHandlerMap_[handlerId] = handler; -} - -// Not thread-safe, SHOULD be called in same thread that run pollEvents() -void -Manager::unregisterEventHandler(uintptr_t handlerId) +ScheduledExecutor& +Manager::scheduler() { - auto iter = pimpl_->eventHandlerMap_.find(handlerId); - if (iter != pimpl_->eventHandlerMap_.end()) { - if (iter == pimpl_->nextEventHandler_) - pimpl_->nextEventHandler_ = pimpl_->eventHandlerMap_.erase(iter); - else - pimpl_->eventHandlerMap_.erase(iter); - } + return pimpl_->scheduler_; } void Manager::addTask(std::function<bool()>&& task) { - std::lock_guard<std::mutex> lock(pimpl_->scheduledTasksMutex_); - pimpl_->pendingTaskList_.emplace_back(std::move(task)); + pimpl_->scheduler_.scheduleAtFixedRate(std::move(task), std::chrono::milliseconds(30)); } -std::shared_ptr<Manager::Runnable> -Manager::scheduleTask(const std::function<void()>&& task, std::chrono::steady_clock::time_point when) -{ - auto runnable = std::make_shared<Runnable>(std::move(task)); - scheduleTask(runnable, when); - return runnable; -} - -void -Manager::scheduleTask(const std::shared_ptr<Runnable>& task, std::chrono::steady_clock::time_point when) +std::shared_ptr<Task> +Manager::scheduleTask(std::function<void()>&& task, std::chrono::steady_clock::time_point when) { - std::lock_guard<std::mutex> lock(pimpl_->scheduledTasksMutex_); - pimpl_->scheduledTasks_.emplace(when, task); + return pimpl_->scheduler_.schedule(std::move(task), when); } // Must be invoked periodically by a timer from the main event loop void Manager::pollEvents() { - //-- Handlers - { - auto iter = pimpl_->eventHandlerMap_.begin(); - while (iter != pimpl_->eventHandlerMap_.end()) { - if (pimpl_->finished_) - return; - - // WARN: following callback can do anything and typically - // calls (un)registerEventHandler. - // Think twice before modify this code. - - pimpl_->nextEventHandler_ = std::next(iter); - try { - iter->second(); - } catch (const std::exception& e) { - RING_ERR("MainLoop exception (handler): %s", e.what()); - } - iter = pimpl_->nextEventHandler_; - } - } - - //-- Scheduled tasks - { - auto now = std::chrono::steady_clock::now(); - std::lock_guard<std::mutex> lock(pimpl_->scheduledTasksMutex_); - while (not pimpl_->scheduledTasks_.empty() && pimpl_->scheduledTasks_.begin()->first <= now) { - auto f = pimpl_->scheduledTasks_.begin(); - auto task = std::move(f->second->cb); - if (task) - pimpl_->pendingTaskList_.emplace_back([task](){ - task(); - return false; - }); - pimpl_->scheduledTasks_.erase(f); - } - } - - //-- Tasks - { - decltype(pimpl_->pendingTaskList_) tmpList; - { - std::lock_guard<std::mutex> lock(pimpl_->scheduledTasksMutex_); - std::swap(pimpl_->pendingTaskList_, tmpList); - } - auto iter = std::begin(tmpList); - while (iter != tmpList.cend()) { - if (pimpl_->finished_) - return; - - auto next = std::next(iter); - bool result; - try { - result = (*iter)(); - } catch (const std::exception& e) { - RING_ERR("MainLoop exception (task): %s", e.what()); - result = false; - } - if (not result) - tmpList.erase(iter); - iter = next; - } - { - std::lock_guard<std::mutex> lock(pimpl_->scheduledTasksMutex_); - pimpl_->pendingTaskList_.splice(std::end(pimpl_->pendingTaskList_), tmpList); - } - } } //THREAD=Main diff --git a/src/manager.h b/src/manager.h index 61e915eb47cec8a61bf5228bfe13c4b89abfbc3f..dead57c28efedb7af494c23b044a7c910743d5a3 100644 --- a/src/manager.h +++ b/src/manager.h @@ -34,6 +34,7 @@ #include "call_factory.h" #include "preferences.h" #include "audio/audiolayer.h" +#include "scheduled_executor.h" #include <string> #include <vector> @@ -835,31 +836,12 @@ class Manager { CallFactory callFactory; - using EventHandler = std::function<void()>; - - /** - * Install an event handler called periodically by pollEvents(). - * @param handlerId an unique identifier for the handler. - * @param handler the event handler function. - */ - void registerEventHandler(uintptr_t handlerId, EventHandler handler); - - /** - * Remove a previously registered event handler. - * @param handlerId id of handler to remove. - */ - void unregisterEventHandler(uintptr_t handlerId); - IceTransportFactory& getIceTransportFactory(); - void addTask(std::function<bool()>&& task); + ScheduledExecutor& scheduler(); - struct Runnable { - std::function<void()> cb; - Runnable(const std::function<void()>&& t) : cb(std::move(t)) {} - }; - std::shared_ptr<Runnable> scheduleTask(const std::function<void()>&& task, std::chrono::steady_clock::time_point when); - void scheduleTask(const std::shared_ptr<Runnable>& task, std::chrono::steady_clock::time_point when); + void addTask(std::function<bool()>&& task); + std::shared_ptr<Task> scheduleTask(std::function<void()>&& task, std::chrono::steady_clock::time_point when); #ifdef RING_VIDEO /** @@ -905,9 +887,8 @@ private: // Helper to install a callback to be called once by the main event loop template<typename Callback> static void runOnMainThread(Callback&& cb) { - Manager::instance().addTask([=]() mutable { + Manager::instance().scheduler().run([cb = std::move(cb)]() mutable { cb(); - return false; }); } diff --git a/src/ringdht/ringaccount.cpp b/src/ringdht/ringaccount.cpp index 84306539ca1da62e31e5ab0ebc34e2e3f0f19806..e8af3b5a019b3a8a06cf435c75739ee5200d2cd5 100644 --- a/src/ringdht/ringaccount.cpp +++ b/src/ringdht/ringaccount.cpp @@ -128,8 +128,8 @@ struct RingAccount::BuddyInfo /* the presence timestamps */ std::map<dht::InfoHash, std::chrono::steady_clock::time_point> devicesTimestamps; - /* The callable object to update buddy info */ - std::function<void()> updateInfo {}; + /* The disposable object to update buddy info */ + std::shared_ptr<RepeatedTask> updateTask; BuddyInfo(dht::InfoHash id) : id(id) {} }; @@ -295,7 +295,10 @@ RingAccount::RingAccount(const std::string& accountID, bool /* presenceEnabled * RingAccount::~RingAccount() { - Manager::instance().unregisterEventHandler((uintptr_t)this); + if (eventHandler) { + eventHandler->cancel(); + eventHandler.reset(); + } dht_.join(); } @@ -1956,25 +1959,13 @@ RingAccount::trackBuddyPresence(const std::string& buddy_id) auto buddy_infop = trackedBuddies_.emplace(h, decltype(trackedBuddies_)::mapped_type {h}); if (buddy_infop.second) { auto& buddy_info = buddy_infop.first->second; - buddy_info.updateInfo = Manager::instance().scheduleTask([h,weak_this]() { + buddy_info.updateTask = Manager::instance().scheduler().scheduleAtFixedRate([h,weak_this] { if (auto shared_this = weak_this.lock()) { /* ::forEachDevice call will update buddy info accordingly. */ - shared_this->forEachDevice(h, {}, [h] (const std::shared_ptr<RingAccount>& shared_this, bool /* ok */) { - std::lock_guard<std::recursive_mutex> lock(shared_this->buddyInfoMtx); - auto buddy_info_it = shared_this->trackedBuddies_.find(h); - if (buddy_info_it == shared_this->trackedBuddies_.end()) return; - - auto& buddy_info = buddy_info_it->second; - if (buddy_info.updateInfo) { - auto cb = buddy_info.updateInfo; - Manager::instance().scheduleTask( - std::move(cb), - std::chrono::steady_clock::now() + DeviceAnnouncement::TYPE.expiration - ); - } - }); + shared_this->forEachDevice(h, {}, [] (const std::shared_ptr<RingAccount>&, bool /* ok */) {}); } - }, std::chrono::steady_clock::now())->cb; + return true; + }, DeviceAnnouncement::TYPE.expiration); RING_DBG("[Account %s] tracking buddy %s", getAccountID().c_str(), h.to_c_str()); } } @@ -2036,6 +2027,12 @@ RingAccount::doRegister_() auto shared = std::static_pointer_cast<RingAccount>(shared_from_this()); std::weak_ptr<RingAccount> w {shared}; + eventHandler = Manager::instance().scheduler().scheduleAtFixedRate([w]{ + if (auto this_ = w.lock()) + this_->handlePendingCallList(); + return true; + }, std::chrono::milliseconds(50)); + #if HAVE_RINGNS // Look for registered name on the blockchain nameDir_.get().lookupAddress(ringAccountId_, [w](const std::string& result, const NameDirectory::Response& response) { @@ -2140,7 +2137,6 @@ RingAccount::doRegister_() dht_.importValues(loadValues()); - Manager::instance().registerEventHandler((uintptr_t)this, [this]{ handlePendingCallList(); }); setRegistrationState(RegistrationState::TRYING); dht_.bootstrap(loadNodes()); @@ -2537,7 +2533,11 @@ RingAccount::doUnregister(std::function<void(bool)> released_cb) upnp_->removeMappings(); } - Manager::instance().unregisterEventHandler((uintptr_t)this); + if (eventHandler) { + eventHandler->cancel(); + eventHandler.reset(); + } + saveNodes(dht_.exportNodes()); saveValues(dht_.exportValues()); dht_.join(); diff --git a/src/ringdht/ringaccount.h b/src/ringdht/ringaccount.h index 065d3cf4fe7cf2b38653544fd9f6f13d922162f6..2414bf24c082d8589ec605449e08c893355beab9 100644 --- a/src/ringdht/ringaccount.h +++ b/src/ringdht/ringaccount.h @@ -35,6 +35,7 @@ #include "ip_utils.h" #include "ring_types.h" // enable_if_base_of #include "security/certstore.h" +#include "scheduled_executor.h" #include <opendht/dhtrunner.h> #include <opendht/default_types.h> @@ -665,6 +666,8 @@ class RingAccount : public SIPAccountBase { void registerDhtAddress(IceTransport&); std::unique_ptr<DhtPeerConnector> dhtPeerConnector_; + + std::shared_ptr<RepeatedTask> eventHandler {}; }; static inline std::ostream& operator<< (std::ostream& os, const RingAccount& acc) diff --git a/src/sip/sipaccount.cpp b/src/sip/sipaccount.cpp index ce4de4e270744660e60130a2cd8060738812a60f..2c8e2d81660c06bdedc66f20b95fc759e1f03627 100644 --- a/src/sip/sipaccount.cpp +++ b/src/sip/sipaccount.cpp @@ -247,7 +247,7 @@ SIPAccount::newOutgoingCall(const std::string& toUrl, if (created) { std::weak_ptr<SIPCall> weak_call = call; - manager.addTask([this, weak_call] { + manager.scheduler().run([this, weak_call] { if (auto call = weak_call.lock()) { if (not SIPStartCall(call)) { RING_ERR("Could not send outgoing INVITE request for new call");