Skip to content
Snippets Groups Projects
Commit a6ff8072 authored by Olivier Dion's avatar Olivier Dion Committed by Adrien Béraud
Browse files

ice_transport: Fix leak of TURN session

Gitlab: #590

Change-Id: Ie05da9f7fdad57c86da97fc7e3a2f8235100fc2c
parent 3e73e264
Branches
No related tags found
No related merge requests found
...@@ -72,19 +72,6 @@ using MutexGuard = std::lock_guard<std::mutex>; ...@@ -72,19 +72,6 @@ using MutexGuard = std::lock_guard<std::mutex>;
using MutexLock = std::unique_lock<std::mutex>; using MutexLock = std::unique_lock<std::mutex>;
using namespace upnp; using namespace upnp;
namespace {
struct IceSTransDeleter
{
void operator()(pj_ice_strans* ptr)
{
pj_ice_strans_stop_ice(ptr);
pj_ice_strans_destroy(ptr);
}
};
} // namespace
//============================================================================== //==============================================================================
class IceTransport::Impl class IceTransport::Impl
...@@ -110,6 +97,9 @@ public: ...@@ -110,6 +97,9 @@ public:
bool setSlaveSession(); bool setSlaveSession();
bool createIceSession(pj_ice_sess_role role); bool createIceSession(pj_ice_sess_role role);
/**
* Must be called while holding iceMutex_
*/
void getUFragPwd(); void getUFragPwd();
std::string link() const; std::string link() const;
...@@ -145,7 +135,7 @@ public: ...@@ -145,7 +135,7 @@ public:
IceTransportCompleteCb on_negodone_cb_ {}; IceTransportCompleteCb on_negodone_cb_ {};
IceRecvInfo on_recv_cb_ {}; IceRecvInfo on_recv_cb_ {};
mutable std::mutex iceMutex_ {}; mutable std::mutex iceMutex_ {};
std::unique_ptr<pj_ice_strans, IceSTransDeleter> icest_; pj_ice_strans* icest_{nullptr};
unsigned streamsCount_ {0}; unsigned streamsCount_ {0};
unsigned compCountPerStream_ {0}; unsigned compCountPerStream_ {0};
unsigned compCount_ {0}; unsigned compCount_ {0};
...@@ -421,10 +411,9 @@ IceTransport::Impl::Impl(const char* name, const IceTransportOptions& options) ...@@ -421,10 +411,9 @@ IceTransport::Impl::Impl(const char* name, const IceTransportOptions& options)
TRY(pj_timer_heap_create(pool_.get(), 100, &config_.stun_cfg.timer_heap)); TRY(pj_timer_heap_create(pool_.get(), 100, &config_.stun_cfg.timer_heap));
TRY(pj_ioqueue_create(pool_.get(), IOQUEUE_MAX_HANDLES, &config_.stun_cfg.ioqueue)); TRY(pj_ioqueue_create(pool_.get(), IOQUEUE_MAX_HANDLES, &config_.stun_cfg.ioqueue));
pj_ice_strans* icest = nullptr; pj_status_t status = pj_ice_strans_create(name, &config_, compCount_, this, &icecb, &icest_);
pj_status_t status = pj_ice_strans_create(name, &config_, compCount_, this, &icecb, &icest);
if (status != PJ_SUCCESS || icest == nullptr) { if (status != PJ_SUCCESS || icest_ == nullptr) {
throw std::runtime_error("pj_ice_strans_create() failed"); throw std::runtime_error("pj_ice_strans_create() failed");
} }
...@@ -436,18 +425,6 @@ IceTransport::Impl::Impl(const char* name, const IceTransportOptions& options) ...@@ -436,18 +425,6 @@ IceTransport::Impl::Impl(const char* name, const IceTransportOptions& options)
// but here we don't care if there is event or not. // but here we don't care if there is event or not.
handleEvents(500); // limit polling to 500ms handleEvents(500); // limit polling to 500ms
} }
// NOTE: This last handleEvents is necessary to close TURN socket.
// Because when destroying the TURN session pjproject creates a pj_timer
// to postpone the TURN destruction. This timer is only called if we poll
// the event queue.
auto started_destruction = std::chrono::system_clock::now();
while (handleEvents(500)) {
if (std::chrono::system_clock::now() - started_destruction
> std::chrono::seconds(MAX_DESTRUCTION_TIMEOUT)) {
// If the transport is not closed after 3 seconds, avoid blocking
break;
}
}
}); });
// Init to invalid addresses // Init to invalid addresses
...@@ -456,30 +433,48 @@ IceTransport::Impl::Impl(const char* name, const IceTransportOptions& options) ...@@ -456,30 +433,48 @@ IceTransport::Impl::Impl(const char* name, const IceTransportOptions& options)
IceTransport::Impl::~Impl() IceTransport::Impl::~Impl()
{ {
JAMI_DBG("[ice:%p] destroying", this); JAMI_DBG("[ice:%p] destroying %p", this, icest_);
sip_utils::register_thread(); sip_utils::register_thread();
threadTerminateFlags_ = true; threadTerminateFlags_ = true;
iceCV_.notify_all(); iceCV_.notify_all();
if (thread_.joinable()) if (thread_.joinable()) {
thread_.join(); thread_.join();
{
std::lock_guard<std::mutex> lk {iceMutex_};
icest_.reset(); // must be done before ioqueue/timer destruction
} }
pj_ice_strans* strans = nullptr;
std::swap(strans, icest_);
assert(strans);
// must be done before ioqueue/timer destruction
JAMI_INFO("[ice:%p] Destroying ice_strans %p",
pj_ice_strans_get_user_data(strans), strans);
pj_ice_strans_stop_ice(strans);
pj_ice_strans_destroy(strans);
// NOTE: This last handleEvents is necessary to close TURN socket.
// Because when destroying the TURN session pjproject creates a pj_timer
// to postpone the TURN destruction. This timer is only called if we poll
// the event queue.
while (handleEvents(500));
if (config_.stun_cfg.ioqueue) if (config_.stun_cfg.ioqueue)
pj_ioqueue_destroy(config_.stun_cfg.ioqueue); pj_ioqueue_destroy(config_.stun_cfg.ioqueue);
if (config_.stun_cfg.timer_heap) if (config_.stun_cfg.timer_heap)
pj_timer_heap_destroy(config_.stun_cfg.timer_heap); pj_timer_heap_destroy(config_.stun_cfg.timer_heap);
JAMI_DBG("[ice:%p] done destroying", this);
} }
bool bool
IceTransport::Impl::_isInitialized() const IceTransport::Impl::_isInitialized() const
{ {
if (auto icest = icest_.get()) { if (auto icest = icest_) {
auto state = pj_ice_strans_get_state(icest); auto state = pj_ice_strans_get_state(icest);
return state >= PJ_ICE_STRANS_STATE_SESS_READY and state != PJ_ICE_STRANS_STATE_FAILED; return state >= PJ_ICE_STRANS_STATE_SESS_READY and state != PJ_ICE_STRANS_STATE_FAILED;
} }
...@@ -489,7 +484,7 @@ IceTransport::Impl::_isInitialized() const ...@@ -489,7 +484,7 @@ IceTransport::Impl::_isInitialized() const
bool bool
IceTransport::Impl::_isStarted() const IceTransport::Impl::_isStarted() const
{ {
if (auto icest = icest_.get()) { if (auto icest = icest_) {
auto state = pj_ice_strans_get_state(icest); auto state = pj_ice_strans_get_state(icest);
return state >= PJ_ICE_STRANS_STATE_NEGO and state != PJ_ICE_STRANS_STATE_FAILED; return state >= PJ_ICE_STRANS_STATE_NEGO and state != PJ_ICE_STRANS_STATE_FAILED;
} }
...@@ -499,7 +494,7 @@ IceTransport::Impl::_isStarted() const ...@@ -499,7 +494,7 @@ IceTransport::Impl::_isStarted() const
bool bool
IceTransport::Impl::_isRunning() const IceTransport::Impl::_isRunning() const
{ {
if (auto icest = icest_.get()) { if (auto icest = icest_) {
auto state = pj_ice_strans_get_state(icest); auto state = pj_ice_strans_get_state(icest);
return state >= PJ_ICE_STRANS_STATE_RUNNING and state != PJ_ICE_STRANS_STATE_FAILED; return state >= PJ_ICE_STRANS_STATE_RUNNING and state != PJ_ICE_STRANS_STATE_FAILED;
} }
...@@ -509,7 +504,7 @@ IceTransport::Impl::_isRunning() const ...@@ -509,7 +504,7 @@ IceTransport::Impl::_isRunning() const
bool bool
IceTransport::Impl::_isFailed() const IceTransport::Impl::_isFailed() const
{ {
if (auto icest = icest_.get()) if (auto icest = icest_)
return pj_ice_strans_get_state(icest) == PJ_ICE_STRANS_STATE_FAILED; return pj_ice_strans_get_state(icest) == PJ_ICE_STRANS_STATE_FAILED;
return false; return false;
} }
...@@ -581,12 +576,6 @@ IceTransport::Impl::onComplete(pj_ice_strans* ice_st, pj_ice_strans_op op, pj_st ...@@ -581,12 +576,6 @@ IceTransport::Impl::onComplete(pj_ice_strans* ice_st, pj_ice_strans_op op, pj_st
last_errmsg_.c_str()); last_errmsg_.c_str());
} }
{
std::lock_guard<std::mutex> lk(iceMutex_);
if (!icest_.get())
icest_.reset(ice_st);
}
if (done and op == PJ_ICE_STRANS_OP_INIT) { if (done and op == PJ_ICE_STRANS_OP_INIT) {
if (initiatorSession_) if (initiatorSession_)
setInitiatorSession(); setInitiatorSession();
...@@ -641,7 +630,14 @@ IceTransport::Impl::setInitiatorSession() ...@@ -641,7 +630,14 @@ IceTransport::Impl::setInitiatorSession()
JAMI_DBG("[ice:%p] as master", this); JAMI_DBG("[ice:%p] as master", this);
initiatorSession_ = true; initiatorSession_ = true;
if (_isInitialized()) { if (_isInitialized()) {
auto status = pj_ice_strans_change_role(icest_.get(), PJ_ICE_SESS_ROLE_CONTROLLING);
std::lock_guard<std::mutex> lk(iceMutex_);
if (not icest_) {
return false;
}
auto status = pj_ice_strans_change_role(icest_, PJ_ICE_SESS_ROLE_CONTROLLING);
if (status != PJ_SUCCESS) { if (status != PJ_SUCCESS) {
last_errmsg_ = sip_utils::sip_strerror(status); last_errmsg_ = sip_utils::sip_strerror(status);
JAMI_ERR("[ice:%p] role change failed: %s", this, last_errmsg_.c_str()); JAMI_ERR("[ice:%p] role change failed: %s", this, last_errmsg_.c_str());
...@@ -658,7 +654,14 @@ IceTransport::Impl::setSlaveSession() ...@@ -658,7 +654,14 @@ IceTransport::Impl::setSlaveSession()
JAMI_DBG("[ice:%p] as slave", this); JAMI_DBG("[ice:%p] as slave", this);
initiatorSession_ = false; initiatorSession_ = false;
if (_isInitialized()) { if (_isInitialized()) {
auto status = pj_ice_strans_change_role(icest_.get(), PJ_ICE_SESS_ROLE_CONTROLLED);
std::lock_guard<std::mutex> lk(iceMutex_);
if (not icest_) {
return false;
}
auto status = pj_ice_strans_change_role(icest_, PJ_ICE_SESS_ROLE_CONTROLLED);
if (status != PJ_SUCCESS) { if (status != PJ_SUCCESS) {
last_errmsg_ = sip_utils::sip_strerror(status); last_errmsg_ = sip_utils::sip_strerror(status);
JAMI_ERR("[ice:%p] role change failed: %s", this, last_errmsg_.c_str()); JAMI_ERR("[ice:%p] role change failed: %s", this, last_errmsg_.c_str());
...@@ -680,7 +683,14 @@ IceTransport::Impl::getSelectedCandidate(unsigned comp_id, bool remote) const ...@@ -680,7 +683,14 @@ IceTransport::Impl::getSelectedCandidate(unsigned comp_id, bool remote) const
JAMI_ERR("[ice:%p] ICE transport is not running", this); JAMI_ERR("[ice:%p] ICE transport is not running", this);
return nullptr; return nullptr;
} }
const auto* sess = pj_ice_strans_get_valid_pair(icest_.get(), comp_id);
std::lock_guard<std::mutex> lk(iceMutex_);
if (not icest_) {
return nullptr;
}
const auto* sess = pj_ice_strans_get_valid_pair(icest_, comp_id);
if (sess == nullptr) { if (sess == nullptr) {
JAMI_ERR("[ice:%p] Component %i has no valid pair", this, comp_id); JAMI_ERR("[ice:%p] Component %i has no valid pair", this, comp_id);
return nullptr; return nullptr;
...@@ -726,23 +736,35 @@ IceTransport::Impl::getCandidateType(const pj_ice_sess_cand* cand) ...@@ -726,23 +736,35 @@ IceTransport::Impl::getCandidateType(const pj_ice_sess_cand* cand)
void void
IceTransport::Impl::getUFragPwd() IceTransport::Impl::getUFragPwd()
{ {
if (icest_) {
pj_str_t local_ufrag, local_pwd; pj_str_t local_ufrag, local_pwd;
pj_ice_strans_get_ufrag_pwd(icest_.get(), &local_ufrag, &local_pwd, nullptr, nullptr);
pj_ice_strans_get_ufrag_pwd(icest_, &local_ufrag, &local_pwd, nullptr, nullptr);
local_ufrag_.assign(local_ufrag.ptr, local_ufrag.slen); local_ufrag_.assign(local_ufrag.ptr, local_ufrag.slen);
local_pwd_.assign(local_pwd.ptr, local_pwd.slen); local_pwd_.assign(local_pwd.ptr, local_pwd.slen);
} }
}
bool bool
IceTransport::Impl::createIceSession(pj_ice_sess_role role) IceTransport::Impl::createIceSession(pj_ice_sess_role role)
{ {
if (pj_ice_strans_init_ice(icest_.get(), role, nullptr, nullptr) != PJ_SUCCESS) { std::lock_guard<std::mutex> lk(iceMutex_);
if (not icest_) {
return false;
}
if (pj_ice_strans_init_ice(icest_, role, nullptr, nullptr) != PJ_SUCCESS) {
JAMI_ERR("[ice:%p] pj_ice_strans_init_ice() failed", this); JAMI_ERR("[ice:%p] pj_ice_strans_init_ice() failed", this);
return false; return false;
} }
// Fetch some information on local configuration // Fetch some information on local configuration
getUFragPwd(); getUFragPwd();
JAMI_DBG("[ice:%p] (local) ufrag=%s, pwd=%s", this, local_ufrag_.c_str(), local_pwd_.c_str()); JAMI_DBG("[ice:%p] (local) ufrag=%s, pwd=%s", this, local_ufrag_.c_str(), local_pwd_.c_str());
return true; return true;
} }
...@@ -1077,7 +1099,11 @@ bool ...@@ -1077,7 +1099,11 @@ bool
IceTransport::isInitiator() const IceTransport::isInitiator() const
{ {
if (isInitialized()) { if (isInitialized()) {
return pj_ice_strans_get_role(pimpl_->icest_.get()) == PJ_ICE_SESS_ROLE_CONTROLLING; std::lock_guard<std::mutex> lk(pimpl_->iceMutex_);
if (pimpl_->icest_) {
return pj_ice_strans_get_role(pimpl_->icest_) == PJ_ICE_SESS_ROLE_CONTROLLING;
}
return false;
} }
return pimpl_->initiatorSession_; return pimpl_->initiatorSession_;
} }
...@@ -1131,7 +1157,14 @@ IceTransport::startIce(const Attribute& rem_attrs, std::vector<IceCandidate>&& r ...@@ -1131,7 +1157,14 @@ IceTransport::startIce(const Attribute& rem_attrs, std::vector<IceCandidate>&& r
JAMI_DBG("[ice:%p] negotiation starting (%zu remote candidates)", JAMI_DBG("[ice:%p] negotiation starting (%zu remote candidates)",
pimpl_.get(), pimpl_.get(),
rem_candidates.size()); rem_candidates.size());
auto status = pj_ice_strans_start_ice(pimpl_->icest_.get(),
std::unique_lock lk(pimpl_->iceMutex_);
if (not pimpl_->icest_) {
return false;
}
auto status = pj_ice_strans_start_ice(pimpl_->icest_,
pj_strset(&ufrag, pj_strset(&ufrag,
(char*) rem_attrs.ufrag.c_str(), (char*) rem_attrs.ufrag.c_str(),
rem_attrs.ufrag.size()), rem_attrs.ufrag.size()),
...@@ -1188,10 +1221,14 @@ IceTransport::startIce(const SDP& sdp) ...@@ -1188,10 +1221,14 @@ IceTransport::startIce(const SDP& sdp)
if (parseIceAttributeLine(0, line, cand)) if (parseIceAttributeLine(0, line, cand))
rem_candidates.emplace_back(cand); rem_candidates.emplace_back(cand);
} }
std::lock_guard<std::mutex> lk {pimpl_->iceMutex_};
if (!pimpl_->icest_) std::unique_lock lk(pimpl_->iceMutex_);
if (not pimpl_->icest_) {
return false; return false;
auto status = pj_ice_strans_start_ice(pimpl_->icest_.get(), }
auto status = pj_ice_strans_start_ice(pimpl_->icest_,
pj_strset(&ufrag, pj_strset(&ufrag,
(char*) sdp.ufrag.c_str(), (char*) sdp.ufrag.c_str(),
sdp.ufrag.size()), sdp.ufrag.size()),
...@@ -1214,9 +1251,9 @@ IceTransport::stop() ...@@ -1214,9 +1251,9 @@ IceTransport::stop()
pimpl_->is_stopped_ = true; pimpl_->is_stopped_ = true;
if (isStarted()) { if (isStarted()) {
std::lock_guard<std::mutex> lk {pimpl_->iceMutex_}; std::lock_guard<std::mutex> lk {pimpl_->iceMutex_};
if (!pimpl_->icest_) if (not pimpl_->icest_)
return false; return false;
auto status = pj_ice_strans_stop_ice(pimpl_->icest_.get()); auto status = pj_ice_strans_stop_ice(pimpl_->icest_);
if (status != PJ_SUCCESS) { if (status != PJ_SUCCESS) {
pimpl_->last_errmsg_ = sip_utils::sip_strerror(status); pimpl_->last_errmsg_ = sip_utils::sip_strerror(status);
JAMI_ERR("[ice:%p] ICE stop failed: %s", pimpl_.get(), pimpl_->last_errmsg_.c_str()); JAMI_ERR("[ice:%p] ICE stop failed: %s", pimpl_.get(), pimpl_->last_errmsg_.c_str());
...@@ -1272,9 +1309,9 @@ IceTransport::getLocalCandidates(unsigned comp_id) const ...@@ -1272,9 +1309,9 @@ IceTransport::getLocalCandidates(unsigned comp_id) const
{ {
std::lock_guard<std::mutex> lk {pimpl_->iceMutex_}; std::lock_guard<std::mutex> lk {pimpl_->iceMutex_};
if (!pimpl_->icest_) if (not pimpl_->icest_)
return res; return res;
if (pj_ice_strans_enum_cands(pimpl_->icest_.get(), comp_id, &cand_cnt, cand) != PJ_SUCCESS) { if (pj_ice_strans_enum_cands(pimpl_->icest_, comp_id, &cand_cnt, cand) != PJ_SUCCESS) {
JAMI_ERR("[ice:%p] pj_ice_strans_enum_cands() failed", pimpl_.get()); JAMI_ERR("[ice:%p] pj_ice_strans_enum_cands() failed", pimpl_.get());
return res; return res;
} }
...@@ -1334,7 +1371,7 @@ IceTransport::getLocalCandidates(unsigned streamIdx, unsigned compId) const ...@@ -1334,7 +1371,7 @@ IceTransport::getLocalCandidates(unsigned streamIdx, unsigned compId) const
{ {
std::lock_guard<std::mutex> lk {pimpl_->iceMutex_}; std::lock_guard<std::mutex> lk {pimpl_->iceMutex_};
if (!pimpl_->icest_) if (not pimpl_->icest_)
return res; return res;
// In the implementation, the component IDs are enumerated globally // In the implementation, the component IDs are enumerated globally
// (per SDP: 1, 2, 3, 4, ...). This is simpler because we create // (per SDP: 1, 2, 3, 4, ...). This is simpler because we create
...@@ -1343,7 +1380,7 @@ IceTransport::getLocalCandidates(unsigned streamIdx, unsigned compId) const ...@@ -1343,7 +1380,7 @@ IceTransport::getLocalCandidates(unsigned streamIdx, unsigned compId) const
// order to be compliant with the spec. // order to be compliant with the spec.
auto globalCompId = streamIdx * 2 + compId; auto globalCompId = streamIdx * 2 + compId;
if (pj_ice_strans_enum_cands(pimpl_->icest_.get(), globalCompId, &cand_cnt, cand) if (pj_ice_strans_enum_cands(pimpl_->icest_, globalCompId, &cand_cnt, cand)
!= PJ_SUCCESS) { != PJ_SUCCESS) {
JAMI_ERR("[ice:%p] pj_ice_strans_enum_cands() failed", pimpl_.get()); JAMI_ERR("[ice:%p] pj_ice_strans_enum_cands() failed", pimpl_.get());
return res; return res;
...@@ -1553,16 +1590,24 @@ IceTransport::send(unsigned compId, const unsigned char* buf, size_t len) ...@@ -1553,16 +1590,24 @@ IceTransport::send(unsigned compId, const unsigned char* buf, size_t len)
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
auto status = pj_ice_strans_sendto2(pimpl_->icest_.get(),
std::unique_lock lk(pimpl_->iceMutex_);
if (not pimpl_->icest_) {
return -1;
}
auto status = pj_ice_strans_sendto2(pimpl_->icest_,
compId, compId,
buf, buf,
len, len,
remote.pjPtr(), remote.pjPtr(),
remote.getLength()); remote.getLength());
if (status == PJ_EPENDING && isTCPEnabled()) { if (status == PJ_EPENDING && isTCPEnabled()) {
// NOTE; because we are in TCP, the sent size will count the header (2 // NOTE; because we are in TCP, the sent size will count the header (2
// bytes length). // bytes length).
std::unique_lock<std::mutex> lk(pimpl_->iceMutex_);
pimpl_->waitDataCv_.wait(lk, [&] { pimpl_->waitDataCv_.wait(lk, [&] {
return pimpl_->lastSentLen_ >= static_cast<pj_size_t>(len) return pimpl_->lastSentLen_ >= static_cast<pj_size_t>(len)
or pimpl_->destroying_.load(); or pimpl_->destroying_.load();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment