socket_pair.cpp 8.41 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
/*
 *  Copyright (C) 2004-2013 Savoir-Faire Linux Inc.
 *  Copyright (c) 2002 Fabrice Bellard
 *
 *  Author: Tristan Matthews <tristan.matthews@savoirfairelinux.com>
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA.
 *
 *  Additional permission under GNU GPL version 3 section 7:
 *
 *  If you modify this program, or any covered work, by linking or
 *  combining it with the OpenSSL project's OpenSSL library (or a
 *  modified version of that library), containing parts covered by the
 *  terms of the OpenSSL or SSLeay licenses, Savoir-Faire Linux Inc.
 *  grants you additional permission to convey the resulting work.
 *  Corresponding Source for a non-source form of such a combination
 *  shall include the source code for the parts of OpenSSL used as well
 *  as that of the covered work.
 */

33
#include "libav_deps.h"
34
#include "socket_pair.h"
35
#include "libav_utils.h"
36
#include "logger.h"
37

38 39 40 41 42 43 44 45 46 47 48 49 50
#include <cstring>
#include <stdexcept>
#include <unistd.h>
#include <poll.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>


namespace {

int ff_network_wait_fd(int fd)
{
51
    struct pollfd p = { fd, POLLOUT, 0 };
52 53
    int ret;
    ret = poll(&p, 1, 100);
54
    return ret < 0 ? errno : p.revents & (POLLOUT | POLLERR | POLLHUP) ? 0 : -EAGAIN;
55 56
}

57
struct addrinfo* udp_resolve_host(const char *node, int service)
58
{
59
    struct addrinfo hints, *res = 0;
60 61 62
    int error;
    char sport[16];

63
	memset(&hints, 0, sizeof(hints));
64 65 66 67 68 69 70 71 72 73 74 75 76
    snprintf(sport, sizeof(sport), "%d", service);

    hints.ai_socktype = SOCK_DGRAM;
    hints.ai_family   = AF_UNSPEC;
    hints.ai_flags = AI_PASSIVE;
    if ((error = getaddrinfo(node, sport, &hints, &res))) {
        res = NULL;
        ERROR("%s\n", gai_strerror(error));
    }

    return res;
}

77
unsigned udp_set_url(struct sockaddr_storage *addr,
78
            const char *hostname, int port)
79 80 81 82 83
{
    struct addrinfo *res0;
    int addr_len;

    res0 = udp_resolve_host(hostname, port);
84
    if (res0 == 0) return 0;
85 86 87 88 89 90 91
    memcpy(addr, res0->ai_addr, res0->ai_addrlen);
    addr_len = res0->ai_addrlen;
    freeaddrinfo(res0);

    return addr_len;
}

92
int udp_socket_create(sockaddr_storage *addr, socklen_t *addr_len,
93 94 95 96 97 98 99
                  int local_port)
{
    int udp_fd = -1;
    struct addrinfo *res0 = NULL, *res = NULL;

    res0 = udp_resolve_host(0, local_port);
    if (res0 == 0)
100
        return -1;
101 102 103 104 105 106
    for (res = res0; res; res=res->ai_next) {
        udp_fd = socket(res->ai_family, SOCK_DGRAM | SOCK_NONBLOCK, 0);
        if (udp_fd != -1) break;
        ERROR("socket error");
    }

107 108 109 110
    if (udp_fd < 0) {
        freeaddrinfo(res0);
        return -1;
    }
111 112 113 114 115 116 117

    memcpy(addr, res->ai_addr, res->ai_addrlen);
    *addr_len = res->ai_addrlen;

#if HAVE_SDP_CUSTOM_IO
    // bind socket so that we send from and receive
    // on local port
118 119 120 121 122
    if (bind(udp_fd, reinterpret_cast<sockaddr*>(addr), *addr_len) < 0) {
        ERROR("Bind failed: %s", strerror(errno));
        close(udp_fd);
        udp_fd = -1;
    }
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
#endif

    freeaddrinfo(res0);

    return udp_fd;
}

const int RTP_BUFFER_SIZE = 1472;

}

