diff --git a/src/ice_socket.h b/src/ice_socket.h index bd47c2d28b41110b74b20dce54fd9f915c4c00bd..e055f1479b51c923bf899870a56b851622d45df9 100644 --- a/src/ice_socket.h +++ b/src/ice_socket.h @@ -34,6 +34,7 @@ namespace ring { class IceTransport; +using IceRecvCb = std::function<ssize_t(unsigned char* buf, size_t len)>; class IceSocket { @@ -50,6 +51,7 @@ class IceSocket ssize_t send(const unsigned char* buf, size_t len); ssize_t getNextPacketSize() const; ssize_t waitForData(unsigned int timeout); + void setOnRecv(IceRecvCb cb); }; }; diff --git a/src/ice_transport.cpp b/src/ice_transport.cpp index 140ded90c11236c04c336ca77e5ebba9888558b6..3ef33b38b7cf30d36986882ad96605f90a1b233d 100644 --- a/src/ice_transport.cpp +++ b/src/ice_transport.cpp @@ -956,4 +956,12 @@ IceSocket::waitForData(unsigned int timeout) return ice_transport_->waitForData(compId_, timeout); } +void +IceSocket::setOnRecv(IceRecvCb cb) +{ + if (!ice_transport_.get()) + return; + return ice_transport_->setOnRecv(compId_, cb); +} + } // namespace ring diff --git a/src/media/socket_pair.cpp b/src/media/socket_pair.cpp index e93747aa679ab22114eb94ae2388550d4982bccc..47d12b11afeab139ad5f1f721cb7fc16975e321e 100644 --- a/src/media/socket_pair.cpp +++ b/src/media/socket_pair.cpp @@ -37,6 +37,11 @@ #include "libav_utils.h" #include "logger.h" +#include <iostream> +#include <string> +#include <algorithm> +#include <iterator> + extern "C" { #include "srtp.h" } @@ -64,7 +69,7 @@ extern "C" { namespace ring { static constexpr int NET_POLL_TIMEOUT = 100; /* poll() timeout in ms */ -static constexpr int RTP_MAX_PACKET_LENGTH = 8192; +static constexpr int RTP_MAX_PACKET_LENGTH = 2048; class SRTPProtoContext { public: @@ -211,13 +216,29 @@ SocketPair::SocketPair(std::unique_ptr<IceSocket> rtp_sock, , rtpDestAddrLen_() , rtcpDestAddr_() , rtcpDestAddrLen_() -{} +{ + auto queueRtpPacket = [this](uint8_t* buf, size_t len) { + std::lock_guard<std::mutex> l(dataBuffMutex_); + rtpDataBuff_.emplace_back(buf, buf+len); + cv_.notify_one(); + return len; + }; + + auto queueRtcpPacket = [this](uint8_t* buf, size_t len) { + std::lock_guard<std::mutex> l(dataBuffMutex_); + rtcpDataBuff_.emplace_back(buf, buf+len); + cv_.notify_one(); + return len; + }; + + rtp_sock_->setOnRecv(queueRtpPacket); + rtcp_sock_->setOnRecv(queueRtcpPacket); +} SocketPair::~SocketPair() { - interrupted_ = true; - if (rtpHandle_ >= 0) - closeSockets(); + interrupt(); + closeSockets(); } void @@ -231,6 +252,9 @@ void SocketPair::interrupt() { interrupted_ = true; + if (rtp_sock_) rtp_sock_->setOnRecv(nullptr); + if (rtcp_sock_) rtcp_sock_->setOnRecv(nullptr); + cv_.notify_all(); } void @@ -284,187 +308,180 @@ SocketPair::createIOContext() int SocketPair::waitForData() { + // System sockets if (rtpHandle_ >= 0) { - // work with system socket - struct pollfd p[2] = { {rtpHandle_, POLLIN, 0}, - {rtcpHandle_, POLLIN, 0} }; - return poll(p, 2, NET_POLL_TIMEOUT); + int ret; + do { + if (interrupted_) { + errno = EINTR; + return -1; + } + + // work with system socket + struct pollfd p[2] = { {rtpHandle_, POLLIN, 0}, + {rtcpHandle_, POLLIN, 0} }; + ret = poll(p, 2, NET_POLL_TIMEOUT); + } while (ret < 0 and errno == EAGAIN); + + return ret; } // work with IceSocket - auto result = rtp_sock_->waitForData(NET_POLL_TIMEOUT); - if (result < 0) { - errno = EIO; + { + std::unique_lock<std::mutex> lk(dataBuffMutex_); + cv_.wait(lk, [this]{ return interrupted_ or not rtpDataBuff_.empty() or not rtcpDataBuff_.empty(); }); + } + + if (interrupted_) { + errno = EINTR; return -1; } - return result; + return 0; } int SocketPair::readRtpData(void* buf, int buf_size) { - auto data = static_cast<uint8_t*>(buf); - + // handle system socket if (rtpHandle_ >= 0) { - // work with system socket struct sockaddr_storage from; socklen_t from_len = sizeof(from); - -start: - int result = recvfrom(rtpHandle_, static_cast<char*>(buf), buf_size, 0, - reinterpret_cast<struct sockaddr *>(&from), &from_len); - if (result > 0 and srtpContext_ and srtpContext_->srtp_in.aes) - if (ff_srtp_decrypt(&srtpContext_->srtp_in, data, &result) < 0) - goto start; // XXX: see libavformat/srtpproto.c - - return result; + return recvfrom(rtpHandle_, static_cast<char*>(buf), buf_size, 0, + reinterpret_cast<struct sockaddr*>(&from), &from_len); } - // work with IceSocket -start_ice: - int result = rtp_sock_->recv(static_cast<unsigned char*>(buf), buf_size); - if (result > 0 and srtpContext_ and srtpContext_->srtp_in.aes) { - if (ff_srtp_decrypt(&srtpContext_->srtp_in, data, &result) < 0) - goto start_ice; - } else if (result < 0) { - errno = EIO; - return -1; - } else if (result == 0) { - errno = EAGAIN; - return -1; + // handle ICE + std::unique_lock<std::mutex> lk(dataBuffMutex_); + if (not rtpDataBuff_.empty()) { + auto pkt = rtpDataBuff_.front(); + rtpDataBuff_.pop_front(); + lk.unlock(); // to not block our ICE callbacks + int pkt_size = pkt.size(); + int len = std::min(pkt_size, buf_size); + std::copy_n(pkt.begin(), len, static_cast<char*>(buf)); + return len; } - return result; + return 0; } int SocketPair::readRtcpData(void* buf, int buf_size) { + // handle system socket if (rtcpHandle_ >= 0) { - // work with system socket struct sockaddr_storage from; socklen_t from_len = sizeof(from); - return recvfrom(rtcpHandle_,(char*) buf, buf_size, 0, - (struct sockaddr *)&from, &from_len); + return recvfrom(rtcpHandle_, static_cast<char*>(buf), buf_size, 0, + reinterpret_cast<struct sockaddr*>(&from), &from_len); } - // work with IceSocket - auto result = rtcp_sock_->recv(static_cast<unsigned char*>(buf), buf_size); - if (result < 0) { - errno = EIO; - return -1; + // handle ICE + std::unique_lock<std::mutex> lk(dataBuffMutex_); + if (not rtcpDataBuff_.empty()) { + auto pkt = rtcpDataBuff_.front(); + rtcpDataBuff_.pop_front(); + lk.unlock(); + int pkt_size = pkt.size(); + int len = std::min(pkt_size, buf_size); + std::copy_n(pkt.begin(), len, static_cast<char*>(buf)); + return len; } - if (result == 0) { - errno = EAGAIN; - return -1; - } - return result; + + return 0; } int -SocketPair::writeRtpData(void* buf, int buf_size) +SocketPair::readCallback(uint8_t* buf, int buf_size) { - auto data = static_cast<const uint8_t*>(buf); - - if (rtpHandle_ >= 0) { - auto ret = ff_network_wait_fd(rtpHandle_); + while (1) { + auto ret = waitForData(); if (ret < 0) return ret; - if (srtpContext_ and srtpContext_->srtp_out.aes) { - buf_size = ff_srtp_encrypt(&srtpContext_->srtp_out, data, - buf_size, srtpContext_->encryptbuf, - sizeof(srtpContext_->encryptbuf)); - if (buf_size < 0) - return buf_size; + // Priority to RTCP as its less invasive in bandwidth + auto len = readRtcpData(buf, buf_size); + bool fromRTCP = true; - return sendto(rtpHandle_, (char*)srtpContext_->encryptbuf, buf_size, 0, - (sockaddr*) &rtpDestAddr_, rtpDestAddrLen_); + // No RTCP... try RTP + if (len == 0) { + len = readRtpData(buf, buf_size); + fromRTCP = false; } - return sendto(rtpHandle_, (char*)buf, buf_size, 0, - (sockaddr*) &rtpDestAddr_, rtpDestAddrLen_); + if (len < 0) + return len; + + // SRTP decrypt + if (not fromRTCP and srtpContext_ and srtpContext_->srtp_in.aes) { + auto err = ff_srtp_decrypt(&srtpContext_->srtp_in, buf, &len); + if (err < 0) + RING_WARN("decrypt error %d", err); + } + + return len; } +} - // work with IceSocket - if (srtpContext_ and srtpContext_->srtp_out.aes) { - buf_size = ff_srtp_encrypt(&srtpContext_->srtp_out, data, +int +SocketPair::writeData(uint8_t* buf, int buf_size) +{ + bool isRTCP = RTP_PT_IS_RTCP(buf[1]); + + // Encrypt? + if (not isRTCP and srtpContext_ and srtpContext_->srtp_out.aes) { + buf_size = ff_srtp_encrypt(&srtpContext_->srtp_out, buf, buf_size, srtpContext_->encryptbuf, sizeof(srtpContext_->encryptbuf)); - if (buf_size < 0) + if (buf_size < 0) { + RING_WARN("encrypt error %d", buf_size); return buf_size; + } buf = srtpContext_->encryptbuf; } - return rtp_sock_->send(static_cast<unsigned char*>(buf), buf_size); -} + // System sockets? + if (rtpHandle_ >= 0) { + int fd; + sockaddr_storage dest_addr; + socklen_t dest_addr_len; + + if (isRTCP) { + fd = rtcpHandle_; + dest_addr = rtcpDestAddr_; + dest_addr_len = rtcpDestAddrLen_; + } else { + fd = rtpHandle_; + dest_addr = rtpDestAddr_; + dest_addr_len = rtpDestAddrLen_; + } -int -SocketPair::writeRtcpData(void* buf, int buf_size) -{ - if (rtcpHandle_ >= 0) { - auto ret = ff_network_wait_fd(rtcpHandle_); + auto ret = ff_network_wait_fd(fd); if (ret < 0) return ret; - return sendto(rtcpHandle_,(char*) buf, buf_size, 0, - (sockaddr*) &rtcpDestAddr_, rtcpDestAddrLen_); - } - - // work with IceSocket - return rtcp_sock_->send(static_cast<unsigned char*>(buf), buf_size); -} -int -SocketPair::readCallback(uint8_t* buf, int buf_size) -{ -retry: - if (interrupted_) { - RING_ERR("interrupted"); - return -EINTR; - } - - if (waitForData() < 0) { - if (errno == EINTR) - goto retry; - return -EIO; - } - - /* RTP */ - int len = readRtpData(buf, buf_size); - if (len < 0) { - if (errno == EAGAIN or errno == EINTR) - goto retry; - return -EIO; + return sendto(fd, reinterpret_cast<const char*>(buf), buf_size, 0, + reinterpret_cast<sockaddr*>(&dest_addr), dest_addr_len); } - return len; + // IceSocket + if (isRTCP) + return rtcp_sock_->send(buf, buf_size); + else + return rtp_sock_->send(buf, buf_size); } int SocketPair::writeCallback(uint8_t* buf, int buf_size) { int ret; - -retry: - if (RTP_PT_IS_RTCP(buf[1])) { - /* RTCP payload type */ - ret = writeRtcpData(buf, buf_size); - if (ret < 0) { - if (errno == EAGAIN) - goto retry; - return ret; - } - } else { - /* RTP payload type */ - ret = writeRtpData(buf, buf_size); - if (ret < 0) { - if (errno == EAGAIN) - goto retry; - return ret; - } - } + do { + if (interrupted_) + return -EINTR; + ret = writeData(buf, buf_size); + } while (ret < 0 and errno == EAGAIN); return ret < 0 ? -errno : ret; } diff --git a/src/media/socket_pair.h b/src/media/socket_pair.h index 7a12eb6ead631c531ff749fab5c0a140daf22e0e..bdada9d644a0e999651b5d137a8911ba769e9782 100644 --- a/src/media/socket_pair.h +++ b/src/media/socket_pair.h @@ -102,8 +102,12 @@ class SocketPair { int waitForData(); int readRtpData(void* buf, int buf_size); int readRtcpData(void* buf, int buf_size); - int writeRtpData(void* buf, int buf_size); - int writeRtcpData(void* buf, int buf_size); + int writeData(uint8_t* buf, int buf_size); + + std::mutex dataBuffMutex_; + std::condition_variable cv_; + std::list<std::vector<uint8_t>> rtpDataBuff_; + std::list<std::vector<uint8_t>> rtcpDataBuff_; std::unique_ptr<IceSocket> rtp_sock_; std::unique_ptr<IceSocket> rtcp_sock_;