From f119ea141671aa63a7ef2013942193c0a8812dc0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=A9bastien=20Blin?=
 <sebastien.blin@savoirfairelinux.com>
Date: Tue, 7 Feb 2023 15:20:30 -0500
Subject: [PATCH] conversation: synchronize read status across device

On the same model as conversation's preferences

GitLab: #815
Change-Id: I15461d40c5d9e83e39ef5f83b6b663d9427615f8
---
 src/jamidht/conversation.cpp                |  39 +++++++
 src/jamidht/conversation.h                  |  11 ++
 src/jamidht/conversation_module.cpp         |  68 +++++++++--
 src/jamidht/conversation_module.h           |   5 +-
 src/jamidht/sync_module.cpp                 |  27 +++--
 test/unitTest/conversation/conversation.cpp | 121 ++++++++++++++++++++
 6 files changed, 252 insertions(+), 19 deletions(-)

diff --git a/src/jamidht/conversation.cpp b/src/jamidht/conversation.cpp
index c21a1add50..fdf796cc4e 100644
--- a/src/jamidht/conversation.cpp
+++ b/src/jamidht/conversation.cpp
@@ -1527,6 +1527,29 @@ Conversation::setMessageDisplayed(const std::string& uri, const std::string& int
     return true;
 }
 
+void
+Conversation::updateLastDisplayed(const std::map<std::string, std::string>& map)
+{
+    auto filePath = fmt::format("{}/{}", pimpl_->conversationDataPath_, ConversationMapKeys::LAST_DISPLAYED);
+    auto prefs = map;
+    auto itLast = prefs.find(LAST_MODIFIED);
+    if (itLast != prefs.end()) {
+        if (fileutils::isFile(filePath)) {
+            auto lastModified = fileutils::lastWriteTime(filePath);
+            try {
+                if (lastModified >= std::stoul(itLast->second))
+                    return;
+            } catch (...) {
+                return;
+            }
+        }
+        prefs.erase(itLast);
+    }
+
+    for (const auto& [uri, id]: prefs)
+        setMessageDisplayed(uri, id);
+}
+
 void
 Conversation::updateLastDisplayed(const std::string& lastDisplayed)
 {
@@ -1577,6 +1600,22 @@ Conversation::updateLastDisplayed(const std::string& lastDisplayed)
         updateLastDisplayed();
 }
 
+std::map<std::string, std::string>
+Conversation::displayed() const
+{
+    try {
+        std::map<std::string, std::string> lastDisplayed;
+        auto filePath = fmt::format("{}/{}", pimpl_->conversationDataPath_, ConversationMapKeys::LAST_DISPLAYED);
+        auto file = fileutils::loadFile(filePath);
+        msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
+        oh.get().convert(lastDisplayed);
+        lastDisplayed[LAST_MODIFIED] = fmt::format("{}", fileutils::lastWriteTime(filePath));
+        return lastDisplayed;
+    } catch (const std::exception& e) {
+    }
+    return {};
+}
+
 std::vector<std::string>
 Conversation::refreshActiveCalls()
 {
diff --git a/src/jamidht/conversation.h b/src/jamidht/conversation.h
index 983a7fb04d..ee267d7c3b 100644
--- a/src/jamidht/conversation.h
+++ b/src/jamidht/conversation.h
@@ -382,6 +382,17 @@ public:
      */
     void updateLastDisplayed(const std::string& lastDisplayed);
 
+    /**
+     * Change last displayed for multiple uris
+     * @param map       New last displayed
+     */
+    void updateLastDisplayed(const std::map<std::string, std::string>& map);
+    /**
+     * Retrieve last displayed id
+     * @return displayed
+     */
+    std::map<std::string, std::string> displayed() const;
+
     /**
      * Retrieve how many interactions there is from HEAD to interactionId
      * @param toId      "" for getting the whole history
diff --git a/src/jamidht/conversation_module.cpp b/src/jamidht/conversation_module.cpp
index 218452b139..2170ab1e70 100644
--- a/src/jamidht/conversation_module.cpp
+++ b/src/jamidht/conversation_module.cpp
@@ -45,6 +45,7 @@ struct PendingConversationFetch
     std::string deviceId {};
     std::string removeId {};
     std::map<std::string, std::string> preferences {};
+    std::map<std::string, std::string> lastDisplayed {};
     std::set<std::string> connectingTo {};
 };
 
@@ -579,6 +580,20 @@ ConversationModule::Impl::handlePendingConversation(const std::string& conversat
         }
         if (!commitId.empty())
             sendMessageNotification(conversationId, commitId, false);
+        std::map<std::string, std::string> preferences;
+        std::map<std::string, std::string> lastDisplayed;
+        {
+            std::lock_guard<std::mutex> lk(pendingConversationsFetchMtx_);
+            auto itFetch = pendingConversationsFetch_.find(conversationId);
+            if (itFetch != pendingConversationsFetch_.end()) {
+                preferences = std::move(itFetch->second.preferences);
+                lastDisplayed = std::move(itFetch->second.lastDisplayed);
+            }
+        }
+        if (!preferences.empty())
+            conversation->updatePreferences(preferences);
+        if (!lastDisplayed.empty())
+            conversation->updateLastDisplayed(lastDisplayed);
         // Inform user that the conversation is ready
         emitSignal<libjami::ConversationSignal::ConversationReady>(accountId_, conversationId);
         needsSyncingCb_({});
@@ -618,15 +633,6 @@ ConversationModule::Impl::handlePendingConversation(const std::string& conversat
                 }
             }
         }
-        std::map<std::string, std::string> preferences;
-        {
-            std::lock_guard<std::mutex> lk(pendingConversationsFetchMtx_);
-            auto itFetch = pendingConversationsFetch_.find(conversationId);
-            if (itFetch != pendingConversationsFetch_.end())
-                preferences = std::move(itFetch->second.preferences);
-        }
-        if (!preferences.empty())
-            conversation->updatePreferences(preferences);
     } catch (const std::exception& e) {
         JAMI_WARN("Something went wrong when cloning conversation: %s", e.what());
     }
@@ -1472,11 +1478,35 @@ ConversationModule::onMessageDisplayed(const std::string& peer,
 {
     std::unique_lock<std::mutex> lk(pimpl_->conversationsMtx_);
     auto conversation = pimpl_->conversations_.find(conversationId);
-    if (conversation != pimpl_->conversations_.end() && conversation->second)
-        return conversation->second->setMessageDisplayed(peer, interactionId);
+    if (conversation != pimpl_->conversations_.end() && conversation->second) {
+        if (conversation->second->setMessageDisplayed(peer, interactionId)) {
+            auto msg = std::make_shared<SyncMsg>();
+            std::map<std::string, std::map<std::string, std::string>> ld;
+            ld[conversationId] = conversation->second->displayed();
+            msg->ld = std::move(ld);
+            lk.unlock();
+            pimpl_->needsSyncingCb_(std::move(msg));
+            return true;
+        }
+    }
     return false;
 }
 
+std::map<std::string, std::map<std::string, std::string>>
+ConversationModule::convDisplayed() const
+{
+    std::map<std::string, std::map<std::string, std::string>> displayed;
+    std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_);
+    for (const auto& [id, conv] : pimpl_->conversations_) {
+        if (conv) {
+            auto d = conv->displayed();
+            if (!d.empty())
+                displayed[id] = std::move(d);
+        }
+    }
+    return displayed;
+}
+
 uint32_t
 ConversationModule::loadConversationMessages(const std::string& conversationId,
                                              const std::string& fromMessage,
@@ -1695,6 +1725,22 @@ ConversationModule::onSyncData(const SyncMsg& msg,
         if (itFetch != pimpl_->pendingConversationsFetch_.end())
             itFetch->second.preferences = p;
     }
+
+    // Updates displayed for conversations
+    for (const auto& [convId, ld] : msg.ld) {
+        {
+            std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_);
+            auto itConv = pimpl_->conversations_.find(convId);
+            if (itConv != pimpl_->conversations_.end() && itConv->second) {
+                itConv->second->updateLastDisplayed(ld);
+                continue;
+            }
+        }
+        std::lock_guard<std::mutex> lk(pimpl_->pendingConversationsFetchMtx_);
+        auto itFetch = pimpl_->pendingConversationsFetch_.find(convId);
+        if (itFetch != pimpl_->pendingConversationsFetch_.end())
+            itFetch->second.lastDisplayed = ld;
+    }
 }
 
 bool
diff --git a/src/jamidht/conversation_module.h b/src/jamidht/conversation_module.h
index 4735f081ee..fdf135899f 100644
--- a/src/jamidht/conversation_module.h
+++ b/src/jamidht/conversation_module.h
@@ -42,7 +42,9 @@ struct SyncMsg
     // p is conversation's preferences. It's not stored in c, as
     // we can update the preferences without touching any confInfo.
     std::map<std::string, std::map<std::string, std::string>> p;
-    MSGPACK_DEFINE(ds, c, cr, p)
+    // Last displayed messages
+    std::map<std::string, std::map<std::string, std::string>> ld;
+    MSGPACK_DEFINE(ds, c, cr, p, ld)
 };
 
 using ChannelCb = std::function<bool(const std::shared_ptr<ChannelSocket>&)>;
@@ -195,6 +197,7 @@ public:
     bool onMessageDisplayed(const std::string& peer,
                             const std::string& conversationId,
                             const std::string& interactionId);
+    std::map<std::string, std::map<std::string, std::string>> convDisplayed() const;
 
     /**
      * Load conversation's messages
diff --git a/src/jamidht/sync_module.cpp b/src/jamidht/sync_module.cpp
index 64ef336570..cd5e00e542 100644
--- a/src/jamidht/sync_module.cpp
+++ b/src/jamidht/sync_module.cpp
@@ -74,7 +74,7 @@ SyncModule::Impl::syncInfos(const std::shared_ptr<ChannelSocket>& socket,
                               buffer.size(),
                               ec);
                 if (ec) {
-                    JAMI_ERR("%s", ec.message().c_str());
+                    JAMI_ERROR("{:s}", ec.message());
                     return;
                 }
             }
@@ -88,7 +88,7 @@ SyncModule::Impl::syncInfos(const std::shared_ptr<ChannelSocket>& socket,
             msgpack::pack(buffer, msg);
             socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
             if (ec) {
-                JAMI_ERR("%s", ec.message().c_str());
+                JAMI_ERROR("{:s}", ec.message());
                 return;
             }
         }
@@ -101,7 +101,7 @@ SyncModule::Impl::syncInfos(const std::shared_ptr<ChannelSocket>& socket,
             msgpack::pack(buffer, msg);
             socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
             if (ec) {
-                JAMI_ERR("%s", ec.message().c_str());
+                JAMI_ERROR("{:s}", ec.message());
                 return;
             }
         }
@@ -117,7 +117,20 @@ SyncModule::Impl::syncInfos(const std::shared_ptr<ChannelSocket>& socket,
             msgpack::pack(buffer, msg);
             socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
             if (ec) {
-                JAMI_ERR("%s", ec.message().c_str());
+                JAMI_ERROR("{:s}", ec.message());
+                return;
+            }
+        }
+        buffer.clear();
+        // Sync read's status
+        auto ld = convModule->convDisplayed();
+        if (!ld.empty()) {
+            SyncMsg msg;
+            msg.ld = std::move(ld);
+            msgpack::pack(buffer, msg);
+            socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
+            if (ec) {
+                JAMI_ERROR("{:s}", ec.message());
                 return;
             }
         }
@@ -127,7 +140,7 @@ SyncModule::Impl::syncInfos(const std::shared_ptr<ChannelSocket>& socket,
         msgpack::pack(buffer, *syncMsg);
         socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
         if (ec)
-            JAMI_ERR("%s", ec.message().c_str());
+            JAMI_ERROR("{:s}", ec.message());
     }
 }
 
@@ -171,14 +184,14 @@ SyncModule::cacheSyncConnection(std::shared_ptr<ChannelSocket>&& socket,
             msgpack::object_handle oh = msgpack::unpack(reinterpret_cast<const char*>(buf), len);
             oh.get().convert(msg);
         } catch (const std::exception& e) {
-            JAMI_WARN("[convInfo] error on sync: %s", e.what());
+            JAMI_WARNING("[convInfo] error on sync: {:s}", e.what());
             return len;
         }
 
         if (auto manager = dynamic_cast<ArchiveAccountManager*>(acc->accountManager()))
             manager->onSyncData(std::move(msg.ds), false);
 
-        if (!msg.c.empty() || !msg.cr.empty() || !msg.p.empty())
+        if (!msg.c.empty() || !msg.cr.empty() || !msg.p.empty() || !msg.ld.empty())
             acc->convModule()->onSyncData(msg, peerId, device.toString());
         return len;
     });
diff --git a/test/unitTest/conversation/conversation.cpp b/test/unitTest/conversation/conversation.cpp
index 88ecfbf969..97d1585871 100644
--- a/test/unitTest/conversation/conversation.cpp
+++ b/test/unitTest/conversation/conversation.cpp
@@ -67,6 +67,7 @@ public:
                                        const std::string& fakeCert = "");
 
     std::string aliceId;
+    std::string alice2Id;
     std::string bobId;
     std::string bob2Id;
     std::string carlaId;
@@ -89,6 +90,7 @@ private:
     void testSetMessageDisplayed();
     void testSetMessageDisplayedTwice();
     void testSetMessageDisplayedPreference();
+    void testSetMessageDisplayedAfterClone();
     void testVoteNonEmpty();
     void testNoBadFileInInitialCommit();
     void testNoBadCertInInitialCommit();
@@ -139,6 +141,7 @@ private:
     CPPUNIT_TEST(testSetMessageDisplayed);
     CPPUNIT_TEST(testSetMessageDisplayedTwice);
     CPPUNIT_TEST(testSetMessageDisplayedPreference);
+    CPPUNIT_TEST(testSetMessageDisplayedAfterClone);
     CPPUNIT_TEST(testVoteNonEmpty);
     CPPUNIT_TEST(testNoBadFileInInitialCommit);
     CPPUNIT_TEST(testNoBadCertInInitialCommit);
@@ -198,6 +201,11 @@ ConversationTest::tearDown()
 {
     auto bobArchive = std::filesystem::current_path().string() + "/bob.gz";
     std::remove(bobArchive.c_str());
+    auto aliceArchive = std::filesystem::current_path().string() + "/alice.gz";
+    std::remove(aliceArchive.c_str());
+    if (!alice2Id.empty()) {
+        wait_for_removal_of(alice2Id);
+    }
 
     if (bob2Id.empty()) {
         wait_for_removal_of({aliceId, bobId, carlaId});
@@ -1389,6 +1397,119 @@ ConversationTest::testSetMessageDisplayedPreference()
     libjami::unregisterSignalHandlers();
 }
 
+void
+ConversationTest::testSetMessageDisplayedAfterClone()
+{
+    auto aliceAccount = Manager::instance().getAccount<JamiAccount>(aliceId);
+    auto bobAccount = Manager::instance().getAccount<JamiAccount>(bobId);
+    auto aliceUri = aliceAccount->getUsername();
+    auto bobUri = bobAccount->getUsername();
+    auto convId = libjami::startConversation(aliceId);
+    std::mutex mtx;
+    std::unique_lock<std::mutex> lk {mtx};
+    std::condition_variable cv;
+    std::map<std::string, std::shared_ptr<libjami::CallbackWrapperBase>> confHandlers;
+    bool requestReceived = false, memberMessageGenerated = false,
+         msgDisplayed = false, aliceRegistered = false;
+    confHandlers.insert(
+        libjami::exportable_callback<libjami::ConversationSignal::ConversationRequestReceived>(
+            [&](const std::string& /*accountId*/,
+                const std::string& /* conversationId */,
+                std::map<std::string, std::string> /*metadatas*/) {
+                requestReceived = true;
+                cv.notify_one();
+            }));
+    bool conversationReady = false, conversationAlice2Ready = false;
+    confHandlers.insert(libjami::exportable_callback<libjami::ConversationSignal::ConversationReady>(
+        [&](const std::string& accountId, const std::string& conversationId) {
+            if (accountId == bobId && conversationId == convId) {
+                conversationReady = true;
+            } else if (accountId == alice2Id) {
+                conversationAlice2Ready = true;
+            }
+            cv.notify_one();
+        }));
+    confHandlers.insert(libjami::exportable_callback<libjami::ConversationSignal::MessageReceived>(
+        [&](const std::string& accountId,
+            const std::string& conversationId,
+            std::map<std::string, std::string> message) {
+            if (accountId == aliceId && conversationId == convId && message["type"] == "member") {
+                memberMessageGenerated = true;
+                cv.notify_one();
+            }
+        }));
+    std::string aliceLastMsg;
+    confHandlers.insert(
+        libjami::exportable_callback<libjami::ConfigurationSignal::AccountMessageStatusChanged>(
+            [&](const std::string& accountId,
+                const std::string& conversationId,
+                const std::string& peer,
+                const std::string& msgId,
+                int status) {
+                if (conversationId == convId && peer == aliceUri && status == 3) {
+                    if (accountId == bobId && msgId == conversationId)
+                        msgDisplayed = true;
+                    else if (accountId == aliceId)
+                        aliceLastMsg = msgId;
+                    cv.notify_one();
+                }
+            }));
+    confHandlers.insert(
+        libjami::exportable_callback<libjami::ConfigurationSignal::VolatileDetailsChanged>(
+            [&](const std::string&, const std::map<std::string, std::string>&) {
+                auto details = aliceAccount->getVolatileAccountDetails();
+                auto daemonStatus = details[libjami::Account::ConfProperties::Registration::STATUS];
+                if (daemonStatus == "REGISTERED") {
+                    aliceRegistered = true;
+                    cv.notify_one();
+                }
+            }));
+    libjami::registerSignalHandlers(confHandlers);
+
+    aliceLastMsg = "";
+    libjami::addConversationMember(aliceId, convId, bobUri);
+    CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&]() {
+        return requestReceived && memberMessageGenerated && !aliceLastMsg.empty();
+    }));
+    memberMessageGenerated = false;
+    libjami::acceptConversationRequest(bobId, convId);
+    CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&]() { return memberMessageGenerated; }));
+
+    aliceAccount->setMessageDisplayed("swarm:" + convId, convId, 3);
+
+    // Alice creates a second device
+    auto aliceArchive = std::filesystem::current_path().string() + "/alice.gz";
+    std::remove(aliceArchive.c_str());
+    aliceAccount->exportArchive(aliceArchive);
+    std::map<std::string, std::string> details = libjami::getAccountTemplate("RING");
+    details[ConfProperties::TYPE] = "RING";
+    details[ConfProperties::DISPLAYNAME] = "alice2";
+    details[ConfProperties::ALIAS] = "alice2";
+    details[ConfProperties::UPNP_ENABLED] = "true";
+    details[ConfProperties::ARCHIVE_PASSWORD] = "";
+    details[ConfProperties::ARCHIVE_PIN] = "";
+    details[ConfProperties::ARCHIVE_PATH] = aliceArchive;
+    alice2Id = Manager::instance().addAccount(details);
+
+    // Disconnect alice2, to create a valid conv betwen Alice and alice1
+    CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&]() { return conversationAlice2Ready; }));
+
+    // Assert that message is set as displayed for self (for the read status)
+    auto membersInfos = libjami::getConversationMembers(aliceId, convId);
+    CPPUNIT_ASSERT(std::find_if(membersInfos.begin(),
+                                membersInfos.end(),
+                                [&](auto infos) {
+                                    return infos["uri"] == aliceUri
+                                           && infos["lastDisplayed"] == convId;
+                                })
+                   != membersInfos.end());
+
+
+
+
+    libjami::unregisterSignalHandlers();
+}
+
 std::string
 ConversationTest::createFakeConversation(std::shared_ptr<JamiAccount> account,
                                          const std::string& fakeCert)
-- 
GitLab