From 9bd73d7b4e169429c98a8b7cb6182a15d60cbfdb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=A9bastien=20Blin?=
 <sebastien.blin@savoirfairelinux.com>
Date: Fri, 16 Dec 2022 10:51:22 -0500
Subject: [PATCH] turncache: avoid turn_sock leaks

Destroying a TURN sock while allocating cause a leak of the socket
and leave it open causing too many opened files to be opened.

This patch resets all structure, move pools into unique pointers
and wait for the destruction of the turn_sock.

Change-Id: I69fc82e204c4abeedb51426396309798602fa866
---
 src/connectivity/turn_cache.cpp     | 46 +++++++++-------
 src/connectivity/turn_cache.h       | 17 ++++--
 src/connectivity/turn_transport.cpp | 82 ++++++++++++++++++++++-------
 src/jamidht/jamiaccount.cpp         | 28 ++++++----
 src/sip/sipaccountbase.cpp          | 12 +++--
 src/sip/sipaccountbase.h            |  5 +-
 6 files changed, 133 insertions(+), 57 deletions(-)

diff --git a/src/connectivity/turn_cache.cpp b/src/connectivity/turn_cache.cpp
index 12fbb43c3c..103f50d5b6 100644
--- a/src/connectivity/turn_cache.cpp
+++ b/src/connectivity/turn_cache.cpp
@@ -25,6 +25,7 @@
 #include "manager.h"
 #include "opendht/thread_pool.h" // TODO remove asio
 #include "turn_cache.h"
