From b9c287ee79221eeffcfd35141c85892fa8410731 Mon Sep 17 00:00:00 2001 From: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com> Date: Thu, 21 Jul 2016 17:44:35 -0400 Subject: [PATCH] WIP: SIP over ReliableStream wip, do not merge Change-Id: I113a405d75eef43e600bca2007900a99c31c645a --- src/call.cpp | 7 +- src/call.h | 18 +- src/ringdht/ringaccount.cpp | 311 ++++++++++------------- src/ringdht/ringaccount.h | 18 +- src/sip/Makefile.am | 4 +- src/sip/multistream_siptransport.cpp | 358 +++++++++++++++++++++++++++ src/sip/multistream_siptransport.h | 116 +++++++++ src/sip/sipcall.cpp | 11 + src/sip/sipcall.h | 3 + src/sip/siptransport.cpp | 14 ++ src/sip/siptransport.h | 6 + 11 files changed, 669 insertions(+), 197 deletions(-) create mode 100644 src/sip/multistream_siptransport.cpp create mode 100644 src/sip/multistream_siptransport.h diff --git a/src/call.cpp b/src/call.cpp index a17fc9fb1c..a84fc1ff48 100644 --- a/src/call.cpp +++ b/src/call.cpp @@ -34,17 +34,18 @@ #include "call_factory.h" #include "string_utils.h" #include "enumclass_utils.h" +#include "data_transfer.h" #include "errno.h" namespace ring { Call::Call(Account& account, const std::string& id, Call::CallType type) - : id_(id) + : creationTime_() + , id_(id) , type_(type) , account_(account) { - time(×tamp_start_); account_.attachCall(id_); } @@ -283,7 +284,7 @@ Call::getDetails() const {DRing::Call::Details::DISPLAY_NAME, peerDisplayName_}, {DRing::Call::Details::CALL_STATE, getStateStr()}, {DRing::Call::Details::CONF_ID, confID_}, - {DRing::Call::Details::TIMESTAMP_START, ring::to_string(timestamp_start_)}, + {DRing::Call::Details::TIMESTAMP_START, ring::to_string(std::chrono::duration_cast<std::chrono::seconds>(creationTime_.time_since_epoch()).count())}, {DRing::Call::Details::ACCOUNTID, getAccountId()}, {DRing::Call::Details::AUDIO_MUTED, std::string(bool_to_str(isAudioMuted_))}, {DRing::Call::Details::VIDEO_MUTED, std::string(bool_to_str(isVideoMuted_))}, diff --git a/src/call.h b/src/call.h index 54c264a698..183020fbad 100644 --- a/src/call.h +++ b/src/call.h @@ -47,6 +47,11 @@ class VoIPLink; class Account; class AccountVideoCodecInfo; +namespace ReliableSocket { +class DataConnection; +class DataStream; +} + template <class T> using CallMap = std::map<std::string, std::shared_ptr<T> >; /* @@ -310,6 +315,11 @@ class Call : public Recordable, public std::enable_shared_from_this<Call> { void removeCall(); + void setDataConnection(std::shared_ptr<ReliableSocket::DataConnection> dc) { dc_ = dc; } + std::shared_ptr<ReliableSocket::DataConnection> getDataConnection() const { return dc_; } + + virtual void onDataConnected() = 0; + virtual bool initIceTransport(bool master, unsigned channel_num=4); int waitForIceInitialization(unsigned timeout); @@ -331,6 +341,8 @@ class Call : public Recordable, public std::enable_shared_from_this<Call> { virtual void restartMediaSender() = 0; + std::chrono::steady_clock::time_point getCreationTime() const { return creationTime_; } + protected: /** * Constructor of a call @@ -339,6 +351,10 @@ class Call : public Recordable, public std::enable_shared_from_this<Call> { */ Call(Account& account, const std::string& id, Call::CallType type); + const std::chrono::steady_clock::time_point creationTime_; + + std::shared_ptr<ReliableSocket::DataConnection> dc_ {}; + std::shared_ptr<ReliableSocket::DataStream> peerStream_; std::shared_ptr<IceTransport> iceTransport_ {}; bool isAudioMuted_{false}; @@ -387,8 +403,6 @@ class Call : public Recordable, public std::enable_shared_from_this<Call> { /** Peer Display Name */ std::string peerDisplayName_ {}; - - time_t timestamp_start_ {0}; }; } // namespace ring diff --git a/src/ringdht/ringaccount.cpp b/src/ringdht/ringaccount.cpp index a7adc9d36a..649844d0a8 100644 --- a/src/ringdht/ringaccount.cpp +++ b/src/ringdht/ringaccount.cpp @@ -87,6 +87,7 @@ static constexpr int ICE_COMPONENTS {1}; static constexpr int ICE_COMP_SIP_TRANSPORT {0}; static constexpr int ICE_INIT_TIMEOUT {10}; static constexpr auto ICE_NEGOTIATION_TIMEOUT = std::chrono::seconds(60); +static constexpr auto CONNECTION_TIMEOUT = std::chrono::seconds(60); static constexpr auto TLS_TIMEOUT = std::chrono::seconds(30); // Limit number of ICE data msg request/msg that waiting for handling @@ -172,22 +173,8 @@ RingAccount::~RingAccount() std::shared_ptr<SIPCall> RingAccount::newIncomingCall(const std::string& from) { - std::lock_guard<std::mutex> lock(callsMutex_); - auto call_it = pendingSipCalls_.begin(); - while (call_it != pendingSipCalls_.end()) { - auto call = call_it->call.lock(); - if (not call) { - RING_WARN("newIncomingCall: discarding deleted call"); - call_it = pendingSipCalls_.erase(call_it); - } else if (call->getPeerNumber() == from) { - pendingSipCalls_.erase(call_it); - RING_DBG("newIncomingCall: found matching call for %s", from.c_str()); - return call; - } else { - ++call_it; - } - } - RING_ERR("newIncomingCall: can't find matching call for %s", from.c_str()); + if (auto dc = obtainDataConnection(from, false)) + return incomingCall(from, dc); return nullptr; } @@ -195,101 +182,30 @@ template <> std::shared_ptr<SIPCall> RingAccount::newOutgoingCall(const std::string& toUrl) { - const std::string toUri = parseRingUri(toUrl); - RING_DBG("Calling DHT peer %s", toUri.c_str()); + const std::string peer_id = parseRingUri(toUrl); + RING_DBG("Calling DHT peer %s", peer_id.c_str()); auto& manager = Manager::instance(); auto call = manager.callFactory.newCall<SIPCall, RingAccount>(*this, manager.getNewCallID(), Call::CallType::OUTGOING); - call->setIPToIP(true); call->setSecure(isTlsEnabled()); // TODO: for now, we automatically trust all explicitly called peers - setCertificateStatus(toUri, tls::TrustStore::PermissionStatus::ALLOWED); - - auto shared_this = std::static_pointer_cast<RingAccount>(shared_from_this()); - std::weak_ptr<SIPCall> weak_call = call; - manager.addTask([shared_this, weak_call, toUri] { - auto call = weak_call.lock(); - - if (not call) { - call->onFailure(); - return false; - } - - // Create an ICE transport for SIP channel - std::shared_ptr<IceTransport> ice {}; - - try { - ice = shared_this->createIceTransport(("sip:" + call->getCallId()).c_str(), - ICE_COMPONENTS, true, shared_this->getIceOptions()); - } catch (std::runtime_error& e) { - RING_ERR("%s", e.what()); - call->onFailure(); - return false; - } - - auto iceInitTimeout = std::chrono::steady_clock::now() + std::chrono::seconds {ICE_INIT_TIMEOUT}; - - /* First step: wait for an initialized ICE transport for SIP channel */ - if (ice->isFailed() or std::chrono::steady_clock::now() >= iceInitTimeout) { - RING_DBG("ice init failed (or timeout)"); - call->onFailure(); - return false; - } + setCertificateStatus(peer_id, tls::TrustStore::PermissionStatus::ALLOWED); - if (not ice->isInitialized()) - return true; // process task again! - - /* Next step: sent the ICE data to peer through DHT */ - const dht::Value::Id callvid = udist(shared_this->rand_); - const dht::Value::Id vid = udist(shared_this->rand_); - const auto toH = dht::InfoHash(toUri); - const auto callkey = dht::InfoHash::get("callto:" + toUri); - dht::Value val { dht::IceCandidates(callvid, ice->getLocalAttributesAndCandidates()) }; - val.id = vid; - - call->setState(Call::ConnectionState::TRYING); - shared_this->dht_.putEncrypted( - callkey, toH, - std::move(val), - [=](bool ok) { // Put complete callback - if (!ok) { - RING_WARN("Can't put ICE descriptor on DHT"); - if (auto call = weak_call.lock()) - call->onFailure(); - } else - RING_DBG("Successfully put ICE descriptor on DHT"); - } - ); - - auto listenKey = shared_this->dht_.listen<dht::IceCandidates>( - callkey, - [=] (dht::IceCandidates&& msg) { - if (msg.id != callvid or msg.from != toH) - return true; - RING_WARN("ICE request replied from DHT peer %s\n%s", toH.toString().c_str(), - std::string(msg.ice_data.cbegin(), msg.ice_data.cend()).c_str()); - if (auto call = weak_call.lock()) - call->setState(Call::ConnectionState::PROGRESSING); - if (!ice->start(msg.ice_data)) { - call->onFailure(); - return true; - } - return false; - } - ); + // Asynchronous launch of a peer reliable connection + auto dc = obtainDataConnection(peer_id); + if (not dc) { + call->removeCall(); + return nullptr; + } - shared_this->pendingCalls_.emplace_back(PendingCall{ - std::chrono::steady_clock::now(), - ice, weak_call, - std::move(listenKey), - callkey, toH - }); + call->setDataConnection(dc); - return false; - }); + // Continue to scan the connection progress into the mainloop + std::lock_guard<std::mutex> lock(callsMutex_); + pendingCalls_.emplace_back(call); return call; } @@ -604,42 +520,25 @@ RingAccount::handleEvents() void RingAccount::handlePendingCallList() { - // Process pending call into a local list to not block threads depending on this list, - // as incoming call handlers. - decltype(pendingCalls_) pending_calls; - { - std::lock_guard<std::mutex> lock(callsMutex_); - pending_calls = std::move(pendingCalls_); - pendingCalls_.clear(); - } - - static const dht::InfoHash invalid_hash; // Invariant + std::unique_lock<std::mutex> lk(callsMutex_); - auto pc_iter = std::begin(pending_calls); - while (pc_iter != std::end(pending_calls)) { - bool incoming = pc_iter->call_key == invalid_hash; // do it now, handlePendingCall may invalidate pc data + auto it = std::begin(pendingCalls_); + while (it != std::end(pendingCalls_)) { bool handled; + lk.unlock(); try { - handled = handlePendingCall(*pc_iter, incoming); + handled = handlePendingCall(*it); } catch (const std::exception& e) { RING_ERR("[DHT] exception during pending call handling: %s", e.what()); handled = true; // drop from pending list } + lk.lock(); - if (handled) { - // Cancel pending listen (outgoing call) - if (not incoming) - dht_.cancelListen(pc_iter->call_key, pc_iter->listen_key.share()); - pc_iter = pending_calls.erase(pc_iter); - } else - ++pc_iter; - } - - // Re-integrate non-handled and valid pending calls - { - std::lock_guard<std::mutex> lock(callsMutex_); - pendingCalls_.splice(std::end(pendingCalls_), pending_calls); + if (handled) + it = pendingCalls_.erase(it); + else + ++it; } } @@ -682,23 +581,30 @@ check_peer_certificate(dht::InfoHash from, unsigned status, const gnutls_datum_t } bool -RingAccount::handlePendingCall(PendingCall& pc, bool incoming) +RingAccount::handlePendingCall(std::weak_ptr<SIPCall>& weak_call) { - auto call = pc.call.lock(); + auto call = weak_call.lock(); if (not call) return true; - auto ice = pc.ice_sp.get(); - if (not ice or ice->isFailed()) { - RING_ERR("[call:%s] Null or failed ICE transport", call->getCallId().c_str()); + auto dc = call->getDataConnection(); + if (!dc or dc->isFailed()) { + RING_ERR("[call:%s] connection failed", call->getCallId().c_str()); call->onFailure(); return true; } + DRing::DataConnectionInfo info; + dc->getInfo(info); + // Return to pending list if not negotiated yet and not in timeout - if (not ice->isRunning()) { - if ((std::chrono::steady_clock::now() - pc.start) >= ICE_NEGOTIATION_TIMEOUT) { - RING_WARN("[call:%s] Timeout on ICE negotiation", call->getCallId().c_str()); + if (info.code < 200) + return false; + + if (info.code >= 400) { + auto creationTime = call->getCreationTime(); + if ((decltype(creationTime)::clock::now() - creationTime) >= CONNECTION_TIMEOUT) { + RING_WARN("[call:%s] Connection timeout", call->getCallId().c_str()); call->onFailure(); return true; } @@ -706,41 +612,14 @@ RingAccount::handlePendingCall(PendingCall& pc, bool incoming) return call->getState() == Call::CallState::OVER; } - // Securize a SIP transport with TLS (on top of ICE tranport) and assign the call with it - auto remote_h = pc.from; - auto id(loadIdentity()); - - tls::TlsParams tlsParams { - .ca_list = "", - .cert = id.second, - .cert_key = id.first, - .dh_params = dhParams_, - .timeout = std::chrono::duration_cast<decltype(tls::TlsParams::timeout)>(TLS_TIMEOUT), - .cert_check = [remote_h](unsigned status, const gnutls_datum_t* cert_list, - unsigned cert_num) -> pj_status_t { - try { - return check_peer_certificate(remote_h, status, cert_list, cert_num); - } catch (const std::exception& e) { - RING_ERR("[peer:%s] TLS certificate check exception: %s", - remote_h.toString().c_str(), e.what()); - return PJ_SSL_CERT_EUNKNOWN; - } - } - }; - auto tr = link_->sipTransportBroker->getTlsIceTransport(pc.ice_sp, ICE_COMP_SIP_TRANSPORT, - tlsParams); - call->setTransport(tr); + // Now info.code is a definitive 2xx code - // Notify of fully available connection between peers - RING_DBG("[call:%s] SIP communication established", call->getCallId().c_str()); + call->onDataConnected(); call->setState(Call::ConnectionState::PROGRESSING); - // Incoming call? - if (incoming) { - std::lock_guard<std::mutex> lock(callsMutex_); - pendingSipCalls_.emplace_back(std::move(pc)); // copy of pc - } else - createOutgoingCall(call, remote_h.toString(), ice->getRemoteAddress(ICE_COMP_SIP_TRANSPORT)); + // Outgoing call? + if (info.isClient) + createOutgoingCall(call, info.peer, dc->getRemoteAddress()); return true; } @@ -904,6 +783,7 @@ RingAccount::doRegister_() callKey_ = dht::InfoHash::get("callto:"+dht_.getId().toString()); RING_DBG("[DHT:%s] callto key: %s", getAccountID().c_str(), callKey_.toString().c_str()); +#if 0 dht_.listen<dht::IceCandidates>( callKey_, [shared] (dht::IceCandidates&& msg) { @@ -955,7 +835,8 @@ RingAccount::doRegister_() runOnMainThread([=]() mutable { shared->incomingCall(std::move(msg)); }); return true; } - ); + ); +#endif auto inboxKey = dht::InfoHash::get("inbox:"+dht_.getId().toString()); RING_DBG("[DHT:%s] inbox key: %s", getAccountID().c_str(), inboxKey.toString().c_str()); @@ -1018,6 +899,58 @@ RingAccount::doRegister_() dht_.listen<IceDataCandidates>( inboxKey, [shared] (IceDataCandidates&& msg) { + auto& this_ = *shared; + + // forbid self connection + if (msg.from == this_.dht_.getId()) { + RING_WARN("Discarding loopback DHT connection request"); + return true; + } + + // quick check in case we already explicilty banned this public key + auto trustStatus = this_.trust_.getCertificateStatus(msg.from.toString()); + if (trustStatus == tls::TrustStore::PermissionStatus::BANNED) { + RING_WARN("Discarding DHT connection request from banned peer %s", msg.from.toString().c_str()); + return true; + } + + auto res = this_.treatedCalls_.insert(msg.id); + this_.saveTreatedCalls(); + if (!res.second) + return true; + + if (not this_.dhtPublicInCalls_ and trustStatus != tls::TrustStore::PermissionStatus::ALLOWED) { + this_.findCertificate( + msg.from, + [shared, msg](const std::shared_ptr<dht::crypto::Certificate> cert) mutable { + if (!cert or cert->getId() != msg.from) { + RING_WARN("Can't find certificate of %s for incoming call.", + msg.from.toString().c_str()); + return; + } + + tls::CertificateStore::instance().pinCertificate(cert); + + auto& this_ = *shared; + if (!this_.trust_.isAllowed(*cert)) { + RING_WARN("Discarding incoming DHT call from untrusted peer %s.", + msg.from.toString().c_str()); + return; + } + + runOnMainThread([shared, msg]() mutable { + if (msg.id & 1) + shared->onDataTransactionReply(msg); + else + shared->onDataTransactionRequest(std::move(msg)); + }); + } + ); + return true; + } + else if (this_.dhtPublicInCalls_ and trustStatus != tls::TrustStore::PermissionStatus::BANNED) { + this_.findCertificate(msg.from.toString().c_str()); + } if (msg.id & 1) shared->onDataTransactionReply(msg); else @@ -1030,11 +963,12 @@ RingAccount::doRegister_() } } +#if 0 void -RingAccount::incomingCall(dht::IceCandidates&& msg) +RingAccount::legacyIncomingCall(dht::IceCandidates&& msg) { auto from = msg.from.toString(); - RING_WARN("ICE incoming from DHT peer %s\n%s", from.c_str(), + RING_WARN("Legacy incoming call from peer %s, ICE msg:\n%s", from.c_str(), std::string(msg.ice_data.cbegin(), msg.ice_data.cend()).c_str()); auto call = Manager::instance().callFactory.newCall<SIPCall, RingAccount>(*this, Manager::instance().getNewCallID(), Call::CallType::INCOMING); auto ice = createIceTransport(("sip:"+call->getCallId()).c_str(), ICE_COMPONENTS, false, getIceOptions()); @@ -1043,12 +977,11 @@ RingAccount::incomingCall(dht::IceCandidates&& msg) val.id = vid; std::weak_ptr<SIPCall> weak_call = call; - auto shared_this = std::static_pointer_cast<RingAccount>(shared_from_this()); dht_.putEncrypted( callKey_, msg.from, std::move(val), - [weak_call, shared_this, vid](bool ok) { + [weak_call, vid](bool ok) { if (!ok) { RING_WARN("Can't put ICE descriptor reply on DHT"); if (auto call = weak_call.lock()) @@ -1065,7 +998,7 @@ RingAccount::incomingCall(dht::IceCandidates&& msg) call->initRecFilename(from); { std::lock_guard<std::mutex> lock(callsMutex_); - pendingCalls_.emplace_back(PendingCall { + pendingLegacyCalls_.emplace_back(PendingCall { .start = std::chrono::steady_clock::now(), .ice_sp = ice, .call = weak_call, @@ -1075,6 +1008,24 @@ RingAccount::incomingCall(dht::IceCandidates&& msg) }); } } +#endif + +std::shared_ptr<SIPCall> +RingAccount::incomingCall(const std::string& peer_id, std::shared_ptr<ReliableSocket::DataConnection> dc) +{ + RING_DBG("[DHT:%s] incoming call from peer %s", getAccountID().c_str(), peer_id.c_str()); + auto call = Manager::instance().callFactory.newCall<SIPCall, RingAccount>(*this, Manager::instance().getNewCallID(), Call::CallType::INCOMING); + + call->setPeerNumber(peer_id); + call->initRecFilename(peer_id); + call->setDataConnection(dc); + + // Continue to scan the connection progress into the mainloop + std::lock_guard<std::mutex> lock(callsMutex_); + pendingCalls_.emplace_back(call); + + return call; +} void RingAccount::doUnregister(std::function<void(bool)> released_cb) @@ -1375,10 +1326,10 @@ RingAccount::getContactHeader(pjsip_transport* t) } // FIXME: be sure that given transport is from SipIceTransport - auto tlsTr = reinterpret_cast<tls::SipsIceTransport::TransportData*>(t)->self; - auto address = tlsTr->getLocalAddress(); + //auto tlsTr = reinterpret_cast<tls::SipsIceTransport::TransportData*>(t)->self; + IpAddr address {"localhost"}; // TODO: ??? contact_.slen = pj_ansi_snprintf(contact_.ptr, PJSIP_MAX_URL_SIZE, - "%s%s<sips:%s%s%s;transport=tls>", + "%s%s<sips:%s%s%s;transport=udp>", displayName_.c_str(), (displayName_.empty() ? "" : " "), ringid.c_str(), @@ -1572,6 +1523,8 @@ private: std::shared_ptr<ReliableSocket::DataConnection> dataConnection_ {}; + std::shared_ptr<SIPCall> call_ {}; + // DTLS session std::unique_ptr<tls::TlsSession> tls_; void onCertificatesUpdate(const gnutls_datum_t*, const gnutls_datum_t*, unsigned int); @@ -1926,9 +1879,11 @@ SecureIceTransport::createTlsSession(dht::crypto::Identity& id, }; tls::TlsSession::TlsSessionCallbacks tls_cbs = { .onStateChange = [this](tls::TlsSessionState state) { - if (state == tls::TlsSessionState::ESTABLISHED) + if (state == tls::TlsSessionState::ESTABLISHED) { dataConnection_->connect(tls_.get()); - else if (state == tls::TlsSessionState::SHUTDOWN) + if (tid & 1) + call_ = account.newIncomingCall(peer_id); + } else if (state == tls::TlsSessionState::SHUTDOWN) dataConnection_->disconnect(); }, .onRxData = [this](std::vector<uint8_t>&& buf) { diff --git a/src/ringdht/ringaccount.h b/src/ringdht/ringaccount.h index 15680e9ff9..c2e9834926 100644 --- a/src/ringdht/ringaccount.h +++ b/src/ringdht/ringaccount.h @@ -312,7 +312,8 @@ class RingAccount : public SIPAccountBase { NON_COPYABLE(RingAccount); void doRegister_(); - void incomingCall(dht::IceCandidates&& msg); + void legacyIncomingCall(dht::IceCandidates&& msg); + std::shared_ptr<SIPCall> incomingCall(const std::string&, std::shared_ptr<ReliableSocket::DataConnection>); const dht::ValueType USER_PROFILE_TYPE = {9, "User profile", std::chrono::hours(24 * 7)}; @@ -346,27 +347,18 @@ class RingAccount : public SIPAccountBase { dht::InfoHash callKey_; - struct PendingCall { - std::chrono::steady_clock::time_point start; - std::shared_ptr<IceTransport> ice_sp; - std::weak_ptr<SIPCall> call; - std::future<size_t> listen_key; - dht::InfoHash call_key; - dht::InfoHash from; - }; - void handlePendingCallList(); - bool handlePendingCall(PendingCall& pc, bool incoming); + bool handlePendingCall(std::weak_ptr<SIPCall>&); /** * DHT calls waiting for ICE negotiation */ - std::list<PendingCall> pendingCalls_ {}; + std::list<std::weak_ptr<SIPCall>> pendingCalls_ {}; /** * Incoming DHT calls that are not yet actual SIP calls. */ - std::list<PendingCall> pendingSipCalls_ {}; + std::list<std::weak_ptr<SIPCall>> pendingSipCalls_ {}; std::set<dht::Value::Id> treatedCalls_ {}; mutable std::mutex callsMutex_ {}; diff --git a/src/sip/Makefile.am b/src/sip/Makefile.am index 6b791bae6d..820cd18e79 100644 --- a/src/sip/Makefile.am +++ b/src/sip/Makefile.am @@ -19,7 +19,9 @@ libsiplink_la_SOURCES = \ sip_utils.cpp \ sip_utils.h \ base64.h \ - base64.c + base64.c \ + multistream_siptransport.cpp \ + multistream_siptransport.h libsiplink_la_SOURCES+=sippresence.cpp \ sippresence.h \ diff --git a/src/sip/multistream_siptransport.cpp b/src/sip/multistream_siptransport.cpp new file mode 100644 index 0000000000..56be88e812 --- /dev/null +++ b/src/sip/multistream_siptransport.cpp @@ -0,0 +1,358 @@ +/* + * Copyright (C) 2004-2016 Savoir-faire Linux Inc. + * + * Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "multistream_siptransport.h" + +#include "manager.h" +#include "sip/sip_utils.h" +#include "logger.h" +#include "data_transfer.h" + +#include <pjsip/sip_transport.h> +#include <pjsip/sip_endpoint.h> +#include <pj/compat/socket.h> +#include <pj/lock.h> + +#include <algorithm> +#include <cstring> // std::memset + +namespace ring { + +static constexpr int POOL_TP_INIT {512}; +static constexpr int POOL_TP_INC {512}; +static constexpr int TRANSPORT_INFO_LENGTH {64}; + +static void +sockaddr_to_host_port(pj_pool_t* pool, + pjsip_host_port* host_port, + const pj_sockaddr* addr) +{ + host_port->host.ptr = (char*) pj_pool_alloc(pool, PJ_INET6_ADDRSTRLEN+4); + pj_sockaddr_print(addr, host_port->host.ptr, PJ_INET6_ADDRSTRLEN+4, 0); + host_port->host.slen = pj_ansi_strlen(host_port->host.ptr); + host_port->port = pj_sockaddr_get_port(addr); +} + +MultiStreamSipTransport::MultiStreamSipTransport(pjsip_endpoint* endpt, + std::shared_ptr<ReliableSocket::DataStream> stream) + : trData_ () + , pool_ {nullptr, pj_pool_release} + , rxPool_ (nullptr, pj_pool_release) + , stream_ (stream) + , txThreadloop_([]{ return true;}, + [this]{ flushTxQueue(); }, + []{}) +{ + RING_DBG("MultiStreamSipTransport@%p {PjTr=%p}", this, &trData_.base); + + trData_.self = this; // up-link for PJSIP C callbacks + + pool_ = std::move(sip_utils::smart_alloc_pool(endpt, "SipsIceTransport.pool", + POOL_TP_INIT, POOL_TP_INC)); + + auto& base = trData_.base; + std::memset(&rdata_, 0, sizeof(pjsip_rx_data)); + + pj_ansi_snprintf(base.obj_name, PJ_MAX_OBJ_NAME, "MultiStreamSipTransport"); + base.endpt = endpt; + base.tpmgr = pjsip_endpt_get_tpmgr(endpt); + base.pool = pool_.get(); + + if (pj_atomic_create(pool_.get(), 0, &base.ref_cnt) != PJ_SUCCESS) + throw std::runtime_error("Can't create PJSIP atomic."); + + if (pj_lock_create_recursive_mutex(pool_.get(), "SipsIceTransport.mutex", + &base.lock) != PJ_SUCCESS) + throw std::runtime_error("Can't create PJSIP mutex."); + + IpAddr remote_addr {"10.0.0.2:1"}; // TODO: remote address? + pj_sockaddr_cp(&base.key.rem_addr, remote_addr.pjPtr()); + base.key.type = PJSIP_TRANSPORT_TLS; + base.type_name = (char*)pjsip_transport_get_type_name((pjsip_transport_type_e)base.key.type); + base.flag = pjsip_transport_get_flag_from_type((pjsip_transport_type_e)base.key.type); + base.info = (char*) pj_pool_alloc(pool_.get(), TRANSPORT_INFO_LENGTH); + + char print_addr[PJ_INET6_ADDRSTRLEN+10]; + pj_ansi_snprintf(base.info, TRANSPORT_INFO_LENGTH, "%s to %s", base.type_name, + pj_sockaddr_print(remote_addr.pjPtr(), print_addr, sizeof(print_addr), 3)); + base.addr_len = sizeof(pj_sockaddr_in); // TODO: ??? + base.dir = PJSIP_TP_DIR_NONE; + base.data = nullptr; + + IpAddr local_addr {"10.0.0.1:1"}; // TODO: local address? + pj_sockaddr_cp(&base.local_addr, local_addr.pjPtr()); + + sockaddr_to_host_port(pool_.get(), &base.local_name, &base.local_addr); + sockaddr_to_host_port(pool_.get(), &base.remote_name, remote_addr.pjPtr()); + + base.send_msg = [](pjsip_transport *transport, + pjsip_tx_data *tdata, + const pj_sockaddr_t *rem_addr, int addr_len, + void *token, pjsip_transport_callback callback) -> pj_status_t { + auto& this_ = reinterpret_cast<TransportData*>(transport)->self; + return this_->send(tdata, rem_addr, addr_len, token, callback); + }; + base.do_shutdown = [](pjsip_transport *transport) -> pj_status_t { + auto& this_ = reinterpret_cast<TransportData*>(transport)->self; + RING_DBG("MultiStreamSipTransport@%p: shutdown", this_); + { + // Flush pending state changes and rx packet before shutdown + // or pjsip callbacks will crash + + std::unique_lock<std::mutex> lk{this_->stateChangeEventsMutex_}; + this_->stateChangeEvents_.clear(); + this_->stream_->close(); + } + return PJ_SUCCESS; + }; + base.destroy = [](pjsip_transport *transport) -> pj_status_t { + auto& this_ = reinterpret_cast<TransportData*>(transport)->self; + RING_DBG("MultiStreamSipTransport@%p: destroying", this_); + delete this_; // we're owned by PJSIP + return PJ_SUCCESS; + }; + + /* Init rdata_ */ + std::memset(&rdata_, 0, sizeof(pjsip_rx_data)); + rxPool_ = std::move(sip_utils::smart_alloc_pool(endpt, "MultiStreamSipTransport.rxPool", + PJSIP_POOL_RDATA_LEN, PJSIP_POOL_RDATA_LEN)); + rdata_.tp_info.pool = rxPool_.get(); + rdata_.tp_info.transport = &base; + rdata_.tp_info.tp_data = this; + rdata_.tp_info.op_key.rdata = &rdata_; + pj_ioqueue_op_key_init(&rdata_.tp_info.op_key.op_key, + sizeof(pj_ioqueue_op_key_t)); + rdata_.pkt_info.src_addr = base.key.rem_addr; + rdata_.pkt_info.src_addr_len = sizeof(rdata_.pkt_info.src_addr); + auto rem_addr = &base.key.rem_addr; + pj_sockaddr_print(rem_addr, rdata_.pkt_info.src_name, + sizeof(rdata_.pkt_info.src_name), 0); + rdata_.pkt_info.src_port = pj_sockaddr_get_port(rem_addr); + + std::memset(&localCertInfo_, 0, sizeof(pj_ssl_cert_info)); + std::memset(&remoteCertInfo_, 0, sizeof(pj_ssl_cert_info)); + + if (pjsip_transport_register(base.tpmgr, &base) != PJ_SUCCESS) + throw std::runtime_error("Can't register MultiStreamSipTransport on PJSIP"); + + Manager::instance().registerEventHandler((uintptr_t)this, [this]{ handleEvents(); }); + + updateTransportState(PJSIP_TP_STATE_CONNECTED); + + txThreadloop_.start(); +} + +MultiStreamSipTransport::~MultiStreamSipTransport() +{ + RING_DBG("~MultiStreamSipTransport@%p {PjTr=%p}", this, &trData_.base); + + txThreadloop_.join(); + + // Flush tx queue with ENOTCONN error + for (auto tdata : txQueue_) { + tdata->op_key.tdata = nullptr; + if (tdata->op_key.callback) + tdata->op_key.callback(&trData_.base, tdata->op_key.token, + -PJ_RETURN_OS_ERROR(OSERR_ENOTCONN)); + } + + Manager::instance().unregisterEventHandler((uintptr_t)this); + + // If delete not trigged by pjsip_transport_destroy (happen if objet not given to pjsip) + auto base = getTransportBase(); + if (not base->is_shutdown and not base->is_destroying) + pjsip_transport_shutdown(base); + + // Stop low-level transport + stream_->close(); + + pj_lock_destroy(base->lock); + pj_atomic_destroy(base->ref_cnt); +} + +void +MultiStreamSipTransport::flushTxQueue() +{ + // Handle SIP transport -> Stream + decltype(txQueue_) tx_queue; + { + std::lock_guard<std::mutex> l(txMutex_); + if (canWrite_) { + tx_queue = std::move(txQueue_); + txQueue_.clear(); + } + } + + bool fatal = false; + for (auto tdata : tx_queue) { + pj_status_t status; + if (!fatal) { + const std::size_t size = tdata->buf.cur - tdata->buf.start; + auto ret = stream_->sendData(tdata->buf.start, size); + if (ret < 0) { + RING_ERR("[SIP] fatal error during sending: %s", strerror(ret)); + txThreadloop_.stop(); + } + if (ret < 0) + status = -PJ_RETURN_OS_ERROR(errno); + else + status = ret; + } else + status = -PJ_RETURN_OS_ERROR(OSERR_ENOTCONN); + + tdata->op_key.tdata = nullptr; + if (tdata->op_key.callback) + tdata->op_key.callback(&trData_.base, tdata->op_key.token, status); + } +} + +void +MultiStreamSipTransport::handleEvents() +{ + // Notify transport manager about state changes first + // Note: stop when disconnected event is encountered + // and differ its notification AFTER pending rx msg to let + // them a chance to be delivered to application before closing + // the transport. + decltype(stateChangeEvents_) eventDataQueue; + { + std::lock_guard<std::mutex> lk{stateChangeEventsMutex_}; + eventDataQueue = std::move(stateChangeEvents_); + stateChangeEvents_.clear(); + } + + ChangeStateEventData disconnectedEvent; + bool disconnected = false; + auto state_cb = pjsip_tpmgr_get_state_cb(trData_.base.tpmgr); + if (state_cb) { + for (auto& evdata : eventDataQueue) { + evdata.state_info.ext_info = nullptr; + if (evdata.state != PJSIP_TP_STATE_DISCONNECTED) { + (*state_cb)(&trData_.base, evdata.state, &evdata.state_info); + } else { + disconnectedEvent = std::move(evdata); + disconnected = true; + break; + } + } + } + + // Handle Stream -> SIP transport + if (readBufferSize_ < sizeof(rdata_.pkt_info.packet) and stream_->canRead()) { + auto ret = stream_->recvData(rdata_.pkt_info.packet + readBufferSize_, + sizeof(rdata_.pkt_info.packet) - readBufferSize_); + RING_DBG("recvData(ptr+%zu, max=%zu) >>> %zu B" + , readBufferSize_ + , sizeof(rdata_.pkt_info.packet) - readBufferSize_ + , ret); + if (ret > 0) { + readBufferSize_ += ret; + rdata_.pkt_info.len = readBufferSize_; + rdata_.pkt_info.zero = 0; + pj_gettimeofday(&rdata_.pkt_info.timestamp); + RING_WARN("%zu B >>> PJSIP:\n>>>>>>>>%s<<<<<<<<\n" + , rdata_.pkt_info.len + , std::string(rdata_.pkt_info.packet, rdata_.pkt_info.len).c_str()); + auto eaten = pjsip_tpmgr_receive_packet(trData_.base.tpmgr, &rdata_); + if (eaten) + RING_WARN("PJSIP: eaten=%zd", eaten); + pj_pool_reset(rdata_.tp_info.pool); + if (eaten > 0) { + if (auto remain = readBufferSize_ - eaten) + pj_memmove(rdata_.pkt_info.packet, rdata_.pkt_info.packet + eaten, remain); + readBufferSize_ -= eaten; + } + } else + RING_WARN("[MSST] readData failed, %s", strerror(ret)); + } + + // Time to deliver disconnected event if exists + if (disconnected and state_cb) + (*state_cb)(&trData_.base, disconnectedEvent.state, &disconnectedEvent.state_info); +} + +void +MultiStreamSipTransport::updateTransportState(pjsip_transport_state state) +{ + ChangeStateEventData ev; + + std::memset(&ev.state_info, 0, sizeof(ev.state_info)); + + ev.state = state; + if (state == PJSIP_TP_STATE_CONNECTED) { + std::lock_guard<std::mutex> lk {txMutex_}; + canWrite_ = true; + } + ev.state_info.status = PJ_SUCCESS; + + pushChangeStateEvent(std::move(ev)); +} + +void +MultiStreamSipTransport::pushChangeStateEvent(ChangeStateEventData&& ev) +{ + std::lock_guard<std::mutex> lk{stateChangeEventsMutex_}; + stateChangeEvents_.emplace_back(std::move(ev)); +} + +pj_status_t +MultiStreamSipTransport::send(pjsip_tx_data* tdata, const pj_sockaddr_t* rem_addr, + int addr_len, void* token, + pjsip_transport_callback callback) +{ + // Sanity check + PJ_ASSERT_RETURN(tdata, PJ_EINVAL); + + // Check that there's no pending operation associated with the tdata + PJ_ASSERT_RETURN(tdata->op_key.tdata == nullptr, PJSIP_EPENDINGTX); + + // Check the address is supported + PJ_ASSERT_RETURN(rem_addr and + (addr_len==sizeof(pj_sockaddr_in) or + addr_len==sizeof(pj_sockaddr_in6)), + PJ_EINVAL); + + // Check in we are able to send it in synchronous way first + const std::size_t size = tdata->buf.cur - tdata->buf.start; + std::unique_lock<std::mutex> lk {txMutex_}; + if (canWrite_ and txQueue_.empty()) { + RING_WARN("[MSST] send %zu", size); + auto ret = stream_->sendData(tdata->buf.start, size); + lk.unlock(); + + // Shutdown on fatal error, else ignore it + if (ret < 0) { + RING_ERR("[SIP] error during sending: %s", strerror(ret)); + return -PJ_RETURN_OS_ERROR(errno); + } + + return PJ_SUCCESS; + } + + // Asynchronous sending + RING_WARN("[MSST] send async %zu", size); + tdata->op_key.tdata = tdata; + tdata->op_key.token = token; + tdata->op_key.callback = callback; + txQueue_.push_back(tdata); + return PJ_EPENDING; +} + +} // namespace ring diff --git a/src/sip/multistream_siptransport.h b/src/sip/multistream_siptransport.h new file mode 100644 index 0000000000..6b3f903bfb --- /dev/null +++ b/src/sip/multistream_siptransport.h @@ -0,0 +1,116 @@ +/* + * Copyright (C) 2004-2016 Savoir-faire Linux Inc. + * + * Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#pragma once + +#include "ip_utils.h" +#include "noncopyable.h" +#include "threadloop.h" + +#include <pjsip.h> +#include <pj/pool.h> + +#include <list> +#include <functional> +#include <memory> +#include <mutex> +#include <chrono> +#include <queue> +#include <utility> +#include <vector> +#include <condition_variable> + +namespace ring { + +namespace ReliableSocket { +class DataStream; +} + +/** + * SipsIceTransport + * + * Implements a SipTransport for ReliableSocket::DataStream + */ +class MultiStreamSipTransport +{ +private: + NON_COPYABLE(MultiStreamSipTransport); + +public: + using Clock = std::chrono::steady_clock; + using TransportData = struct { + pjsip_transport base; // do not move, SHOULD be the fist member + MultiStreamSipTransport* self {nullptr}; + }; + static_assert(std::is_standard_layout<TransportData>::value, + "TranportData requires standard-layout"); + + MultiStreamSipTransport(pjsip_endpoint* endpt, + std::shared_ptr<ReliableSocket::DataStream> stream); + ~MultiStreamSipTransport(); + + void shutdown(); + +public: // Getters + pjsip_transport* getTransportBase() { return &trData_.base; } + +private: // PJSIP transport backend + TransportData trData_; // uplink to "this" (used by PJSIP called C-callbacks) + + std::unique_ptr<pj_pool_t, decltype(pj_pool_release)*> pool_; + std::unique_ptr<pj_pool_t, decltype(pj_pool_release)*> rxPool_; + + pjsip_rx_data rdata_; + + pj_ssl_cert_info localCertInfo_; + pj_ssl_cert_info remoteCertInfo_; + + pj_status_t verifyStatus_ {PJ_EUNKNOWN}; + +private: // DataStream backend + const std::shared_ptr<ReliableSocket::DataStream> stream_; + +private: // IO / events + struct ChangeStateEventData { + pjsip_transport_state_info state_info; + decltype(PJSIP_TP_STATE_DISCONNECTED) state; + }; + + std::size_t readBufferSize_ {0}; + + std::mutex stateChangeEventsMutex_ {}; + std::list<ChangeStateEventData> stateChangeEvents_ {}; + + pj_status_t send(pjsip_tx_data*, const pj_sockaddr_t*, int, void*, pjsip_transport_callback); + void handleEvents(); + void pushChangeStateEvent(ChangeStateEventData&&); + void updateTransportState(pjsip_transport_state); + +private: // Transmission layer (async by thread) + void flushTxQueue(); + + std::mutex txMutex_ {}; + std::condition_variable txCv_ {}; + bool canWrite_ {false}; // true if we can send data (cnx established) + std::list<pjsip_tx_data*> txQueue_ {}; // Used for asynchronous transmissions + ThreadLoop txThreadloop_; +}; + +} // namespace ring diff --git a/src/sip/sipcall.cpp b/src/sip/sipcall.cpp index d13165e873..827d5516e8 100644 --- a/src/sip/sipcall.cpp +++ b/src/sip/sipcall.cpp @@ -40,6 +40,7 @@ #include "dring/call_const.h" #include "dring/media_const.h" #include "client/ring_signal.h" +#include "data_transfer.h" #ifdef RING_VIDEO #include "client/videomanager.h" @@ -1076,4 +1077,14 @@ SIPCall::initIceTransport(bool master, unsigned channel_num) return result; } +void +SIPCall::onDataConnected() +{ + RING_DBG("[call:%s] connected", getCallId().c_str()); + if (!peerStream_) + peerStream_.reset(new ReliableSocket::DataStream(1)); + dc_->attachStream(peerStream_); + setTransport(getSIPVoIPLink()->sipTransportBroker->getMultiStreamTransport(peerStream_)); +} + } // namespace ring diff --git a/src/sip/sipcall.h b/src/sip/sipcall.h index 74f35b1980..8d5f97802f 100644 --- a/src/sip/sipcall.h +++ b/src/sip/sipcall.h @@ -210,6 +210,9 @@ class SIPCall : public Call bool initIceTransport(bool master, unsigned channel_num=4) override; void terminateSipSession(int status); + + void onDataConnected() override; + private: NON_COPYABLE(SIPCall); diff --git a/src/sip/siptransport.cpp b/src/sip/siptransport.cpp index 410716d53f..0c640f0d4c 100644 --- a/src/sip/siptransport.cpp +++ b/src/sip/siptransport.cpp @@ -25,6 +25,7 @@ #include "ringdht/sip_transport_ice.h" #include "ringdht/sips_transport_ice.h" +#include "multistream_siptransport.h" #include "array_size.h" #include "compiler_intrinsics.h" @@ -458,4 +459,17 @@ SipTransportBroker::getTlsIceTransport(const std::shared_ptr<ring::IceTransport> return sip_tr; } +std::shared_ptr<SipTransport> +SipTransportBroker::getMultiStreamTransport(const std::shared_ptr<ReliableSocket::DataStream> stream) +{ + auto mss_tr = std::unique_ptr<MultiStreamSipTransport>(new MultiStreamSipTransport(endpt_, stream)); + auto tr = mss_tr->getTransportBase(); + auto sip_tr = std::make_shared<SipTransport>(tr); + mss_tr.release(); // managed by PJSIP now + + std::lock_guard<std::mutex> lock(transportMapMutex_); + transports_.emplace(std::make_pair(tr, sip_tr)); + return sip_tr; +} + } // namespace ring diff --git a/src/sip/siptransport.h b/src/sip/siptransport.h index b12a0948bc..0c725481b5 100644 --- a/src/sip/siptransport.h +++ b/src/sip/siptransport.h @@ -157,6 +157,9 @@ class IceTransport; namespace tls { struct TlsParams; }; +namespace ReliableSocket { + class DataStream; +}; /** * Manages the transports and receive callbacks from PJSIP @@ -182,6 +185,9 @@ public: getTlsIceTransport(const std::shared_ptr<IceTransport>, unsigned comp_id, const tls::TlsParams&); + std::shared_ptr<SipTransport> + getMultiStreamTransport(const std::shared_ptr<ReliableSocket::DataStream> stream); + std::shared_ptr<SipTransport> addTransport(pjsip_transport*); /** -- GitLab