diff --git a/src/connectivity/turn_cache.cpp b/src/connectivity/turn_cache.cpp index 103f50d5b6c7a2b2cf9c16a0a9c3acae20dc2a8d..94a0d13b059b746c86d9667c4e2f719039131448 100644 --- a/src/connectivity/turn_cache.cpp +++ b/src/connectivity/turn_cache.cpp @@ -25,7 +25,6 @@ #include "manager.h" #include "opendht/thread_pool.h" // TODO remove asio #include "turn_cache.h" -#include <asio/error_code.hpp> namespace jami { @@ -39,16 +38,17 @@ TurnCache::TurnCache(const std::string& accountId, { refreshTimer_ = std::make_unique<asio::steady_timer>(*io_context, std::chrono::steady_clock::now()); + onConnectedTimer_ = std::make_unique<asio::steady_timer>(*io_context, + std::chrono::steady_clock::now()); reconfigure(params, enabled); } TurnCache::~TurnCache() { - shutdown(); -} - -void -TurnCache::shutdown() -{ + { + std::lock_guard<std::mutex> lock(shutdownMtx_); + onConnectedTimer_->cancel(); + onConnectedTimer_.reset(); + } { std::lock_guard<std::mutex> lock(cachedTurnMutex_); testTurnV4_.reset(); @@ -175,13 +175,13 @@ TurnCache::testTurn(IpAddr server) try { turn = std::make_unique<TurnTransport>( params, std::move([this, server](bool ok) { - if (!io_context) return; // Stop server in an async job, because this callback can be called // immediately and cachedTurnMutex_ must not be locked. - io_context->post([w= weak(), ok, server] { - if (auto shared = w.lock()) - shared->onConnected(ok, server); - }); + std::lock_guard<std::mutex> lock(shutdownMtx_); + if (onConnectedTimer_) { + onConnectedTimer_->expires_at(std::chrono::steady_clock::now()); + onConnectedTimer_->async_wait(std::bind(&TurnCache::onConnected, shared_from_this(), std::placeholders::_1, ok, server)); + } })); } catch (const std::exception& e) { JAMI_ERROR("TurnTransport creation error: {}", e.what()); @@ -189,8 +189,11 @@ TurnCache::testTurn(IpAddr server) } void -TurnCache::onConnected(bool ok, IpAddr server) +TurnCache::onConnected(const asio::error_code& ec, bool ok, IpAddr server) { + if (ec == asio::error::operation_aborted) + return; + std::lock_guard<std::mutex> lk(cachedTurnMutex_); auto& cacheTurn = server.isIpv4() ? cacheTurnV4_ : cacheTurnV6_; if (!ok) { @@ -200,18 +203,11 @@ TurnCache::onConnected(bool ok, IpAddr server) JAMI_DEBUG("Connection to {:s} ready", server.toString()); cacheTurn = std::make_unique<IpAddr>(server); } + refreshTurnDelay(!cacheTurnV6_ && !cacheTurnV4_); if (auto& turn = server.isIpv4() ? testTurnV4_ : testTurnV6_) turn->shutdown(); - refreshTurnDelay(!cacheTurnV6_ && !cacheTurnV4_); } -void -TurnCache::resetTestTransport() -{ - std::lock_guard<std::mutex> lk(cachedTurnMutex_); - testTurnV4_.reset(); - testTurnV6_.reset(); -} void TurnCache::refreshTurnDelay(bool scheduleNext) @@ -223,13 +219,9 @@ TurnCache::refreshTurnDelay(bool scheduleNext) refreshTimer_->async_wait(std::bind(&TurnCache::refresh, shared_from_this(), std::placeholders::_1)); if (turnRefreshDelay_ < std::chrono::minutes(30)) turnRefreshDelay_ *= 2; - } else if (io_context) { + } else { JAMI_DEBUG("[Account {:s}] Cache refreshed for TURN resolution", accountId_); turnRefreshDelay_ = std::chrono::seconds(10); - io_context->post([w= weak()] { - if (auto shared = w.lock()) - shared->resetTestTransport(); - }); } } diff --git a/src/connectivity/turn_cache.h b/src/connectivity/turn_cache.h index 9cf83c8ed6490d89acbd8d8ea721912f3ddec28e..34781d3d94d465b0eb523c6992e7ba6ccd4e2817 100644 --- a/src/connectivity/turn_cache.h +++ b/src/connectivity/turn_cache.h @@ -53,10 +53,6 @@ public: * Refresh cache from current configuration */ void refresh(const asio::error_code& ec = {}); - /** - * Reset connections - */ - void shutdown(); private: std::string accountId_; @@ -84,20 +80,20 @@ private: std::unique_ptr<IpAddr> cacheTurnV4_ {}; std::unique_ptr<IpAddr> cacheTurnV6_ {}; - void onConnected(bool ok, IpAddr server); - void resetTestTransport(); + void onConnected(const asio::error_code& ec, bool ok, IpAddr server); // io std::shared_ptr<asio::io_context> io_context; std::unique_ptr<asio::steady_timer> refreshTimer_; + std::unique_ptr<asio::steady_timer> onConnectedTimer_; + std::mutex shutdownMtx_; // Asio :( // https://stackoverflow.com/questions/35507956/is-it-safe-to-destroy-boostasio-timer-from-its-handler-or-handler-dtor std::weak_ptr<TurnCache> weak() { return std::static_pointer_cast<TurnCache>(shared_from_this()); } - }; } // namespace jami diff --git a/src/connectivity/turn_transport.cpp b/src/connectivity/turn_transport.cpp index fd5677842091a4d93a051249ea759a9d3ef89c17..9266f7717a06a71b004b10578bfa518912a1e83e 100644 --- a/src/connectivity/turn_transport.cpp +++ b/src/connectivity/turn_transport.cpp @@ -38,44 +38,29 @@ namespace jami { -struct CachePoolDeleter { - void operator()(pj_caching_pool* b) { pj_caching_pool_destroy(b); } -}; -struct PoolDeleter { - void operator()(pj_pool_t* p) { pj_pool_release(p); } -}; -using CachePool = std::unique_ptr<pj_caching_pool, CachePoolDeleter>; -using Pool = std::unique_ptr<pj_pool_t, PoolDeleter>; - class TurnLock { pj_grp_lock_t* lk_; public: - TurnLock(pj_turn_sock* strans) - : lk_(pj_turn_sock_get_grp_lock(strans)) + TurnLock(pj_turn_sock* sock) + : lk_(pj_turn_sock_get_grp_lock(sock)) { lock(); } ~TurnLock() { unlock(); } - void lock() { pj_grp_lock_acquire(lk_); } + void lock() { pj_grp_lock_add_ref(lk_); } - void unlock() { pj_grp_lock_release(lk_); } + void unlock() { pj_grp_lock_dec_ref(lk_); } }; class TurnTransport::Impl { public: - Impl(std::function<void(bool)>&& cb) - : poolCache(new pj_caching_pool()) - , pool(nullptr) - { cb_ = std::move(cb); } - - ~Impl() { - shutdown(); - } + Impl(std::function<void(bool)>&& cb) { cb_ = std::move(cb); } + ~Impl(); /** * Detect new TURN state @@ -92,26 +77,24 @@ public: ioWorker = std::thread([this] { ioJob(); }); } - void shutdown() - { - noCallback_ = true; - { - TurnLock lock(relay); - if (relay) - pj_turn_sock_destroy(relay); + void shutdown() { + std::lock_guard<std::mutex> lock(shutdownMtx_); + if (relay) { + pj_turn_sock_destroy(relay); + relay = nullptr; } + turnLock.reset(); if (ioWorker.joinable()) ioWorker.join(); - pool.reset(); - poolCache.reset(); - } + } TurnTransportParams settings; - CachePool poolCache {}; - Pool pool {}; + pj_caching_pool poolCache {}; + pj_pool_t* pool {nullptr}; pj_stun_config stunConfig {}; pj_turn_sock* relay {nullptr}; + std::unique_ptr<TurnLock> turnLock; pj_str_t relayAddr {}; IpAddr peerRelayAddr; // address where peers should connect to IpAddr mappedAddr; @@ -119,12 +102,23 @@ public: std::thread ioWorker; std::atomic_bool stopped_ {false}; - std::atomic_bool noCallback_ {false}; + std::atomic_bool cbCalled_ {false}; + std::mutex shutdownMtx_; }; +TurnTransport::Impl::~Impl() +{ + shutdown(); + pj_caching_pool_destroy(&poolCache); +} void TurnTransport::Impl::onTurnState(pj_turn_state_t old_state, pj_turn_state_t new_state) { + if (new_state == PJ_TURN_STATE_DESTROYING) { + stopped_ = true; + return; + } + if (new_state == PJ_TURN_STATE_READY) { pj_turn_session_info info; pj_turn_sock_get_info(relay, &info); @@ -132,13 +126,11 @@ TurnTransport::Impl::onTurnState(pj_turn_state_t old_state, pj_turn_state_t new_ mappedAddr = IpAddr {info.mapped_addr}; JAMI_DEBUG("TURN server ready, peer relay address: {:s}", peerRelayAddr.toString(true, true).c_str()); - noCallback_ = true; + cbCalled_ = true; cb_(true); - } else if (old_state <= PJ_TURN_STATE_READY and new_state > PJ_TURN_STATE_READY and not noCallback_) { + } else if (old_state <= PJ_TURN_STATE_READY and new_state > PJ_TURN_STATE_READY and not cbCalled_) { JAMI_WARNING("TURN server disconnected ({:s})", pj_turn_state_name(new_state)); cb_(false); - } else if (new_state >= PJ_TURN_STATE_DESTROYING) { - stopped_ = true; } } void @@ -161,16 +153,16 @@ TurnTransport::TurnTransport(const TurnTransportParams& params, std::function<vo throw std::invalid_argument("invalid turn server address"); pimpl_->settings = params; // PJSIP memory pool - pj_caching_pool_init(pimpl_->poolCache.get(), &pj_pool_factory_default_policy, 0); - pimpl_->pool.reset(pj_pool_create(&pimpl_->poolCache->factory, "TurnTransport", 512, 512, nullptr)); + pj_caching_pool_init(&pimpl_->poolCache, &pj_pool_factory_default_policy, 0); + pimpl_->pool = pj_pool_create(&pimpl_->poolCache.factory, "TurnTransport", 512, 512, nullptr); if (not pimpl_->pool) throw std::runtime_error("pj_pool_create() failed"); // STUN config - pj_stun_config_init(&pimpl_->stunConfig, &pimpl_->poolCache->factory, 0, nullptr, nullptr); + pj_stun_config_init(&pimpl_->stunConfig, &pimpl_->poolCache.factory, 0, nullptr, nullptr); // create global timer heap - TRY(pj_timer_heap_create(pimpl_->pool.get(), 1000, &pimpl_->stunConfig.timer_heap)); + TRY(pj_timer_heap_create(pimpl_->pool, 1000, &pimpl_->stunConfig.timer_heap)); // create global ioqueue - TRY(pj_ioqueue_create(pimpl_->pool.get(), 16, &pimpl_->stunConfig.ioqueue)); + TRY(pj_ioqueue_create(pimpl_->pool, 16, &pimpl_->stunConfig.ioqueue)); // TURN callbacks pj_turn_sock_cb relay_cb; pj_bzero(&relay_cb, sizeof(relay_cb)); @@ -202,7 +194,7 @@ TurnTransport::TurnTransport(const TurnTransportParams& params, std::function<vo pj_cstr(&cred.data.static_cred.username, pimpl_->settings.username.c_str()); cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN; pj_cstr(&cred.data.static_cred.data, pimpl_->settings.password.c_str()); - pimpl_->relayAddr = pj_strdup3(pimpl_->pool.get(), server.toString().c_str()); + pimpl_->relayAddr = pj_strdup3(pimpl_->pool, server.toString().c_str()); // TURN connection/allocation JAMI_DEBUG("Connecting to TURN {:s}", server.toString(true, true)); TRY(pj_turn_sock_alloc(pimpl_->relay, @@ -211,6 +203,7 @@ TurnTransport::TurnTransport(const TurnTransportParams& params, std::function<vo nullptr, &cred, &turn_alloc_param)); + pimpl_->turnLock = std::make_unique<TurnLock>(pimpl_->relay); pimpl_->start(); } diff --git a/src/jamidht/jamiaccount.cpp b/src/jamidht/jamiaccount.cpp index 8c45a7e3989fbafc343f10d09e5594ff1986dc33..e0c5ed1903f6c2fdfd051c2cdd8a365d07ed021d 100644 --- a/src/jamidht/jamiaccount.cpp +++ b/src/jamidht/jamiaccount.cpp @@ -1968,16 +1968,13 @@ JamiAccount::doRegister_() getAccountID().c_str(), name.c_str()); - if (this->config().turnEnabled) { - std::lock_guard<std::mutex> lock(turnCacheMtx_); - if (turnCache_) { - auto addr = turnCache_->getResolvedTurn(); - if (addr == std::nullopt) { - // If TURN is enabled, but no TURN cached, there can be a temporary - // resolution error to solve. Sometimes, a connectivity change is not - // enough, so even if this case is really rare, it should be easy to avoid. - turnCache_->refresh(); - } + if (this->config().turnEnabled && turnCache_) { + auto addr = turnCache_->getResolvedTurn(); + if (addr == std::nullopt) { + // If TURN is enabled, but no TURN cached, there can be a temporary + // resolution error to solve. Sometimes, a connectivity change is not + // enough, so even if this case is really rare, it should be easy to avoid. + turnCache_->refresh(); } } @@ -2254,11 +2251,8 @@ JamiAccount::doUnregister(std::function<void(bool)> released_cb) // Stop all current p2p connections if account is disabled // Else, we let the system managing if the co is down or not // NOTE: this is used for changing account's config. - if (not isEnabled()) { + if (not isEnabled()) shutdownConnections(); - std::lock_guard<std::mutex> lk(turnCacheMtx_); - if (turnCache_) turnCache_->shutdown(); - } // Release current upnp mapping if any. if (upnpCtrl_ and dhtUpnpMapping_.isValid()) { @@ -2290,11 +2284,7 @@ JamiAccount::setRegistrationState(RegistrationState state, if (registrationState_ != state) { if (state == RegistrationState::REGISTERED) { JAMI_WARN("[Account %s] connected", getAccountID().c_str()); - { - std::lock_guard<std::mutex> lock(turnCacheMtx_); - if (turnCache_) - turnCache_->refresh(); - } + turnCache_->refresh(); storeActiveIpAddress(); } else if (state == RegistrationState::TRYING) { JAMI_WARN("[Account %s] connecting…", getAccountID().c_str()); diff --git a/src/sip/sipaccountbase.cpp b/src/sip/sipaccountbase.cpp index 8f6ba43ac221e5ca9eed4bb719857dda02280551..fa3609aa81f83e6bcab9768f1a3f3bf8c0ba226e 100644 --- a/src/sip/sipaccountbase.cpp +++ b/src/sip/sipaccountbase.cpp @@ -20,8 +20,6 @@ #include "sip/sipaccountbase.h" #include "sip/sipvoiplink.h" -#include "sipaccountbase.h" -#include <mutex> #ifdef ENABLE_VIDEO #include "libav_utils.h" @@ -150,13 +148,12 @@ SIPAccountBase::loadConfig() turnParams.username = conf.turnServerUserName; turnParams.password = conf.turnServerPwd; turnParams.realm = conf.turnServerRealm; - std::lock_guard<std::mutex> lk(turnCacheMtx_); if (!turnCache_) { auto cachePath = fileutils::get_cache_dir() + DIR_SEPARATOR_STR + getAccountID(); turnCache_ = std::make_shared<TurnCache>(getAccountID(), - cachePath, - turnParams, - conf.turnEnabled); + cachePath, + turnParams, + conf.turnEnabled); } else { turnCache_->reconfigure(turnParams, conf.turnEnabled); } @@ -258,7 +255,6 @@ SIPAccountBase::getIceOptions() const noexcept // if (config().stunEnabled) // opts.stunServers.emplace_back(StunServerInfo().setUri(stunServer_)); - std::lock_guard<std::mutex> lk(turnCacheMtx_); if (config().turnEnabled && turnCache_) { auto turnAddr = turnCache_->getResolvedTurn(); if (turnAddr != std::nullopt) { diff --git a/src/sip/sipaccountbase.h b/src/sip/sipaccountbase.h index 7278f038dd69ceec1043eecd205c3d23fe8ed2bb..9c2eb8e130787afe0cc4df4def701b2e42e769ce 100644 --- a/src/sip/sipaccountbase.h +++ b/src/sip/sipaccountbase.h @@ -266,9 +266,6 @@ protected: std::chrono::steady_clock::time_point::min()}; std::shared_ptr<Task> composingTimeout_; - mutable std::mutex turnCacheMtx_; - // ASIO :( - // https://stackoverflow.com/questions/35507956/is-it-safe-to-destroy-boostasio-timer-from-its-handler-or-handler-dtor std::shared_ptr<TurnCache> turnCache_; private: