Commit 29ae5d8a authored by Olivier SOLDANO's avatar Olivier SOLDANO Committed by Andreas Traczyk

Path MTU discovery implementation

This implementation uses gnutls dtls heartbeat API to test path MTU.
heartbeat allowing messages with automated response in a datagram,
the application is able to guess the MTU via a timeout in the heartbeat.
(timeout on packet sent and no response, implies that the MTU is lower
than the lost payload.)
To minimize false positives (a response is lost for example), each attempt
triggers one retry on the first timeout.
This version ensures a minimal MTU of 512 bytes will be returned in
case of any failure in the procedure.
For retrocompatibility with non heartbeat capable clients,
a fallback MTU is set at 1280.

Change-Id: Ib9a7f63a70e8bdad239d8fc103779a0f2c387e87
Reviewed-by: Andreas Traczyk's avatarAndreas Traczyk <andreas.traczyk@savoirfairelinux.com>
parent b5e2496c
......@@ -23,9 +23,9 @@
#include <memory>
#include <functional>
#if defined(_MSC_VER)
#include <BaseTsd.h>
using ssize_t = SSIZE_T;
#if defined(_MSC_VER)
#include <BaseTsd.h>
using ssize_t = SSIZE_T;
#endif
namespace ring {
......@@ -49,6 +49,7 @@ class IceSocket
ssize_t getNextPacketSize() const;
ssize_t waitForData(unsigned int timeout);
void setOnRecv(IceRecvCb cb);
uint16_t getTransportOverhead();
};
};
......
......@@ -44,6 +44,8 @@
namespace ring {
static constexpr unsigned STUN_MAX_PACKET_SIZE {8192};
static constexpr uint16_t IPV6_HEADER_SIZE = 40; // Size in bytes of IPV6 packet header
static constexpr uint16_t IPV4_HEADER_SIZE = 20; // Size in bytes of IPV4 packet header
// TODO: C++14 ? remove me and use std::min
template< class T >
......@@ -1061,4 +1063,9 @@ IceSocket::setOnRecv(IceRecvCb cb)
return ice_transport_->setOnRecv(compId_, cb);
}
uint16_t
IceSocket::getTransportOverhead(){
return (ice_transport_->getRemoteAddress(compId_).getFamily() == AF_INET) ? IPV4_HEADER_SIZE : IPV6_HEADER_SIZE;
}
} // namespace ring
......@@ -49,7 +49,8 @@ class AudioSender {
const MediaDescription& args,
SocketPair& socketPair,
const uint16_t seqVal,
bool muteState);
bool muteState,
const uint16_t mtu);
~AudioSender();
void setMuted(bool isMuted);
......@@ -71,6 +72,7 @@ class AudioSender {
AudioBuffer resampledData_;
const uint16_t seqVal_;
bool muteState_ = false;
uint16_t mtu_;
using seconds = std::chrono::duration<double, std::ratio<1>>;
const seconds secondsPerPacket_ {0.02}; // 20 ms
......@@ -85,12 +87,14 @@ AudioSender::AudioSender(const std::string& id,
const MediaDescription& args,
SocketPair& socketPair,
const uint16_t seqVal,
bool muteState) :
bool muteState,
const uint16_t mtu) :
id_(id),
dest_(dest),
args_(args),
seqVal_(seqVal),
muteState_(muteState),
mtu_(mtu),
loop_([&] { return setup(socketPair); },
std::bind(&AudioSender::process, this),
std::bind(&AudioSender::cleanup, this))
......@@ -107,7 +111,7 @@ bool
AudioSender::setup(SocketPair& socketPair)
{
audioEncoder_.reset(new MediaEncoder);
muxContext_.reset(socketPair.createIOContext());
muxContext_.reset(socketPair.createIOContext(mtu_));
try {
/* Encoder setup */
......@@ -199,7 +203,8 @@ class AudioReceiveThread
public:
AudioReceiveThread(const std::string &id,
const AudioFormat& format,
const std::string& sdp);
const std::string& sdp,
const uint16_t mtu);
~AudioReceiveThread();
void addIOContext(SocketPair &socketPair);
void startLoop();
......@@ -230,6 +235,8 @@ class AudioReceiveThread
std::shared_ptr<RingBuffer> ringbuffer_;
uint16_t mtu_;
ThreadLoop loop_;
bool setup();
void process();
......@@ -238,10 +245,12 @@ class AudioReceiveThread
AudioReceiveThread::AudioReceiveThread(const std::string& id,
const AudioFormat& format,
const std::string& sdp)
const std::string& sdp,
const uint16_t mtu)
: id_(id)
, format_(format)
, stream_(sdp)
, mtu_(mtu)
, sdpContext_(new MediaIOHandle(sdp.size(), false, &readFunction,
0, 0, this))
, loop_(std::bind(&AudioReceiveThread::setup, this),
......@@ -345,7 +354,7 @@ AudioReceiveThread::interruptCb(void* data)
void
AudioReceiveThread::addIOContext(SocketPair& socketPair)
{
demuxContext_.reset(socketPair.createIOContext());
demuxContext_.reset(socketPair.createIOContext(mtu_));
}
void
......@@ -391,7 +400,7 @@ AudioRtpSession::startSender()
sender_.reset();
socketPair_->stopSendOp(false);
sender_.reset(new AudioSender(callID_, getRemoteRtpUri(), send_,
*socketPair_, initSeqVal_, muteState_));
*socketPair_, initSeqVal_, muteState_, mtu_));
} catch (const MediaEncoderException &e) {
RING_ERR("%s", e.what());
send_.enabled = false;
......@@ -423,7 +432,8 @@ AudioRtpSession::startReceiver()
auto accountAudioCodec = std::static_pointer_cast<AccountAudioCodecInfo>(receive_.codec);
receiveThread_.reset(new AudioReceiveThread(callID_, accountAudioCodec->audioformat,
receive_.receiving_sdp));
receive_.receiving_sdp,
mtu_));
receiveThread_->addIOContext(*socketPair_);
receiveThread_->startLoop();
}
......
......@@ -51,6 +51,8 @@ public:
bool isSending() const noexcept { return send_.enabled; }
bool isReceiving() const noexcept { return receive_.enabled; }
void setMtu(uint16_t mtu) { mtu_ = mtu; }
protected:
std::recursive_mutex mutex_;
std::unique_ptr<SocketPair> socketPair_;
......@@ -59,6 +61,8 @@ protected:
MediaDescription send_;
MediaDescription receive_;
uint16_t mtu_;
std::string getRemoteRtpUri() const {
return "rtp://" + send_.addr.toString(true);
}
......
......@@ -60,6 +60,8 @@ namespace ring {
static constexpr int NET_POLL_TIMEOUT = 100; /* poll() timeout in ms */
static constexpr int RTP_MAX_PACKET_LENGTH = 2048;
static constexpr auto UDP_HEADER_SIZE = 8;
static constexpr auto SRTP_OVERHEAD = 10;
enum class DataType : unsigned { RTP=1<<0, RTCP=1<<1 };
......@@ -190,10 +192,6 @@ udp_socket_create(sockaddr_storage* addr, socklen_t* addr_len, int local_port)
return udp_fd;
}
// Maximal size allowed for a RTP packet, this value of 1232 bytes is an IPv6 minimum (1280 - 40 IPv6 header - 8 UDP header).
static const size_t RTP_BUFFER_SIZE = 1232;
static const size_t SRTP_BUFFER_SIZE = RTP_BUFFER_SIZE - 10;
SocketPair::SocketPair(const char *uri, int localPort)
: rtp_sock_()
, rtcp_sock_()
......@@ -334,9 +332,11 @@ SocketPair::openSockets(const char* uri, int local_rtp_port)
}
MediaIOHandle*
SocketPair::createIOContext()
SocketPair::createIOContext(const uint16_t mtu)
{
return new MediaIOHandle(srtpContext_ ? SRTP_BUFFER_SIZE : RTP_BUFFER_SIZE, true,
auto ip_header_size = rtp_sock_->getTransportOverhead();
return new MediaIOHandle( mtu - (srtpContext_ ? SRTP_OVERHEAD : 0) - UDP_HEADER_SIZE - ip_header_size,
true,
[](void* sp, uint8_t* buf, int len){ return static_cast<SocketPair*>(sp)->readCallback(buf, len); },
[](void* sp, uint8_t* buf, int len){ return static_cast<SocketPair*>(sp)->writeCallback(buf, len); },
0, reinterpret_cast<void*>(this));
......
......@@ -80,7 +80,7 @@ class SocketPair {
void interrupt();
MediaIOHandle* createIOContext();
MediaIOHandle* createIOContext(const uint16_t mtu);
void openSockets(const char* uri, int localPort);
void closeSockets();
......
......@@ -38,13 +38,15 @@ using std::string;
VideoReceiveThread::VideoReceiveThread(const std::string& id,
const std::string &sdp,
const bool isReset) :
const bool isReset,
uint16_t mtu) :
VideoGenerator::VideoGenerator()
, args_()
, dstWidth_(0)
, dstHeight_(0)
, id_(id)
, stream_(sdp)
, mtu_(mtu)
, sdpContext_(stream_.str().size(), false, &readFunction, 0, 0, this)
, sink_ {Manager::instance().createSinkClient(id)}
, restartDecoder_(false)
......@@ -162,7 +164,7 @@ int VideoReceiveThread::readFunction(void *opaque, uint8_t *buf, int buf_size)
void VideoReceiveThread::addIOContext(SocketPair &socketPair)
{
demuxContext_.reset(socketPair.createIOContext());
demuxContext_.reset(socketPair.createIOContext(mtu_));
}
bool VideoReceiveThread::decodeFrame()
......
......@@ -47,7 +47,7 @@ class SinkClient;
class VideoReceiveThread : public VideoGenerator {
public:
VideoReceiveThread(const std::string &id, const std::string &sdp, const bool isReset);
VideoReceiveThread(const std::string &id, const std::string &sdp, const bool isReset, uint16_t mtu);
~VideoReceiveThread();
void startLoop();
......@@ -80,6 +80,8 @@ private:
std::shared_ptr<SinkClient> sink_;
std::atomic_bool restartDecoder_;
bool isReset_;
uint16_t mtu_;
void (*requestKeyFrameCallback_)(const std::string &);
void openDecoder();
bool decodeFrame();
......
......@@ -102,7 +102,7 @@ void VideoRtpSession::startSender()
sender_.reset();
socketPair_->stopSendOp(false);
sender_.reset(new VideoSender(getRemoteRtpUri(), localVideoParams_,
send_, *socketPair_, initSeqVal_));
send_, *socketPair_, initSeqVal_, mtu_));
} catch (const MediaEncoderException &e) {
RING_ERR("%s", e.what());
send_.enabled = false;
......@@ -138,8 +138,9 @@ void VideoRtpSession::startReceiver()
isReset = true;
}
receiveThread_.reset(
new VideoReceiveThread(callID_, receive_.receiving_sdp, isReset)
new VideoReceiveThread(callID_, receive_.receiving_sdp, isReset, mtu_)
);
/* ebail: keyframe requests can lead to timeout if they are not answered.
* we decided so to disable them for the moment
receiveThread_->setRequestKeyFrameCallback(&SIPVoIPLink::enqueueKeyframeRequest);
......
......@@ -37,8 +37,9 @@ using std::string;
VideoSender::VideoSender(const std::string& dest, const DeviceParams& dev,
const MediaDescription& args, SocketPair& socketPair,
const uint16_t seqVal)
: muxContext_(socketPair.createIOContext())
const uint16_t seqVal,
uint16_t mtu)
: muxContext_(socketPair.createIOContext(mtu))
, videoEncoder_(new MediaEncoder)
{
videoEncoder_->setDeviceOptions(dev);
......
......@@ -46,7 +46,8 @@ public:
const DeviceParams& dev,
const MediaDescription& args,
SocketPair& socketPair,
const uint16_t seqVal);
const uint16_t seqVal,
uint16_t mtu);
~VideoSender();
......
......@@ -694,4 +694,10 @@ SipsIceTransport::send(pjsip_tx_data* tdata, const pj_sockaddr_t* rem_addr,
return PJ_EPENDING;
}
uint16_t
SipsIceTransport::getTlsSessionMtu()
{
return tls_->getMtu();
}
}} // namespace ring::tls
......@@ -74,6 +74,9 @@ struct SipsIceTransport
IpAddr getLocalAddress() const { return local_; }
IpAddr getRemoteAddress() const { return remote_; }
// uses the tls_ uniquepointer internal gnutls_session_t, to call its method to get its MTU
uint16_t getTlsSessionMtu();
private:
NON_COPYABLE(SipsIceTransport);
......
This diff is collapsed.
......@@ -39,6 +39,8 @@
#include <vector>
#include <map>
#include <atomic>
#include <iterator>
#include <array>
namespace ring {
class IceTransport;
......@@ -52,10 +54,13 @@ struct PrivateKey;
namespace ring { namespace tls {
static constexpr uint8_t MTUS_TO_TEST = 4; //number of mtus to test in path mtu discovery.
enum class TlsSessionState {
SETUP,
COOKIE, // server only
HANDSHAKE,
MTU_DISCOVERY,
ESTABLISHED,
SHUTDOWN
};
......@@ -171,6 +176,8 @@ public:
ssize_t send(const void* data, std::size_t size);
ssize_t send(const std::vector<uint8_t>& data);
uint16_t getMtu();
private:
using clock = std::chrono::steady_clock;
using StateHandler = std::function<TlsSessionState(TlsSessionState state)>;
......@@ -186,6 +193,7 @@ private:
TlsSessionState handleStateSetup(TlsSessionState state);
TlsSessionState handleStateCookie(TlsSessionState state);
TlsSessionState handleStateHandshake(TlsSessionState state);
TlsSessionState handleStateMtuDiscovery(TlsSessionState state);
TlsSessionState handleStateEstablished(TlsSessionState state);
TlsSessionState handleStateShutdown(TlsSessionState state);
std::map<TlsSessionState, StateHandler> fsmHandlers_ {};
......@@ -235,6 +243,13 @@ private:
bool setup();
void process();
void cleanup();
// Path mtu discovery
std::array<uint16_t, MTUS_TO_TEST>::const_iterator mtuProbe_;
unsigned hbPingRecved_ {0};
bool pmtudOver_ {false};
uint8_t transportOverhead_;
void pathMtuHeartbeat();
};
}} // namespace ring::tls
......@@ -856,11 +856,15 @@ SIPCall::startAllMedia()
continue;
}
auto new_mtu = transport_->getTlsMtu();
avformatrtp_->setMtu(new_mtu);
#ifdef RING_VIDEO
if (local.type == MEDIA_VIDEO)
videortp_->switchInput(videoInput_);
#endif
videortp_->setMtu(new_mtu);
#endif
rtp->updateMedia(remote, local);
// Not restarting media loop on hold as it's a huge waste of CPU ressources
......
......@@ -173,6 +173,12 @@ SipTransport::removeStateListener(uintptr_t lid)
return false;
}
uint16_t
SipTransport::getTlsMtu(){
auto tls_tr = reinterpret_cast<tls::SipsIceTransport::TransportData*>(transport_.get())->self;
return tls_tr->getTlsSessionMtu();
}
SipTransportBroker::SipTransportBroker(pjsip_endpoint *endpt,
pj_caching_pool& cp, pj_pool_t& pool) :
cp_(cp), pool_(pool), endpt_(endpt)
......
......@@ -138,6 +138,8 @@ class SipTransport
/** Only makes sense for connection-oriented transports */
bool isConnected() const noexcept { return connected_; }
uint16_t getTlsMtu();
private:
NON_COPYABLE(SipTransport);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment