From 5a3768f31a7ac331bf05e76698dbff6b39fbee56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Blin?= <sebastien.blin@savoirfairelinux.com> Date: Tue, 16 Aug 2022 13:53:20 -0400 Subject: [PATCH] ice_transport: avoid dirty underlyingIce() Let the IceTransport layer stopped it's current read if destroyed (as it uses a peerChannel, this channel should be cancelled to let upper layer stop correctly all operations). Call the shutdown callback whenever the ICE is really destroyed (so caller must not re-use the ICE pointer). Finally, remove some dirty underlyingIce() accesses in peer_connection Change-Id: Icca73d9af273297a558121ffddc991ac6617ece2 GitLab: #703 --- src/ice_transport.cpp | 59 ++++++++++++++++++++--------------------- src/peer_connection.cpp | 16 +---------- src/peer_connection.h | 2 -- 3 files changed, 30 insertions(+), 47 deletions(-) diff --git a/src/ice_transport.cpp b/src/ice_transport.cpp index c44be21aa3..d26f3ff83e 100644 --- a/src/ice_transport.cpp +++ b/src/ice_transport.cpp @@ -82,21 +82,16 @@ class IceLock public: IceLock(pj_ice_strans* strans) - : lk_(pj_ice_strans_get_grp_lock(strans)) { + : lk_(pj_ice_strans_get_grp_lock(strans)) + { lock(); } - ~IceLock() { - unlock(); - } + ~IceLock() { unlock(); } - void lock() { - pj_grp_lock_acquire(lk_); - } + void lock() { pj_grp_lock_acquire(lk_); } - void unlock() { - pj_grp_lock_release(lk_); - } + void unlock() { pj_grp_lock_release(lk_); } }; class IceTransport::Impl @@ -236,6 +231,12 @@ public: pj_size_t lastSentLen_ {0}; bool destroying_ {false}; onShutdownCb scb {}; + + void cancelOperations() + { + for (auto& c : peerChannels_) + c.stop(); + } }; //============================================================================== @@ -381,6 +382,8 @@ IceTransport::Impl::~Impl() } JAMI_DBG("[ice:%p] done destroying", this); + if (scb) + scb(); } void @@ -508,11 +511,10 @@ IceTransport::Impl::initIceInstance(const IceTransportOptions& options) icecb.on_destroy = [](pj_ice_strans* ice_st) { if (auto* tr = static_cast<Impl*>(pj_ice_strans_get_user_data(ice_st))) { + tr->cancelOperations(); // Avoid upper layer to manage this ; Stop read operations std::lock_guard lk(tr->sendDataMutex_); tr->destroying_ = true; - tr->waitDataCv_.notify_all(); - if (tr->scb) - tr->scb(); + tr->waitDataCv_.notify_all(); // Stop write operations } else { JAMI_WARN("null IceTransport"); } @@ -698,9 +700,9 @@ IceTransport::Impl::checkEventQueue(int maxEventToPoll) void IceTransport::Impl::onComplete(pj_ice_strans*, pj_ice_strans_op op, pj_status_t status) { - const char* opname = op == PJ_ICE_STRANS_OP_INIT - ? "initialization" - : op == PJ_ICE_STRANS_OP_NEGOTIATION ? "negotiation" : "unknown_op"; + const char* opname = op == PJ_ICE_STRANS_OP_INIT ? "initialization" + : op == PJ_ICE_STRANS_OP_NEGOTIATION ? "negotiation" + : "unknown_op"; const bool done = status == PJ_SUCCESS; if (done) { @@ -748,7 +750,7 @@ IceTransport::Impl::link() const std::ostringstream out; for (unsigned strm = 1; strm <= streamsCount_ * compCountPerStream_; strm++) { auto absIdx = strm; - auto comp = (strm+1)/compCountPerStream_; + auto comp = (strm + 1) / compCountPerStream_; auto laddr = getLocalAddress(absIdx); auto raddr = getRemoteAddress(absIdx); @@ -770,7 +772,6 @@ IceTransport::Impl::setInitiatorSession() JAMI_DBG("[ice:%p] as master", this); initiatorSession_ = true; if (_isInitialized()) { - auto status = pj_ice_strans_change_role(icest_, PJ_ICE_SESS_ROLE_CONTROLLING); if (status != PJ_SUCCESS) { last_errmsg_ = sip_utils::sip_strerror(status); @@ -788,7 +789,6 @@ IceTransport::Impl::setSlaveSession() JAMI_DBG("[ice:%p] as slave", this); initiatorSession_ = false; if (_isInitialized()) { - auto status = pj_ice_strans_change_role(icest_, PJ_ICE_SESS_ROLE_CONTROLLED); if (status != PJ_SUCCESS) { last_errmsg_ = sip_utils::sip_strerror(status); @@ -1102,7 +1102,8 @@ IceTransport::Impl::onReceiveData(unsigned comp_id, void* pkt, pj_size_t size) jami_tracepoint_if_enabled(ice_transport_recv, reinterpret_cast<uint64_t>(this), - comp_id, size, + comp_id, + size, getRemoteAddress(comp_id).toString().c_str()); if (size == 0) return; @@ -1130,8 +1131,8 @@ IceTransport::Impl::_waitForInitialization(std::chrono::milliseconds timeout) IceLock lk(icest_); if (not iceCV_.wait_for(lk, timeout, [this] { - return threadTerminateFlags_ or _isInitialized() or _isFailed(); - })) { + return threadTerminateFlags_ or _isInitialized() or _isFailed(); + })) { JAMI_WARN("[ice:%p] waitForInitialization: timeout", this); return false; } @@ -1156,8 +1157,7 @@ IceTransport::initIceInstance(const IceTransportOptions& options) { pimpl_->initIceInstance(options); - jami_tracepoint(ice_transport_context, - reinterpret_cast<uint64_t>(this)); + jami_tracepoint(ice_transport_context, reinterpret_cast<uint64_t>(this)); } bool @@ -1366,9 +1366,7 @@ void IceTransport::cancelOperations() { isCancelled_ = true; - for (auto& c : pimpl_->peerChannels_) { - c.stop(); - } + pimpl_->cancelOperations(); } IpAddr @@ -1692,7 +1690,9 @@ IceTransport::send(unsigned compId, const unsigned char* buf, size_t len) jami_tracepoint(ice_transport_send, reinterpret_cast<uint64_t>(this), - compId, len, remote.toString().c_str()); + compId, + len, + remote.toString().c_str()); auto status = pj_ice_strans_sendto2(pimpl_->icest_, compId, @@ -1707,8 +1707,7 @@ IceTransport::send(unsigned compId, const unsigned char* buf, size_t len) // NOTE; because we are in TCP, the sent size will count the header (2 // bytes length). pimpl_->waitDataCv_.wait(dlk, [&] { - return pimpl_->lastSentLen_ >= static_cast<pj_size_t>(len) - or pimpl_->destroying_; + return pimpl_->lastSentLen_ >= static_cast<pj_size_t>(len) or pimpl_->destroying_; }); pimpl_->lastSentLen_ = 0; } else if (status != PJ_SUCCESS && status != PJ_EPENDING) { diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index c87f88c06e..d0f72a9f7e 100644 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -195,12 +195,6 @@ public: /*.cert_check = */ nullptr, }; tls = std::make_unique<tls::TlsSession>(std::move(ep), tls_param, tls_cbs); - - if (const auto& ice = ep_->underlyingICE()) - ice->setOnShutdown([this]() { - if (tls) - tls->shutdown(); - }); } Impl(std::unique_ptr<IceSocketEndpoint>&& ep, @@ -232,18 +226,10 @@ public: /*.cert_check = */ nullptr, }; tls = std::make_unique<tls::TlsSession>(std::move(ep), tls_param, tls_cbs); - - if (const auto& ice = ep_->underlyingICE()) - ice->setOnShutdown([this]() { - if (tls) - tls->shutdown(); - }); } ~Impl() { - if (const auto& ice = ep_->underlyingICE()) - ice->setOnShutdown({}); { std::lock_guard<std::mutex> lk(cbMtx_); onStateChangeCb_ = {}; @@ -417,12 +403,12 @@ TlsSocketEndpoint::setOnReady(std::function<void(bool ok)>&& cb) void TlsSocketEndpoint::shutdown() { + pimpl_->tls->shutdown(); if (pimpl_->ep_) { const auto* iceSocket = reinterpret_cast<const IceSocketEndpoint*>(pimpl_->ep_); if (iceSocket && iceSocket->underlyingICE()) iceSocket->underlyingICE()->cancelOperations(); } - pimpl_->tls->shutdown(); } std::shared_ptr<IceTransport> diff --git a/src/peer_connection.h b/src/peer_connection.h index 3d82df39ce..c5f101ce35 100644 --- a/src/peer_connection.h +++ b/src/peer_connection.h @@ -100,8 +100,6 @@ public: ice_->setOnRecv(compId_, cb); } - void setOnShutdown(onShutdownCb&& cb) { ice_->setOnShutdown(std::move(cb)); } - private: std::shared_ptr<IceTransport> ice_ {nullptr}; std::atomic_bool iceStopped {false}; -- GitLab