Select Git revision
socket_pair.cpp
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
socket_pair.cpp 10.02 KiB
/*
* Copyright (C) 2004-2015 Savoir-Faire Linux Inc.
* Copyright (c) 2002 Fabrice Bellard
*
* Author: Tristan Matthews <tristan.matthews@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Additional permission under GNU GPL version 3 section 7:
*
* If you modify this program, or any covered work, by linking or
* combining it with the OpenSSL project's OpenSSL library (or a
* modified version of that library), containing parts covered by the
* terms of the OpenSSL or SSLeay licenses, Savoir-Faire Linux Inc.
* grants you additional permission to convey the resulting work.
* Corresponding Source for a non-source form of such a combination
* shall include the source code for the parts of OpenSSL used as well
* as that of the covered work.
*/
#include "libav_deps.h"
#include "socket_pair.h"
#include "ice_socket.h"
#include "libav_utils.h"
#include "logger.h"
#include <cstring>
#include <stdexcept>
#include <unistd.h>
#include <poll.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#ifdef __ANDROID__
#include <asm-generic/fcntl.h>
#define SOCK_NONBLOCK O_NONBLOCK
#endif
#ifdef __APPLE__
#include <fcntl.h>
#define SOCK_NONBLOCK O_NONBLOCK
#endif
static const int NET_POLL_TIMEOUT = 100; /* poll() timeout in ms */

/**
 * Wait until a descriptor is ready for an outgoing write.
 *
 * Polls @p fd for POLLOUT for at most NET_POLL_TIMEOUT milliseconds.
 *
 * @param fd descriptor to poll
 * @return 0 when poll() reports POLLOUT, POLLERR or POLLHUP on @p fd,
 *         -EAGAIN when none of those events is reported (timeout included),
 *         or the negated errno value when poll() itself fails.
 */
static int
ff_network_wait_fd(int fd)
{
    struct pollfd p = { fd, POLLOUT, 0 };

    if (poll(&p, 1, NET_POLL_TIMEOUT) < 0) {
        // Callers detect failure with `ret < 0`, so the error code has to be
        // negative, exactly like the -EAGAIN result below.
        return -errno;
    }

    return (p.revents & (POLLOUT | POLLERR | POLLHUP)) ? 0 : -EAGAIN;
}
/**
 * Resolve a node name and numeric port into datagram socket addresses.
 *
 * The lookup is done with getaddrinfo() using SOCK_DGRAM, AF_UNSPEC and
 * AI_PASSIVE hints; the port is passed as its decimal text form.
 *
 * @param node    host name or address, may be null
 * @param service port number
 * @return the list produced by getaddrinfo() (release it with
 *         freeaddrinfo()), or null when the lookup fails; the failure reason
 *         is reported through RING_ERR.
 */
static struct addrinfo*
udp_resolve_host(const char *node, int service)
{
    char port_text[16];
    snprintf(port_text, sizeof(port_text), "%d", service);

    struct addrinfo query;
    memset(&query, 0, sizeof(query));
    query.ai_flags = AI_PASSIVE;
    query.ai_family = AF_UNSPEC;
    query.ai_socktype = SOCK_DGRAM;

    struct addrinfo *found = nullptr;
    const int status = getaddrinfo(node, port_text, &query, &found);
    if (status == 0)
        return found;

    RING_ERR("%s\n", gai_strerror(status));
    return nullptr;
}
/**
 * Fill @p addr with the first address that @p hostname : @p port resolves to.
 *
 * @param addr     destination storage for the resolved socket address
 * @param hostname host to resolve (forwarded to udp_resolve_host())
 * @param port     port to resolve (forwarded to udp_resolve_host())
 * @return length in bytes of the address copied into @p addr, or 0 when the
 *         resolution failed (in which case @p addr is left untouched).
 */
static unsigned
udp_set_url(struct sockaddr_storage *addr, const char *hostname, int port)
{
    struct addrinfo *resolved = udp_resolve_host(hostname, port);
    if (!resolved)
        return 0;

    // Only the head of the result list is used.
    const int length = resolved->ai_addrlen;
    memcpy(addr, resolved->ai_addr, resolved->ai_addrlen);

    freeaddrinfo(resolved);
    return length;
}
/**
 * Create a non-blocking UDP socket bound to a local port.
 *
 * The local port is resolved with a null node name; the candidate addresses
 * are walked until socket() succeeds for one of them, and that single
 * candidate is then used for bind(). Later candidates are not tried when
 * bind() fails.
 *
 * @param addr       receives the local address chosen for the socket
 * @param addr_len   receives the length of that address
 * @param local_port local port to bind to
 * @return the bound descriptor, or -1 when resolution, socket creation or
 *         bind() fails (the descriptor is closed in the bind() case).
 */
