From 8b7afc852a79141729a40a349837d1a697d240d0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=A9bastien=20Blin?=
 <sebastien.blin@savoirfairelinux.com>
Date: Thu, 7 Dec 2023 10:51:19 -0500
Subject: [PATCH] swarm_manager: freely connect to nodes with a p2p link

setKnownNodes will add new nodes that may be already connected
with a TCP link, but maintainBuckets may not choose those nodes,
delaying the bootstrap.
If we detect a new node that we're already connected with, we can
add it in the list of nodes we want to connect with.

Change-Id: I422a32495693e1e2d925a3af127a4a59903c1833
---
 contrib/src/dhtnet/package.json             |  2 +-
 contrib/src/dhtnet/rules.mak                |  2 +-
 src/jamidht/conversation.cpp                |  8 ++-
 src/jamidht/jamiaccount.cpp                 |  9 +++
 src/jamidht/jamiaccount.h                   | 10 ++-
 src/jamidht/swarm/routing_table.cpp         |  3 +-
 src/jamidht/swarm/swarm_manager.cpp         | 31 ++++++---
 src/jamidht/swarm/swarm_manager.h           | 11 +++-
 test/unitTest/conversation/conversation.cpp | 71 +++++++++++++++++++++
 9 files changed, 128 insertions(+), 19 deletions(-)

diff --git a/contrib/src/dhtnet/package.json b/contrib/src/dhtnet/package.json
index 56096192fc..11c3a5cf4b 100644
--- a/contrib/src/dhtnet/package.json
+++ b/contrib/src/dhtnet/package.json
@@ -1,6 +1,6 @@
 {
     "name": "dhtnet",
-    "version": "b1bcdecbac2a41de3941ef5a34faa6fbe4472535",
+    "version": "d0c92c70165cd2855d6b9688b1e3d490c52907ad",
     "url": "https://review.jami.net/plugins/gitiles/dhtnet/+archive/__VERSION__.tar.gz",
     "deps": [
         "opendht",
diff --git a/contrib/src/dhtnet/rules.mak b/contrib/src/dhtnet/rules.mak
index f242cc7f87..714ae7e069 100644
--- a/contrib/src/dhtnet/rules.mak
+++ b/contrib/src/dhtnet/rules.mak
@@ -1,5 +1,5 @@
 # DHTNET
-DHTNET_VERSION := b1bcdecbac2a41de3941ef5a34faa6fbe4472535
+DHTNET_VERSION := d0c92c70165cd2855d6b9688b1e3d490c52907ad
 DHTNET_URL := https://review.jami.net/plugins/gitiles/dhtnet/+archive/$(DHTNET_VERSION).tar.gz
 
 PKGS += dhtnet
diff --git a/src/jamidht/conversation.cpp b/src/jamidht/conversation.cpp
index 07f08e4493..848ef84b60 100644
--- a/src/jamidht/conversation.cpp
+++ b/src/jamidht/conversation.cpp
@@ -165,7 +165,13 @@ public:
         if (auto shared = account_.lock()) {
             ioContext_ = Manager::instance().ioContext();
             fallbackTimer_ = std::make_unique<asio::steady_timer>(*ioContext_);
-            swarmManager_ = std::make_shared<SwarmManager>(NodeId(shared->currentDeviceId()));
+            swarmManager_ = std::make_shared<SwarmManager>(NodeId(shared->currentDeviceId()),
+            [account=account_](const DeviceId& deviceId) {
+                if (auto acc = account.lock()) {
+                    return acc->isConnectedWith(deviceId);
+                }
+                return false;
+            });
             swarmManager_->setMobility(shared->isMobile());
             accountId_ = shared->getAccountID();
             transferManager_ = std::make_shared<TransferManager>(shared->getAccountID(),
diff --git a/src/jamidht/jamiaccount.cpp b/src/jamidht/jamiaccount.cpp
index 0479d1aa20..73557c281a 100644
--- a/src/jamidht/jamiaccount.cpp
+++ b/src/jamidht/jamiaccount.cpp
@@ -3679,6 +3679,15 @@ JamiAccount::requestSIPConnection(const std::string& peerId,
         connectionType);
 }
 
+bool
+JamiAccount::isConnectedWith(const DeviceId& deviceId) const
+{
+    std::lock_guard<std::mutex> lkCM(connManagerMtx_);
+    if (connectionManager_)
+        return connectionManager_->isConnected(deviceId);
+    return false;
+}
+
 void
 JamiAccount::sendProfile(const std::string& convId,
                          const std::string& peerUri,
diff --git a/src/jamidht/jamiaccount.h b/src/jamidht/jamiaccount.h
index a0c31287f5..01e3a4f608 100644
--- a/src/jamidht/jamiaccount.h
+++ b/src/jamidht/jamiaccount.h
@@ -90,7 +90,7 @@ class ChanneledOutgoingTransfer;
 class SyncModule;
 struct TextMessageCtx;
 
-using SipConnectionKey = std::pair<std::string /* accountId */, DeviceId>;
+using SipConnectionKey = std::pair<std::string /* uri */, DeviceId>;
 
 /**
  * @brief Ring Account is build on top of SIPAccountBase and uses DHT to handle call connectivity.
@@ -593,6 +593,12 @@ public:
     {
         return *certStore_;
     }
+    /**
+     * Check if a Device is connected
+     * @param deviceId
+     * @return true if connected
+     */
+    bool isConnectedWith(const DeviceId& deviceId) const;
 
 private:
     NON_COPYABLE(JamiAccount);
@@ -783,7 +789,7 @@ private:
 
     std::set<std::shared_ptr<dht::http::Request>> requests_;
 
-    std::mutex sipConnsMtx_ {};
+    mutable std::mutex sipConnsMtx_ {};
     struct SipConnection
     {
         std::shared_ptr<SipTransport> transport;
diff --git a/src/jamidht/swarm/routing_table.cpp b/src/jamidht/swarm/routing_table.cpp
index 37b8d40ba0..a501adf46b 100644
--- a/src/jamidht/swarm/routing_table.cpp
+++ b/src/jamidht/swarm/routing_table.cpp
@@ -303,8 +303,7 @@ RoutingTable::addKnownNode(const NodeId& nodeId)
     if (bucket == buckets.end())
         return false;
 
-    bucket->addKnownNode(nodeId);
-    return true;
+    return bucket->addKnownNode(nodeId);
 }
 
 bool
diff --git a/src/jamidht/swarm/swarm_manager.cpp b/src/jamidht/swarm/swarm_manager.cpp
index b4708761f2..5c3c4d25f7 100644
--- a/src/jamidht/swarm/swarm_manager.cpp
+++ b/src/jamidht/swarm/swarm_manager.cpp
@@ -28,9 +28,10 @@ namespace jami {
 
 using namespace swarm_protocol;
 
-SwarmManager::SwarmManager(const NodeId& id)
+SwarmManager::SwarmManager(const NodeId& id, ToConnectCb&& toConnectCb)
     : id_(id)
     , rd(dht::crypto::getSeededRandomEngine<std::mt19937_64>())
+    , toConnectCb_(toConnectCb)
 {
     routing_table.setId(id);
 }
@@ -45,12 +46,24 @@ void
 SwarmManager::setKnownNodes(const std::vector<NodeId>& known_nodes)
 {
     isShutdown_ = false;
+    std::vector<NodeId> newNodes;
     {
         std::lock_guard<std::mutex> lock(mutex);
-        for (const auto& NodeId : known_nodes)
-            addKnownNodes(std::move(NodeId));
+        for (const auto& nodeId : known_nodes) {
+            if (addKnownNode(nodeId)) {
+                newNodes.emplace_back(nodeId);
+            }
+        }
     }
-    maintainBuckets();
+    // If we detect a new node which already got a TCP link
+    // we can use it to speed-up the bootstrap (because opening
+    // a new channel will be easy)
+    std::set<NodeId> toConnect;
+    for (const auto& nodeId: newNodes) {
+        if (toConnectCb_ && toConnectCb_(nodeId))
+            toConnect.emplace(nodeId);
+    }
+    maintainBuckets(toConnect);
 }
 
 void
@@ -123,10 +136,10 @@ SwarmManager::shutdown()
     routing_table.shutdownAllNodes();
 }
 
-void
-SwarmManager::addKnownNodes(const NodeId& nodeId)
+bool
+SwarmManager::addKnownNode(const NodeId& nodeId)
 {
-    routing_table.addKnownNode(nodeId);
+    return routing_table.addKnownNode(nodeId);
 }
 
 void
@@ -138,9 +151,9 @@ SwarmManager::addMobileNodes(const NodeId& nodeId)
 }
 
 void
-SwarmManager::maintainBuckets()
+SwarmManager::maintainBuckets(const std::set<NodeId>& toConnect)
 {
-    std::set<NodeId> nodes;
+    std::set<NodeId> nodes = toConnect;
     std::unique_lock<std::mutex> lock(mutex);
     auto& buckets = routing_table.getBuckets();
     for (auto it = buckets.begin(); it != buckets.end(); ++it) {
diff --git a/src/jamidht/swarm/swarm_manager.h b/src/jamidht/swarm/swarm_manager.h
index 9fec114c8a..9eb34a1787 100644
--- a/src/jamidht/swarm/swarm_manager.h
+++ b/src/jamidht/swarm/swarm_manager.h
@@ -34,10 +34,11 @@ class SwarmManager : public std::enable_shared_from_this<SwarmManager>
 {
     using ChannelCb = std::function<bool(const std::shared_ptr<dhtnet::ChannelSocketInterface>&)>;
     using NeedSocketCb = std::function<void(const std::string&, ChannelCb&&)>;
+    using ToConnectCb = std::function<bool(const NodeId&)>;
     using OnConnectionChanged = std::function<void(bool ok)>;
 
 public:
-    SwarmManager(const NodeId&);
+    explicit SwarmManager(const NodeId&, ToConnectCb&& toConnectCb);
     ~SwarmManager();
 
     NeedSocketCb needSocketCb_;
@@ -147,8 +148,9 @@ public:
 
     /**
      * Maintain/Update buckets
+     * @param toConnect         Nodes to connect
      */
-    void maintainBuckets();
+    void maintainBuckets(const std::set<NodeId>& toConnect = {});
 
     /**
      * Check if we're connected with a specific device
@@ -167,8 +169,9 @@ private:
     /**
      * Add node to the known_nodes list
      * @param nodeId
+     * @return if node inserted
      */
-    void addKnownNodes(const NodeId& nodeId);
+    bool addKnownNode(const NodeId& nodeId);
 
     /**
      * Add node to the mobile_Nodes list
@@ -232,6 +235,8 @@ private:
     std::atomic_bool isShutdown_ {false};
 
     OnConnectionChanged onConnectionChanged_ {};
+
+    ToConnectCb toConnectCb_;
 };
 
 } // namespace jami
diff --git a/test/unitTest/conversation/conversation.cpp b/test/unitTest/conversation/conversation.cpp
index 273de2390a..7ab021d235 100644
--- a/test/unitTest/conversation/conversation.cpp
+++ b/test/unitTest/conversation/conversation.cpp
@@ -95,6 +95,7 @@ private:
     void testSetMessageDisplayedTwice();
     void testSetMessageDisplayedPreference();
     void testSetMessageDisplayedAfterClone();
+    void testSendMessageWithLotOfKnownDevices();
     void testVoteNonEmpty();
     void testNoBadFileInInitialCommit();
     void testNoBadCertInInitialCommit();
@@ -147,6 +148,7 @@ private:
     CPPUNIT_TEST(testSetMessageDisplayedTwice);
     CPPUNIT_TEST(testSetMessageDisplayedPreference);
     CPPUNIT_TEST(testSetMessageDisplayedAfterClone);
+    CPPUNIT_TEST(testSendMessageWithLotOfKnownDevices);
     CPPUNIT_TEST(testVoteNonEmpty);
     CPPUNIT_TEST(testNoBadFileInInitialCommit);
     CPPUNIT_TEST(testNoBadCertInInitialCommit);
@@ -1435,6 +1437,75 @@ ConversationTest::testSetMessageDisplayedAfterClone()
     libjami::unregisterSignalHandlers();
 }
 
+void
+ConversationTest::testSendMessageWithLotOfKnownDevices()
+{
+    std::cout << "\nRunning test: " << __func__ << std::endl;
+
+    auto aliceAccount = Manager::instance().getAccount<JamiAccount>(aliceId);
+
+    // Alice creates a second device
+    auto aliceArchive = std::filesystem::current_path().string() + "/alice.gz";
+    std::remove(aliceArchive.c_str());
+    aliceAccount->exportArchive(aliceArchive);
+    std::map<std::string, std::string> details = libjami::getAccountTemplate("RING");
+    details[ConfProperties::TYPE] = "RING";
+    details[ConfProperties::DISPLAYNAME] = "alice2";
+    details[ConfProperties::ALIAS] = "alice2";
+    details[ConfProperties::UPNP_ENABLED] = "true";
+    details[ConfProperties::ARCHIVE_PASSWORD] = "";
+    details[ConfProperties::ARCHIVE_PIN] = "";
+    details[ConfProperties::ARCHIVE_PATH] = aliceArchive;
+    alice2Id = Manager::instance().addAccount(details);
+    auto alice2Account = Manager::instance().getAccount<JamiAccount>(alice2Id);
+
+    bool conversationAlice2Ready = false;
+    std::map<std::string, std::shared_ptr<libjami::CallbackWrapperBase>> confHandlers;
+    confHandlers.insert(libjami::exportable_callback<libjami::ConversationSignal::ConversationReady>(
+        [&](const std::string& accountId, const std::string& conversationId) {
+            if (accountId == alice2Id) {
+                conversationAlice2Ready = true;
+            }
+            cv.notify_one();
+        }));
+    bool alice2Registered = false;
+    confHandlers.insert(
+        libjami::exportable_callback<libjami::ConfigurationSignal::VolatileDetailsChanged>(
+            [&](const std::string&, const std::map<std::string, std::string>&) {
+                auto details = alice2Account->getVolatileAccountDetails();
+                auto daemonStatus = details[libjami::Account::ConfProperties::Registration::STATUS];
+                if (daemonStatus == "REGISTERED") {
+                    alice2Registered = true;
+                    cv.notify_one();
+                }
+            }));
+    libjami::registerSignalHandlers(confHandlers);
+
+    CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&]() { return alice2Registered; }));
+
+    // Add a lot of known devices
+    for (auto i = 0; i < 1000; ++i) {
+        dht::Hash<32> h = dht::Hash<32>::get(std::to_string(i));
+        aliceAccount->accountManager()->getInfo()->contacts->foundAccountDevice(h);
+        alice2Account->accountManager()->getInfo()->contacts->foundAccountDevice(h);
+    }
+
+    auto bootstraped = false;
+    alice2Account->convModule()->onBootstrapStatus(
+        [&](std::string /*convId*/, Conversation::BootstrapStatus status) {
+            bootstraped = status == Conversation::BootstrapStatus::SUCCESS;
+            cv.notify_one();
+        });
+
+    auto convId = libjami::startConversation(aliceId);
+    CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&]() { return conversationAlice2Ready; }));
+
+    // Should bootstrap successfully
+    bootstraped = false;
+    CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&]() { return bootstraped; }));
+    libjami::unregisterSignalHandlers();
+}
+
 std::string
 ConversationTest::createFakeConversation(std::shared_ptr<JamiAccount> account,
                                          const std::string& fakeCert)
-- 
GitLab