From 989e9bdbed1364a2eec8ab6089d3df2ff4754f08 Mon Sep 17 00:00:00 2001 From: Eloi BAIL <eloi.bail@savoirfairelinux.com> Date: Wed, 19 Aug 2015 22:58:53 -0400 Subject: [PATCH] socket_pair: send and receive RTCP packets This patchset allows to send RTCP packets. Ice callbacks are used to read RTP and RTCP packets. Synchronisation mechanism is added between libav reading callbacks and ice callbacks Issue: #78983 Change-Id: Iafef224551fec1d2794b0ef3b2ee885d9ca75fb5 Signed-off-by: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com> --- src/ice_socket.h | 2 + src/ice_transport.cpp | 8 ++ src/media/socket_pair.cpp | 267 ++++++++++++++++++++------------------ src/media/socket_pair.h | 8 +- 4 files changed, 158 insertions(+), 127 deletions(-) diff --git a/src/ice_socket.h b/src/ice_socket.h index bd47c2d28b..e055f1479b 100644 --- a/src/ice_socket.h +++ b/src/ice_socket.h @@ -34,6 +34,7 @@ namespace ring { class IceTransport; +using IceRecvCb = std::function<ssize_t(unsigned char* buf, size_t len)>; class IceSocket { @@ -50,6 +51,7 @@ class IceSocket ssize_t send(const unsigned char* buf, size_t len); ssize_t getNextPacketSize() const; ssize_t waitForData(unsigned int timeout); + void setOnRecv(IceRecvCb cb); }; }; diff --git a/src/ice_transport.cpp b/src/ice_transport.cpp index 140ded90c1..3ef33b38b7 100644 --- a/src/ice_transport.cpp +++ b/src/ice_transport.cpp @@ -956,4 +956,12 @@ IceSocket::waitForData(unsigned int timeout) return ice_transport_->waitForData(compId_, timeout); } +void +IceSocket::setOnRecv(IceRecvCb cb) +{ + if (!ice_transport_.get()) + return; + return ice_transport_->setOnRecv(compId_, cb); +} + } // namespace ring diff --git a/src/media/socket_pair.cpp b/src/media/socket_pair.cpp index e93747aa67..47d12b11af 100644 --- a/src/media/socket_pair.cpp +++ b/src/media/socket_pair.cpp @@ -37,6 +37,11 @@ #include "libav_utils.h" #include "logger.h" +#include <iostream> +#include <string> +#include <algorithm> +#include <iterator> + extern "C" { #include "srtp.h" } @@ -64,7 +69,7 @@ extern "C" { namespace ring { static constexpr int NET_POLL_TIMEOUT = 100; /* poll() timeout in ms */ -static constexpr int RTP_MAX_PACKET_LENGTH = 8192; +static constexpr int RTP_MAX_PACKET_LENGTH = 2048; class SRTPProtoContext { public: @@ -211,13 +216,29 @@ SocketPair::SocketPair(std::unique_ptr<IceSocket> rtp_sock, , rtpDestAddrLen_() , rtcpDestAddr_() , rtcpDestAddrLen_() -{} +{ + auto queueRtpPacket = [this](uint8_t* buf, size_t len) { + std::lock_guard<std::mutex> l(dataBuffMutex_); + rtpDataBuff_.emplace_back(buf, buf+len); + cv_.notify_one(); + return len; + }; + + auto queueRtcpPacket = [this](uint8_t* buf, size_t len) { + std::lock_guard<std::mutex> l(dataBuffMutex_); + rtcpDataBuff_.emplace_back(buf, buf+len); + cv_.notify_one(); + return len; + }; + + rtp_sock_->setOnRecv(queueRtpPacket); + rtcp_sock_->setOnRecv(queueRtcpPacket); +} SocketPair::~SocketPair() { - interrupted_ = true; - if (rtpHandle_ >= 0) - closeSockets(); + interrupt(); + closeSockets(); } void @@ -231,6 +252,9 @@ void SocketPair::interrupt() { interrupted_ = true; + if (rtp_sock_) rtp_sock_->setOnRecv(nullptr); + if (rtcp_sock_) rtcp_sock_->setOnRecv(nullptr); + cv_.notify_all(); } void @@ -284,187 +308,180 @@ SocketPair::createIOContext() int SocketPair::waitForData() { + // System sockets if (rtpHandle_ >= 0) { - // work with system socket - struct pollfd p[2] = { {rtpHandle_, POLLIN, 0}, - {rtcpHandle_, POLLIN, 0} }; - return poll(p, 2, NET_POLL_TIMEOUT); + int ret; + do { + if (interrupted_) { + errno = EINTR; + return -1; + } + + // work with system socket + struct pollfd p[2] = { {rtpHandle_, POLLIN, 0}, + {rtcpHandle_, POLLIN, 0} }; + ret = poll(p, 2, NET_POLL_TIMEOUT); + } while (ret < 0 and errno == EAGAIN); + + return ret; } // work with IceSocket - auto result = rtp_sock_->waitForData(NET_POLL_TIMEOUT); - if (result < 0) { - errno = EIO; + { + std::unique_lock<std::mutex> lk(dataBuffMutex_); + cv_.wait(lk, [this]{ return interrupted_ or not rtpDataBuff_.empty() or not rtcpDataBuff_.empty(); }); + } + + if (interrupted_) { + errno = EINTR; return -1; } - return result; + return 0; } int SocketPair::readRtpData(void* buf, int buf_size) { - auto data = static_cast<uint8_t*>(buf); - + // handle system socket if (rtpHandle_ >= 0) { - // work with system socket struct sockaddr_storage from; socklen_t from_len = sizeof(from); - -start: - int result = recvfrom(rtpHandle_, static_cast<char*>(buf), buf_size, 0, - reinterpret_cast<struct sockaddr *>(&from), &from_len); - if (result > 0 and srtpContext_ and srtpContext_->srtp_in.aes) - if (ff_srtp_decrypt(&srtpContext_->srtp_in, data, &result) < 0) - goto start; // XXX: see libavformat/srtpproto.c - - return result; + return recvfrom(rtpHandle_, static_cast<char*>(buf), buf_size, 0, + reinterpret_cast<struct sockaddr*>(&from), &from_len); } - // work with IceSocket -start_ice: - int result = rtp_sock_->recv(static_cast<unsigned char*>(buf), buf_size); - if (result > 0 and srtpContext_ and srtpContext_->srtp_in.aes) { - if (ff_srtp_decrypt(&srtpContext_->srtp_in, data, &result) < 0) - goto start_ice; - } else if (result < 0) { - errno = EIO; - return -1; - } else if (result == 0) { - errno = EAGAIN; - return -1; + // handle ICE + std::unique_lock<std::mutex> lk(dataBuffMutex_); + if (not rtpDataBuff_.empty()) { + auto pkt = rtpDataBuff_.front(); + rtpDataBuff_.pop_front(); + lk.unlock(); // to not block our ICE callbacks + int pkt_size = pkt.size(); + int len = std::min(pkt_size, buf_size); + std::copy_n(pkt.begin(), len, static_cast<char*>(buf)); + return len; } - return result; + return 0; } int SocketPair::readRtcpData(void* buf, int buf_size) { + // handle system socket if (rtcpHandle_ >= 0) { - // work with system socket struct sockaddr_storage from; socklen_t from_len = sizeof(from); - return recvfrom(rtcpHandle_,(char*) buf, buf_size, 0, - (struct sockaddr *)&from, &from_len); + return recvfrom(rtcpHandle_, static_cast<char*>(buf), buf_size, 0, + reinterpret_cast<struct sockaddr*>(&from), &from_len); } - // work with IceSocket - auto result = rtcp_sock_->recv(static_cast<unsigned char*>(buf), buf_size); - if (result < 0) { - errno = EIO; - return -1; + // handle ICE + std::unique_lock<std::mutex> lk(dataBuffMutex_); + if (not rtcpDataBuff_.empty()) { + auto pkt = rtcpDataBuff_.front(); + rtcpDataBuff_.pop_front(); + lk.unlock(); + int pkt_size = pkt.size(); + int len = std::min(pkt_size, buf_size); + std::copy_n(pkt.begin(), len, static_cast<char*>(buf)); + return len; } - if (result == 0) { - errno = EAGAIN; - return -1; - } - return result; + + return 0; } int -SocketPair::writeRtpData(void* buf, int buf_size) +SocketPair::readCallback(uint8_t* buf, int buf_size) { - auto data = static_cast<const uint8_t*>(buf); - - if (rtpHandle_ >= 0) { - auto ret = ff_network_wait_fd(rtpHandle_); + while (1) { + auto ret = waitForData(); if (ret < 0) return ret; - if (srtpContext_ and srtpContext_->srtp_out.aes) { - buf_size = ff_srtp_encrypt(&srtpContext_->srtp_out, data, - buf_size, srtpContext_->encryptbuf, - sizeof(srtpContext_->encryptbuf)); - if (buf_size < 0) - return buf_size; + // Priority to RTCP as its less invasive in bandwidth + auto len = readRtcpData(buf, buf_size); + bool fromRTCP = true; - return sendto(rtpHandle_, (char*)srtpContext_->encryptbuf, buf_size, 0, - (sockaddr*) &rtpDestAddr_, rtpDestAddrLen_); + // No RTCP... try RTP + if (len == 0) { + len = readRtpData(buf, buf_size); + fromRTCP = false; } - return sendto(rtpHandle_, (char*)buf, buf_size, 0, - (sockaddr*) &rtpDestAddr_, rtpDestAddrLen_); + if (len < 0) + return len; + + // SRTP decrypt + if (not fromRTCP and srtpContext_ and srtpContext_->srtp_in.aes) { + auto err = ff_srtp_decrypt(&srtpContext_->srtp_in, buf, &len); + if (err < 0) + RING_WARN("decrypt error %d", err); + } + + return len; } +} - // work with IceSocket - if (srtpContext_ and srtpContext_->srtp_out.aes) { - buf_size = ff_srtp_encrypt(&srtpContext_->srtp_out, data, +int +SocketPair::writeData(uint8_t* buf, int buf_size) +{ + bool isRTCP = RTP_PT_IS_RTCP(buf[1]); + + // Encrypt? + if (not isRTCP and srtpContext_ and srtpContext_->srtp_out.aes) { + buf_size = ff_srtp_encrypt(&srtpContext_->srtp_out, buf, buf_size, srtpContext_->encryptbuf, sizeof(srtpContext_->encryptbuf)); - if (buf_size < 0) + if (buf_size < 0) { + RING_WARN("encrypt error %d", buf_size); return buf_size; + } buf = srtpContext_->encryptbuf; } - return rtp_sock_->send(static_cast<unsigned char*>(buf), buf_size); -} + // System sockets? + if (rtpHandle_ >= 0) { + int fd; + sockaddr_storage dest_addr; + socklen_t dest_addr_len; + + if (isRTCP) { + fd = rtcpHandle_; + dest_addr = rtcpDestAddr_; + dest_addr_len = rtcpDestAddrLen_; + } else { + fd = rtpHandle_; + dest_addr = rtpDestAddr_; + dest_addr_len = rtpDestAddrLen_; + } -int -SocketPair::writeRtcpData(void* buf, int buf_size) -{ - if (rtcpHandle_ >= 0) { - auto ret = ff_network_wait_fd(rtcpHandle_); + auto ret = ff_network_wait_fd(fd); if (ret < 0) return ret; - return sendto(rtcpHandle_,(char*) buf, buf_size, 0, - (sockaddr*) &rtcpDestAddr_, rtcpDestAddrLen_); - } - - // work with IceSocket - return rtcp_sock_->send(static_cast<unsigned char*>(buf), buf_size); -} -int -SocketPair::readCallback(uint8_t* buf, int buf_size) -{ -retry: - if (interrupted_) { - RING_ERR("interrupted"); - return -EINTR; - } - - if (waitForData() < 0) { - if (errno == EINTR) - goto retry; - return -EIO; - } - - /* RTP */ - int len = readRtpData(buf, buf_size); - if (len < 0) { - if (errno == EAGAIN or errno == EINTR) - goto retry; - return -EIO; + return sendto(fd, reinterpret_cast<const char*>(buf), buf_size, 0, + reinterpret_cast<sockaddr*>(&dest_addr), dest_addr_len); } - return len; + // IceSocket + if (isRTCP) + return rtcp_sock_->send(buf, buf_size); + else + return rtp_sock_->send(buf, buf_size); } int SocketPair::writeCallback(uint8_t* buf, int buf_size) { int ret; - -retry: - if (RTP_PT_IS_RTCP(buf[1])) { - /* RTCP payload type */ - ret = writeRtcpData(buf, buf_size); - if (ret < 0) { - if (errno == EAGAIN) - goto retry; - return ret; - } - } else { - /* RTP payload type */ - ret = writeRtpData(buf, buf_size); - if (ret < 0) { - if (errno == EAGAIN) - goto retry; - return ret; - } - } + do { + if (interrupted_) + return -EINTR; + ret = writeData(buf, buf_size); + } while (ret < 0 and errno == EAGAIN); return ret < 0 ? -errno : ret; } diff --git a/src/media/socket_pair.h b/src/media/socket_pair.h index 7a12eb6ead..bdada9d644 100644 --- a/src/media/socket_pair.h +++ b/src/media/socket_pair.h @@ -102,8 +102,12 @@ class SocketPair { int waitForData(); int readRtpData(void* buf, int buf_size); int readRtcpData(void* buf, int buf_size); - int writeRtpData(void* buf, int buf_size); - int writeRtcpData(void* buf, int buf_size); + int writeData(uint8_t* buf, int buf_size); + + std::mutex dataBuffMutex_; + std::condition_variable cv_; + std::list<std::vector<uint8_t>> rtpDataBuff_; + std::list<std::vector<uint8_t>> rtcpDataBuff_; std::unique_ptr<IceSocket> rtp_sock_; std::unique_ptr<IceSocket> rtcp_sock_; -- GitLab