From 2afc2f0392bbd8c202e0c9f20b11718b0a83e7cd Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=A9bastien=20Blin?=
 <sebastien.blin@savoirfairelinux.com>
Date: Thu, 15 Apr 2021 13:20:46 -0400
Subject: [PATCH] jamiaccount: do not cancel connecting request on sip timeout

During a connectivityChange(), a SIP channel can still be used to send
messages but will close a few seconds later (due to a timeout or
keep-alive). However, every failing message was closing connections
(including the current outgoing requests) and asking for a new socket,
causing multiple connection requests to be sent (because the request
was erased just after).
Also, to avoid asking for multiple SIP channels on the same socket,
this patch introduces ConnectionManager::isConnecting().

Note: erasing the request was causing a lot of "no response from DHT"
in the logs because the request was destroyed directly.

Change-Id: Ie06f377743075d4a6cb6a4bb6c940bd7b23f1603
GitLab: #512
GitLab: #421
---
 src/jamidht/connectionmanager.cpp             |   7 ++
 src/jamidht/connectionmanager.h               |   9 ++
 src/jamidht/jamiaccount.cpp                   | 104 +++++++++---------
 src/jamidht/jamiaccount.h                     |  14 ++-
 .../connectionManager/connectionManager.cpp   |  48 ++++++++
 5 files changed, 127 insertions(+), 55 deletions(-)

diff --git a/src/jamidht/connectionmanager.cpp b/src/jamidht/connectionmanager.cpp
index 0abc787218..72932db684 100644
--- a/src/jamidht/connectionmanager.cpp
+++ b/src/jamidht/connectionmanager.cpp
@@ -945,6 +945,13 @@ ConnectionManager::connectDevice(const DeviceId& deviceId,
     pimpl_->connectDevice(deviceId, name, std::move(cb));
 }
 
+bool
+ConnectionManager::isConnecting(const DeviceId& deviceId, const std::string& name) const
+{
+    const auto pending = pimpl_->getPendingCallbacks(deviceId); // entries currently pending for deviceId
+    return std::any_of(pending.begin(), pending.end(), [&](const auto& p) { return p.name == name; }); // match on channel name; by-ref avoids copying each entry
+}
+
 void
 ConnectionManager::closeConnectionsWith(const DeviceId& deviceId)
 {
diff --git a/src/jamidht/connectionmanager.h b/src/jamidht/connectionmanager.h
index fe0fa33c5d..8549e2016f 100644
--- a/src/jamidht/connectionmanager.h
+++ b/src/jamidht/connectionmanager.h
@@ -87,6 +87,15 @@ public:
      */
     void connectDevice(const DeviceId& deviceId, const std::string& name, ConnectCallback cb);
 
+    /**
+     * Check if a connection request for a given channel name is still pending for a device
+     * @param deviceId      Remote device
+     * @param name          Name of the channel
+     * @return true if a pending request with this name exists for the device
+     * @note May still return false right after connectDevice(), as connectDevice() is fully async
+     */
+    bool isConnecting(const DeviceId& deviceId, const std::string& name) const;
+
     /**
      * Close all connections with a current device
      * @param deviceId      Remote device
diff --git a/src/jamidht/jamiaccount.cpp b/src/jamidht/jamiaccount.cpp
index cbf1c3a346..e47989b540 100644
--- a/src/jamidht/jamiaccount.cpp
+++ b/src/jamidht/jamiaccount.cpp
@@ -124,6 +124,7 @@ struct TextMessageCtx
     DeviceId deviceId;
     uint64_t id;
     bool retryOnTimeout;
+    std::shared_ptr<ChannelSocket> channel;
     std::shared_ptr<PendingConfirmation> confirmation;
 };
 
@@ -3482,6 +3483,7 @@ JamiAccount::sendTextMessage(const std::string& to,
             continue;
         }
         auto& conn = value.back();
+        auto& channel = conn.channel;
 
         // Set input token into callback
         std::unique_ptr<TextMessageCtx> ctx {std::make_unique<TextMessageCtx>()};
@@ -3490,6 +3492,7 @@ JamiAccount::sendTextMessage(const std::string& to,
         ctx->deviceId = key.second;
         ctx->id = token;
         ctx->retryOnTimeout = retryOnTimeout;
+        ctx->channel = channel;
         ctx->confirmation = confirm;
 
         try {
@@ -3508,15 +3511,7 @@ JamiAccount::sendTextMessage(const std::string& to,
                         acc->messageEngine_.onMessageSent(c->to, c->id, true);
                     } else {
                         JAMI_WARN("Timeout when send a message, close current connection");
-                        {
-                            std::unique_lock<std::mutex> lk(acc->sipConnsMtx_);
-                            acc->sipConns_.erase(std::make_pair(c->to, c->deviceId));
-                        }
-                        {
-                            std::lock_guard<std::mutex> lk(acc->connManagerMtx_);
-                            if (acc->connectionManager_)
-                                acc->connectionManager_->closeConnectionsWith(c->deviceId);
-                        }
+                        acc->shutdownSIPConnection(c->channel, c->to, c->deviceId);
                         // This MUST be done after closing the connection to avoid race condition
                         // with messageEngine_
                         acc->messageEngine_.onMessageSent(c->to, c->id, false);
@@ -3536,8 +3531,9 @@ JamiAccount::sendTextMessage(const std::string& to,
         } catch (const std::runtime_error& ex) {
             JAMI_WARN("%s", ex.what());
             messageEngine_.onMessageSent(to, token, false);
+            ++it;
             // Remove connection in incorrect state
-            it = sipConns_.erase(it);
+            shutdownSIPConnection(channel, to, key.second);
             continue;
         }
 
@@ -4612,41 +4608,38 @@ JamiAccount::requestSIPConnection(const std::string& peerId, const DeviceId& dev
                  deviceId.to_c_str());
         return;
     }
-    sipConns_[id] = {};
     // If not present, create it
-    JAMI_INFO("[Account %s] Ask %s for a new SIP channel",
-              getAccountID().c_str(),
-              deviceId.to_c_str());
     std::lock_guard<std::mutex> lkCM(connManagerMtx_);
     if (!connectionManager_)
         return;
+    // Note, Even if we send 50 "sip" request, the connectionManager_ will only use one socket.
+    // however, this will still ask for multiple channels, so only ask
+    // if there is no pending request
+    if (connectionManager_->isConnecting(deviceId, "sip")) {
+        JAMI_INFO("[Account %s] Already connecting to %s",
+              getAccountID().c_str(),
+              deviceId.to_c_str());
+        return;
+    }
+    JAMI_INFO("[Account %s] Ask %s for a new SIP channel",
+              getAccountID().c_str(),
+              deviceId.to_c_str());
     connectionManager_->connectDevice(deviceId,
                                       "sip",
                                       [w = weak(), id](std::shared_ptr<ChannelSocket> socket,
                                                        const DeviceId&) {
-                                          auto shared = w.lock();
-                                          if (!shared)
-                                              return;
-                                          // NOTE: No need to cache Connection there.
-                                          // OnConnectionReady is called before this callback, so
-                                          // the socket is already cached if succeed. We just need
-                                          // to remove the pending request.
-                                          if (!socket) {
-                                              // If this is triggered, this means that the
-                                              // connectDevice didn't get any response from the DHT.
-                                              // Stop searching pending call.
-                                              shared->callConnectionClosed(id.second, true);
-                                              shared->forEachPendingCall(id.second,
-                                                                         [](const auto& pc) {
-                                                                             pc->onFailure();
-                                                                         });
-                                          }
-
-                                          std::lock_guard<std::mutex> lk(shared->sipConnsMtx_);
-                                          auto it = shared->sipConns_.find(id);
-                                          if (it != shared->sipConns_.end() && it->second.empty()) {
-                                              shared->sipConns_.erase(it);
-                                          }
+                                            if (socket) return;
+                                            auto shared = w.lock();
+                                            if (!shared)
+                                                return;
+                                            // If this is triggered, this means that the
+                                            // connectDevice didn't get any response from the DHT.
+                                            // Stop searching pending call.
+                                            shared->callConnectionClosed(id.second, true);
+                                            shared->forEachPendingCall(id.second,
+                                                                        [](const auto& pc) {
+                                                                            pc->onFailure();
+                                                                        });
                                       });
 }
 
@@ -4816,22 +4809,7 @@ JamiAccount::cacheSIPConnection(std::shared_ptr<ChannelSocket>&& socket,
         auto shared = w.lock();
         if (!shared)
             return;
-        {
-            std::lock_guard<std::mutex> lk(shared->sipConnsMtx_);
-            auto it = shared->sipConns_.find(key);
-            if (it == shared->sipConns_.end())
-                return;
-            auto& connections = it->second;
-            auto conn = connections.begin();
-            while (conn != connections.end()) {
-                if (conn->channel == socket)
-                    conn = connections.erase(conn);
-                else
-                    conn++;
-            }
-            if (connections.empty())
-                shared->sipConns_.erase(it);
-        }
+        shared->shutdownSIPConnection(socket, key.first, key.second);
         // The connection can be closed during the SIP initialization, so
         // if this happens, the request should be re-sent to ask for a new
         // SIP channel to make the call pass through
@@ -4873,6 +4851,26 @@ JamiAccount::cacheSIPConnection(std::shared_ptr<ChannelSocket>&& socket,
     });
 }
 
+// Remove `channel` from the cached SIP connections of (peerId, deviceId), then shut it down.
+void
+JamiAccount::shutdownSIPConnection(const std::shared_ptr<ChannelSocket>& channel, const std::string& peerId, const DeviceId& deviceId)
+{
+    // Keep our own reference: `channel` may alias an entry of sipConns_ that is erased below.
+    const auto socket = channel;
+    std::unique_lock<std::mutex> lk(sipConnsMtx_);
+    auto it = sipConns_.find(SipConnectionKey(peerId, deviceId));
+    if (it != sipConns_.end()) {
+        auto& conns = it->second;
+        conns.erase(std::remove_if(conns.begin(), conns.end(),
+            [&](const auto& v) { return v.channel == socket; }), conns.end());
+        if (conns.empty())
+            sipConns_.erase(it);
+    }
+    lk.unlock();
+    // Shutdown after removal to let the callbacks do stuff if needed
+    if (socket) socket->shutdown();
+}
+
 std::string_view
 JamiAccount::currentDeviceId() const
 {
diff --git a/src/jamidht/jamiaccount.h b/src/jamidht/jamiaccount.h
index 434e43f804..14dab42370 100644
--- a/src/jamidht/jamiaccount.h
+++ b/src/jamidht/jamiaccount.h
@@ -885,13 +885,23 @@ private:
     void requestSIPConnection(const std::string& peerId, const DeviceId& deviceId);
     /**
      * Store a new SIP connection into sipConnections_
-     * @param socket    The new sip channel
+     * @param channel   The new sip channel
      * @param peerId    The contact who owns the device
      * @param deviceId  Device linked to that transport
      */
-    void cacheSIPConnection(std::shared_ptr<ChannelSocket>&& socket,
+    void cacheSIPConnection(std::shared_ptr<ChannelSocket>&& channel,
                             const std::string& peerId,
                             const DeviceId& deviceId);
+    /**
+     * Remove a SIP channel from the cached connections of a device, then shut it down
+     * @param channel   The channel to close (nothing is shut down if null)
+     * @param peerId    The contact who owns the device
+     * @param deviceId  Device linked to that transport
+     */
+    void shutdownSIPConnection(const std::shared_ptr<ChannelSocket>& channel,
+                               const std::string& peerId,
+                               const DeviceId& deviceId);
+
     /**
      * Store a new Sync connection
      * @param socket    The new sync channel
diff --git a/test/unitTest/connectionManager/connectionManager.cpp b/test/unitTest/connectionManager/connectionManager.cpp
index b1fc2e3648..69a939c924 100644
--- a/test/unitTest/connectionManager/connectionManager.cpp
+++ b/test/unitTest/connectionManager/connectionManager.cpp
@@ -68,6 +68,7 @@ private:
     void testShutdownCallbacks();
     void testFloodSocket();
     void testDestroyWhileSending();
+    void testIsConnecting();
 
     CPPUNIT_TEST_SUITE(ConnectionManagerTest);
     CPPUNIT_TEST(testConnectDevice);
@@ -84,6 +85,7 @@ private:
     CPPUNIT_TEST(testShutdownCallbacks);
     CPPUNIT_TEST(testFloodSocket);
     CPPUNIT_TEST(testDestroyWhileSending);
+    CPPUNIT_TEST(testIsConnecting);
     CPPUNIT_TEST_SUITE_END();
 };
 
@@ -1000,6 +1002,52 @@ ConnectionManagerTest::testDestroyWhileSending()
     // No need to wait, immediately destroy, no segfault must occurs
 }
 
+void
+ConnectionManagerTest::testIsConnecting() // isConnecting() must be false before, true during and false after a "sip" connection request
+{
+    auto aliceAccount = Manager::instance().getAccount<JamiAccount>(aliceId);
+    auto bobAccount = Manager::instance().getAccount<JamiAccount>(bobId);
+    auto bobDeviceId = DeviceId(std::string(bobAccount->currentDeviceId()));
+
+    bobAccount->connectionManager().onICERequest([](const DeviceId&) { return true; });
+    aliceAccount->connectionManager().onICERequest([](const DeviceId&) { return true; });
+
+    std::mutex mtx;
+    std::unique_lock<std::mutex> lk {mtx}; // held by this thread except while blocked in cv.wait_for()
+    std::condition_variable cv;
+    bool successfullyConnected = false, successfullyReceive = false; // NOTE(review): set by the callbacks below without taking mtx; presumably from other threads, confirm
+
+    bobAccount->connectionManager().onChannelRequest(
+        [&](const DeviceId&, const std::string&) {
+            successfullyReceive = true;
+            cv.notify_one();
+            std::this_thread::sleep_for(std::chrono::seconds(2)); // hold the answer back for 2 s; the isConnecting() check below is meant to run in that window
+            return true;
+        });
+
+    CPPUNIT_ASSERT(!aliceAccount->connectionManager().isConnecting(bobDeviceId, "sip")); // nothing requested yet
+    aliceAccount->connectionManager()
+        .connectDevice(bobDeviceId,
+                       "sip",
+                       [&cv, &successfullyConnected](std::shared_ptr<ChannelSocket> socket,
+                                                     const DeviceId&) {
+                           if (socket) {
+                               successfullyConnected = true;
+                           }
+                           cv.notify_one();
+                       });
+    // connectDevice is fully async: wait until bob has received the channel request before checking isConnecting.
+    CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(10), [&] {
+        return successfullyReceive;
+    }));
+    CPPUNIT_ASSERT(aliceAccount->connectionManager().isConnecting(bobDeviceId, "sip")); // request sent, bob has not answered yet
+    CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] {
+        return successfullyConnected;
+    }));
+    std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Leave time for the connect callback to finish before the final check
+    CPPUNIT_ASSERT(!aliceAccount->connectionManager().isConnecting(bobDeviceId, "sip")); // request completed, no longer pending
+}
+
 } // namespace test
 } // namespace jami
 
-- 
GitLab