From 057cb349177f63883f545ac78adb9799da463c4a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=A9bastien=20Blin?=
 <sebastien.blin@savoirfairelinux.com>
Date: Fri, 8 Dec 2023 15:39:30 -0500
Subject: [PATCH] conversation_module: parallelize loadConversations()

and avoid to call it twice

Change-Id: I75e7943a2552c1834f697a616b0c115955122abf
---
 src/jamidht/conversation.h                  |   2 +-
 src/jamidht/conversation_module.cpp         | 205 +++++++++++---------
 src/jamidht/jamiaccount.cpp                 |   3 +-
 test/unitTest/conversation/conversation.cpp |  23 +--
 4 files changed, 121 insertions(+), 112 deletions(-)

diff --git a/src/jamidht/conversation.h b/src/jamidht/conversation.h
index bab3c630e9..3d883ad5c8 100644
--- a/src/jamidht/conversation.h
+++ b/src/jamidht/conversation.h
@@ -116,7 +116,7 @@ struct ConvInfo
     ConvInfo(const std::string& id) : id(id) {};
     explicit ConvInfo(const Json::Value& json);
 
-    bool isRemoved() const { return removed > created; }
+    bool isRemoved() const { return removed >= created; }
 
     ConvInfo& operator=(const ConvInfo&) = default;
     ConvInfo& operator=(ConvInfo&&) = default;
diff --git a/src/jamidht/conversation_module.cpp b/src/jamidht/conversation_module.cpp
index f4bf87508e..842f933504 100644
--- a/src/jamidht/conversation_module.cpp
+++ b/src/jamidht/conversation_module.cpp
@@ -1184,17 +1184,8 @@ ConversationModule::Impl::fixStructures(std::shared_ptr<JamiAccount> acc, const
             }
         }
     }
-    dht::ThreadPool::io().run(
-        [w = weak(), invalidPendingRequests = std::move(invalidPendingRequests)]() {
-            // Will lock account manager
-            auto shared = w.lock();
-            if (!shared)
-                return;
-            if (auto acc = shared->account_.lock()) {
-                for (const auto& invalidPendingRequest : invalidPendingRequests)
-                    acc->discardTrustRequest(invalidPendingRequest);
-            }
-        });
+    for (const auto& invalidPendingRequest : invalidPendingRequests)
+        acc->discardTrustRequest(invalidPendingRequest);
 
     ////////////////////////////////////////////////////////////////
     for (const auto& conv : toRm) {
@@ -1397,104 +1388,131 @@ ConversationModule::loadConversations()
     auto acc = pimpl_->account_.lock();
     if (!acc)
         return;
-    auto uri = acc->getUsername();
     JAMI_LOG("[Account {}] Start loading conversations…", pimpl_->accountId_);
     auto conversationsRepositories = dhtnet::fileutils::readDirectory(
         fileutils::get_data_dir() / pimpl_->accountId_ / "conversations");
 
-    auto contacts = acc->getContacts(); // Avoid to lock configurationMtx while conv Mtx is locked
-    std::vector<std::tuple<std::string, std::string, std::string>> updateContactConv;
     std::unique_lock<std::mutex> lk(pimpl_->conversationsMtx_);
     std::unique_lock<std::mutex> ilk(pimpl_->convInfosMtx_);
     pimpl_->convInfos_ = convInfos(pimpl_->accountId_);
     pimpl_->conversations_.clear();
-    std::set<std::string> toRm;
-    for (const auto& repository : conversationsRepositories) {
-        try {
-            auto sconv = std::make_shared<SyncedConversation>(repository);
 
-            auto conv = std::make_shared<Conversation>(acc, repository);
-            conv->onLastDisplayedUpdated([w = pimpl_->weak_from_this()](auto convId, auto lastId) {
-                if (auto p = w.lock())
-                    p->onLastDisplayedUpdated(convId, lastId);
-            });
-            conv->onMembersChanged([w = pimpl_->weak_from_this(), repository](const auto& members) {
-                if (auto p = w.lock())
-                    p->setConversationMembers(repository, members);
-            });
-            conv->onNeedSocket(pimpl_->onNeedSwarmSocket_);
-            auto members = conv->memberUris(uri, {});
-            // NOTE: The following if is here to protect against any incorrect state
-            // that can be introduced
-            if (conv->mode() == ConversationMode::ONE_TO_ONE && members.size() == 1) {
-                // If we got a 1:1 conversation, but not in the contact details, it's rather a
-                // duplicate or a weird state
-                auto& otherUri = members[0];
-                std::string convFromDetails;
-                auto itContact = std::find_if(contacts.cbegin(), contacts.cend(), [&](const auto& c) {
-                    return c.at("id") == otherUri;
+    struct Ctx {
+        std::mutex cvMtx;
+        std::condition_variable cv;
+        std::mutex toRmMtx;
+        std::set<std::string> toRm;
+        std::mutex convMtx;
+        std::atomic_int convNb;
+        std::vector<std::map<std::string, std::string>> contacts;
+        std::vector<std::tuple<std::string, std::string, std::string>> updateContactConv;
+    };
+    auto ctx = std::make_shared<Ctx>();
+    ctx->convNb = conversationsRepositories.empty() ? 0 : conversationsRepositories.size();
+    ctx->contacts = acc->getContacts(); // Avoid to lock configurationMtx while conv Mtx is locked
+
+    for (auto repository : conversationsRepositories) {
+        auto r = std::make_shared<std::string>(repository);
+        dht::ThreadPool::io().run([this, ctx, repository, acc] {
+            try {
+                auto sconv = std::make_shared<SyncedConversation>(repository);
+
+                auto conv = std::make_shared<Conversation>(acc, repository);
+                conv->onLastDisplayedUpdated([w = pimpl_->weak_from_this()](auto convId, auto lastId) {
+                    if (auto p = w.lock())
+                        p->onLastDisplayedUpdated(convId, lastId);
+                });
+                conv->onMembersChanged([w = pimpl_->weak_from_this(), repository](const auto& members) {
+                    if (auto p = w.lock())
+                        p->setConversationMembers(repository, members);
                 });
-                auto isRemoved = itContact == contacts.end();
-                if (!isRemoved)
-                    convFromDetails = itContact->at("conversationId");
-                if (convFromDetails != repository) {
-                    if (convFromDetails.empty()) {
-                        if (isRemoved) {
-                            // If details is empty, contact is removed and not banned.
-                            JAMI_ERROR("Conversation {} detected for {} and should be removed", repository, otherUri);
-                            toRm.insert(repository);
+                conv->onNeedSocket(pimpl_->onNeedSwarmSocket_);
+                auto members = conv->memberUris(acc->getUsername(), {});
+                // NOTE: The following if is here to protect against any incorrect state
+                // that can be introduced
+                if (conv->mode() == ConversationMode::ONE_TO_ONE && members.size() == 1) {
+                    // If we got a 1:1 conversation, but not in the contact details, it's rather a
+                    // duplicate or a weird state
+                    auto& otherUri = members[0];
+                    std::string convFromDetails;
+                    auto itContact = std::find_if(ctx->contacts.cbegin(), ctx->contacts.cend(), [&](const auto& c) {
+                        return c.at("id") == otherUri;
+                    });
+                    auto isRemoved = itContact == ctx->contacts.end();
+                    if (!isRemoved)
+                        convFromDetails = itContact->at("conversationId");
+                    if (convFromDetails != repository) {
+                        if (convFromDetails.empty()) {
+                            if (isRemoved) {
+                                // If details is empty, contact is removed and not banned.
+                                JAMI_ERROR("Conversation {} detected for {} and should be removed", repository, otherUri);
+                                std::lock_guard<std::mutex> lkMtx {ctx->toRmMtx};
+                                ctx->toRm.insert(repository);
+                            } else {
+                                JAMI_ERROR("No conversation detected for {} but one exists ({}). "
+                                        "Update details",
+                                        otherUri,
+                                        repository);
+                                std::lock_guard<std::mutex> lkMtx {ctx->toRmMtx};
+                                ctx->updateContactConv.emplace_back(std::make_tuple(otherUri, convFromDetails, repository));
+                            }
                         } else {
-                            JAMI_ERROR("No conversation detected for {} but one exists ({}). "
-                                    "Update details",
+                            JAMI_ERROR("Multiple conversation detected for {} but ({} & {})",
                                     otherUri,
-                                    repository);
-                            updateContactConv.emplace_back(std::make_tuple(otherUri, convFromDetails, repository));
+                                    repository,
+                                    convFromDetails);
+                            std::lock_guard<std::mutex> lkMtx {ctx->toRmMtx};
+                            ctx->toRm.insert(repository);
                         }
+                    }
+                }
+                {
+                    std::lock_guard<std::mutex> lkMtx {ctx->convMtx};
+                    auto convInfo = pimpl_->convInfos_.find(repository);
+                    if (convInfo == pimpl_->convInfos_.end()) {
+                        JAMI_ERROR("Missing conv info for {}. This is a bug!", repository);
+                        sconv->info.created = std::time(nullptr);
+                        sconv->info.members = std::move(members);
+                        sconv->info.lastDisplayed = conv->infos()[ConversationMapKeys::LAST_DISPLAYED];
+                        // convInfosMtx_ is already locked
+                        pimpl_->convInfos_[repository] = sconv->info;
                     } else {
-                        JAMI_ERROR("Multiple conversation detected for {} but ({} & {})",
-                                   otherUri,
-                                   repository,
-                                   convFromDetails);
-                        toRm.insert(repository);
+                        sconv->info = convInfo->second;
+                        if (convInfo->second.isRemoved()) {
+                            // A conversation was removed, but repository still exists
+                            conv->setRemovingFlag();
+                            std::lock_guard<std::mutex> lkMtx {ctx->toRmMtx};
+                            ctx->toRm.insert(repository);
+                        }
                     }
                 }
-            }
-            auto convInfo = pimpl_->convInfos_.find(repository);
-            if (convInfo == pimpl_->convInfos_.end()) {
-                JAMI_ERROR("Missing conv info for {}. This is a bug!", repository);
-                sconv->info.created = std::time(nullptr);
-                sconv->info.members = std::move(members);
-                sconv->info.lastDisplayed = conv->infos()[ConversationMapKeys::LAST_DISPLAYED];
-                // convInfosMtx_ is already locked
-                pimpl_->convInfos_[repository] = sconv->info;
-                pimpl_->saveConvInfos();
-            } else {
-                sconv->info = convInfo->second;
-                if (convInfo->second.isRemoved()) {
-                    // A conversation was removed, but repository still exists
-                    conv->setRemovingFlag();
-                    toRm.insert(repository);
+                auto commits = conv->commitsEndedCalls();
+
+                if (!commits.empty()) {
+                    // Note: here, this means that some calls were actives while the
+                    // daemon finished (can be a crash).
+                    // Notify other in the conversation that the call is finished
+                    pimpl_->sendMessageNotification(*conv, true, *commits.rbegin());
                 }
+                sconv->conversation = conv;
+                std::lock_guard<std::mutex> lkMtx {ctx->convMtx};
+                pimpl_->conversations_.emplace(repository, std::move(sconv));
+            } catch (const std::logic_error& e) {
+                JAMI_WARNING("[Account {}] Conversations not loaded: {}",
+                        pimpl_->accountId_,
+                        e.what());
             }
-            auto commits = conv->commitsEndedCalls();
-            if (!commits.empty()) {
-                // Note: here, this means that some calls were actives while the
-                // daemon finished (can be a crash).
-                // Notify other in the conversation that the call is finished
-                pimpl_->sendMessageNotification(*conv, true, *commits.rbegin());
-            }
-            sconv->conversation = conv;
-            pimpl_->conversations_.emplace(repository, std::move(sconv));
-        } catch (const std::logic_error& e) {
-            JAMI_WARNING("[Account {}] Conversations not loaded: {}",
-                      pimpl_->accountId_,
-                      e.what());
-        }
+            std::lock_guard<std::mutex> lkCv {ctx->cvMtx};
+            --ctx->convNb;
+            ctx->cv.notify_all();
+        });
     }
 
+    std::unique_lock<std::mutex> lkCv {ctx->cvMtx};
+    ctx->cv.wait(lkCv, [&] {return ctx->convNb.load() == 0;});
+
     // Prune any invalid conversations without members and
     // set the removed flag if needed
-    size_t oldConvInfosSize = pimpl_->convInfos_.size();
     std::set<std::string> removed;
     for (auto itInfo = pimpl_->convInfos_.begin(); itInfo != pimpl_->convInfos_.end();) {
         const auto& info = itInfo->second;
@@ -1517,7 +1535,7 @@ ConversationModule::loadConversations()
             itConv->second->conversation->setRemovingFlag();
         if (!info.isRemoved() && itConv == pimpl_->conversations_.end()) {
             // In this case, the conversation is not synced and we only know ourself
-            if (info.members.size() == 1 && info.members.at(0) == uri) {
+            if (info.members.size() == 1 && info.members.at(0) == acc->getUsername()) {
                 JAMI_WARNING("[Account {:s}] Conversation {:s} seems not present/synced.",
                              pimpl_->accountId_,
                              info.id);
@@ -1534,13 +1552,18 @@ ConversationModule::loadConversations()
     if (!removed.empty())
         acc->unlinkConversations(removed);
     // Save if we've removed some invalid entries
-    if (oldConvInfosSize != pimpl_->convInfos_.size())
-        pimpl_->saveConvInfos();
+    pimpl_->saveConvInfos();
 
     ilk.unlock();
     lk.unlock();
 
-    pimpl_->fixStructures(acc, updateContactConv, toRm);
+    dht::ThreadPool::io().run(
+        [w = pimpl_->weak(), acc, updateContactConv = std::move(ctx->updateContactConv), toRm = std::move(ctx->toRm)]() {
+            // Will lock account manager
+            if (auto shared = w.lock())
+                shared->fixStructures(acc, updateContactConv, toRm);
+        });
+
 }
 
 void
diff --git a/src/jamidht/jamiaccount.cpp b/src/jamidht/jamiaccount.cpp
index d462c5e0b5..0729ae7dd5 100644
--- a/src/jamidht/jamiaccount.cpp
+++ b/src/jamidht/jamiaccount.cpp
@@ -1239,7 +1239,6 @@ JamiAccount::loadAccount(const std::string& archive_password,
             if (not isEnabled()) {
                 setRegistrationState(RegistrationState::UNREGISTERED);
             }
-            convModule()->loadConversations();
         } else if (isEnabled()) {
             JAMI_WARNING("[Account {}] useIdentity failed!", getAccountID());
             if (not conf.managerUri.empty() and archive_password.empty()) {
@@ -1338,7 +1337,6 @@ JamiAccount::loadAccount(const std::string& archive_password,
                         emitSignal<libjami::ConfigurationSignal::AccountProfileReceived>(
                             getAccountID(), config_->displayName, info.photo);
                     setRegistrationState(RegistrationState::UNREGISTERED);
-                    convModule()->loadConversations();
                     doRegister();
                 },
                 [w = weak(),
@@ -2140,6 +2138,7 @@ JamiAccount::convModule()
                  getAccountID().c_str());
         return nullptr;
     }
+    std::unique_lock<std::recursive_mutex> lock(configurationMutex_);
     std::lock_guard<std::mutex> lk(moduleMtx_);
     if (!convModule_) {
         convModule_ = std::make_unique<ConversationModule>(
diff --git a/test/unitTest/conversation/conversation.cpp b/test/unitTest/conversation/conversation.cpp
index 7ab021d235..a092886c7c 100644
--- a/test/unitTest/conversation/conversation.cpp
+++ b/test/unitTest/conversation/conversation.cpp
@@ -3145,13 +3145,6 @@ END:VCARD";
                     conversationRmBob2 = true;
                 cv.notify_one();
             }));
-    auto aliceProfileReceivedBob = false;
-    confHandlers.insert(libjami::exportable_callback<libjami::ConfigurationSignal::ProfileReceived>(
-        [&](const std::string& accountId, const std::string& peerId, const std::string& path) {
-            if (accountId == bobId && peerId == aliceUri)
-                aliceProfileReceivedBob = true;
-            cv.notify_one();
-        }));
     libjami::registerSignalHandlers(confHandlers);
 
     // Bob creates a second device
@@ -3187,19 +3180,10 @@ END:VCARD";
     // wait that connections are closed.
     std::this_thread::sleep_for(10s);
 
-    // Alice send a message
+    // Alice send a message. This will not trigger any request, because contact was removed.
     requestReceived = false, requestReceivedBob2 = false;
     libjami::sendMessage(aliceId, convId, "hi"s, "");
-    CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&]() { return requestReceived && requestReceivedBob2; }));
-
-    // Re-Add contact should accept and clone the conversation on all devices
-    conversationReadyBob = false;
-    conversationReadyBob2 = false;
-    aliceProfileReceivedBob = false;
-    libjami::acceptConversationRequest(bobId, convId);
-    CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&]() {
-        return conversationReadyBob && conversationReadyBob2 && aliceProfileReceivedBob;
-    }));
+    CPPUNIT_ASSERT(!cv.wait_for(lk, 30s, [&]() { return requestReceived || requestReceivedBob2; }));
 }
 
 void
@@ -3750,6 +3734,8 @@ ConversationTest::testFixContactDetails()
 
     aliceAccount->convModule()->loadConversations();
 
+    std::this_thread::sleep_for(5s); // Let the daemon fix the structures
+
     details = aliceAccount->getContactDetails(bobUri);
     CPPUNIT_ASSERT(details["conversationId"] == convId);
 }
@@ -4000,6 +3986,7 @@ ConversationTest::testLoadPartiallyRemovedConversation()
     // Reloading conversation should remove directory
     CPPUNIT_ASSERT(std::filesystem::is_directory(repoPathAlice));
     aliceAccount->convModule()->loadConversations();
+    std::this_thread::sleep_for(5s); // Let the daemon the time to fix structures
     CPPUNIT_ASSERT(!std::filesystem::is_directory(repoPathAlice));
 }
 
-- 
GitLab