From e921a0dfcf431bbbbb7ffc3ce7bd7835d9774ea3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=A9bastien=20Blin?=
 <sebastien.blin@savoirfairelinux.com>
Date: Thu, 30 Mar 2023 09:57:48 -0400
Subject: [PATCH] connectionmanager: avoid double lock

removeUnusedConnection can happen and will lock infosMtx_ then cbMtx_
and at the same moment onTlsNegotiationDone can happen and lock
cbMtx_ then infosMtx_ causing a potential double lock.
This patch avoid this case by avoiding the infosMtx_ to be locked
if destroying.

Change-Id: If21d35757e9dbd667ef3e20e8c13025479494333
---
 src/connectivity/connectionmanager.cpp | 70 +++++++++++++-------------
 1 file changed, 36 insertions(+), 34 deletions(-)

diff --git a/src/connectivity/connectionmanager.cpp b/src/connectivity/connectionmanager.cpp
index 169fa7e1be..aae3260260 100644
--- a/src/connectivity/connectionmanager.cpp
+++ b/src/connectivity/connectionmanager.cpp
@@ -72,34 +72,38 @@ public:
 
     void removeUnusedConnections(const DeviceId& deviceId = {})
     {
-        std::lock_guard<std::mutex> lk(infosMtx_);
-        for (auto it = infos_.begin(); it != infos_.end();) {
-            auto& [key, info] = *it;
-            bool erased = false;
-            if (info && (!deviceId || key.first == deviceId)) {
-                if (info->tls_)
-                    info->tls_->shutdown();
-                if (info->socket_)
-                    info->socket_->shutdown();
-                if (info->ice_)
-                    info->ice_->cancelOperations();
-                if (info->waitForAnswer_)
-                    info->waitForAnswer_->cancel();
-                erased = true;
-                it = infos_.erase(it);
+        std::vector<std::shared_ptr<ConnectionInfo>> unused {};
+
+        {
+            std::lock_guard<std::mutex> lk(infosMtx_);
+            for (auto it = infos_.begin(); it != infos_.end();) {
+                auto& [key, info] = *it;
+                if (info && (!deviceId || key.first == deviceId)) {
+                    unused.emplace_back(std::move(info));
+                    it = infos_.erase(it);
+                } else {
+                    ++it;
+                }
             }
-            if (!erased)
-                ++it;
         }
-        if (!deviceId)
-            dht::ThreadPool::io().run([infos = std::move(infos_)]() mutable { infos.clear(); });
+        for (auto& info: unused) {
+            if (info->tls_)
+                info->tls_->shutdown();
+            if (info->socket_)
+                info->socket_->shutdown();
+            if (info->ice_)
+                info->ice_->cancelOperations();
+            if (info->waitForAnswer_)
+                info->waitForAnswer_->cancel();
+        }
+        if (!unused.empty())
+            dht::ThreadPool::io().run([infos = std::move(unused)]() mutable { infos.clear(); });
     }
 
     void shutdown()
     {
-        if (isDestroying_)
+        if (isDestroying_.exchange(true))
             return;
-        isDestroying_ = true;
         {
             std::lock_guard<std::mutex> lk(connectCbsMtx_);
             // Call all pending callbacks that channel is not ready
@@ -161,7 +165,7 @@ public:
     void onDhtPeerRequest(const PeerConnectionRequest& req,
                           const std::shared_ptr<dht::crypto::Certificate>& cert);
 
-    void addNewMultiplexedSocket(const DeviceId& deviceId, const dht::Value::Id& vid);
+    void addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info);
     void onPeerResponse(const PeerConnectionRequest& req);
     void onDhtConnected(const dht::crypto::PublicKey& devicePk);
 
@@ -782,8 +786,7 @@ ConnectionManager::Impl::onTlsNegotiationDone(bool ok,
                                               const dht::Value::Id& vid,
                                               const std::string& name)
 {
-    auto info = getInfo(deviceId, vid);
-    if (!info)
+    if (isDestroying_)
         return;
     // Note: only handle pendingCallbacks here for TLS initied by connectDevice()
     // Note: if not initied by connectDevice() the channel name will be empty (because no channel
@@ -812,7 +815,9 @@ ConnectionManager::Impl::onTlsNegotiationDone(bool ok,
                        << " - Initied by connectDevice(). Initied by channel: " << name
                        << " - vid: " << vid;
         }
-        addNewMultiplexedSocket(deviceId, vid);
+
+        auto info = getInfo(deviceId, vid);
+        addNewMultiplexedSocket({deviceId, vid}, info);
         // Finally, open the channel and launch pending callbacks
         if (info->socket_) {
             // Note: do not remove pending there it's done in sendChannelRequest
@@ -995,7 +1000,7 @@ ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
                 return;
             if (!ok) {
                 JAMI_ERR("ICE negotiation failed");
-                runOnMainThread([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
+                dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
                 return;
             }
 
@@ -1003,7 +1008,7 @@ ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
                 [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
                     if (auto shared = w.lock())
                         if (!shared->onRequestOnNegoDone(req))
-                            runOnMainThread([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
+                            eraseInfo();
                 });
         };
 
@@ -1029,19 +1034,16 @@ ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
         }
         // We need to detect any shutdown if the ice session is destroyed before going to the TLS session;
         info->ice_->setOnShutdown([eraseInfo]() {
-            runOnMainThread([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
+            dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
         });
         info->ice_->initIceInstance(ice_config);
     });
 }
 
 void
-ConnectionManager::Impl::addNewMultiplexedSocket(const DeviceId& deviceId, const dht::Value::Id& vid)
+ConnectionManager::Impl::addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info)
 {
-    auto info = getInfo(deviceId, vid);
-    if (!info)
-        return;
-    info->socket_ = std::make_shared<MultiplexedSocket>(deviceId, std::move(info->tls_));
+    info->socket_ = std::make_shared<MultiplexedSocket>(id.first, std::move(info->tls_));
     info->socket_->setOnReady(
         [w = weak()](const DeviceId& deviceId, const std::shared_ptr<ChannelSocket>& socket) {
             if (auto sthis = w.lock())
@@ -1056,7 +1058,7 @@ ConnectionManager::Impl::addNewMultiplexedSocket(const DeviceId& deviceId, const
                 return sthis->channelReqCb_(peer, name);
         return false;
     });
-    info->socket_->onShutdown([w = weak(), deviceId, vid]() {
+    info->socket_->onShutdown([w = weak(), deviceId=id.first, vid=id.second]() {
         // Cancel current outgoing connections
         dht::ThreadPool::io().run([w, deviceId, vid] {
             auto sthis = w.lock();
-- 
GitLab