Skip to content
Snippets Groups Projects
Commit b9c287ee authored by Guillaume Roguez's avatar Guillaume Roguez
Browse files

WIP: SIP over ReliableStream

wip, do not merge

Change-Id: I113a405d75eef43e600bca2007900a99c31c645a
parent 6f79071c
No related tags found
No related merge requests found
......@@ -34,17 +34,18 @@
#include "call_factory.h"
#include "string_utils.h"
#include "enumclass_utils.h"
#include "data_transfer.h"
#include "errno.h"
namespace ring {
Call::Call(Account& account, const std::string& id, Call::CallType type)
: id_(id)
: creationTime_()
, id_(id)
, type_(type)
, account_(account)
{
time(&timestamp_start_);
account_.attachCall(id_);
}
......@@ -283,7 +284,7 @@ Call::getDetails() const
{DRing::Call::Details::DISPLAY_NAME, peerDisplayName_},
{DRing::Call::Details::CALL_STATE, getStateStr()},
{DRing::Call::Details::CONF_ID, confID_},
{DRing::Call::Details::TIMESTAMP_START, ring::to_string(timestamp_start_)},
{DRing::Call::Details::TIMESTAMP_START, ring::to_string(std::chrono::duration_cast<std::chrono::seconds>(creationTime_.time_since_epoch()).count())},
{DRing::Call::Details::ACCOUNTID, getAccountId()},
{DRing::Call::Details::AUDIO_MUTED, std::string(bool_to_str(isAudioMuted_))},
{DRing::Call::Details::VIDEO_MUTED, std::string(bool_to_str(isVideoMuted_))},
......
......@@ -47,6 +47,11 @@ class VoIPLink;
class Account;
class AccountVideoCodecInfo;
namespace ReliableSocket {
class DataConnection;
class DataStream;
}
template <class T> using CallMap = std::map<std::string, std::shared_ptr<T> >;
/*
......@@ -310,6 +315,11 @@ class Call : public Recordable, public std::enable_shared_from_this<Call> {
void removeCall();
void setDataConnection(std::shared_ptr<ReliableSocket::DataConnection> dc) { dc_ = dc; }
std::shared_ptr<ReliableSocket::DataConnection> getDataConnection() const { return dc_; }
virtual void onDataConnected() = 0;
virtual bool initIceTransport(bool master, unsigned channel_num=4);
int waitForIceInitialization(unsigned timeout);
......@@ -331,6 +341,8 @@ class Call : public Recordable, public std::enable_shared_from_this<Call> {
virtual void restartMediaSender() = 0;
std::chrono::steady_clock::time_point getCreationTime() const { return creationTime_; }
protected:
/**
* Constructor of a call
......@@ -339,6 +351,10 @@ class Call : public Recordable, public std::enable_shared_from_this<Call> {
*/
Call(Account& account, const std::string& id, Call::CallType type);
const std::chrono::steady_clock::time_point creationTime_;
std::shared_ptr<ReliableSocket::DataConnection> dc_ {};
std::shared_ptr<ReliableSocket::DataStream> peerStream_;
std::shared_ptr<IceTransport> iceTransport_ {};
bool isAudioMuted_{false};
......@@ -387,8 +403,6 @@ class Call : public Recordable, public std::enable_shared_from_this<Call> {
/** Peer Display Name */
std::string peerDisplayName_ {};
time_t timestamp_start_ {0};
};
} // namespace ring
......
......@@ -87,6 +87,7 @@ static constexpr int ICE_COMPONENTS {1};
static constexpr int ICE_COMP_SIP_TRANSPORT {0};
static constexpr int ICE_INIT_TIMEOUT {10};
static constexpr auto ICE_NEGOTIATION_TIMEOUT = std::chrono::seconds(60);
static constexpr auto CONNECTION_TIMEOUT = std::chrono::seconds(60);
static constexpr auto TLS_TIMEOUT = std::chrono::seconds(30);
// Limit number of ICE data msg request/msg that waiting for handling
......@@ -172,22 +173,8 @@ RingAccount::~RingAccount()
std::shared_ptr<SIPCall>
RingAccount::newIncomingCall(const std::string& from)
{
std::lock_guard<std::mutex> lock(callsMutex_);
auto call_it = pendingSipCalls_.begin();
while (call_it != pendingSipCalls_.end()) {
auto call = call_it->call.lock();
if (not call) {
RING_WARN("newIncomingCall: discarding deleted call");
call_it = pendingSipCalls_.erase(call_it);
} else if (call->getPeerNumber() == from) {
pendingSipCalls_.erase(call_it);
RING_DBG("newIncomingCall: found matching call for %s", from.c_str());
return call;
} else {
++call_it;
}
}
RING_ERR("newIncomingCall: can't find matching call for %s", from.c_str());
if (auto dc = obtainDataConnection(from, false))
return incomingCall(from, dc);
return nullptr;
}
......@@ -195,101 +182,30 @@ template <>
std::shared_ptr<SIPCall>
RingAccount::newOutgoingCall(const std::string& toUrl)
{
const std::string toUri = parseRingUri(toUrl);
RING_DBG("Calling DHT peer %s", toUri.c_str());
const std::string peer_id = parseRingUri(toUrl);
RING_DBG("Calling DHT peer %s", peer_id.c_str());
auto& manager = Manager::instance();
auto call = manager.callFactory.newCall<SIPCall, RingAccount>(*this, manager.getNewCallID(),
Call::CallType::OUTGOING);
call->setIPToIP(true);
call->setSecure(isTlsEnabled());
// TODO: for now, we automatically trust all explicitly called peers
setCertificateStatus(toUri, tls::TrustStore::PermissionStatus::ALLOWED);
auto shared_this = std::static_pointer_cast<RingAccount>(shared_from_this());
std::weak_ptr<SIPCall> weak_call = call;
manager.addTask([shared_this, weak_call, toUri] {
auto call = weak_call.lock();
if (not call) {
call->onFailure();
return false;
}
// Create an ICE transport for SIP channel
std::shared_ptr<IceTransport> ice {};
try {
ice = shared_this->createIceTransport(("sip:" + call->getCallId()).c_str(),
ICE_COMPONENTS, true, shared_this->getIceOptions());
} catch (std::runtime_error& e) {
RING_ERR("%s", e.what());
call->onFailure();
return false;
}
auto iceInitTimeout = std::chrono::steady_clock::now() + std::chrono::seconds {ICE_INIT_TIMEOUT};
/* First step: wait for an initialized ICE transport for SIP channel */
if (ice->isFailed() or std::chrono::steady_clock::now() >= iceInitTimeout) {
RING_DBG("ice init failed (or timeout)");
call->onFailure();
return false;
}
setCertificateStatus(peer_id, tls::TrustStore::PermissionStatus::ALLOWED);
if (not ice->isInitialized())
return true; // process task again!
/* Next step: sent the ICE data to peer through DHT */
const dht::Value::Id callvid = udist(shared_this->rand_);
const dht::Value::Id vid = udist(shared_this->rand_);
const auto toH = dht::InfoHash(toUri);
const auto callkey = dht::InfoHash::get("callto:" + toUri);
dht::Value val { dht::IceCandidates(callvid, ice->getLocalAttributesAndCandidates()) };
val.id = vid;
call->setState(Call::ConnectionState::TRYING);
shared_this->dht_.putEncrypted(
callkey, toH,
std::move(val),
[=](bool ok) { // Put complete callback
if (!ok) {
RING_WARN("Can't put ICE descriptor on DHT");
if (auto call = weak_call.lock())
call->onFailure();
} else
RING_DBG("Successfully put ICE descriptor on DHT");
}
);
auto listenKey = shared_this->dht_.listen<dht::IceCandidates>(
callkey,
[=] (dht::IceCandidates&& msg) {
if (msg.id != callvid or msg.from != toH)
return true;
RING_WARN("ICE request replied from DHT peer %s\n%s", toH.toString().c_str(),
std::string(msg.ice_data.cbegin(), msg.ice_data.cend()).c_str());
if (auto call = weak_call.lock())
call->setState(Call::ConnectionState::PROGRESSING);
if (!ice->start(msg.ice_data)) {
call->onFailure();
return true;
}
return false;
}
);
// Asynchronous launch of a peer reliable connection
auto dc = obtainDataConnection(peer_id);
if (not dc) {
call->removeCall();
return nullptr;
}
shared_this->pendingCalls_.emplace_back(PendingCall{
std::chrono::steady_clock::now(),
ice, weak_call,
std::move(listenKey),
callkey, toH
});
call->setDataConnection(dc);
return false;
});
// Continue to scan the connection progress into the mainloop
std::lock_guard<std::mutex> lock(callsMutex_);
pendingCalls_.emplace_back(call);
return call;
}
......@@ -604,42 +520,25 @@ RingAccount::handleEvents()
void
RingAccount::handlePendingCallList()
{
// Process pending call into a local list to not block threads depending on this list,
// as incoming call handlers.
decltype(pendingCalls_) pending_calls;
{
std::lock_guard<std::mutex> lock(callsMutex_);
pending_calls = std::move(pendingCalls_);
pendingCalls_.clear();
}
static const dht::InfoHash invalid_hash; // Invariant
std::unique_lock<std::mutex> lk(callsMutex_);
auto pc_iter = std::begin(pending_calls);
while (pc_iter != std::end(pending_calls)) {
bool incoming = pc_iter->call_key == invalid_hash; // do it now, handlePendingCall may invalidate pc data
auto it = std::begin(pendingCalls_);
while (it != std::end(pendingCalls_)) {
bool handled;
lk.unlock();
try {
handled = handlePendingCall(*pc_iter, incoming);
handled = handlePendingCall(*it);
} catch (const std::exception& e) {
RING_ERR("[DHT] exception during pending call handling: %s", e.what());
handled = true; // drop from pending list
}
lk.lock();
if (handled) {
// Cancel pending listen (outgoing call)
if (not incoming)
dht_.cancelListen(pc_iter->call_key, pc_iter->listen_key.share());
pc_iter = pending_calls.erase(pc_iter);
} else
++pc_iter;
}
// Re-integrate non-handled and valid pending calls
{
std::lock_guard<std::mutex> lock(callsMutex_);
pendingCalls_.splice(std::end(pendingCalls_), pending_calls);
if (handled)
it = pendingCalls_.erase(it);
else
++it;
}
}
......@@ -682,23 +581,30 @@ check_peer_certificate(dht::InfoHash from, unsigned status, const gnutls_datum_t
}
bool
RingAccount::handlePendingCall(PendingCall& pc, bool incoming)
RingAccount::handlePendingCall(std::weak_ptr<SIPCall>& weak_call)
{
auto call = pc.call.lock();
auto call = weak_call.lock();
if (not call)
return true;
auto ice = pc.ice_sp.get();
if (not ice or ice->isFailed()) {
RING_ERR("[call:%s] Null or failed ICE transport", call->getCallId().c_str());
auto dc = call->getDataConnection();
if (!dc or dc->isFailed()) {
RING_ERR("[call:%s] connection failed", call->getCallId().c_str());
call->onFailure();
return true;
}
DRing::DataConnectionInfo info;
dc->getInfo(info);
// Return to pending list if not negotiated yet and not in timeout
if (not ice->isRunning()) {
if ((std::chrono::steady_clock::now() - pc.start) >= ICE_NEGOTIATION_TIMEOUT) {
RING_WARN("[call:%s] Timeout on ICE negotiation", call->getCallId().c_str());
if (info.code < 200)
return false;
if (info.code >= 400) {
auto creationTime = call->getCreationTime();
if ((decltype(creationTime)::clock::now() - creationTime) >= CONNECTION_TIMEOUT) {
RING_WARN("[call:%s] Connection timeout", call->getCallId().c_str());
call->onFailure();
return true;
}
......@@ -706,41 +612,14 @@ RingAccount::handlePendingCall(PendingCall& pc, bool incoming)
return call->getState() == Call::CallState::OVER;
}
// Securize a SIP transport with TLS (on top of ICE tranport) and assign the call with it
auto remote_h = pc.from;
auto id(loadIdentity());
tls::TlsParams tlsParams {
.ca_list = "",
.cert = id.second,
.cert_key = id.first,
.dh_params = dhParams_,
.timeout = std::chrono::duration_cast<decltype(tls::TlsParams::timeout)>(TLS_TIMEOUT),
.cert_check = [remote_h](unsigned status, const gnutls_datum_t* cert_list,
unsigned cert_num) -> pj_status_t {
try {
return check_peer_certificate(remote_h, status, cert_list, cert_num);
} catch (const std::exception& e) {
RING_ERR("[peer:%s] TLS certificate check exception: %s",
remote_h.toString().c_str(), e.what());
return PJ_SSL_CERT_EUNKNOWN;
}
}
};
auto tr = link_->sipTransportBroker->getTlsIceTransport(pc.ice_sp, ICE_COMP_SIP_TRANSPORT,
tlsParams);
call->setTransport(tr);
// Now info.code is a definitive 2xx code
// Notify of fully available connection between peers
RING_DBG("[call:%s] SIP communication established", call->getCallId().c_str());
call->onDataConnected();
call->setState(Call::ConnectionState::PROGRESSING);
// Incoming call?
if (incoming) {
std::lock_guard<std::mutex> lock(callsMutex_);
pendingSipCalls_.emplace_back(std::move(pc)); // copy of pc
} else
createOutgoingCall(call, remote_h.toString(), ice->getRemoteAddress(ICE_COMP_SIP_TRANSPORT));
// Outgoing call?
if (info.isClient)
createOutgoingCall(call, info.peer, dc->getRemoteAddress());
return true;
}
......@@ -904,6 +783,7 @@ RingAccount::doRegister_()
callKey_ = dht::InfoHash::get("callto:"+dht_.getId().toString());
RING_DBG("[DHT:%s] callto key: %s", getAccountID().c_str(), callKey_.toString().c_str());
#if 0
dht_.listen<dht::IceCandidates>(
callKey_,
[shared] (dht::IceCandidates&& msg) {
......@@ -955,7 +835,8 @@ RingAccount::doRegister_()
runOnMainThread([=]() mutable { shared->incomingCall(std::move(msg)); });
return true;
}
);
);
#endif
auto inboxKey = dht::InfoHash::get("inbox:"+dht_.getId().toString());
RING_DBG("[DHT:%s] inbox key: %s", getAccountID().c_str(), inboxKey.toString().c_str());
......@@ -1018,6 +899,58 @@ RingAccount::doRegister_()
dht_.listen<IceDataCandidates>(
inboxKey,
[shared] (IceDataCandidates&& msg) {
auto& this_ = *shared;
// forbid self connection
if (msg.from == this_.dht_.getId()) {
RING_WARN("Discarding loopback DHT connection request");
return true;
}
// quick check in case we already explicilty banned this public key
auto trustStatus = this_.trust_.getCertificateStatus(msg.from.toString());
if (trustStatus == tls::TrustStore::PermissionStatus::BANNED) {
RING_WARN("Discarding DHT connection request from banned peer %s", msg.from.toString().c_str());
return true;
}
auto res = this_.treatedCalls_.insert(msg.id);
this_.saveTreatedCalls();
if (!res.second)
return true;
if (not this_.dhtPublicInCalls_ and trustStatus != tls::TrustStore::PermissionStatus::ALLOWED) {
this_.findCertificate(
msg.from,
[shared, msg](const std::shared_ptr<dht::crypto::Certificate> cert) mutable {
if (!cert or cert->getId() != msg.from) {
RING_WARN("Can't find certificate of %s for incoming call.",
msg.from.toString().c_str());
return;
}
tls::CertificateStore::instance().pinCertificate(cert);
auto& this_ = *shared;
if (!this_.trust_.isAllowed(*cert)) {
RING_WARN("Discarding incoming DHT call from untrusted peer %s.",
msg.from.toString().c_str());
return;
}
runOnMainThread([shared, msg]() mutable {
if (msg.id & 1)
shared->onDataTransactionReply(msg);
else
shared->onDataTransactionRequest(std::move(msg));
});
}
);
return true;
}
else if (this_.dhtPublicInCalls_ and trustStatus != tls::TrustStore::PermissionStatus::BANNED) {
this_.findCertificate(msg.from.toString().c_str());
}
if (msg.id & 1)
shared->onDataTransactionReply(msg);
else
......@@ -1030,11 +963,12 @@ RingAccount::doRegister_()
}
}
#if 0
void
RingAccount::incomingCall(dht::IceCandidates&& msg)
RingAccount::legacyIncomingCall(dht::IceCandidates&& msg)
{
auto from = msg.from.toString();
RING_WARN("ICE incoming from DHT peer %s\n%s", from.c_str(),
RING_WARN("Legacy incoming call from peer %s, ICE msg:\n%s", from.c_str(),
std::string(msg.ice_data.cbegin(), msg.ice_data.cend()).c_str());
auto call = Manager::instance().callFactory.newCall<SIPCall, RingAccount>(*this, Manager::instance().getNewCallID(), Call::CallType::INCOMING);
auto ice = createIceTransport(("sip:"+call->getCallId()).c_str(), ICE_COMPONENTS, false, getIceOptions());
......@@ -1043,12 +977,11 @@ RingAccount::incomingCall(dht::IceCandidates&& msg)
val.id = vid;
std::weak_ptr<SIPCall> weak_call = call;
auto shared_this = std::static_pointer_cast<RingAccount>(shared_from_this());
dht_.putEncrypted(
callKey_,
msg.from,
std::move(val),
[weak_call, shared_this, vid](bool ok) {
[weak_call, vid](bool ok) {
if (!ok) {
RING_WARN("Can't put ICE descriptor reply on DHT");
if (auto call = weak_call.lock())
......@@ -1065,7 +998,7 @@ RingAccount::incomingCall(dht::IceCandidates&& msg)
call->initRecFilename(from);
{
std::lock_guard<std::mutex> lock(callsMutex_);
pendingCalls_.emplace_back(PendingCall {
pendingLegacyCalls_.emplace_back(PendingCall {
.start = std::chrono::steady_clock::now(),
.ice_sp = ice,
.call = weak_call,
......@@ -1075,6 +1008,24 @@ RingAccount::incomingCall(dht::IceCandidates&& msg)
});
}
}
#endif
std::shared_ptr<SIPCall>
RingAccount::incomingCall(const std::string& peer_id, std::shared_ptr<ReliableSocket::DataConnection> dc)
{
RING_DBG("[DHT:%s] incoming call from peer %s", getAccountID().c_str(), peer_id.c_str());
auto call = Manager::instance().callFactory.newCall<SIPCall, RingAccount>(*this, Manager::instance().getNewCallID(), Call::CallType::INCOMING);
call->setPeerNumber(peer_id);
call->initRecFilename(peer_id);
call->setDataConnection(dc);
// Continue to scan the connection progress into the mainloop
std::lock_guard<std::mutex> lock(callsMutex_);
pendingCalls_.emplace_back(call);
return call;
}
void
RingAccount::doUnregister(std::function<void(bool)> released_cb)
......@@ -1375,10 +1326,10 @@ RingAccount::getContactHeader(pjsip_transport* t)
}
// FIXME: be sure that given transport is from SipIceTransport
auto tlsTr = reinterpret_cast<tls::SipsIceTransport::TransportData*>(t)->self;
auto address = tlsTr->getLocalAddress();
//auto tlsTr = reinterpret_cast<tls::SipsIceTransport::TransportData*>(t)->self;
IpAddr address {"localhost"}; // TODO: ???
contact_.slen = pj_ansi_snprintf(contact_.ptr, PJSIP_MAX_URL_SIZE,
"%s%s<sips:%s%s%s;transport=tls>",
"%s%s<sips:%s%s%s;transport=udp>",
displayName_.c_str(),
(displayName_.empty() ? "" : " "),
ringid.c_str(),
......@@ -1572,6 +1523,8 @@ private:
std::shared_ptr<ReliableSocket::DataConnection> dataConnection_ {};
std::shared_ptr<SIPCall> call_ {};
// DTLS session
std::unique_ptr<tls::TlsSession> tls_;
void onCertificatesUpdate(const gnutls_datum_t*, const gnutls_datum_t*, unsigned int);
......@@ -1926,9 +1879,11 @@ SecureIceTransport::createTlsSession(dht::crypto::Identity& id,
};
tls::TlsSession::TlsSessionCallbacks tls_cbs = {
.onStateChange = [this](tls::TlsSessionState state) {
if (state == tls::TlsSessionState::ESTABLISHED)
if (state == tls::TlsSessionState::ESTABLISHED) {
dataConnection_->connect(tls_.get());
else if (state == tls::TlsSessionState::SHUTDOWN)
if (tid & 1)
call_ = account.newIncomingCall(peer_id);
} else if (state == tls::TlsSessionState::SHUTDOWN)
dataConnection_->disconnect();
},
.onRxData = [this](std::vector<uint8_t>&& buf) {
......
......@@ -312,7 +312,8 @@ class RingAccount : public SIPAccountBase {
NON_COPYABLE(RingAccount);
void doRegister_();
void incomingCall(dht::IceCandidates&& msg);
void legacyIncomingCall(dht::IceCandidates&& msg);
std::shared_ptr<SIPCall> incomingCall(const std::string&, std::shared_ptr<ReliableSocket::DataConnection>);
const dht::ValueType USER_PROFILE_TYPE = {9, "User profile", std::chrono::hours(24 * 7)};
......@@ -346,27 +347,18 @@ class RingAccount : public SIPAccountBase {
dht::InfoHash callKey_;
struct PendingCall {
std::chrono::steady_clock::time_point start;
std::shared_ptr<IceTransport> ice_sp;
std::weak_ptr<SIPCall> call;
std::future<size_t> listen_key;
dht::InfoHash call_key;
dht::InfoHash from;
};
void handlePendingCallList();
bool handlePendingCall(PendingCall& pc, bool incoming);
bool handlePendingCall(std::weak_ptr<SIPCall>&);
/**
* DHT calls waiting for ICE negotiation
*/
std::list<PendingCall> pendingCalls_ {};
std::list<std::weak_ptr<SIPCall>> pendingCalls_ {};
/**
* Incoming DHT calls that are not yet actual SIP calls.
*/
std::list<PendingCall> pendingSipCalls_ {};
std::list<std::weak_ptr<SIPCall>> pendingSipCalls_ {};
std::set<dht::Value::Id> treatedCalls_ {};
mutable std::mutex callsMutex_ {};
......
......@@ -19,7 +19,9 @@ libsiplink_la_SOURCES = \
sip_utils.cpp \
sip_utils.h \
base64.h \
base64.c
base64.c \
multistream_siptransport.cpp \
multistream_siptransport.h
libsiplink_la_SOURCES+=sippresence.cpp \
sippresence.h \
......
/*
* Copyright (C) 2004-2016 Savoir-faire Linux Inc.
*
* Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "multistream_siptransport.h"
#include "manager.h"
#include "sip/sip_utils.h"
#include "logger.h"
#include "data_transfer.h"
#include <pjsip/sip_transport.h>
#include <pjsip/sip_endpoint.h>
#include <pj/compat/socket.h>
#include <pj/lock.h>
#include <algorithm>
#include <cstring> // std::memset
namespace ring {
static constexpr int POOL_TP_INIT {512};
static constexpr int POOL_TP_INC {512};
static constexpr int TRANSPORT_INFO_LENGTH {64};
static void
sockaddr_to_host_port(pj_pool_t* pool,
pjsip_host_port* host_port,
const pj_sockaddr* addr)
{
host_port->host.ptr = (char*) pj_pool_alloc(pool, PJ_INET6_ADDRSTRLEN+4);
pj_sockaddr_print(addr, host_port->host.ptr, PJ_INET6_ADDRSTRLEN+4, 0);
host_port->host.slen = pj_ansi_strlen(host_port->host.ptr);
host_port->port = pj_sockaddr_get_port(addr);
}
MultiStreamSipTransport::MultiStreamSipTransport(pjsip_endpoint* endpt,
std::shared_ptr<ReliableSocket::DataStream> stream)
: trData_ ()
, pool_ {nullptr, pj_pool_release}
, rxPool_ (nullptr, pj_pool_release)
, stream_ (stream)
, txThreadloop_([]{ return true;},
[this]{ flushTxQueue(); },
[]{})
{
RING_DBG("MultiStreamSipTransport@%p {PjTr=%p}", this, &trData_.base);
trData_.self = this; // up-link for PJSIP C callbacks
pool_ = std::move(sip_utils::smart_alloc_pool(endpt, "SipsIceTransport.pool",
POOL_TP_INIT, POOL_TP_INC));
auto& base = trData_.base;
std::memset(&rdata_, 0, sizeof(pjsip_rx_data));
pj_ansi_snprintf(base.obj_name, PJ_MAX_OBJ_NAME, "MultiStreamSipTransport");
base.endpt = endpt;
base.tpmgr = pjsip_endpt_get_tpmgr(endpt);
base.pool = pool_.get();
if (pj_atomic_create(pool_.get(), 0, &base.ref_cnt) != PJ_SUCCESS)
throw std::runtime_error("Can't create PJSIP atomic.");
if (pj_lock_create_recursive_mutex(pool_.get(), "SipsIceTransport.mutex",
&base.lock) != PJ_SUCCESS)
throw std::runtime_error("Can't create PJSIP mutex.");
IpAddr remote_addr {"10.0.0.2:1"}; // TODO: remote address?
pj_sockaddr_cp(&base.key.rem_addr, remote_addr.pjPtr());
base.key.type = PJSIP_TRANSPORT_TLS;
base.type_name = (char*)pjsip_transport_get_type_name((pjsip_transport_type_e)base.key.type);
base.flag = pjsip_transport_get_flag_from_type((pjsip_transport_type_e)base.key.type);
base.info = (char*) pj_pool_alloc(pool_.get(), TRANSPORT_INFO_LENGTH);
char print_addr[PJ_INET6_ADDRSTRLEN+10];
pj_ansi_snprintf(base.info, TRANSPORT_INFO_LENGTH, "%s to %s", base.type_name,
pj_sockaddr_print(remote_addr.pjPtr(), print_addr, sizeof(print_addr), 3));
base.addr_len = sizeof(pj_sockaddr_in); // TODO: ???
base.dir = PJSIP_TP_DIR_NONE;
base.data = nullptr;
IpAddr local_addr {"10.0.0.1:1"}; // TODO: local address?
pj_sockaddr_cp(&base.local_addr, local_addr.pjPtr());
sockaddr_to_host_port(pool_.get(), &base.local_name, &base.local_addr);
sockaddr_to_host_port(pool_.get(), &base.remote_name, remote_addr.pjPtr());
base.send_msg = [](pjsip_transport *transport,
pjsip_tx_data *tdata,
const pj_sockaddr_t *rem_addr, int addr_len,
void *token, pjsip_transport_callback callback) -> pj_status_t {
auto& this_ = reinterpret_cast<TransportData*>(transport)->self;
return this_->send(tdata, rem_addr, addr_len, token, callback);
};
base.do_shutdown = [](pjsip_transport *transport) -> pj_status_t {
auto& this_ = reinterpret_cast<TransportData*>(transport)->self;
RING_DBG("MultiStreamSipTransport@%p: shutdown", this_);
{
// Flush pending state changes and rx packet before shutdown
// or pjsip callbacks will crash
std::unique_lock<std::mutex> lk{this_->stateChangeEventsMutex_};
this_->stateChangeEvents_.clear();
this_->stream_->close();
}
return PJ_SUCCESS;
};
base.destroy = [](pjsip_transport *transport) -> pj_status_t {
auto& this_ = reinterpret_cast<TransportData*>(transport)->self;
RING_DBG("MultiStreamSipTransport@%p: destroying", this_);
delete this_; // we're owned by PJSIP
return PJ_SUCCESS;
};
/* Init rdata_ */
std::memset(&rdata_, 0, sizeof(pjsip_rx_data));
rxPool_ = std::move(sip_utils::smart_alloc_pool(endpt, "MultiStreamSipTransport.rxPool",
PJSIP_POOL_RDATA_LEN, PJSIP_POOL_RDATA_LEN));
rdata_.tp_info.pool = rxPool_.get();
rdata_.tp_info.transport = &base;
rdata_.tp_info.tp_data = this;
rdata_.tp_info.op_key.rdata = &rdata_;
pj_ioqueue_op_key_init(&rdata_.tp_info.op_key.op_key,
sizeof(pj_ioqueue_op_key_t));
rdata_.pkt_info.src_addr = base.key.rem_addr;
rdata_.pkt_info.src_addr_len = sizeof(rdata_.pkt_info.src_addr);
auto rem_addr = &base.key.rem_addr;
pj_sockaddr_print(rem_addr, rdata_.pkt_info.src_name,
sizeof(rdata_.pkt_info.src_name), 0);
rdata_.pkt_info.src_port = pj_sockaddr_get_port(rem_addr);
std::memset(&localCertInfo_, 0, sizeof(pj_ssl_cert_info));
std::memset(&remoteCertInfo_, 0, sizeof(pj_ssl_cert_info));
if (pjsip_transport_register(base.tpmgr, &base) != PJ_SUCCESS)
throw std::runtime_error("Can't register MultiStreamSipTransport on PJSIP");
Manager::instance().registerEventHandler((uintptr_t)this, [this]{ handleEvents(); });
updateTransportState(PJSIP_TP_STATE_CONNECTED);
txThreadloop_.start();
}
MultiStreamSipTransport::~MultiStreamSipTransport()
{
RING_DBG("~MultiStreamSipTransport@%p {PjTr=%p}", this, &trData_.base);
txThreadloop_.join();
// Flush tx queue with ENOTCONN error
for (auto tdata : txQueue_) {
tdata->op_key.tdata = nullptr;
if (tdata->op_key.callback)
tdata->op_key.callback(&trData_.base, tdata->op_key.token,
-PJ_RETURN_OS_ERROR(OSERR_ENOTCONN));
}
Manager::instance().unregisterEventHandler((uintptr_t)this);
// If delete not trigged by pjsip_transport_destroy (happen if objet not given to pjsip)
auto base = getTransportBase();
if (not base->is_shutdown and not base->is_destroying)
pjsip_transport_shutdown(base);
// Stop low-level transport
stream_->close();
pj_lock_destroy(base->lock);
pj_atomic_destroy(base->ref_cnt);
}
void
MultiStreamSipTransport::flushTxQueue()
{
// Handle SIP transport -> Stream
decltype(txQueue_) tx_queue;
{
std::lock_guard<std::mutex> l(txMutex_);
if (canWrite_) {
tx_queue = std::move(txQueue_);
txQueue_.clear();
}
}
bool fatal = false;
for (auto tdata : tx_queue) {
pj_status_t status;
if (!fatal) {
const std::size_t size = tdata->buf.cur - tdata->buf.start;
auto ret = stream_->sendData(tdata->buf.start, size);
if (ret < 0) {
RING_ERR("[SIP] fatal error during sending: %s", strerror(ret));
txThreadloop_.stop();
}
if (ret < 0)
status = -PJ_RETURN_OS_ERROR(errno);
else
status = ret;
} else
status = -PJ_RETURN_OS_ERROR(OSERR_ENOTCONN);
tdata->op_key.tdata = nullptr;
if (tdata->op_key.callback)
tdata->op_key.callback(&trData_.base, tdata->op_key.token, status);
}
}
void
MultiStreamSipTransport::handleEvents()
{
// Notify transport manager about state changes first
// Note: stop when disconnected event is encountered
// and differ its notification AFTER pending rx msg to let
// them a chance to be delivered to application before closing
// the transport.
decltype(stateChangeEvents_) eventDataQueue;
{
std::lock_guard<std::mutex> lk{stateChangeEventsMutex_};
eventDataQueue = std::move(stateChangeEvents_);
stateChangeEvents_.clear();
}
ChangeStateEventData disconnectedEvent;
bool disconnected = false;
auto state_cb = pjsip_tpmgr_get_state_cb(trData_.base.tpmgr);
if (state_cb) {
for (auto& evdata : eventDataQueue) {
evdata.state_info.ext_info = nullptr;
if (evdata.state != PJSIP_TP_STATE_DISCONNECTED) {
(*state_cb)(&trData_.base, evdata.state, &evdata.state_info);
} else {
disconnectedEvent = std::move(evdata);
disconnected = true;
break;
}
}
}
// Handle Stream -> SIP transport
if (readBufferSize_ < sizeof(rdata_.pkt_info.packet) and stream_->canRead()) {
auto ret = stream_->recvData(rdata_.pkt_info.packet + readBufferSize_,
sizeof(rdata_.pkt_info.packet) - readBufferSize_);
RING_DBG("recvData(ptr+%zu, max=%zu) >>> %zu B"
, readBufferSize_
, sizeof(rdata_.pkt_info.packet) - readBufferSize_
, ret);
if (ret > 0) {
readBufferSize_ += ret;
rdata_.pkt_info.len = readBufferSize_;
rdata_.pkt_info.zero = 0;
pj_gettimeofday(&rdata_.pkt_info.timestamp);
RING_WARN("%zu B >>> PJSIP:\n>>>>>>>>%s<<<<<<<<\n"
, rdata_.pkt_info.len
, std::string(rdata_.pkt_info.packet, rdata_.pkt_info.len).c_str());
auto eaten = pjsip_tpmgr_receive_packet(trData_.base.tpmgr, &rdata_);
if (eaten)
RING_WARN("PJSIP: eaten=%zd", eaten);
pj_pool_reset(rdata_.tp_info.pool);
if (eaten > 0) {
if (auto remain = readBufferSize_ - eaten)
pj_memmove(rdata_.pkt_info.packet, rdata_.pkt_info.packet + eaten, remain);
readBufferSize_ -= eaten;
}
} else
RING_WARN("[MSST] readData failed, %s", strerror(ret));
}
// Time to deliver disconnected event if exists
if (disconnected and state_cb)
(*state_cb)(&trData_.base, disconnectedEvent.state, &disconnectedEvent.state_info);
}
void
MultiStreamSipTransport::updateTransportState(pjsip_transport_state state)
{
ChangeStateEventData ev;
std::memset(&ev.state_info, 0, sizeof(ev.state_info));
ev.state = state;
if (state == PJSIP_TP_STATE_CONNECTED) {
std::lock_guard<std::mutex> lk {txMutex_};
canWrite_ = true;
}
ev.state_info.status = PJ_SUCCESS;
pushChangeStateEvent(std::move(ev));
}
void
MultiStreamSipTransport::pushChangeStateEvent(ChangeStateEventData&& ev)
{
std::lock_guard<std::mutex> lk{stateChangeEventsMutex_};
stateChangeEvents_.emplace_back(std::move(ev));
}
pj_status_t
MultiStreamSipTransport::send(pjsip_tx_data* tdata, const pj_sockaddr_t* rem_addr,
int addr_len, void* token,
pjsip_transport_callback callback)
{
// Sanity check
PJ_ASSERT_RETURN(tdata, PJ_EINVAL);
// Check that there's no pending operation associated with the tdata
PJ_ASSERT_RETURN(tdata->op_key.tdata == nullptr, PJSIP_EPENDINGTX);
// Check the address is supported
PJ_ASSERT_RETURN(rem_addr and
(addr_len==sizeof(pj_sockaddr_in) or
addr_len==sizeof(pj_sockaddr_in6)),
PJ_EINVAL);
// Check in we are able to send it in synchronous way first
const std::size_t size = tdata->buf.cur - tdata->buf.start;
std::unique_lock<std::mutex> lk {txMutex_};
if (canWrite_ and txQueue_.empty()) {
RING_WARN("[MSST] send %zu", size);
auto ret = stream_->sendData(tdata->buf.start, size);
lk.unlock();
// Shutdown on fatal error, else ignore it
if (ret < 0) {
RING_ERR("[SIP] error during sending: %s", strerror(ret));
return -PJ_RETURN_OS_ERROR(errno);
}
return PJ_SUCCESS;
}
// Asynchronous sending
RING_WARN("[MSST] send async %zu", size);
tdata->op_key.tdata = tdata;
tdata->op_key.token = token;
tdata->op_key.callback = callback;
txQueue_.push_back(tdata);
return PJ_EPENDING;
}
} // namespace ring
/*
* Copyright (C) 2004-2016 Savoir-faire Linux Inc.
*
* Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#pragma once
#include "ip_utils.h"
#include "noncopyable.h"
#include "threadloop.h"
#include <pjsip.h>
#include <pj/pool.h>
#include <list>
#include <functional>
#include <memory>
#include <mutex>
#include <chrono>
#include <queue>
#include <utility>
#include <vector>
#include <condition_variable>
namespace ring {
namespace ReliableSocket {
class DataStream;
}
/**
* SipsIceTransport
*
* Implements a SipTransport for ReliableSocket::DataStream
*/
class MultiStreamSipTransport
{
private:
NON_COPYABLE(MultiStreamSipTransport);
public:
using Clock = std::chrono::steady_clock;
using TransportData = struct {
pjsip_transport base; // do not move, SHOULD be the fist member
MultiStreamSipTransport* self {nullptr};
};
static_assert(std::is_standard_layout<TransportData>::value,
"TranportData requires standard-layout");
MultiStreamSipTransport(pjsip_endpoint* endpt,
std::shared_ptr<ReliableSocket::DataStream> stream);
~MultiStreamSipTransport();
void shutdown();
public: // Getters
pjsip_transport* getTransportBase() { return &trData_.base; }
private: // PJSIP transport backend
TransportData trData_; // uplink to "this" (used by PJSIP called C-callbacks)
std::unique_ptr<pj_pool_t, decltype(pj_pool_release)*> pool_;
std::unique_ptr<pj_pool_t, decltype(pj_pool_release)*> rxPool_;
pjsip_rx_data rdata_;
pj_ssl_cert_info localCertInfo_;
pj_ssl_cert_info remoteCertInfo_;
pj_status_t verifyStatus_ {PJ_EUNKNOWN};
private: // DataStream backend
const std::shared_ptr<ReliableSocket::DataStream> stream_;
private: // IO / events
struct ChangeStateEventData {
pjsip_transport_state_info state_info;
decltype(PJSIP_TP_STATE_DISCONNECTED) state;
};
std::size_t readBufferSize_ {0};
std::mutex stateChangeEventsMutex_ {};
std::list<ChangeStateEventData> stateChangeEvents_ {};
pj_status_t send(pjsip_tx_data*, const pj_sockaddr_t*, int, void*, pjsip_transport_callback);
void handleEvents();
void pushChangeStateEvent(ChangeStateEventData&&);
void updateTransportState(pjsip_transport_state);
private: // Transmission layer (async by thread)
void flushTxQueue();
std::mutex txMutex_ {};
std::condition_variable txCv_ {};
bool canWrite_ {false}; // true if we can send data (cnx established)
std::list<pjsip_tx_data*> txQueue_ {}; // Used for asynchronous transmissions
ThreadLoop txThreadloop_;
};
} // namespace ring
......@@ -40,6 +40,7 @@
#include "dring/call_const.h"
#include "dring/media_const.h"
#include "client/ring_signal.h"
#include "data_transfer.h"
#ifdef RING_VIDEO
#include "client/videomanager.h"
......@@ -1076,4 +1077,14 @@ SIPCall::initIceTransport(bool master, unsigned channel_num)
return result;
}
void
SIPCall::onDataConnected()
{
RING_DBG("[call:%s] connected", getCallId().c_str());
if (!peerStream_)
peerStream_.reset(new ReliableSocket::DataStream(1));
dc_->attachStream(peerStream_);
setTransport(getSIPVoIPLink()->sipTransportBroker->getMultiStreamTransport(peerStream_));
}
} // namespace ring
......@@ -210,6 +210,9 @@ class SIPCall : public Call
bool initIceTransport(bool master, unsigned channel_num=4) override;
void terminateSipSession(int status);
void onDataConnected() override;
private:
NON_COPYABLE(SIPCall);
......
......@@ -25,6 +25,7 @@
#include "ringdht/sip_transport_ice.h"
#include "ringdht/sips_transport_ice.h"
#include "multistream_siptransport.h"
#include "array_size.h"
#include "compiler_intrinsics.h"
......@@ -458,4 +459,17 @@ SipTransportBroker::getTlsIceTransport(const std::shared_ptr<ring::IceTransport>
return sip_tr;
}
std::shared_ptr<SipTransport>
SipTransportBroker::getMultiStreamTransport(const std::shared_ptr<ReliableSocket::DataStream> stream)
{
auto mss_tr = std::unique_ptr<MultiStreamSipTransport>(new MultiStreamSipTransport(endpt_, stream));
auto tr = mss_tr->getTransportBase();
auto sip_tr = std::make_shared<SipTransport>(tr);
mss_tr.release(); // managed by PJSIP now
std::lock_guard<std::mutex> lock(transportMapMutex_);
transports_.emplace(std::make_pair(tr, sip_tr));
return sip_tr;
}
} // namespace ring
......@@ -157,6 +157,9 @@ class IceTransport;
namespace tls {
struct TlsParams;
};
namespace ReliableSocket {
class DataStream;
};
/**
* Manages the transports and receive callbacks from PJSIP
......@@ -182,6 +185,9 @@ public:
getTlsIceTransport(const std::shared_ptr<IceTransport>, unsigned comp_id,
const tls::TlsParams&);
std::shared_ptr<SipTransport>
getMultiStreamTransport(const std::shared_ptr<ReliableSocket::DataStream> stream);
std::shared_ptr<SipTransport> addTransport(pjsip_transport*);
/**
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment