Commit ba773017 authored by Adrien Béraud's avatar Adrien Béraud

sips/ice: fix race condition when sending packets

Refs #69371

Change-Id: I5f18ffa02c58a07a71f6260f3de360c9b1f75905
parent 05cc3437
......@@ -659,34 +659,38 @@ SipsIceTransport::setup()
void
SipsIceTransport::handleEvents()
{
decltype(rxPending_) rx;
{
std::lock_guard<std::mutex> l(rxMtx_);
while (not rxPending_.empty()) {
auto pck_it = rxPending_.begin();
auto& pck = *pck_it;
pj_pool_reset(rdata_.tp_info.pool);
pj_gettimeofday(&rdata_.pkt_info.timestamp);
rdata_.pkt_info.len = pck.size();
std::copy_n(pck.data(), pck.size(), rdata_.pkt_info.packet);
auto eaten = pjsip_tpmgr_receive_packet(trData_.base.tpmgr, &rdata_);
if (eaten != rdata_.pkt_info.len) {
// partial sip packet received
auto npck_it = std::next(pck_it);
if (npck_it != rxPending_.end()) {
// drop current packet, merge reminder with next one
auto& npck = *npck_it;
npck.insert(npck.begin(), pck.begin()+eaten, pck.end());
rxPendingPool_.splice(rxPendingPool_.end(), rxPending_, pck_it);
} else {
// erase eaten part, keep reminder
pck.erase(pck.begin(), pck.begin()+eaten);
break;
}
rx = std::move(rxPending_);
}
for (auto it = rx.begin(); it != rx.end(); ++it) {
auto& pck = *it;
pj_pool_reset(rdata_.tp_info.pool);
pj_gettimeofday(&rdata_.pkt_info.timestamp);
rdata_.pkt_info.len = pck.size();
std::copy_n(pck.data(), pck.size(), rdata_.pkt_info.packet);
auto eaten = pjsip_tpmgr_receive_packet(trData_.base.tpmgr, &rdata_);
if (eaten != rdata_.pkt_info.len) {
// partial sip packet received
auto npck_it = std::next(it);
if (npck_it != rx.end()) {
// drop current packet, merge reminder with next one
auto& npck = *npck_it;
npck.insert(npck.begin(), pck.begin()+eaten, pck.end());
} else {
rxPendingPool_.splice(rxPendingPool_.end(), rxPending_, pck_it);
// erase eaten part, keep reminder
pck.erase(pck.begin(), pck.begin()+eaten);
{
std::lock_guard<std::mutex> l(rxMtx_);
rxPending_.splice(rxPending_.begin(), rx, it);
}
break;
}
}
}
putBuff(std::move(rx));
rxCv_.notify_all();
}
......@@ -756,17 +760,19 @@ SipsIceTransport::loop()
if (state_ != TlsConnectionState::ESTABLISHED and not getTransportBase()->is_shutdown)
return;
while (canRead_) {
std::lock_guard<std::mutex> l(rxMtx_);
if (rxPendingPool_.empty())
rxPendingPool_.emplace_back(PJSIP_MAX_PKT_LEN);
auto& buf = rxPendingPool_.front();
buf.resize(PJSIP_MAX_PKT_LEN);
decltype(rxPending_) rx;
while (canRead_ or gnutls_record_check_pending(session_)) {
if (rx.empty())
getBuff(rx, PJSIP_MAX_PKT_LEN);
auto& buf = rx.front();
const auto decrypted_size = gnutls_record_recv(session_, buf.data(), buf.size());
if (decrypted_size > 0/* || transport error */) {
buf.resize(decrypted_size);
rxPending_.splice(rxPending_.end(), rxPendingPool_, rxPendingPool_.begin());
{
std::lock_guard<std::mutex> l(rxMtx_);
rxPending_.splice(rxPending_.end(), rx, rx.begin());
}
} else if (decrypted_size == 0) {
/* EOF */
tlsThread_.stop();
......@@ -798,6 +804,7 @@ SipsIceTransport::loop()
break;
}
}
putBuff(std::move(rx));
flushOutputBuff();
}
}
......@@ -830,7 +837,10 @@ SipsIceTransport::clean()
rxCv_.wait(l, [&](){
return rxPending_.empty();
});
rxPendingPool_.clear();
}
{
std::lock_guard<std::mutex> lk(buffPoolMtx_);
buffPool_.clear();
}
bool event = state_ == TlsConnectionState::ESTABLISHED;
......@@ -933,19 +943,30 @@ SipsIceTransport::send(pjsip_tx_data *tdata, const pj_sockaddr_t *rem_addr,
addr_len==sizeof(pj_sockaddr_in6)),
PJ_EINVAL);
tdata->op_key.tdata = tdata;
tdata->op_key.token = token;
tdata->op_key.callback = callback;
{
if (state_ == TlsConnectionState::ESTABLISHED) {
decltype(txBuff_) tx;
size_t size = tdata->buf.cur - tdata->buf.start;
getBuff(tx, (uint8_t*)tdata->buf.start, (uint8_t*)tdata->buf.cur);
{
std::lock_guard<std::mutex> l(outputBuffMtx_);
txBuff_.splice(txBuff_.end(), std::move(tx));
}
tdata->op_key.tdata = nullptr;
if (tdata->op_key.callback)
tdata->op_key.callback(getTransportBase(), token, size);
} else {
std::lock_guard<std::mutex> l(outputBuffMtx_);
tdata->op_key.tdata = tdata;
outputBuff_.emplace_back(DelayedTxData{&tdata->op_key, {}});
if (tdata->msg && tdata->msg->type == PJSIP_REQUEST_MSG) {
auto& dtd = outputBuff_.back();
dtd.timeout = clock::now();
dtd.timeout += std::chrono::milliseconds(pjsip_cfg()->tsx.td);
}
canWrite_ = true;
}
canWrite_ = true;
cv_.notify_all();
return PJ_EPENDING;
}
......@@ -954,14 +975,14 @@ pj_status_t
SipsIceTransport::flushOutputBuff()
{
ssize_t status = PJ_SUCCESS;
// send delayed data first
while (true) {
DelayedTxData f;
{
std::lock_guard<std::mutex> l(outputBuffMtx_);
if (outputBuff_.empty()) {
canWrite_ = false;
if (outputBuff_.empty())
break;
}
else {
f = outputBuff_.front();
outputBuff_.pop_front();
......@@ -977,6 +998,30 @@ SipsIceTransport::flushOutputBuff()
if (status < 0)
break;
}
if (status < 0)
return status;
decltype(txBuff_) tx;
{
std::lock_guard<std::mutex> l(outputBuffMtx_);
tx = std::move(txBuff_);
canWrite_ = false;
}
for (auto it = tx.begin(); it != tx.end(); ++it) {
const auto& msg = *it;
const auto nwritten = gnutls_record_send(session_, msg.data(), msg.size());
if (nwritten <= 0) {
RING_ERR("gnutls_record_send: %s", gnutls_strerror(nwritten));
status = tls_status_from_err(nwritten);
{
std::lock_guard<std::mutex> l(outputBuffMtx_);
txBuff_.splice(txBuff_.begin(), tx, it, tx.end());
canWrite_ = true;
}
break;
}
}
putBuff(std::move(tx));
return status > 0 ? PJ_SUCCESS : (pj_status_t)status;
}
......@@ -1019,6 +1064,40 @@ SipsIceTransport::shutdown()
cv_.notify_all();
}
void
SipsIceTransport::getBuff(decltype(buffPool_)& l, const uint8_t* b, const uint8_t* e)
{
std::lock_guard<std::mutex> lk(buffPoolMtx_);
if (buffPool_.empty())
l.emplace_back(b, e);
else {
l.splice(l.end(), buffPool_, buffPool_.cbegin());
auto& buf = l.back();
buf.resize(std::distance(b, e));
std::copy(b, e, buf.begin());
}
}
void
SipsIceTransport::getBuff(decltype(buffPool_)& l, const size_t s)
{
std::lock_guard<std::mutex> lk(buffPoolMtx_);
if (buffPool_.empty())
l.emplace_back(s);
else {
l.splice(l.end(), buffPool_, buffPool_.cbegin());
auto& buf = l.back();
buf.resize(s);
}
}
void
SipsIceTransport::putBuff(decltype(buffPool_)&& l)
{
std::lock_guard<std::mutex> lk(buffPoolMtx_);
buffPool_.splice(buffPool_.end(), l);
}
pj_status_t
SipsIceTransport::tls_status_from_err(int err)
{
......
......@@ -161,14 +161,21 @@ private:
ssize_t trySend(pjsip_tx_data_op_key* tdata);
pj_status_t flushOutputBuff();
std::list<DelayedTxData> outputBuff_;
std::list<std::vector<uint8_t>> txBuff_;
std::mutex outputBuffMtx_;
std::mutex rxMtx_;
std::condition_variable_any rxCv_;
std::list<std::vector<uint8_t>> rxPending_;
std::list<std::vector<uint8_t>> rxPendingPool_;
pjsip_rx_data rdata_;
// data buffer pool
std::list<std::vector<uint8_t>> buffPool_;
std::mutex buffPoolMtx_;
void getBuff(decltype(buffPool_)& l, const uint8_t* b, const uint8_t* e);
void getBuff(decltype(buffPool_)& l, const size_t s);
void putBuff(decltype(buffPool_)&& l);
// GnuTLS <-> ICE
ssize_t tlsSend(const void*, size_t);
ssize_t tlsRecv(void* d , size_t s);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment