From 5953313b896060aad941e3fb18e93a92fc3a32d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Blin?= <sebastien.blin@savoirfairelinux.com> Date: Tue, 2 Jan 2024 11:53:43 -0500 Subject: [PATCH] conversation_module: change lock model for convReq Group operations to avoid race condition. This fix sporadic failures in testAddOfflineContactThenConnect Change-Id: Id136c024be3eb3efe0f5430a4b64480f0ef10804 --- src/jamidht/conversation_module.cpp | 59 +++++++++++++++++------------ 1 file changed, 35 insertions(+), 24 deletions(-) diff --git a/src/jamidht/conversation_module.cpp b/src/jamidht/conversation_module.cpp index b4c9a5b08d..2bdba6bb09 100644 --- a/src/jamidht/conversation_module.cpp +++ b/src/jamidht/conversation_module.cpp @@ -327,11 +327,13 @@ public: void declineOtherConversationWith(const std::string& uri) noexcept; bool addConversationRequest(const std::string& id, const ConversationRequest& req) { - std::lock_guard<std::mutex> lk(conversationsRequestsMtx_); + // conversationsRequestsMtx_ MUST BE LOCKED + if (isConversation(id)) + return false; auto it = conversationsRequests_.find(id); if (it != conversationsRequests_.end()) { - // Check if updated - if (req == it->second) + // We only remove requests (if accepted) or change .declined + if (!req.declined) return false; } else if (req.isOneToOne()) { // Check that we're not adding a second one to one trust request @@ -345,7 +347,7 @@ public: } void rmConversationRequest(const std::string& id) { - std::lock_guard<std::mutex> lk(conversationsRequestsMtx_); + // conversationsRequestsMtx_ MUST BE LOCKED conversationsRequests_.erase(id); saveConvRequests(); } @@ -525,10 +527,14 @@ ConversationModule::Impl::fetchNewCommits(const std::string& peer, return; } } - auto oldReq = getRequest(conversationId); - if (oldReq != std::nullopt && oldReq->declined) { - JAMI_DEBUG("[Account {}] Received a request for a conversation already declined.", accountId_); - return; + std::optional<ConversationRequest> oldReq; + { + std::lock_guard<std::mutex> lk(conversationsRequestsMtx_); + oldReq = getRequest(conversationId); + if (oldReq != std::nullopt && oldReq->declined) { + JAMI_DEBUG("[Account {}] Received a request for a conversation already declined.", accountId_); + return; + } } JAMI_DEBUG("[Account {:s}] fetch commits from {:s}, for {:s}, commit {:s}", accountId_, @@ -645,7 +651,7 @@ ConversationModule::Impl::fetchNewCommits(const std::string& peer, }, ""); } else { - if (getRequest(conversationId) != std::nullopt) + if (oldReq != std::nullopt) return; if (conv->pending) return; @@ -814,7 +820,7 @@ ConversationModule::Impl::handlePendingConversation(const std::string& conversat std::optional<ConversationRequest> ConversationModule::Impl::getRequest(const std::string& id) const { - std::lock_guard<std::mutex> lk(conversationsRequestsMtx_); + // ConversationsRequestsMtx MUST BE LOCKED auto it = conversationsRequests_.find(id); if (it != conversationsRequests_.end()) return it->second; @@ -1673,18 +1679,7 @@ ConversationModule::onTrustRequest(const std::string& uri, "clone the old one"); return; } - if (pimpl_->isConversation(conversationId)) { - JAMI_DEBUG("[Account {}] Received a request for a conversation " - "already handled. Ignore", - pimpl_->accountId_); - return; - } - if (pimpl_->getRequest(conversationId) != std::nullopt) { - JAMI_DEBUG("[Account {}] Received a request for a conversation " - "already existing. Ignore", - pimpl_->accountId_); - return; - } + std::unique_lock<std::mutex> lk(pimpl_->conversationsRequestsMtx_); ConversationRequest req; req.from = uri; req.conversationId = conversationId; @@ -1693,6 +1688,7 @@ ConversationModule::onTrustRequest(const std::string& uri, std::string_view(reinterpret_cast<const char*>(payload.data()), payload.size()))); auto reqMap = req.toMap(); if (pimpl_->addConversationRequest(conversationId, std::move(req))) { + lk.unlock(); emitSignal<libjami::ConfigurationSignal::IncomingTrustRequest>(pimpl_->accountId_, conversationId, uri, @@ -1702,12 +1698,17 @@ ConversationModule::onTrustRequest(const std::string& uri, conversationId, reqMap); pimpl_->needsSyncingCb_({}); + } else { + JAMI_DEBUG("[Account {}] Received a request for a conversation " + "already existing. Ignore", + pimpl_->accountId_); } } void ConversationModule::onConversationRequest(const std::string& from, const Json::Value& value) { + std::unique_lock<std::mutex> lk(pimpl_->conversationsRequestsMtx_); ConversationRequest req(value); JAMI_DEBUG("[Account {}] Receive a new conversation request for conversation {} from {}", pimpl_->accountId_, @@ -1732,6 +1733,7 @@ ConversationModule::onConversationRequest(const std::string& from, const Json::V std::string oldConv; auto acc = pimpl_->account_.lock(); if (acc && isOneToOne) { + lk.unlock(); auto oldConv = getOneToOneConversation(from); if (!oldConv.empty()) { // Already a conversation with the contact. @@ -1749,6 +1751,7 @@ ConversationModule::onConversationRequest(const std::string& from, const Json::V } if (pimpl_->addConversationRequest(convId, std::move(req))) { + lk.unlock(); // Note: no need to sync here because other connected devices should receive // the same conversation request. Will sync when the conversation will be added if (isOneToOne) @@ -1788,9 +1791,11 @@ void ConversationModule::acceptConversationRequest(const std::string& conversationId, const std::string& deviceId) { // For all conversation members, try to open a git channel with this conversation ID + std::unique_lock<std::mutex> lkCr(pimpl_->conversationsRequestsMtx_); auto request = pimpl_->getRequest(conversationId); if (request == std::nullopt) { - if (auto conv = pimpl_->getConversation(conversationId)) { + lkCr.unlock(); + if (auto conv = pimpl_->getConversation(conversationId)) { std::unique_lock<std::mutex> lk(conv->mtx); if (!conv->conversation) { lk.unlock(); @@ -1801,6 +1806,7 @@ ConversationModule::acceptConversationRequest(const std::string& conversationId, return; } pimpl_->rmConversationRequest(conversationId); + lkCr.unlock(); if (pimpl_->updateConvReqCb_) pimpl_->updateConvReqCb_(conversationId, request->from, true); cloneConversationFrom(conversationId, request->from); @@ -2159,7 +2165,10 @@ ConversationModule::onSyncData(const SyncMsg& msg, std::vector<std::string> toClone; for (const auto& [key, convInfo] : msg.c) { const auto& convId = convInfo.id; - pimpl_->rmConversationRequest(convId); + { + std::lock_guard lk(pimpl_->conversationsRequestsMtx_); + pimpl_->rmConversationRequest(convId); + } auto conv = pimpl_->startConversation(convInfo); std::unique_lock<std::mutex> lk(conv->mtx); @@ -2223,6 +2232,7 @@ ConversationModule::onSyncData(const SyncMsg& msg, JAMI_WARNING("Detected request from ourself, ignore {}.", convId); continue; } + std::unique_lock<std::mutex> lk(pimpl_->conversationsRequestsMtx_); if (pimpl_->isConversation(convId)) { // Already handled request pimpl_->rmConversationRequest(convId); @@ -2232,6 +2242,7 @@ ConversationModule::onSyncData(const SyncMsg& msg, // New request if (!pimpl_->addConversationRequest(convId, req)) continue; + lk.unlock(); if (req.declined != 0) { // Request declined -- GitLab