From 8a753f0463ae4b0092d79dcb39014731379f2a69 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=A9bastien=20Blin?=
 <sebastien.blin@savoirfairelinux.com>
Date: Fri, 11 Jun 2021 12:23:28 -0400
Subject: [PATCH] swarm: only request a connection when peer is detected if a
 sync is needed

fetched check the devices who has fetched since the last commit. With this file
we can detect if a device needs to fetch and if it's the case we start a connection.
cacheSipConnection will sync the conversations when called. This avoids the
daemon to connect too all contacts

Change-Id: Id85db09c3c0a6aa44dd48b3bbee5ed7e0a5d6b84
---
 src/jamidht/conversation.cpp |  69 ++++++++++++++--
 src/jamidht/conversation.h   |  16 ++++
 src/jamidht/jamiaccount.cpp  | 148 ++++++++---------------------------
 src/jamidht/jamiaccount.h    |   4 -
 4 files changed, 109 insertions(+), 128 deletions(-)

diff --git a/src/jamidht/conversation.cpp b/src/jamidht/conversation.cpp
index b5a2948e18..fa320d9c89 100644
--- a/src/jamidht/conversation.cpp
+++ b/src/jamidht/conversation.cpp
@@ -119,9 +119,7 @@ public:
         if (!repository_) {
             throw std::logic_error("Couldn't create repository");
         }
-        if (auto shared = account_.lock())
-            transferManager_ = std::make_shared<TransferManager>(shared->getAccountID(),
-                                                                 repository_->id());
+        init();
     }
 
     Impl(const std::weak_ptr<JamiAccount>& account, const std::string& conversationId)
@@ -131,9 +129,7 @@ public:
         if (!repository_) {
             throw std::logic_error("Couldn't create repository");
         }
-        if (auto shared = account_.lock())
-            transferManager_ = std::make_shared<TransferManager>(shared->getAccountID(),
-                                                                 repository_->id());
+        init();
     }
 
     Impl(const std::weak_ptr<JamiAccount>& account,
@@ -151,10 +147,22 @@ public:
             }
             throw std::logic_error("Couldn't clone repository");
         }
-        if (auto shared = account_.lock())
+        init();
+    }
+
+    void init()
+    {
+        if (auto shared = account_.lock()) {
             transferManager_ = std::make_shared<TransferManager>(shared->getAccountID(),
                                                                  repository_->id());
+            conversationDataPath_ = fileutils::get_data_dir() + DIR_SEPARATOR_STR
+                                    + shared->getAccountID() + DIR_SEPARATOR_STR
+                                    + "conversation_data" + DIR_SEPARATOR_STR + repository_->id();
+            fetchedPath_ = fileutils::get_data_dir() + DIR_SEPARATOR_STR + "fetched";
+            loadFetched();
+        }
     }
+
     ~Impl() = default;
 
     bool isAdmin() const;
@@ -233,6 +241,25 @@ public:
         }
     }
 
+    void loadFetched()
+    {
+        try {
+            // read file
+            auto file = fileutils::loadFile(fetchedPath_);
+            // load values
+            msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
+            std::lock_guard<std::mutex> lk {fetchedDevicesMtx_};
+            oh.get().convert(fetchedDevices_);
+        } catch (const std::exception& e) {
+            return;
+        }
+    }
+    void saveFetched()
+    {
+        std::ofstream file(fetchedPath_, std::ios::trunc | std::ios::binary);
+        msgpack::pack(file, fetchedDevices_);
+    }
+
     std::unique_ptr<ConversationRepository> repository_;
     std::weak_ptr<JamiAccount> account_;
     std::atomic_bool isRemoving_ {false};
@@ -248,6 +275,10 @@ public:
     std::set<std::string> fetchingRemotes_ {}; // store current remote in fetch
     std::deque<std::tuple<std::string, std::string, OnPullCb>> pullcbs_ {};
     std::shared_ptr<TransferManager> transferManager_ {};
+    std::string conversationDataPath_ {};
+    std::string fetchedPath_ {};
+    std::mutex fetchedDevicesMtx_ {};
+    std::set<std::string> fetchedDevices_ {};
 };
 
 bool
@@ -610,6 +641,7 @@ Conversation::sendMessage(const Json::Value& value,
             std::unique_lock<std::mutex> lk(sthis->pimpl_->writeMtx_);
             auto commit = sthis->pimpl_->repository_->commitMessage(
                 Json::writeString(wbuilder, value));
+            sthis->clearFetched();
             sthis->pimpl_->announce(commit);
             lk.unlock();
             if (cb)
@@ -1005,4 +1037,27 @@ Conversation::downloadFile(const std::string& interactionId,
     return true;
 }
 
+void
+Conversation::clearFetched()
+{
+    std::lock_guard<std::mutex> lk(pimpl_->fetchedDevicesMtx_);
+    pimpl_->fetchedDevices_.clear();
+    pimpl_->saveFetched();
+}
+
+bool
+Conversation::needsFetch(const std::string& deviceId) const
+{
+    std::lock_guard<std::mutex> lk(pimpl_->fetchedDevicesMtx_);
+    return pimpl_->fetchedDevices_.find(deviceId) != pimpl_->fetchedDevices_.end();
+}
+
+void
+Conversation::hasFetched(const std::string& deviceId)
+{
+    std::lock_guard<std::mutex> lk(pimpl_->fetchedDevicesMtx_);
+    pimpl_->fetchedDevices_.emplace(deviceId);
+    pimpl_->saveFetched();
+}
+
 } // namespace jami
\ No newline at end of file
diff --git a/src/jamidht/conversation.h b/src/jamidht/conversation.h
index d63046b494..66e483a1de 100644
--- a/src/jamidht/conversation.h
+++ b/src/jamidht/conversation.h
@@ -291,6 +291,22 @@ public:
                       std::size_t start = 0,
                       std::size_t end = 0);
 
+    /**
+     * Reset fetched informations
+     */
+    void clearFetched();
+    /**
+     * Check if a device has fetched last commit
+     * @param deviceId
+     */
+    bool needsFetch(const std::string& deviceId) const;
+    /**
+     * Store informations about who fetch or not. This simplify sync (sync when a device without the
+     * last fetch is detected)
+     * @param deviceId
+     */
+    void hasFetched(const std::string& deviceId);
+
 private:
     std::shared_ptr<Conversation> shared()
     {
diff --git a/src/jamidht/jamiaccount.cpp b/src/jamidht/jamiaccount.cpp
index b0dd36ce90..b7892d5a7f 100644
--- a/src/jamidht/jamiaccount.cpp
+++ b/src/jamidht/jamiaccount.cpp
@@ -1951,7 +1951,20 @@ JamiAccount::trackPresence(const dht::InfoHash& h, BuddyInfo& buddy)
                   if (not expired) {
                       // Retry messages every time a new device announce its presence
                       sthis->messageEngine_.onPeerOnline(h.toString());
-                      sthis->requestSIPConnection(h.toString(), dev.dev);
+                      auto deviceId = dev.dev.toString();
+                      auto needsSync = false;
+                      {
+                          std::unique_lock<std::mutex> lk(sthis->conversationsMtx_);
+                          for (const auto& [_, conv] : sthis->conversations_) {
+                              if (conv->isMember(deviceId, false) && conv->needsFetch(deviceId)) {
+                                  needsSync = true;
+                                  break;
+                              }
+                          }
+                      }
+                      if (needsSync)
+                          sthis->requestSIPConnection(h.toString(),
+                                                      dev.dev); // Both sides will sync conversations
                   }
                   if (isConnected and not wasConnected) {
                       sthis->onTrackedBuddyOnline(h);
@@ -2416,11 +2429,21 @@ JamiAccount::doRegister_()
                               deviceId.to_c_str(),
                               channel->channel());
                     auto gs = std::make_unique<GitServer>(accountId, conversationId, channel);
-                    gs->setOnFetched([w = weak(), conversationId](const std::string&) {
+                    gs->setOnFetched([w = weak(), conversationId, deviceId](const std::string&) {
                         auto shared = w.lock();
                         if (!shared)
                             return;
-                        shared->removeRepository(conversationId, true);
+                        auto remove = false;
+                        {
+                            std::unique_lock<std::mutex> lk(shared->conversationsMtx_);
+                            auto it = shared->conversations_.find(conversationId);
+                            if (it != shared->conversations_.end() && it->second) {
+                                remove = it->second->isRemoving();
+                                it->second->hasFetched(deviceId.toString());
+                            }
+                        }
+                        if (remove)
+                            shared->removeRepository(conversationId, true);
                     });
                     const dht::Value::Id serverId = ValueIdDist()(rand);
                     {
@@ -2523,56 +2546,10 @@ JamiAccount::doRegister_()
         if (!dhtPeerConnector_)
             dhtPeerConnector_ = std::make_unique<DhtPeerConnector>(*this);
 
-        {
-            std::lock_guard<std::mutex> lock(buddyInfoMtx);
-            for (auto& buddy : trackedBuddies_) {
-                buddy.second.devices_cnt = 0;
-                trackPresence(buddy.first, buddy.second);
-            }
-        }
-
-        if (needsConvSync_.exchange(false)) {
-            // Clone malformed conversations if needed
-            // (no-sync with others as onPeerOnline will do the job)
-            auto info = accountManager_->getInfo();
-            if (!info)
-                return;
-            std::lock_guard<std::mutex> lk(conversationsMtx_);
-            for (const auto& c : info->conversations) {
-                if (!c.removed) {
-                    auto it = conversations_.find(c.id);
-                    if (it == conversations_.end()) {
-                        std::shared_ptr<std::atomic_bool> willClone
-                            = std::make_shared<std::atomic_bool>(false);
-                        for (const auto& member : c.members) {
-                            if (member != getUsername()) {
-                                // Try to clone from first other members device found
-                                // NOTE: rescehdule this in a few seconds, to let the time
-                                // to the peers to discover the device if it's the first time
-                                // we create the device
-                                Manager::instance().scheduleTaskIn(
-                                    [w = weak(), member, convId = c.id, willClone]() {
-                                        if (auto shared = w.lock())
-                                            shared->accountManager_->forEachDevice(
-                                                dht::InfoHash(member),
-                                                [w, convId, member, willClone](
-                                                    const dht::InfoHash& dev) {
-                                                    if (willClone->exchange(true))
-                                                        return;
-                                                    auto shared = w.lock();
-                                                    if (!shared)
-                                                        return;
-                                                    shared->cloneConversation(dev.toString(),
-                                                                              member,
-                                                                              convId);
-                                                });
-                                    },
-                                    std::chrono::seconds(5));
-                            }
-                        }
-                    }
-                }
-            }
+        std::lock_guard<std::mutex> lock(buddyInfoMtx);
+        for (auto& buddy : trackedBuddies_) {
+            buddy.second.devices_cnt = 0;
+            trackPresence(buddy.first, buddy.second);
         }
     } catch (const std::exception& e) {
         JAMI_ERR("Error registering DHT account: %s", e.what());
@@ -2626,10 +2603,8 @@ JamiAccount::doUnregister(std::function<void(bool)> released_cb)
 
     // Stop all current p2p connections if account is disabled
     // Else, we let the system managing if the co is down or not
-    if (not isEnabled()) {
-        needsConvSync_ = true;
+    if (not isEnabled())
         shutdownConnections();
-    }
 
     // Release current upnp mapping if any.
     if (upnpCtrl_ and dhtUpnpMapping_.isValid()) {
@@ -3463,67 +3438,6 @@ JamiAccount::sendTextMessage(const std::string& to,
         std::chrono::minutes(1));
 }
 
-void
-JamiAccount::sendSIPMessageToDevice(const std::string& to,
-                                    const DeviceId& deviceId,
-                                    const std::map<std::string, std::string>& payloads)
-{
-    std::lock_guard<std::mutex> lk(sipConnsMtx_);
-    sip_utils::register_thread();
-
-    for (auto it = sipConns_.begin(); it != sipConns_.end();) {
-        auto& [key, value] = *it;
-        if (key.first == to && key.second != deviceId) {
-            ++it;
-            continue;
-        }
-        auto& conn = value.back();
-        auto& channel = conn.channel;
-
-        // Set input token into callback
-        std::unique_ptr<TextMessageCtx> ctx {std::make_unique<TextMessageCtx>()};
-        ctx->acc = weak();
-        ctx->to = to;
-        ctx->deviceId = key.second;
-        ctx->channel = channel;
-
-        try {
-            auto res = sendSIPMessage(
-                conn, to, ctx.release(), {}, payloads, [](void* token, pjsip_event* event) {
-                    std::unique_ptr<TextMessageCtx> c {(TextMessageCtx*) token};
-                    auto code = event->body.tsx_state.tsx->status_code;
-                    auto acc = c->acc.lock();
-                    if (not acc)
-                        return;
-
-                    if (code == PJSIP_SC_OK) {
-                        std::unique_lock<std::mutex> l(c->confirmation->lock);
-                        c->confirmation->replied = true;
-                        l.unlock();
-                        if (!c->onlyConnected)
-                            acc->messageEngine_.onMessageSent(c->to, c->id, true);
-                    } else {
-                        JAMI_WARN("Timeout when send a message, close current connection");
-                        acc->shutdownSIPConnection(c->channel, c->to, c->deviceId);
-                    }
-                });
-            if (!res) {
-                ++it;
-                continue;
-            }
-            break;
-        } catch (const std::runtime_error& ex) {
-            JAMI_WARN("%s", ex.what());
-            ++it;
-            // Remove connection in incorrect state
-            shutdownSIPConnection(channel, to, key.second);
-            continue;
-        }
-
-        ++it;
-    }
-}
-
 void
 JamiAccount::onIsComposing(const std::string& conversationId,
                            const std::string& peer,
diff --git a/src/jamidht/jamiaccount.h b/src/jamidht/jamiaccount.h
index 6b4c07391f..de227a26eb 100644
--- a/src/jamidht/jamiaccount.h
+++ b/src/jamidht/jamiaccount.h
@@ -340,9 +340,6 @@ public:
                              const std::map<std::string, std::string>& payloads) override;
     void sendInstantMessage(const std::string& convId,
                             const std::map<std::string, std::string>& msg) override;
-    void sendSIPMessageToDevice(const std::string& to,
-                                const DeviceId& deviceId,
-                                const std::map<std::string, std::string>& payloads);
     void onIsComposing(const std::string& conversationId,
                        const std::string& peer,
                        bool isWriting) override;
@@ -1032,7 +1029,6 @@ private:
     void syncWith(const std::string& deviceId, const std::shared_ptr<ChannelSocket>& socket);
     void syncInfos(const std::shared_ptr<ChannelSocket>& socket);
     void syncWithConnected();
-    std::atomic_bool needsConvSync_ {true};
 
     /**
      * Remove a repository and all files
-- 
GitLab