diff --git a/src/ice_transport.cpp b/src/ice_transport.cpp index 798dfa5c477081cdb20c723bd7ae391421f38ac8..1dd69350ed34f7c3afcf3437b86fea8075475fcf 100644 --- a/src/ice_transport.cpp +++ b/src/ice_transport.cpp @@ -415,8 +415,6 @@ IceTransport::Impl::~Impl() if (config_.stun_cfg.timer_heap) pj_timer_heap_destroy(config_.stun_cfg.timer_heap); - - emitSignal<DRing::CallSignal::ConnectionUpdate>(std::to_string((uintptr_t)this), 2); } bool @@ -957,7 +955,6 @@ IceTransport::start(const Attribute& rem_attrs, const std::vector<IceCandidate>& return false; } - emitSignal<DRing::CallSignal::ConnectionUpdate>(std::to_string((uintptr_t)pimpl_.get()), 0); return true; } @@ -994,7 +991,6 @@ IceTransport::start(const SDP& sdp) return false; } - emitSignal<DRing::CallSignal::ConnectionUpdate>(std::to_string((uintptr_t)pimpl_.get()), 0); return true; } diff --git a/src/im/message_engine.cpp b/src/im/message_engine.cpp index 4c85f3a4ef7d3c119a4ae8a141f96fb87453b1dc..b140f0b4df6a013877aea878ac5acd464d838f10 100644 --- a/src/im/message_engine.cpp +++ b/src/im/message_engine.cpp @@ -60,13 +60,13 @@ MessageEngine::sendMessage(const std::string& to, const std::map<std::string, st } void -MessageEngine::onPeerOnline(const std::string& peer) +MessageEngine::onPeerOnline(const std::string& peer, bool retryOnTimeout) { - retrySend(peer); + retrySend(peer, retryOnTimeout); } void -MessageEngine::retrySend(const std::string& peer) +MessageEngine::retrySend(const std::string& peer, bool retryOnTimeout) { struct PendingMsg { MessageToken token; @@ -97,7 +97,7 @@ MessageEngine::retrySend(const std::string& peer) p.token, p.to, (int)DRing::Account::MessageStates::SENDING); - account_.sendTextMessage(p.to, p.payloads, p.token); + account_.sendTextMessage(p.to, p.payloads, p.token, retryOnTimeout); } } @@ -144,8 +144,7 @@ MessageEngine::onMessageSent(const std::string& peer, MessageToken token, bool o } auto f = p->second.find(token); if (f != p->second.end()) { - if (f->second.status == MessageStatus::SENDING) { - if (ok) { + if (ok) { f->second.status = MessageStatus::SENT; JAMI_DBG() << "[message " << token << "] Status changed to SENT"; emitSignal<DRing::ConfigurationSignal::AccountMessageStatusChanged>(account_.getAccountID(), @@ -153,7 +152,8 @@ MessageEngine::onMessageSent(const std::string& peer, MessageToken token, bool o f->second.to, static_cast<int>(DRing::Account::MessageStates::SENT)); save_(); - } else if (f->second.retried >= MAX_RETRIES) { + } else if (f->second.status == MessageStatus::SENDING) { + if (f->second.retried >= MAX_RETRIES) { f->second.status = MessageStatus::FAILURE; JAMI_DBG() << "[message " << token << "] Status changed to FAILURE"; emitSignal<DRing::ConfigurationSignal::AccountMessageStatusChanged>(account_.getAccountID(), diff --git a/src/im/message_engine.h b/src/im/message_engine.h index 001607a7408741ba6eb3f41f2827e3b903e64bc7..18b422d8c82629278d6661315535d5d674bc8cf9 100644 --- a/src/im/message_engine.h +++ b/src/im/message_engine.h @@ -61,7 +61,11 @@ public: } void onMessageSent(const std::string& peer, MessageToken t, bool success); - void onPeerOnline(const std::string& peer); + /** + * @TODO change MessageEngine by a queue, + * @NOTE retryOnTimeout is used for failing SIP messages (jamiAccount::sendTextMessage) + */ + void onPeerOnline(const std::string& peer, bool retryOnTimeout=true); /** * Load persisted messages @@ -79,7 +83,7 @@ private: static const std::chrono::minutes RETRY_PERIOD; using clock = std::chrono::steady_clock; - void retrySend(const std::string& peer); + void retrySend(const std::string& peer, bool retryOnTimeout=true); void save_() const; struct Message { diff --git a/src/jamidht/CMakeLists.txt b/src/jamidht/CMakeLists.txt index 1b3b48d6573ad3b910ae434bda98cf995c28f008..ea9d74b3e5ef10424ee83c20a5c9a90eeaf5c732 100644 --- a/src/jamidht/CMakeLists.txt +++ b/src/jamidht/CMakeLists.txt @@ -14,6 +14,8 @@ list (APPEND Source_Files__jamidht "${CMAKE_CURRENT_SOURCE_DIR}/configkeys.h" "${CMAKE_CURRENT_SOURCE_DIR}/connectionmanager.cpp" "${CMAKE_CURRENT_SOURCE_DIR}/connectionmanager.h" + "${CMAKE_CURRENT_SOURCE_DIR}/channeled_transport.cpp" + "${CMAKE_CURRENT_SOURCE_DIR}/channeled_transport.h" "${CMAKE_CURRENT_SOURCE_DIR}/contact_list.cpp" "${CMAKE_CURRENT_SOURCE_DIR}/contact_list.h" "${CMAKE_CURRENT_SOURCE_DIR}/jami_contact.h" diff --git a/src/jamidht/Makefile.am b/src/jamidht/Makefile.am index 78d144b700182b54ac271346dfe41073783423cb..d2c7c3147f556a1b3203f70f963dbe1218567301 100644 --- a/src/jamidht/Makefile.am +++ b/src/jamidht/Makefile.am @@ -16,6 +16,8 @@ libringacc_la_SOURCES = \ jamiaccount.h \ connectionmanager.h \ connectionmanager.cpp \ + channeled_transport.h \ + channeled_transport.cpp \ multiplexed_socket.h \ multiplexed_socket.cpp \ sips_transport_ice.cpp \ diff --git a/src/jamidht/channeled_transport.cpp b/src/jamidht/channeled_transport.cpp new file mode 100644 index 0000000000000000000000000000000000000000..3540e263e6a9f50af435022e70e357d202837004 --- /dev/null +++ b/src/jamidht/channeled_transport.cpp @@ -0,0 +1,292 @@ +/* + * Copyright (C) 2020 Savoir-faire Linux Inc. + * + * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "channeled_transport.h" + +#include "logger.h" +#include "multiplexed_socket.h" +#include "sip/sip_utils.h" + +#include <pjsip/sip_transport.h> +#include <pjsip/sip_endpoint.h> +#include <pj/compat/socket.h> +#include <pj/lock.h> + +namespace jami { namespace tls { + +ChanneledSIPTransport::ChanneledSIPTransport(pjsip_endpoint* endpt, int tp_type, + const std::shared_ptr<ChannelSocket>& socket, + const IpAddr& local, const IpAddr& remote, + onShutdownCb&& cb) + : socket_ (socket) + , local_ {local} + , remote_ {remote} + , trData_ () + , pool_ {nullptr, pj_pool_release} + , rxPool_ (nullptr, pj_pool_release) +{ + JAMI_DBG("ChanneledSIPTransport@%p {tr=%p}", this, &trData_.base); + + // Init memory + trData_.self = this; // up-link for PJSIP callbacks + + pool_ = sip_utils::smart_alloc_pool(endpt, "channeled.pool", + sip_utils::POOL_TP_INIT, sip_utils::POOL_TP_INC); + + auto& base = trData_.base; + std::memset(&base, 0, sizeof(base)); + + pj_ansi_snprintf(base.obj_name, PJ_MAX_OBJ_NAME, "chan%p", &base); + base.endpt = endpt; + base.tpmgr = pjsip_endpt_get_tpmgr(endpt); + base.pool = pool_.get(); + + if (pj_atomic_create(pool_.get(), 0, &base.ref_cnt) != PJ_SUCCESS) + throw std::runtime_error("Can't create PJSIP atomic."); + + if (pj_lock_create_recursive_mutex(pool_.get(), "chan", + &base.lock) != PJ_SUCCESS) + throw std::runtime_error("Can't create PJSIP mutex."); + + pj_sockaddr_cp(&base.key.rem_addr, remote_.pjPtr()); + base.key.type = tp_type; + auto reg_type = static_cast<pjsip_transport_type_e>(tp_type); + base.type_name = const_cast<char*>(pjsip_transport_get_type_name(reg_type)); + base.flag = pjsip_transport_get_flag_from_type(reg_type); + base.info = static_cast<char*>(pj_pool_alloc(pool_.get(), sip_utils::TRANSPORT_INFO_LENGTH)); + + auto remote_addr = remote_.toString(); + pj_ansi_snprintf(base.info, sip_utils::TRANSPORT_INFO_LENGTH, "%s to %s", base.type_name, + remote_addr.c_str()); + base.addr_len = remote_.getLength(); + base.dir = PJSIP_TP_DIR_NONE; + + // Set initial local address + pj_sockaddr_cp(&base.local_addr, local_.pjPtr()); + + sip_utils::sockaddr_to_host_port(pool_.get(), &base.local_name, &base.local_addr); + sip_utils::sockaddr_to_host_port(pool_.get(), &base.remote_name, remote_.pjPtr()); + + // Init transport callbacks + base.send_msg = [](pjsip_transport *transport, + pjsip_tx_data *tdata, + const pj_sockaddr_t *rem_addr, int addr_len, + void *token, pjsip_transport_callback callback) -> pj_status_t { + auto& this_ = reinterpret_cast<TransportData*>(transport)->self; + return this_->send(tdata, rem_addr, addr_len, token, callback); + }; + base.do_shutdown = [](pjsip_transport *transport) -> pj_status_t { + auto& this_ = reinterpret_cast<TransportData*>(transport)->self; + JAMI_DBG("ChanneledSIPTransport@%p {tr=%p {rc=%ld}}: shutdown", this_, + transport, pj_atomic_get(transport->ref_cnt)); + if (this_->socket_) this_->socket_->shutdown(); + return PJ_SUCCESS; + }; + base.destroy = [](pjsip_transport *transport) -> pj_status_t { + auto& this_ = reinterpret_cast<TransportData*>(transport)->self; + JAMI_DBG("ChanneledSIPTransport@%p: destroying", this_); + delete this_; + return PJ_SUCCESS; + }; + + // Init rdata_ + std::memset(&rdata_, 0, sizeof(pjsip_rx_data)); + rxPool_ = sip_utils::smart_alloc_pool(endpt, "channeled.rxPool", + PJSIP_POOL_RDATA_LEN, PJSIP_POOL_RDATA_LEN); + rdata_.tp_info.pool = rxPool_.get(); + rdata_.tp_info.transport = &base; + rdata_.tp_info.tp_data = this; + rdata_.tp_info.op_key.rdata = &rdata_; + pj_ioqueue_op_key_init(&rdata_.tp_info.op_key.op_key, + sizeof(pj_ioqueue_op_key_t)); + rdata_.pkt_info.src_addr = base.key.rem_addr; + rdata_.pkt_info.src_addr_len = sizeof(rdata_.pkt_info.src_addr); + auto rem_addr = &base.key.rem_addr; + pj_sockaddr_print(rem_addr, rdata_.pkt_info.src_name, + sizeof(rdata_.pkt_info.src_name), 0); + rdata_.pkt_info.src_port = pj_sockaddr_get_port(rem_addr); + + // Register callbacks + if (pjsip_transport_register(base.tpmgr, &base) != PJ_SUCCESS) + throw std::runtime_error("Can't register PJSIP transport."); + + // Link to Channel Socket + socket->setOnRecv([this](const uint8_t* buf, size_t len) { + std::lock_guard<std::mutex> l(rxMtx_); + std::vector<uint8_t> rx {buf, buf+len}; + rxPending_.emplace_back(std::move(rx)); + scheduler_.run([this]{ handleEvents(); }); + return len; + }); + socket->onShutdown([cb=std::move(cb), this] { + disconnected_ = true; + scheduler_.run([this]{ handleEvents(); }); + cb(); + }); +} + +ChanneledSIPTransport::~ChanneledSIPTransport() +{ + JAMI_DBG("~ChanneledSIPTransport@%p {tr=%p}", this, &trData_.base); + // Flush send queue with ENOTCONN error + for (auto tdata : txQueue_) { + tdata->op_key.tdata = nullptr; + if (tdata->op_key.callback) + tdata->op_key.callback(&trData_.base, tdata->op_key.token, + -PJ_RETURN_OS_ERROR(OSERR_ENOTCONN)); + } + + auto base = getTransportBase(); + + // Stop low-level transport first + socket_->shutdown(); + socket_.reset(); + + // If delete not trigged by pjsip_transport_destroy (happen if objet not given to pjsip) + if (not base->is_shutdown and not base->is_destroying) + pjsip_transport_shutdown(base); + + pj_lock_destroy(base->lock); + pj_atomic_destroy(base->ref_cnt); + JAMI_DBG("~ChanneledSIPTransport@%p {tr=%p} bye", this, &trData_.base); +} + +void +ChanneledSIPTransport::handleEvents() +{ + // Handle SIP transport -> TLS + decltype(txQueue_) tx_queue; + { + std::lock_guard<std::mutex> l(txMutex_); + tx_queue = std::move(txQueue_); + txQueue_.clear(); + } + + bool fatal = false; + for (auto tdata : tx_queue) { + pj_status_t status; + if (!fatal) { + const std::size_t size = tdata->buf.cur - tdata->buf.start; + std::error_code ec; + status = socket_->write(reinterpret_cast<const uint8_t*>(tdata->buf.start), size, ec); + if (ec) { + fatal = true; + socket_->shutdown(); + } + } else { + status = -PJ_RETURN_OS_ERROR(OSERR_ENOTCONN); + } + + tdata->op_key.tdata = nullptr; + if (tdata->op_key.callback) + tdata->op_key.callback(&trData_.base, tdata->op_key.token, status); + } + + // Handle TLS -> SIP transport + decltype(rxPending_) rx; + { + std::lock_guard<std::mutex> l(rxMtx_); + rx = std::move(rxPending_); + rxPending_.clear(); + } + + sip_utils::register_thread(); + for (auto it = rx.begin(); it != rx.end(); ++it) { + auto& pck = *it; + pj_pool_reset(rdata_.tp_info.pool); + pj_gettimeofday(&rdata_.pkt_info.timestamp); + rdata_.pkt_info.len = std::min(pck.size(), (size_t) PJSIP_MAX_PKT_LEN); + std::copy_n(pck.data(), rdata_.pkt_info.len, rdata_.pkt_info.packet); + auto eaten = pjsip_tpmgr_receive_packet(trData_.base.tpmgr, &rdata_); + + // Uncomplet parsing? (may be a partial sip packet received) + if (eaten != (pj_ssize_t)pck.size()) { + auto npck_it = std::next(it); + if (npck_it != rx.end()) { + // drop current packet, merge reminder with next one + auto& npck = *npck_it; + npck.insert(npck.begin(), pck.begin()+eaten, pck.end()); + } else { + // erase eaten part, keep remainder + pck.erase(pck.begin(), pck.begin()+eaten); + { + std::lock_guard<std::mutex> l(rxMtx_); + rxPending_.splice(rxPending_.begin(), rx, it); + } + break; + } + } + } + + // Notify transport manager about state changes first + // Note: stop when disconnected event is encountered + // and differ its notification AFTER pending rx msg to let + // them a chance to be delivered to application before closing + // the transport. + auto state_cb = pjsip_tpmgr_get_state_cb(trData_.base.tpmgr); + if (disconnected_ and state_cb) { + JAMI_WARN("[SIPS] process disconnect event"); + pjsip_transport_state_info state_info; + std::memset(&state_info, 0, sizeof(state_info)); + state_info.status = PJ_SUCCESS; + (*state_cb)(&trData_.base, PJSIP_TP_STATE_DISCONNECTED, &state_info); + } +} + +pj_status_t +ChanneledSIPTransport::send(pjsip_tx_data* tdata, const pj_sockaddr_t* rem_addr, + int addr_len, void* token, + pjsip_transport_callback callback) +{ + // Sanity check + PJ_ASSERT_RETURN(tdata, PJ_EINVAL); + + // Check that there's no pending operation associated with the tdata + PJ_ASSERT_RETURN(tdata->op_key.tdata == nullptr, PJSIP_EPENDINGTX); + + // Check the address is supported + PJ_ASSERT_RETURN(rem_addr and + (addr_len==sizeof(pj_sockaddr_in) or + addr_len==sizeof(pj_sockaddr_in6)), + PJ_EINVAL); + + // Check in we are able to send it in synchronous way first + const std::size_t size = tdata->buf.cur - tdata->buf.start; + std::unique_lock<std::mutex> lk {txMutex_}; + if (/*TODO handle disconned: syncTx_ and*/ txQueue_.empty()) { + std::error_code ec; + socket_->write(reinterpret_cast<const uint8_t*>(tdata->buf.start), size, ec); + lk.unlock(); + if (ec) { + return PJ_EINVAL; + } + return PJ_SUCCESS; + } + + // Asynchronous sending + tdata->op_key.tdata = tdata; + tdata->op_key.token = token; + tdata->op_key.callback = callback; + txQueue_.push_back(tdata); + scheduler_.run([this]{ handleEvents(); }); + return PJ_EPENDING; +} + +}} // namespace jami::tls diff --git a/src/jamidht/channeled_transport.h b/src/jamidht/channeled_transport.h new file mode 100644 index 0000000000000000000000000000000000000000..a082f400efd18601ca1d5848194a41474f596210 --- /dev/null +++ b/src/jamidht/channeled_transport.h @@ -0,0 +1,100 @@ +/* + * Copyright (C) 2020 Savoir-faire Linux Inc. + * + * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#pragma once + +#include "ip_utils.h" +#include "noncopyable.h" +#include "scheduled_executor.h" +#include "sip/sip_utils.h" + +#include <pjsip.h> +#include <pj/pool.h> + +#include <atomic> +#include <condition_variable> +#include <chrono> +#include <list> +#include <memory> +#include <thread> +#include <type_traits> +#include <utility> + +namespace jami { + +class ChannelSocket; +using onShutdownCb = std::function<void(void)>; + +namespace tls { + +/** + * ChanneledSIPTransport + * + * Implements a pjsip_transport on top of a ChannelSocket + */ +class ChanneledSIPTransport +{ +public: + using TransportData = struct { + pjsip_transport base; // do not move, SHOULD be the fist member + ChanneledSIPTransport* self {nullptr}; + }; + static_assert(std::is_standard_layout<TransportData>::value, + "TranportData requires standard-layout"); + + ChanneledSIPTransport(pjsip_endpoint* endpt, int tp_type, + const std::shared_ptr<ChannelSocket>& socket, + const IpAddr& local, const IpAddr& remote, + onShutdownCb&& cb); + ~ChanneledSIPTransport(); + + pjsip_transport* getTransportBase() { return &trData_.base; } +private: + NON_COPYABLE(ChanneledSIPTransport); + + // The SIP transport uses a ChannelSocket to send and receive datas + std::shared_ptr<ChannelSocket> socket_; + IpAddr local_; + IpAddr remote_; + + // PJSIP transport backend + TransportData trData_; // uplink to "this" (used by PJSIP called C-callbacks) + + std::unique_ptr<pj_pool_t, decltype(pj_pool_release)*> pool_; + std::unique_ptr<pj_pool_t, decltype(pj_pool_release)*> rxPool_; + + std::mutex rxMtx_; + std::list<std::vector<uint8_t>> rxPending_; + pjsip_rx_data rdata_; + + std::mutex txMutex_ {}; + std::condition_variable txCv_ {}; + std::list<pjsip_tx_data*> txQueue_ {}; + + ScheduledExecutor scheduler_; + + pj_status_t send(pjsip_tx_data*, const pj_sockaddr_t*, int, void*, pjsip_transport_callback); + void handleEvents(); + + // Handle disconnected event + std::atomic_bool disconnected_ {false}; +}; + +}} // namespace jami::tls diff --git a/src/jamidht/connectionmanager.cpp b/src/jamidht/connectionmanager.cpp index a6662dbfbb78254dbda853382c325f3ddb16c097..02bc18e03d2cdfe6dd72f47d0a6c0054adfc623f 100644 --- a/src/jamidht/connectionmanager.cpp +++ b/src/jamidht/connectionmanager.cpp @@ -708,6 +708,7 @@ ConnectionManager::closeConnectionsWith(const std::string& deviceId) } info.second.responseCv_.notify_all(); if (info.second.ice_) { + std::unique_lock<std::mutex> lk{ info.second.mutex_ }; info.second.ice_.reset(); } } diff --git a/src/jamidht/jamiaccount.cpp b/src/jamidht/jamiaccount.cpp index 11acb010035d2a9ba442e0d38d063600d39a53ad..cfffc9452b427b2ff89693b2c8d8236c2f0dcb24 100644 --- a/src/jamidht/jamiaccount.cpp +++ b/src/jamidht/jamiaccount.cpp @@ -72,6 +72,7 @@ #include "security/certstore.h" #include "libdevcrypto/Common.h" #include "base64.h" +#include "im/instant_messaging.h" #include <opendht/thread_pool.h> #include <opendht/peer_discovery.h> @@ -98,6 +99,23 @@ using namespace std::placeholders; namespace jami { +struct PendingConfirmation { + std::mutex lock; + bool replied {false}; + std::map<dht::InfoHash, std::future<size_t>> listenTokens {}; +}; + +// Used to pass infos to a pjsip callback (pjsip_endpt_send_request) +struct TextMessageCtx { + std::weak_ptr<JamiAccount> acc; + std::string to; + std::string deviceId; + std::map<std::string, std::string> payloads; + uint64_t id; + bool retryOnTimeout; + std::shared_ptr<PendingConfirmation> confirmation; +}; + namespace Migration { enum class State { // Contains all the Migration states @@ -287,6 +305,9 @@ JamiAccount::shutdownConnections() { connectionManager_.reset(); dhtPeerConnector_.reset(); + std::lock_guard<std::mutex> lk(sipConnectionsMtx_); + sipConnections_.clear(); + pendingSipConnections_.clear(); } void @@ -1462,6 +1483,7 @@ JamiAccount::handlePendingCall(PendingCall& pc, bool incoming) auto remote_id = remote_device.toString(); auto remote_addr = best_transport->getRemoteAddress(ICE_COMP_SIP_TRANSPORT); auto& tr_self = *transport; + transport->addStateListener(lid, [&tr_self, lid, wcall, waccount, remote_id, remote_addr](pjsip_transport_state state, UNUSED const pjsip_transport_state_info* info) { @@ -1578,6 +1600,31 @@ JamiAccount::trackBuddyPresence(const std::string& buddy_id, bool track) return; } auto h = dht::InfoHash(buddyUri); + + if (!track && dht_ && dht_->isRunning()) { + std::unique_lock<std::mutex> lk(sipConnectionsMtx_); + std::set<std::string> devices; + for (const auto& deviceConn: sipConnections_[buddy_id]) { + devices.emplace(deviceConn.first); + } + sipConnections_.erase(buddy_id); + + for (auto pendingIt = pendingSipConnections_.begin(); pendingIt != pendingSipConnections_.end();) { + if (buddy_id == pendingIt->first) { + devices.emplace(pendingIt->second); + pendingIt = pendingSipConnections_.erase(pendingIt); + } else { + ++pendingIt; + } + } + + lk.unlock(); + for (const auto& device: devices) { + if (connectionManager_) + connectionManager_->closeConnectionsWith(device); + } + } + std::lock_guard<std::mutex> lock(buddyInfoMtx); if (track) { auto buddy = trackedBuddies_.emplace(h, BuddyInfo {h}); @@ -1602,7 +1649,7 @@ JamiAccount::trackPresence(const dht::InfoHash& h, BuddyInfo& buddy) if (not dht or not dht->isRunning()) { return; } - buddy.listenToken = dht->listen<DeviceAnnouncement>(h, [this, h](DeviceAnnouncement&&, bool expired){ + buddy.listenToken = dht->listen<DeviceAnnouncement>(h, [this, h](DeviceAnnouncement&& dev, bool expired){ bool wasConnected, isConnected; { std::lock_guard<std::mutex> lock(buddyInfoMtx); @@ -1619,12 +1666,14 @@ JamiAccount::trackPresence(const dht::InfoHash& h, BuddyInfo& buddy) if (not expired) { // Retry messages every time a new device announce its presence messageEngine_.onPeerOnline(h.toString()); + requestSIPConnection(h.toString(), dev.dev.toString()); } if (isConnected and not wasConnected) { onTrackedBuddyOnline(h); } else if (not isConnected and wasConnected) { onTrackedBuddyOffline(h); } + return true; }); JAMI_DBG("[Account %s] tracking buddy %s", getAccountID().c_str(), h.to_c_str()); @@ -1807,6 +1856,37 @@ JamiAccount::doRegister_() if (!connectionManager_) connectionManager_ = std::make_unique<ConnectionManager>(*this); connectionManager_->onDhtConnected(accountManager_->getInfo()->deviceId); + connectionManager_->onICERequest([this](const std::string& deviceId) { + std::promise<bool> accept; + std::future<bool> fut = accept.get_future(); + accountManager_->findCertificate(dht::InfoHash(deviceId), + [this, &accept](const std::shared_ptr<dht::crypto::Certificate>& cert) { + dht::InfoHash peer_account_id; + auto res = accountManager_->onPeerCertificate(cert, dhtPublicInCalls_, peer_account_id); + if (res) + JAMI_INFO("Accepting ICE request from account %s", peer_account_id.toString().c_str()); + else + JAMI_INFO("Discarding ICE request from account %s", peer_account_id.toString().c_str()); + accept.set_value(res); + }); + fut.wait(); + auto result = fut.get(); + return result; + }); + connectionManager_->onChannelRequest([](const std::string& /* deviceId */, const std::string& name) { + if (name == "sip") { + return true; + } + return false; + }); + connectionManager_->onConnectionReady([this](const std::string& deviceId, const std::string& name, std::shared_ptr<ChannelSocket> channel) { + if (channel && name == "sip") { + auto cert = tls::CertificateStore::instance().getCertificate(deviceId); + if (!cert || !cert->issuer) return; + auto peerId = cert->issuer->getId().toString(); + if (channel) cacheSIPConnection(std::move(channel), peerId, deviceId); + } + }); // Listen for incoming calls callKey_ = dht::InfoHash::get("callto:"+accountManager_->getInfo()->deviceId); @@ -2021,12 +2101,16 @@ JamiAccount::doUnregister(std::function<void(bool)> released_cb) if (upnp_) upnp_->requestMappingRemove(static_cast<in_port_t>(dhtPortUsed_), upnp::PortType::UDP); lock.unlock(); + + // Stop all current p2p connections if account is disabled + // Else, we let the system managing if the co is down or not + if (not isEnabled()) + shutdownConnections(); + setRegistrationState(RegistrationState::UNREGISTERED); if (released_cb) released_cb(false); - - shutdownConnections(); } void @@ -2398,6 +2482,31 @@ JamiAccount::removeContact(const std::string& uri, bool ban) accountManager_->removeContact(uri, ban); else JAMI_WARN("[Account %s] removeContact: account not loaded", getAccountID().c_str()); + + // Remove current connections with contact + dht::InfoHash peer_account(uri); + + std::unique_lock<std::mutex> lk(sipConnectionsMtx_); + std::set<std::string> devices; + for (const auto& deviceConn: sipConnections_[uri]) { + devices.emplace(deviceConn.first); + } + sipConnections_.erase(uri); + + for (auto pendingIt = pendingSipConnections_.begin(); pendingIt != pendingSipConnections_.end();) { + if (uri == pendingIt->first) { + devices.emplace(pendingIt->second); + pendingIt = pendingSipConnections_.erase(pendingIt); + } else { + ++pendingIt; + } + } + + lk.unlock(); + for (const auto& device: devices) { + if (connectionManager_) + connectionManager_->closeConnectionsWith(device); + } } std::map<std::string, std::string> @@ -2484,7 +2593,7 @@ JamiAccount::sendTextMessage(const std::string& to, const std::map<std::string, } void -JamiAccount::sendTextMessage(const std::string& to, const std::map<std::string, std::string>& payloads, uint64_t token) +JamiAccount::sendTextMessage(const std::string& to, const std::map<std::string, std::string>& payloads, uint64_t token, bool retryOnTimeout) { std::string toUri; try { @@ -2505,16 +2614,149 @@ JamiAccount::sendTextMessage(const std::string& to, const std::map<std::string, auto toH = dht::InfoHash(toUri); auto now = clock::to_time_t(clock::now()); - struct PendingConfirmation { - std::mutex lock; - bool replied {false}; - std::map<dht::InfoHash, std::future<size_t>> listenTokens {}; - }; auto confirm = std::make_shared<PendingConfirmation>(); + std::set<std::string> devices; + std::unique_lock<std::mutex> lk(sipConnectionsMtx_); + sip_utils::register_thread(); + for (auto deviceConnIt = sipConnections_[to].begin(); deviceConnIt != sipConnections_[to].end(); ++deviceConnIt) { + if (deviceConnIt->second.empty()) continue; + auto& it = deviceConnIt->second.back(); + + auto transport = it.transport; + auto channel = it.channel; + if (!channel || !channel->underlyingICE()) { + messageEngine_.onMessageSent(to, token, false); + JAMI_WARN("A SIP transport exists without Channel, this is a bug. Please report"); + // Remove connection in incorrect state + sipConnections_.erase(to); + continue; + } + + // Build SIP Message + // "deviceID@IP" + auto toURI = getToUri(to + "@" + channel->underlyingICE()->getRemoteAddress(0).toString(true)); + std::string from = getFromUri(); + pjsip_tx_data* tdata; + + // Build SIP message + constexpr pjsip_method msg_method = {PJSIP_OTHER_METHOD, jami::sip_utils::CONST_PJ_STR("MESSAGE")}; + pj_str_t pjFrom = pj_str((char*) from.c_str()); + pj_str_t pjTo = pj_str((char*) toURI.c_str()); + + // Create request. + pj_status_t status = pjsip_endpt_create_request(link_->getEndpoint(), &msg_method, + &pjTo, &pjFrom, &pjTo, nullptr, nullptr, -1, + nullptr, &tdata); + if (status != PJ_SUCCESS) { + JAMI_ERR("Unable to create request: %s", sip_utils::sip_strerror(status).c_str()); + messageEngine_.onMessageSent(to, token, false); + continue; + } + + // Add Date Header. + pj_str_t date_str; + constexpr auto key = sip_utils::CONST_PJ_STR("Date"); + pjsip_hdr *hdr; + auto time = std::time(nullptr); + auto date = std::ctime(&time); + // the erase-remove idiom for a cstring, removes _all_ new lines with in date + *std::remove(date, date+strlen(date), '\n') = '\0'; + + // Add Header + hdr = reinterpret_cast<pjsip_hdr*>(pjsip_date_hdr_create(tdata->pool, &key, pj_cstr(&date_str, date))); + pjsip_msg_add_hdr(tdata->msg, hdr); + + // Add user agent header. + pjsip_hdr *hdr_list; + auto pJuseragent = sip_utils::CONST_PJ_STR("Jami"); + constexpr pj_str_t STR_USER_AGENT = jami::sip_utils::CONST_PJ_STR("User-Agent"); + + // Add Header + hdr_list = reinterpret_cast<pjsip_hdr*>(pjsip_user_agent_hdr_create(tdata->pool, &STR_USER_AGENT, &pJuseragent)); + pjsip_msg_add_hdr(tdata->msg, hdr_list); + + // Init tdata + const pjsip_tpselector tp_sel = SIPVoIPLink::getTransportSelector(transport->get()); + status = pjsip_tx_data_set_transport(tdata, &tp_sel); + if (status != PJ_SUCCESS) { + JAMI_ERR("Unable to create request: %s", sip_utils::sip_strerror(status).c_str()); + messageEngine_.onMessageSent(to, token, false); + continue; + } + im::fillPJSIPMessageBody(*tdata, payloads); + + // Re-init sent status + messageEngine_.onMessageSent(to, token, false); + + // Because pjsip_endpt_send_request can take quite some time, move it in a io thread to avoid to block + dht::ThreadPool::io().run([w=weak(), tdata, to, token, payloads, retryOnTimeout, deviceId = deviceConnIt->first, confirm] { + auto shared = w.lock(); + if (!shared) return; + + // Set input token into callback + std::unique_ptr<TextMessageCtx> ctx{ std::make_unique<TextMessageCtx>() }; + ctx->acc = shared; + ctx->to = to; + ctx->deviceId = deviceId; + ctx->id = token; + ctx->payloads = payloads; + ctx->retryOnTimeout = retryOnTimeout; + ctx->confirmation = confirm; + + sip_utils::register_thread(); + + auto status = pjsip_endpt_send_request(shared->link_->getEndpoint(), tdata, -1, ctx.release(), + [](void *token, pjsip_event *event) + { + std::unique_ptr<TextMessageCtx> c{ (TextMessageCtx*)token }; + auto code = event->body.tsx_state.tsx->status_code; + auto acc = c->acc.lock(); + if (not acc) return; + + if (code == PJSIP_SC_OK) { + std::unique_lock<std::mutex> l(c->confirmation->lock); + c->confirmation->replied = true; + l.unlock(); + acc->messageEngine_.onMessageSent(c->to, c->id, true); + } else { + JAMI_WARN("Timeout when send a message, close current connection"); + { + std::unique_lock<std::mutex> lk(acc->sipConnectionsMtx_); + acc->sipConnections_[c->to].erase(c->deviceId); + } + acc->connectionManager_->closeConnectionsWith(c->deviceId); + // This MUST be done after closing the connection to avoid race condition + // with messageEngine_ + acc->messageEngine_.onMessageSent(c->to, c->id, false); + + // In that case, the peer typically changed its connectivity. + // After closing sockets with that peer, we try to re-connect to + // that peer one time. + if (c->retryOnTimeout) acc->messageEngine_.onPeerOnline(c->to, false); + } + }); + + if (status != PJ_SUCCESS) { + JAMI_ERR("Unable to send request: %s", sip_utils::sip_strerror(status).c_str()); + shared->messageEngine_.onMessageSent(to, token, false); + } + }); + + devices.emplace(deviceConnIt->first); + } + lk.unlock(); + // Find listening devices for this account - accountManager_->forEachDevice(toH, [this,confirm,to,token,payloads,now](const dht::InfoHash& dev) + accountManager_->forEachDevice(toH, [this,confirm,to,token,payloads,now,retryOnTimeout, devices](const dht::InfoHash& dev) { + // Test if already sent + if (devices.find(dev.toString()) != devices.end()) { + return; + } + + // Else, ask for a channel and send a DHT message + requestSIPConnection(to, dev.toString()); { std::lock_guard<std::mutex> lock(messageMutex_); sentMessages_[token].to.emplace(dev); @@ -2838,4 +3080,60 @@ JamiAccount::cacheTurnServers() }); } +void +JamiAccount::requestSIPConnection(const std::string& peerId, const std::string& deviceId) +{ + // If a connection already exists or is in progress, no need to do this + std::lock_guard<std::mutex> lk(sipConnectionsMtx_); + auto id = std::make_pair<std::string, std::string>(std::string(peerId), std::string(deviceId)); + if (!sipConnections_[peerId][deviceId].empty() || pendingSipConnections_.find(id) != pendingSipConnections_.end()) { + JAMI_DBG("A SIP connection with %s already exists", deviceId.c_str()); + return; + } + pendingSipConnections_.emplace(id); + // If not present, create it + JAMI_INFO("Ask %s for a new SIP channel", deviceId.c_str()); + if (!connectionManager_) return; + connectionManager_->connectDevice(deviceId, "sip", + [w=weak(), id](std::shared_ptr<ChannelSocket> socket) { + auto shared = w.lock(); + if (!shared) return; + if (socket) shared->cacheSIPConnection(std::move(socket), id.first, id.second); + std::lock_guard<std::mutex> lk(shared->sipConnectionsMtx_); + shared->pendingSipConnections_.erase(id); + }); +} + +void +JamiAccount::cacheSIPConnection(std::shared_ptr<ChannelSocket>&& socket, const std::string& peerId, const std::string& deviceId) +{ + std::unique_lock<std::mutex> lk(sipConnectionsMtx_); + // Convert to SIP transport + sip_utils::register_thread(); + auto onShutdown = [w=weak(), peerId, deviceId, socket]() { + auto shared = w.lock(); + if (!shared) return; + std::lock_guard<std::mutex> lk(shared->sipConnectionsMtx_); + auto& connections = shared->sipConnections_[peerId][deviceId]; + auto conn = std::find_if(connections.begin(), connections.end(), [socket](auto v) { + return v.channel == socket; + }); + if (conn != connections.end()) { + connections.erase(conn); + } + }; + auto sip_tr = link_->sipTransportBroker->getChanneledTransport(socket, std::move(onShutdown)); + // Store the connection + sipConnections_[peerId][deviceId].emplace_back(SipConnection { + std::move(sip_tr), + socket + }); + JAMI_DBG("New SIP channel opened with %s", deviceId.c_str()); + lk.unlock(); + + // Retry messages + messageEngine_.onPeerOnline(peerId); +} + + } // namespace jami diff --git a/src/jamidht/jamiaccount.h b/src/jamidht/jamiaccount.h index 7a84ab619a09105d88e155f48179ad10f646d47f..9f1230b4b687e4902a4407e11253d5be3afd3aa2 100644 --- a/src/jamidht/jamiaccount.h +++ b/src/jamidht/jamiaccount.h @@ -76,6 +76,8 @@ class PeerConnection; class ContactList; class AccountManager; struct AccountInfo; +class ChannelSocket; +class SipTransport; /** * @brief Ring Account is build on top of SIPAccountBase and uses DHT to handle call connectivity. @@ -310,7 +312,7 @@ public: void sendTrustRequest(const std::string& to, const std::vector<uint8_t>& payload); void sendTrustRequestConfirm(const std::string& to); - virtual void sendTextMessage(const std::string& to, const std::map<std::string, std::string>& payloads, uint64_t id) override; + virtual void sendTextMessage(const std::string& to, const std::map<std::string, std::string>& payloads, uint64_t id, bool retryOnTimeout=true) override; virtual uint64_t sendTextMessage(const std::string& to, const std::map<std::string, std::string>& payloads) override; /* Devices */ @@ -416,9 +418,11 @@ public: */ void registerDhtAddress(IceTransport&); +#ifdef DRING_TESTABLE ConnectionManager& connectionManager() { return *connectionManager_; } +#endif /** * This should be called before flushing the account. @@ -681,6 +685,36 @@ private: void cacheTurnServers(); std::set<std::shared_ptr<dht::http::Request>> requests_; + + std::mutex sipConnectionsMtx_ {}; + struct SipConnection { + std::shared_ptr<SipTransport> transport; + // Needs to keep track of that channel to access underlying ICE + // informations, as the SipTransport use a generic transport + std::shared_ptr<ChannelSocket> channel; + }; + // NOTE: here we use a vector to avoid race conditions. In fact the contact + // can ask for a SIP channel when we are creating a new SIP Channel with this + // peer too. + std::map<std::string /* accountId */, + std::map<std::string /* deviceId */, std::vector<SipConnection>>> sipConnections_ {}; + // However, we only negotiate one socket from our side + std::set<std::pair<std::string /* accountId */, std::string /* deviceId */>> pendingSipConnections_ {}; + + /** + * Ask a device to open a channeled SIP socket + * @param peerId The contact who owns the device + * @param deviceId The device to ask + * @note triggers cacheSIPConnection + */ + void requestSIPConnection(const std::string& peerId, const std::string& deviceId); + /** + * Store a new SIP connection into sipConnections_ + * @param socket The new sip channel + * @param peerId The contact who owns the device + * @param deviceId Device linked to that transport + */ + void cacheSIPConnection(std::shared_ptr<ChannelSocket>&& socket, const std::string& peerId, const std::string& deviceId); }; static inline std::ostream& operator<< (std::ostream& os, const JamiAccount& acc) diff --git a/src/jamidht/multiplexed_socket.cpp b/src/jamidht/multiplexed_socket.cpp index ead00d592211093f736922ad034aa346d024711c..e8cde3f02fc28008208a2d995a2b7fb97a94b2ef 100644 --- a/src/jamidht/multiplexed_socket.cpp +++ b/src/jamidht/multiplexed_socket.cpp @@ -59,6 +59,7 @@ public: stop.store(true); isShutdown_ = true; if (onShutdown_) onShutdown_(); + endpoint->setOnStateChange({}); endpoint->shutdown(); { std::lock_guard<std::mutex> lkSockets(socketsMutex); @@ -67,7 +68,6 @@ public: // No need to write the EOF for the channel, the write will fail because endpoint is already shutdown if (socket.second) socket.second->stop(); } - sockets.clear(); } } @@ -94,7 +94,6 @@ public: OnConnectionReadyCb onChannelReady_; OnConnectionRequestCb onRequest_; OnShutdownCb onShutdown_; - std::atomic_bool isShutdown_ {false}; std::string deviceId; // Main socket @@ -114,14 +113,14 @@ public: std::map<uint16_t, std::unique_ptr<ChannelInfo>> channelDatas_ {}; std::mutex channelCbsMtx_ {}; std::map<uint16_t, GenericSocket<uint8_t>::RecvCb> channelCbs_ {}; + std::atomic_bool isShutdown_ {false}; }; void MultiplexedSocket::Impl::eventLoop() { endpoint->setOnStateChange([this](tls::TlsSessionState state) { - if (state == tls::TlsSessionState::SHUTDOWN) { - if (isShutdown_) return; + if (state == tls::TlsSessionState::SHUTDOWN && !isShutdown_) { JAMI_INFO("Tls endpoint is down, shutdown multiplexed socket"); shutdown(); } @@ -256,6 +255,7 @@ MultiplexedSocket::Impl::handleChannelPacket(uint16_t channel, const std::vector sockIt->second->shutdown(); dataIt->second->cv.notify_all(); channelDatas_.erase(dataIt); + std::lock_guard<std::mutex> lkSockets(socketsMutex); sockets.erase(sockIt); } else { std::unique_lock<std::mutex> lk(channelCbsMtx_); @@ -456,6 +456,12 @@ MultiplexedSocket::onShutdown(OnShutdownCb&& cb) } } +std::shared_ptr<IceTransport> +MultiplexedSocket::underlyingICE() const +{ + return pimpl_->endpoint->underlyingICE(); +} + //////////////////////////////////////////////////////////////// class ChannelSocket::Impl @@ -466,8 +472,8 @@ public: ~Impl() {} - std::atomic_bool isShutdown_ {false}; OnShutdownCb shutdownCb_; + std::atomic_bool isShutdown_ {false}; std::string name; uint16_t channel; std::weak_ptr<MultiplexedSocket> endpoint; @@ -536,6 +542,14 @@ ChannelSocket::setOnRecv(RecvCb&& cb) ep->setOnRecv(pimpl_->channel, std::move(cb)); } +std::shared_ptr<IceTransport> +ChannelSocket::underlyingICE() const +{ + if (auto mtx = pimpl_->endpoint.lock()) + return mtx->underlyingICE(); + return {}; +} + void ChannelSocket::stop() { diff --git a/src/jamidht/multiplexed_socket.h b/src/jamidht/multiplexed_socket.h index 108ca4215d178391bd1c68e749c9833f4ae85d96..4f4767ef6fb8d35cf9ea948d2c5aef2a50f3f60d 100644 --- a/src/jamidht/multiplexed_socket.h +++ b/src/jamidht/multiplexed_socket.h @@ -23,6 +23,7 @@ namespace jami { +class IceTransport; class ChannelSocket; class TlsSocketEndpoint; @@ -111,6 +112,8 @@ public: */ void onShutdown(OnShutdownCb&& cb); + std::shared_ptr<IceTransport> underlyingICE() const; + private: class Impl; std::unique_ptr<Impl> pimpl_; @@ -157,6 +160,8 @@ public: void setOnRecv(RecvCb&&) override; + std::shared_ptr<IceTransport> underlyingICE() const; + private: class Impl; std::unique_ptr<Impl> pimpl_; diff --git a/src/jamidht/sips_transport_ice.cpp b/src/jamidht/sips_transport_ice.cpp index 77de1e1787f09a61e2b40df9b5b999e28ca21a6a..afff4e0ea9c3c232f2fddebfc6e8f297839df401 100644 --- a/src/jamidht/sips_transport_ice.cpp +++ b/src/jamidht/sips_transport_ice.cpp @@ -45,21 +45,6 @@ namespace jami { namespace tls { -static constexpr int POOL_TP_INIT {512}; -static constexpr int POOL_TP_INC {512}; -static constexpr int TRANSPORT_INFO_LENGTH {64}; - -static void -sockaddr_to_host_port(pj_pool_t* pool, - pjsip_host_port* host_port, - const pj_sockaddr* addr) -{ - host_port->host.ptr = (char*) pj_pool_alloc(pool, PJ_INET6_ADDRSTRLEN+4); - pj_sockaddr_print(addr, host_port->host.ptr, PJ_INET6_ADDRSTRLEN+4, 0); - host_port->host.slen = pj_ansi_strlen(host_port->host.ptr); - host_port->port = pj_sockaddr_get_port(addr); -} - static pj_status_t tls_status_from_err(int err) { @@ -138,7 +123,7 @@ tls_status_from_err(int err) SipsIceTransport::SipsIceTransport(pjsip_endpoint* endpt, int tp_type, const TlsParams& param, - const std::shared_ptr<jami::IceTransport>& ice, + const std::shared_ptr<IceTransport>& ice, int comp_id) : ice_ (ice) , comp_id_ (comp_id) @@ -155,7 +140,7 @@ SipsIceTransport::SipsIceTransport(pjsip_endpoint* endpt, trData_.self = this; // up-link for PJSIP callbacks pool_ = sip_utils::smart_alloc_pool(endpt, "dtls.pool", - POOL_TP_INIT, POOL_TP_INC); + sip_utils::POOL_TP_INIT, sip_utils::POOL_TP_INC); auto& base = trData_.base; std::memset(&base, 0, sizeof(base)); @@ -179,10 +164,10 @@ SipsIceTransport::SipsIceTransport(pjsip_endpoint* endpt, auto reg_type = static_cast<pjsip_transport_type_e>(tp_type); base.type_name = const_cast<char*>(pjsip_transport_get_type_name(reg_type)); base.flag = pjsip_transport_get_flag_from_type(reg_type); - base.info = static_cast<char*>(pj_pool_alloc(pool_.get(), TRANSPORT_INFO_LENGTH)); + base.info = static_cast<char*>(pj_pool_alloc(pool_.get(), sip_utils::TRANSPORT_INFO_LENGTH)); auto remote_addr = remote_.toString(); - pj_ansi_snprintf(base.info, TRANSPORT_INFO_LENGTH, "%s to %s", base.type_name, + pj_ansi_snprintf(base.info, sip_utils::TRANSPORT_INFO_LENGTH, "%s to %s", base.type_name, remote_addr.c_str()); base.addr_len = remote_.getLength(); base.dir = PJSIP_TP_DIR_NONE; @@ -191,8 +176,8 @@ SipsIceTransport::SipsIceTransport(pjsip_endpoint* endpt, auto local = ice->getDefaultLocalAddress(); pj_sockaddr_cp(&base.local_addr, local.pjPtr()); - sockaddr_to_host_port(pool_.get(), &base.local_name, &base.local_addr); - sockaddr_to_host_port(pool_.get(), &base.remote_name, remote_.pjPtr()); + sip_utils::sockaddr_to_host_port(pool_.get(), &base.local_name, &base.local_addr); + sip_utils::sockaddr_to_host_port(pool_.get(), &base.remote_name, remote_.pjPtr()); base.send_msg = [](pjsip_transport *transport, pjsip_tx_data *tdata, diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index 7d80aedc5a0a4fa7ce542806235494273632dd9c..d452032fc77a098fd6fae5232bba3e89eb6003d3 100644 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -418,7 +418,7 @@ public: const IceSocketEndpoint* iceSocket = (const IceSocketEndpoint*)(ep_); if (iceSocket) { iceSocket->underlyingICE()->setOnShutdown([this]() { - tls.reset(); + tls->shutdown(); }); } } @@ -450,16 +450,14 @@ public: if (iceSocket) { iceSocket->underlyingICE()->setOnShutdown([this]() { tls->shutdown(); - if (onStateChangeCb_) - onStateChangeCb_(tls::TlsSessionState::SHUTDOWN); }); } } ~Impl() { - tls.reset(); - onReadyCb_ = {}; onStateChangeCb_ = {}; + onReadyCb_ = {}; + tls.reset(); } // TLS callbacks diff --git a/src/sip/sip_utils.cpp b/src/sip/sip_utils.cpp index 97aee9715fb6b04db301e4c8ee5307dfd077f586..86eb4383a149221f892fe1e4471044780b2e7d21 100644 --- a/src/sip/sip_utils.cpp +++ b/src/sip/sip_utils.cpp @@ -204,4 +204,15 @@ register_thread() } } +void +sockaddr_to_host_port(pj_pool_t* pool, + pjsip_host_port* host_port, + const pj_sockaddr* addr) +{ + host_port->host.ptr = (char*) pj_pool_alloc(pool, PJ_INET6_ADDRSTRLEN+4); + pj_sockaddr_print(addr, host_port->host.ptr, PJ_INET6_ADDRSTRLEN+4, 0); + host_port->host.slen = pj_ansi_strlen(host_port->host.ptr); + host_port->port = pj_sockaddr_get_port(addr); +} + }} // namespace jami::sip_utils diff --git a/src/sip/sip_utils.h b/src/sip/sip_utils.h index 44c1978e8e1e371f00031ad20cb0f06963fc6ddb..bcdfd6f026f45ce372bd85b4ccf172fb0c5bde11 100644 --- a/src/sip/sip_utils.h +++ b/src/sip/sip_utils.h @@ -146,6 +146,12 @@ smart_alloc_pool(pjsip_endpoint* endpt, const char* const name, pj_size_t initia return std::unique_ptr<pj_pool_t, decltype(pj_pool_release)&>(pool, pj_pool_release); } +void sockaddr_to_host_port(pj_pool_t* pool, pjsip_host_port* host_port, const pj_sockaddr* addr); + +static constexpr int POOL_TP_INIT {512}; +static constexpr int POOL_TP_INC {512}; +static constexpr int TRANSPORT_INFO_LENGTH {64}; + }} // namespace jami::sip_utils #endif // SIP_UTILS_H_ diff --git a/src/sip/sipaccount.cpp b/src/sip/sipaccount.cpp index 2e86541c39601deef0dba921c69a432c21a996f3..0d1342ad634319db00ba8cf16252826c07e1ba58 100644 --- a/src/sip/sipaccount.cpp +++ b/src/sip/sipaccount.cpp @@ -2013,7 +2013,7 @@ static pjsip_accept_hdr* im_create_accept(pj_pool_t *pool) #endif void -SIPAccount::sendTextMessage(const std::string& to, const std::map<std::string, std::string>& payloads, uint64_t id) +SIPAccount::sendTextMessage(const std::string& to, const std::map<std::string, std::string>& payloads, uint64_t id, bool) { if (to.empty() or payloads.empty()) { JAMI_WARN("No sender or payload"); diff --git a/src/sip/sipaccount.h b/src/sip/sipaccount.h index 9a5b65fcf25b8579d290d839d03f0c0fba59e06d..4b65318e388f6fda762fb3665e4e643452602de8 100644 --- a/src/sip/sipaccount.h +++ b/src/sip/sipaccount.h @@ -521,7 +521,7 @@ class SIPAccount : public SIPAccountBase { virtual void sendTextMessage(const std::string& to, const std::map<std::string, std::string>& payloads, - uint64_t id) override; + uint64_t id, bool retryOnTimeout=true) override; void connectivityChanged() override; diff --git a/src/sip/sipaccountbase.h b/src/sip/sipaccountbase.h index 33efb2bb71c89d1616033da7740aa940ed8f27b9..8e452e84523e0cfa9e6e93d0349dabc93a21b74d 100644 --- a/src/sip/sipaccountbase.h +++ b/src/sip/sipaccountbase.h @@ -258,7 +258,7 @@ public: const IceTransportOptions getIceOptions() const noexcept override; - virtual void sendTextMessage(const std::string& to, const std::map<std::string, std::string>& payloads, uint64_t id) = 0; + virtual void sendTextMessage(const std::string& to, const std::map<std::string, std::string>& payloads, uint64_t id, bool retryOnTimeout=true) = 0; virtual uint64_t sendTextMessage(const std::string& to, const std::map<std::string, std::string>& payloads) override { diff --git a/src/sip/siptransport.cpp b/src/sip/siptransport.cpp index c2d69c81232b0189b28efaceb6b15523d6ebcf7a..bb607ece60743b1ec6587a6a242b3559a850583d 100644 --- a/src/sip/siptransport.cpp +++ b/src/sip/siptransport.cpp @@ -26,6 +26,8 @@ #include "security/tls_session.h" #include "jamidht/sips_transport_ice.h" +#include "jamidht/channeled_transport.h" +#include "jamidht/multiplexed_socket.h" #include "array_size.h" #include "compiler_intrinsics.h" @@ -213,9 +215,8 @@ SipTransportBroker::transportStateChanged(pjsip_transport* tp, { std::lock_guard<std::mutex> lock(transportMapMutex_); auto key = transports_.find(tp); - if (key == transports_.end()) { + if (key == transports_.end()) return; - } sipTransport = key->second.lock(); @@ -413,8 +414,7 @@ SipTransportBroker::getTlsIceTransport(const std::shared_ptr<jami::IceTransport> if (ice->isTCPEnabled()) { type = ipv6 ? PJSIP_TRANSPORT_TLS6 : PJSIP_TRANSPORT_TLS; } - auto sip_ice_tr = std::unique_ptr<tls::SipsIceTransport>( - new tls::SipsIceTransport(endpt_, type, params, ice, comp_id)); + auto sip_ice_tr = std::make_unique<tls::SipsIceTransport>(endpt_, type, params, ice, comp_id); auto tr = sip_ice_tr->getTransportBase(); auto sip_tr = std::make_shared<SipTransport>(tr); sip_tr->setIsIceTransport(); @@ -429,4 +429,27 @@ SipTransportBroker::getTlsIceTransport(const std::shared_ptr<jami::IceTransport> return sip_tr; } +std::shared_ptr<SipTransport> +SipTransportBroker::getChanneledTransport(const std::shared_ptr<ChannelSocket>& socket, onShutdownCb&& cb) +{ + auto ice = socket->underlyingICE(); + if (!ice) return {}; + auto local = ice->getLocalAddress(0); + auto remote = ice->getRemoteAddress(0); + auto type = local.isIpv6() ? PJSIP_TRANSPORT_TLS6 : PJSIP_TRANSPORT_TLS; + auto sips_tr = std::make_unique<tls::ChanneledSIPTransport>(endpt_, type, socket, local, remote, std::move(cb)); + auto tr = sips_tr->getTransportBase(); + auto sip_tr = std::make_shared<SipTransport>(tr); + sip_tr->setIsIceTransport(); + sips_tr.release(); // managed by PJSIP now + + { + std::lock_guard<std::mutex> lock(transportMapMutex_); + // we do not check for key existence as we've just created it + // (member of new SipIceTransport instance) + transports_.emplace(std::make_pair(tr, sip_tr)); + } + return sip_tr; +} + } // namespace jami diff --git a/src/sip/siptransport.h b/src/sip/siptransport.h index 0ae459a44f272b19437134b9004a2576c12a7ecf..7fd04a6d6604bf8ef03ea76723e95f4008bd6edc 100644 --- a/src/sip/siptransport.h +++ b/src/sip/siptransport.h @@ -50,6 +50,9 @@ struct Certificate; namespace jami { +class ChannelSocket; +using onShutdownCb = std::function<void(void)>; + struct TlsListener { TlsListener() {} @@ -158,6 +161,8 @@ public: std::shared_ptr<SipTransport> addTransport(pjsip_transport*); + std::shared_ptr<SipTransport> getChanneledTransport(const std::shared_ptr<ChannelSocket>& socket, onShutdownCb&& cb); + /** * Start graceful shutdown procedure for all transports */