diff --git a/src/ringdht/sips_transport_ice.cpp b/src/ringdht/sips_transport_ice.cpp index 2c012d88eb9b75342615e15d46c9bd07736ffa8f..5e89f6eafc008e25afaf0503116e73fbe6af616f 100644 --- a/src/ringdht/sips_transport_ice.cpp +++ b/src/ringdht/sips_transport_ice.cpp @@ -659,34 +659,38 @@ SipsIceTransport::setup() void SipsIceTransport::handleEvents() { + decltype(rxPending_) rx; { std::lock_guard<std::mutex> l(rxMtx_); - while (not rxPending_.empty()) { - auto pck_it = rxPending_.begin(); - auto& pck = *pck_it; - pj_pool_reset(rdata_.tp_info.pool); - pj_gettimeofday(&rdata_.pkt_info.timestamp); - rdata_.pkt_info.len = pck.size(); - std::copy_n(pck.data(), pck.size(), rdata_.pkt_info.packet); - auto eaten = pjsip_tpmgr_receive_packet(trData_.base.tpmgr, &rdata_); - if (eaten != rdata_.pkt_info.len) { - // partial sip packet received - auto npck_it = std::next(pck_it); - if (npck_it != rxPending_.end()) { - // drop current packet, merge reminder with next one - auto& npck = *npck_it; - npck.insert(npck.begin(), pck.begin()+eaten, pck.end()); - rxPendingPool_.splice(rxPendingPool_.end(), rxPending_, pck_it); - } else { - // erase eaten part, keep reminder - pck.erase(pck.begin(), pck.begin()+eaten); - break; - } + rx = std::move(rxPending_); + } + + for (auto it = rx.begin(); it != rx.end(); ++it) { + auto& pck = *it; + pj_pool_reset(rdata_.tp_info.pool); + pj_gettimeofday(&rdata_.pkt_info.timestamp); + rdata_.pkt_info.len = pck.size(); + std::copy_n(pck.data(), pck.size(), rdata_.pkt_info.packet); + auto eaten = pjsip_tpmgr_receive_packet(trData_.base.tpmgr, &rdata_); + if (eaten != rdata_.pkt_info.len) { + // partial sip packet received + auto npck_it = std::next(it); + if (npck_it != rx.end()) { + // drop current packet, merge reminder with next one + auto& npck = *npck_it; + npck.insert(npck.begin(), pck.begin()+eaten, pck.end()); } else { - rxPendingPool_.splice(rxPendingPool_.end(), rxPending_, pck_it); + // erase eaten part, keep reminder + pck.erase(pck.begin(), pck.begin()+eaten); + { + std::lock_guard<std::mutex> l(rxMtx_); + rxPending_.splice(rxPending_.begin(), rx, it); + } + break; } } } + putBuff(std::move(rx)); rxCv_.notify_all(); } @@ -756,17 +760,19 @@ SipsIceTransport::loop() if (state_ != TlsConnectionState::ESTABLISHED and not getTransportBase()->is_shutdown) return; - while (canRead_) { - std::lock_guard<std::mutex> l(rxMtx_); - if (rxPendingPool_.empty()) - rxPendingPool_.emplace_back(PJSIP_MAX_PKT_LEN); - auto& buf = rxPendingPool_.front(); - buf.resize(PJSIP_MAX_PKT_LEN); + decltype(rxPending_) rx; + while (canRead_ or gnutls_record_check_pending(session_)) { + if (rx.empty()) + getBuff(rx, PJSIP_MAX_PKT_LEN); + auto& buf = rx.front(); const auto decrypted_size = gnutls_record_recv(session_, buf.data(), buf.size()); if (decrypted_size > 0/* || transport error */) { buf.resize(decrypted_size); - rxPending_.splice(rxPending_.end(), rxPendingPool_, rxPendingPool_.begin()); + { + std::lock_guard<std::mutex> l(rxMtx_); + rxPending_.splice(rxPending_.end(), rx, rx.begin()); + } } else if (decrypted_size == 0) { /* EOF */ tlsThread_.stop(); @@ -798,6 +804,7 @@ SipsIceTransport::loop() break; } } + putBuff(std::move(rx)); flushOutputBuff(); } } @@ -830,7 +837,10 @@ SipsIceTransport::clean() rxCv_.wait(l, [&](){ return rxPending_.empty(); }); - rxPendingPool_.clear(); + } + { + std::lock_guard<std::mutex> lk(buffPoolMtx_); + buffPool_.clear(); } bool event = state_ == TlsConnectionState::ESTABLISHED; @@ -933,19 +943,30 @@ SipsIceTransport::send(pjsip_tx_data *tdata, const pj_sockaddr_t *rem_addr, addr_len==sizeof(pj_sockaddr_in6)), PJ_EINVAL); - tdata->op_key.tdata = tdata; tdata->op_key.token = token; tdata->op_key.callback = callback; - { + if (state_ == TlsConnectionState::ESTABLISHED) { + decltype(txBuff_) tx; + size_t size = tdata->buf.cur - tdata->buf.start; + getBuff(tx, (uint8_t*)tdata->buf.start, (uint8_t*)tdata->buf.cur); + { + std::lock_guard<std::mutex> l(outputBuffMtx_); + txBuff_.splice(txBuff_.end(), std::move(tx)); + } + tdata->op_key.tdata = nullptr; + if (tdata->op_key.callback) + tdata->op_key.callback(getTransportBase(), token, size); + } else { std::lock_guard<std::mutex> l(outputBuffMtx_); + tdata->op_key.tdata = tdata; outputBuff_.emplace_back(DelayedTxData{&tdata->op_key, {}}); if (tdata->msg && tdata->msg->type == PJSIP_REQUEST_MSG) { auto& dtd = outputBuff_.back(); dtd.timeout = clock::now(); dtd.timeout += std::chrono::milliseconds(pjsip_cfg()->tsx.td); } - canWrite_ = true; } + canWrite_ = true; cv_.notify_all(); return PJ_EPENDING; } @@ -954,14 +975,14 @@ pj_status_t SipsIceTransport::flushOutputBuff() { ssize_t status = PJ_SUCCESS; + + // send delayed data first while (true) { DelayedTxData f; { std::lock_guard<std::mutex> l(outputBuffMtx_); - if (outputBuff_.empty()) { - canWrite_ = false; + if (outputBuff_.empty()) break; - } else { f = outputBuff_.front(); outputBuff_.pop_front(); @@ -977,6 +998,30 @@ SipsIceTransport::flushOutputBuff() if (status < 0) break; } + if (status < 0) + return status; + + decltype(txBuff_) tx; + { + std::lock_guard<std::mutex> l(outputBuffMtx_); + tx = std::move(txBuff_); + canWrite_ = false; + } + for (auto it = tx.begin(); it != tx.end(); ++it) { + const auto& msg = *it; + const auto nwritten = gnutls_record_send(session_, msg.data(), msg.size()); + if (nwritten <= 0) { + RING_ERR("gnutls_record_send: %s", gnutls_strerror(nwritten)); + status = tls_status_from_err(nwritten); + { + std::lock_guard<std::mutex> l(outputBuffMtx_); + txBuff_.splice(txBuff_.begin(), tx, it, tx.end()); + canWrite_ = true; + } + break; + } + } + putBuff(std::move(tx)); return status > 0 ? PJ_SUCCESS : (pj_status_t)status; } @@ -1019,6 +1064,40 @@ SipsIceTransport::shutdown() cv_.notify_all(); } +void +SipsIceTransport::getBuff(decltype(buffPool_)& l, const uint8_t* b, const uint8_t* e) +{ + std::lock_guard<std::mutex> lk(buffPoolMtx_); + if (buffPool_.empty()) + l.emplace_back(b, e); + else { + l.splice(l.end(), buffPool_, buffPool_.cbegin()); + auto& buf = l.back(); + buf.resize(std::distance(b, e)); + std::copy(b, e, buf.begin()); + } +} + +void +SipsIceTransport::getBuff(decltype(buffPool_)& l, const size_t s) +{ + std::lock_guard<std::mutex> lk(buffPoolMtx_); + if (buffPool_.empty()) + l.emplace_back(s); + else { + l.splice(l.end(), buffPool_, buffPool_.cbegin()); + auto& buf = l.back(); + buf.resize(s); + } +} + +void +SipsIceTransport::putBuff(decltype(buffPool_)&& l) +{ + std::lock_guard<std::mutex> lk(buffPoolMtx_); + buffPool_.splice(buffPool_.end(), l); +} + pj_status_t SipsIceTransport::tls_status_from_err(int err) { diff --git a/src/ringdht/sips_transport_ice.h b/src/ringdht/sips_transport_ice.h index 291d1a800119023ad496933e05ab9d9af8111be3..f6d5af410d26e2e9ac301526631fac11266bddf7 100644 --- a/src/ringdht/sips_transport_ice.h +++ b/src/ringdht/sips_transport_ice.h @@ -161,14 +161,21 @@ private: ssize_t trySend(pjsip_tx_data_op_key* tdata); pj_status_t flushOutputBuff(); std::list<DelayedTxData> outputBuff_; + std::list<std::vector<uint8_t>> txBuff_; std::mutex outputBuffMtx_; std::mutex rxMtx_; std::condition_variable_any rxCv_; std::list<std::vector<uint8_t>> rxPending_; - std::list<std::vector<uint8_t>> rxPendingPool_; pjsip_rx_data rdata_; + // data buffer pool + std::list<std::vector<uint8_t>> buffPool_; + std::mutex buffPoolMtx_; + void getBuff(decltype(buffPool_)& l, const uint8_t* b, const uint8_t* e); + void getBuff(decltype(buffPool_)& l, const size_t s); + void putBuff(decltype(buffPool_)&& l); + // GnuTLS <-> ICE ssize_t tlsSend(const void*, size_t); ssize_t tlsRecv(void* d , size_t s);