From cdd1f24e97f08fdaf5fe3462adff3118e002e3b4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=A9bastien=20Blin?=
 <sebastien.blin@savoirfairelinux.com>
Date: Mon, 5 Oct 2020 12:27:17 -0400
Subject: [PATCH] connectionManager: simplify states and mutex usage

All structures are now stored in a map<<InfoHash, Vid>, ConnectionInfo> instead
of 3 maps to manipulate states. This avoids a lot of complexity and potential
crashes

Change-Id: I003c4bd9a94b6c160911e2dcc8c4a15835a749c9
---
 src/jamidht/connectionmanager.cpp | 453 ++++++++++++++----------------
 1 file changed, 204 insertions(+), 249 deletions(-)

diff --git a/src/jamidht/connectionmanager.cpp b/src/jamidht/connectionmanager.cpp
index 86bd08b3b5..319baf0acf 100644
--- a/src/jamidht/connectionmanager.cpp
+++ b/src/jamidht/connectionmanager.cpp
@@ -28,6 +28,7 @@
 #include <opendht/thread_pool.h>
 #include <opendht/value.h>
 
+#include <algorithm>
 #include <mutex>
 #include <map>
 #include <condition_variable>
@@ -48,11 +49,16 @@ struct ConnectionInfo
     PeerConnectionRequest response_ {};
     std::mutex mutex_ {};
     std::unique_ptr<IceTransport> ice_ {nullptr};
