diff --git a/src/ringdht/sips_transport_ice.cpp b/src/ringdht/sips_transport_ice.cpp index b9cc114634f2b7cbf1d8c00370035cc04c62cabb..c9d5f66c942ca08183e9c26fe34c38907fb5ee36 100644 --- a/src/ringdht/sips_transport_ice.cpp +++ b/src/ringdht/sips_transport_ice.cpp @@ -30,6 +30,7 @@ #include "sips_transport_ice.h" #include "ice_transport.h" +#include "manager.h" #include "logger.h" #include "gnutls_support.h" @@ -205,6 +206,7 @@ SipsIceTransport::SipsIceTransport(pjsip_endpoint* endpt, }); tlsThread_.start(); + Manager::instance().registerEventHandler((uintptr_t)this, [this]{ handleEvents(); }); } SipsIceTransport::~SipsIceTransport() @@ -213,6 +215,7 @@ SipsIceTransport::~SipsIceTransport() shutdown(); ice_->setOnRecv(comp_id_, nullptr); tlsThread_.join(); + Manager::instance().unregisterEventHandler((uintptr_t)this); pj_lock_destroy(trData_.base.lock); pj_atomic_destroy(trData_.base.ref_cnt); @@ -653,6 +656,36 @@ SipsIceTransport::setup() return startTlsSession() == PJ_SUCCESS; } +void +SipsIceTransport::handleEvents() +{ + std::lock_guard<std::mutex> l(rxMtx_); + while (not rxPending_.empty()) { + auto pck_it = rxPending_.begin(); + auto& pck = *pck_it; + pj_pool_reset(rdata_.tp_info.pool); + pj_gettimeofday(&rdata_.pkt_info.timestamp); + rdata_.pkt_info.len = pck.size(); + std::copy_n(pck.data(), pck.size(), rdata_.pkt_info.packet); + auto eaten = pjsip_tpmgr_receive_packet(trData_.base.tpmgr, &rdata_); + if (eaten != rdata_.pkt_info.len) { + // partial sip packet received + auto npck_it = std::next(pck_it); + if (npck_it != rxPending_.end()) { + // drop current packet, merge reminder with next one + auto& npck = *npck_it; + npck.insert(npck.begin(), pck.begin()+eaten, pck.end()); + rxPendingPool_.splice(rxPendingPool_.end(), rxPending_, pck_it); + } else { + // erase eaten part, keep reminder + pck.erase(pck.begin(), pck.begin()+eaten); + } + } else { + rxPendingPool_.splice(rxPendingPool_.end(), rxPending_, pck_it); + } + } +} + void SipsIceTransport::loop() { @@ -720,24 +753,15 @@ SipsIceTransport::loop() return; while (canRead_) { - if (rdata_.pkt_info.len < 0) - rdata_.pkt_info.len = 0; - const auto decrypted_size = gnutls_record_recv(session_, - (uint8_t*)rdata_.pkt_info.packet + rdata_.pkt_info.len, - sizeof(rdata_.pkt_info.packet)-rdata_.pkt_info.len); - rdata_.pkt_info.len += decrypted_size; + std::lock_guard<std::mutex> l(rxMtx_); + if (rxPendingPool_.empty()) + rxPendingPool_.emplace_back(PJSIP_MAX_PKT_LEN); + auto& buf = rxPendingPool_.front(); + const auto decrypted_size = gnutls_record_recv(session_, buf.data(), buf.size()); + if (decrypted_size > 0/* || transport error */) { - rdata_.pkt_info.zero = 0; - pj_gettimeofday(&rdata_.pkt_info.timestamp); - auto eaten = pjsip_tpmgr_receive_packet(trData_.base.tpmgr, &rdata_); - auto rem = rdata_.pkt_info.len - eaten; - if (rem > 0 && rem != rdata_.pkt_info.len) { - std::move(rdata_.pkt_info.packet + eaten, - rdata_.pkt_info.packet + eaten + rem, - rdata_.pkt_info.packet); - } - rdata_.pkt_info.len = rem; - pj_pool_reset(rdata_.tp_info.pool); + buf.resize(decrypted_size); + rxPending_.splice(rxPending_.end(), rxPendingPool_, rxPendingPool_.begin()); } else if (decrypted_size == 0) { /* EOF */ shutdown(); @@ -795,6 +819,7 @@ SipsIceTransport::clean() cookie_key_.data = nullptr; cookie_key_.size = 0; } + rxPendingPool_.clear(); bool event = state_ == TlsConnectionState::ESTABLISHED; closeTlsSession(); diff --git a/src/ringdht/sips_transport_ice.h b/src/ringdht/sips_transport_ice.h index b0bd6ba060dc203e9888c7b7d60e03c577ff04aa..1517b2e6c5ae059062346402229cd89fdd390c56 100644 --- a/src/ringdht/sips_transport_ice.h +++ b/src/ringdht/sips_transport_ice.h @@ -144,6 +144,11 @@ private: gnutls_datum_t cookie_key_ {nullptr, 0}; gnutls_dtls_prestate_st prestate_; + /** + * To be called on a regular basis to receive packets + */ + void handleEvents(); + // ThreadLoop bool setup(); void loop(); @@ -157,6 +162,10 @@ private: pj_status_t flushOutputBuff(); std::list<DelayedTxData> outputBuff_; std::mutex outputBuffMtx_; + + std::mutex rxMtx_; + std::list<std::vector<uint8_t>> rxPending_; + std::list<std::vector<uint8_t>> rxPendingPool_; pjsip_rx_data rdata_; // GnuTLS <-> ICE