diff --git a/src/im/message_engine.cpp b/src/im/message_engine.cpp
index 143a9a0cdbfff3183672030caefcdfd0c0da8eeb..ad074aaa5897c4759a08c07d30632b6b77f7136e 100644
--- a/src/im/message_engine.cpp
+++ b/src/im/message_engine.cpp
@@ -66,11 +66,14 @@ MessageEngine::reschedule()
         return;
     std::weak_ptr<Account> w = std::static_pointer_cast<Account>(account_.shared_from_this());
     auto next = nextEvent();
-    if (next != clock::time_point::max())
-        Manager::instance().scheduleTask([w,this](){
-                if (auto s = w.lock())
-                    retrySend();
-            }, next);
+    if (next != clock::time_point::max()) {
+        auto task = std::make_shared<Task>(
+            [w, this](UNUSED Task& task){
+                if (auto s = w.lock())
+                    retrySend();
+            });
+        task->schedule(next);
+    }
 }
 
 MessageEngine::clock::time_point
diff --git a/src/manager.cpp b/src/manager.cpp
index d36e68d71d3ecb361a2ac8a19345a65c84e7a4b0..fcefd59128f44590d69e82f90576bb772190ce99 100644
--- a/src/manager.cpp
+++ b/src/manager.cpp
@@ -353,9 +353,8 @@ struct Manager::ManagerPimpl
 
     decltype(eventHandlerMap_)::iterator nextEventHandler_;
 
-    std::list<std::function<bool()>> pendingTaskList_;
-    std::multimap<std::chrono::steady_clock::time_point, std::shared_ptr<Manager::Runnable>> scheduledTasks_;
     std::mutex scheduledTasksMutex_;
+    std::multimap<Task::TimePoint, std::shared_ptr<Task>> scheduledTasks_;
 
     // Map containing conference pointers
     ConferenceMap conferenceMap_;
@@ -1662,30 +1661,25 @@ Manager::unregisterEventHandler(uintptr_t handlerId)
 }
 
 void
-Manager::addTask(const std::function<bool()>&& task)
+Manager::addTask(std::function<bool()>&& callable)
 {
-    std::lock_guard<std::mutex> lock(pimpl_->scheduledTasksMutex_);
-    pimpl_->pendingTaskList_.emplace_back(std::move(task));
-}
-
-std::shared_ptr<Manager::Runnable>
-Manager::scheduleTask(const std::function<void()>&& task, std::chrono::steady_clock::time_point when)
-{
-    auto runnable = std::make_shared<Runnable>(std::move(task));
-    scheduleTask(runnable, when);
-    return runnable;
+    scheduleTask(std::make_shared<Task>(
+        [callable](Task& task){
+            if (callable())
+                task.schedule();
+        }), Task::Clock::now());
 }
 
 void
-Manager::scheduleTask(const std::shared_ptr<Runnable>& task, std::chrono::steady_clock::time_point when)
+Manager::scheduleTask(const std::shared_ptr<Task>& task, const Task::TimePoint& when)
 {
     std::lock_guard<std::mutex> lock(pimpl_->scheduledTasksMutex_);
     pimpl_->scheduledTasks_.emplace(when, task);
-    RING_DBG("Task scheduled. Next in %" PRId64, std::chrono::duration_cast<std::chrono::seconds>(pimpl_->scheduledTasks_.begin()->first - std::chrono::steady_clock::now()).count());
 }
 
 // Must be invoked periodically by a timer from the main event loop
-void Manager::pollEvents()
+void
+Manager::pollEvents()
 {
     //-- Handlers
     {
@@ -1708,49 +1702,33 @@ void Manager::pollEvents()
         }
     }
 
-    //-- Scheduled tasks
+    auto now = Task::Clock::now();
+
+    //-- Collect scheduled tasks that are due
+    std::list<std::shared_ptr<Task>> todo_list;
     {
-        auto now = std::chrono::steady_clock::now();
         std::lock_guard<std::mutex> lock(pimpl_->scheduledTasksMutex_);
-        while (not pimpl_->scheduledTasks_.empty() && pimpl_->scheduledTasks_.begin()->first <= now) {
-            auto f = pimpl_->scheduledTasks_.begin();
-            auto task = std::move(f->second->cb);
-            if (task)
-                pimpl_->pendingTaskList_.emplace_back([task](){
-                        task();
-                        return false;
-                    });
-            pimpl_->scheduledTasks_.erase(f);
+        auto item = std::begin(pimpl_->scheduledTasks_);
+        while (item != std::end(pimpl_->scheduledTasks_)) {
+            if (item->first <= now) {
+                todo_list.emplace_back(std::move(item->second));
+                item = pimpl_->scheduledTasks_.erase(item);
+            } else
+                ++item;
         }
     }
 
-    //-- Tasks
+    //-- Fire scheduled tasks
     {
-        decltype(pimpl_->pendingTaskList_) tmpList;
-        {
-            std::lock_guard<std::mutex> lock(pimpl_->scheduledTasksMutex_);
-            std::swap(pimpl_->pendingTaskList_, tmpList);
-        }
-        auto iter = std::begin(tmpList);
-        while (iter != tmpList.cend()) {
+        for (auto& task : todo_list) {
             if (pimpl_->finished_)
                 return;
-            auto next = std::next(iter);
-            bool result;
             try {
-                result = (*iter)();
+                (*task)();
             } catch (const std::exception& e) {
-                RING_ERR("MainLoop exception (task): %s", e.what());
-                result = false;
+                RING_ERR("MainLoop exception during task execution: %s", e.what());
             }
-            if (not result)
-                tmpList.erase(iter);
-            iter = next;
-        }
-        {
-            std::lock_guard<std::mutex> lock(pimpl_->scheduledTasksMutex_);
-            pimpl_->pendingTaskList_.splice(std::end(pimpl_->pendingTaskList_), tmpList);
         }
     }
 }
@@ -3093,4 +3071,10 @@ Manager::getVideoManager() const
 }
 #endif
 
+void
+Task::schedule(const TimePoint& when)
+{
+    Manager::instance().scheduleTask(shared_from_this(), when);
+}
+
 } // namespace ring
diff --git a/src/manager.h b/src/manager.h
index 28f6197563495536ab6f9db608cef363fe08b958..99ee4842cc85253fb48b6e88efd1ba71525b6ccc 100644
--- a/src/manager.h
+++ b/src/manager.h
@@ -53,6 +53,50 @@ class Conference;
 class AudioLoop;
 class IceTransportFactory;
 
+///
+/// Simple (re-)schedulable task
+///
+/// The given callable is executed on demand by the thread running Manager::pollEvents().
+///
+class Task : public std::enable_shared_from_this<Task>
+{
+public:
+    using Clock = std::chrono::steady_clock;
+    using TimePoint = Clock::time_point;
+    using Callback = std::function<void(Task&)>;
+
+    Task() = default;
+    explicit Task(Callback&& callable) : cb_(std::move(callable)) {}
+
+    // Moveable
+    Task(Task&&) noexcept = default;
+    Task& operator=(Task&&) noexcept = default;
+
+    ///
+    /// Schedule the callable at a given absolute time.
+    ///
+    void schedule(const TimePoint& when);
+
+    ///
+    /// Schedule the callable after the given duration, counted from the call instant.
+    ///
+    template<class Rep=int, class Period=std::ratio<1>>
+    void schedule(const std::chrono::duration<Rep, Period>& delay = std::chrono::duration<Rep, Period>::zero()) {
+        schedule(Clock::now() + delay);
+    }
+
+    void operator()() {
+        cb_(*this);
+    }
+
+private:
+    // Non-copyable
+    Task(const Task&) = delete;
+    Task& operator=(const Task&) = delete;
+
+    Callback cb_;
+};
+
 /** Manager (controller) of Ring daemon */
 class Manager {
     public:
@@ -818,8 +862,7 @@ class Manager {
        /**
         * Call periodically to poll for VoIP events
         */
-        void
-        pollEvents();
+        void pollEvents();
 
        /**
        * Create a new outgoing call
@@ -851,14 +894,18 @@ class Manager {
 
        IceTransportFactory& getIceTransportFactory();
 
-        void addTask(const std::function<bool()>&& task);
-
-        struct Runnable {
-            std::function<void()> cb;
-            Runnable(const std::function<void()>&& t) : cb(std::move(t)) {}
-        };
-        std::shared_ptr<Runnable> scheduleTask(const std::function<void()>&& task, std::chrono::steady_clock::time_point when);
-        void scheduleTask(const std::shared_ptr<Runnable>& task, std::chrono::steady_clock::time_point when);
+        ///
+        /// Add a simple callable, to be executed at the next pollEvents() call.
+        ///
+        /// \note Deprecated, use Task objects instead.
+        ///
+        void addTask(std::function<bool()>&& task);
+
+        ///
+        /// Low-level insertion of a task to schedule; used by Task itself.
+        ///
+        /// \note Prefer Task's methods over this one.
+        void scheduleTask(const std::shared_ptr<Task>& task, const Task::TimePoint& when);
 
 #ifdef RING_VIDEO
        /**
diff --git a/src/ringdht/ringaccount.cpp b/src/ringdht/ringaccount.cpp
index 0bdfffb86b509fba34eaff3580edc0b04b409c0c..5c5992a58d0d9a506c9b837874194e7522f85275 100644
--- a/src/ringdht/ringaccount.cpp
+++ b/src/ringdht/ringaccount.cpp
@@ -118,8 +118,7 @@ struct RingAccount::BuddyInfo
     /* the presence timestamps */
     std::map<dht::InfoHash, std::chrono::steady_clock::time_point> devicesTimestamps;
 
-    /* The callable object to update buddy info */
-    std::function<void()> updateInfo {};
+    std::shared_ptr<Task> refreshTask; ///< Refresh task to schedule
 
     BuddyInfo(dht::InfoHash id) : id(id) {}
 };
@@ -1991,27 +1990,23 @@ RingAccount::trackBuddyPresence(const std::string& buddy_id)
     auto buddy_infop = trackedBuddies_.emplace(h, decltype(trackedBuddies_)::mapped_type {h});
     if (buddy_infop.second) {
         auto& buddy_info = buddy_infop.first->second;
-        buddy_info.updateInfo = Manager::instance().scheduleTask([h,weak_this]() {
+        buddy_info.refreshTask = std::make_shared<Task>([h, weak_this](UNUSED Task& task) {
             if (auto shared_this = weak_this.lock()) {
                 /* ::forEachDevice call will update buddy info accordingly. */
                 shared_this->forEachDevice(h, {}, [h, weak_this] (bool /* ok */) {
                     if (auto shared_this = weak_this.lock()) {
                         std::lock_guard<std::recursive_mutex> lock(shared_this->buddyInfoMtx);
                         auto buddy_info_it = shared_this->trackedBuddies_.find(h);
-                        if (buddy_info_it == shared_this->trackedBuddies_.end()) return;
+                        if (buddy_info_it == shared_this->trackedBuddies_.end())
+                            return;
 
                         auto& buddy_info = buddy_info_it->second;
-                        if (buddy_info.updateInfo) {
-                            auto cb = buddy_info.updateInfo;
-                            Manager::instance().scheduleTask(
-                                std::move(cb),
-                                std::chrono::steady_clock::now() + DeviceAnnouncement::TYPE.expiration
-                            );
-                        }
+                        buddy_info.refreshTask->schedule(DeviceAnnouncement::TYPE.expiration);
                     }
                 });
             }
-        }, std::chrono::steady_clock::now())->cb;
+        });
+        buddy_info.refreshTask->schedule();
         RING_DBG("Now tracking buddy %s", h.toString().c_str());
     } else
         RING_WARN("Buddy %s is already being tracked.", h.toString().c_str());
@@ -3326,7 +3321,7 @@ RingAccount::sendTextMessage(const std::string& to, const std::map<std::string,
 
     // Timeout cleanup
     std::weak_ptr<RingAccount> wshared = shared();
-    Manager::instance().scheduleTask([wshared, confirm, token]() {
+    auto task = std::make_shared<Task>([wshared, confirm, token](UNUSED Task& task) {
         if (not confirm->replied and not confirm->listenTokens.empty()) {
             if (auto this_ = wshared.lock()) {
                 RING_DBG("[Account %s] [message %" PRIx64 "] timeout", this_->getAccountID().c_str(), token);
@@ -3337,7 +3332,8 @@ RingAccount::sendTextMessage(const std::string& to, const std::map<std::string,
                 this_->messageEngine_.onMessageSent(token, false);
             }
         }
-    }, std::chrono::steady_clock::now() + std::chrono::minutes(1));
+    });
+    task->schedule(std::chrono::minutes(1));
 }
 
 void
diff --git a/src/security/tls_session.cpp b/src/security/tls_session.cpp
index 9a924d2c6184d23c71c0c3556d1b7396183fe946..ce1c28f16358abc6cc142326af9c3990206f3cd2 100644
--- a/src/security/tls_session.cpp
+++ b/src/security/tls_session.cpp
@@ -195,6 +195,7 @@ TlsSession::TlsSession(const std::shared_ptr<IceTransport>& ice, int ice_comp_id
     , params_(params)
     , callbacks_(cbs)
     , anonymous_(anonymous)
+    , rxFlushTask_ {std::make_shared<Task>([this](Task& task){ if (flushRxQueue()) task.schedule(RX_OOO_TIMEOUT); })}
     , cacred_(nullptr)
     , sacred_(nullptr)
     , xcred_(nullptr)
@@ -215,8 +216,6 @@ TlsSession::TlsSession(const std::shared_ptr<IceTransport>& ice, int ice_comp_id
             return len;
         });
 
-    Manager::instance().registerEventHandler((uintptr_t)this, [this]{ flushRxQueue(); });
-
     // Run FSM into dedicated thread
     thread_.start();
 }
@@ -227,8 +226,7 @@ TlsSession::~TlsSession()
     thread_.join();
 
     socket_->setOnRecv(nullptr);
-
-    Manager::instance().unregisterEventHandler((uintptr_t)this);
+    flushRxQueue(); // let application receive last packets
 }
 
 const char*
@@ -899,28 +897,29 @@ TlsSession::handleDataPacket(std::vector<uint8_t>&& buf, const uint8_t* seq_byte
     if (reorderBuffer_.empty())
         lastReadTime_ = clock::now();
     reorderBuffer_.emplace(pkt_seq, std::move(buf));
+
+    // Ready to forward to application
+    rxFlushTask_->schedule();
 }
 
 ///
 /// Reorder and push received packet to upper layer
 ///
-/// \note This method must be called continously, faster than RX_OOO_TIMEOUT
+/// \return true if a delayed re-schedule is needed, else false.
 ///
-void
+bool
 TlsSession::flushRxQueue()
 {
     std::unique_lock<std::mutex> lk {reorderBufMutex_};
     if (reorderBuffer_.empty())
-        return;
+        return false;
 
     auto item = std::begin(reorderBuffer_);
     auto next_offset = item->first;
 
-    // Wait for next continous packet until timeout
-    if ((lastReadTime_ - clock::now()) >= RX_OOO_TIMEOUT) {
-        // OOO packet timeout - consider waited packets as lost
-    } else if (next_offset != gapOffset_)
-        return;
+    // First packet is not the awaited one and the wait time is not exhausted? Wait again!
+    if (next_offset != gapOffset_ and (clock::now() - lastReadTime_) < RX_OOO_TIMEOUT)
+        return true;
 
     // Loop on offset-ordered received packet until a discontinuity in sequence number
     while (item != std::end(reorderBuffer_) and item->first <= next_offset) {
@@ -939,6 +938,8 @@ TlsSession::flushRxQueue()
 
     gapOffset_ = std::max(gapOffset_, next_offset);
     lastReadTime_ = clock::now();
+
+    return not reorderBuffer_.empty(); // re-schedule later if packets are still pending
 }
 
 TlsSessionState
diff --git a/src/security/tls_session.h b/src/security/tls_session.h
index 640b4a398aa58ca4523c99ecf7de0dec42618d85..7cb837299cd6d74eb2eae777f008685fa01e576b 100644
--- a/src/security/tls_session.h
+++ b/src/security/tls_session.h
@@ -48,6 +48,7 @@ namespace ring {
 class IceTransport;
 class IceSocket;
+class Task;
 } // namespace ring
 
 namespace dht { namespace crypto {
@@ -222,6 +223,7 @@ private:
     uint64_t gapOffset_ {1}; // offset of first byte not received yet (start at 1)
     clock::time_point lastReadTime_;
     std::map<uint64_t, std::vector<uint8_t>> reorderBuffer_ {};
+    std::shared_ptr<Task> rxFlushTask_;
 
     ssize_t send_(const uint8_t* tx_data, std::size_t tx_size);
     ssize_t sendRaw(const void*, size_t);
@@ -230,7 +232,7 @@ private:
     int waitForRawData(unsigned);
 
     void handleDataPacket(std::vector<uint8_t>&&, const uint8_t*);
-    void flushRxQueue();
+    bool flushRxQueue();
 
     // Statistics
     std::atomic<std::size_t> stRxRawPacketCnt_ {0};
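---
Reviewer note (not part of the patch): a minimal usage sketch of the Task API
introduced in src/manager.h. startKeepAlive() and sendKeepAlive() are
hypothetical names and the 30-second period is arbitrary; this assumes the
code runs inside the daemon, where the main event loop already invokes
Manager::pollEvents() periodically.

    #include <chrono>
    #include <memory>

    #include "manager.h"

    // Hypothetical helper: returns false when the job should stop.
    static bool sendKeepAlive() { /* e.g. ping the transport */ return true; }

    void startKeepAlive()
    {
        // The callback receives the Task itself, so it can re-arm on success.
        auto task = std::make_shared<ring::Task>([](ring::Task& task) {
            if (sendKeepAlive())
                task.schedule(std::chrono::seconds(30)); // run again in 30 s
        });

        // First run happens at the next pollEvents() call (delay defaults to zero).
        task->schedule();
    }

Manager::scheduleTask() holds a shared_ptr on the Task until it fires and
Task::schedule() relies on shared_from_this(), so a task created with
std::make_shared stays alive while scheduled even after the caller drops its
own reference; one-shot timers can therefore be fire-and-forget, as in the
sendTextMessage() hunk above.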