From d1fac179af4c71480030504ecc4cddddf14995d4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=A9bastien=20Blin?=
 <sebastien.blin@savoirfairelinux.com>
Date: Fri, 25 Nov 2022 14:42:07 -0500
Subject: [PATCH] connectionmanager: replace wait_for by async task

Change-Id: Ic5597f0713b9abbc6cfa903f13f9811b9a6f03b4
---
 src/connectivity/connectionmanager.cpp        | 92 ++++++++++++++-----
 .../connectionManager/connectionManager.cpp   |  1 -
 2 files changed, 69 insertions(+), 24 deletions(-)

diff --git a/src/connectivity/connectionmanager.cpp b/src/connectivity/connectionmanager.cpp
index 1d15b21b4f..67aaef92b6 100644
--- a/src/connectivity/connectionmanager.cpp
+++ b/src/connectivity/connectionmanager.cpp
@@ -23,6 +23,7 @@
 #include "peer_connection.h"
 #include "logger.h"
 
+#include <asio.hpp>
 #include <opendht/crypto.h>
 #include <opendht/thread_pool.h>
 #include <opendht/value.h>
@@ -49,7 +50,6 @@ struct ConnectionInfo
     }
 
     std::mutex mutex_ {};
-    std::condition_variable responseCv_ {};
     bool responseReceived_ {false};
     PeerConnectionRequest response_ {};
     std::unique_ptr<IceTransport> ice_ {nullptr};
@@ -57,6 +57,10 @@ struct ConnectionInfo
     std::unique_ptr<TlsSocketEndpoint> tls_ {nullptr};
     std::shared_ptr<MultiplexedSocket> socket_ {};
     std::set<CallbackId> cbIds_ {};
+
+    std::function<void(bool)> onConnected_;
+    std::unique_ptr<asio::steady_timer> waitForAnswer_ {};
+
 };
 
 class ConnectionManager::Impl : public std::enable_shared_from_this<ConnectionManager::Impl>
@@ -80,7 +84,8 @@ public:
                     info->socket_->shutdown();
                 if (info->ice_)
                     info->ice_->cancelOperations();
-                info->responseCv_.notify_all();
+                if (info->waitForAnswer_)
+                    info->waitForAnswer_->cancel();
                 erased = true;
                 it = infos_.erase(it);
             }
@@ -114,9 +119,11 @@ public:
         dht::Value::Id vid;
     };
 
-    bool connectDeviceStartIce(const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
+    void connectDeviceStartIce(const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
                                const dht::Value::Id& vid,
-                               const std::string& connType);
+                               const std::string& connType,
+                               std::function<void(bool)> onConnected);
+    void onResponse(const asio::error_code& ec, const DeviceId& deviceId, const dht::Value::Id& vid);
     bool connectDeviceOnNegoDone(const DeviceId& deviceId,
                                  const std::string& name,
                                  const dht::Value::Id& vid,
@@ -276,23 +283,27 @@ public:
     std::atomic_bool isDestroying_ {false};
 };
 
-bool
+void
 ConnectionManager::Impl::connectDeviceStartIce(
     const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
     const dht::Value::Id& vid,
-    const std::string& connType)
+    const std::string& connType,
+    std::function<void(bool)> onConnected)
 {
     auto deviceId = devicePk->getLongId();
     auto info = getInfo(deviceId, vid);
-    if (!info)
-        return false;
+    if (!info) {
+        onConnected(false);
+        return;
+    }
 
     std::unique_lock<std::mutex> lk(info->mutex_);
     auto& ice = info->ice_;
 
     if (!ice) {
         JAMI_ERR("No ICE detected");
-        return false;
+        onConnected(false);
+        return;
     }
 
     auto iceAttributes = ice->getLocalAttributes();
@@ -327,28 +338,53 @@ ConnectionManager::Impl::connectDeviceStartIce(
                        (ok ? "ok" : "failed"));
         });
     // Wait for call to onResponse() operated by DHT
-    if (isDestroying_)
-        return true; // This avoid to wait new negotiation when destroying
-    info->responseCv_.wait_for(lk, DHT_MSG_TIMEOUT);
-    if (isDestroying_)
-        return true; // The destructor can wake a pending wait here.
+    if (isDestroying_) {
+        onConnected(true); // This avoid to wait new negotiation when destroying
+        return;
+    }
+
+    info->onConnected_ = std::move(onConnected);
+    info->waitForAnswer_ = std::make_unique<asio::steady_timer>(*Manager::instance().ioContext(), std::chrono::steady_clock::now() + DHT_MSG_TIMEOUT);
+    info->waitForAnswer_->async_wait(std::bind(&ConnectionManager::Impl::onResponse, this, std::placeholders::_1, deviceId, vid));
+}
+
+void
+ConnectionManager::Impl::onResponse(const asio::error_code& ec, const DeviceId& deviceId, const dht::Value::Id& vid)
+{
+    if (ec == asio::error::operation_aborted)
+        return;
+    auto info = getInfo(deviceId, vid);
+    if (!info)
+        return;
+
+    std::unique_lock<std::mutex> lk(info->mutex_);
+    auto& ice = info->ice_;
+    if (isDestroying_) {
+        info->onConnected_(true); // The destructor can wake a pending wait here.
+        return;
+    }
     if (!info->responseReceived_) {
         JAMI_ERR("no response from DHT to E2E request.");
-        return false;
+        info->onConnected_(false);
+        return;
     }
 
-    if (!ice)
-        return false;
+    if (!info->ice_) {
+        info->onConnected_(false);
+        return;
+    }
 
     auto sdp = ice->parseIceCandidates(info->response_.ice_msg);
 
     if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
         JAMI_WARN("[Account:%s] start ICE failed", account.getAccountID().c_str());
-        return false;
+        info->onConnected_(false);
+        return;
     }
-    return true;
+    info->onConnected_(true);
 }
 
+
 bool
 ConnectionManager::Impl::connectDeviceOnNegoDone(
     const DeviceId& deviceId,
@@ -553,8 +589,15 @@ ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certif
                                            eraseInfo,
                                            connType] {
                     auto sthis = w.lock();
-                    if (!sthis || !sthis->connectDeviceStartIce(devicePk, vid, connType))
+                    if (!sthis) {
                         runOnMainThread([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
+                        return;
+                    }
+                    sthis->connectDeviceStartIce(devicePk, vid, connType, [=](bool ok) {
+                        if (!ok) {
+                            runOnMainThread([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
+                        }
+                    });
                 });
             };
             ice_config.onNegoDone = [w,
@@ -657,7 +700,8 @@ ConnectionManager::Impl::onPeerResponse(const PeerConnectionRequest& req)
         std::lock_guard<std::mutex> lk {info->mutex_};
         info->responseReceived_ = true;
         info->response_ = std::move(req);
-        info->responseCv_.notify_one();
+        info->waitForAnswer_->expires_at(std::chrono::steady_clock::now());
+        info->waitForAnswer_->async_wait(std::bind(&ConnectionManager::Impl::onResponse, this, std::placeholders::_1, device, req.id));
     } else {
         JAMI_WARN() << account << " respond received, but cannot find request";
     }
@@ -666,8 +710,9 @@ ConnectionManager::Impl::onPeerResponse(const PeerConnectionRequest& req)
 void
 ConnectionManager::Impl::onDhtConnected(const dht::crypto::PublicKey& devicePk)
 {
-    if (!account.dht())
+    if (!account.dht()) {
         return;
+    }
     account.dht()->listen<PeerConnectionRequest>(
         dht::InfoHash::get(PeerConnectionRequest::key_prefix + devicePk.getId().toString()),
         [w = weak()](PeerConnectionRequest&& req) {
@@ -1098,7 +1143,8 @@ ConnectionManager::closeConnectionsWith(const std::string& peerUri)
             info->ice_->cancelOperations();
         if (info->socket_)
             info->socket_->shutdown();
-        info->responseCv_.notify_all();
+        if (info->waitForAnswer_)
+            info->waitForAnswer_->cancel();
         if (info->ice_) {
             std::unique_lock<std::mutex> lk {info->mutex_};
             dht::ThreadPool::io().run(
diff --git a/test/unitTest/connectionManager/connectionManager.cpp b/test/unitTest/connectionManager/connectionManager.cpp
index 2b4728e2c7..a8d1983d09 100644
--- a/test/unitTest/connectionManager/connectionManager.cpp
+++ b/test/unitTest/connectionManager/connectionManager.cpp
@@ -750,7 +750,6 @@ ConnectionManagerTest::testShutdownCallbacks()
     auto bobAccount = Manager::instance().getAccount<JamiAccount>(bobId);
     auto bobDeviceId = DeviceId(std::string(bobAccount->currentDeviceId()));
     auto aliceUri = aliceAccount->getUsername();
-    auto aliceDeviceId = DeviceId(std::string(aliceAccount->currentDeviceId()));
 
     bobAccount->connectionManager().onICERequest([](const DeviceId&) { return true; });
     aliceAccount->connectionManager().onICERequest([](const DeviceId&) { return true; });
-- 
GitLab