diff --git a/src/ice_transport.cpp b/src/ice_transport.cpp index cd7cdca7d2fe7ba3e05fd465fc2f60c4fd227e75..d932021053daa120fbf4172523e1eac074ab982e 100644 --- a/src/ice_transport.cpp +++ b/src/ice_transport.cpp @@ -140,7 +140,6 @@ public: bool upnpEnabled_ {false}; IceTransportCompleteCb on_initdone_cb_ {}; IceTransportCompleteCb on_negodone_cb_ {}; - IceRecvInfo on_recv_cb_ {}; mutable std::mutex iceMutex_ {}; pj_ice_strans* icest_ {nullptr}; unsigned streamsCount_ {0}; @@ -211,9 +210,9 @@ public: std::atomic_bool threadTerminateFlags_ {false}; // Wait data on components - pj_size_t lastSentLen_ {}; + mutable std::mutex sendDataMutex_ {}; std::condition_variable waitDataCv_ = {}; - + pj_size_t lastSentLen_ {0}; std::atomic_bool destroying_ {false}; onShutdownCb scb {}; }; @@ -474,17 +473,20 @@ IceTransport::Impl::initIceInstance(const IceTransportOptions& options) JAMI_WARN("null IceTransport"); }; - icecb.on_data_sent = [](pj_ice_strans* ice_st, pj_ssize_t size) { - if (auto* tr = static_cast<Impl*>(pj_ice_strans_get_user_data(ice_st))) { - std::lock_guard<std::mutex> lk(tr->iceMutex_); - tr->lastSentLen_ += size; - tr->waitDataCv_.notify_all(); - } else - JAMI_WARN("null IceTransport"); - }; + if (isTcp_) { + icecb.on_data_sent = [](pj_ice_strans* ice_st, pj_ssize_t size) { + if (auto* tr = static_cast<Impl*>(pj_ice_strans_get_user_data(ice_st))) { + std::lock_guard lk(tr->sendDataMutex_); + tr->lastSentLen_ += size; + tr->waitDataCv_.notify_all(); + } else + JAMI_WARN("null IceTransport"); + }; + } icecb.on_destroy = [](pj_ice_strans* ice_st) { if (auto* tr = static_cast<Impl*>(pj_ice_strans_get_user_data(ice_st))) { + std::lock_guard lk(tr->sendDataMutex_); tr->destroying_ = true; tr->waitDataCv_.notify_all(); if (tr->scb) @@ -1681,12 +1683,16 @@ IceTransport::send(unsigned compId, const unsigned char* buf, size_t len) return -1; } - std::unique_lock lk(pimpl_->iceMutex_); + std::lock_guard lk(pimpl_->iceMutex_); if (not pimpl_->icest_) { return -1; } + std::unique_lock dlk(pimpl_->sendDataMutex_, std::defer_lock); + if (isTCPEnabled()) + dlk.lock(); + auto status = pj_ice_strans_sendto2(pimpl_->icest_, compId, buf, @@ -1697,7 +1703,7 @@ IceTransport::send(unsigned compId, const unsigned char* buf, size_t len) if (status == PJ_EPENDING && isTCPEnabled()) { // NOTE; because we are in TCP, the sent size will count the header (2 // bytes length). - pimpl_->waitDataCv_.wait(lk, [&] { + pimpl_->waitDataCv_.wait(dlk, [&] { return pimpl_->lastSentLen_ >= static_cast<pj_size_t>(len) or pimpl_->destroying_.load(); }); diff --git a/src/ice_transport.h b/src/ice_transport.h index 06f77ed8981ba82c9e48a2f29da49587e1279c86..4bee84740c1a68492f46332760c2a1ad74ce18c7 100644 --- a/src/ice_transport.h +++ b/src/ice_transport.h @@ -41,7 +41,6 @@ class Controller; class IceTransport; using IceTransportCompleteCb = std::function<void(bool)>; -using IceRecvInfo = std::function<void(void)>; using IceRecvCb = std::function<ssize_t(unsigned char* buf, size_t len)>; using IceCandidate = pj_ice_sess_cand; using onShutdownCb = std::function<void(void)>;