+#include <asio/error_code.hpp>
 
 namespace jami {
 
@@ -38,17 +39,16 @@ TurnCache::TurnCache(const std::string& accountId,
 {
     refreshTimer_ = std::make_unique<asio::steady_timer>(*io_context,
                                                          std::chrono::steady_clock::now());
-    onConnectedTimer_ = std::make_unique<asio::steady_timer>(*io_context,
-                                                         std::chrono::steady_clock::now());
     reconfigure(params, enabled);
 }
 
 TurnCache::~TurnCache() {
-    {
-        std::lock_guard<std::mutex> lock(shutdownMtx_);
-        onConnectedTimer_->cancel();
-        onConnectedTimer_.reset();
-    }
+    shutdown();
+}
+
+void
+TurnCache::shutdown()
+{
     {
         std::lock_guard<std::mutex> lock(cachedTurnMutex_);
         testTurnV4_.reset();
@@ -175,13 +175,13 @@ TurnCache::testTurn(IpAddr server)
     try {
         turn = std::make_unique<TurnTransport>(
             params, std::move([this, server](bool ok) {
+                if (!io_context) return;
                 // Stop server in an async job, because this callback can be called
                 // immediately and cachedTurnMutex_ must not be locked.
-                std::lock_guard<std::mutex> lock(shutdownMtx_);
-                if (onConnectedTimer_) {
-                    onConnectedTimer_->expires_at(std::chrono::steady_clock::now());
-                    onConnectedTimer_->async_wait(std::bind(&TurnCache::onConnected, this, std::placeholders::_1, ok, server));
-                }
+                io_context->post([w= weak(), ok, server] {
+                    if (auto shared = w.lock())
+                        shared->onConnected(ok, server);
+                });
             }));
     } catch (const std::exception& e) {
         JAMI_ERROR("TurnTransport creation error: {}", e.what());
@@ -189,11 +189,8 @@ TurnCache::testTurn(IpAddr server)
 }
 
 void
-TurnCache::onConnected(const asio::error_code& ec, bool ok, IpAddr server)
+TurnCache::onConnected(bool ok, IpAddr server)
 {
-    if (ec == asio::error::operation_aborted)
-        return;
-
     std::lock_guard<std::mutex> lk(cachedTurnMutex_);
     auto& cacheTurn = server.isIpv4() ? cacheTurnV4_ : cacheTurnV6_;
     if (!ok) {
@@ -203,11 +200,18 @@ TurnCache::onConnected(const asio::error_code& ec, bool ok, IpAddr server)
         JAMI_DEBUG("Connection to {:s} ready", server.toString());
         cacheTurn = std::make_unique<IpAddr>(server);
     }
-    refreshTurnDelay(!cacheTurnV6_ && !cacheTurnV4_);
     if (auto& turn = server.isIpv4() ? testTurnV4_ : testTurnV6_)
         turn->shutdown();
+    refreshTurnDelay(!cacheTurnV6_ && !cacheTurnV4_);
 }
 
+void
+TurnCache::resetTestTransport()
+{
+    std::lock_guard<std::mutex> lk(cachedTurnMutex_);
+    testTurnV4_.reset();
+    testTurnV6_.reset();
+}
 
 void
 TurnCache::refreshTurnDelay(bool scheduleNext)
@@ -216,12 +220,16 @@ TurnCache::refreshTurnDelay(bool scheduleNext)
     if (scheduleNext) {
         JAMI_WARNING("[Account {:s}] Cache for TURN resolution failed.", accountId_);
         refreshTimer_->expires_at(std::chrono::steady_clock::now() + turnRefreshDelay_);
-        refreshTimer_->async_wait(std::bind(&TurnCache::refresh, this, std::placeholders::_1));
+        refreshTimer_->async_wait(std::bind(&TurnCache::refresh, shared_from_this(), std::placeholders::_1));
         if (turnRefreshDelay_ < std::chrono::minutes(30))
             turnRefreshDelay_ *= 2;
-    } else {
+    } else if (io_context) {
         JAMI_DEBUG("[Account {:s}] Cache refreshed for TURN resolution", accountId_);
         turnRefreshDelay_ = std::chrono::seconds(10);
+        io_context->post([w= weak()] {
+            if (auto shared = w.lock())
+                shared->resetTestTransport();
+        });
     }
 }
 
diff --git a/src/connectivity/turn_cache.h b/src/connectivity/turn_cache.h
index 9c5fa04f3d..9cf83c8ed6 100644
--- a/src/connectivity/turn_cache.h
+++ b/src/connectivity/turn_cache.h
@@ -34,7 +34,7 @@
 
 namespace jami {
 
-class TurnCache
+class TurnCache : public std::enable_shared_from_this<TurnCache>
 {
 public:
     TurnCache(const std::string& accountId,
@@ -53,6 +53,10 @@ public:
      * Refresh cache from current configuration
      */
     void refresh(const asio::error_code& ec = {});
+    /**
+     * Reset connections
+     */
+    void shutdown();
 
 private:
     std::string accountId_;
@@ -80,14 +84,19 @@ private:
     std::unique_ptr<IpAddr> cacheTurnV4_ {};
     std::unique_ptr<IpAddr> cacheTurnV6_ {};
 
-    void onConnected(const asio::error_code& ec, bool ok, IpAddr server);
+    void onConnected(bool ok, IpAddr server);
+    void resetTestTransport();
 
     // io
     std::shared_ptr<asio::io_context> io_context;
     std::unique_ptr<asio::steady_timer> refreshTimer_;
-    std::unique_ptr<asio::steady_timer> onConnectedTimer_;
 
-    std::mutex shutdownMtx_;
+    // Asio :(
+    // https://stackoverflow.com/questions/35507956/is-it-safe-to-destroy-boostasio-timer-from-its-handler-or-handler-dtor
+    std::weak_ptr<TurnCache> weak()
+    {
+        return std::static_pointer_cast<TurnCache>(shared_from_this());
+    }
 
 };
 
diff --git a/src/connectivity/turn_transport.cpp b/src/connectivity/turn_transport.cpp
index 5675a6c5d6..fd56778420 100644
--- a/src/connectivity/turn_transport.cpp
+++ b/src/connectivity/turn_transport.cpp
@@ -38,11 +38,44 @@
 
 namespace jami {
 
+struct CachePoolDeleter {
+  void operator()(pj_caching_pool* b) { pj_caching_pool_destroy(b); }
+};
+struct PoolDeleter {
+  void operator()(pj_pool_t* p) { pj_pool_release(p); }
+};
+using CachePool = std::unique_ptr<pj_caching_pool, CachePoolDeleter>;
+using Pool = std::unique_ptr<pj_pool_t, PoolDeleter>;
+
+class TurnLock
+{
+    pj_grp_lock_t* lk_;
+
+public:
+    TurnLock(pj_turn_sock* strans)
+        : lk_(pj_turn_sock_get_grp_lock(strans))
+    {
+        lock();
+    }
+
+    ~TurnLock() { unlock(); }
+
+    void lock() { pj_grp_lock_acquire(lk_); }
+
+    void unlock() { pj_grp_lock_release(lk_); }
+};
+
 class TurnTransport::Impl
 {
 public:
-    Impl(std::function<void(bool)>&& cb) { cb_ = std::move(cb); }
-    ~Impl();
+    Impl(std::function<void(bool)>&& cb)
+    : poolCache(new pj_caching_pool())
+    , pool(nullptr)
+    { cb_ = std::move(cb); }
+
+    ~Impl() {
+        shutdown();
+    }
 
     /**
      * Detect new TURN state
@@ -59,12 +92,24 @@ public:
         ioWorker = std::thread([this] { ioJob(); });
     }
 
-    void stop() { stopped_ = true; }
+    void shutdown()
+    {
+        noCallback_ = true;
+        {
+            TurnLock lock(relay);
+            if (relay)
+                pj_turn_sock_destroy(relay);
+        }
+        if (ioWorker.joinable())
+            ioWorker.join();
+        pool.reset();
+        poolCache.reset();
+    }
 
     TurnTransportParams settings;
 
-    pj_caching_pool poolCache {};
-    pj_pool_t* pool {nullptr};
+    CachePool poolCache {};
+    Pool pool {};
     pj_stun_config stunConfig {};
     pj_turn_sock* relay {nullptr};
     pj_str_t relayAddr {};
@@ -74,15 +119,9 @@ public:
 
     std::thread ioWorker;
     std::atomic_bool stopped_ {false};
+    std::atomic_bool noCallback_ {false};
 };
 
-TurnTransport::Impl::~Impl()
-{
-    stop();
-    if (ioWorker.joinable())
-        ioWorker.join();
-    pj_caching_pool_destroy(&poolCache);
-}
 void
 TurnTransport::Impl::onTurnState(pj_turn_state_t old_state, pj_turn_state_t new_state)
 {
@@ -93,10 +132,13 @@ TurnTransport::Impl::onTurnState(pj_turn_state_t old_state, pj_turn_state_t new_
         mappedAddr = IpAddr {info.mapped_addr};
         JAMI_DEBUG("TURN server ready, peer relay address: {:s}",
                    peerRelayAddr.toString(true, true).c_str());
+        noCallback_ = true;
         cb_(true);
-    } else if (old_state <= PJ_TURN_STATE_READY and new_state > PJ_TURN_STATE_READY) {
+    } else if (old_state <= PJ_TURN_STATE_READY and new_state > PJ_TURN_STATE_READY and not noCallback_) {
         JAMI_WARNING("TURN server disconnected ({:s})", pj_turn_state_name(new_state));
         cb_(false);
+    } else if (new_state >= PJ_TURN_STATE_DESTROYING) {
+        stopped_ = true;
     }
 }
 void
@@ -119,16 +161,16 @@ TurnTransport::TurnTransport(const TurnTransportParams& params, std::function<vo
         throw std::invalid_argument("invalid turn server address");
     pimpl_->settings = params;
     // PJSIP memory pool
-    pj_caching_pool_init(&pimpl_->poolCache, &pj_pool_factory_default_policy, 0);
-    pimpl_->pool = pj_pool_create(&pimpl_->poolCache.factory, "TurnTransport", 512, 512, nullptr);
+    pj_caching_pool_init(pimpl_->poolCache.get(), &pj_pool_factory_default_policy, 0);
+    pimpl_->pool.reset(pj_pool_create(&pimpl_->poolCache->factory, "TurnTransport", 512, 512, nullptr));
     if (not pimpl_->pool)
         throw std::runtime_error("pj_pool_create() failed");
     // STUN config
-    pj_stun_config_init(&pimpl_->stunConfig, &pimpl_->poolCache.factory, 0, nullptr, nullptr);
+    pj_stun_config_init(&pimpl_->stunConfig, &pimpl_->poolCache->factory, 0, nullptr, nullptr);
     // create global timer heap
-    TRY(pj_timer_heap_create(pimpl_->pool, 1000, &pimpl_->stunConfig.timer_heap));
+    TRY(pj_timer_heap_create(pimpl_->pool.get(), 1000, &pimpl_->stunConfig.timer_heap));
     // create global ioqueue
-    TRY(pj_ioqueue_create(pimpl_->pool, 16, &pimpl_->stunConfig.ioqueue));
+    TRY(pj_ioqueue_create(pimpl_->pool.get(), 16, &pimpl_->stunConfig.ioqueue));
     // TURN callbacks
     pj_turn_sock_cb relay_cb;
     pj_bzero(&relay_cb, sizeof(relay_cb));
@@ -160,7 +202,7 @@ TurnTransport::TurnTransport(const TurnTransportParams& params, std::function<vo
     pj_cstr(&cred.data.static_cred.username, pimpl_->settings.username.c_str());
     cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN;
     pj_cstr(&cred.data.static_cred.data, pimpl_->settings.password.c_str());
-    pimpl_->relayAddr = pj_strdup3(pimpl_->pool, server.toString().c_str());
+    pimpl_->relayAddr = pj_strdup3(pimpl_->pool.get(), server.toString().c_str());
     // TURN connection/allocation
     JAMI_DEBUG("Connecting to TURN {:s}", server.toString(true, true));
     TRY(pj_turn_sock_alloc(pimpl_->relay,
@@ -177,7 +219,7 @@ TurnTransport::~TurnTransport() {}
 void
 TurnTransport::shutdown()
 {
-    pimpl_->stop();
+    pimpl_->shutdown();
 }
 
 } // namespace jami
diff --git a/src/jamidht/jamiaccount.cpp b/src/jamidht/jamiaccount.cpp
index e0c5ed1903..8c45a7e398 100644
--- a/src/jamidht/jamiaccount.cpp
+++ b/src/jamidht/jamiaccount.cpp
@@ -1968,13 +1968,16 @@ JamiAccount::doRegister_()
                           getAccountID().c_str(),
                           name.c_str());
 
-                if (this->config().turnEnabled && turnCache_) {
-                    auto addr = turnCache_->getResolvedTurn();
-                    if (addr == std::nullopt) {
-                        // If TURN is enabled, but no TURN cached, there can be a temporary
-                        // resolution error to solve. Sometimes, a connectivity change is not
-                        // enough, so even if this case is really rare, it should be easy to avoid.
-                        turnCache_->refresh();
+                if (this->config().turnEnabled) {
+                    std::lock_guard<std::mutex> lock(turnCacheMtx_);
+                    if (turnCache_) {
+                        auto addr = turnCache_->getResolvedTurn();
+                        if (addr == std::nullopt) {
+                            // If TURN is enabled, but no TURN cached, there can be a temporary
+                            // resolution error to solve. Sometimes, a connectivity change is not
+                            // enough, so even if this case is really rare, it should be easy to avoid.
+                            turnCache_->refresh();
+                        }
                     }
                 }
 
@@ -2251,8 +2254,11 @@ JamiAccount::doUnregister(std::function<void(bool)> released_cb)
     // Stop all current p2p connections if account is disabled
     // Else, we let the system managing if the co is down or not
     // NOTE: this is used for changing account's config.
-    if (not isEnabled())
+    if (not isEnabled()) {
         shutdownConnections();
+        std::lock_guard<std::mutex> lk(turnCacheMtx_);
+        if (turnCache_) turnCache_->shutdown();
+    }
 
     // Release current upnp mapping if any.
     if (upnpCtrl_ and dhtUpnpMapping_.isValid()) {
@@ -2284,7 +2290,11 @@ JamiAccount::setRegistrationState(RegistrationState state,
     if (registrationState_ != state) {
         if (state == RegistrationState::REGISTERED) {
             JAMI_WARN("[Account %s] connected", getAccountID().c_str());
-            turnCache_->refresh();
+            {
+                std::lock_guard<std::mutex> lock(turnCacheMtx_);
+                if (turnCache_)
+                    turnCache_->refresh();
+            }
             storeActiveIpAddress();
         } else if (state == RegistrationState::TRYING) {
             JAMI_WARN("[Account %s] connecting…", getAccountID().c_str());
diff --git a/src/sip/sipaccountbase.cpp b/src/sip/sipaccountbase.cpp
index 1ecf03a0af..8f6ba43ac2 100644
--- a/src/sip/sipaccountbase.cpp
+++ b/src/sip/sipaccountbase.cpp
@@ -20,6 +20,8 @@
 
 #include "sip/sipaccountbase.h"
 #include "sip/sipvoiplink.h"
+#include "sipaccountbase.h"
+#include <mutex>
 
 #ifdef ENABLE_VIDEO
 #include "libav_utils.h"
@@ -148,12 +150,13 @@ SIPAccountBase::loadConfig()
     turnParams.username = conf.turnServerUserName;
     turnParams.password = conf.turnServerPwd;
     turnParams.realm = conf.turnServerRealm;
+    std::lock_guard<std::mutex> lk(turnCacheMtx_);
     if (!turnCache_) {
         auto cachePath = fileutils::get_cache_dir() + DIR_SEPARATOR_STR + getAccountID();
-        turnCache_ = std::make_unique<TurnCache>(getAccountID(),
-                                                 cachePath,
-                                                 turnParams,
-                                                 conf.turnEnabled);
+        turnCache_ = std::make_shared<TurnCache>(getAccountID(),
+                                                    cachePath,
+                                                    turnParams,
+                                                    conf.turnEnabled);
     } else {
         turnCache_->reconfigure(turnParams, conf.turnEnabled);
     }
@@ -255,6 +258,7 @@ SIPAccountBase::getIceOptions() const noexcept
 
     // if (config().stunEnabled)
     //     opts.stunServers.emplace_back(StunServerInfo().setUri(stunServer_));
+    std::lock_guard<std::mutex> lk(turnCacheMtx_);
     if (config().turnEnabled && turnCache_) {
         auto turnAddr = turnCache_->getResolvedTurn();
         if (turnAddr != std::nullopt) {
diff --git a/src/sip/sipaccountbase.h b/src/sip/sipaccountbase.h
index 70abe43ac5..7278f038dd 100644
--- a/src/sip/sipaccountbase.h
+++ b/src/sip/sipaccountbase.h
@@ -266,7 +266,10 @@ protected:
         std::chrono::steady_clock::time_point::min()};
     std::shared_ptr<Task> composingTimeout_;
 
-    std::unique_ptr<TurnCache> turnCache_;
+    mutable std::mutex turnCacheMtx_;
+    // ASIO :(
+    // https://stackoverflow.com/questions/35507956/is-it-safe-to-destroy-boostasio-timer-from-its-handler-or-handler-dtor
+    std::shared_ptr<TurnCache> turnCache_;
 
 private:
     NON_COPYABLE(SIPAccountBase);
-- 
GitLab