socket_pair.cpp 8.41 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/*
 *  Copyright (C) 2004-2013 Savoir-Faire Linux Inc.
 *  Copyright (c) 2002 Fabrice Bellard
 *
 *  Author: Tristan Matthews <tristan.matthews@savoirfairelinux.com>
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA.
 *
 *  Additional permission under GNU GPL version 3 section 7:
 *
 *  If you modify this program, or any covered work, by linking or
 *  combining it with the OpenSSL project's OpenSSL library (or a
 *  modified version of that library), containing parts covered by the
 *  terms of the OpenSSL or SSLeay licenses, Savoir-Faire Linux Inc.
 *  grants you additional permission to convey the resulting work.
 *  Corresponding Source for a non-source form of such a combination
 *  shall include the source code for the parts of OpenSSL used as well
 *  as that of the covered work.
 */

33
#include "libav_deps.h"
#include "socket_pair.h"
#include "libav_utils.h"
#include "logger.h"

#include <cerrno>
#include <cstdio>
#include <cstring>
#include <stdexcept>
#include <unistd.h>
#include <poll.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>


namespace {

/**
 * Wait up to 100 ms for a descriptor to become writable.
 *
 * @param fd descriptor polled for POLLOUT
 * @return 0 if poll() reported POLLOUT, POLLERR or POLLHUP for fd,
 *         -EAGAIN if none of these events was reported (e.g. on timeout),
 *         or the negated errno if poll() itself failed.
 */
int ff_network_wait_fd(int fd)
{
    struct pollfd p = { fd, POLLOUT, 0 };
    const int ret = poll(&p, 1, 100);

    // Callers only treat negative values as failures, so a poll() error has
    // to be reported as -errno; the raw (positive) errno would be mistaken
    // for success.
    if (ret < 0)
        return -errno;

    return (p.revents & (POLLOUT | POLLERR | POLLHUP)) ? 0 : -EAGAIN;
}

57
struct addrinfo* udp_resolve_host(const char *node, int service)
58
{
59
    struct addrinfo hints, *res = 0;
60
61
62
    int error;
    char sport[16];

63
	memset(&hints, 0, sizeof(hints));
64
65
66
67
68
69
70
71
72
73
74
75
76
    snprintf(sport, sizeof(sport), "%d", service);

    hints.ai_socktype = SOCK_DGRAM;
    hints.ai_family   = AF_UNSPEC;
    hints.ai_flags = AI_PASSIVE;
    if ((error = getaddrinfo(node, sport, &hints, &res))) {
        res = NULL;
        ERROR("%s\n", gai_strerror(error));
    }

    return res;
}

77
unsigned udp_set_url(struct sockaddr_storage *addr,
78
            const char *hostname, int port)
79
80
81
82
83
{
    struct addrinfo *res0;
    int addr_len;

    res0 = udp_resolve_host(hostname, port);
84
    if (res0 == 0) return 0;
85
86
87
88
89
90
91
    memcpy(addr, res0->ai_addr, res0->ai_addrlen);
    addr_len = res0->ai_addrlen;
    freeaddrinfo(res0);

    return addr_len;
}

92
int udp_socket_create(sockaddr_storage *addr, socklen_t *addr_len,
93
94
95
96
97
98
99
                  int local_port)
{
    int udp_fd = -1;
    struct addrinfo *res0 = NULL, *res = NULL;

    res0 = udp_resolve_host(0, local_port);
    if (res0 == 0)
100
        return -1;
101
102
103
104
105
106
    for (res = res0; res; res=res->ai_next) {
        udp_fd = socket(res->ai_family, SOCK_DGRAM | SOCK_NONBLOCK, 0);
        if (udp_fd != -1) break;
        ERROR("socket error");
    }

107
108
109
110
    if (udp_fd < 0) {
        freeaddrinfo(res0);
        return -1;
    }
111
112
113
114
115
116
117

    memcpy(addr, res->ai_addr, res->ai_addrlen);
    *addr_len = res->ai_addrlen;

#if HAVE_SDP_CUSTOM_IO
    // bind socket so that we send from and receive
    // on local port
118
119
120
121
122
    if (bind(udp_fd, reinterpret_cast<sockaddr*>(addr), *addr_len) < 0) {
        ERROR("Bind failed: %s", strerror(errno));
        close(udp_fd);
        udp_fd = -1;
    }
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#endif

    freeaddrinfo(res0);

    return udp_fd;
}

// Passed as the first argument of the VideoIOHandle built by
// SocketPair::getIOContext().
// NOTE(review): 1472 is presumably a 1500-byte Ethernet MTU minus the 20-byte
// IPv4 header and the 8-byte UDP header -- confirm.
const int RTP_BUFFER_SIZE = 1472;

}

