diff --git a/src/connectivity/turn_cache.cpp b/src/connectivity/turn_cache.cpp index 12fbb43c3c358139420da84e5253d5b3bac65af0..103f50d5b6c7a2b2cf9c16a0a9c3acae20dc2a8d 100644 --- a/src/connectivity/turn_cache.cpp +++ b/src/connectivity/turn_cache.cpp @@ -25,6 +25,7 @@ #include "manager.h" #include "opendht/thread_pool.h" // TODO remove asio #include "turn_cache.h" +#include <asio/error_code.hpp> namespace jami { @@ -38,17 +39,16 @@ TurnCache::TurnCache(const std::string& accountId, { refreshTimer_ = std::make_unique<asio::steady_timer>(*io_context, std::chrono::steady_clock::now()); - onConnectedTimer_ = std::make_unique<asio::steady_timer>(*io_context, - std::chrono::steady_clock::now()); reconfigure(params, enabled); } TurnCache::~TurnCache() { - { - std::lock_guard<std::mutex> lock(shutdownMtx_); - onConnectedTimer_->cancel(); - onConnectedTimer_.reset(); - } + shutdown(); +} + +void +TurnCache::shutdown() +{ { std::lock_guard<std::mutex> lock(cachedTurnMutex_); testTurnV4_.reset(); @@ -175,13 +175,13 @@ TurnCache::testTurn(IpAddr server) try { turn = std::make_unique<TurnTransport>( params, std::move([this, server](bool ok) { + if (!io_context) return; // Stop server in an async job, because this callback can be called // immediately and cachedTurnMutex_ must not be locked. - std::lock_guard<std::mutex> lock(shutdownMtx_); - if (onConnectedTimer_) { - onConnectedTimer_->expires_at(std::chrono::steady_clock::now()); - onConnectedTimer_->async_wait(std::bind(&TurnCache::onConnected, this, std::placeholders::_1, ok, server)); - } + io_context->post([w= weak(), ok, server] { + if (auto shared = w.lock()) + shared->onConnected(ok, server); + }); })); } catch (const std::exception& e) { JAMI_ERROR("TurnTransport creation error: {}", e.what()); @@ -189,11 +189,8 @@ TurnCache::testTurn(IpAddr server) } void -TurnCache::onConnected(const asio::error_code& ec, bool ok, IpAddr server) +TurnCache::onConnected(bool ok, IpAddr server) { - if (ec == asio::error::operation_aborted) - return; - std::lock_guard<std::mutex> lk(cachedTurnMutex_); auto& cacheTurn = server.isIpv4() ? cacheTurnV4_ : cacheTurnV6_; if (!ok) { @@ -203,11 +200,18 @@ TurnCache::onConnected(const asio::error_code& ec, bool ok, IpAddr server) JAMI_DEBUG("Connection to {:s} ready", server.toString()); cacheTurn = std::make_unique<IpAddr>(server); } - refreshTurnDelay(!cacheTurnV6_ && !cacheTurnV4_); if (auto& turn = server.isIpv4() ? testTurnV4_ : testTurnV6_) turn->shutdown(); + refreshTurnDelay(!cacheTurnV6_ && !cacheTurnV4_); } +void +TurnCache::resetTestTransport() +{ + std::lock_guard<std::mutex> lk(cachedTurnMutex_); + testTurnV4_.reset(); + testTurnV6_.reset(); +} void TurnCache::refreshTurnDelay(bool scheduleNext) @@ -216,12 +220,16 @@ TurnCache::refreshTurnDelay(bool scheduleNext) if (scheduleNext) { JAMI_WARNING("[Account {:s}] Cache for TURN resolution failed.", accountId_); refreshTimer_->expires_at(std::chrono::steady_clock::now() + turnRefreshDelay_); - refreshTimer_->async_wait(std::bind(&TurnCache::refresh, this, std::placeholders::_1)); + refreshTimer_->async_wait(std::bind(&TurnCache::refresh, shared_from_this(), std::placeholders::_1)); if (turnRefreshDelay_ < std::chrono::minutes(30)) turnRefreshDelay_ *= 2; - } else { + } else if (io_context) { JAMI_DEBUG("[Account {:s}] Cache refreshed for TURN resolution", accountId_); turnRefreshDelay_ = std::chrono::seconds(10); + io_context->post([w= weak()] { + if (auto shared = w.lock()) + shared->resetTestTransport(); + }); } } diff --git a/src/connectivity/turn_cache.h b/src/connectivity/turn_cache.h index 9c5fa04f3ddbd96dba76d994e1067eb540691df2..9cf83c8ed6490d89acbd8d8ea721912f3ddec28e 100644 --- a/src/connectivity/turn_cache.h +++ b/src/connectivity/turn_cache.h @@ -34,7 +34,7 @@ namespace jami { -class TurnCache +class TurnCache : public std::enable_shared_from_this<TurnCache> { public: TurnCache(const std::string& accountId, @@ -53,6 +53,10 @@ public: * Refresh cache from current configuration */ void refresh(const asio::error_code& ec = {}); + /** + * Reset connections + */ + void shutdown(); private: std::string accountId_; @@ -80,14 +84,19 @@ private: std::unique_ptr<IpAddr> cacheTurnV4_ {}; std::unique_ptr<IpAddr> cacheTurnV6_ {}; - void onConnected(const asio::error_code& ec, bool ok, IpAddr server); + void onConnected(bool ok, IpAddr server); + void resetTestTransport(); // io std::shared_ptr<asio::io_context> io_context; std::unique_ptr<asio::steady_timer> refreshTimer_; - std::unique_ptr<asio::steady_timer> onConnectedTimer_; - std::mutex shutdownMtx_; + // Asio :( + // https://stackoverflow.com/questions/35507956/is-it-safe-to-destroy-boostasio-timer-from-its-handler-or-handler-dtor + std::weak_ptr<TurnCache> weak() + { + return std::static_pointer_cast<TurnCache>(shared_from_this()); + } }; diff --git a/src/connectivity/turn_transport.cpp b/src/connectivity/turn_transport.cpp index 5675a6c5d6e292f32ef6f48e6cbdf19e3831e256..fd5677842091a4d93a051249ea759a9d3ef89c17 100644 --- a/src/connectivity/turn_transport.cpp +++ b/src/connectivity/turn_transport.cpp @@ -38,11 +38,44 @@ namespace jami { +struct CachePoolDeleter { + void operator()(pj_caching_pool* b) { pj_caching_pool_destroy(b); } +}; +struct PoolDeleter { + void operator()(pj_pool_t* p) { pj_pool_release(p); } +}; +using CachePool = std::unique_ptr<pj_caching_pool, CachePoolDeleter>; +using Pool = std::unique_ptr<pj_pool_t, PoolDeleter>; + +class TurnLock +{ + pj_grp_lock_t* lk_; + +public: + TurnLock(pj_turn_sock* strans) + : lk_(pj_turn_sock_get_grp_lock(strans)) + { + lock(); + } + + ~TurnLock() { unlock(); } + + void lock() { pj_grp_lock_acquire(lk_); } + + void unlock() { pj_grp_lock_release(lk_); } +}; + class TurnTransport::Impl { public: - Impl(std::function<void(bool)>&& cb) { cb_ = std::move(cb); } - ~Impl(); + Impl(std::function<void(bool)>&& cb) + : poolCache(new pj_caching_pool()) + , pool(nullptr) + { cb_ = std::move(cb); } + + ~Impl() { + shutdown(); + } /** * Detect new TURN state @@ -59,12 +92,24 @@ public: ioWorker = std::thread([this] { ioJob(); }); } - void stop() { stopped_ = true; } + void shutdown() + { + noCallback_ = true; + { + TurnLock lock(relay); + if (relay) + pj_turn_sock_destroy(relay); + } + if (ioWorker.joinable()) + ioWorker.join(); + pool.reset(); + poolCache.reset(); + } TurnTransportParams settings; - pj_caching_pool poolCache {}; - pj_pool_t* pool {nullptr}; + CachePool poolCache {}; + Pool pool {}; pj_stun_config stunConfig {}; pj_turn_sock* relay {nullptr}; pj_str_t relayAddr {}; @@ -74,15 +119,9 @@ public: std::thread ioWorker; std::atomic_bool stopped_ {false}; + std::atomic_bool noCallback_ {false}; }; -TurnTransport::Impl::~Impl() -{ - stop(); - if (ioWorker.joinable()) - ioWorker.join(); - pj_caching_pool_destroy(&poolCache); -} void TurnTransport::Impl::onTurnState(pj_turn_state_t old_state, pj_turn_state_t new_state) { @@ -93,10 +132,13 @@ TurnTransport::Impl::onTurnState(pj_turn_state_t old_state, pj_turn_state_t new_ mappedAddr = IpAddr {info.mapped_addr}; JAMI_DEBUG("TURN server ready, peer relay address: {:s}", peerRelayAddr.toString(true, true).c_str()); + noCallback_ = true; cb_(true); - } else if (old_state <= PJ_TURN_STATE_READY and new_state > PJ_TURN_STATE_READY) { + } else if (old_state <= PJ_TURN_STATE_READY and new_state > PJ_TURN_STATE_READY and not noCallback_) { JAMI_WARNING("TURN server disconnected ({:s})", pj_turn_state_name(new_state)); cb_(false); + } else if (new_state >= PJ_TURN_STATE_DESTROYING) { + stopped_ = true; } } void @@ -119,16 +161,16 @@ TurnTransport::TurnTransport(const TurnTransportParams& params, std::function<vo throw std::invalid_argument("invalid turn server address"); pimpl_->settings = params; // PJSIP memory pool - pj_caching_pool_init(&pimpl_->poolCache, &pj_pool_factory_default_policy, 0); - pimpl_->pool = pj_pool_create(&pimpl_->poolCache.factory, "TurnTransport", 512, 512, nullptr); + pj_caching_pool_init(pimpl_->poolCache.get(), &pj_pool_factory_default_policy, 0); + pimpl_->pool.reset(pj_pool_create(&pimpl_->poolCache->factory, "TurnTransport", 512, 512, nullptr)); if (not pimpl_->pool) throw std::runtime_error("pj_pool_create() failed"); // STUN config - pj_stun_config_init(&pimpl_->stunConfig, &pimpl_->poolCache.factory, 0, nullptr, nullptr); + pj_stun_config_init(&pimpl_->stunConfig, &pimpl_->poolCache->factory, 0, nullptr, nullptr); // create global timer heap - TRY(pj_timer_heap_create(pimpl_->pool, 1000, &pimpl_->stunConfig.timer_heap)); + TRY(pj_timer_heap_create(pimpl_->pool.get(), 1000, &pimpl_->stunConfig.timer_heap)); // create global ioqueue - TRY(pj_ioqueue_create(pimpl_->pool, 16, &pimpl_->stunConfig.ioqueue)); + TRY(pj_ioqueue_create(pimpl_->pool.get(), 16, &pimpl_->stunConfig.ioqueue)); // TURN callbacks pj_turn_sock_cb relay_cb; pj_bzero(&relay_cb, sizeof(relay_cb)); @@ -160,7 +202,7 @@ TurnTransport::TurnTransport(const TurnTransportParams& params, std::function<vo pj_cstr(&cred.data.static_cred.username, pimpl_->settings.username.c_str()); cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN; pj_cstr(&cred.data.static_cred.data, pimpl_->settings.password.c_str()); - pimpl_->relayAddr = pj_strdup3(pimpl_->pool, server.toString().c_str()); + pimpl_->relayAddr = pj_strdup3(pimpl_->pool.get(), server.toString().c_str()); // TURN connection/allocation JAMI_DEBUG("Connecting to TURN {:s}", server.toString(true, true)); TRY(pj_turn_sock_alloc(pimpl_->relay, @@ -177,7 +219,7 @@ TurnTransport::~TurnTransport() {} void TurnTransport::shutdown() { - pimpl_->stop(); + pimpl_->shutdown(); } } // namespace jami diff --git a/src/jamidht/jamiaccount.cpp b/src/jamidht/jamiaccount.cpp index e0c5ed1903f6c2fdfd051c2cdd8a365d07ed021d..8c45a7e3989fbafc343f10d09e5594ff1986dc33 100644 --- a/src/jamidht/jamiaccount.cpp +++ b/src/jamidht/jamiaccount.cpp @@ -1968,13 +1968,16 @@ JamiAccount::doRegister_() getAccountID().c_str(), name.c_str()); - if (this->config().turnEnabled && turnCache_) { - auto addr = turnCache_->getResolvedTurn(); - if (addr == std::nullopt) { - // If TURN is enabled, but no TURN cached, there can be a temporary - // resolution error to solve. Sometimes, a connectivity change is not - // enough, so even if this case is really rare, it should be easy to avoid. - turnCache_->refresh(); + if (this->config().turnEnabled) { + std::lock_guard<std::mutex> lock(turnCacheMtx_); + if (turnCache_) { + auto addr = turnCache_->getResolvedTurn(); + if (addr == std::nullopt) { + // If TURN is enabled, but no TURN cached, there can be a temporary + // resolution error to solve. Sometimes, a connectivity change is not + // enough, so even if this case is really rare, it should be easy to avoid. + turnCache_->refresh(); + } } } @@ -2251,8 +2254,11 @@ JamiAccount::doUnregister(std::function<void(bool)> released_cb) // Stop all current p2p connections if account is disabled // Else, we let the system managing if the co is down or not // NOTE: this is used for changing account's config. - if (not isEnabled()) + if (not isEnabled()) { shutdownConnections(); + std::lock_guard<std::mutex> lk(turnCacheMtx_); + if (turnCache_) turnCache_->shutdown(); + } // Release current upnp mapping if any. if (upnpCtrl_ and dhtUpnpMapping_.isValid()) { @@ -2284,7 +2290,11 @@ JamiAccount::setRegistrationState(RegistrationState state, if (registrationState_ != state) { if (state == RegistrationState::REGISTERED) { JAMI_WARN("[Account %s] connected", getAccountID().c_str()); - turnCache_->refresh(); + { + std::lock_guard<std::mutex> lock(turnCacheMtx_); + if (turnCache_) + turnCache_->refresh(); + } storeActiveIpAddress(); } else if (state == RegistrationState::TRYING) { JAMI_WARN("[Account %s] connecting…", getAccountID().c_str()); diff --git a/src/sip/sipaccountbase.cpp b/src/sip/sipaccountbase.cpp index 1ecf03a0af7d46c20e420dbd54ebb257d11bd0a2..8f6ba43ac221e5ca9eed4bb719857dda02280551 100644 --- a/src/sip/sipaccountbase.cpp +++ b/src/sip/sipaccountbase.cpp @@ -20,6 +20,8 @@ #include "sip/sipaccountbase.h" #include "sip/sipvoiplink.h" +#include "sipaccountbase.h" +#include <mutex> #ifdef ENABLE_VIDEO #include "libav_utils.h" @@ -148,12 +150,13 @@ SIPAccountBase::loadConfig() turnParams.username = conf.turnServerUserName; turnParams.password = conf.turnServerPwd; turnParams.realm = conf.turnServerRealm; + std::lock_guard<std::mutex> lk(turnCacheMtx_); if (!turnCache_) { auto cachePath = fileutils::get_cache_dir() + DIR_SEPARATOR_STR + getAccountID(); - turnCache_ = std::make_unique<TurnCache>(getAccountID(), - cachePath, - turnParams, - conf.turnEnabled); + turnCache_ = std::make_shared<TurnCache>(getAccountID(), + cachePath, + turnParams, + conf.turnEnabled); } else { turnCache_->reconfigure(turnParams, conf.turnEnabled); } @@ -255,6 +258,7 @@ SIPAccountBase::getIceOptions() const noexcept // if (config().stunEnabled) // opts.stunServers.emplace_back(StunServerInfo().setUri(stunServer_)); + std::lock_guard<std::mutex> lk(turnCacheMtx_); if (config().turnEnabled && turnCache_) { auto turnAddr = turnCache_->getResolvedTurn(); if (turnAddr != std::nullopt) { diff --git a/src/sip/sipaccountbase.h b/src/sip/sipaccountbase.h index 70abe43ac5189dee7b6d467ac1cfce68e8ff7248..7278f038dd69ceec1043eecd205c3d23fe8ed2bb 100644 --- a/src/sip/sipaccountbase.h +++ b/src/sip/sipaccountbase.h @@ -266,7 +266,10 @@ protected: std::chrono::steady_clock::time_point::min()}; std::shared_ptr<Task> composingTimeout_; - std::unique_ptr<TurnCache> turnCache_; + mutable std::mutex turnCacheMtx_; + // ASIO :( + // https://stackoverflow.com/questions/35507956/is-it-safe-to-destroy-boostasio-timer-from-its-handler-or-handler-dtor + std::shared_ptr<TurnCache> turnCache_; private: NON_COPYABLE(SIPAccountBase);