Commit 0d1a7ab2 authored by Adrien Béraud's avatar Adrien Béraud Committed by Sébastien Blin
Browse files

ice transport: avoid deadlock in send callback

deadlock could happen with:
* handleEvents -> pj_lock -> on_data_sent -> iceMutex_
* send() -> iceMutex_ -> pj_ice_strans_sendto2 -> pj_lock

This was fixed by:
* Not setting on_data_sent callback for UDP
* Using a different mutex than iceMutex_ to wait for data send

Change-Id: Ic50698ac9dfe37574f145baa718fff8b74cc99be
parent 39a27cd0
......@@ -140,7 +140,6 @@ public:
bool upnpEnabled_ {false};
IceTransportCompleteCb on_initdone_cb_ {};
IceTransportCompleteCb on_negodone_cb_ {};
IceRecvInfo on_recv_cb_ {};
mutable std::mutex iceMutex_ {};
pj_ice_strans* icest_ {nullptr};
unsigned streamsCount_ {0};
......@@ -211,9 +210,9 @@ public:
std::atomic_bool threadTerminateFlags_ {false};
// Wait data on components
pj_size_t lastSentLen_ {};
mutable std::mutex sendDataMutex_ {};
std::condition_variable waitDataCv_ = {};
pj_size_t lastSentLen_ {0};
std::atomic_bool destroying_ {false};
onShutdownCb scb {};
};
......@@ -474,17 +473,20 @@ IceTransport::Impl::initIceInstance(const IceTransportOptions& options)
JAMI_WARN("null IceTransport");
};
icecb.on_data_sent = [](pj_ice_strans* ice_st, pj_ssize_t size) {
if (auto* tr = static_cast<Impl*>(pj_ice_strans_get_user_data(ice_st))) {
std::lock_guard<std::mutex> lk(tr->iceMutex_);
tr->lastSentLen_ += size;
tr->waitDataCv_.notify_all();
} else
JAMI_WARN("null IceTransport");
};
if (isTcp_) {
icecb.on_data_sent = [](pj_ice_strans* ice_st, pj_ssize_t size) {
if (auto* tr = static_cast<Impl*>(pj_ice_strans_get_user_data(ice_st))) {
std::lock_guard lk(tr->sendDataMutex_);
tr->lastSentLen_ += size;
tr->waitDataCv_.notify_all();
} else
JAMI_WARN("null IceTransport");
};
}
icecb.on_destroy = [](pj_ice_strans* ice_st) {
if (auto* tr = static_cast<Impl*>(pj_ice_strans_get_user_data(ice_st))) {
std::lock_guard lk(tr->sendDataMutex_);
tr->destroying_ = true;
tr->waitDataCv_.notify_all();
if (tr->scb)
......@@ -1681,12 +1683,16 @@ IceTransport::send(unsigned compId, const unsigned char* buf, size_t len)
return -1;
}
std::unique_lock lk(pimpl_->iceMutex_);
std::lock_guard lk(pimpl_->iceMutex_);
if (not pimpl_->icest_) {
return -1;
}
std::unique_lock dlk(pimpl_->sendDataMutex_, std::defer_lock);
if (isTCPEnabled())
dlk.lock();
auto status = pj_ice_strans_sendto2(pimpl_->icest_,
compId,
buf,
......@@ -1697,7 +1703,7 @@ IceTransport::send(unsigned compId, const unsigned char* buf, size_t len)
if (status == PJ_EPENDING && isTCPEnabled()) {
// NOTE; because we are in TCP, the sent size will count the header (2
// bytes length).
pimpl_->waitDataCv_.wait(lk, [&] {
pimpl_->waitDataCv_.wait(dlk, [&] {
return pimpl_->lastSentLen_ >= static_cast<pj_size_t>(len)
or pimpl_->destroying_.load();
});
......
......@@ -41,7 +41,6 @@ class Controller;
class IceTransport;
using IceTransportCompleteCb = std::function<void(bool)>;
using IceRecvInfo = std::function<void(void)>;
using IceRecvCb = std::function<ssize_t(unsigned char* buf, size_t len)>;
using IceCandidate = pj_ice_sess_cand;
using onShutdownCb = std::function<void(void)>;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment