diff --git a/src/ice_transport.cpp b/src/ice_transport.cpp index 8dff5ea654fbf1d7400b6b7a976342f1069f6a3e..aa2e5645706c27db48caa78fc5390509af71f776 100644 --- a/src/ice_transport.cpp +++ b/src/ice_transport.cpp @@ -72,19 +72,6 @@ using MutexGuard = std::lock_guard<std::mutex>; using MutexLock = std::unique_lock<std::mutex>; using namespace upnp; -namespace { - -struct IceSTransDeleter -{ - void operator()(pj_ice_strans* ptr) - { - pj_ice_strans_stop_ice(ptr); - pj_ice_strans_destroy(ptr); - } -}; - -} // namespace - //============================================================================== class IceTransport::Impl @@ -110,6 +97,9 @@ public: bool setSlaveSession(); bool createIceSession(pj_ice_sess_role role); + /** + * Must be called while holding iceMutex_ + */ void getUFragPwd(); std::string link() const; @@ -145,7 +135,7 @@ public: IceTransportCompleteCb on_negodone_cb_ {}; IceRecvInfo on_recv_cb_ {}; mutable std::mutex iceMutex_ {}; - std::unique_ptr<pj_ice_strans, IceSTransDeleter> icest_; + pj_ice_strans* icest_{nullptr}; unsigned streamsCount_ {0}; unsigned compCountPerStream_ {0}; unsigned compCount_ {0}; @@ -421,10 +411,9 @@ IceTransport::Impl::Impl(const char* name, const IceTransportOptions& options) TRY(pj_timer_heap_create(pool_.get(), 100, &config_.stun_cfg.timer_heap)); TRY(pj_ioqueue_create(pool_.get(), IOQUEUE_MAX_HANDLES, &config_.stun_cfg.ioqueue)); - pj_ice_strans* icest = nullptr; - pj_status_t status = pj_ice_strans_create(name, &config_, compCount_, this, &icecb, &icest); + pj_status_t status = pj_ice_strans_create(name, &config_, compCount_, this, &icecb, &icest_); - if (status != PJ_SUCCESS || icest == nullptr) { + if (status != PJ_SUCCESS || icest_ == nullptr) { throw std::runtime_error("pj_ice_strans_create() failed"); } @@ -436,18 +425,6 @@ IceTransport::Impl::Impl(const char* name, const IceTransportOptions& options) // but here we don't care if there is event or not. handleEvents(500); // limit polling to 500ms } - // NOTE: This last handleEvents is necessary to close TURN socket. - // Because when destroying the TURN session pjproject creates a pj_timer - // to postpone the TURN destruction. This timer is only called if we poll - // the event queue. - auto started_destruction = std::chrono::system_clock::now(); - while (handleEvents(500)) { - if (std::chrono::system_clock::now() - started_destruction - > std::chrono::seconds(MAX_DESTRUCTION_TIMEOUT)) { - // If the transport is not closed after 3 seconds, avoid blocking - break; - } - } }); // Init to invalid addresses @@ -456,30 +433,48 @@ IceTransport::Impl::Impl(const char* name, const IceTransportOptions& options) IceTransport::Impl::~Impl() { - JAMI_DBG("[ice:%p] destroying", this); + JAMI_DBG("[ice:%p] destroying %p", this, icest_); + sip_utils::register_thread(); threadTerminateFlags_ = true; iceCV_.notify_all(); - if (thread_.joinable()) + if (thread_.joinable()) { thread_.join(); - - { - std::lock_guard<std::mutex> lk {iceMutex_}; - icest_.reset(); // must be done before ioqueue/timer destruction } + pj_ice_strans* strans = nullptr; + + std::swap(strans, icest_); + + assert(strans); + + // must be done before ioqueue/timer destruction + JAMI_INFO("[ice:%p] Destroying ice_strans %p", + pj_ice_strans_get_user_data(strans), strans); + + pj_ice_strans_stop_ice(strans); + pj_ice_strans_destroy(strans); + + // NOTE: This last handleEvents is necessary to close TURN socket. + // Because when destroying the TURN session pjproject creates a pj_timer + // to postpone the TURN destruction. This timer is only called if we poll + // the event queue. + while (handleEvents(500)); + if (config_.stun_cfg.ioqueue) pj_ioqueue_destroy(config_.stun_cfg.ioqueue); if (config_.stun_cfg.timer_heap) pj_timer_heap_destroy(config_.stun_cfg.timer_heap); + + JAMI_DBG("[ice:%p] done destroying", this); } bool IceTransport::Impl::_isInitialized() const { - if (auto icest = icest_.get()) { + if (auto icest = icest_) { auto state = pj_ice_strans_get_state(icest); return state >= PJ_ICE_STRANS_STATE_SESS_READY and state != PJ_ICE_STRANS_STATE_FAILED; } @@ -489,7 +484,7 @@ IceTransport::Impl::_isInitialized() const bool IceTransport::Impl::_isStarted() const { - if (auto icest = icest_.get()) { + if (auto icest = icest_) { auto state = pj_ice_strans_get_state(icest); return state >= PJ_ICE_STRANS_STATE_NEGO and state != PJ_ICE_STRANS_STATE_FAILED; } @@ -499,7 +494,7 @@ IceTransport::Impl::_isStarted() const bool IceTransport::Impl::_isRunning() const { - if (auto icest = icest_.get()) { + if (auto icest = icest_) { auto state = pj_ice_strans_get_state(icest); return state >= PJ_ICE_STRANS_STATE_RUNNING and state != PJ_ICE_STRANS_STATE_FAILED; } @@ -509,7 +504,7 @@ IceTransport::Impl::_isRunning() const bool IceTransport::Impl::_isFailed() const { - if (auto icest = icest_.get()) + if (auto icest = icest_) return pj_ice_strans_get_state(icest) == PJ_ICE_STRANS_STATE_FAILED; return false; } @@ -581,12 +576,6 @@ IceTransport::Impl::onComplete(pj_ice_strans* ice_st, pj_ice_strans_op op, pj_st last_errmsg_.c_str()); } - { - std::lock_guard<std::mutex> lk(iceMutex_); - if (!icest_.get()) - icest_.reset(ice_st); - } - if (done and op == PJ_ICE_STRANS_OP_INIT) { if (initiatorSession_) setInitiatorSession(); @@ -641,7 +630,14 @@ IceTransport::Impl::setInitiatorSession() JAMI_DBG("[ice:%p] as master", this); initiatorSession_ = true; if (_isInitialized()) { - auto status = pj_ice_strans_change_role(icest_.get(), PJ_ICE_SESS_ROLE_CONTROLLING); + + std::lock_guard<std::mutex> lk(iceMutex_); + + if (not icest_) { + return false; + } + + auto status = pj_ice_strans_change_role(icest_, PJ_ICE_SESS_ROLE_CONTROLLING); if (status != PJ_SUCCESS) { last_errmsg_ = sip_utils::sip_strerror(status); JAMI_ERR("[ice:%p] role change failed: %s", this, last_errmsg_.c_str()); @@ -658,7 +654,14 @@ IceTransport::Impl::setSlaveSession() JAMI_DBG("[ice:%p] as slave", this); initiatorSession_ = false; if (_isInitialized()) { - auto status = pj_ice_strans_change_role(icest_.get(), PJ_ICE_SESS_ROLE_CONTROLLED); + + std::lock_guard<std::mutex> lk(iceMutex_); + + if (not icest_) { + return false; + } + + auto status = pj_ice_strans_change_role(icest_, PJ_ICE_SESS_ROLE_CONTROLLED); if (status != PJ_SUCCESS) { last_errmsg_ = sip_utils::sip_strerror(status); JAMI_ERR("[ice:%p] role change failed: %s", this, last_errmsg_.c_str()); @@ -680,7 +683,14 @@ IceTransport::Impl::getSelectedCandidate(unsigned comp_id, bool remote) const JAMI_ERR("[ice:%p] ICE transport is not running", this); return nullptr; } - const auto* sess = pj_ice_strans_get_valid_pair(icest_.get(), comp_id); + + std::lock_guard<std::mutex> lk(iceMutex_); + + if (not icest_) { + return nullptr; + } + + const auto* sess = pj_ice_strans_get_valid_pair(icest_, comp_id); if (sess == nullptr) { JAMI_ERR("[ice:%p] Component %i has no valid pair", this, comp_id); return nullptr; @@ -726,23 +736,35 @@ IceTransport::Impl::getCandidateType(const pj_ice_sess_cand* cand) void IceTransport::Impl::getUFragPwd() { - pj_str_t local_ufrag, local_pwd; - pj_ice_strans_get_ufrag_pwd(icest_.get(), &local_ufrag, &local_pwd, nullptr, nullptr); - local_ufrag_.assign(local_ufrag.ptr, local_ufrag.slen); - local_pwd_.assign(local_pwd.ptr, local_pwd.slen); + if (icest_) { + + pj_str_t local_ufrag, local_pwd; + + pj_ice_strans_get_ufrag_pwd(icest_, &local_ufrag, &local_pwd, nullptr, nullptr); + local_ufrag_.assign(local_ufrag.ptr, local_ufrag.slen); + local_pwd_.assign(local_pwd.ptr, local_pwd.slen); + } } bool IceTransport::Impl::createIceSession(pj_ice_sess_role role) { - if (pj_ice_strans_init_ice(icest_.get(), role, nullptr, nullptr) != PJ_SUCCESS) { + std::lock_guard<std::mutex> lk(iceMutex_); + + if (not icest_) { + return false; + } + + if (pj_ice_strans_init_ice(icest_, role, nullptr, nullptr) != PJ_SUCCESS) { JAMI_ERR("[ice:%p] pj_ice_strans_init_ice() failed", this); return false; } // Fetch some information on local configuration getUFragPwd(); + JAMI_DBG("[ice:%p] (local) ufrag=%s, pwd=%s", this, local_ufrag_.c_str(), local_pwd_.c_str()); + return true; } @@ -1077,7 +1099,11 @@ bool IceTransport::isInitiator() const { if (isInitialized()) { - return pj_ice_strans_get_role(pimpl_->icest_.get()) == PJ_ICE_SESS_ROLE_CONTROLLING; + std::lock_guard<std::mutex> lk(pimpl_->iceMutex_); + if (pimpl_->icest_) { + return pj_ice_strans_get_role(pimpl_->icest_) == PJ_ICE_SESS_ROLE_CONTROLLING; + } + return false; } return pimpl_->initiatorSession_; } @@ -1131,7 +1157,14 @@ IceTransport::startIce(const Attribute& rem_attrs, std::vector<IceCandidate>&& r JAMI_DBG("[ice:%p] negotiation starting (%zu remote candidates)", pimpl_.get(), rem_candidates.size()); - auto status = pj_ice_strans_start_ice(pimpl_->icest_.get(), + + std::unique_lock lk(pimpl_->iceMutex_); + + if (not pimpl_->icest_) { + return false; + } + + auto status = pj_ice_strans_start_ice(pimpl_->icest_, pj_strset(&ufrag, (char*) rem_attrs.ufrag.c_str(), rem_attrs.ufrag.size()), @@ -1188,10 +1221,14 @@ IceTransport::startIce(const SDP& sdp) if (parseIceAttributeLine(0, line, cand)) rem_candidates.emplace_back(cand); } - std::lock_guard<std::mutex> lk {pimpl_->iceMutex_}; - if (!pimpl_->icest_) + + std::unique_lock lk(pimpl_->iceMutex_); + + if (not pimpl_->icest_) { return false; - auto status = pj_ice_strans_start_ice(pimpl_->icest_.get(), + } + + auto status = pj_ice_strans_start_ice(pimpl_->icest_, pj_strset(&ufrag, (char*) sdp.ufrag.c_str(), sdp.ufrag.size()), @@ -1214,9 +1251,9 @@ IceTransport::stop() pimpl_->is_stopped_ = true; if (isStarted()) { std::lock_guard<std::mutex> lk {pimpl_->iceMutex_}; - if (!pimpl_->icest_) + if (not pimpl_->icest_) return false; - auto status = pj_ice_strans_stop_ice(pimpl_->icest_.get()); + auto status = pj_ice_strans_stop_ice(pimpl_->icest_); if (status != PJ_SUCCESS) { pimpl_->last_errmsg_ = sip_utils::sip_strerror(status); JAMI_ERR("[ice:%p] ICE stop failed: %s", pimpl_.get(), pimpl_->last_errmsg_.c_str()); @@ -1272,9 +1309,9 @@ IceTransport::getLocalCandidates(unsigned comp_id) const { std::lock_guard<std::mutex> lk {pimpl_->iceMutex_}; - if (!pimpl_->icest_) + if (not pimpl_->icest_) return res; - if (pj_ice_strans_enum_cands(pimpl_->icest_.get(), comp_id, &cand_cnt, cand) != PJ_SUCCESS) { + if (pj_ice_strans_enum_cands(pimpl_->icest_, comp_id, &cand_cnt, cand) != PJ_SUCCESS) { JAMI_ERR("[ice:%p] pj_ice_strans_enum_cands() failed", pimpl_.get()); return res; } @@ -1334,7 +1371,7 @@ IceTransport::getLocalCandidates(unsigned streamIdx, unsigned compId) const { std::lock_guard<std::mutex> lk {pimpl_->iceMutex_}; - if (!pimpl_->icest_) + if (not pimpl_->icest_) return res; // In the implementation, the component IDs are enumerated globally // (per SDP: 1, 2, 3, 4, ...). This is simpler because we create @@ -1343,7 +1380,7 @@ IceTransport::getLocalCandidates(unsigned streamIdx, unsigned compId) const // order to be compliant with the spec. auto globalCompId = streamIdx * 2 + compId; - if (pj_ice_strans_enum_cands(pimpl_->icest_.get(), globalCompId, &cand_cnt, cand) + if (pj_ice_strans_enum_cands(pimpl_->icest_, globalCompId, &cand_cnt, cand) != PJ_SUCCESS) { JAMI_ERR("[ice:%p] pj_ice_strans_enum_cands() failed", pimpl_.get()); return res; @@ -1553,19 +1590,27 @@ IceTransport::send(unsigned compId, const unsigned char* buf, size_t len) errno = EINVAL; return -1; } - auto status = pj_ice_strans_sendto2(pimpl_->icest_.get(), + + std::unique_lock lk(pimpl_->iceMutex_); + + if (not pimpl_->icest_) { + return -1; + } + + auto status = pj_ice_strans_sendto2(pimpl_->icest_, compId, buf, len, remote.pjPtr(), remote.getLength()); + if (status == PJ_EPENDING && isTCPEnabled()) { + // NOTE; because we are in TCP, the sent size will count the header (2 // bytes length). - std::unique_lock<std::mutex> lk(pimpl_->iceMutex_); pimpl_->waitDataCv_.wait(lk, [&] { return pimpl_->lastSentLen_ >= static_cast<pj_size_t>(len) - or pimpl_->destroying_.load(); + or pimpl_->destroying_.load(); }); pimpl_->lastSentLen_ = 0; } else if (status != PJ_SUCCESS && status != PJ_EPENDING) {