diff --git a/src/jamidht/conversation.cpp b/src/jamidht/conversation.cpp index 6230e01f1b5887dcabdc1ac3e5e6a8bba9464b80..93f082d13252ede3f60bece8b9b75d57798a02d9 100644 --- a/src/jamidht/conversation.cpp +++ b/src/jamidht/conversation.cpp @@ -1785,13 +1785,14 @@ void Conversation::checkBootstrapMember(const asio::error_code& ec, std::vector<std::map<std::string, std::string>> members) { + auto acc = pimpl_->account_.lock(); if (ec == asio::error::operation_aborted - or pimpl_->swarmManager_->getRoutingTable().getNodes().size() > 0) + or pimpl_->swarmManager_->getRoutingTable().getNodes().size() > 0 + or not acc) return; // We bootstrap the DRT with devices who already wrote in the repository. // However, in a conversation, a large number of devices may just watch // the conversation, but never write any message. - auto acc = pimpl_->account_.lock(); std::string uri; while (!members.empty()) { auto member = members.back(); diff --git a/src/jamidht/conversation_module.cpp b/src/jamidht/conversation_module.cpp index d779c41c6f8c4d4452d02625c21c0f14f1ea581a..1125efd3600b174747fab9bc6920f310afbf6a15 100644 --- a/src/jamidht/conversation_module.cpp +++ b/src/jamidht/conversation_module.cpp @@ -87,8 +87,6 @@ public: /** * Handle events to receive new commits */ - void checkConversationsEvents(); - bool handlePendingConversations(); void handlePendingConversation(const std::string& conversationId, const std::string& deviceId); // Requests @@ -199,12 +197,13 @@ public: // Conversations mutable std::mutex conversationsMtx_ {}; std::map<std::string, std::shared_ptr<Conversation>, std::less<>> conversations_; - std::mutex pendingConversationsFetchMtx_ {}; std::map<std::string, PendingConversationFetch, std::less<>> pendingConversationsFetch_; - bool startFetch(const std::string& convId, const std::string& deviceId) + bool startFetch(const std::string& convId, const std::string& deviceId, bool checkIfConv = false) { - std::lock_guard<std::mutex> lk(pendingConversationsFetchMtx_); + // conversationsMtx_ must be locked + if (checkIfConv && conversations_.find(convId) != conversations_.end()) + return false; // Already a conversation auto it = pendingConversationsFetch_.find(convId); if (it == pendingConversationsFetch_.end()) { PendingConversationFetch pf; @@ -223,6 +222,7 @@ public: void stopFetch(const std::string& convId, const std::string& deviceId) { + // conversationsMtx_ must be locked auto it = pendingConversationsFetch_.find(convId); if (it == pendingConversationsFetch_.end()) return; @@ -289,9 +289,6 @@ public: saveConvRequests(); } - // Receiving new commits - std::shared_ptr<RepeatedTask> conversationsEventHandler {}; - // When sending a new message, we need to send the notification to some peers of the // conversation However, the conversation may be not bootstraped, so the list will be empty. // notSyncedNotification_ will store the notifiaction to announce until we have peers to sync @@ -349,31 +346,40 @@ ConversationModule::Impl::cloneConversation(const std::string& deviceId, // cloning. // This avoid the case when we try to clone from convInfos + sync message // at the same time. - if (!startFetch(convId, deviceId)) { - JAMI_WARN("[Account %s] Already fetching %s", accountId_.c_str(), convId.c_str()); - std::lock_guard<std::mutex> lk(convInfosMtx_); - auto ci = convInfos_.find(convId); - if (ci != convInfos_.end() && ci->second.lastDisplayed.empty()) { - // If fetchNewCommits called before sync - ci->second.lastDisplayed = lastDisplayed; - saveConvInfos(); + { + std::lock_guard<std::mutex> lk(conversationsMtx_); + if (!startFetch(convId, deviceId, true)) { + JAMI_WARN("[Account %s] Already fetching %s", accountId_.c_str(), convId.c_str()); + std::lock_guard<std::mutex> lk(convInfosMtx_); + auto ci = convInfos_.find(convId); + if (ci != convInfos_.end() && ci->second.lastDisplayed.empty()) { + // If fetchNewCommits called before sync + ci->second.lastDisplayed = lastDisplayed; + saveConvInfos(); + } + return; } - return; } onNeedSocket_( convId, deviceId, [=](const auto& channel) { auto acc = account_.lock(); - std::unique_lock<std::mutex> lk(pendingConversationsFetchMtx_); - auto& pending = pendingConversationsFetch_[convId]; - if (!pending.ready) { + std::lock_guard<std::mutex> lk(conversationsMtx_); + auto it = pendingConversationsFetch_.find(convId); + if (it != pendingConversationsFetch_.end() && !it->second.ready) { if (channel) { - pending.ready = true; - pending.deviceId = channel->deviceId().toString(); - pending.socket = channel; - lk.unlock(); - checkConversationsEvents(); + it->second.ready = true; + it->second.deviceId = channel->deviceId().toString(); + it->second.socket = channel; + if (!it->second.cloning) { + it->second.cloning = true; + dht::ThreadPool::io().run( + [w = weak(), convId, deviceId = it->second.deviceId]() { + if (auto sthis = w.lock()) + sthis->handlePendingConversation(convId, deviceId); + }); + } return true; } else { stopFetch(convId, deviceId); @@ -464,11 +470,11 @@ ConversationModule::Impl::fetchNewCommits(const std::string& peer, peer = std::move(peer), deviceId = std::move(deviceId), commitId = std::move(commitId)](const auto& channel) { + std::lock_guard<std::mutex> lk(conversationsMtx_); auto conversation = conversations_.find(conversationId); auto acc = account_.lock(); if (!channel || !acc || conversation == conversations_.end() || !conversation->second) { - std::lock_guard<std::mutex> lk(pendingConversationsFetchMtx_); stopFetch(conversationId, deviceId); syncCnt.fetch_sub(1); return false; @@ -499,7 +505,7 @@ ConversationModule::Impl::fetchNewCommits(const std::string& peer, conversationId); } { - std::lock_guard<std::mutex> lk(shared->pendingConversationsFetchMtx_); + std::lock_guard<std::mutex> lk(shared->conversationsMtx_); shared->pendingConversationsFetch_.erase(conversationId); } // Notify peers that a new commit is there (DRT) @@ -519,12 +525,8 @@ ConversationModule::Impl::fetchNewCommits(const std::string& peer, } else { if (getRequest(conversationId) != std::nullopt) return; - { - // Check if the conversation is cloning - std::lock_guard<std::mutex> lk(pendingConversationsFetchMtx_); - if (pendingConversationsFetch_.find(conversationId) != pendingConversationsFetch_.end()) - return; - } + if (pendingConversationsFetch_.find(conversationId) != pendingConversationsFetch_.end()) + return; bool clone = false; { std::lock_guard<std::mutex> lkCi(convInfosMtx_); @@ -546,25 +548,6 @@ ConversationModule::Impl::fetchNewCommits(const std::string& peer, } } -void -ConversationModule::Impl::checkConversationsEvents() -{ - std::lock_guard<std::mutex> lk(pendingConversationsFetchMtx_); - bool hasHandler = conversationsEventHandler and not conversationsEventHandler->isCancelled(); - if (not pendingConversationsFetch_.empty() and not hasHandler) { - conversationsEventHandler = Manager::instance().scheduler().scheduleAtFixedRate( - [w = weak()] { - if (auto this_ = w.lock()) - return this_->handlePendingConversations(); - return false; - }, - std::chrono::milliseconds(10)); - } else if (pendingConversationsFetch_.empty() and hasHandler) { - conversationsEventHandler->cancel(); - conversationsEventHandler.reset(); - } -} - // Clone and store conversation void ConversationModule::Impl::handlePendingConversation(const std::string& conversationId, @@ -574,16 +557,21 @@ ConversationModule::Impl::handlePendingConversation(const std::string& conversat if (!acc) return; auto erasePending = [&] { - std::lock_guard<std::mutex> lk(pendingConversationsFetchMtx_); - auto oldFetch = pendingConversationsFetch_.find(conversationId); - if (oldFetch != pendingConversationsFetch_.end() && !oldFetch->second.removeId.empty()) - removeConversation(oldFetch->second.removeId); - pendingConversationsFetch_.erase(conversationId); + std::string toRm; + { + std::lock_guard<std::mutex> lk(conversationsMtx_); + auto oldFetch = pendingConversationsFetch_.find(conversationId); + if (oldFetch != pendingConversationsFetch_.end() && !oldFetch->second.removeId.empty()) + toRm = oldFetch->second.removeId; + pendingConversationsFetch_.erase(conversationId); + } + if (!toRm.empty()) + removeConversation(toRm); }; try { auto conversation = std::make_shared<Conversation>(acc, deviceId, conversationId); { - std::lock_guard<std::mutex> lk(pendingConversationsFetchMtx_); + std::lock_guard<std::mutex> lk(conversationsMtx_); auto oldFetch = pendingConversationsFetch_.find(conversationId); if (oldFetch != pendingConversationsFetch_.end() && oldFetch->second.socket) conversation->addGitSocket(DeviceId(deviceId), std::move(oldFetch->second.socket)); @@ -635,7 +623,7 @@ ConversationModule::Impl::handlePendingConversation(const std::string& conversat std::map<std::string, std::string> preferences; std::map<std::string, std::string> lastDisplayed; { - std::lock_guard<std::mutex> lk(pendingConversationsFetchMtx_); + std::lock_guard<std::mutex> lk(conversationsMtx_); auto itFetch = pendingConversationsFetch_.find(conversationId); if (itFetch != pendingConversationsFetch_.end()) { preferences = std::move(itFetch->second.preferences); @@ -694,24 +682,6 @@ ConversationModule::Impl::handlePendingConversation(const std::string& conversat erasePending(); } -bool -ConversationModule::Impl::handlePendingConversations() -{ - std::lock_guard<std::mutex> lk(pendingConversationsFetchMtx_); - for (auto it = pendingConversationsFetch_.begin(); it != pendingConversationsFetch_.end();) { - if (it->second.ready && !it->second.cloning) { - it->second.cloning = true; - dht::ThreadPool::io().run( - [w = weak(), conversationId = it->first, deviceId = it->second.deviceId]() { - if (auto sthis = w.lock()) - sthis->handlePendingConversation(conversationId, deviceId); - }); - } - ++it; - } - return !pendingConversationsFetch_.empty(); -} - std::optional<ConversationRequest> ConversationModule::Impl::getRequest(const std::string& id) const { @@ -1557,6 +1527,7 @@ ConversationModule::cloneConversationFrom(const std::string& conversationId, JAMI_WARN("Invalid member detected: %s", uri.c_str()); return; } + acc->forEachDevice(memberHash, [w = pimpl_->weak(), conversationId, oldConvId]( const std::shared_ptr<dht::crypto::PublicKey>& pk) { @@ -1564,30 +1535,38 @@ ConversationModule::cloneConversationFrom(const std::string& conversationId, auto deviceId = pk->getLongId().toString(); if (!sthis or deviceId == sthis->deviceId_) return; - if (!sthis->startFetch(conversationId, deviceId)) { - JAMI_WARN("[Account %s] Already fetching %s", - sthis->accountId_.c_str(), - conversationId.c_str()); - return; - } + { + std::lock_guard<std::mutex> lk(sthis->conversationsMtx_); + if (!sthis->startFetch(conversationId, deviceId, true)) { + JAMI_WARN("[Account %s] Already fetching %s", + sthis->accountId_.c_str(), + conversationId.c_str()); + return; + } + } // We need a onNeedSocket_ with old logic. sthis->onNeedSocket_( conversationId, pk->getLongId().toString(), [=](const auto& channel) { - auto acc = sthis->account_.lock(); - std::unique_lock<std::mutex> lk( - sthis->pendingConversationsFetchMtx_); - auto& pending = sthis->pendingConversationsFetch_[conversationId]; - if (!pending.ready) { - pending.removeId = oldConvId; + auto acc = sthis->account_.lock(); + std::lock_guard<std::mutex> lk(sthis->conversationsMtx_); + auto it = sthis->pendingConversationsFetch_.find(conversationId); + if (it != sthis->pendingConversationsFetch_.end() && !it->second.ready) { + it->second.removeId = oldConvId; if (channel) { - pending.ready = true; - pending.deviceId = channel->deviceId().toString(); - pending.socket = channel; - lk.unlock(); - sthis->checkConversationsEvents(); + it->second.ready = true; + it->second.deviceId = channel->deviceId().toString(); + it->second.socket = channel; + if (!it->second.cloning) { + it->second.cloning = true; + dht::ThreadPool::io().run( + [w = sthis->weak(), conversationId, deviceId = it->second.deviceId]() { + if (auto sthis = w.lock()) + sthis->handlePendingConversation(conversationId, deviceId); + }); + } return true; } else { sthis->stopFetch(conversationId, deviceId); @@ -1923,16 +1902,13 @@ ConversationModule::onSyncData(const SyncMsg& msg, } // Updates preferences for conversations + std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); for (const auto& [convId, p] : msg.p) { - { - std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); - auto itConv = pimpl_->conversations_.find(convId); - if (itConv != pimpl_->conversations_.end() && itConv->second) { - itConv->second->updatePreferences(p); - continue; - } + auto itConv = pimpl_->conversations_.find(convId); + if (itConv != pimpl_->conversations_.end() && itConv->second) { + itConv->second->updatePreferences(p); + continue; } - std::lock_guard<std::mutex> lk(pimpl_->pendingConversationsFetchMtx_); auto itFetch = pimpl_->pendingConversationsFetch_.find(convId); if (itFetch != pimpl_->pendingConversationsFetch_.end()) itFetch->second.preferences = p; @@ -1940,15 +1916,11 @@ ConversationModule::onSyncData(const SyncMsg& msg, // Updates displayed for conversations for (const auto& [convId, ld] : msg.ld) { - { - std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); - auto itConv = pimpl_->conversations_.find(convId); - if (itConv != pimpl_->conversations_.end() && itConv->second) { - itConv->second->updateLastDisplayed(ld); - continue; - } + auto itConv = pimpl_->conversations_.find(convId); + if (itConv != pimpl_->conversations_.end() && itConv->second) { + itConv->second->updateLastDisplayed(ld); + continue; } - std::lock_guard<std::mutex> lk(pimpl_->pendingConversationsFetchMtx_); auto itFetch = pimpl_->pendingConversationsFetch_.find(convId); if (itFetch != pimpl_->pendingConversationsFetch_.end()) itFetch->second.lastDisplayed = ld; @@ -2672,13 +2644,10 @@ ConversationModule::getConversation(const std::string& convId) std::shared_ptr<ChannelSocket> ConversationModule::gitSocket(std::string_view deviceId, std::string_view convId) const { - { - std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); - auto convIt = pimpl_->conversations_.find(convId); - if (convIt != pimpl_->conversations_.end()) - return convIt->second->gitSocket(DeviceId(deviceId)); - } - std::lock_guard<std::mutex> lk(pimpl_->pendingConversationsFetchMtx_); + std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); + auto convIt = pimpl_->conversations_.find(convId); + if (convIt != pimpl_->conversations_.end()) + return convIt->second->gitSocket(DeviceId(deviceId)); auto it = pimpl_->pendingConversationsFetch_.find(convId); if (it != pimpl_->pendingConversationsFetch_.end()) return it->second.socket; @@ -2709,16 +2678,11 @@ ConversationModule::removeGitSocket(std::string_view deviceId, std::string_view void ConversationModule::shutdownConnections() { - { - std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); - for (auto& [k, conversation] : pimpl_->conversations_) - conversation->shutdownConnections(); - } - { - std::lock_guard<std::mutex> lk(pimpl_->pendingConversationsFetchMtx_); - for (auto& [k, pending] : pimpl_->pendingConversationsFetch_) - pending.socket = {}; - } + std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); + for (auto& [k, conversation] : pimpl_->conversations_) + conversation->shutdownConnections(); + for (auto& [k, pending] : pimpl_->pendingConversationsFetch_) + pending.socket = {}; } void ConversationModule::addSwarmChannel(const std::string& conversationId, diff --git a/test/unitTest/conversation/conversation.cpp b/test/unitTest/conversation/conversation.cpp index 10bd152eff64d50574eb018bb13b0e86d71664c4..78131ed405491cf6e0388f15e1d4bcdab6adadaf 100644 --- a/test/unitTest/conversation/conversation.cpp +++ b/test/unitTest/conversation/conversation.cpp @@ -113,6 +113,7 @@ private: void testSyncWithoutPinnedCert(); void testImportMalformedContacts(); void testRemoveReaddMultipleDevice(); + void testCloneFromMultipleDevice(); void testSendReply(); void testSearchInConv(); void testConversationPreferences(); @@ -164,6 +165,7 @@ private: CPPUNIT_TEST(testSyncWithoutPinnedCert); CPPUNIT_TEST(testImportMalformedContacts); CPPUNIT_TEST(testRemoveReaddMultipleDevice); + CPPUNIT_TEST(testCloneFromMultipleDevice); CPPUNIT_TEST(testSendReply); CPPUNIT_TEST(testSearchInConv); CPPUNIT_TEST(testConversationPreferences); @@ -3332,6 +3334,131 @@ END:VCARD"; })); } +void +ConversationTest::testCloneFromMultipleDevice() +{ + std::cout << "\nRunning test: " << __func__ << std::endl; + + auto aliceAccount = Manager::instance().getAccount<JamiAccount>(aliceId); + auto bobAccount = Manager::instance().getAccount<JamiAccount>(bobId); + auto bobUri = bobAccount->getUsername(); + auto aliceUri = aliceAccount->getUsername(); + + std::mutex mtx; + std::unique_lock<std::mutex> lk {mtx}; + std::condition_variable cv; + std::map<std::string, std::shared_ptr<libjami::CallbackWrapperBase>> confHandlers; + auto requestReceived = false, requestReceivedBob2 = false; + confHandlers.insert( + libjami::exportable_callback<libjami::ConversationSignal::ConversationRequestReceived>( + [&](const std::string& accountId, + const std::string& conversationId, + std::map<std::string, std::string> /*metadatas*/) { + if (accountId == bobId) + requestReceived = true; + else if (accountId == bob2Id) + requestReceivedBob2 = true; + cv.notify_one(); + })); + std::string convId = ""; + auto conversationReadyBob = false, conversationReadyBob2 = false; + confHandlers.insert(libjami::exportable_callback<libjami::ConversationSignal::ConversationReady>( + [&](const std::string& accountId, const std::string& conversationId) { + if (accountId == aliceId) { + convId = conversationId; + } else if (accountId == bobId) { + conversationReadyBob = true; + } else if (accountId == bob2Id) { + conversationReadyBob2 = true; + } + cv.notify_one(); + })); + auto memberMessageGenerated = false; + confHandlers.insert(libjami::exportable_callback<libjami::ConversationSignal::MessageReceived>( + [&](const std::string& accountId, + const std::string& conversationId, + std::map<std::string, std::string> message) { + if (accountId == aliceId && conversationId == convId) { + if (message["type"] == "member") + memberMessageGenerated = true; + } + cv.notify_one(); + })); + auto bob2Started = false; + confHandlers.insert( + libjami::exportable_callback<libjami::ConfigurationSignal::VolatileDetailsChanged>( + [&](const std::string& accountId, const std::map<std::string, std::string>& details) { + if (accountId == bob2Id) { + auto daemonStatus = details.at( + libjami::Account::VolatileProperties::DEVICE_ANNOUNCED); + if (daemonStatus == "true") + bob2Started = true; + } + cv.notify_one(); + })); + auto conversationRmAlice = false; + confHandlers.insert( + libjami::exportable_callback<libjami::ConversationSignal::ConversationRemoved>( + [&](const std::string& accountId, const std::string&) { + if (accountId == aliceId) + conversationRmAlice = true; + cv.notify_one(); + })); + auto errorDetected = false; + confHandlers.insert( + libjami::exportable_callback<libjami::ConversationSignal::OnConversationError>( + [&](const std::string& /* accountId */, + const std::string& /* conversationId */, + int code, + const std::string& /* what */) { + if (code == 1) + errorDetected = true; + cv.notify_one(); + })); + libjami::registerSignalHandlers(confHandlers); + + // Bob creates a second device + auto bobArchive = std::filesystem::current_path().string() + "/bob.gz"; + std::remove(bobArchive.c_str()); + bobAccount->exportArchive(bobArchive); + std::map<std::string, std::string> details = libjami::getAccountTemplate("RING"); + details[ConfProperties::TYPE] = "RING"; + details[ConfProperties::DISPLAYNAME] = "BOB2"; + details[ConfProperties::ALIAS] = "BOB2"; + details[ConfProperties::UPNP_ENABLED] = "true"; + details[ConfProperties::ARCHIVE_PASSWORD] = ""; + details[ConfProperties::ARCHIVE_PIN] = ""; + details[ConfProperties::ARCHIVE_PATH] = bobArchive; + bob2Id = Manager::instance().addAccount(details); + + CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&]() { return bob2Started; })); + + // Alice adds bob + requestReceived = false, requestReceivedBob2 = false; + aliceAccount->addContact(bobUri); + aliceAccount->sendTrustRequest(bobUri, {}); + CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&]() { return requestReceived && requestReceivedBob2; })); + libjami::acceptConversationRequest(bobId, convId); + CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&]() { + return conversationReadyBob && conversationReadyBob2 && memberMessageGenerated; + })); + + // Remove contact + aliceAccount->removeContact(bobUri, false); + CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&]() { return conversationRmAlice; })); + + // wait that connections are closed. + std::this_thread::sleep_for(10s); + + // Alice re-adds Bob + auto oldConv = convId; + aliceAccount->addContact(bobUri); + aliceAccount->sendTrustRequest(bobUri, {}); + // This should retrieve the conversation from Bob and don't show any error + CPPUNIT_ASSERT(!cv.wait_for(lk, 10s, [&]() { return errorDetected; })); + CPPUNIT_ASSERT(oldConv == convId); // Check that convId didn't change and conversation is ready. +} + void ConversationTest::testSendReply() {