Commit 227c8d1c authored by Adrien Béraud's avatar Adrien Béraud

Revert "sip: negotiate both UDP and TCP for the control channel"

This reverts commit e83a1006.

Reason for revert: some major issues remain

Change-Id: I6c59880ef9aacb1a0646c4879186b3f269b8541e
parent 8b3399e8
......@@ -35,6 +35,7 @@ bash -c "%PATCH_CMD% %UNIXPATH%pjproject/fix_ioqueue_ipv6_sendto.patch"
bash -c "%PATCH_CMD% %UNIXPATH%pjproject/add_dtls_transport.patch"
bash -c "%PATCH_CMD% %UNIXPATH%pjproject/rfc6544.patch"
bash -c "%PATCH_CMD% %UNIXPATH%pjproject/ice_config.patch"
bash -c "%PATCH_CMD% %UNIXPATH%pjproject/win32_ice_tcp_temp_fix.patch"
%APPLY_CMD% %SRC%\pjproject\win32_vs_gnutls.patch
%APPLY_CMD% %SRC%\pjproject\win_config.patch
......
This diff is collapsed.
From 5f288fe0067f995b91ea87ba4ed19fd65b75ff31 Mon Sep 17 00:00:00 2001
From: Andreas Traczyk <andreas.traczyk@savoirfairelinux.com>
Date: Tue, 11 Jun 2019 16:47:06 -0400
Subject: [PATCH] fix for windows GetAdaptersAddresses
---
pjnath/src/pjnath/ice_strans.c | 8 ++------
1 file changed, 2 insertions(+), 6 deletions(-)
diff --git a/pjnath/src/pjnath/ice_strans.c b/pjnath/src/pjnath/ice_strans.c
index 6172172..33ac521 100644
--- a/pjnath/src/pjnath/ice_strans.c
+++ b/pjnath/src/pjnath/ice_strans.c
@@ -1645,9 +1645,7 @@ pj_ice_strans_sendto2(pj_ice_strans *ice_st, unsigned comp_id, const void *data,
dest_addr_len = dst_addr_len;
}
- pj_stun_sock_info stun_sock_info;
- pj_stun_sock_get_info(comp->stun[tp_idx].sock, &stun_sock_info);
- pj_bool_t add_header = stun_sock_info.conn_type != PJ_STUN_TP_UDP;
+ pj_bool_t add_header = comp->ice_st->cfg.stun_tp->conn_type == PJ_STUN_TP_TCP;
if (add_header) {
//TCP
/*
@@ -1864,9 +1862,7 @@ static pj_status_t ice_tx_pkt(pj_ice_sess *ice,
if (comp->stun[tp_idx].sock) {
pj_ssize_t sent_size;
- pj_stun_sock_info stun_sock_info;
- pj_stun_sock_get_info(comp->stun[tp_idx].sock, &stun_sock_info);
- pj_bool_t add_header = stun_sock_info.conn_type != PJ_STUN_TP_UDP;
+ pj_bool_t add_header = comp->ice_st->cfg.stun_tp->conn_type == PJ_STUN_TP_TCP;
if (add_header) {
//TCP
/*
--
2.7.4
......@@ -67,13 +67,12 @@ public:
static constexpr uint16_t IPV4_HEADER_SIZE = 20; // Size in bytes of IPv4 packet header
static constexpr uint16_t UDP_HEADER_SIZE = 8; // Size in bytes of UDP header
IceSocketTransport(std::shared_ptr<IceTransport>& ice, int comp_id, bool reliable = false)
IceSocketTransport(std::shared_ptr<IceTransport>& ice, int comp_id)
: compId_ {comp_id}
, ice_ {ice}
, reliable_ {reliable} {}
, ice_ {ice} {}
bool isReliable() const override {
return reliable_;
return false; // we consider that a ICE transport is never reliable (UDP support only)
}
bool isInitiator() const override;
......@@ -95,7 +94,6 @@ public:
private:
const int compId_;
std::shared_ptr<IceTransport> ice_;
bool reliable_;
};
};
......@@ -26,6 +26,7 @@
#include "upnp/upnp_control.h"
#include <pjlib.h>
#include <msgpack.hpp>
#include <map>
#include <atomic>
......@@ -91,6 +92,8 @@ public:
MutexGuard lk{mutex_};
stream_.clear();
stream_ << data;
notified_ = true;
cv_.notify_one();
}
......@@ -108,14 +111,14 @@ public:
MutexGuard lk_api{apiMutex_, std::adopt_lock};
MutexLock lk{mutex_, std::adopt_lock};
auto a = cv_.wait_for(lk, timeout,
[this] { return stop_ or !stream_.eof(); });
[this] { return stop_ or /*(data.size() != 0)*/ !stream_.eof(); });
return a;
}
std::size_t read(char *output, std::size_t size) {
MutexLock lk{mutex_, std::adopt_lock};
if (stream_.eof()) return 0;
std::lock(apiMutex_, mutex_);
MutexGuard lk_api{apiMutex_, std::adopt_lock};
MutexLock lk{mutex_, std::adopt_lock};
cv_.wait(lk, [&, this] {
if (stop_)
return true;
......@@ -133,6 +136,9 @@ public:
stop_ = true;
}
cv_.notify_all();
// Make sure that no thread is blocked into read() or wait() methods
MutexGuard lk_api{apiMutex_};
}
private:
......@@ -143,6 +149,9 @@ private:
std::condition_variable cv_{};
std::stringstream stream_{};
bool stop_{false};
bool notified_{false};
std::vector<char> data;
friend void operator<<(std::vector<char> &, PeerChannel &);
};
......@@ -204,8 +213,6 @@ public:
pj_ice_strans_cfg config_;
std::string last_errmsg_;
std::atomic_bool is_stopped_ {false};
struct Packet {
Packet(void *pkt, pj_size_t size)
: data{reinterpret_cast<char *>(pkt), reinterpret_cast<char *>(pkt) + size} { }
......@@ -862,12 +869,11 @@ IceTransport::Impl::onReceiveData(unsigned comp_id, void *pkt, pj_size_t size)
if (on_recv_cb_) {
on_recv_cb_();
}
if (io.cb) {
io.cb((uint8_t*)pkt, size);
} else {
MutexLock lk{apiMutex_};
auto &channel = peerChannels_.at(comp_id-1);
auto &channel = peerChannels_.at(comp_id);
lk.unlock();
channel << std::string(reinterpret_cast<const char *>(pkt), size);
}
......@@ -901,13 +907,6 @@ IceTransport::isRunning() const
return pimpl_->_isRunning();
}
bool
IceTransport::isStopped() const
{
std::lock_guard<std::mutex> lk {pimpl_->iceMutex_};
return pimpl_->is_stopped_;
}
bool
IceTransport::isFailed() const
{
......@@ -951,14 +950,12 @@ IceTransport::start(const Attribute& rem_attrs, const std::vector<IceCandidate>&
{
if (not isInitialized()) {
JAMI_ERR("[ice:%p] not initialized transport", this);
pimpl_->is_stopped_ = true;
return false;
}
// pj_ice_strans_start_ice crashes if remote candidates array is empty
if (rem_candidates.empty()) {
JAMI_ERR("[ice:%p] start failed: no remote candidates", this);
pimpl_->is_stopped_ = true;
return false;
}
......@@ -972,49 +969,62 @@ IceTransport::start(const Attribute& rem_attrs, const std::vector<IceCandidate>&
if (status != PJ_SUCCESS) {
pimpl_->last_errmsg_ = sip_utils::sip_strerror(status);
JAMI_ERR("[ice:%p] start failed: %s", this, pimpl_->last_errmsg_.c_str());
pimpl_->is_stopped_ = true;
return false;
}
return true;
}
bool
IceTransport::start(const SDP& sdp)
IceTransport::start(const std::vector<uint8_t>& rem_data)
{
if (not isInitialized()) {
JAMI_ERR("[ice:%p] not initialized transport", this);
pimpl_->is_stopped_ = true;
return false;
}
std::string rem_ufrag;
std::string rem_pwd;
std::vector<IceCandidate> rem_candidates;
JAMI_DBG("[ice:%p] negotiation starting (%zu remote candidates)", this, sdp.candidates.size());
pj_str_t ufrag, pwd;
auto data = reinterpret_cast<const char*>(rem_data.data());
auto size = rem_data.size();
std::vector<IceCandidate> rem_candidates;
rem_candidates.reserve(sdp.candidates.size());
IceCandidate cand;
for (const auto &line : sdp.candidates) {
if (getCandidateFromSDP(line, cand))
rem_candidates.emplace_back(cand);
try {
std::size_t offset = 0;
auto result = msgpack::unpack(data, size, offset);
auto version = result.get().as<uint8_t>();
JAMI_DBG("[ice:%p] rx msg v%u", this, version);
if (version == 1) {
result = msgpack::unpack(data, size, offset);
std::tie(rem_ufrag, rem_pwd) = result.get().as<std::pair<std::string, std::string>>();
result = msgpack::unpack(data, size, offset);
auto comp_cnt = result.get().as<uint8_t>();
while (comp_cnt-- > 0) {
result = msgpack::unpack(data, size, offset);
IceCandidate cand;
for (const auto& line : result.get().as<std::vector<std::string>>()) {
if (getCandidateFromSDP(line, cand))
rem_candidates.emplace_back(cand);
}
}
} else {
JAMI_ERR("[ice:%p] invalid msg version", this);
return false;
}
} catch (const msgpack::unpack_error& e) {
JAMI_ERR("[ice:%p] remote msg unpack error: %s", this, e.what());
return false;
}
auto status = pj_ice_strans_start_ice(pimpl_->icest_.get(),
pj_strset(&ufrag, (char*)sdp.ufrag.c_str(), sdp.ufrag.size()),
pj_strset(&pwd, (char*)sdp.pwd.c_str(), sdp.pwd.size()),
rem_candidates.size(),
rem_candidates.data());
if (status != PJ_SUCCESS) {
pimpl_->last_errmsg_ = sip_utils::sip_strerror(status);
JAMI_ERR("[ice:%p] start failed: %s", this, pimpl_->last_errmsg_.c_str());
pimpl_->is_stopped_ = true;
if (rem_ufrag.empty() or rem_pwd.empty() or rem_candidates.empty()) {
JAMI_ERR("[ice:%p] invalid remote attributes", this);
return false;
}
return true;
if (pimpl_->onlyIPv4Private_)
JAMI_WARN("[ice:%p] no public IPv4 found, your connection may fail!", this);
return start({rem_ufrag, rem_pwd}, rem_candidates);
}
bool
IceTransport::stop()
{
pimpl_->is_stopped_ = true;
if (isStarted()) {
auto status = pj_ice_strans_stop_ice(pimpl_->icest_.get());
if (status != PJ_SUCCESS) {
......@@ -1129,29 +1139,20 @@ IceTransport::registerPublicIP(unsigned compId, const IpAddr& publicIP)
}
std::vector<uint8_t>
IceTransport::packIceMsg(uint8_t version) const
IceTransport::packIceMsg() const
{
static constexpr uint8_t ICE_MSG_VERSION = 1;
if (not isInitialized())
return {};
std::stringstream ss;
if (version == 1) {
msgpack::pack(ss, version);
msgpack::pack(ss, std::make_pair(pimpl_->local_ufrag_, pimpl_->local_pwd_));
msgpack::pack(ss, static_cast<uint8_t>(pimpl_->component_count_));
for (unsigned i=0; i<pimpl_->component_count_; i++)
msgpack::pack(ss, getLocalCandidates(i));
} else {
SDP sdp;
sdp.ufrag = pimpl_->local_ufrag_;
sdp.pwd = pimpl_->local_pwd_;
for (unsigned i = 0; i < pimpl_->component_count_; i++) {
auto candidates = getLocalCandidates(i);
sdp.candidates.reserve(sdp.candidates.size() + candidates.size());
sdp.candidates.insert(sdp.candidates.end(), candidates.begin(), candidates.end());
}
msgpack::pack(ss, sdp);
}
msgpack::pack(ss, ICE_MSG_VERSION);
msgpack::pack(ss, std::make_pair(pimpl_->local_ufrag_, pimpl_->local_pwd_));
msgpack::pack(ss, static_cast<uint8_t>(pimpl_->component_count_));
for (unsigned i=0; i<pimpl_->component_count_; i++)
msgpack::pack(ss, getLocalCandidates(i));
auto str(ss.str());
return std::vector<uint8_t>(str.begin(), str.end());
}
......@@ -1371,53 +1372,6 @@ IceTransport::waitForData(int comp_id, unsigned int timeout, std::error_code& ec
return channel.wait(std::chrono::milliseconds(timeout));
}
std::vector<SDP>
IceTransport::parseSDPList(const std::vector<uint8_t>& msg)
{
std::vector<SDP> sdp_list;
msgpack::unpacker pac;
pac.reserve_buffer(msg.size());
memcpy(pac.buffer(), msg.data(), msg.size());
pac.buffer_consumed(msg.size());
msgpack::object_handle oh;
while (auto result = pac.next(oh)) {
try {
SDP sdp;
if (oh.get().type == msgpack::type::POSITIVE_INTEGER) {
// Version 1
result = pac.next(oh);
if (!result) break;
std::tie(sdp.ufrag, sdp.pwd) = oh.get().as<std::pair<std::string, std::string>>();
result = pac.next(oh);
if (!result) break;
auto comp_cnt = oh.get().as<uint8_t>();
while (comp_cnt-- > 0) {
result = pac.next(oh);
if (!result) break;
auto candidates = oh.get().as<std::vector<std::string>>();
sdp.candidates.reserve(sdp.candidates.size() + candidates.size());
sdp.candidates.insert(sdp.candidates.end(), candidates.begin(), candidates.end());
}
} else {
oh.get().convert(sdp);
}
sdp_list.emplace_back(sdp);
} catch (const msgpack::unpack_error &e) {
break;
}
}
return sdp_list;
}
bool
IceTransport::isTCPEnabled()
{
return pimpl_->config_.protocol == PJ_ICE_TP_TCP;
}
//==============================================================================
IceTransportFactory::IceTransportFactory()
......@@ -1488,7 +1442,6 @@ IceSocketTransport::maxPayload() const
int
IceSocketTransport::waitForData(unsigned ms_timeout, std::error_code& ec) const
{
if (!ice_->isRunning()) return -1;
return ice_->waitForData(compId_, ms_timeout, ec);
}
......@@ -1507,21 +1460,13 @@ IceSocketTransport::write(const ValueType* buf, std::size_t len, std::error_code
std::size_t
IceSocketTransport::read(ValueType* buf, std::size_t len, std::error_code& ec)
{
if (!ice_->isRunning()) return 0;
try {
auto res = reliable_
? ice_->recvfrom(compId_, reinterpret_cast<char *>(buf), len)
: ice_->recv(compId_, buf, len);
if (res < 0) {
ec.assign(errno, std::generic_category());
return 0;
}
ec.clear();
return res;
} catch (const std::exception &e) {
JAMI_ERR("IceSocketTransport::read exception: %s", e.what());
auto res = ice_->recv(compId_, buf, len);
if (res < 0) {
ec.assign(errno, std::generic_category());
return 0;
}
return 0;
ec.clear();
return res;
}
IpAddr
......
......@@ -29,7 +29,6 @@
#include <functional>
#include <memory>
#include <msgpack.hpp>
#include <vector>
namespace jami {
......@@ -74,14 +73,6 @@ struct IceTransportOptions {
bool aggressive {false}; // If we use the aggressive nomination strategy
};
struct SDP {
std::string ufrag;
std::string pwd;
std::vector<std::string> candidates;
MSGPACK_DEFINE(ufrag, pwd, candidates)
};
class IceTransport {
public:
using Attribute = struct {
......@@ -94,6 +85,7 @@ public:
*/
IceTransport(const char* name, int component_count, bool master,
const IceTransportOptions& options = {});
/**
* Get current state
*/
......@@ -108,7 +100,7 @@ public:
*/
bool start(const Attribute& rem_attrs,
const std::vector<IceCandidate>& rem_candidates);
bool start(const SDP& sdp);
bool start(const std::vector<uint8_t>& attrs_candidates);
/**
* Stop a started or completed transport.
......@@ -133,12 +125,6 @@ public:
*/
bool isRunning() const;
/**
* Return true if a start operations fails or if stop() has been called
* [mutex protected]
*/
bool isStopped() const;
/**
* Returns true if ICE transport is in failure state
* [mutex protected]
......@@ -170,7 +156,7 @@ public:
/**
* Returns serialized ICE attributes and candidates.
*/
std::vector<uint8_t> packIceMsg(uint8_t version = 1) const;
std::vector<uint8_t> packIceMsg() const;
bool getCandidateFromSDP(const std::string& line, IceCandidate& cand);
......@@ -202,15 +188,6 @@ public:
bool setSlaveSession();
bool setInitiatorSession();
/**
* Get SDP messages list
* @param msg The payload to parse
* @return the list of SDP messages
*/
static std::vector<SDP> parseSDPList(const std::vector<uint8_t>& msg);
bool isTCPEnabled();
private:
class Impl;
std::unique_ptr<Impl> pimpl_;
......
......@@ -138,7 +138,6 @@ struct JamiAccount::PendingCall
{
std::chrono::steady_clock::time_point start;
std::shared_ptr<IceTransport> ice_sp;
std::shared_ptr<IceTransport> ice_tcp_sp;
std::weak_ptr<SIPCall> call;
std::future<size_t> listen_key;
dht::InfoHash call_key;
......@@ -412,30 +411,6 @@ JamiAccount::newOutgoingCall(const std::string& toUrl,
return call;
}
void
initICE(const std::vector<uint8_t> &msg, const std::shared_ptr<IceTransport> &ice,
const std::shared_ptr<IceTransport> &ice_tcp, bool &udp_failed, bool &tcp_failed)
{
auto sdp_list = IceTransport::parseSDPList(msg);
for (const auto &sdp : sdp_list) {
if (sdp.candidates.size() > 0) {
if (sdp.candidates[0].find("TCP") != std::string::npos) {
// It is a SDP for the TCP component
tcp_failed = (ice_tcp && !ice_tcp->start(sdp));
} else {
// For UDP
udp_failed = (ice && !ice->start(sdp));
}
}
}
// During the ICE reply we can start the ICE negotiation
if (tcp_failed) {
ice_tcp->stop();
JAMI_WARN("ICE over TCP not started, will only use UDP");
}
}
void
JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std::string& toUri)
{
......@@ -480,21 +455,9 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std::
return;
}
auto ice_config = getIceOptions();
ice_config.tcpEnable = true;
ice_config.aggressive = true; // This will directly select the first candidate.
auto ice_tcp =
#ifdef _WIN32
std::shared_ptr<IceTransport>(nullptr);
#else
createIceTransport(("sip:" + dev_call->getCallId()).c_str(), ICE_COMPONENTS, true, ice_config);
#endif
if (not ice_tcp) {
JAMI_WARN("Can't create ICE over TCP, will only use UDP");
}
call->addSubCall(*dev_call);
manager.addTask([sthis=shared(), weak_dev_call, ice, ice_tcp, dev, toUri, peer_account] {
manager.addTask([sthis=shared(), weak_dev_call, ice, dev, toUri, peer_account] {
auto call = weak_dev_call.lock();
// call aborted?
......@@ -507,27 +470,18 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std::
return false;
}
if (ice_tcp && ice_tcp->isFailed()) {
JAMI_WARN("[call:%s] ice tcp init failed, will only use UDP", call->getCallId().c_str());
}
// Loop until ICE transport is initialized.
// Note: we suppose that ICE init routine has a an internal timeout (bounded in time)
// and we let upper layers decide when the call shall be aborded (our first check upper).
if ((not ice->isInitialized()) || (ice_tcp && !ice_tcp->isInitialized()))
if (not ice->isInitialized())
return true;
sthis->registerDhtAddress(*ice);
if (ice_tcp) sthis->registerDhtAddress(*ice_tcp);
// Next step: sent the ICE data to peer through DHT
const dht::Value::Id callvid = ValueIdDist()(sthis->rand);
const auto callkey = dht::InfoHash::get("callto:" + dev.toString());
auto blob = ice->packIceMsg();
if (ice_tcp) {
auto ice_tcp_msg = ice_tcp->packIceMsg(2);
blob.insert(blob.end(), ice_tcp_msg.begin(), ice_tcp_msg.end());
}
dht::Value val { dht::IceCandidates(callvid, blob) };
dht::Value val { dht::IceCandidates(callvid, ice->packIceMsg()) };
sthis->dht_.putEncrypted(
callkey, dev,
......@@ -544,7 +498,7 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std::
auto listenKey = sthis->dht_.listen<dht::IceCandidates>(
callkey,
[weak_dev_call, ice, ice_tcp, callvid, dev] (dht::IceCandidates&& msg) {
[weak_dev_call, ice, callvid, dev] (dht::IceCandidates&& msg) {
if (msg.id != callvid or msg.from != dev)
return true;
// remove unprintable characters
......@@ -555,10 +509,7 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std::
JAMI_WARN("ICE request replied from DHT peer %s\nData: %s", dev.toString().c_str(), iceData.c_str());
if (auto call = weak_dev_call.lock()) {
call->setState(Call::ConnectionState::PROGRESSING);
auto udp_failed = true, tcp_failed = true;
initICE(msg.ice_data, ice, ice_tcp, udp_failed, tcp_failed);
if (udp_failed && tcp_failed) {
if (!ice->start(msg.ice_data)) {
call->onFailure();
return true;
}
......@@ -570,7 +521,7 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std::
std::lock_guard<std::mutex> lock(sthis->callsMutex_);
sthis->pendingCalls_.emplace_back(PendingCall{
std::chrono::steady_clock::now(),
ice, ice_tcp, weak_dev_call,
ice, weak_dev_call,
std::move(listenKey),
callkey,
dev,
......@@ -1872,47 +1823,25 @@ bool
JamiAccount::handlePendingCall(PendingCall& pc, bool incoming)
{
auto call = pc.call.lock();
// Cleanup pending call if call is over (cancelled by user or any other reason)
if (not call || call->getState() == Call::CallState::OVER)
if (not call)
return true;
if ((std::chrono::steady_clock::now() - pc.start) >= ICE_NEGOTIATION_TIMEOUT) {
JAMI_WARN("[call:%s] Timeout on ICE negotiation", call->getCallId().c_str());
auto ice = pc.ice_sp.get();
if (not ice or ice->isFailed()) {
JAMI_ERR("[call:%s] Null or failed ICE transport", call->getCallId().c_str());
call->onFailure();
return true;
}
auto ice_tcp = pc.ice_tcp_sp.get();
auto ice = pc.ice_sp.get();
bool tcp_finished = ice_tcp == nullptr || ice_tcp->isStopped();
bool udp_finished = ice == nullptr || ice->isStopped();
if (not udp_finished and ice->isFailed()) {
udp_finished = true;
}
if (not tcp_finished and ice_tcp->isFailed()) {
tcp_finished = true;
}
// At least wait for TCP
if (not tcp_finished and not ice_tcp->isRunning()) {
return false;
} else if (tcp_finished and (not ice_tcp or not ice_tcp->isRunning())) {
// If TCP is finished but not running, wait for UDP
if (not udp_finished and ice and not ice->isRunning()) {
return false;
// Return to pending list if not negotiated yet and not in timeout
if (not ice->isRunning()) {
if ((std::chrono::steady_clock::now() - pc.start)