Commit d13a09c2 authored by Tristan Matthews's avatar Tristan Matthews

* #14077: video: send and receive RTP on one socket

Thanks to the new custom_io flag in libavformat's SDP demuxer, we can manage
our own UDP transports for RTP and RTCP. This allows us to comply with
RFC 4961.

If an older version of libavformat is detected, we fallback to sending and
receiving on different sockets.
parent 8c96ebff
......@@ -202,6 +202,10 @@ class Sdp {
return remoteVideoPort_;
}
unsigned int getLocalVideoPort() const {
return localVideoPort_;
}
void addAttributeToLocalAudioMedia(const char *attr);
void removeAttributeFromLocalAudioMedia(const char *attr);
void addAttributeToLocalVideoMedia(const char *attr);
......
......@@ -1772,7 +1772,7 @@ void sdp_media_update_cb(pjsip_inv_session *inv, pj_status_t status)
Manager::instance().getVideoControls()->stopPreview();
call->getVideoRtp().updateSDP(*call->getLocalSDP());
call->getVideoRtp().updateDestination(call->getLocalSDP()->getRemoteIP(), call->getLocalSDP()->getRemoteVideoPort());
call->getVideoRtp().start();
call->getVideoRtp().start(call->getLocalSDP()->getLocalVideoPort());
#endif
// Get the crypto attribute containing srtp's cryptographic context (keys, cipher)
......
......@@ -10,7 +10,8 @@ libvideo_la_SOURCES = libav_utils.cpp libav_utils.h video_rtp_session.cpp \
video_v4l2_list.cpp video_v4l2.h video_v4l2_list.h \
video_preferences.h video_preferences.cpp \
packet_handle.h packet_handle.cpp check.h shm_header.h \
shm_sink.cpp shm_sink.h video_provider.h
shm_sink.cpp shm_sink.h video_provider.h \
socket_pair.cpp socket_pair.h
libvideo_la_LIBADD = @LIBAVCODEC_LIBS@ @LIBAVFORMAT_LIBS@ @LIBAVDEVICE_LIBS@ @LIBSWSCALE_LIBS@ @LIBAVUTIL_LIBS@ @UDEV_LIBS@
......
/*
* Copyright (C) 2004-2013 Savoir-Faire Linux Inc.
* Copyright (c) 2002 Fabrice Bellard
*
* Author: Tristan Matthews <tristan.matthews@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Additional permission under GNU GPL version 3 section 7:
*
* If you modify this program, or any covered work, by linking or
* combining it with the OpenSSL project's OpenSSL library (or a
* modified version of that library), containing parts covered by the
* terms of the OpenSSL or SSLeay licenses, Savoir-Faire Linux Inc.
* grants you additional permission to convey the resulting work.
* Corresponding Source for a non-source form of such a combination
* shall include the source code for the parts of OpenSSL used as well
* as that of the covered work.
*/
#include "socket_pair.h"
#include "scoped_lock.h"
#include "logger.h"
#include <cstring>
#include <stdexcept>
#include <unistd.h>
#include <poll.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
extern "C" {
#include <libavutil/avstring.h>
#include <libavformat/avformat.h>
}
namespace {
int ff_network_wait_fd(int fd)
{
struct pollfd p = { .fd = fd, .events = POLLOUT, .revents = 0 };
int ret;
ret = poll(&p, 1, 100);
return ret < 0 ? errno : p.revents & (POLLOUT | POLLERR | POLLHUP) ? 0 : AVERROR(EAGAIN);
}
struct addrinfo*
udp_resolve_host(const char *node, int service)
{
struct addrinfo hints = { 0 }, *res = 0;
int error;
char sport[16];
snprintf(sport, sizeof(sport), "%d", service);
hints.ai_socktype = SOCK_DGRAM;
hints.ai_family = AF_UNSPEC;
hints.ai_flags = AI_PASSIVE;
if ((error = getaddrinfo(node, sport, &hints, &res))) {
res = NULL;
ERROR("%s\n", gai_strerror(error));
}
return res;
}
int udp_set_url(struct sockaddr_storage *addr,
const char *hostname, int port)
{
struct addrinfo *res0;
int addr_len;
res0 = udp_resolve_host(hostname, port);
if (res0 == 0) return AVERROR(EIO);
memcpy(addr, res0->ai_addr, res0->ai_addrlen);
addr_len = res0->ai_addrlen;
freeaddrinfo(res0);
return addr_len;
}
int
udp_socket_create(sockaddr_storage *addr, socklen_t *addr_len,
int local_port)
{
int udp_fd = -1;
struct addrinfo *res0 = NULL, *res = NULL;
res0 = udp_resolve_host(0, local_port);
if (res0 == 0)
goto fail;
for (res = res0; res; res=res->ai_next) {
udp_fd = socket(res->ai_family, SOCK_DGRAM | SOCK_NONBLOCK, 0);
if (udp_fd != -1) break;
ERROR("socket error");
}
if (udp_fd < 0)
goto fail;
memcpy(addr, res->ai_addr, res->ai_addrlen);
*addr_len = res->ai_addrlen;
#if HAVE_SDP_CUSTOM_IO
// bind socket so that we send from and receive
// on local port
bind(udp_fd, reinterpret_cast<sockaddr*>(addr), *addr_len);
#endif
freeaddrinfo(res0);
return udp_fd;
fail:
if (udp_fd >= 0)
close(udp_fd);
if (res0)
freeaddrinfo(res0);
return -1;
}
const int RTP_BUFFER_SIZE = 1472;
}
namespace sfl_video {
SocketPair::SocketPair(const char *uri, int localPort) :
rtcpWriteMutex_(),
rtpHandle_(0),
rtcpHandle_(0),
interrupted_(false)
{
pthread_mutex_init(&rtcpWriteMutex_, NULL);
openSockets(uri, localPort);
}
SocketPair::~SocketPair()
{
interrupted_ = true;
closeSockets();
// destroy in reverse order
pthread_mutex_destroy(&rtcpWriteMutex_);
}
void
SocketPair::interrupt()
{
interrupted_ = true;
}
void
SocketPair::closeSockets()
{
if (rtcpHandle_ > 0 and close(rtcpHandle_))
ERROR("%s", strerror(errno));
if (rtpHandle_ > 0 and close(rtpHandle_))
ERROR("%s", strerror(errno));
}
void
SocketPair::openSockets(const char *uri, int local_rtp_port)
{
char hostname[256];
char buf[1024];
char path[1024];
int rtp_port;
av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &rtp_port,
path, sizeof(path), uri);
const int rtcp_port = rtp_port + 1;
#if HAVE_SDP_CUSTOM_IO
const int local_rtcp_port = local_rtp_port + 1;
#else
WARN("libavformat too old for socket reuse, using random source ports");
local_rtp_port = 0;
const int local_rtcp_port = 0;
#endif
sockaddr_storage rtp_addr, rtcp_addr;
socklen_t rtp_len, rtcp_len;
// Open sockets and store addresses for sending
if ((rtpHandle_ = udp_socket_create(&rtp_addr, &rtp_len, local_rtp_port)) == -1 or
(rtcpHandle_ = udp_socket_create(&rtcp_addr, &rtcp_len, local_rtcp_port)) == -1 or
(rtpDestAddrLen_ = udp_set_url(&rtpDestAddr_, hostname, rtp_port)) < 0 or
(rtcpDestAddrLen_ = udp_set_url(&rtcpDestAddr_, hostname, rtcp_port)) < 0) {
// Handle failed socket creation
closeSockets();
throw std::runtime_error("Socket creation failed");
}
}
AVIOContext *
SocketPair::createAVIOContext()
{
// FIXME: Caller must free buffer!
unsigned char *buffer(static_cast<unsigned char *>(av_malloc(RTP_BUFFER_SIZE)));
AVIOContext *context = avio_alloc_context(buffer,
RTP_BUFFER_SIZE, 1, reinterpret_cast<void*>(this),
&readCallback, &writeCallback, 0);
context->max_packet_size = RTP_BUFFER_SIZE;
return context;
}
int
SocketPair::readCallback(void *opaque, uint8_t *buf, int buf_size)
{
SocketPair *context = static_cast<SocketPair*>(opaque);
struct sockaddr_storage from;
socklen_t from_len;
int len, n;
struct pollfd p[2] = { {context->rtpHandle_, POLLIN, 0}, {context->rtcpHandle_, POLLIN, 0}};
for(;;) {
if (context->interrupted_)
return AVERROR_EXIT;
/* build fdset to listen to RTP and RTCP packets */
n = poll(p, 2, 100);
if (n > 0) {
/* first try RTCP */
if (p[1].revents & POLLIN) {
from_len = sizeof(from);
{
len = recvfrom(context->rtcpHandle_, buf, buf_size, 0,
(struct sockaddr *)&from, &from_len);
}
if (len < 0) {
if (errno == EAGAIN or errno == EINTR)
continue;
return AVERROR(EIO);
}
break;
}
/* then RTP */
if (p[0].revents & POLLIN) {
from_len = sizeof(from);
{
len = recvfrom(context->rtpHandle_, buf, buf_size, 0,
(struct sockaddr *)&from, &from_len);
}
if (len < 0) {
if (errno == EAGAIN or errno == EINTR)
continue;
return AVERROR(EIO);
}
break;
}
} else if (n < 0) {
if (errno == EINTR)
continue;
return AVERROR(EIO);
}
}
return len;
}
/* RTCP packet types */
enum RTCPType {
RTCP_FIR = 192,
RTCP_IJ = 195,
RTCP_SR = 200,
RTCP_TOKEN = 210
};
#define RTP_PT_IS_RTCP(x) (((x) >= RTCP_FIR && (x) <= RTCP_IJ) || \
((x) >= RTCP_SR && (x) <= RTCP_TOKEN))
int
SocketPair::writeCallback(void *opaque, uint8_t *buf, int buf_size)
{
SocketPair *context = static_cast<SocketPair*>(opaque);
int ret;
if (RTP_PT_IS_RTCP(buf[1])) {
/* RTCP payload type */
sfl::ScopedLock lock(context->rtcpWriteMutex_);
ret = ff_network_wait_fd(context->rtcpHandle_);
if (ret < 0)
return ret;
ret = sendto(context->rtcpHandle_, buf, buf_size, 0,
(sockaddr*) &context->rtcpDestAddr_, context->rtcpDestAddrLen_);
} else {
/* RTP payload type */
ret = ff_network_wait_fd(context->rtpHandle_);
if (ret < 0)
return ret;
ret = sendto(context->rtpHandle_, buf, buf_size, 0,
(sockaddr*) &context->rtpDestAddr_, context->rtpDestAddrLen_);
}
return ret < 0 ? errno : ret;
}
}
/*
* Copyright (C) 2004-2013 Savoir-Faire Linux Inc.
* Copyright (C) 2012 VLC authors and VideoLAN
* Author: Tristan Matthews <tristan.matthews@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Additional permission under GNU GPL version 3 section 7:
*
* If you modify this program, or any covered work, by linking or
* combining it with the OpenSSL project's OpenSSL library (or a
* modified version of that library), containing parts covered by the
* terms of the OpenSSL or SSLeay licenses, Savoir-Faire Linux Inc.
* grants you additional permission to convey the resulting work.
* Corresponding Source for a non-source form of such a combination
* shall include the source code for the parts of OpenSSL used as well
* as that of the covered work.
*/
#ifndef SOCKET_PAIR_H_
#define SOCKET_PAIR_H_
#include <sys/socket.h>
#include <pthread.h>
#include <stdint.h>
// FIXME: newer libavformat allows us to do forward declaration
// of AVIOContext and thus avoid this include
extern "C" {
#include <libavformat/avio.h>
}
/* LIBAVFORMAT_VERSION_CHECK checks for the right version of libav and FFmpeg
* a is the major version
* b and c the minor and micro versions of libav
* d and e the minor and micro versions of FFmpeg */
#define LIBAVFORMAT_VERSION_CHECK( a, b, c, d, e ) \
( (LIBAVFORMAT_VERSION_MICRO < 100 && LIBAVFORMAT_VERSION_INT >= AV_VERSION_INT( a, b, c ) ) || \
(LIBAVFORMAT_VERSION_MICRO >= 100 && LIBAVFORMAT_VERSION_INT >= AV_VERSION_INT( a, d, e ) ) )
#define HAVE_SDP_CUSTOM_IO LIBAVFORMAT_VERSION_CHECK(54,20,3,59,103)
namespace sfl_video {
class VideoSendThread;
class VideoReceiveThread;
class SocketPair {
public:
SocketPair(const char *uri, int localPort);
~SocketPair();
void interrupt();
AVIOContext *
createAVIOContext();
void openSockets(const char *uri, int localPort);
void closeSockets();
static int readCallback(void *opaque, uint8_t *buf, int buf_size);
static int writeCallback(void *opaque, uint8_t *buf, int buf_size);
pthread_mutex_t rtcpWriteMutex_;
int rtpHandle_;
int rtcpHandle_;
sockaddr_storage rtpDestAddr_;
socklen_t rtpDestAddrLen_;
sockaddr_storage rtcpDestAddr_;
socklen_t rtcpDestAddrLen_;
bool interrupted_;
};
}
#endif // SOCKET_PAIR_H_
......@@ -40,8 +40,8 @@ int main ()
{
VideoPreference preference;
sfl_video::VideoRtpSession session("test", preference.getSettings());
session.start();
sleep(10);
session.start(12345);
sleep(5);
session.stop();
return 0;
......
......@@ -30,6 +30,7 @@
*/
#include "video_receive_thread.h"
#include "socket_pair.h"
#include "dbus/video_controls.h"
#include "check.h"
#include "packet_handle.h"
......@@ -114,12 +115,19 @@ void VideoReceiveThread::setup()
if (!args_["channel"].empty())
av_dict_set(&options, "channel", args_["channel"].c_str(), 0);
// Open video file
// Open Camera or SDP Demuxer
inputCtx_ = avformat_alloc_context();
inputCtx_->interrupt_callback = interruptCb_;
if (input == SDP_FILENAME) {
#if HAVE_SDP_CUSTOM_IO
// custom_io so the SDP demuxer will not open any UDP connections
av_dict_set(&options, "sdp_flags", "custom_io", 0);
#else
WARN("libavformat too old for custom SDP demuxing");
#endif
EXIT_IF_FAIL(not stream_.str().empty(), "No SDP loaded");
inputCtx_->pb = avioContext_.get();
inputCtx_->pb = sdpContext_.get();
}
int ret = avformat_open_input(&inputCtx_, input.c_str(), file_iformat, options ? &options : NULL);
......@@ -129,6 +137,18 @@ void VideoReceiveThread::setup()
EXIT_IF_FAIL(false, "Could not open input \"%s\"", input.c_str());
}
if (input == SDP_FILENAME) {
#if HAVE_SDP_CUSTOM_IO
// Now replace our custom AVIOContext with one that will read
// packets
inputCtx_->pb = demuxContext_.get();
#endif
}
// FIXME: this is a hack because our peer sends us RTP before
// we're ready for it, and we miss the SPS/PPS. We should be
// ready earlier.
sleep(1);
DEBUG("Finding stream info");
if (requestKeyFrameCallback_)
requestKeyFrameCallback_(id_);
......@@ -225,15 +245,26 @@ VideoReceiveThread::VideoReceiveThread(const std::string &id, const std::map<str
requestKeyFrameCallback_(0),
sdpBuffer_(reinterpret_cast<unsigned char*>(av_malloc(SDP_BUFFER_SIZE)), &av_free),
stream_(args_["receiving_sdp"]),
avioContext_(avio_alloc_context(sdpBuffer_.get(), SDP_BUFFER_SIZE, 0,
sdpContext_(avio_alloc_context(sdpBuffer_.get(), SDP_BUFFER_SIZE, 0,
reinterpret_cast<void*>(static_cast<std::istream*>(&stream_)),
&readFunction, 0, 0), &av_free),
demuxContext_(),
thread_(0)
{
interruptCb_.callback = interruptCb;
interruptCb_.opaque = this;
}
void
VideoReceiveThread::addIOContext(SocketPair &socketPair)
{
#if HAVE_SDP_CUSTOM_IO
demuxContext_.reset(socketPair.createAVIOContext(), &av_free);
#else
(void) socketPair;
#endif
}
void VideoReceiveThread::start()
{
threadRunning_ = true;
......@@ -285,8 +316,11 @@ struct VideoRxContextHandle {
if (rx_.decoderCtx_)
avcodec_close(rx_.decoderCtx_);
if (rx_.demuxContext_ and rx_.demuxContext_->buffer)
av_free(rx_.demuxContext_->buffer);
if (rx_.inputCtx_ and rx_.inputCtx_->nb_streams > 0) {
#if LIBAVFORMAT_VERSION_INT < AV_VERSION_INT(53, 8, 0)
#if LIBAVFORMAT_VERSION_MAJOR < 55
av_close_input_file(rx_.inputCtx_);
#else
avformat_close_input(&rx_.inputCtx_);
......
......@@ -51,6 +51,8 @@ class AVFormatContext;
class AVFrame;
namespace sfl_video {
class SocketPair;
class VideoReceiveThread : public VideoProvider {
private:
NON_COPYABLE(VideoReceiveThread);
......@@ -79,7 +81,8 @@ class VideoReceiveThread : public VideoProvider {
void (* requestKeyFrameCallback_)(const std::string &);
std::tr1::shared_ptr<unsigned char> sdpBuffer_;
std::istringstream stream_;
std::tr1::shared_ptr<AVIOContext> avioContext_;
std::tr1::shared_ptr<AVIOContext> sdpContext_;
std::tr1::shared_ptr<AVIOContext> demuxContext_;
void setup();
void openDecoder();
......@@ -95,6 +98,7 @@ class VideoReceiveThread : public VideoProvider {
public:
VideoReceiveThread(const std::string &id, const std::map<std::string, std::string> &args);
void addIOContext(SocketPair &socketPair);
void addDetails(std::map<std::string, std::string> &details);
~VideoReceiveThread();
void start();
......
......@@ -34,6 +34,7 @@
#include <string>
#include "video_send_thread.h"
#include "video_receive_thread.h"
#include "socket_pair.h"
#include "sip/sdp.h"
#include "sip/sipvoiplink.h"
#include "manager.h"
......@@ -45,10 +46,15 @@ using std::map;
using std::string;
VideoRtpSession::VideoRtpSession(const string &callID, const map<string, string> &txArgs) :
sendThread_(), receiveThread_(), txArgs_(txArgs),
socketPair_(), sendThread_(), receiveThread_(), txArgs_(txArgs),
rxArgs_(), sending_(false), receiving_(false), callID_(callID)
{}
VideoRtpSession::~VideoRtpSession()
{
stop();
}
void VideoRtpSession::updateSDP(const Sdp &sdp)
{
string desc(sdp.getIncomingVideoDescription());
......@@ -117,12 +123,18 @@ void VideoRtpSession::updateDestination(const string &destination,
}
}
void VideoRtpSession::start()
void VideoRtpSession::start(int localPort)
{
if (not sending_ and not receiving_)
return;
socketPair_.reset(new SocketPair(txArgs_["destination"].c_str(), localPort));
if (sending_) {
if (sendThread_.get())
WARN("Restarting video sender");
sendThread_.reset(new VideoSendThread("local", txArgs_));
sendThread_->addIOContext(*socketPair_);