diff --git a/src/jamidht/conversation.cpp b/src/jamidht/conversation.cpp index 553d7c444e9984c678540cd7448f5e85aec073a1..63db5e8cbf0a46c7abe3c618c06fdad583f61935 100644 --- a/src/jamidht/conversation.cpp +++ b/src/jamidht/conversation.cpp @@ -228,13 +228,14 @@ public: } } #ifdef ENABLE_PLUGIN - auto& pluginChatManager = Manager::instance().getJamiPluginManager().getChatServicesManager(); + auto& pluginChatManager + = Manager::instance().getJamiPluginManager().getChatServicesManager(); if (pluginChatManager.hasHandlers()) { auto cm = std::make_shared<JamiMessage>(shared->getAccountID(), - convId, - c.at("author") != shared->getUsername(), - c, - false); + convId, + c.at("author") != shared->getUsername(), + c, + false); cm->isSwarm = true; pluginChatManager.publishMessage(std::move(cm)); } @@ -655,19 +656,17 @@ Conversation::sendMessage(std::string&& message, } void -Conversation::sendMessage(Json::Value&& value, - const std::string& /*parent*/, - OnDoneCb&& cb) +Conversation::sendMessage(Json::Value&& value, const std::string& /*parent*/, OnDoneCb&& cb) { dht::ThreadPool::io().run([w = weak(), value = std::move(value), cb = std::move(cb)] { if (auto sthis = w.lock()) { auto shared = sthis->pimpl_->account_.lock(); if (!shared) return; + std::unique_lock<std::mutex> lk(sthis->pimpl_->writeMtx_); Json::StreamWriterBuilder wbuilder; wbuilder["commentStyle"] = "None"; wbuilder["indentation"] = ""; - std::unique_lock<std::mutex> lk(sthis->pimpl_->writeMtx_); auto commit = sthis->pimpl_->repository_->commitMessage( Json::writeString(wbuilder, value)); sthis->clearFetched(); @@ -679,6 +678,36 @@ Conversation::sendMessage(Json::Value&& value, }); } +void +Conversation::sendMessages(std::vector<Json::Value>&& messages, + const std::string& /*parent*/, + OnMultiDoneCb&& cb) +{ + dht::ThreadPool::io().run([w = weak(), messages = std::move(messages), cb = std::move(cb)] { + if (auto sthis = w.lock()) { + auto shared = sthis->pimpl_->account_.lock(); + if (!shared) + return; + std::vector<std::string> commits; + commits.reserve(messages.size()); + Json::StreamWriterBuilder wbuilder; + wbuilder["commentStyle"] = "None"; + wbuilder["indentation"] = ""; + std::unique_lock<std::mutex> lk(sthis->pimpl_->writeMtx_); + for (const auto& message : messages) { + auto commit = sthis->pimpl_->repository_->commitMessage( + Json::writeString(wbuilder, message)); + commits.emplace_back(std::move(commit)); + } + lk.unlock(); + sthis->pimpl_->announce(commits); + sthis->clearFetched(); + if (cb) + cb(commits); + } + }); +} + void Conversation::loadMessages(const OnLoadMessages& cb, const std::string& fromMessage, size_t n) { diff --git a/src/jamidht/conversation.h b/src/jamidht/conversation.h index c492a1cadc72439d27c76280b068cc8111ba34ca..8edee067232734d13a8cd18d56830e0728999dfb 100644 --- a/src/jamidht/conversation.h +++ b/src/jamidht/conversation.h @@ -88,6 +88,7 @@ using OnPullCb = std::function<void(bool fetchOk)>; using OnLoadMessages = std::function<void(std::vector<std::map<std::string, std::string>>&& messages)>; using OnDoneCb = std::function<void(bool, const std::string&)>; +using OnMultiDoneCb = std::function<void(const std::vector<std::string>&)>; class Conversation : public std::enable_shared_from_this<Conversation> { @@ -144,9 +145,11 @@ public: const std::string& type = "text/plain", const std::string& parent = "", OnDoneCb&& cb = {}); - void sendMessage(Json::Value&& message, - const std::string& parent = "", - OnDoneCb&& cb = {}); + void sendMessage(Json::Value&& message, const std::string& parent = "", OnDoneCb&& cb = {}); + // Note: used for replay. Should not be used by clients + void sendMessages(std::vector<Json::Value>&& messages, + const std::string& parent = "", + OnMultiDoneCb&& cb = {}); /** * Get a range of messages * @param cb The callback when loaded diff --git a/src/jamidht/conversation_module.cpp b/src/jamidht/conversation_module.cpp index 14427380c3e40dd59f309308a850909c027daa5c..3753fd250b6c95f1f5b3a0b97c288a05ef56d8f6 100644 --- a/src/jamidht/conversation_module.cpp +++ b/src/jamidht/conversation_module.cpp @@ -151,6 +151,20 @@ public: return true; } + // Message send/load + void sendMessage(const std::string& conversationId, + Json::Value&& value, + const std::string& parent = "", + bool announce = true, + OnDoneCb&& cb = {}); + + void sendMessage(const std::string& conversationId, + std::string message, + const std::string& parent = "", + const std::string& type = "text/plain", + bool announce = true, + OnDoneCb&& cb = {}); + // The following informations are stored on the disk mutable std::mutex convInfosMtx_; // Note, should be locked after conversationsMtx_ if needed std::map<std::string, ConvInfo> convInfos_; @@ -190,6 +204,10 @@ public: std::shared_ptr<RepeatedTask> conversationsEventHandler {}; std::weak_ptr<Impl> weak() { return std::static_pointer_cast<Impl>(shared_from_this()); } + + // Replay conversations (after erasing/re-adding) + std::mutex replayMtx_; + std::map<std::string, std::vector<std::map<std::string, std::string>>> replay_; }; ConversationModule::Impl::Impl(std::weak_ptr<JamiAccount>&& account, @@ -430,12 +448,43 @@ ConversationModule::Impl::handlePendingConversations() return; } auto commitId = conversation->join(); + std::vector<std::map<std::string, std::string>> messages; + { + std::lock_guard<std::mutex> lk(sthis->replayMtx_); + auto replayIt = sthis->replay_.find(conversationId); + if (replayIt != sthis->replay_.end()) { + messages = std::move(replayIt->second); + sthis->replay_.erase(replayIt); + } + } if (!commitId.empty()) sthis->sendMessageNotification(conversationId, commitId, false); // Inform user that the conversation is ready emitSignal<DRing::ConversationSignal::ConversationReady>(sthis->accountId_, conversationId); sthis->needsSyncingCb_(); + std::vector<Json::Value> values; + values.reserve(messages.size()); + for (const auto& message : messages) { + // For now, only replay text messages. + // File transfers will need more logic, and don't care about calls for now. + if (message.at("type") == "text/plain" + && message.at("author") == sthis->username_) { + Json::Value json; + json["body"] = message.at("body"); + json["type"] = "text/plain"; + values.emplace_back(std::move(json)); + } + } + if (!values.empty()) + conversation->sendMessages( + std::move(values), "", [w, conversationId](const auto& commits) { + auto shared = w.lock(); + if (!shared || !commits.empty()) + shared->sendMessageNotification(conversationId, + *commits.rbegin(), + true); + }); } } catch (const std::exception& e) { emitSignal<DRing::ConversationSignal::OnConversationError>(sthis->accountId_, @@ -530,6 +579,47 @@ ConversationModule::Impl::sendMessageNotification(const Conversation& conversati } } +void +ConversationModule::Impl::sendMessage(const std::string& conversationId, + std::string message, + const std::string& parent, + const std::string& type, + bool announce, + OnDoneCb&& cb) +{ + Json::Value json; + json["body"] = std::move(message); + json["type"] = type; + sendMessage(conversationId, std::move(json), parent, announce, std::move(cb)); +} + +void +ConversationModule::Impl::sendMessage(const std::string& conversationId, + Json::Value&& value, + const std::string& parent, + bool announce, + OnDoneCb&& cb) +{ + std::lock_guard<std::mutex> lk(conversationsMtx_); + auto conversation = conversations_.find(conversationId); + if (conversation != conversations_.end() && conversation->second) { + conversation->second->sendMessage( + std::move(value), + parent, + [this, conversationId, announce, cb = std::move(cb)](bool ok, + const std::string& commitId) { + if (cb) + cb(ok, commitId); + if (!announce) + return; + if (ok) + sendMessageNotification(conversationId, commitId, true); + else + JAMI_ERR("Failed to send message to conversation %s", conversationId.c_str()); + }); + } +} + //////////////////////////////////////////////////////////////// void @@ -886,10 +976,7 @@ ConversationModule::sendMessage(const std::string& conversationId, bool announce, OnDoneCb&& cb) { - Json::Value json; - json["body"] = std::move(message); - json["type"] = type; - sendMessage(conversationId, std::move(json), parent, announce, std::move(cb)); + pimpl_->sendMessage(conversationId, std::move(message), parent, type, announce, std::move(cb)); } void @@ -899,24 +986,7 @@ ConversationModule::sendMessage(const std::string& conversationId, bool announce, OnDoneCb&& cb) { - std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); - auto conversation = pimpl_->conversations_.find(conversationId); - if (conversation != pimpl_->conversations_.end() && conversation->second) { - conversation->second->sendMessage( - std::move(value), - parent, - [this, conversationId, announce, cb = std::move(cb)](bool ok, - const std::string& commitId) { - if (cb) - cb(ok, commitId); - if (!announce) - return; - if (ok) - pimpl_->sendMessageNotification(conversationId, commitId, true); - else - JAMI_ERR("Failed to send message to conversation %s", conversationId.c_str()); - }); - } + pimpl_->sendMessage(conversationId, std::move(value), parent, announce, std::move(cb)); } void @@ -1464,6 +1534,30 @@ ConversationModule::checkIfRemoveForCompat(const std::string& peerUri) removeConversation(convId); } +void +ConversationModule::initReplay(const std::string& oldConvId, const std::string& newConvId) +{ + std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); + auto acc = pimpl_->account_.lock(); + auto conversation = pimpl_->conversations_.find(oldConvId); + if (acc && conversation != pimpl_->conversations_.end() && conversation->second) { + std::promise<bool> waitLoad; + std::future<bool> fut = waitLoad.get_future(); + // we should wait for loadMessage, because it will be deleted after this. + conversation->second->loadMessages( + [&](auto&& messages) { + std::reverse(messages.begin(), + messages.end()); // Log is inverted as we want to replay + std::lock_guard<std::mutex> lk(pimpl_->replayMtx_); + pimpl_->replay_[newConvId] = std::move(messages); + waitLoad.set_value(true); + }, + "", + 0); + fut.wait(); + } +} + std::map<std::string, ConvInfo> ConversationModule::convInfos(const std::string& accountId) { diff --git a/src/jamidht/conversation_module.h b/src/jamidht/conversation_module.h index 9891ff9f9cb595331f52587dd3ec7487f17db760..8c52d9afabb9f31a10bb250383c71fd2922be47a 100644 --- a/src/jamidht/conversation_module.h +++ b/src/jamidht/conversation_module.h @@ -327,6 +327,8 @@ public: */ void checkIfRemoveForCompat(const std::string& peerUri); + void initReplay(const std::string& oldConvId, const std::string& newConvId); + // The following methods modify what is stored on the disk static void saveConvInfos(const std::string& accountId, const std::map<std::string, ConvInfo>& conversations); diff --git a/src/jamidht/jamiaccount.cpp b/src/jamidht/jamiaccount.cpp index 102329a287024ad6a9f194d4d9d6cdab2077926f..dab5ac9f16382336290ece5d9956852e9a47cf92 100644 --- a/src/jamidht/jamiaccount.cpp +++ b/src/jamidht/jamiaccount.cpp @@ -1172,6 +1172,7 @@ JamiAccount::loadAccount(const std::string& archive_password, // TODO: In the future, we may want to re-commit the messages we // may have send in the request we sent. if (updateConvForContact(uri, oldConv, convFromReq)) { + convModule()->initReplay(oldConv, convFromReq); convModule()->removeConversation(oldConv); convModule()->cloneConversationFrom(convFromReq, uri); } diff --git a/test/unitTest/conversation/conversation.cpp b/test/unitTest/conversation/conversation.cpp index 2f97957bd09297cb5332f7a23386517b85c3b18f..448a8f2ada58761edd250bdccde1be22cb42a06a 100644 --- a/test/unitTest/conversation/conversation.cpp +++ b/test/unitTest/conversation/conversation.cpp @@ -105,6 +105,7 @@ private: void testDoNotLoadIncorrectConversation(); void testSyncingWhileAccepting(); void testCountInteractions(); + void testReplayConversation(); CPPUNIT_TEST_SUITE(ConversationTest); CPPUNIT_TEST(testCreateConversation); @@ -134,6 +135,7 @@ private: CPPUNIT_TEST(testDoNotLoadIncorrectConversation); CPPUNIT_TEST(testSyncingWhileAccepting); CPPUNIT_TEST(testCountInteractions); + CPPUNIT_TEST(testReplayConversation); CPPUNIT_TEST_SUITE_END(); }; @@ -2258,6 +2260,90 @@ ConversationTest::testCountInteractions() CPPUNIT_ASSERT(DRing::countInteractions(aliceId, convId, msgId2, "", "") == 1); } +void +ConversationTest::testReplayConversation() +{ + auto aliceAccount = Manager::instance().getAccount<JamiAccount>(aliceId); + auto bobAccount = Manager::instance().getAccount<JamiAccount>(bobId); + auto bobUri = bobAccount->getUsername(); + auto aliceUri = aliceAccount->getUsername(); + std::mutex mtx; + std::unique_lock<std::mutex> lk {mtx}; + std::condition_variable cv; + std::map<std::string, std::shared_ptr<DRing::CallbackWrapperBase>> confHandlers; + bool conversationReady = false, requestReceived = false, memberMessageGenerated = false, + conversationRemoved = false, messageReceived = false; + std::vector<std::string> bobMessages; + std::string convId = ""; + confHandlers.insert(DRing::exportable_callback<DRing::ConfigurationSignal::IncomingTrustRequest>( + [&](const std::string& account_id, + const std::string& /*from*/, + const std::string& /*conversationId*/, + const std::vector<uint8_t>& /*payload*/, + time_t /*received*/) { + if (account_id == bobId) + requestReceived = true; + cv.notify_one(); + })); + confHandlers.insert(DRing::exportable_callback<DRing::ConversationSignal::ConversationReady>( + [&](const std::string& accountId, const std::string& conversationId) { + if (accountId == aliceId) { + convId = conversationId; + } else if (accountId == bobId) { + conversationReady = true; + } + cv.notify_one(); + })); + confHandlers.insert(DRing::exportable_callback<DRing::ConversationSignal::ConversationRemoved>( + [&](const std::string& accountId, const std::string&) { + if (accountId == aliceId) + conversationRemoved = true; + cv.notify_one(); + })); + confHandlers.insert(DRing::exportable_callback<DRing::ConversationSignal::MessageReceived>( + [&](const std::string& accountId, + const std::string& conversationId, + std::map<std::string, std::string> message) { + if (accountId == aliceId && conversationId == convId) { + messageReceived = true; + if (message["type"] == "member") + memberMessageGenerated = true; + } else if (accountId == bobId && message["type"] == "text/plain") { + bobMessages.emplace_back(message["body"]); + } + cv.notify_one(); + })); + DRing::registerSignalHandlers(confHandlers); + requestReceived = false; + aliceAccount->addContact(bobUri); + aliceAccount->sendTrustRequest(bobUri, {}); + CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&]() { return requestReceived; })); + CPPUNIT_ASSERT(bobAccount->acceptTrustRequest(aliceUri)); + CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&]() { + return conversationReady && memberMessageGenerated; + })); + // removeContact + aliceAccount->removeContact(bobUri, false); + CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&]() { return conversationRemoved; })); + // re-add + CPPUNIT_ASSERT(convId != ""); + auto oldConvId = convId; + convId = ""; + aliceAccount->addContact(bobUri); + CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&]() { return !convId.empty(); })); + DRing::sendMessage(aliceId, convId, "foo"s, ""); + CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&]() { return messageReceived; })); + DRing::sendMessage(aliceId, convId, "bar"s, ""); + CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&]() { return messageReceived; })); + convId = ""; + bobMessages.clear(); + aliceAccount->sendTrustRequest(bobUri, {}); + // Should retrieve previous conversation + CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&]() { + return bobMessages.size() == 2 && bobMessages[0] == "foo" && bobMessages[1] == "bar"; + })); +} + } // namespace test } // namespace jami