namespace sfl_video {

/**
 * Create the RTP/RTCP socket pair for the given destination.
 *
 * All members are value-initialised, then the actual work is delegated to
 * openSockets().
 *
 * @param uri       destination URI, forwarded to openSockets()
 * @param localPort local RTP port, forwarded to openSockets()
 * @throws std::runtime_error if openSockets() fails; openSockets() closes
 *         whatever it managed to open before throwing
 */
SocketPair::SocketPair(const char *uri, int localPort) :
           rtcpWriteMutex_(),
           rtpHandle_(0),
           rtcpHandle_(0),
           rtpDestAddr_(),
           rtpDestAddrLen_(),
           rtcpDestAddr_(),
           rtcpDestAddrLen_(),
           interrupted_(false)
{
    openSockets(uri, localPort);
}

/**
 * Raise the interruption flag so that readCallback() stops waiting, then
 * release both sockets.
 */
SocketPair::~SocketPair()
{
    this->interrupted_ = true;
    this->closeSockets();
}

155
void SocketPair::interrupt()
156
157
158
159
{
    interrupted_ = true;
}

160
void SocketPair::closeSockets()
161
162
163
164
165
166
167
{
    if (rtcpHandle_ > 0 and close(rtcpHandle_))
        ERROR("%s", strerror(errno));
    if (rtpHandle_ > 0 and close(rtpHandle_))
        ERROR("%s", strerror(errno));
}

168
void SocketPair::openSockets(const char *uri, int local_rtp_port)
169
170
171
172
{
    char hostname[256];
    char path[1024];
    int rtp_port;
173
174
175

    libav_utils::sfl_url_split(uri, hostname, sizeof(hostname), &rtp_port, path,
                  sizeof(path));
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192

    const int rtcp_port = rtp_port + 1;

#if HAVE_SDP_CUSTOM_IO
    const int local_rtcp_port = local_rtp_port + 1;
#else
    WARN("libavformat too old for socket reuse, using random source ports");
    local_rtp_port = 0;
    const int local_rtcp_port = 0;
#endif

    sockaddr_storage rtp_addr, rtcp_addr;
    socklen_t rtp_len, rtcp_len;

    // Open sockets and store addresses for sending
    if ((rtpHandle_ = udp_socket_create(&rtp_addr, &rtp_len, local_rtp_port)) == -1 or
        (rtcpHandle_ = udp_socket_create(&rtcp_addr, &rtcp_len, local_rtcp_port)) == -1 or
193
194
        (rtpDestAddrLen_ = udp_set_url(&rtpDestAddr_, hostname, rtp_port)) == 0 or
        (rtcpDestAddrLen_ = udp_set_url(&rtcpDestAddr_, hostname, rtcp_port)) == 0) {
195
196
197
198
199
200
201

        // Handle failed socket creation
        closeSockets();
        throw std::runtime_error("Socket creation failed");
    }
}

202
/**
 * Build a VideoIOHandle wired to this socket pair.
 *
 * The handle is constructed with RTP_BUFFER_SIZE, readCallback() and
 * writeCallback(), and receives this object as the opaque pointer that is
 * passed back to both callbacks.
 *
 * @return a VideoIOHandle allocated with new (NOTE(review): presumably owned
 *         by the caller -- confirm)
 */
VideoIOHandle* SocketPair::getIOContext()
{
    return new VideoIOHandle(RTP_BUFFER_SIZE, true,
                             &readCallback, &writeCallback, 0,
                             static_cast<void*>(this));
}

209
int SocketPair::readCallback(void *opaque, uint8_t *buf, int buf_size)
210
211
212
213
214
215
{
    SocketPair *context = static_cast<SocketPair*>(opaque);

    struct sockaddr_storage from;
    socklen_t from_len;
    int len, n;
216
217
    struct pollfd p[2] = { {context->rtpHandle_, POLLIN, 0},
						   {context->rtcpHandle_, POLLIN, 0}};
218
219

    for(;;) {
220
221
        if (context->interrupted_) {
            ERROR("interrupted");
222
            return -EINTR;
223
        }
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239

        /* build fdset to listen to RTP and RTCP packets */
        n = poll(p, 2, 100);
        if (n > 0) {
            /* first try RTCP */
            if (p[1].revents & POLLIN) {
                from_len = sizeof(from);

                {
                    len = recvfrom(context->rtcpHandle_, buf, buf_size, 0,
                            (struct sockaddr *)&from, &from_len);
                }

                if (len < 0) {
                    if (errno == EAGAIN or errno == EINTR)
                        continue;
240
                    return -EIO;
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
                }
                break;
            }
            /* then RTP */
            if (p[0].revents & POLLIN) {
                from_len = sizeof(from);

                {
                    len = recvfrom(context->rtpHandle_, buf, buf_size, 0,
                            (struct sockaddr *)&from, &from_len);
                }

                if (len < 0) {
                    if (errno == EAGAIN or errno == EINTR)
                        continue;
256
                    return -EIO;
257
258
259
260
261
262
                }
                break;
            }
        } else if (n < 0) {
            if (errno == EINTR)
                continue;
263
            return -EIO;
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
        }
    }
    return len;
}

/* RTCP packet types.
 * Only the values used as range bounds by RTP_PT_IS_RTCP() are listed:
 * [RTCP_FIR, RTCP_IJ] and [RTCP_SR, RTCP_TOKEN]. */
enum RTCPType {
    RTCP_FIR    = 192,
    RTCP_IJ     = 195,
    RTCP_SR     = 200,
    RTCP_TOKEN  = 210
};

/* True when x lies in [RTCP_FIR, RTCP_IJ] or in [RTCP_SR, RTCP_TOKEN].
 * writeCallback() applies it to the second byte of an outgoing packet to
 * choose between the RTCP and the RTP socket.
 * The argument is evaluated several times: do not pass an expression with
 * side effects. */
#define RTP_PT_IS_RTCP(x) (((x) >= RTCP_FIR && (x) <= RTCP_IJ) || \
                           ((x) >= RTCP_SR  && (x) <= RTCP_TOKEN))

280
int SocketPair::writeCallback(void *opaque, uint8_t *buf, int buf_size)
281
282
283
284
285
286
287
{
    SocketPair *context = static_cast<SocketPair*>(opaque);

    int ret;

    if (RTP_PT_IS_RTCP(buf[1])) {
        /* RTCP payload type */
288
        std::lock_guard<std::mutex> lock(context->rtcpWriteMutex_);
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
        ret = ff_network_wait_fd(context->rtcpHandle_);
        if (ret < 0)
            return ret;

        ret = sendto(context->rtcpHandle_, buf, buf_size, 0,
                     (sockaddr*) &context->rtcpDestAddr_, context->rtcpDestAddrLen_);
    } else {
        /* RTP payload type */
        ret = ff_network_wait_fd(context->rtpHandle_);
        if (ret < 0)
            return ret;

        ret = sendto(context->rtpHandle_, buf, buf_size, 0,
                     (sockaddr*) &context->rtpDestAddr_, context->rtpDestAddrLen_);
    }

    return ret < 0 ? errno : ret;
}

}