namespace sfl_video {

SocketPair::SocketPair(const char *uri, int localPort) :
           rtcpWriteMutex_(),
           rtpHandle_(0),
           rtcpHandle_(0),
140 141 142 143
           rtpDestAddr_(),
           rtpDestAddrLen_(),
           rtcpDestAddr_(),
           rtcpDestAddrLen_(),
144
           interrupted_(false)
145 146 147 148 149 150 151 152 153 154
{
    openSockets(uri, localPort);
}

SocketPair::~SocketPair()
{
    interrupted_ = true;
    closeSockets();
}

155
void SocketPair::interrupt()
156 157 158 159
{
    interrupted_ = true;
}

160
void SocketPair::closeSockets()
161 162 163 164 165 166 167
{
    if (rtcpHandle_ > 0 and close(rtcpHandle_))
        ERROR("%s", strerror(errno));
    if (rtpHandle_ > 0 and close(rtpHandle_))
        ERROR("%s", strerror(errno));
}

168
void SocketPair::openSockets(const char *uri, int local_rtp_port)
169 170 171 172
{
    char hostname[256];
    char path[1024];
    int rtp_port;
173 174 175

    libav_utils::sfl_url_split(uri, hostname, sizeof(hostname), &rtp_port, path,
                  sizeof(path));
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192

    const int rtcp_port = rtp_port + 1;

#if HAVE_SDP_CUSTOM_IO
    const int local_rtcp_port = local_rtp_port + 1;
#else
    WARN("libavformat too old for socket reuse, using random source ports");
    local_rtp_port = 0;
    const int local_rtcp_port = 0;
#endif

    sockaddr_storage rtp_addr, rtcp_addr;
    socklen_t rtp_len, rtcp_len;

    // Open sockets and store addresses for sending
    if ((rtpHandle_ = udp_socket_create(&rtp_addr, &rtp_len, local_rtp_port)) == -1 or
        (rtcpHandle_ = udp_socket_create(&rtcp_addr, &rtcp_len, local_rtcp_port)) == -1 or
193 194
        (rtpDestAddrLen_ = udp_set_url(&rtpDestAddr_, hostname, rtp_port)) == 0 or
        (rtcpDestAddrLen_ = udp_set_url(&rtcpDestAddr_, hostname, rtcp_port)) == 0) {
195 196 197 198 199 200 201

        // Handle failed socket creation
        closeSockets();
        throw std::runtime_error("Socket creation failed");
    }
}

202
VideoIOHandle* SocketPair::getIOContext()
203
{
204 205 206
    return new VideoIOHandle(RTP_BUFFER_SIZE, true,
                             &readCallback, &writeCallback, 0,
                             reinterpret_cast<void*>(this));
207 208
}

209
int SocketPair::readCallback(void *opaque, uint8_t *buf, int buf_size)
210 211 212 213 214 215
{
    SocketPair *context = static_cast<SocketPair*>(opaque);

    struct sockaddr_storage from;
    socklen_t from_len;
    int len, n;
216 217
    struct pollfd p[2] = { {context->rtpHandle_, POLLIN, 0},
						   {context->rtcpHandle_, POLLIN, 0}};
218 219

    for(;;) {
220 221
        if (context->interrupted_) {
            ERROR("interrupted");
222
            return -EINTR;
223
        }
224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239

        /* build fdset to listen to RTP and RTCP packets */
        n = poll(p, 2, 100);
        if (n > 0) {
            /* first try RTCP */
            if (p[1].revents & POLLIN) {
                from_len = sizeof(from);

                {
                    len = recvfrom(context->rtcpHandle_, buf, buf_size, 0,
                            (struct sockaddr *)&from, &from_len);
                }

                if (len < 0) {
                    if (errno == EAGAIN or errno == EINTR)
                        continue;
240
                    return -EIO;
241 242 243 244 245 246 247 248 249 250 251 252 253 254 255
                }
                break;
            }
            /* then RTP */
            if (p[0].revents & POLLIN) {
                from_len = sizeof(from);

                {
                    len = recvfrom(context->rtpHandle_, buf, buf_size, 0,
                            (struct sockaddr *)&from, &from_len);
                }

                if (len < 0) {
                    if (errno == EAGAIN or errno == EINTR)
                        continue;
256
                    return -EIO;
257 258 259 260 261 262
                }
                break;
            }
        } else if (n < 0) {
            if (errno == EINTR)
                continue;
263
            return -EIO;
264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279
        }
    }
    return len;
}

/* RTCP packet types */
enum RTCPType {
    RTCP_FIR    = 192,
    RTCP_IJ     = 195,
    RTCP_SR     = 200,
    RTCP_TOKEN  = 210
};

#define RTP_PT_IS_RTCP(x) (((x) >= RTCP_FIR && (x) <= RTCP_IJ) || \
                           ((x) >= RTCP_SR  && (x) <= RTCP_TOKEN))

280
int SocketPair::writeCallback(void *opaque, uint8_t *buf, int buf_size)
281 282 283 284 285 286 287
{
    SocketPair *context = static_cast<SocketPair*>(opaque);

    int ret;

    if (RTP_PT_IS_RTCP(buf[1])) {
        /* RTCP payload type */
288
        std::lock_guard<std::mutex> lock(context->rtcpWriteMutex_);
289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308
        ret = ff_network_wait_fd(context->rtcpHandle_);
        if (ret < 0)
            return ret;

        ret = sendto(context->rtcpHandle_, buf, buf_size, 0,
                     (sockaddr*) &context->rtcpDestAddr_, context->rtcpDestAddrLen_);
    } else {
        /* RTP payload type */
        ret = ff_network_wait_fd(context->rtpHandle_);
        if (ret < 0)
            return ret;

        ret = sendto(context->rtpHandle_, buf, buf_size, 0,
                     (sockaddr*) &context->rtpDestAddr_, context->rtpDestAddrLen_);
    }

    return ret < 0 ? errno : ret;
}

}