diff --git a/daemon/src/ringdht/Makefile.am b/daemon/src/ringdht/Makefile.am index 1936cee77d571cc490b541b45ebe042e3e471dd7..21554a38cd2c0103841c8b9407aea82ac0d01c23 100644 --- a/daemon/src/ringdht/Makefile.am +++ b/daemon/src/ringdht/Makefile.am @@ -9,6 +9,8 @@ libringacc_la_LIBADD = $(DHT_LIBS) libringacc_la_SOURCES = \ ringaccount.cpp \ - ringaccount.h + ringaccount.h \ + sip_transport_ice.cpp \ + sip_transport_ice.h endif diff --git a/daemon/src/ringdht/sip_transport_ice.cpp b/daemon/src/ringdht/sip_transport_ice.cpp new file mode 100644 index 0000000000000000000000000000000000000000..a91e9bc78945e84f6351bd1f2e9313125ff5e781 --- /dev/null +++ b/daemon/src/ringdht/sip_transport_ice.cpp @@ -0,0 +1,253 @@ +/* + * Copyright (C) 2004-2014 Savoir-Faire Linux Inc. + * + * Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Additional permission under GNU GPL version 3 section 7: + * + * If you modify this program, or any covered work, by linking or + * combining it with the OpenSSL project's OpenSSL library (or a + * modified version of that library), containing parts covered by the + * terms of the OpenSSL or SSLeay licenses, Savoir-Faire Linux Inc. + * grants you additional permission to convey the resulting work. + * Corresponding Source for a non-source form of such a combination + * shall include the source code for the parts of OpenSSL used as well + * as that of the covered work. + */ + +#include "sip_transport_ice.h" +#include "ice_transport.h" +#include "logger.h" + +#include <pjsip/sip_transport.h> +#include <pjsip/sip_endpoint.h> +#include <pj/lock.h> + +#include <algorithm> + +static constexpr int POOL_TP_INIT {512}; +static constexpr int POOL_TP_INC {512}; +static constexpr int TRANSPORT_INFO_LENGTH {64}; + +static void +sockaddr_to_host_port(pj_pool_t* pool, + pjsip_host_port* host_port, + const pj_sockaddr* addr) +{ + host_port->host.ptr = (char*) pj_pool_alloc(pool, PJ_INET6_ADDRSTRLEN+4); + pj_sockaddr_print(addr, host_port->host.ptr, PJ_INET6_ADDRSTRLEN+4, 0); + host_port->host.slen = pj_ansi_strlen(host_port->host.ptr); + host_port->port = pj_sockaddr_get_port(addr); +} + +SipIceTransport::SipIceTransport(pjsip_endpoint* endpt, pj_pool_t& /* pool */, + long /* t_type */, + const std::shared_ptr<sfl::IceTransport>& ice, + int comp_id) + : base() + , pool_(nullptr, pj_pool_release) + , rxPool_(nullptr, pj_pool_release) + , rdata() + , ice_(ice) + , comp_id_(comp_id) +{ + if (not ice->isCompleted()) + throw std::logic_error("ice transport must be completed"); + + SFL_DBG("Creating SipIceTransport"); + + pool_.reset(pjsip_endpt_create_pool(endpt, "SipIceTransport.pool", POOL_TP_INIT, POOL_TP_INC)); + if (not pool_) + throw std::bad_alloc(); + auto pool = pool_.get(); + + pj_ansi_snprintf(base.obj_name, PJ_MAX_OBJ_NAME, "SipIceTransport"); + base.endpt = endpt; + base.tpmgr = pjsip_endpt_get_tpmgr(endpt); + base.pool = pool; + + rdata.tp_info.pool = pool; + + // FIXME: not destroyed in case of exception + if (pj_atomic_create(pool, 0, &base.ref_cnt) != PJ_SUCCESS) + throw std::runtime_error("Can't create PJSIP atomic."); + + // FIXME: not destroyed in case of exception + if (pj_lock_create_recursive_mutex(pool, "SipIceTransport.mutex", &base.lock) != PJ_SUCCESS) + throw std::runtime_error("Can't create PJSIP mutex."); + + auto remote = ice->getRemoteAddress(comp_id); + SFL_DBG("SipIceTransport: remote is %s", remote.toString(true).c_str()); + pj_sockaddr_cp(&base.key.rem_addr, remote.pjPtr()); + base.key.type = PJSIP_TRANSPORT_UDP;//t_type; + base.type_name = (char*)pjsip_transport_get_type_name((pjsip_transport_type_e)base.key.type); + base.flag = pjsip_transport_get_flag_from_type((pjsip_transport_type_e)base.key.type); + base.info = (char*) pj_pool_alloc(pool, TRANSPORT_INFO_LENGTH); + + char print_addr[PJ_INET6_ADDRSTRLEN+10]; + pj_ansi_snprintf(base.info, TRANSPORT_INFO_LENGTH, "%s to %s", + base.type_name, + pj_sockaddr_print(remote.pjPtr(), print_addr, + sizeof(print_addr), 3)); + base.addr_len = remote.getLength(); + base.dir = PJSIP_TP_DIR_NONE;//is_server? PJSIP_TP_DIR_INCOMING : PJSIP_TP_DIR_OUTGOING; + base.data = nullptr; + + /* Set initial local address */ + auto local = ice->getDefaultLocalAddress(); + pj_sockaddr_cp(&base.local_addr, local.pjPtr()); + + sockaddr_to_host_port(pool, &base.local_name, &base.local_addr); + sockaddr_to_host_port(pool, &base.remote_name, remote.pjPtr()); + + base.send_msg = [](pjsip_transport *transport, + pjsip_tx_data *tdata, + const pj_sockaddr_t *rem_addr, int addr_len, + void *token, pjsip_transport_callback callback) { + auto this_ = reinterpret_cast<SipIceTransport*>(transport); + return this_->send(tdata, rem_addr, addr_len, token, callback); + }; + base.do_shutdown = [](pjsip_transport *transport){ + auto this_ = reinterpret_cast<SipIceTransport*>(transport); + return this_->shutdown(); + }; + base.destroy = [](pjsip_transport *transport){ + auto this_ = reinterpret_cast<SipIceTransport*>(transport); + return this_->destroy(); + }; + + /* Init rdata */ + rxPool_.reset(pjsip_endpt_create_pool(base.endpt, + "SipIceTransport.rtd%p", + PJSIP_POOL_RDATA_LEN, + PJSIP_POOL_RDATA_INC)); + if (not rxPool_) + throw std::bad_alloc(); + auto rx_pool = rxPool_.get(); + + rdata.tp_info.pool = rx_pool; + rdata.tp_info.transport = &base; + rdata.tp_info.tp_data = this; + rdata.tp_info.op_key.rdata = &rdata; + pj_ioqueue_op_key_init(&rdata.tp_info.op_key.op_key, sizeof(pj_ioqueue_op_key_t)); + rdata.pkt_info.src_addr = base.key.rem_addr; + rdata.pkt_info.src_addr_len = sizeof(rdata.pkt_info.src_addr); + auto rem_addr = &base.key.rem_addr; + pj_sockaddr_print(rem_addr, rdata.pkt_info.src_name, sizeof(rdata.pkt_info.src_name), 0); + rdata.pkt_info.src_port = pj_sockaddr_get_port(rem_addr); + rdata.pkt_info.len = 0; + rdata.pkt_info.zero = 0; + + if (pjsip_transport_register(base.tpmgr, &base) != PJ_SUCCESS) + throw std::runtime_error("Can't register PJSIP transport."); + is_registered_ = true; +} + +SipIceTransport::~SipIceTransport() +{ + destroy(); + pj_lock_destroy(base.lock); + pj_atomic_destroy(base.ref_cnt); +} + +void +SipIceTransport::start() +{ + using namespace std::placeholders; + ice_->setOnRecv(comp_id_, std::bind(&SipIceTransport::onRecv, this, _1, _2)); +} + +pj_status_t +SipIceTransport::send(pjsip_tx_data *tdata, const pj_sockaddr_t *rem_addr, + int addr_len, void *token, + pjsip_transport_callback callback) +{ + /* Sanity check */ + PJ_ASSERT_RETURN(tdata, PJ_EINVAL); + + /* Check that there's no pending operation associated with the tdata */ + PJ_ASSERT_RETURN(tdata->op_key.tdata == nullptr, PJSIP_EPENDINGTX); + + /* Check the address is supported */ + PJ_ASSERT_RETURN(rem_addr && (addr_len==sizeof(pj_sockaddr_in) || + addr_len==sizeof(pj_sockaddr_in6)), + PJ_EINVAL); + + /* Init op key. */ + tdata->op_key.tdata = tdata; + tdata->op_key.token = token; + tdata->op_key.callback = callback; + + auto buf_sz = tdata->buf.cur - tdata->buf.start; + auto size = ice_->send(comp_id_, (uint8_t*)tdata->buf.start, buf_sz); + if (size > 0) { + if (size < buf_sz) { + std::move(tdata->buf.start + size, + tdata->buf.start + buf_sz, + tdata->buf.start); + tdata->buf.cur -= size; + } + tdata->op_key.tdata = nullptr; + } else + return PJ_EUNKNOWN; + + return PJ_SUCCESS; +} + +ssize_t +SipIceTransport::onRecv(uint8_t* buf, size_t len) +{ + auto max_size = std::min(sizeof(rdata.pkt_info.packet) - rdata.pkt_info.len, len); + std::copy_n(buf, max_size, (uint8_t*)rdata.pkt_info.packet + rdata.pkt_info.len); + rdata.pkt_info.len += max_size; + rdata.pkt_info.zero = 0; + pj_gettimeofday(&rdata.pkt_info.timestamp); + + auto eaten = pjsip_tpmgr_receive_packet(rdata.tp_info.transport->tpmgr, &rdata); + + /* Move unprocessed data to the front of the buffer */ + auto rem = rdata.pkt_info.len - eaten; + if (rem > 0 && rem != rdata.pkt_info.len) { + std::move(rdata.pkt_info.packet + eaten, + rdata.pkt_info.packet + eaten + rem, + rdata.pkt_info.packet); + } + rdata.pkt_info.len = rem; + + /* Reset pool */ + pj_pool_reset(rdata.tp_info.pool); +} + +pj_status_t +SipIceTransport::shutdown() +{ + SFL_WARN("SIP transport ICE: shutdown"); +} + +pj_status_t +SipIceTransport::destroy() +{ + if (not is_registered_) + return PJ_SUCCESS; + + SFL_WARN("SIP transport ICE: destroy"); + + auto status = pjsip_transport_destroy(&base); + is_registered_ = status != PJ_SUCCESS; + + return status; +} diff --git a/daemon/src/ringdht/sip_transport_ice.h b/daemon/src/ringdht/sip_transport_ice.h new file mode 100644 index 0000000000000000000000000000000000000000..680c931f24bc2e4de71dc136b046582c21509d99 --- /dev/null +++ b/daemon/src/ringdht/sip_transport_ice.h @@ -0,0 +1,77 @@ +/* + * Copyright (C) 2004-2014 Savoir-Faire Linux Inc. + * + * Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Additional permission under GNU GPL version 3 section 7: + * + * If you modify this program, or any covered work, by linking or + * combining it with the OpenSSL project's OpenSSL library (or a + * modified version of that library), containing parts covered by the + * terms of the OpenSSL or SSLeay licenses, Savoir-Faire Linux Inc. + * grants you additional permission to convey the resulting work. + * Corresponding Source for a non-source form of such a combination + * shall include the source code for the parts of OpenSSL used as well + * as that of the covered work. + */ + +#pragma once + +#include <pjsip.h> +#include <pj/pool.h> +#include <memory> + +namespace sfl { +class IceTransport; +} + +struct SipIceTransport +{ + SipIceTransport(pjsip_endpoint* endpt, pj_pool_t& pool, long t_type, + const std::shared_ptr<sfl::IceTransport>& ice, + int comp_id); + ~SipIceTransport(); + + /** + * To be called once to start receiving packets + */ + void start(); + + std::shared_ptr<sfl::IceTransport> getIceTransport() const { + return ice_; + } + + pjsip_transport base; + + private: + std::unique_ptr<pj_pool_t, decltype(pj_pool_release)&> pool_; + std::unique_ptr<pj_pool_t, decltype(pj_pool_release)&> rxPool_; + pjsip_rx_data rdata; + bool is_registered_ {false}; + const std::shared_ptr<sfl::IceTransport> ice_; + const int comp_id_; + + pj_status_t send(pjsip_tx_data *tdata, const pj_sockaddr_t *rem_addr, + int addr_len, void *token, + pjsip_transport_callback callback); + + ssize_t onRecv(uint8_t* buf, size_t len); + + pj_status_t shutdown(); + + pj_status_t destroy(); +}; diff --git a/daemon/src/sip/siptransport.cpp b/daemon/src/sip/siptransport.cpp index ea74315e53b2bc2d4c8a1c9f965782999540621a..b6f77c4885027d03f1e67555f96331c63dcae44d 100644 --- a/daemon/src/sip/siptransport.cpp +++ b/daemon/src/sip/siptransport.cpp @@ -34,6 +34,8 @@ #include "sip_utils.h" #include "ip_utils.h" +#include "ringdht/sip_transport_ice.h" + #include "manager.h" #include "client/configurationmanager.h" #include "map_utils.h" @@ -110,7 +112,7 @@ SipTransport::stateToStr(pjsip_transport_state state) } SipTransportBroker::SipTransportBroker(pjsip_endpoint *endpt, pj_caching_pool& cp, pj_pool_t& pool) : -cp_(cp), pool_(pool), endpt_(endpt) +iceTransports_(), cp_(cp), pool_(pool), endpt_(endpt) { instance = this; auto status = pjsip_tpmgr_set_state_cb(pjsip_endpt_get_tpmgr(endpt_), SipTransportBroker::tp_state_callback); @@ -118,6 +120,8 @@ cp_(cp), pool_(pool), endpt_(endpt) SFL_ERR("Can't set transport callback"); sip_utils::sip_strerror(status); } + + pjsip_transport_register_type(PJSIP_TRANSPORT_DATAGRAM, "ICE", pjsip_transport_get_default_port_for_type(PJSIP_TRANSPORT_UDP), &ice_pj_transport_type_); } SipTransportBroker::~SipTransportBroker() @@ -169,7 +173,11 @@ SipTransportBroker::transportStateChanged(pjsip_transport* tp, pjsip_transport_s transports_.erase(t); // If UDP - if (std::strlen(tp->type_name) >= 3 && std::strncmp(tp->type_name, "UDP", 3ul) == 0) { + const auto type = tp->key.type; + //if (std::strlen(tp->type_name) >= 3 && std::strncmp(tp->type_name, "UDP", 3ul) == 0) { + if (type == PJSIP_TRANSPORT_UDP || type == PJSIP_TRANSPORT_UDP6) { + SFL_WARN("UDP transport destroy"); + auto transport_key = std::find_if(udpTransports_.cbegin(), udpTransports_.cend(), [tp](const std::pair<SipTransportDescr, pjsip_transport*>& i) { return i.second == tp; }); @@ -178,6 +186,14 @@ SipTransportBroker::transportStateChanged(pjsip_transport* tp, pjsip_transport_s udpTransports_.erase(transport_key); transportDestroyedCv_.notify_all(); } + } else if (type == ice_pj_transport_type_) { + SFL_WARN("ICE transport destroy"); + std::unique_lock<std::mutex> lock(iceMutex_); + const auto transport_key = std::find_if(iceTransports_.begin(), iceTransports_.end(), [tp](const SipIceTransport& i) { + return reinterpret_cast<const pjsip_transport*>(&i) == tp; + }); + if (transport_key != iceTransports_.end()) + iceTransports_.erase(transport_key); } } } @@ -353,6 +369,21 @@ SipTransportBroker::getTlsTransport(const std::shared_ptr<TlsListener>& l, const } #endif +std::shared_ptr<SipTransport> +SipTransportBroker::getIceTransport(const std::shared_ptr<sfl::IceTransport>& ice) +{ + std::unique_lock<std::mutex> lock(iceMutex_); + iceTransports_.emplace_front(endpt_, pool_, ice_pj_transport_type_, ice, 0); + auto& sip_ice_tr = iceTransports_.front(); + auto ret = std::make_shared<SipTransport>(&sip_ice_tr.base); + { + std::unique_lock<std::mutex> lock(transportMapMutex_); + transports_[ret->get()] = ret; + } + sip_ice_tr.start(); + return ret; +} + std::vector<pj_sockaddr> SipTransportBroker::getSTUNAddresses(const pj_str_t serverName, pj_uint16_t port, std::vector<long> &socketDescriptors) const { diff --git a/daemon/src/sip/siptransport.h b/daemon/src/sip/siptransport.h index 713ccb02c1813c1d80b2f5f618cea491c8a02b02..97abea957f445b631222ad3a8ac562fa84f0b6df 100644 --- a/daemon/src/sip/siptransport.h +++ b/daemon/src/sip/siptransport.h @@ -48,6 +48,7 @@ #include <map> #include <string> #include <vector> +#include <list> #include <memory> #define DEFAULT_SIP_PORT 5060 @@ -102,6 +103,9 @@ private: typedef std::function<void(pjsip_transport_state, const pjsip_transport_state_info*)> SipTransportStateCallback; +/** + * SIP transport wraps pjsip_transport. + */ struct SipTransport { SipTransport() {} @@ -153,7 +157,10 @@ private: }; class IpAddr; - +class SipIceTransport; +namespace sfl { + class IceTransport; +} /** * Manages the transports and receive callbacks from PJSIP */ @@ -171,6 +178,8 @@ public: std::shared_ptr<SipTransport> getTlsTransport(const std::shared_ptr<TlsListener>&, const IpAddr& remote); #endif + std::shared_ptr<SipTransport> getIceTransport(const std::shared_ptr<sfl::IceTransport>&); + std::shared_ptr<SipTransport> findTransport(pjsip_transport*); /** @@ -235,12 +244,20 @@ private: */ std::map<SipTransportDescr, pjsip_transport*> udpTransports_ {}; + /** + * Storage for SIP/ICE transport instances. + */ + std::list<SipIceTransport> iceTransports_; + std::mutex iceMutex_ {}; + std::mutex transportMapMutex_ {}; std::condition_variable transportDestroyedCv_ {}; pj_caching_pool& cp_; pj_pool_t& pool_; pjsip_endpoint *endpt_; + + int ice_pj_transport_type_ {PJSIP_TRANSPORT_START_OTHER}; }; #endif // SIPTRANSPORT_H_