Commit 8e2780b6 authored by Adrien Béraud's avatar Adrien Béraud

sips/ice: send packets to pjsip from main thread

Refs #69122

Change-Id: I346c480728233d3e15c681b78febbac9bcf96b97
parent 5102c349
......@@ -30,6 +30,7 @@
#include "sips_transport_ice.h"
#include "ice_transport.h"
#include "manager.h"
#include "logger.h"
#include "gnutls_support.h"
......@@ -205,6 +206,7 @@ SipsIceTransport::SipsIceTransport(pjsip_endpoint* endpt,
});
tlsThread_.start();
Manager::instance().registerEventHandler((uintptr_t)this, [this]{ handleEvents(); });
}
SipsIceTransport::~SipsIceTransport()
......@@ -213,6 +215,7 @@ SipsIceTransport::~SipsIceTransport()
shutdown();
ice_->setOnRecv(comp_id_, nullptr);
tlsThread_.join();
Manager::instance().unregisterEventHandler((uintptr_t)this);
pj_lock_destroy(trData_.base.lock);
pj_atomic_destroy(trData_.base.ref_cnt);
......@@ -653,6 +656,36 @@ SipsIceTransport::setup()
return startTlsSession() == PJ_SUCCESS;
}
void
SipsIceTransport::handleEvents()
{
std::lock_guard<std::mutex> l(rxMtx_);
while (not rxPending_.empty()) {
auto pck_it = rxPending_.begin();
auto& pck = *pck_it;
pj_pool_reset(rdata_.tp_info.pool);
pj_gettimeofday(&rdata_.pkt_info.timestamp);
rdata_.pkt_info.len = pck.size();
std::copy_n(pck.data(), pck.size(), rdata_.pkt_info.packet);
auto eaten = pjsip_tpmgr_receive_packet(trData_.base.tpmgr, &rdata_);
if (eaten != rdata_.pkt_info.len) {
// partial sip packet received
auto npck_it = std::next(pck_it);
if (npck_it != rxPending_.end()) {
// drop current packet, merge reminder with next one
auto& npck = *npck_it;
npck.insert(npck.begin(), pck.begin()+eaten, pck.end());
rxPendingPool_.splice(rxPendingPool_.end(), rxPending_, pck_it);
} else {
// erase eaten part, keep reminder
pck.erase(pck.begin(), pck.begin()+eaten);
}
} else {
rxPendingPool_.splice(rxPendingPool_.end(), rxPending_, pck_it);
}
}
}
void
SipsIceTransport::loop()
{
......@@ -720,24 +753,15 @@ SipsIceTransport::loop()
return;
while (canRead_) {
if (rdata_.pkt_info.len < 0)
rdata_.pkt_info.len = 0;
const auto decrypted_size = gnutls_record_recv(session_,
(uint8_t*)rdata_.pkt_info.packet + rdata_.pkt_info.len,
sizeof(rdata_.pkt_info.packet)-rdata_.pkt_info.len);
rdata_.pkt_info.len += decrypted_size;
std::lock_guard<std::mutex> l(rxMtx_);
if (rxPendingPool_.empty())
rxPendingPool_.emplace_back(PJSIP_MAX_PKT_LEN);
auto& buf = rxPendingPool_.front();
const auto decrypted_size = gnutls_record_recv(session_, buf.data(), buf.size());
if (decrypted_size > 0/* || transport error */) {
rdata_.pkt_info.zero = 0;
pj_gettimeofday(&rdata_.pkt_info.timestamp);
auto eaten = pjsip_tpmgr_receive_packet(trData_.base.tpmgr, &rdata_);
auto rem = rdata_.pkt_info.len - eaten;
if (rem > 0 && rem != rdata_.pkt_info.len) {
std::move(rdata_.pkt_info.packet + eaten,
rdata_.pkt_info.packet + eaten + rem,
rdata_.pkt_info.packet);
}
rdata_.pkt_info.len = rem;
pj_pool_reset(rdata_.tp_info.pool);
buf.resize(decrypted_size);
rxPending_.splice(rxPending_.end(), rxPendingPool_, rxPendingPool_.begin());
} else if (decrypted_size == 0) {
/* EOF */
shutdown();
......@@ -795,6 +819,7 @@ SipsIceTransport::clean()
cookie_key_.data = nullptr;
cookie_key_.size = 0;
}
rxPendingPool_.clear();
bool event = state_ == TlsConnectionState::ESTABLISHED;
closeTlsSession();
......
......@@ -144,6 +144,11 @@ private:
gnutls_datum_t cookie_key_ {nullptr, 0};
gnutls_dtls_prestate_st prestate_;
/**
* To be called on a regular basis to receive packets
*/
void handleEvents();
// ThreadLoop
bool setup();
void loop();
......@@ -157,6 +162,10 @@ private:
pj_status_t flushOutputBuff();
std::list<DelayedTxData> outputBuff_;
std::mutex outputBuffMtx_;
std::mutex rxMtx_;
std::list<std::vector<uint8_t>> rxPending_;
std::list<std::vector<uint8_t>> rxPendingPool_;
pjsip_rx_data rdata_;
// GnuTLS <-> ICE
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment