Commit e83a1006 authored by Sébastien Blin, committed by Adrien Béraud

sip: negotiate both UDP and TCP for the control channel

NOTE: SIP over TCP is disabled on Windows for now, pending TLS 1.3 support.
To re-enable it, check the #ifdef _WIN32 in ice_transport.cpp.

Our pjsip version supports RFC 6544. With this patch, when starting a
call, the daemon uses two ICE sessions for the SIP channel: one
negotiates a UDP socket, the other a TCP socket, and both SDPs are
transmitted over the DHT.
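
Concretely, the outgoing-call path packs the UDP session's attributes with
the legacy v1 framing and appends the TCP session's SDP with the new v2
framing before putting a single blob on the DHT. A condensed sketch of the
jamiaccount.cpp hunk below (not a verbatim excerpt):

    // Both ICE sessions contribute to the single DHT payload.
    auto blob = ice->packIceMsg();                 // UDP session, version 1 framing
    if (ice_tcp) {
        auto ice_tcp_msg = ice_tcp->packIceMsg(2); // TCP session, version 2 (SDP struct)
        blob.insert(blob.end(), ice_tcp_msg.begin(), ice_tcp_msg.end());
    }
    dht::Value val { dht::IceCandidates(callvid, blob) };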

If both negotiations succeed, TCP is preferred and is used to transmit
SIP messages and the vCard. This should solve the 30-second timeout on
bad networks.
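
When the reply arrives from the DHT, each received SDP is dispatched to the
matching session by checking whether its first candidate is a TCP one; the
preference for TCP over UDP is then applied in handlePendingCall() (see the
"At least wait for TCP" hunk). Roughly, from the new initICE() helper below:

    // One DHT payload may carry two SDPs: start each one on its own session.
    // udp_failed / tcp_failed are output flags for the caller.
    for (const auto& sdp : IceTransport::parseSDPList(msg)) {
        if (!sdp.candidates.empty()
            && sdp.candidates[0].find("TCP") != std::string::npos)
            tcp_failed = (ice_tcp && !ice_tcp->start(sdp)); // SDP of the TCP component
        else
            udp_failed = (ice && !ice->start(sdp));         // SDP of the UDP component
    }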

Note that the media channel is still using UDP to transmit audio
and video.

MAJOR CHANGE: the SIP channel uses TLS on top of TCP (not DTLS), so the
transport is considered reliable.
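
In code, the generic ICE socket no longer hard-codes unreliability: the
caller tells it whether it wraps the negotiated TCP session, and SIP stacks
TLS (instead of DTLS) on top when isReliable() returns true. Condensed from
the ice_socket.h hunk below:

    IceSocketTransport(std::shared_ptr<IceTransport>& ice, int comp_id, bool reliable = false)
        : compId_ {comp_id}
        , ice_ {ice}
        , reliable_ {reliable} {}

    bool isReliable() const override {
        return reliable_; // previously always false (UDP-only assumption)
    }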

There are also many changes in rfc6544.patch to link it to RFC 6062. The
patch still needs to be cleaned up; see the TODO notes.

This also seems to fix the ICE shutdown at the end of the call (after
the idle timeout).
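
The shutdown behaviour relies on the new is_stopped_ flag: stop() and the
failing start() paths now set it, and handlePendingCall() polls the new
accessor to know when a session is finished:

    bool
    IceTransport::isStopped() const
    {
        std::lock_guard<std::mutex> lk {pimpl_->iceMutex_};
        return pimpl_->is_stopped_;
    }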

Change-Id: I55c5f51377fd8787bc951d6d282eec46f8eaf977
Gitlab: #103
Gitlab: #108
parent 8091683e
@@ -35,7 +35,6 @@ bash -c "%PATCH_CMD% %UNIXPATH%pjproject/fix_ioqueue_ipv6_sendto.patch"
bash -c "%PATCH_CMD% %UNIXPATH%pjproject/add_dtls_transport.patch"
bash -c "%PATCH_CMD% %UNIXPATH%pjproject/rfc6544.patch"
bash -c "%PATCH_CMD% %UNIXPATH%pjproject/ice_config.patch"
bash -c "%PATCH_CMD% %UNIXPATH%pjproject/win32_ice_tcp_temp_fix.patch"
%APPLY_CMD% %SRC%\pjproject\win32_vs_gnutls.patch
%APPLY_CMD% %SRC%\pjproject\win_config.patch
From 5f288fe0067f995b91ea87ba4ed19fd65b75ff31 Mon Sep 17 00:00:00 2001
From: Andreas Traczyk <andreas.traczyk@savoirfairelinux.com>
Date: Tue, 11 Jun 2019 16:47:06 -0400
Subject: [PATCH] fix for windows GetAdaptersAddresses
---
pjnath/src/pjnath/ice_strans.c | 8 ++------
1 file changed, 2 insertions(+), 6 deletions(-)
diff --git a/pjnath/src/pjnath/ice_strans.c b/pjnath/src/pjnath/ice_strans.c
index 6172172..33ac521 100644
--- a/pjnath/src/pjnath/ice_strans.c
+++ b/pjnath/src/pjnath/ice_strans.c
@@ -1645,9 +1645,7 @@ pj_ice_strans_sendto2(pj_ice_strans *ice_st, unsigned comp_id, const void *data,
dest_addr_len = dst_addr_len;
}
- pj_stun_sock_info stun_sock_info;
- pj_stun_sock_get_info(comp->stun[tp_idx].sock, &stun_sock_info);
- pj_bool_t add_header = stun_sock_info.conn_type != PJ_STUN_TP_UDP;
+ pj_bool_t add_header = comp->ice_st->cfg.stun_tp->conn_type == PJ_STUN_TP_TCP;
if (add_header) {
//TCP
/*
@@ -1864,9 +1862,7 @@ static pj_status_t ice_tx_pkt(pj_ice_sess *ice,
if (comp->stun[tp_idx].sock) {
pj_ssize_t sent_size;
- pj_stun_sock_info stun_sock_info;
- pj_stun_sock_get_info(comp->stun[tp_idx].sock, &stun_sock_info);
- pj_bool_t add_header = stun_sock_info.conn_type != PJ_STUN_TP_UDP;
+ pj_bool_t add_header = comp->ice_st->cfg.stun_tp->conn_type == PJ_STUN_TP_TCP;
if (add_header) {
//TCP
/*
--
2.7.4
@@ -67,12 +67,13 @@ public:
static constexpr uint16_t IPV4_HEADER_SIZE = 20; // Size in bytes of IPv4 packet header
static constexpr uint16_t UDP_HEADER_SIZE = 8; // Size in bytes of UDP header
IceSocketTransport(std::shared_ptr<IceTransport>& ice, int comp_id)
IceSocketTransport(std::shared_ptr<IceTransport>& ice, int comp_id, bool reliable = false)
: compId_ {comp_id}
, ice_ {ice} {}
, ice_ {ice}
, reliable_ {reliable} {}
bool isReliable() const override {
return false; // we consider that a ICE transport is never reliable (UDP support only)
return reliable_;
}
bool isInitiator() const override;
@@ -94,6 +95,7 @@ public:
private:
const int compId_;
std::shared_ptr<IceTransport> ice_;
bool reliable_;
};
};
@@ -26,7 +26,6 @@
#include "upnp/upnp_control.h"
#include <pjlib.h>
#include <msgpack.hpp>
#include <map>
#include <atomic>
@@ -92,8 +91,6 @@ public:
MutexGuard lk{mutex_};
stream_.clear();
stream_ << data;
notified_ = true;
cv_.notify_one();
}
@@ -111,14 +108,14 @@ public:
MutexGuard lk_api{apiMutex_, std::adopt_lock};
MutexLock lk{mutex_, std::adopt_lock};
auto a = cv_.wait_for(lk, timeout,
[this] { return stop_ or /*(data.size() != 0)*/ !stream_.eof(); });
[this] { return stop_ or !stream_.eof(); });
return a;
}
std::size_t read(char *output, std::size_t size) {
std::lock(apiMutex_, mutex_);
MutexGuard lk_api{apiMutex_, std::adopt_lock};
MutexLock lk{mutex_, std::adopt_lock};
if (stream_.eof()) return 0;
MutexGuard lk_api{apiMutex_, std::adopt_lock};
cv_.wait(lk, [&, this] {
if (stop_)
return true;
@@ -136,9 +133,6 @@ public:
stop_ = true;
}
cv_.notify_all();
// Make sure that no thread is blocked into read() or wait() methods
MutexGuard lk_api{apiMutex_};
}
private:
@@ -149,9 +143,6 @@ private:
std::condition_variable cv_{};
std::stringstream stream_{};
bool stop_{false};
bool notified_{false};
std::vector<char> data;
friend void operator<<(std::vector<char> &, PeerChannel &);
};
@@ -213,6 +204,8 @@ public:
pj_ice_strans_cfg config_;
std::string last_errmsg_;
std::atomic_bool is_stopped_ {false};
struct Packet {
Packet(void *pkt, pj_size_t size)
: data{reinterpret_cast<char *>(pkt), reinterpret_cast<char *>(pkt) + size} { }
@@ -869,11 +862,12 @@ IceTransport::Impl::onReceiveData(unsigned comp_id, void *pkt, pj_size_t size)
if (on_recv_cb_) {
on_recv_cb_();
}
if (io.cb) {
io.cb((uint8_t*)pkt, size);
} else {
MutexLock lk{apiMutex_};
auto &channel = peerChannels_.at(comp_id);
auto &channel = peerChannels_.at(comp_id-1);
lk.unlock();
channel << std::string(reinterpret_cast<const char *>(pkt), size);
}
@@ -907,6 +901,13 @@ IceTransport::isRunning() const
return pimpl_->_isRunning();
}
bool
IceTransport::isStopped() const
{
std::lock_guard<std::mutex> lk {pimpl_->iceMutex_};
return pimpl_->is_stopped_;
}
bool
IceTransport::isFailed() const
{
@@ -950,12 +951,14 @@ IceTransport::start(const Attribute& rem_attrs, const std::vector<IceCandidate>&
{
if (not isInitialized()) {
JAMI_ERR("[ice:%p] not initialized transport", this);
pimpl_->is_stopped_ = true;
return false;
}
// pj_ice_strans_start_ice crashes if remote candidates array is empty
if (rem_candidates.empty()) {
JAMI_ERR("[ice:%p] start failed: no remote candidates", this);
pimpl_->is_stopped_ = true;
return false;
}
@@ -969,62 +972,49 @@ IceTransport::start(const Attribute& rem_attrs, const std::vector<IceCandidate>&
if (status != PJ_SUCCESS) {
pimpl_->last_errmsg_ = sip_utils::sip_strerror(status);
JAMI_ERR("[ice:%p] start failed: %s", this, pimpl_->last_errmsg_.c_str());
pimpl_->is_stopped_ = true;
return false;
}
return true;
}
bool
IceTransport::start(const std::vector<uint8_t>& rem_data)
IceTransport::start(const SDP& sdp)
{
std::string rem_ufrag;
std::string rem_pwd;
std::vector<IceCandidate> rem_candidates;
auto data = reinterpret_cast<const char*>(rem_data.data());
auto size = rem_data.size();
try {
std::size_t offset = 0;
auto result = msgpack::unpack(data, size, offset);
auto version = result.get().as<uint8_t>();
JAMI_DBG("[ice:%p] rx msg v%u", this, version);
if (version == 1) {
result = msgpack::unpack(data, size, offset);
std::tie(rem_ufrag, rem_pwd) = result.get().as<std::pair<std::string, std::string>>();
result = msgpack::unpack(data, size, offset);
auto comp_cnt = result.get().as<uint8_t>();
while (comp_cnt-- > 0) {
result = msgpack::unpack(data, size, offset);
IceCandidate cand;
for (const auto& line : result.get().as<std::vector<std::string>>()) {
if (getCandidateFromSDP(line, cand))
rem_candidates.emplace_back(cand);
}
}
} else {
JAMI_ERR("[ice:%p] invalid msg version", this);
return false;
}
} catch (const msgpack::unpack_error& e) {
JAMI_ERR("[ice:%p] remote msg unpack error: %s", this, e.what());
if (not isInitialized()) {
JAMI_ERR("[ice:%p] not initialized transport", this);
pimpl_->is_stopped_ = true;
return false;
}
if (rem_ufrag.empty() or rem_pwd.empty() or rem_candidates.empty()) {
JAMI_ERR("[ice:%p] invalid remote attributes", this);
JAMI_DBG("[ice:%p] negotiation starting (%zu remote candidates)", this, sdp.candidates.size());
pj_str_t ufrag, pwd;
std::vector<IceCandidate> rem_candidates;
rem_candidates.reserve(sdp.candidates.size());
IceCandidate cand;
for (const auto &line : sdp.candidates) {
if (getCandidateFromSDP(line, cand))
rem_candidates.emplace_back(cand);
}
auto status = pj_ice_strans_start_ice(pimpl_->icest_.get(),
pj_strset(&ufrag, (char*)sdp.ufrag.c_str(), sdp.ufrag.size()),
pj_strset(&pwd, (char*)sdp.pwd.c_str(), sdp.pwd.size()),
rem_candidates.size(),
rem_candidates.data());
if (status != PJ_SUCCESS) {
pimpl_->last_errmsg_ = sip_utils::sip_strerror(status);
JAMI_ERR("[ice:%p] start failed: %s", this, pimpl_->last_errmsg_.c_str());
pimpl_->is_stopped_ = true;
return false;
}
if (pimpl_->onlyIPv4Private_)
JAMI_WARN("[ice:%p] no public IPv4 found, your connection may fail!", this);
return start({rem_ufrag, rem_pwd}, rem_candidates);
return true;
}
bool
IceTransport::stop()
{
pimpl_->is_stopped_ = true;
if (isStarted()) {
auto status = pj_ice_strans_stop_ice(pimpl_->icest_.get());
if (status != PJ_SUCCESS) {
@@ -1139,20 +1129,29 @@ IceTransport::registerPublicIP(unsigned compId, const IpAddr& publicIP)
}
std::vector<uint8_t>
IceTransport::packIceMsg() const
IceTransport::packIceMsg(uint8_t version) const
{
static constexpr uint8_t ICE_MSG_VERSION = 1;
if (not isInitialized())
return {};
std::stringstream ss;
msgpack::pack(ss, ICE_MSG_VERSION);
msgpack::pack(ss, std::make_pair(pimpl_->local_ufrag_, pimpl_->local_pwd_));
msgpack::pack(ss, static_cast<uint8_t>(pimpl_->component_count_));
for (unsigned i=0; i<pimpl_->component_count_; i++)
msgpack::pack(ss, getLocalCandidates(i));
if (version == 1) {
msgpack::pack(ss, version);
msgpack::pack(ss, std::make_pair(pimpl_->local_ufrag_, pimpl_->local_pwd_));
msgpack::pack(ss, static_cast<uint8_t>(pimpl_->component_count_));
for (unsigned i=0; i<pimpl_->component_count_; i++)
msgpack::pack(ss, getLocalCandidates(i));
} else {
SDP sdp;
sdp.ufrag = pimpl_->local_ufrag_;
sdp.pwd = pimpl_->local_pwd_;
for (unsigned i = 0; i < pimpl_->component_count_; i++) {
auto candidates = getLocalCandidates(i);
sdp.candidates.reserve(sdp.candidates.size() + candidates.size());
sdp.candidates.insert(sdp.candidates.end(), candidates.begin(), candidates.end());
}
msgpack::pack(ss, sdp);
}
auto str(ss.str());
return std::vector<uint8_t>(str.begin(), str.end());
}
@@ -1372,6 +1371,53 @@ IceTransport::waitForData(int comp_id, unsigned int timeout, std::error_code& ec
return channel.wait(std::chrono::milliseconds(timeout));
}
std::vector<SDP>
IceTransport::parseSDPList(const std::vector<uint8_t>& msg)
{
std::vector<SDP> sdp_list;
msgpack::unpacker pac;
pac.reserve_buffer(msg.size());
memcpy(pac.buffer(), msg.data(), msg.size());
pac.buffer_consumed(msg.size());
msgpack::object_handle oh;
while (auto result = pac.next(oh)) {
try {
SDP sdp;
if (oh.get().type == msgpack::type::POSITIVE_INTEGER) {
// Version 1
result = pac.next(oh);
if (!result) break;
std::tie(sdp.ufrag, sdp.pwd) = oh.get().as<std::pair<std::string, std::string>>();
result = pac.next(oh);
if (!result) break;
auto comp_cnt = oh.get().as<uint8_t>();
while (comp_cnt-- > 0) {
result = pac.next(oh);
if (!result) break;
auto candidates = oh.get().as<std::vector<std::string>>();
sdp.candidates.reserve(sdp.candidates.size() + candidates.size());
sdp.candidates.insert(sdp.candidates.end(), candidates.begin(), candidates.end());
}
} else {
oh.get().convert(sdp);
}
sdp_list.emplace_back(sdp);
} catch (const msgpack::unpack_error &e) {
break;
}
}
return sdp_list;
}
bool
IceTransport::isTCPEnabled()
{
return pimpl_->config_.protocol == PJ_ICE_TP_TCP;
}
//==============================================================================
IceTransportFactory::IceTransportFactory()
@@ -1442,6 +1488,7 @@ IceSocketTransport::maxPayload() const
int
IceSocketTransport::waitForData(unsigned ms_timeout, std::error_code& ec) const
{
if (!ice_->isRunning()) return -1;
return ice_->waitForData(compId_, ms_timeout, ec);
}
@@ -1460,13 +1507,21 @@ IceSocketTransport::write(const ValueType* buf, std::size_t len, std::error_code
std::size_t
IceSocketTransport::read(ValueType* buf, std::size_t len, std::error_code& ec)
{
auto res = ice_->recv(compId_, buf, len);
if (res < 0) {
ec.assign(errno, std::generic_category());
return 0;
if (!ice_->isRunning()) return 0;
try {
auto res = reliable_
? ice_->recvfrom(compId_, reinterpret_cast<char *>(buf), len)
: ice_->recv(compId_, buf, len);
if (res < 0) {
ec.assign(errno, std::generic_category());
return 0;
}
ec.clear();
return res;
} catch (const std::exception &e) {
JAMI_ERR("IceSocketTransport::read exception: %s", e.what());
}
ec.clear();
return res;
return 0;
}
IpAddr
@@ -29,6 +29,7 @@
#include <functional>
#include <memory>
#include <msgpack.hpp>
#include <vector>
namespace jami {
@@ -73,6 +74,14 @@ struct IceTransportOptions {
bool aggressive {false}; // If we use the aggressive nomination strategy
};
struct SDP {
std::string ufrag;
std::string pwd;
std::vector<std::string> candidates;
MSGPACK_DEFINE(ufrag, pwd, candidates)
};
class IceTransport {
public:
using Attribute = struct {
@@ -85,7 +94,6 @@ public:
*/
IceTransport(const char* name, int component_count, bool master,
const IceTransportOptions& options = {});
/**
* Get current state
*/
@@ -100,7 +108,7 @@ public:
*/
bool start(const Attribute& rem_attrs,
const std::vector<IceCandidate>& rem_candidates);
bool start(const std::vector<uint8_t>& attrs_candidates);
bool start(const SDP& sdp);
/**
* Stop a started or completed transport.
@@ -125,6 +133,12 @@ public:
*/
bool isRunning() const;
/**
* Return true if a start operation fails or if stop() has been called
* [mutex protected]
*/
bool isStopped() const;
/**
* Returns true if ICE transport is in failure state
* [mutex protected]
@@ -156,7 +170,7 @@ public:
/**
* Returns serialized ICE attributes and candidates.
*/
std::vector<uint8_t> packIceMsg() const;
std::vector<uint8_t> packIceMsg(uint8_t version = 1) const;
bool getCandidateFromSDP(const std::string& line, IceCandidate& cand);
@@ -188,6 +202,15 @@ public:
bool setSlaveSession();
bool setInitiatorSession();
/**
* Get SDP messages list
* @param msg The payload to parse
* @return the list of SDP messages
*/
static std::vector<SDP> parseSDPList(const std::vector<uint8_t>& msg);
bool isTCPEnabled();
private:
class Impl;
std::unique_ptr<Impl> pimpl_;
@@ -138,6 +138,7 @@ struct JamiAccount::PendingCall
{
std::chrono::steady_clock::time_point start;
std::shared_ptr<IceTransport> ice_sp;
std::shared_ptr<IceTransport> ice_tcp_sp;
std::weak_ptr<SIPCall> call;
std::future<size_t> listen_key;
dht::InfoHash call_key;
@@ -411,6 +412,30 @@ JamiAccount::newOutgoingCall(const std::string& toUrl,
return call;
}
void
initICE(const std::vector<uint8_t> &msg, const std::shared_ptr<IceTransport> &ice,
const std::shared_ptr<IceTransport> &ice_tcp, bool &udp_failed, bool &tcp_failed)
{
auto sdp_list = IceTransport::parseSDPList(msg);
for (const auto &sdp : sdp_list) {
if (sdp.candidates.size() > 0) {
if (sdp.candidates[0].find("TCP") != std::string::npos) {
// It is a SDP for the TCP component
tcp_failed = (ice_tcp && !ice_tcp->start(sdp));
} else {
// For UDP
udp_failed = (ice && !ice->start(sdp));
}
}
}
// During the ICE reply we can start the ICE negotiation
if (tcp_failed) {
ice_tcp->stop();
JAMI_WARN("ICE over TCP not started, will only use UDP");
}
}
void
JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std::string& toUri)
{
@@ -455,9 +480,21 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std::
return;
}
auto ice_config = getIceOptions();
ice_config.tcpEnable = true;
ice_config.aggressive = true; // This will directly select the first candidate.
auto ice_tcp =
#ifdef _WIN32
std::shared_ptr<IceTransport>(nullptr);
#else
createIceTransport(("sip:" + dev_call->getCallId()).c_str(), ICE_COMPONENTS, true, ice_config);
#endif
if (not ice_tcp) {
JAMI_WARN("Can't create ICE over TCP, will only use UDP");
}
call->addSubCall(*dev_call);
manager.addTask([sthis=shared(), weak_dev_call, ice, dev, toUri, peer_account] {
manager.addTask([sthis=shared(), weak_dev_call, ice, ice_tcp, dev, toUri, peer_account] {
auto call = weak_dev_call.lock();
// call aborted?
@@ -470,18 +507,27 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std::
return false;
}
if (ice_tcp && ice_tcp->isFailed()) {
JAMI_WARN("[call:%s] ice tcp init failed, will only use UDP", call->getCallId().c_str());
}
// Loop until ICE transport is initialized.
// Note: we suppose that the ICE init routine has an internal timeout (bounded in time)
// and we let upper layers decide when the call shall be aborted (our first check above).
if (not ice->isInitialized())
if ((not ice->isInitialized()) || (ice_tcp && !ice_tcp->isInitialized()))
return true;
sthis->registerDhtAddress(*ice);
if (ice_tcp) sthis->registerDhtAddress(*ice_tcp);
// Next step: send the ICE data to peer through DHT
const dht::Value::Id callvid = ValueIdDist()(sthis->rand);
const auto callkey = dht::InfoHash::get("callto:" + dev.toString());
dht::Value val { dht::IceCandidates(callvid, ice->packIceMsg()) };
auto blob = ice->packIceMsg();
if (ice_tcp) {
auto ice_tcp_msg = ice_tcp->packIceMsg(2);
blob.insert(blob.end(), ice_tcp_msg.begin(), ice_tcp_msg.end());
}
dht::Value val { dht::IceCandidates(callvid, blob) };
sthis->dht_.putEncrypted(
callkey, dev,
@@ -498,7 +544,7 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std::
auto listenKey = sthis->dht_.listen<dht::IceCandidates>(
callkey,
[weak_dev_call, ice, callvid, dev] (dht::IceCandidates&& msg) {
[weak_dev_call, ice, ice_tcp, callvid, dev] (dht::IceCandidates&& msg) {
if (msg.id != callvid or msg.from != dev)
return true;
// remove unprintable characters
@@ -509,7 +555,10 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std::
JAMI_WARN("ICE request replied from DHT peer %s\nData: %s", dev.toString().c_str(), iceData.c_str());
if (auto call = weak_dev_call.lock()) {
call->setState(Call::ConnectionState::PROGRESSING);
if (!ice->start(msg.ice_data)) {
auto udp_failed = true, tcp_failed = true;
initICE(msg.ice_data, ice, ice_tcp, udp_failed, tcp_failed);
if (udp_failed && tcp_failed) {
call->onFailure();
return true;
}
@@ -521,7 +570,7 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std::
std::lock_guard<std::mutex> lock(sthis->callsMutex_);
sthis->pendingCalls_.emplace_back(PendingCall{
std::chrono::steady_clock::now(),
ice, weak_dev_call,
ice, ice_tcp, weak_dev_call,
std::move(listenKey),
callkey,
dev,
@@ -1823,25 +1872,47 @@ bool
JamiAccount::handlePendingCall(PendingCall& pc, bool incoming)
{
auto call = pc.call.lock();
if (not call)
// Cleanup pending call if call is over (cancelled by user or any other reason)
if (not call || call->getState() == Call::CallState::OVER)
return true;
auto ice = pc.ice_sp.get();
if (not ice or ice->isFailed()) {
JAMI_ERR("[call:%s] Null or failed ICE transport", call->getCallId().c_str());
if ((std::chrono::steady_clock::now() - pc.start) >= ICE_NEGOTIATION_TIMEOUT) {
JAMI_WARN("[call:%s] Timeout on ICE negotiation", call->getCallId().c_str());
call->onFailure();
return true;
}
// Return to pending list if not negotiated yet and not in timeout
if (not ice->isRunning()) {
if ((std::chrono::steady_clock::now() - pc.start) >= ICE_NEGOTIATION_TIMEOUT) {
JAMI_WARN("[call:%s] Timeout on ICE negotiation", call->getCallId().c_str());
call->onFailure();
return true;
auto ice_tcp = pc.ice_tcp_sp.get();
auto ice = pc.ice_sp.get();
bool tcp_finished = ice_tcp == nullptr || ice_tcp->isStopped();
bool udp_finished = ice == nullptr || ice->isStopped();
if (not udp_finished and ice->isFailed()) {
udp_finished = true;
}
if (not tcp_finished and ice_tcp->isFailed()) {
tcp_finished = true;
}
// At least wait for TCP
if (not tcp_finished and not ice_tcp->isRunning()) {
return false;
} else if (