diff --git a/src/jamidht/conversation.cpp b/src/jamidht/conversation.cpp index 50bf4c1e9a7c9c9f3d3fdd14e9346c23ae40d21a..bf658536b25c5ca9b108370bfb603fce648424e8 100644 --- a/src/jamidht/conversation.cpp +++ b/src/jamidht/conversation.cpp @@ -122,6 +122,12 @@ using MessageList = std::list<std::shared_ptr<libjami::SwarmMessage>>; struct History { + // While loading the history, we need to avoid: + // - reloading history (can just be ignored) + // - adding new commits (should wait for history to be loaded) + std::mutex mutex {}; + std::condition_variable cv {}; + bool loading {false}; MessageList messageList {}; std::map<std::string, std::shared_ptr<libjami::SwarmMessage>> quickAccess {}; std::map<std::string, std::list<std::shared_ptr<libjami::SwarmMessage>>> pendingEditions {}; @@ -670,19 +676,12 @@ public: /** * Loaded history represents the linearized history to show for clients */ - mutable std::mutex loadingMtx_; mutable History loadedHistory_ {}; std::vector<std::shared_ptr<libjami::SwarmMessage>> addToHistory( const std::vector<std::map<std::string, std::string>>& commits, bool messageReceived = false, bool commitFromSelf = false, History* history = nullptr) const; - // While loading the history, we need to avoid: - // - reloading history (can just be ignored) - // - adding new commits (should wait for history to be loaded) - bool isLoadingHistory_ {false}; - mutable std::mutex historyMtx_ {}; - mutable std::condition_variable historyCv_ {}; void handleReaction(History& history, const std::shared_ptr<libjami::SwarmMessage>& sharedCommit) const; @@ -879,15 +878,13 @@ Conversation::Impl::loadMessages(const LogOptions& options) std::vector<libjami::SwarmMessage> Conversation::Impl::loadMessages2(const LogOptions& options, History* optHistory) { + auto history = optHistory ? optHistory : &loadedHistory_; - std::lock_guard lk(loadingMtx_); - - if (!optHistory) { - std::lock_guard lock(historyMtx_); - if (!repository_ || isLoadingHistory_) - return {}; - isLoadingHistory_ = true; + // history->mutex is locked by the caller + if (!repository_ || history->loading) { + return {}; } + history->loading = true; // By convention, if options.nbOfCommits is zero, then we // don't impose a limit on the number of commits returned. @@ -956,17 +953,17 @@ Conversation::Impl::loadMessages2(const LogOptions& options, History* optHistory replies.erase(it); } std::shared_ptr<libjami::SwarmMessage> firstMsg; - if (!optHistory && msgList.empty() && !loadedHistory_.messageList.empty()) { + if ((history == &loadedHistory_) && msgList.empty() && !loadedHistory_.messageList.empty()) { firstMsg = *loadedHistory_.messageList.rbegin(); } - auto added = addToHistory({message}, false, false, optHistory); + auto added = addToHistory({message}, false, false, history); if (!added.empty() && firstMsg) { emitSignal<libjami::ConversationSignal::SwarmMessageUpdated>(accountId_, repository_->id(), *firstMsg); } msgList.insert(msgList.end(), added.begin(), added.end()); - }, + }, /* postCondition */ [&](auto, auto, auto) { // Stop logging if there was a limit set on the number of commits @@ -979,18 +976,15 @@ Conversation::Impl::loadMessages2(const LogOptions& options, History* optHistory options.from, options.logIfNotFound); + history->loading = false; + history->cv.notify_all(); + // Convert for client (remove ptr) std::vector<libjami::SwarmMessage> ret; ret.reserve(msgList.size()); for (const auto& msg: msgList) { ret.emplace_back(*msg); } - if (!optHistory) { - std::lock_guard lock(historyMtx_); - isLoadingHistory_ = false; - historyCv_.notify_all(); - } - return ret; } @@ -1180,9 +1174,9 @@ Conversation::Impl::addToHistory(const std::vector<std::map<std::string, std::st if (!acc) return {}; auto username = acc->getUsername(); - if (messageReceived && (!optHistory && isLoadingHistory_)) { - std::unique_lock lk(historyMtx_); - historyCv_.wait(lk, [&] { return !isLoadingHistory_; }); + if (messageReceived && (optHistory == &loadedHistory_ && optHistory->loading)) { + std::unique_lock lk(optHistory->mutex); + optHistory->cv.wait(lk, [&] { return !optHistory->loading; }); } std::vector<std::shared_ptr<libjami::SwarmMessage>> messages; auto addCommit = [&](const auto& commit) { @@ -1200,7 +1194,7 @@ Conversation::Impl::addToHistory(const std::vector<std::map<std::string, std::st auto sharedCommit = std::make_shared<libjami::SwarmMessage>(); sharedCommit->fromMapStringString(commit); // Set message status based on cache (only on history for client) - if (!commitFromSelf && optHistory == nullptr) { + if (!commitFromSelf && optHistory == &loadedHistory_) { std::lock_guard lk(messageStatusMtx_); for (const auto& member: repository_->members()) { // If we have a status cached, use it @@ -1708,6 +1702,7 @@ Conversation::loadMessages2(const OnLoadMessages2& cb, const LogOptions& options return; dht::ThreadPool::io().run([w = weak(), cb = std::move(cb), options] { if (auto sthis = w.lock()) { + std::lock_guard lk(sthis->pimpl_->loadedHistory_.mutex); cb(sthis->pimpl_->loadMessages2(options)); } }); @@ -1716,7 +1711,7 @@ Conversation::loadMessages2(const OnLoadMessages2& cb, const LogOptions& options void Conversation::clearCache() { - std::lock_guard lk(pimpl_->loadingMtx_); + std::lock_guard lk(pimpl_->loadedHistory_.mutex); pimpl_->loadedHistory_.messageList.clear(); pimpl_->loadedHistory_.quickAccess.clear(); pimpl_->loadedHistory_.pendingEditions.clear(); @@ -1730,17 +1725,16 @@ Conversation::clearCache() std::string Conversation::lastCommitId() const { - LogOptions options; - options.nbOfCommits = 1; - options.skipMerge = true; - History optHistory; { - std::lock_guard lk(pimpl_->historyMtx_); + std::lock_guard lk(pimpl_->loadedHistory_.mutex); if (!pimpl_->loadedHistory_.messageList.empty()) return (*pimpl_->loadedHistory_.messageList.begin())->id; } - - std::lock_guard lk(pimpl_->writeMtx_); + LogOptions options; + options.nbOfCommits = 1; + options.skipMerge = true; + History optHistory; + std::scoped_lock lock(pimpl_->writeMtx_, optHistory.mutex); auto res = pimpl_->loadMessages2(options, &optHistory); if (res.empty()) return {}; @@ -2241,7 +2235,7 @@ Conversation::Impl::updateStatus(const std::string& uri, options.logIfNotFound = false; options.fastLog = true; History optHistory; - std::lock_guard lk(historyMtx_); // Avoid to announce messages while updating status. + std::lock_guard lk(optHistory.mutex); // Avoid to announce messages while updating status. auto res = loadMessages2(options, &optHistory); if (res.size() == 0) { // In this case, commit is not received yet, so we cache it @@ -2562,6 +2556,7 @@ Conversation::countInteractions(const std::string& toId, options.logIfNotFound = false; options.fastLog = true; History history; + std::lock_guard lk(history.mutex); auto res = pimpl_->loadMessages2(options, &history); return res.size(); }