static int
udp_socket_create(sockaddr_storage *addr, socklen_t *addr_len, int local_port)
{
    struct addrinfo *const candidates = udp_resolve_host(nullptr, local_port);
    if (!candidates)
        return -1;

    int sock = -1;
    struct addrinfo *chosen = candidates;
    while (chosen) {
        sock = socket(chosen->ai_family, SOCK_DGRAM | SOCK_NONBLOCK, 0);
        if (sock != -1)
            break;
        RING_ERR("socket error");
        chosen = chosen->ai_next;
    }

    if (sock < 0) {
        freeaddrinfo(candidates);
        return -1;
    }

    memcpy(addr, chosen->ai_addr, chosen->ai_addrlen);
    *addr_len = chosen->ai_addrlen;

    // Bind so that packets are both sent from and received on the local port.
    const bool bound = bind(sock, reinterpret_cast<sockaddr*>(addr), *addr_len) >= 0;
    if (!bound) {
        RING_ERR("Bind failed");
        strErr();
        close(sock);
        sock = -1;
    }

    freeaddrinfo(candidates);
    return sock;
}
namespace ring {
using std::string;
// Passed as the first constructor argument of the MediaIOHandle created by
// createIOContext().
// NOTE(review): 1472 presumably is a 1500-byte MTU minus the IPv4 (20) and
// UDP (8) header sizes -- confirm.
static const int RTP_BUFFER_SIZE = 1472;
/**
 * Build a socket pair backed by system UDP sockets.
 *
 * Every member named in the initializer list is value-initialized (the two
 * IceSocket pointers therefore stay empty), then openSockets() is called with
 * the same arguments. An exception thrown by openSockets() propagates out of
 * the constructor.
 *
 * @param uri       remote URI, forwarded to openSockets()
 * @param localPort local RTP port, forwarded to openSockets()
 */
SocketPair::SocketPair(const char *uri, int localPort)
: rtp_sock_()
, rtcp_sock_()
, rtcpWriteMutex_()
, rtpDestAddr_()
, rtpDestAddrLen_()
, rtcpDestAddr_()
, rtcpDestAddrLen_()
{
openSockets(uri, localPort);
}
/**
 * Build a socket pair backed by two IceSocket objects.
 *
 * Ownership of both sockets is moved into the pair. No system socket is
 * opened here; the destination address members are value-initialized.
 * NOTE(review): the I/O methods pick the IceSocket path only when the
 * rtpHandle_/rtcpHandle_ members are negative; their default values are
 * declared outside this file -- confirm they start out negative.
 *
 * @param rtp_sock  socket used for RTP traffic
 * @param rtcp_sock socket used for RTCP traffic
 */
SocketPair::SocketPair(std::unique_ptr<ring::IceSocket> rtp_sock,
std::unique_ptr<ring::IceSocket> rtcp_sock)
: rtp_sock_(std::move(rtp_sock))
, rtcp_sock_(std::move(rtcp_sock))
, rtcpWriteMutex_()
, rtpDestAddr_()
, rtpDestAddrLen_()
, rtcpDestAddr_()
, rtcpDestAddrLen_()
{}
/**
 * Raise the interruption flag and, when the pair runs on system sockets
 * (non-negative RTP handle), close them.
 */
SocketPair::~SocketPair()
{
    interrupted_ = true;

    const bool usesSystemSockets = rtpHandle_ >= 0;
    if (usesSystemSockets)
        closeSockets();
}
/**
 * Raise the interruption flag.
 *
 * readCallback() tests this flag at the start of every attempt and returns
 * -EINTR once it is set.
 */
void SocketPair::interrupt()
{
interrupted_ = true;
}
/**
 * Close the system RTCP and RTP sockets, if any.
 *
 * A handle counts as open when it is >= 0, which is the same test every other
 * method of this class uses to select the system-socket path (descriptor 0 is
 * a valid descriptor). Each handle is reset to -1 once closed so that calling
 * this method again -- for example from the destructor after an explicit
 * call -- cannot close an unrelated descriptor that reused the same number.
 * A close() failure is reported through strErr().
 */
void SocketPair::closeSockets()
{
    if (rtcpHandle_ >= 0) {
        if (close(rtcpHandle_))
            strErr();
        rtcpHandle_ = -1;
    }

    if (rtpHandle_ >= 0) {
        if (close(rtpHandle_))
            strErr();
        rtpHandle_ = -1;
    }
}
/**
 * Open the local RTP/RTCP sockets and resolve the remote destinations.
 *
 * The host name and remote RTP port are extracted from @p uri with
 * libav_utils::sfl_url_split(). The RTCP ports, local and remote, are the
 * corresponding RTP port plus one. The four steps (local RTP socket, local
 * RTCP socket, remote RTP address, remote RTCP address) run in that order and
 * stop at the first failure.
 *
 * @param uri            remote URI
 * @param local_rtp_port local port to bind the RTP socket to
 * @throws std::runtime_error when any step fails; closeSockets() is called
 *         before throwing.
 */
void SocketPair::openSockets(const char *uri, int local_rtp_port)
{
    char hostname[256];
    char path[1024];
    int rtp_port;
    libav_utils::sfl_url_split(uri, hostname, sizeof(hostname), &rtp_port, path,
                               sizeof(path));

    const int rtcp_port = rtp_port + 1;
    const int local_rtcp_port = local_rtp_port + 1;

    // Local addresses are produced by udp_socket_create() but not kept.
    sockaddr_storage rtp_addr, rtcp_addr;
    socklen_t rtp_len, rtcp_len;

    // Each step only runs when all previous ones succeeded.
    rtpHandle_ = udp_socket_create(&rtp_addr, &rtp_len, local_rtp_port);
    bool ok = rtpHandle_ != -1;
    if (ok) {
        rtcpHandle_ = udp_socket_create(&rtcp_addr, &rtcp_len, local_rtcp_port);
        ok = rtcpHandle_ != -1;
    }
    if (ok) {
        rtpDestAddrLen_ = udp_set_url(&rtpDestAddr_, hostname, rtp_port);
        ok = rtpDestAddrLen_ != 0;
    }
    if (ok) {
        rtcpDestAddrLen_ = udp_set_url(&rtcpDestAddr_, hostname, rtcp_port);
        ok = rtcpDestAddrLen_ != 0;
    }

    if (not ok) {
        closeSockets();
        throw std::runtime_error("Socket creation failed");
    }

    RING_WARN("SocketPair: local{%d,%d}, remote{%d,%d}",
              local_rtp_port, local_rtcp_port, rtp_port, rtcp_port);
}
/**
 * Allocate a MediaIOHandle wired to this pair's read and write callbacks.
 *
 * The handle is created with RTP_BUFFER_SIZE, readCallback(), writeCallback()
 * and `this` as the opaque pointer handed back to those callbacks.
 * NOTE(review): the object is allocated with `new` and never deleted in this
 * file, so presumably the caller takes ownership -- confirm. The meaning of
 * the `true` and `0` arguments is defined by MediaIOHandle, not here.
 *
 * @return the newly allocated handle
 */
MediaIOHandle* SocketPair::createIOContext()
{
    void *const opaque = reinterpret_cast<void*>(this);
    auto *handle = new MediaIOHandle(RTP_BUFFER_SIZE, true,
                                     &readCallback, &writeCallback, 0,
                                     opaque);
    return handle;
}
/**
 * Wait up to NET_POLL_TIMEOUT milliseconds for incoming data.
 *
 * With system sockets both the RTP and the RTCP descriptor are polled for
 * POLLIN and the poll() result is returned unchanged. Otherwise the wait is
 * delegated to the RTP IceSocket only.
 * NOTE(review): readCallback() only drains the RTP socket; unread RTCP data
 * would make this poll() return immediately on every retry -- confirm that
 * RTCP is consumed elsewhere.
 *
 * @return the poll()/IceSocket result, or -1 with errno set to EIO when the
 *         IceSocket reports a negative value.
 */
int
SocketPair::waitForData()
{
    if (rtpHandle_ < 0) {
        // IceSocket transport
        auto status = rtp_sock_->waitForData(NET_POLL_TIMEOUT);
        if (status < 0) {
            errno = EIO;
            return -1;
        }
        return status;
    }

    // system socket transport
    struct pollfd watched[2] = { {rtpHandle_, POLLIN, 0},
                                 {rtcpHandle_, POLLIN, 0} };
    return poll(watched, 2, NET_POLL_TIMEOUT);
}
/**
 * Read one RTP packet into @p buf.
 *
 * With a system socket this is a plain recvfrom() whose result is returned
 * as-is (the sender address is received but discarded). With an IceSocket, a
 * negative result is mapped to errno EIO and an empty read to errno EAGAIN,
 * both returning -1.
 *
 * @param buf      destination buffer
 * @param buf_size capacity of @p buf in bytes
 * @return number of bytes read, or -1 with errno set on failure
 */
int
SocketPair::readRtpData(void *buf, int buf_size)
{
    if (rtpHandle_ < 0) {
        // IceSocket transport
        auto received = rtp_sock_->recv(static_cast<unsigned char*>(buf), buf_size);
        if (received > 0)
            return received;
        errno = (received < 0) ? EIO : EAGAIN;
        return -1;
    }

    // system socket transport
    struct sockaddr_storage sender;
    socklen_t sender_len = sizeof(sender);
    auto received = recvfrom(rtpHandle_, buf, buf_size, 0,
                             (struct sockaddr *)&sender, &sender_len);
    return received;
}
/**
 * Read one RTCP packet into @p buf.
 *
 * With a system socket this is a plain recvfrom() whose result is returned
 * as-is (the sender address is received but discarded). With an IceSocket, a
 * negative result is mapped to errno EIO and an empty read to errno EAGAIN,
 * both returning -1.
 *
 * @param buf      destination buffer
 * @param buf_size capacity of @p buf in bytes
 * @return number of bytes read, or -1 with errno set on failure
 */
int
SocketPair::readRtcpData(void *buf, int buf_size)
{
    if (rtcpHandle_ < 0) {
        // IceSocket transport
        auto received = rtcp_sock_->recv(static_cast<unsigned char*>(buf), buf_size);
        if (received > 0)
            return received;
        errno = (received < 0) ? EIO : EAGAIN;
        return -1;
    }

    // system socket transport
    struct sockaddr_storage sender;
    socklen_t sender_len = sizeof(sender);
    return recvfrom(rtcpHandle_, buf, buf_size, 0,
                    (struct sockaddr *)&sender, &sender_len);
}
/**
 * Send one RTP packet to the remote RTP destination.
 *
 * With a system socket the descriptor is first polled for writability through
 * ff_network_wait_fd(), then the packet is sent with sendto(). A sendto()
 * failure is reported as the negated errno value, the same convention as the
 * ff_network_wait_fd() result, so that writeCallback() can recognise -EAGAIN
 * and retry instead of seeing a bare -1. With an IceSocket the result of its
 * send() is returned unchanged.
 *
 * @param buf      packet to send
 * @param buf_size packet length in bytes
 * @return number of bytes sent, or a negative value on failure
 */
int
SocketPair::writeRtpData(void *buf, int buf_size)
{
    if (rtpHandle_ >= 0) {
        // system socket transport
        const int ready = ff_network_wait_fd(rtpHandle_);
        if (ready < 0)
            return ready;

        const auto sent = sendto(rtpHandle_, buf, buf_size, 0,
                                 reinterpret_cast<sockaddr*>(&rtpDestAddr_),
                                 rtpDestAddrLen_);
        return sent < 0 ? -errno : static_cast<int>(sent);
    }

    // IceSocket transport
    return rtp_sock_->send(static_cast<unsigned char*>(buf), buf_size);
}
/**
 * Send one RTCP packet to the remote RTCP destination.
 *
 * rtcpWriteMutex_ is held for the whole call. With a system socket the
 * descriptor is first polled for writability through ff_network_wait_fd(),
 * then the packet is sent with sendto(). A sendto() failure is reported as
 * the negated errno value, the same convention as the ff_network_wait_fd()
 * result, instead of a bare -1. With an IceSocket the result of its send() is
 * returned unchanged.
 *
 * @param buf      packet to send
 * @param buf_size packet length in bytes
 * @return number of bytes sent, or a negative value on failure
 */
int
SocketPair::writeRtcpData(void *buf, int buf_size)
{
    std::lock_guard<std::mutex> lock(rtcpWriteMutex_);

    if (rtcpHandle_ >= 0) {
        // system socket transport
        const int ready = ff_network_wait_fd(rtcpHandle_);
        if (ready < 0)
            return ready;

        const auto sent = sendto(rtcpHandle_, buf, buf_size, 0,
                                 reinterpret_cast<sockaddr*>(&rtcpDestAddr_),
                                 rtcpDestAddrLen_);
        return sent < 0 ? -errno : static_cast<int>(sent);
    }

    // IceSocket transport
    return rtcp_sock_->send(static_cast<unsigned char*>(buf), buf_size);
}
/**
 * Read callback: block until one RTP packet is available and copy it to @p buf.
 *
 * Each attempt first checks the interruption flag, then waits for data and
 * reads from the RTP socket. An attempt is repeated when the wait fails with
 * EINTR or when the read fails with EAGAIN or EINTR.
 *
 * @param opaque   the SocketPair instance registered with the I/O handle
 * @param buf      destination buffer
 * @param buf_size capacity of @p buf in bytes
 * @return number of bytes read, -EINTR once the pair has been interrupted,
 *         or -EIO on any other failure.
 */
int SocketPair::readCallback(void *opaque, uint8_t *buf, int buf_size)
{
    auto *self = static_cast<SocketPair*>(opaque);

    for (;;) {
        if (self->interrupted_) {
            RING_ERR("interrupted");
            return -EINTR;
        }

        if (self->waitForData() < 0) {
            if (errno == EINTR)
                continue;
            return -EIO;
        }

        /* RTP */
        const int got = self->readRtpData(buf, buf_size);
        if (got >= 0)
            return got;

        // Anything other than a transient condition is fatal.
        if (errno != EAGAIN and errno != EINTR)
            return -EIO;
    }
}
/* RTCP packet types */
// Only the boundaries of the two value ranges tested by RTP_PT_IS_RTCP are
// listed here.
// NOTE(review): the values presumably follow the IANA RTCP packet-type
// registry -- confirm.
enum RTCPType {
RTCP_FIR = 192,
RTCP_IJ = 195,
RTCP_SR = 200,
RTCP_TOKEN = 210
};
// True when x lies in [RTCP_FIR, RTCP_IJ] or in [RTCP_SR, RTCP_TOKEN].
// The argument is evaluated up to four times, so it must be free of side
// effects. writeCallback() applies it to the second byte of a packet.
#define RTP_PT_IS_RTCP(x) (((x) >= RTCP_FIR && (x) <= RTCP_IJ) || \
((x) >= RTCP_SR && (x) <= RTCP_TOKEN))
/**
 * Write callback: dispatch one outgoing packet.
 *
 * The second byte of the packet is tested with RTP_PT_IS_RTCP. RTP packets
 * are sent with writeRtpData(), retrying for as long as it returns -EAGAIN.
 * RTCP packets are NOT transmitted: they are reported as fully written and
 * dropped.
 *
 * @param opaque   the SocketPair instance registered with the I/O handle
 * @param buf      packet to send
 * @param buf_size packet length in bytes
 * @return @p buf_size for a dropped RTCP packet; otherwise the result of
 *         writeRtpData() (bytes sent, or a negative error code).
 */
int SocketPair::writeCallback(void *opaque, uint8_t *buf, int buf_size)
{
    SocketPair *context = static_cast<SocketPair*>(opaque);

    // The packet type lives in the second byte; never read past a buffer that
    // is too short to contain it.
    const bool isRtcp = buf_size >= 2 and RTP_PT_IS_RTCP(buf[1]);
    if (isRtcp) {
        // NOTE(review): this early return has always short-circuited the RTCP
        // path, so writeRtcpData() was never reached from here. The behavior
        // is kept as-is; confirm whether dropping RTCP is intentional before
        // re-enabling the send through writeRtcpData().
        return buf_size;
    }

    /* RTP payload type */
    int ret;
    do {
        ret = context->writeRtpData(buf, buf_size);
    } while (ret == -EAGAIN);

    // Errors are already negative here; pass the result through unchanged.
    return ret;
}
}