+    // Used to store currently non ready TLS Socket
+    std::unique_ptr<TlsSocketEndpoint> tls_ {nullptr};
+    std::shared_ptr<MultiplexedSocket> socket_ {};
 };
 
 class ConnectionManager::Impl : public std::enable_shared_from_this<ConnectionManager::Impl>
 {
 public:
+    using ConnectionKey = std::pair<dht::InfoHash /* device id */, dht::Value::Id /* uid */>;
+
     explicit Impl(JamiAccount& account)
         : account {account}
     {}
@@ -60,39 +66,26 @@ public:
 
     void removeUnusedConnections(const std::string& deviceId = "")
     {
-        {
-            std::lock_guard<std::mutex> lk(nonReadySocketsMutex_);
-            for (auto& listSocks : nonReadySockets_) {
-                if (!deviceId.empty() && listSocks.first != deviceId)
-                    continue;
-                for (auto& tlsSock : listSocks.second) {
-                    if (tlsSock.second)
-                        tlsSock.second->shutdown();
+        std::lock_guard<std::mutex> lk(infosMtx_);
+        for (auto it = infos_.begin(); it != infos_.end();) {
+            auto& [key, info] = *it;
+            bool erased = false;
+            if (info) {
+                if (info->tls_)
+                    info->tls_->shutdown();
+                if (info->socket_)
+                    info->socket_->shutdown();
+                if (!deviceId.empty() && key.first == dht::InfoHash(deviceId)) {
+                    erased = true;
+                    it = infos_.erase(it);
                 }
             }
-            if (deviceId.empty()) {
-                dht::ThreadPool::io().run([nrs = std::make_shared<decltype(nonReadySockets_)>(
-                                               std::move(nonReadySockets_))] { nrs->clear(); });
-            } else {
-                nonReadySockets_.erase(deviceId);
-            }
+            if (!erased)
+                ++it;
         }
-        {
-            std::lock_guard<std::mutex> lk(msocketsMutex_);
-            for (auto& listSocks : multiplexedSockets_) {
-                if (!deviceId.empty() && listSocks.first != deviceId)
-                    continue;
-                for (auto& mxSock : listSocks.second) {
-                    if (mxSock.second)
-                        mxSock.second->shutdown();
-                }
-            }
-            if (deviceId.empty()) {
-                dht::ThreadPool::io().run([ms = std::make_shared<decltype(multiplexedSockets_)>(
-                                               std::move(multiplexedSockets_))] { ms->clear(); });
-            } else {
-                multiplexedSockets_.erase(deviceId);
-            }
+        if (deviceId.empty()) {
+            dht::ThreadPool::io().run([infos = std::make_shared<decltype(infos_)>(
+                                           std::move(infos_))] { infos->clear(); });
         }
     }
     void shutdown()
@@ -104,21 +97,24 @@ public:
             std::lock_guard<std::mutex> lk(connectCbsMtx_);
             pendingCbs_.clear();
         }
-        for (auto& connection : connectionsInfos_) {
-            for (auto& info : connection.second) {
-                if (info.second.ice_) {
-                    info.second.ice_->cancelOperations();
-                    info.second.ice_->stop();
+        {
+            std::lock_guard<std::mutex> lk(infosMtx_);
+            for (auto& [key, connInfo] : infos_) {
+                if (!connInfo)
+                    continue;
+                if (connInfo->ice_) {
+                    connInfo->ice_->cancelOperations();
+                    connInfo->ice_->stop();
                 }
-                info.second.responseCv_.notify_all();
+                connInfo->responseCv_.notify_all();
             }
+            // This is called when the account is only disabled.
+            // Move this on the thread pool because each
+            // IceTransport takes 500ms to delete, and it's sequential
+            // So, it can increase quickly the time to unregister an account
+            dht::ThreadPool::io().run(
+                [co = std::make_shared<decltype(infos_)>(std::move(infos_))] { co->clear(); });
         }
-        // This is called when the account is only disabled.
-        // Move this on the thread pool because each
-        // IceTransport takes 500ms to delete, and it's sequential
-        // So, it can increase quickly the time to unregister an account
-        dht::ThreadPool::io().run([co = std::make_shared<decltype(connectionsInfos_)>(
-                                       std::move(connectionsInfos_))] { co->clear(); });
         removeUnusedConnections();
     }
 
@@ -148,9 +144,7 @@ public:
     void onDhtPeerRequest(const PeerConnectionRequest& req,
                           const std::shared_ptr<dht::crypto::Certificate>& cert);
 
-    void addNewMultiplexedSocket(const std::string& deviceId,
-                                 const dht::Value::Id& vid,
-                                 std::unique_ptr<TlsSocketEndpoint>&& tlsSocket);
+    void addNewMultiplexedSocket(const std::string& deviceId, const dht::Value::Id& vid);
     void onPeerResponse(const PeerConnectionRequest& req);
     void onDhtConnected(const std::string& deviceId);
 
@@ -164,20 +158,19 @@ public:
 
     JamiAccount& account;
 
+    std::mutex infosMtx_ {};
     // Note: Someone can ask multiple sockets, so to avoid any race condition,
     // each device can have multiple multiplexed sockets.
-    std::map<std::string /* device id */, std::map<dht::Value::Id /* uid */, ConnectionInfo>>
-        connectionsInfos_ {};
-    // Used to store currently non ready TLS Socket
-    std::mutex nonReadySocketsMutex_ {};
-    std::map<std::string /* device id */,
-             std::map<dht::Value::Id /* uid */, std::unique_ptr<TlsSocketEndpoint>>>
-        nonReadySockets_ {};
-    std::mutex msocketsMutex_ {};
-    // Note: Multiplexed sockets is also stored in ChannelSockets, so has to be shared_ptr
-    std::map<std::string /* device id */,
-             std::map<dht::Value::Id /* uid */, std::shared_ptr<MultiplexedSocket>>>
-        multiplexedSockets_ {};
+    std::map<ConnectionKey, std::shared_ptr<ConnectionInfo>> infos_ {};
+
+    std::shared_ptr<ConnectionInfo> getInfo(const dht::InfoHash& deviceId, const dht::Value::Id& id)
+    {
+        std::lock_guard<std::mutex> lk(infosMtx_);
+        auto it = infos_.find({deviceId, id});
+        if (it == infos_.end())
+            return {};
+        return it->second;
+    }
 
     // key: Stored certificate PublicKey id (normaly it's the DeviceId)
     // value: pair of shared_ptr<Certificate> and associated RingId
@@ -216,16 +209,14 @@ void
 ConnectionManager::Impl::connectDeviceStartIce(const std::string& deviceId,
                                                const dht::Value::Id& vid)
 {
-    auto tit = connectionsInfos_.find(deviceId);
-    if (tit == connectionsInfos_.end())
-        return;
-    auto it = tit->second.find(vid);
-    if (it == tit->second.end())
+    auto info = getInfo(dht::InfoHash(deviceId), vid);
+    if (!info) {
         return;
+    }
 
     std::pair<std::string, dht::Value::Id> cbId(deviceId, vid);
-    std::unique_lock<std::mutex> lk {it->second.mutex_};
-    auto& ice = it->second.ice_;
+    std::unique_lock<std::mutex> lk(info->mutex_);
+    auto& ice = info->ice_;
 
     auto onError = [&]() {
         ice.reset();
@@ -271,16 +262,16 @@ ConnectionManager::Impl::connectDeviceStartIce(const std::string& deviceId,
     // Wait for call to onResponse() operated by DHT
     if (isDestroying_)
         return; // This avoid to wait new negotiation when destroying
-    it->second.responseCv_.wait_for(lk, DHT_MSG_TIMEOUT);
+    info->responseCv_.wait_for(lk, DHT_MSG_TIMEOUT);
     if (isDestroying_)
         return; // The destructor can wake a pending wait here.
-    if (!it->second.responseReceived_) {
+    if (!info->responseReceived_) {
         JAMI_ERR("no response from DHT to E2E request.");
         onError();
         return;
     }
 
-    auto& response = it->second.response_;
+    auto& response = info->response_;
     if (!ice)
         return;
     auto sdp = IceTransport::parse_SDP(response.ice_msg, *ice);
@@ -300,16 +291,13 @@ ConnectionManager::Impl::connectDeviceOnNegoDone(
     const dht::Value::Id& vid,
     const std::shared_ptr<dht::crypto::Certificate>& cert)
 {
-    auto tit = connectionsInfos_.find(deviceId);
-    if (tit == connectionsInfos_.end())
-        return;
-    auto it = tit->second.find(vid);
-    if (it == tit->second.end())
+    auto info = getInfo(dht::InfoHash(deviceId), vid);
+    if (!info)
         return;
 
     std::pair<std::string, dht::Value::Id> cbId(deviceId, vid);
-    std::unique_lock<std::mutex> lk {it->second.mutex_};
-    auto& ice = it->second.ice_;
+    std::unique_lock<std::mutex> lk {info->mutex_};
+    auto& ice = info->ice_;
 
     auto onError = [&]() {
         std::lock_guard<std::mutex> lk(connectCbsMtx_);
@@ -328,58 +316,44 @@ ConnectionManager::Impl::connectDeviceOnNegoDone(
     }
 
     // Build socket
-    std::lock_guard<std::mutex> lknrs(nonReadySocketsMutex_);
     auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
                                                             std::move(ice)),
                                                         true);
 
     // Negotiate a TLS session
     JAMI_DBG() << account << "Start TLS session";
-    auto tlsSocket = std::make_unique<TlsSocketEndpoint>(std::move(endpoint),
-                                                         account.identity(),
-                                                         account.dhParams(),
-                                                         *cert);
-
-    auto& nonReadyIt = nonReadySockets_[deviceId][vid];
-    nonReadyIt = std::move(tlsSocket);
-    nonReadyIt->setOnReady([w = weak(),
-                            deviceId = std::move(deviceId),
-                            vid = std::move(vid),
-                            name = std::move(name)](bool ok) {
-        auto sthis = w.lock();
-        if (!sthis)
-            return;
-        auto mSockIt = sthis->multiplexedSockets_[deviceId];
-        if (mSockIt.find(vid) != mSockIt.end())
-            return;
-        if (!ok) {
-            JAMI_ERR() << "TLS connection failure for peer " << deviceId;
-            std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
-            std::pair<std::string, dht::Value::Id> cbId(deviceId, vid);
-            auto cbIt = sthis->pendingCbs_.find(cbId);
-            if (cbIt != sthis->pendingCbs_.end()) {
-                if (cbIt->second)
-                    cbIt->second(nullptr);
-                sthis->pendingCbs_.erase(cbIt);
-            }
-        } else {
-            // The socket is ready, store it in multiplexedSockets_
-            std::lock_guard<std::mutex> lkmSockets(sthis->msocketsMutex_);
-            std::lock_guard<std::mutex> lknrs(sthis->nonReadySocketsMutex_);
-            auto nonReadyIt = sthis->nonReadySockets_.find(deviceId);
-            if (nonReadyIt != sthis->nonReadySockets_.end()) {
-                sthis->addNewMultiplexedSocket(deviceId, vid, std::move(nonReadyIt->second[vid]));
-                nonReadyIt->second.erase(vid);
-                if (nonReadyIt->second.empty()) {
-                    sthis->nonReadySockets_.erase(nonReadyIt);
+    info->tls_ = std::make_unique<TlsSocketEndpoint>(std::move(endpoint),
+                                                     account.identity(),
+                                                     account.dhParams(),
+                                                     *cert);
+
+    info->tls_->setOnReady(
+        [w = weak(), deviceId = std::move(deviceId), vid = std::move(vid), name = std::move(name)](
+            bool ok) {
+            auto sthis = w.lock();
+            if (!sthis)
+                return;
+            auto info = sthis->getInfo(dht::InfoHash(deviceId), vid);
+            if (!info)
+                return;
+            if (!ok) {
+                JAMI_ERR() << "TLS connection failure for peer " << deviceId;
+                std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
+                std::pair<std::string, dht::Value::Id> cbId(deviceId, vid);
+                auto cbIt = sthis->pendingCbs_.find(cbId);
+                if (cbIt != sthis->pendingCbs_.end()) {
+                    if (cbIt->second)
+                        cbIt->second(nullptr);
+                    sthis->pendingCbs_.erase(cbIt);
                 }
+            } else {
+                // The socket is ready, store it
+                sthis->addNewMultiplexedSocket(deviceId, vid);
+                // Finally, open the channel
+                if (info->socket_)
+                    sthis->sendChannelRequest(info->socket_, name, deviceId, vid);
             }
-            // Finally, open the channel
-            auto mxSockIt = sthis->multiplexedSockets_.at(deviceId);
-            if (!mxSockIt.empty())
-                sthis->sendChannelRequest(mxSockIt.rbegin()->second, name, deviceId, vid);
-        }
-    });
+        });
 }
 
 void
@@ -425,16 +399,25 @@ ConnectionManager::Impl::connectDevice(const std::string& deviceId,
                     }
                 }
 
+                std::shared_ptr<MultiplexedSocket> sock;
                 {
                     // Test if a socket already exists for this device
-                    std::lock_guard<std::mutex> lk(sthis->msocketsMutex_);
-                    auto it = sthis->multiplexedSockets_.find(deviceId);
-                    if (it != sthis->multiplexedSockets_.end() && !it->second.empty()) {
-                        JAMI_DBG("Peer already connected. Add a new channel");
-                        sthis->sendChannelRequest(it->second.rbegin()->second, name, deviceId, vid);
-                        return;
+                    std::lock_guard<std::mutex> lk(sthis->infosMtx_);
+                    auto it = std::find_if(sthis->infos_.begin(),
+                                           sthis->infos_.end(),
+                                           [deviceId](const auto& item) {
+                                               auto& [key, value] = item;
+                                               return key.first == dht::InfoHash(deviceId);
+                                           });
+                    if (it != sthis->infos_.end() && it->second) {
+                        sock = it->second->socket_;
                     }
                 }
+                if (sock) {
+                    JAMI_DBG("Peer already connected. Add a new channel");
+                    sthis->sendChannelRequest(sock, name, deviceId, vid);
+                    return;
+                }
                 // If no socket exists, we need to initiate an ICE connection.
                 auto& iceTransportFactory = Manager::instance().getIceTransportFactory();
                 auto ice_config = sthis->account.getIceOptions();
@@ -501,15 +484,19 @@ ConnectionManager::Impl::connectDevice(const std::string& deviceId,
                     });
                 };
 
-                auto& connectionInfo = sthis->connectionsInfos_[deviceId][vid];
-                std::unique_lock<std::mutex> lk {connectionInfo.mutex_};
-                connectionInfo.ice_ = iceTransportFactory
-                                          .createUTransport(sthis->account.getAccountID().c_str(),
-                                                            1,
-                                                            false,
-                                                            ice_config);
-
-                if (!connectionInfo.ice_) {
+                auto info = std::make_shared<ConnectionInfo>();
+                {
+                    std::lock_guard<std::mutex> lk(sthis->infosMtx_);
+                    sthis->infos_[{dht::InfoHash(deviceId), vid}] = info;
+                }
+                std::unique_lock<std::mutex> lk {info->mutex_};
+                info->ice_ = iceTransportFactory
+                                 .createUTransport(sthis->account.getAccountID().c_str(),
+                                                   1,
+                                                   false,
+                                                   ice_config);
+
+                if (!info->ice_) {
                     JAMI_ERR("Cannot initialize ICE session.");
                     std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
                     auto cbIt = sthis->pendingCbs_.find(cbId);
@@ -567,16 +554,14 @@ ConnectionManager::Impl::onPeerResponse(const PeerConnectionRequest& req)
 {
     auto device = req.from;
     JAMI_INFO() << account << " New response received from " << device.toString().c_str();
-    auto& infos = connectionsInfos_[device.toString().c_str()];
-    auto it = infos.find(req.id);
-    if (it == infos.end()) {
+    auto info = getInfo(device, req.id);
+    if (!info) {
         JAMI_WARN() << account << " respond received, but cannot find request";
         return;
     }
-    auto& connectionInfo = it->second;
-    connectionInfo.responseReceived_ = true;
-    connectionInfo.response_ = std::move(req);
-    connectionInfo.responseCv_.notify_one();
+    info->responseReceived_ = true;
+    info->response_ = std::move(req);
+    info->responseCv_.notify_one();
 }
 
 void
@@ -652,15 +637,12 @@ ConnectionManager::Impl::answerTo(IceTransport& ice,
 void
 ConnectionManager::Impl::onRequestStartIce(const PeerConnectionRequest& req)
 {
-    auto tit = connectionsInfos_.find(req.from.toString());
-    if (tit == connectionsInfos_.end())
-        return;
-    auto it = tit->second.find(req.id);
-    if (it == tit->second.end())
+    auto info = getInfo(req.from, req.id);
+    if (!info)
         return;
 
-    std::unique_lock<std::mutex> lk {it->second.mutex_};
-    auto& ice = it->second.ice_;
+    std::unique_lock<std::mutex> lk {info->mutex_};
+    auto& ice = info->ice_;
     if (!ice) {
         JAMI_ERR("No ICE detected");
         if (connReadyCb_)
@@ -687,20 +669,16 @@ ConnectionManager::Impl::onRequestStartIce(const PeerConnectionRequest& req)
 void
 ConnectionManager::Impl::onRequestOnNegoDone(const PeerConnectionRequest& req)
 {
-    auto deviceId = req.from.toString();
-    auto tit = connectionsInfos_.find(deviceId);
-    if (tit == connectionsInfos_.end())
-        return;
-    auto it = tit->second.find(req.id);
-    if (it == tit->second.end())
+    auto info = getInfo(req.from, req.id);
+    if (!info)
         return;
 
-    std::unique_lock<std::mutex> lk {it->second.mutex_};
-    auto& ice = it->second.ice_;
+    std::unique_lock<std::mutex> lk {info->mutex_};
+    auto& ice = info->ice_;
     if (!ice) {
         JAMI_ERR("No ICE detected");
         if (connReadyCb_)
-            connReadyCb_(deviceId, "", nullptr);
+            connReadyCb_(req.from.toString(), "", nullptr);
         return;
     }
 
@@ -710,14 +688,13 @@ ConnectionManager::Impl::onRequestOnNegoDone(const PeerConnectionRequest& req)
         answerTo(*ice, req.id, req.from);
 
     // Build socket
-    std::lock_guard<std::mutex> lknrs(nonReadySocketsMutex_);
     auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
                                                             std::move(ice)),
                                                         false);
 
     // init TLS session
     auto ph = req.from;
-    auto tlsSocket = std::make_unique<TlsSocketEndpoint>(
+    info->tls_ = std::make_unique<TlsSocketEndpoint>(
         std::move(endpoint),
         account.identity(),
         account.dhParams(),
@@ -729,34 +706,24 @@ ConnectionManager::Impl::onRequestOnNegoDone(const PeerConnectionRequest& req)
             return shared->validatePeerCertificate(cert, peer_h) && peer_h == ph;
         });
 
-    auto& nonReadyIt = nonReadySockets_[deviceId][req.id];
-    nonReadyIt = std::move(tlsSocket);
-    nonReadyIt->setOnReady([w = weak(), deviceId, vid = std::move(req.id)](bool ok) {
-        auto shared = w.lock();
-        if (!shared)
-            return;
-        if (shared->multiplexedSockets_[deviceId].find(vid)
-            != shared->multiplexedSockets_[deviceId].end())
-            return;
-        if (!ok) {
-            JAMI_ERR() << "TLS connection failure for peer " << deviceId;
-            if (shared->connReadyCb_)
-                shared->connReadyCb_(deviceId, "", nullptr);
-        } else {
-            // The socket is ready, store it in multiplexedSockets_
-            std::lock_guard<std::mutex> lk(shared->msocketsMutex_);
-            std::lock_guard<std::mutex> lknrs(shared->nonReadySocketsMutex_);
-            auto nonReadyIt = shared->nonReadySockets_.find(deviceId);
-            if (nonReadyIt != shared->nonReadySockets_.end()) {
-                JAMI_DBG("Connection to %s is ready", deviceId.c_str());
-                shared->addNewMultiplexedSocket(deviceId, vid, std::move(nonReadyIt->second[vid]));
-                nonReadyIt->second.erase(vid);
-                if (nonReadyIt->second.empty()) {
-                    shared->nonReadySockets_.erase(nonReadyIt);
-                }
+    info->tls_->setOnReady(
+        [w = weak(), deviceId = std::move(req.from), vid = std::move(req.id)](bool ok) {
+            auto shared = w.lock();
+            if (!shared)
+                return;
+            auto info = shared->getInfo(deviceId, vid);
+            if (!info)
+                return;
+            if (!ok) {
+                JAMI_ERR() << "TLS connection failure for peer " << deviceId;
+                if (shared->connReadyCb_)
+                    shared->connReadyCb_(deviceId.toString(), "", nullptr);
+            } else {
+                // The socket is ready, store it
+                JAMI_DBG("Connection to %s is ready", deviceId.to_c_str());
+                shared->addNewMultiplexedSocket(deviceId.toString(), vid);
             }
-        }
-    });
+        });
 }
 
 void
@@ -803,7 +770,7 @@ ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
             return;
         }
 
-        dht::ThreadPool::io().run([w = std::move(w), deviceId, req = std::move(req)] {
+        dht::ThreadPool::io().run([w = std::move(w), req = std::move(req)] {
             auto shared = w.lock();
             if (!shared)
                 return;
@@ -822,7 +789,7 @@ ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
             return;
         }
 
-        dht::ThreadPool::io().run([w = std::move(w), deviceId, req = std::move(req)] {
+        dht::ThreadPool::io().run([w = std::move(w), req = std::move(req)] {
             auto shared = w.lock();
             if (!shared)
                 return;
@@ -830,16 +797,18 @@ ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
         });
     };
 
-    // 1. Create a new Multiplexed Socket
-
     // Negotiate a new ICE socket
-    auto& connectionInfo = connectionsInfos_[deviceId][req.id];
-    std::unique_lock<std::mutex> lk {connectionInfo.mutex_};
-    connectionInfo.ice_ = iceTransportFactory.createUTransport(account.getAccountID().c_str(),
-                                                               1,
-                                                               true,
-                                                               ice_config);
-    if (not connectionInfo.ice_) {
+    auto info = std::make_shared<ConnectionInfo>();
+    {
+        std::lock_guard<std::mutex> lk(infosMtx_);
+        infos_[{req.from, req.id}] = info;
+    }
+    std::unique_lock<std::mutex> lk {info->mutex_};
+    info->ice_ = iceTransportFactory.createUTransport(account.getAccountID().c_str(),
+                                                      1,
+                                                      true,
+                                                      ice_config);
+    if (not info->ice_) {
         JAMI_ERR("Cannot initialize ICE session.");
         if (connReadyCb_)
             connReadyCb_(deviceId, "", nullptr);
@@ -849,25 +818,26 @@ ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
 
 void
 ConnectionManager::Impl::addNewMultiplexedSocket(const std::string& deviceId,
-                                                 const dht::Value::Id& vid,
-                                                 std::unique_ptr<TlsSocketEndpoint>&& tlsSocket)
+                                                 const dht::Value::Id& vid)
 {
-    // mSocketsMutex_ MUST be locked
-    auto mSock = std::make_shared<MultiplexedSocket>(deviceId, std::move(tlsSocket));
-    mSock->setOnReady(
+    auto info = getInfo(dht::InfoHash(deviceId), vid);
+    if (!info)
+        return;
+    info->socket_ = std::make_shared<MultiplexedSocket>(deviceId, std::move(info->tls_));
+    info->socket_->setOnReady(
         [w = weak()](const std::string& deviceId, const std::shared_ptr<ChannelSocket>& socket) {
             if (auto sthis = w.lock())
                 if (sthis->connReadyCb_)
                     sthis->connReadyCb_(deviceId, socket->name(), socket);
         });
-    mSock->setOnRequest(
+    info->socket_->setOnRequest(
         [w = weak()](const std::string& deviceId, const uint16_t&, const std::string& name) {
             if (auto sthis = w.lock())
                 if (sthis->channelReqCb_)
                     return sthis->channelReqCb_(deviceId, name);
             return false;
         });
-    mSock->onShutdown([w = weak(), deviceId, vid]() {
+    info->socket_->onShutdown([w = weak(), deviceId, vid]() {
         auto sthis = w.lock();
         if (!sthis)
             return;
@@ -890,45 +860,20 @@ ConnectionManager::Impl::addNewMultiplexedSocket(const std::string& deviceId,
             auto sthis = w.lock();
             if (!sthis)
                 return;
-            // Delete the socket
-            std::lock_guard<std::mutex> lk(sthis->msocketsMutex_);
-            auto mxSockIt = sthis->multiplexedSockets_.find(deviceId);
-            if (mxSockIt != sthis->multiplexedSockets_.end()) {
-                auto vidIt = mxSockIt->second.find(vid);
-                if (vidIt != mxSockIt->second.end() && vidIt->second) {
-                    vidIt->second->shutdown();
-                }
-            }
+            auto info = sthis->getInfo(dht::InfoHash(deviceId), vid);
+            if (!info)
+                return;
 
-            auto connIt = sthis->connectionsInfos_.find(deviceId);
-            if (connIt != sthis->connectionsInfos_.end()) {
-                auto it = connIt->second.find(vid);
-                if (it != connIt->second.end()) {
-                    if (it->second.ice_)
-                        it->second.ice_->cancelOperations();
-                    connIt->second.erase(vid);
-                    if (connIt->second.empty())
-                        sthis->connectionsInfos_.erase(deviceId);
-                }
-                // This will close the TLS Session
-                std::lock_guard<std::mutex> lk(sthis->nonReadySocketsMutex_);
-                auto nonReadyIt = sthis->nonReadySockets_.find(deviceId);
-                if (nonReadyIt != sthis->nonReadySockets_.end()) {
-                    nonReadyIt->second.erase(vid);
-                    if (nonReadyIt->second.empty()) {
-                        sthis->nonReadySockets_.erase(nonReadyIt);
-                    }
-                }
-            }
+            if (info->socket_)
+                info->socket_->shutdown();
 
-            if (mxSockIt != sthis->multiplexedSockets_.end()) {
-                mxSockIt->second.erase(vid);
-                if (mxSockIt->second.empty())
-                    sthis->multiplexedSockets_.erase(mxSockIt);
-            }
+            if (info && info->ice_)
+                info->ice_->cancelOperations();
+
+            std::lock_guard<std::mutex> lk(sthis->infosMtx_);
+            sthis->infos_.erase({dht::InfoHash(deviceId), vid});
         });
     });
-    multiplexedSockets_[deviceId][vid] = std::move(mSock);
 }
 
 bool
@@ -979,20 +924,30 @@ ConnectionManager::closeConnectionsWith(const std::string& deviceId)
             }
         }
     }
-    auto it = pimpl_->connectionsInfos_.find(deviceId);
-    if (it != pimpl_->connectionsInfos_.end()) {
-        for (auto& info : it->second) {
-            if (info.second.ice_) {
-                info.second.ice_->cancelOperations();
-                info.second.ice_->stop();
-            }
-            info.second.responseCv_.notify_all();
-            if (info.second.ice_) {
-                std::unique_lock<std::mutex> lk {info.second.mutex_};
-                info.second.ice_.reset();
+    std::vector<std::shared_ptr<ConnectionInfo>> connInfos;
+    {
+        std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
+        for (auto iter = pimpl_->infos_.begin(); iter != pimpl_->infos_.end();) {
+            auto const& [key, value] = *iter;
+            if (key.first == dht::InfoHash(deviceId)) {
+                connInfos.emplace_back(value);
+                iter = pimpl_->infos_.erase(iter);
+            } else {
+                iter++;
             }
         }
     }
+    for (auto& info : connInfos) {
+        if (info->ice_) {
+            info->ice_->cancelOperations();
+            info->ice_->stop();
+        }
+        info->responseCv_.notify_all();
+        if (info->ice_) {
+            std::unique_lock<std::mutex> lk {info->mutex_};
+            info->ice_.reset();
+        }
+    }
     // This will close the TLS Session
     pimpl_->removeUnusedConnections(deviceId);
 }
-- 
GitLab