Commit 4bade6fa authored by Sébastien Blin

sip: negotiate both UDP and TCP for the control channel

NOTE: SIP over TCP is disabled on Windows for now, pending TLS 1.3
support. To re-enable it, check the #ifdef _WIN32 in
ice_transport.cpp.
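
A purely illustrative sketch of such a guard, assuming the shape of the
actual code in ice_transport.cpp (config_.protocol and PJ_ICE_TP_TCP are
taken from this diff; PJ_ICE_TP_UDP and the exact placement are
assumptions):

    #ifdef _WIN32
        // Assumption: force the control channel back to UDP on Windows
        // until TLS 1.3 is available.
        config_.protocol = PJ_ICE_TP_UDP;
    #endif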

Our pjsip version supports RFC 6544. With this patch, when
starting a call, the daemon uses two ICE sessions for the SIP
channel: one negotiates a UDP socket, the other a TCP socket,
and both SDPs are transmitted on the DHT.
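
On the caller side, both session descriptions end up in a single DHT
value. A minimal sketch, condensed from the startOutgoingCall() hunk
further down (packIceMsg() serializes the legacy v1 UDP message,
packIceMsg(2) the new msgpack'd SDP struct for TCP):

    // Pack the UDP ICE message (format v1) followed by the TCP one
    // (format v2) into one blob, then publish it on the DHT.
    auto blob = ice->packIceMsg();
    if (ice_tcp) {
        auto ice_tcp_msg = ice_tcp->packIceMsg(2);
        blob.insert(blob.end(), ice_tcp_msg.begin(), ice_tcp_msg.end());
    }
    dht::Value val { dht::IceCandidates(callvid, blob) };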

If both negotiations succeed, TCP is preferred and will be used
to transmit SIP messages and the vCard (see the sketch below). This
should solve the 30-second timeout on bad networks.
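
The selection itself is a few lines in handlePendingCall(); condensed
from the hunk near the end of this diff:

    // Prefer the TCP ICE transport for the SIP channel; fall back to
    // UDP when the TCP negotiation did not end up running.
    auto best_transport = pc.ice_tcp_sp;
    if (!tcp_finished) {
        JAMI_DBG("TCP not running, will use SIP over UDP");
        best_transport = pc.ice_sp;
    }
    auto transport = link_->sipTransportBroker->getTlsIceTransport(
        best_transport, ICE_COMP_SIP_TRANSPORT, tlsParams);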

Note that the media channel still uses UDP to transmit audio
and video.

MAJOR CHANGE: the SIP channel uses TLS on top of TCP, not DTLS,
so the transport is considered reliable.
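
Concretely, the transport broker derives the PJSIP transport type from
the ICE mode, as in the sip_transport hunk at the end of this diff:

    // A TCP-enabled ICE transport yields a TLS SIP transport; a
    // UDP-only one keeps using DTLS, as before.
    auto type = ipv6 ? PJSIP_TRANSPORT_DTLS6 : PJSIP_TRANSPORT_DTLS;
    if (ice->isTCPEnabled()) {
        type = ipv6 ? PJSIP_TRANSPORT_TLS6 : PJSIP_TRANSPORT_TLS;
    }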

There are also many changes in rfc6544.patch to link it to RFC 6062.
The patch still needs to be cleaned up; cf. the TODO notes.

This also seems to fix the ICE shutdown at the end of the call
(after the idle timeout).

Change-Id: I01210da3abfcc448071268b4e1e38abdd58f9f05
Gitlab: #103
Gitlab: #108
parent 5fdb9649
......@@ -35,7 +35,6 @@ bash -c "%PATCH_CMD% %UNIXPATH%pjproject/fix_ioqueue_ipv6_sendto.patch"
bash -c "%PATCH_CMD% %UNIXPATH%pjproject/add_dtls_transport.patch"
bash -c "%PATCH_CMD% %UNIXPATH%pjproject/rfc6544.patch"
bash -c "%PATCH_CMD% %UNIXPATH%pjproject/ice_config.patch"
bash -c "%PATCH_CMD% %UNIXPATH%pjproject/win32_ice_tcp_temp_fix.patch"
%APPLY_CMD% %SRC%\pjproject\win32_vs_gnutls.patch
%APPLY_CMD% %SRC%\pjproject\win_config.patch
......
This diff is collapsed.
From 5f288fe0067f995b91ea87ba4ed19fd65b75ff31 Mon Sep 17 00:00:00 2001
From: Andreas Traczyk <andreas.traczyk@savoirfairelinux.com>
Date: Tue, 11 Jun 2019 16:47:06 -0400
Subject: [PATCH] fix for windows GetAdaptersAddresses
---
pjnath/src/pjnath/ice_strans.c | 8 ++------
1 file changed, 2 insertions(+), 6 deletions(-)
diff --git a/pjnath/src/pjnath/ice_strans.c b/pjnath/src/pjnath/ice_strans.c
index 6172172..33ac521 100644
--- a/pjnath/src/pjnath/ice_strans.c
+++ b/pjnath/src/pjnath/ice_strans.c
@@ -1645,9 +1645,7 @@ pj_ice_strans_sendto2(pj_ice_strans *ice_st, unsigned comp_id, const void *data,
dest_addr_len = dst_addr_len;
}
- pj_stun_sock_info stun_sock_info;
- pj_stun_sock_get_info(comp->stun[tp_idx].sock, &stun_sock_info);
- pj_bool_t add_header = stun_sock_info.conn_type != PJ_STUN_TP_UDP;
+ pj_bool_t add_header = comp->ice_st->cfg.stun_tp->conn_type == PJ_STUN_TP_TCP;
if (add_header) {
//TCP
/*
@@ -1864,9 +1862,7 @@ static pj_status_t ice_tx_pkt(pj_ice_sess *ice,
if (comp->stun[tp_idx].sock) {
pj_ssize_t sent_size;
- pj_stun_sock_info stun_sock_info;
- pj_stun_sock_get_info(comp->stun[tp_idx].sock, &stun_sock_info);
- pj_bool_t add_header = stun_sock_info.conn_type != PJ_STUN_TP_UDP;
+ pj_bool_t add_header = comp->ice_st->cfg.stun_tp->conn_type == PJ_STUN_TP_TCP;
if (add_header) {
//TCP
/*
--
2.7.4
......@@ -67,14 +67,17 @@ public:
static constexpr uint16_t IPV4_HEADER_SIZE = 20; // Size in bytes of IPv4 packet header
static constexpr uint16_t UDP_HEADER_SIZE = 8; // Size in bytes of UDP header
IceSocketTransport(std::shared_ptr<IceTransport>& ice, int comp_id)
IceSocketTransport(std::shared_ptr<IceTransport>& ice, int comp_id, bool reliable = false)
: compId_ {comp_id}
, ice_ {ice} {}
, ice_ {ice}
, reliable_ {reliable} {}
bool isReliable() const override {
return false; // we consider that a ICE transport is never reliable (UDP support only)
return reliable_;
}
void shutdown() override;
bool isInitiator() const override;
int maxPayload() const override;
......@@ -94,6 +97,7 @@ public:
private:
const int compId_;
std::shared_ptr<IceTransport> ice_;
bool reliable_;
};
};
......@@ -26,7 +26,6 @@
#include "upnp/upnp_control.h"
#include <pjlib.h>
#include <msgpack.hpp>
#include <map>
#include <atomic>
......@@ -92,8 +91,6 @@ public:
MutexGuard lk{mutex_};
stream_.clear();
stream_ << data;
notified_ = true;
cv_.notify_one();
}
......@@ -107,18 +104,15 @@ public:
}
template <typename Duration> bool wait(Duration timeout) {
std::lock(apiMutex_, mutex_);
MutexGuard lk_api{apiMutex_, std::adopt_lock};
MutexLock lk{mutex_, std::adopt_lock};
MutexLock lk{mutex_};
auto a = cv_.wait_for(lk, timeout,
[this] { return stop_ or /*(data.size() != 0)*/ !stream_.eof(); });
[this] { return stop_ or !stream_.eof(); });
return a;
}
std::size_t read(char *output, std::size_t size) {
std::lock(apiMutex_, mutex_);
MutexGuard lk_api{apiMutex_, std::adopt_lock};
MutexLock lk{mutex_, std::adopt_lock};
MutexLock lk{mutex_};
if (stream_.eof()) return 0;
cv_.wait(lk, [&, this] {
if (stop_)
return true;
......@@ -129,29 +123,17 @@ public:
}
void stop() noexcept {
{
MutexGuard lk{mutex_};
if (stop_)
return;
stop_ = true;
}
stop_ = true;
cv_.notify_all();
// Make sure that no thread is blocked into read() or wait() methods
MutexGuard lk_api{apiMutex_};
}
private:
PeerChannel(const PeerChannel &o) = delete;
PeerChannel &operator=(const PeerChannel &o) = delete;
std::mutex apiMutex_{};
std::mutex mutex_{};
std::condition_variable cv_{};
std::stringstream stream_{};
bool stop_{false};
bool notified_{false};
std::vector<char> data;
friend void operator<<(std::vector<char> &, PeerChannel &);
};
......@@ -213,6 +195,8 @@ public:
pj_ice_strans_cfg config_;
std::string last_errmsg_;
std::atomic_bool is_stopped_ {false};
struct Packet {
Packet(void *pkt, pj_size_t size)
: data{reinterpret_cast<char *>(pkt), reinterpret_cast<char *>(pkt) + size} { }
......@@ -220,7 +204,6 @@ public:
};
std::vector<PeerChannel> peerChannels_;
std::mutex apiMutex_;
struct ComponentIO {
std::mutex mutex;
......@@ -869,13 +852,11 @@ IceTransport::Impl::onReceiveData(unsigned comp_id, void *pkt, pj_size_t size)
if (on_recv_cb_) {
on_recv_cb_();
}
if (io.cb) {
io.cb((uint8_t*)pkt, size);
} else {
MutexLock lk{apiMutex_};
auto &channel = peerChannels_.at(comp_id);
lk.unlock();
channel << std::string(reinterpret_cast<const char *>(pkt), size);
peerChannels_.at(comp_id-1) << std::string(reinterpret_cast<const char *>(pkt), size);
}
}
......@@ -907,6 +888,13 @@ IceTransport::isRunning() const
return pimpl_->_isRunning();
}
bool
IceTransport::isStopped() const
{
std::lock_guard<std::mutex> lk {pimpl_->iceMutex_};
return pimpl_->is_stopped_;
}
bool
IceTransport::isFailed() const
{
......@@ -950,12 +938,14 @@ IceTransport::start(const Attribute& rem_attrs, const std::vector<IceCandidate>&
{
if (not isInitialized()) {
JAMI_ERR("[ice:%p] not initialized transport", this);
pimpl_->is_stopped_ = true;
return false;
}
// pj_ice_strans_start_ice crashes if remote candidates array is empty
if (rem_candidates.empty()) {
JAMI_ERR("[ice:%p] start failed: no remote candidates", this);
pimpl_->is_stopped_ = true;
return false;
}
......@@ -969,62 +959,49 @@ IceTransport::start(const Attribute& rem_attrs, const std::vector<IceCandidate>&
if (status != PJ_SUCCESS) {
pimpl_->last_errmsg_ = sip_utils::sip_strerror(status);
JAMI_ERR("[ice:%p] start failed: %s", this, pimpl_->last_errmsg_.c_str());
pimpl_->is_stopped_ = true;
return false;
}
return true;
}
bool
IceTransport::start(const std::vector<uint8_t>& rem_data)
IceTransport::start(const SDP& sdp)
{
std::string rem_ufrag;
std::string rem_pwd;
std::vector<IceCandidate> rem_candidates;
auto data = reinterpret_cast<const char*>(rem_data.data());
auto size = rem_data.size();
try {
std::size_t offset = 0;
auto result = msgpack::unpack(data, size, offset);
auto version = result.get().as<uint8_t>();
JAMI_DBG("[ice:%p] rx msg v%u", this, version);
if (version == 1) {
result = msgpack::unpack(data, size, offset);
std::tie(rem_ufrag, rem_pwd) = result.get().as<std::pair<std::string, std::string>>();
result = msgpack::unpack(data, size, offset);
auto comp_cnt = result.get().as<uint8_t>();
while (comp_cnt-- > 0) {
result = msgpack::unpack(data, size, offset);
IceCandidate cand;
for (const auto& line : result.get().as<std::vector<std::string>>()) {
if (getCandidateFromSDP(line, cand))
rem_candidates.emplace_back(cand);
}
}
} else {
JAMI_ERR("[ice:%p] invalid msg version", this);
return false;
}
} catch (const msgpack::unpack_error& e) {
JAMI_ERR("[ice:%p] remote msg unpack error: %s", this, e.what());
if (not isInitialized()) {
JAMI_ERR("[ice:%p] not initialized transport", this);
pimpl_->is_stopped_ = true;
return false;
}
if (rem_ufrag.empty() or rem_pwd.empty() or rem_candidates.empty()) {
JAMI_ERR("[ice:%p] invalid remote attributes", this);
JAMI_DBG("[ice:%p] negotiation starting (%zu remote candidates)", this, sdp.candidates.size());
pj_str_t ufrag, pwd;
std::vector<IceCandidate> rem_candidates;
rem_candidates.reserve(sdp.candidates.size());
IceCandidate cand;
for (const auto &line : sdp.candidates) {
if (getCandidateFromSDP(line, cand))
rem_candidates.emplace_back(cand);
}
auto status = pj_ice_strans_start_ice(pimpl_->icest_.get(),
pj_strset(&ufrag, (char*)sdp.ufrag.c_str(), sdp.ufrag.size()),
pj_strset(&pwd, (char*)sdp.pwd.c_str(), sdp.pwd.size()),
rem_candidates.size(),
rem_candidates.data());
if (status != PJ_SUCCESS) {
pimpl_->last_errmsg_ = sip_utils::sip_strerror(status);
JAMI_ERR("[ice:%p] start failed: %s", this, pimpl_->last_errmsg_.c_str());
pimpl_->is_stopped_ = true;
return false;
}
if (pimpl_->onlyIPv4Private_)
JAMI_WARN("[ice:%p] no public IPv4 found, your connection may fail!", this);
return start({rem_ufrag, rem_pwd}, rem_candidates);
return true;
}
bool
IceTransport::stop()
{
pimpl_->is_stopped_ = true;
if (isStarted()) {
auto status = pj_ice_strans_stop_ice(pimpl_->icest_.get());
if (status != PJ_SUCCESS) {
......@@ -1036,6 +1013,14 @@ IceTransport::stop()
return true;
}
void
IceTransport::cancelOperations()
{
for (auto& c: pimpl_->peerChannels_) {
c.stop();
}
}
IpAddr
IceTransport::getLocalAddress(unsigned comp_id) const
{
......@@ -1139,20 +1124,29 @@ IceTransport::registerPublicIP(unsigned compId, const IpAddr& publicIP)
}
std::vector<uint8_t>
IceTransport::packIceMsg() const
IceTransport::packIceMsg(uint8_t version) const
{
static constexpr uint8_t ICE_MSG_VERSION = 1;
if (not isInitialized())
return {};
std::stringstream ss;
msgpack::pack(ss, ICE_MSG_VERSION);
msgpack::pack(ss, std::make_pair(pimpl_->local_ufrag_, pimpl_->local_pwd_));
msgpack::pack(ss, static_cast<uint8_t>(pimpl_->component_count_));
for (unsigned i=0; i<pimpl_->component_count_; i++)
msgpack::pack(ss, getLocalCandidates(i));
if (version == 1) {
msgpack::pack(ss, version);
msgpack::pack(ss, std::make_pair(pimpl_->local_ufrag_, pimpl_->local_pwd_));
msgpack::pack(ss, static_cast<uint8_t>(pimpl_->component_count_));
for (unsigned i=0; i<pimpl_->component_count_; i++)
msgpack::pack(ss, getLocalCandidates(i));
} else {
SDP sdp;
sdp.ufrag = pimpl_->local_ufrag_;
sdp.pwd = pimpl_->local_pwd_;
for (unsigned i = 0; i < pimpl_->component_count_; i++) {
auto candidates = getLocalCandidates(i);
sdp.candidates.reserve(sdp.candidates.size() + candidates.size());
sdp.candidates.insert(sdp.candidates.end(), candidates.begin(), candidates.end());
}
msgpack::pack(ss, sdp);
}
auto str(ss.str());
return std::vector<uint8_t>(str.begin(), str.end());
}
......@@ -1274,10 +1268,7 @@ IceTransport::recv(int comp_id, unsigned char* buf, size_t len)
ssize_t
IceTransport::recvfrom(int comp_id, char *buf, size_t len) {
MutexLock lk{pimpl_->apiMutex_};
auto &channel = pimpl_->peerChannels_.at(comp_id);
lk.unlock();
return channel.read(buf, len);
return pimpl_->peerChannels_.at(comp_id).read(buf, len);
}
void
......@@ -1357,19 +1348,60 @@ IceTransport::waitForNegotiation(unsigned timeout)
ssize_t
IceTransport::isDataAvailable(int comp_id)
{
MutexLock lk{pimpl_->apiMutex_};
auto &channel = pimpl_->peerChannels_.at(comp_id);
lk.unlock();
return channel.isDataAvailable();
return pimpl_->peerChannels_.at(comp_id).isDataAvailable();
}
ssize_t
IceTransport::waitForData(int comp_id, unsigned int timeout, std::error_code& ec)
{
MutexLock lk{pimpl_->apiMutex_};
auto &channel = pimpl_->peerChannels_.at(comp_id);
lk.unlock();
return channel.wait(std::chrono::milliseconds(timeout));
return pimpl_->peerChannels_.at(comp_id).wait(std::chrono::milliseconds(timeout));
}
std::vector<SDP>
IceTransport::parseSDPList(const std::vector<uint8_t>& msg)
{
std::vector<SDP> sdp_list;
msgpack::unpacker pac;
pac.reserve_buffer(msg.size());
memcpy(pac.buffer(), msg.data(), msg.size());
pac.buffer_consumed(msg.size());
msgpack::object_handle oh;
while (auto result = pac.next(oh)) {
try {
SDP sdp;
if (oh.get().type == msgpack::type::POSITIVE_INTEGER) {
// Version 1
result = pac.next(oh);
if (!result) break;
std::tie(sdp.ufrag, sdp.pwd) = oh.get().as<std::pair<std::string, std::string>>();
result = pac.next(oh);
if (!result) break;
auto comp_cnt = oh.get().as<uint8_t>();
while (comp_cnt-- > 0) {
result = pac.next(oh);
if (!result) break;
auto candidates = oh.get().as<std::vector<std::string>>();
sdp.candidates.reserve(sdp.candidates.size() + candidates.size());
sdp.candidates.insert(sdp.candidates.end(), candidates.begin(), candidates.end());
}
} else {
oh.get().convert(sdp);
}
sdp_list.emplace_back(sdp);
} catch (const msgpack::unpack_error &e) {
break;
}
}
return sdp_list;
}
bool
IceTransport::isTCPEnabled()
{
return pimpl_->config_.protocol == PJ_ICE_TP_TCP;
}
//==============================================================================
......@@ -1431,6 +1463,12 @@ IceSocketTransport::isInitiator() const
return ice_->isInitiator();
}
void
IceSocketTransport::shutdown()
{
ice_->cancelOperations();
}
int
IceSocketTransport::maxPayload() const
{
......@@ -1442,6 +1480,7 @@ IceSocketTransport::maxPayload() const
int
IceSocketTransport::waitForData(unsigned ms_timeout, std::error_code& ec) const
{
if (!ice_->isRunning()) return -1;
return ice_->waitForData(compId_, ms_timeout, ec);
}
......@@ -1460,13 +1499,21 @@ IceSocketTransport::write(const ValueType* buf, std::size_t len, std::error_code
std::size_t
IceSocketTransport::read(ValueType* buf, std::size_t len, std::error_code& ec)
{
auto res = ice_->recv(compId_, buf, len);
if (res < 0) {
ec.assign(errno, std::generic_category());
return 0;
if (!ice_->isRunning()) return 0;
try {
auto res = reliable_
? ice_->recvfrom(compId_, reinterpret_cast<char *>(buf), len)
: ice_->recv(compId_, buf, len);
if (res < 0) {
ec.assign(errno, std::generic_category());
return 0;
}
ec.clear();
return res;
} catch (const std::exception &e) {
JAMI_ERR("IceSocketTransport::read exception: %s", e.what());
}
ec.clear();
return res;
return 0;
}
IpAddr
......
......@@ -29,6 +29,7 @@
#include <functional>
#include <memory>
#include <msgpack.hpp>
#include <vector>
namespace jami {
......@@ -73,6 +74,14 @@ struct IceTransportOptions {
bool aggressive {false}; // If we use the aggressive nomination strategy
};
struct SDP {
std::string ufrag;
std::string pwd;
std::vector<std::string> candidates;
MSGPACK_DEFINE(ufrag, pwd, candidates)
};
class IceTransport {
public:
using Attribute = struct {
......@@ -85,7 +94,6 @@ public:
*/
IceTransport(const char* name, int component_count, bool master,
const IceTransportOptions& options = {});
/**
* Get current state
*/
......@@ -100,13 +108,18 @@ public:
*/
bool start(const Attribute& rem_attrs,
const std::vector<IceCandidate>& rem_candidates);
bool start(const std::vector<uint8_t>& attrs_candidates);
bool start(const SDP& sdp);
/**
* Stop a started or completed transport.
*/
bool stop();
/**
* Cancel pending operations: unblock any thread waiting in read() or wait()
*/
void cancelOperations();
/**
* Returns true if ICE transport has been initialized
* [mutex protected]
......@@ -125,6 +138,12 @@ public:
*/
bool isRunning() const;
/**
* Return true if a start operation fails or if stop() has been called
* [mutex protected]
*/
bool isStopped() const;
/**
* Returns true if ICE transport is in failure state
* [mutex protected]
......@@ -156,7 +175,7 @@ public:
/**
* Returns serialized ICE attributes and candidates.
*/
std::vector<uint8_t> packIceMsg() const;
std::vector<uint8_t> packIceMsg(uint8_t version = 1) const;
bool getCandidateFromSDP(const std::string& line, IceCandidate& cand);
......@@ -188,6 +207,15 @@ public:
bool setSlaveSession();
bool setInitiatorSession();
/**
* Get the list of SDP messages
* @param msg The payload to parse
* @return the list of SDP messages
*/
static std::vector<SDP> parseSDPList(const std::vector<uint8_t>& msg);
bool isTCPEnabled();
private:
class Impl;
std::unique_ptr<Impl> pimpl_;
......
......@@ -131,6 +131,7 @@ ip_utils::getAnyHostAddr(pj_uint16_t family)
IpAddr
ip_utils::getLocalAddr(pj_uint16_t family)
{
sip_utils::register_thread();
if (family == pj_AF_UNSPEC()) {
family = pj_AF_INET6();
}
......
......@@ -138,6 +138,7 @@ struct JamiAccount::PendingCall
{
std::chrono::steady_clock::time_point start;
std::shared_ptr<IceTransport> ice_sp;
std::shared_ptr<IceTransport> ice_tcp_sp;
std::weak_ptr<SIPCall> call;
std::future<size_t> listen_key;
dht::InfoHash call_key;
......@@ -411,6 +412,30 @@ JamiAccount::newOutgoingCall(const std::string& toUrl,
return call;
}
void
initICE(const std::vector<uint8_t> &msg, const std::shared_ptr<IceTransport> &ice,
const std::shared_ptr<IceTransport> &ice_tcp, bool &udp_failed, bool &tcp_failed)
{
auto sdp_list = IceTransport::parseSDPList(msg);
for (const auto &sdp : sdp_list) {
if (sdp.candidates.size() > 0) {
if (sdp.candidates[0].find("TCP") != std::string::npos) {
// It is a SDP for the TCP component
tcp_failed = (ice_tcp && !ice_tcp->start(sdp));
} else {
// For UDP
udp_failed = (ice && !ice->start(sdp));
}
}
}
// During the ICE reply we can start the ICE negotiation
if (tcp_failed && ice_tcp) {
ice_tcp->stop();
JAMI_WARN("ICE over TCP not started, will only use UDP");
}
}
void
JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std::string& toUri)
{
......@@ -455,9 +480,16 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std::
return;
}
auto ice_config = getIceOptions();
ice_config.tcpEnable = true;
ice_config.aggressive = true; // This will directly select the first candidate.
auto ice_tcp = createIceTransport(("sip:" + dev_call->getCallId()).c_str(), ICE_COMPONENTS, true, ice_config);
if (not ice_tcp) {
JAMI_WARN("Can't create ICE over TCP, will only use UDP");
}
call->addSubCall(*dev_call);
manager.addTask([sthis=shared(), weak_dev_call, ice, dev, toUri, peer_account] {
manager.addTask([sthis=shared(), weak_dev_call, ice, ice_tcp, dev, toUri, peer_account] {
auto call = weak_dev_call.lock();
// call aborted?
......@@ -470,18 +502,27 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std::
return false;
}
if (ice_tcp && ice_tcp->isFailed()) {
JAMI_WARN("[call:%s] ice tcp init failed, will only use UDP", call->getCallId().c_str());
}
// Loop until ICE transport is initialized.
// Note: we suppose that the ICE init routine has an internal timeout (bounded in time)
// and we let upper layers decide when the call shall be aborted (our first check above).
if (not ice->isInitialized())
if ((not ice->isInitialized()) || (ice_tcp && !ice_tcp->isInitialized()))
return true;
sthis->registerDhtAddress(*ice);
if (ice_tcp) sthis->registerDhtAddress(*ice_tcp);
// Next step: send the ICE data to the peer through the DHT
const dht::Value::Id callvid = ValueIdDist()(sthis->rand);
const auto callkey = dht::InfoHash::get("callto:" + dev.toString());
dht::Value val { dht::IceCandidates(callvid, ice->packIceMsg()) };
auto blob = ice->packIceMsg();
if (ice_tcp) {
auto ice_tcp_msg = ice_tcp->packIceMsg(2);
blob.insert(blob.end(), ice_tcp_msg.begin(), ice_tcp_msg.end());
}
dht::Value val { dht::IceCandidates(callvid, blob) };
sthis->dht_.putEncrypted(
callkey, dev,
......@@ -498,7 +539,7 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std::
auto listenKey = sthis->dht_.listen<dht::IceCandidates>(
callkey,
[weak_dev_call, ice, callvid, dev] (dht::IceCandidates&& msg) {
[weak_dev_call, ice, ice_tcp, callvid, dev] (dht::IceCandidates&& msg) {
if (msg.id != callvid or msg.from != dev)
return true;
// remove unprintable characters
......@@ -509,7 +550,10 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std::
JAMI_WARN("ICE request replied from DHT peer %s\nData: %s", dev.toString().c_str(), iceData.c_str());
if (auto call = weak_dev_call.lock()) {
call->setState(Call::ConnectionState::PROGRESSING);
if (!ice->start(msg.ice_data)) {
auto udp_failed = true, tcp_failed = true;
initICE(msg.ice_data, ice, ice_tcp, udp_failed, tcp_failed);
if (udp_failed && tcp_failed) {
call->onFailure();
return true;
}
......@@ -521,7 +565,7 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std::
std::lock_guard<std::mutex> lock(sthis->callsMutex_);
sthis->pendingCalls_.emplace_back(PendingCall{
std::chrono::steady_clock::now(),
ice, weak_dev_call,
ice, ice_tcp, weak_dev_call,
std::move(listenKey),
callkey,
dev,
......@@ -1823,25 +1867,47 @@ bool
JamiAccount::handlePendingCall(PendingCall& pc, bool incoming)
{
auto call = pc.call.lock();
if (not call)
// Cleanup pending call if call is over (cancelled by user or any other reason)
if (not call || call->getState() == Call::CallState::OVER)
return true;
auto ice = pc.ice_sp.get();
if (not ice or ice->isFailed()) {
JAMI_ERR("[call:%s] Null or failed ICE transport", call->getCallId().c_str());
if ((std::chrono::steady_clock::now() - pc.start) >= ICE_NEGOTIATION_TIMEOUT) {
JAMI_WARN("[call:%s] Timeout on ICE negotiation", call->getCallId().c_str());
call->onFailure();
return true;
}
// Return to pending list if not negotiated yet and not in timeout
if (not ice->isRunning()) {
if ((std::chrono::steady_clock::now() - pc.start) >= ICE_NEGOTIATION_TIMEOUT) {
JAMI_WARN("[call:%s] Timeout on ICE negotiation", call->getCallId().c_str());
call->onFailure();
return true;
auto ice_tcp = pc.ice_tcp_sp.get();
auto ice = pc.ice_sp.get();
bool tcp_finished = ice_tcp == nullptr || ice_tcp->isStopped();
bool udp_finished = ice == nullptr || ice->isStopped();
if (not udp_finished and ice->isFailed()) {
udp_finished = true;
}
if (not tcp_finished and ice_tcp->isFailed()) {
tcp_finished = true;
}
// At least wait for TCP
if (not tcp_finished and not ice_tcp->isRunning()) {
return false;
} else if (tcp_finished and (not ice_tcp or not ice_tcp->isRunning())) {
// If TCP is finished but not running, wait for UDP
if (not udp_finished and ice and not ice->isRunning()) {
return false;
}
// Cleanup pending call if call is over (cancelled by user or any other reason)
return call->getState() == Call::CallState::OVER;
}
udp_finished = ice && ice->isRunning();
tcp_finished = ice_tcp && ice_tcp->isRunning();
// If both transports are not running, the negotiation failed
if (not udp_finished and not tcp_finished) {
JAMI_ERR("[call:%s] Both ICE negotiations failed", call->getCallId().c_str());
call->onFailure();
return true;
}
// Secure a SIP transport with TLS (on top of the ICE transport) and assign it to the call
......@@ -1893,9 +1959,15 @@ JamiAccount::handlePendingCall(PendingCall& pc, bool incoming)
}
};
auto best_transport = pc.ice_tcp_sp;
if (!tcp_finished) {
JAMI_DBG("TCP not running, will use SIP over UDP");
best_transport = pc.ice_sp;
}
// The following can create a transport that needs to be negotiated (TLS).
// This is an asynchronous task. So we're going to process the SIP after this negotiation.
auto transport = link_->sipTransportBroker->getTlsIceTransport(pc.ice_sp,
auto transport = link_->sipTransportBroker->getTlsIceTransport(best_transport,
ICE_COMP_SIP_TRANSPORT,
tlsParams);
if (!transport)
......@@ -1910,7 +1982,7 @@ JamiAccount::handlePendingCall(PendingCall& pc, bool incoming)
// Be acknowledged on transport connection/disconnection
auto lid = reinterpret_cast<uintptr_t>(this);
auto remote_id = remote_device.toString();
auto remote_addr = ice->getRemoteAddress(ICE_COMP_SIP_TRANSPORT);
auto remote_addr = best_transport->getRemoteAddress(ICE_COMP_SIP_TRANSPORT);
auto& tr_self = *transport;
transport->addStateListener(lid,
[&tr_self, lid, wcall, waccount, remote_id, remote_addr](pjsip_transport_state state,
......@@ -1933,7 +2005,7 @@ JamiAccount::handlePendingCall(PendingCall& pc, bool incoming)
call->setState(Call::ConnectionState::PROGRESSING);
return true;
}
}
bool
JamiAccount::mapPortUPnP()
......@@ -2481,9 +2553,13 @@ JamiAccount::incomingCall(dht::IceCandidates&& msg, const std::shared_ptr<dht::c
{
auto call = Manager::instance().callFactory.newCall<SIPCall, JamiAccount>(*this, Manager::instance().getNewCallID(), Call::CallType::INCOMING);
auto ice = createIceTransport(("sip:"+call->getCallId()).c_str(), ICE_COMPONENTS, false, getIceOptions());
auto ice_config = getIceOptions();
ice_config.tcpEnable = true;
ice_config.aggressive = true; // This will directly select the first candidate.
auto ice_tcp = createIceTransport(("sip:" + call->getCallId()).c_str(), ICE_COMPONENTS, true, ice_config);
std::weak_ptr<SIPCall> wcall = call;
Manager::instance().addTask([account=shared(), wcall, ice, msg, from_cert, from] {
Manager::instance().addTask([account=shared(), wcall, ice, ice_tcp, msg, from_cert, from] {
auto call = wcall.lock();
// call aborted?
......@@ -2499,10 +2575,10 @@ JamiAccount::incomingCall(dht::IceCandidates&& msg, const std::shared_ptr<dht::c
// Loop until ICE transport is initialized.
// Note: we suppose that the ICE init routine has an internal timeout (bounded in time)
// and we let upper layers decide when the call shall be aborted (our first check above).
if (not ice->isInitialized())
if ((not ice->isInitialized()) || (ice_tcp && !ice_tcp->isInitialized()))
return true;
account->replyToIncomingIceMsg(call, ice, msg, from_cert, from);
account->replyToIncomingIceMsg(call, ice, ice_tcp, msg, from_cert, from);
return false;
});
}
......@@ -2575,6 +2651,7 @@ JamiAccount::foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& cr
void
JamiAccount::replyToIncomingIceMsg(const std::shared_ptr<SIPCall>& call,
const std::shared_ptr<IceTransport>& ice,
const std::shared_ptr<IceTransport>& ice_tcp,
const dht::IceCandidates& peer_ice_msg,
const std::shared_ptr<dht::crypto::Certificate>& from_cert,
const dht::InfoHash& from_id)
......@@ -2591,13 +2668,20 @@ JamiAccount::replyToIncomingIceMsg(const std::shared_ptr<SIPCall>& call,
}
});
#endif
registerDhtAddress(*ice);
if (ice_tcp) registerDhtAddress(*ice_tcp);
auto blob = ice->packIceMsg();
if (ice_tcp) {
auto ice_tcp_msg = ice_tcp->packIceMsg(2);
blob.insert(blob.end(), ice_tcp_msg.begin(), ice_tcp_msg.end());
}
// Asynchronous DHT put of our local ICE data
dht_.putEncrypted(
callKey_,
peer_ice_msg.from,
dht::Value {dht::IceCandidates(peer_ice_msg.id, ice->packIceMsg())},
dht::Value {dht::IceCandidates(peer_ice_msg.id, blob)},
[wcall](bool ok) {
if (!ok) {
JAMI_WARN("Can't put ICE descriptor reply on DHT");
......@@ -2609,8 +2693,11 @@ JamiAccount::replyToIncomingIceMsg(const std::shared_ptr<SIPCall>& call,
auto started_time = std::chrono::steady_clock::now();
// During the ICE reply we can start the ICE negotiation
if (!ice->start(peer_ice_msg.ice_data)) {
auto sdp_list = IceTransport::parseSDPList(peer_ice_msg.ice_data);
auto udp_failed = true, tcp_failed = true;
initICE(peer_ice_msg.ice_data, ice, ice_tcp, udp_failed, tcp_failed);
if (udp_failed && tcp_failed) {
call->onFailure(EIO);
return;
}
......@@ -2620,15 +2707,16 @@ JamiAccount::replyToIncomingIceMsg(const std::shared_ptr<SIPCall>& call,
// Let the call handled by the PendingCall handler loop
{
std::lock_guard<std::mutex> lock(callsMutex_);
pendingCalls_.emplace_back(PendingCall {
/*.start = */started_time,
/*.ice_sp = */ice,
/*.call = */wcall,
/*.listen_key = */{},
/*.call_key = */{},
/*.from = */peer_ice_msg.from,
/*.from_account = */from_id,
/*.from_cert = */from_cert });
pendingCalls_.emplace_back(
PendingCall{/*.start = */ started_time,
/*.ice_sp = */ udp_failed ? nullptr : ice,
/*.ice_tcp_sp = */ tcp_failed ? nullptr : ice_tcp,
/*.call = */ wcall,
/*.listen_key = */ {},
/*.call_key = */ {},
/*.from = */ peer_ice_msg.from,
/*.from_account = */ from_id,
/*.from_cert = */ from_cert});
checkPendingCallsTask();
}
}
......
......@@ -615,6 +615,7 @@ class JamiAccount : public SIPAccountBase {
void saveKnownDevices() const;
void replyToIncomingIceMsg(const std::shared_ptr<SIPCall>&,
const std::shared_ptr<IceTransport>&,
const std::shared_ptr<IceTransport>&,
const dht::IceCandidates&,
const std::shared_ptr<dht::crypto::Certificate>& from_cert,
......
......@@ -236,7 +236,7 @@ SipsIceTransport::SipsIceTransport(pjsip_endpoint* endpt,
std::memset(&localCertInfo_, 0, sizeof(pj_ssl_cert_info));
std::memset(&remoteCertInfo_, 0, sizeof(pj_ssl_cert_info));
iceSocket_ = std::make_unique<IceSocketTransport>(ice_, comp_id);
iceSocket_ = std::make_unique<IceSocketTransport>(ice_, comp_id, PJSIP_TRANSPORT_IS_RELIABLE(&trData_.base));
TlsSession::TlsSessionCallbacks cbs = {
/*.onStateChange = */[this](TlsSessionState state){ onTlsStateChange(state); },
......@@ -249,11 +249,22 @@ SipsIceTransport::SipsIceTransport(pjsip_endpoint* endpt,
if (pjsip_transport_register(base.tpmgr, &base) != PJ_SUCCESS)
throw std::runtime_error("Can't register PJSIP transport.");
if (PJSIP_TRANSPORT_IS_RELIABLE(&trData_.base)) {
eventLoop_ = std::thread([this] {
try {
eventLoop();
} catch (const std::exception& e) {
JAMI_ERR() << "SipIceTransport: eventLoop() failure: " << e.what();
}
});
}
}
SipsIceTransport::~SipsIceTransport()
{
JAMI_DBG("~SipIceTransport@%p {tr=%p}", this, &trData_.base);
stopLoop_ = true;
// Flush send queue with ENOTCONN error
for (auto tdata : txQueue_) {
......@@ -266,6 +277,8 @@ SipsIceTransport::~SipsIceTransport()
auto base = getTransportBase();
// Stop low-level transport first
tls_->shutdown();
if (eventLoop_.joinable()) eventLoop_.join();
tls_.reset();
// If deletion was not triggered by pjsip_transport_destroy (happens if the object was not given to pjsip)
......@@ -500,7 +513,10 @@ SipsIceTransport::getInfo(pj_ssl_sock_info* info, bool established)
std::memset(info, 0, sizeof(*info));
info->established = established;
info->proto = PJ_SSL_SOCK_PROTO_DTLS1;
if (PJSIP_TRANSPORT_IS_RELIABLE(&trData_.base))
info->proto = PJSIP_SSL_DEFAULT_PROTO;
else
info->proto = PJ_SSL_SOCK_PROTO_DTLS1;
pj_sockaddr_cp(&info->local_addr, local_.pjPtr());
......@@ -708,4 +724,23 @@ SipsIceTransport::getTlsSessionMtu()
return tls_->maxPayload();
}
void
SipsIceTransport::eventLoop()
{
while(!stopLoop_) {
std::error_code err;
if (tls_ && tls_->waitForData(100, err)) {
std::vector<uint8_t> pkt;
pkt.resize(PJSIP_MAX_PKT_LEN);
auto read = tls_->read(pkt.data(), PJSIP_MAX_PKT_LEN, err);
if (read > 0) {
pkt.resize(read);
std::lock_guard<std::mutex> l(rxMtx_);
rxPending_.emplace_back(std::move(pkt));
scheduler_.run([this]{ handleEvents(); });
}
}
}
}
}} // namespace jami::tls
......@@ -138,6 +138,10 @@ private:
void onRxData(std::vector<uint8_t>&&);
void onCertificatesUpdate(const gnutls_datum_t*, const gnutls_datum_t*, unsigned int);
int verifyCertificate(gnutls_session_t);
std::thread eventLoop_;
void eventLoop();
std::atomic_bool stopLoop_ {false};
};
}} // namespace jami::tls
......@@ -334,7 +334,7 @@ IceSocketEndpoint::waitForData(unsigned ms_timeout, std::error_code& ec) const
{
if (ice_) {
if (!ice_->isRunning()) return -1;
return iceIsSender ? ice_->isDataAvailable(1) : ice_->waitForData(1, ms_timeout, ec);
return iceIsSender ? ice_->isDataAvailable(compId_) : ice_->waitForData(compId_, ms_timeout, ec);
}
return -1;
}
......@@ -345,7 +345,7 @@ IceSocketEndpoint::read(ValueType* buf, std::size_t len, std::error_code& ec)
if (ice_) {
if (!ice_->isRunning()) return 0;
try {
auto res = ice_->recvfrom(1, reinterpret_cast<char *>(buf), len);
auto res = ice_->recvfrom(compId_, reinterpret_cast<char *>(buf), len);
if (res < 0)
ec.assign(errno, std::generic_category());
else
......@@ -365,7 +365,7 @@ IceSocketEndpoint::write(const ValueType* buf, std::size_t len, std::error_code&
if (ice_) {
if (!ice_->isRunning()) return 0;
auto res = 0;
res = ice_->send(0, reinterpret_cast<const unsigned char *>(buf), len);
res = ice_->send(compId_, reinterpret_cast<const unsigned char *>(buf), len);
if (res < 0) {
ec.assign(errno, std::generic_category());
} else {
......
......@@ -157,7 +157,7 @@ public:
void setOnRecv(RecvCb&& cb) override {
if (ice_) {
ice_->setOnRecv(0, cb);
ice_->setOnRecv(compId_, cb);
}
}
......@@ -165,6 +165,7 @@ private:
std::shared_ptr<IceTransport> ice_ {nullptr};
std::atomic_bool iceStopped{false};
std::atomic_bool iceIsSender{false};
uint8_t compId_ {0};
};
//==============================================================================
......
......@@ -234,6 +234,7 @@ public:
std::unique_ptr<TlsAnonymousClientCredendials> cacred_; // ctor init.
std::unique_ptr<TlsAnonymousServerCredendials> sacred_; // ctor init.
std::unique_ptr<TlsCertificateCredendials> xcred_; // ctor init.
std::mutex sessionMutex_;
gnutls_session_t session_ {nullptr};
gnutls_datum_t cookie_key_ {nullptr, 0};
gnutls_dtls_prestate_st prestate_ {};
......@@ -724,19 +725,23 @@ TlsSession::TlsSessionImpl::cleanup()
state_ = TlsSessionState::SHUTDOWN; // be sure to block any user operations
stateCondition_.notify_all();
if (session_) {
if (transport_.isReliable())
gnutls_bye(session_, GNUTLS_SHUT_RDWR);
else
gnutls_bye(session_, GNUTLS_SHUT_WR); // not wait for a peer answer
gnutls_deinit(session_);
session_ = nullptr;
// This will stop the current read in ice_transport.cpp
transport_.shutdown();
{
std::lock_guard<std::mutex> lk(sessionMutex_);
if (session_) {
if (transport_.isReliable())
gnutls_bye(session_, GNUTLS_SHUT_RDWR);
else
gnutls_bye(session_, GNUTLS_SHUT_WR); // not wait for a peer answer
gnutls_deinit(session_);
session_ = nullptr;
}
}
if (cookie_key_.data)
gnutls_free(cookie_key_.data);
transport_.shutdown();
}
TlsSessionState
......@@ -1218,7 +1223,7 @@ TlsSession::TlsSession(SocketType& transport, const TlsParams& params,
TlsSession::~TlsSession()
{
shutdown();
if (pimpl_) shutdown();
}
bool
......@@ -1237,8 +1242,8 @@ int
TlsSession::maxPayload() const
{
if (pimpl_->state_ == TlsSessionState::SHUTDOWN)
throw std::runtime_error("Getting MTU from non-valid TLS session");
return gnutls_dtls_get_data_mtu(pimpl_->session_);
throw std::runtime_error("Getting maxPayload from non-valid TLS session");
return pimpl_->transport_.maxPayload();
}
const char*
......@@ -1295,15 +1300,22 @@ TlsSession::read(ValueType* data, std::size_t size, std::error_code& ec)
}
while (true) {
auto ret = gnutls_record_recv(pimpl_->session_, data, size);
ssize_t ret;
{
std::lock_guard<std::mutex> lk(pimpl_->sessionMutex_);
if (!pimpl_->session_) return 0;
ret = gnutls_record_recv(pimpl_->session_, data, size);
}
if (ret > 0) {
ec.clear();
return ret;
}
if (ret == 0) {
JAMI_DBG("[TLS] eof");
shutdown();
if (pimpl_) {
JAMI_ERR("[TLS] eof");
shutdown();
}
error = std::errc::broken_pipe;
break;
} else if (ret == GNUTLS_E_REHANDSHAKE) {
......@@ -1312,8 +1324,10 @@ TlsSession::read(ValueType* data, std::size_t size, std::error_code& ec)
pimpl_->rxCv_.notify_one(); // unblock waiting FSM
pimpl_->stateCondition_.notify_all();
} else if (gnutls_error_is_fatal(ret)) {
JAMI_ERR("[TLS] fatal error in recv: %s", gnutls_strerror(ret));
shutdown();
if (pimpl_ && pimpl_->state_ != TlsSessionState::SHUTDOWN) {
JAMI_ERR("[TLS] fatal error in recv: %s", gnutls_strerror(ret));
shutdown();
}
error = std::errc::io_error;
break;
}
......
......@@ -440,6 +440,9 @@ SipTransportBroker::getTlsIceTransport(const std::shared_ptr<jami::IceTransport>
{
auto ipv6 = ice->getLocalAddress(comp_id).isIpv6();
auto type = ipv6 ? PJSIP_TRANSPORT_DTLS6 : PJSIP_TRANSPORT_DTLS;
if (ice->isTCPEnabled()) {
type = ipv6 ? PJSIP_TRANSPORT_TLS6 : PJSIP_TRANSPORT_TLS;
}
auto sip_ice_tr = std::unique_ptr<tls::SipsIceTransport>(
new tls::SipsIceTransport(endpt_, type, params, ice, comp_id));
auto tr = sip_ice_tr->getTransportBase();
......