From 7201cc528cf030679b60ba6bebde591d4a673f3b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20B=C3=A9raud?= <adrien.beraud@savoirfairelinux.com> Date: Thu, 26 Mar 2015 15:58:56 -0400 Subject: [PATCH] sips/ice: properly report all incoming packets before closing Refs #69371 Change-Id: Ife39e493848f1326234d25f657b49734e8ce0b1d --- src/ringdht/sips_transport_ice.cpp | 58 +++++++++++++++++------------- src/ringdht/sips_transport_ice.h | 1 + 2 files changed, 35 insertions(+), 24 deletions(-) diff --git a/src/ringdht/sips_transport_ice.cpp b/src/ringdht/sips_transport_ice.cpp index db3d54b7c6..2c012d88eb 100644 --- a/src/ringdht/sips_transport_ice.cpp +++ b/src/ringdht/sips_transport_ice.cpp @@ -659,32 +659,35 @@ SipsIceTransport::setup() void SipsIceTransport::handleEvents() { - std::lock_guard<std::mutex> l(rxMtx_); - while (not rxPending_.empty()) { - auto pck_it = rxPending_.begin(); - auto& pck = *pck_it; - pj_pool_reset(rdata_.tp_info.pool); - pj_gettimeofday(&rdata_.pkt_info.timestamp); - rdata_.pkt_info.len = pck.size(); - std::copy_n(pck.data(), pck.size(), rdata_.pkt_info.packet); - auto eaten = pjsip_tpmgr_receive_packet(trData_.base.tpmgr, &rdata_); - if (eaten != rdata_.pkt_info.len) { - // partial sip packet received - auto npck_it = std::next(pck_it); - if (npck_it != rxPending_.end()) { - // drop current packet, merge reminder with next one - auto& npck = *npck_it; - npck.insert(npck.begin(), pck.begin()+eaten, pck.end()); - rxPendingPool_.splice(rxPendingPool_.end(), rxPending_, pck_it); + { + std::lock_guard<std::mutex> l(rxMtx_); + while (not rxPending_.empty()) { + auto pck_it = rxPending_.begin(); + auto& pck = *pck_it; + pj_pool_reset(rdata_.tp_info.pool); + pj_gettimeofday(&rdata_.pkt_info.timestamp); + rdata_.pkt_info.len = pck.size(); + std::copy_n(pck.data(), pck.size(), rdata_.pkt_info.packet); + auto eaten = pjsip_tpmgr_receive_packet(trData_.base.tpmgr, &rdata_); + if (eaten != rdata_.pkt_info.len) { + // partial sip packet received + auto npck_it = std::next(pck_it); + if (npck_it != rxPending_.end()) { + // drop current packet, merge reminder with next one + auto& npck = *npck_it; + npck.insert(npck.begin(), pck.begin()+eaten, pck.end()); + rxPendingPool_.splice(rxPendingPool_.end(), rxPending_, pck_it); + } else { + // erase eaten part, keep reminder + pck.erase(pck.begin(), pck.begin()+eaten); + break; + } } else { - // erase eaten part, keep reminder - pck.erase(pck.begin(), pck.begin()+eaten); - break; + rxPendingPool_.splice(rxPendingPool_.end(), rxPending_, pck_it); } - } else { - rxPendingPool_.splice(rxPendingPool_.end(), rxPending_, pck_it); } } + rxCv_.notify_all(); } void @@ -766,7 +769,7 @@ SipsIceTransport::loop() rxPending_.splice(rxPending_.end(), rxPendingPool_, rxPendingPool_.begin()); } else if (decrypted_size == 0) { /* EOF */ - shutdown(); + tlsThread_.stop(); break; } else if (decrypted_size == GNUTLS_E_AGAIN or decrypted_size == GNUTLS_E_INTERRUPTED) { @@ -821,7 +824,14 @@ SipsIceTransport::clean() cookie_key_.data = nullptr; cookie_key_.size = 0; } - rxPendingPool_.clear(); + { + // make sure all incoming packets are reported before closing + std::unique_lock<std::mutex> l(rxMtx_); + rxCv_.wait(l, [&](){ + return rxPending_.empty(); + }); + rxPendingPool_.clear(); + } bool event = state_ == TlsConnectionState::ESTABLISHED; closeTlsSession(); diff --git a/src/ringdht/sips_transport_ice.h b/src/ringdht/sips_transport_ice.h index 1517b2e6c5..291d1a8001 100644 --- a/src/ringdht/sips_transport_ice.h +++ b/src/ringdht/sips_transport_ice.h @@ -164,6 +164,7 @@ private: std::mutex outputBuffMtx_; std::mutex rxMtx_; + std::condition_variable_any rxCv_; std::list<std::vector<uint8_t>> rxPending_; std::list<std::vector<uint8_t>> rxPendingPool_; pjsip_rx_data rdata_; -- GitLab