Skip to content
Snippets Groups Projects
Commit e048f1b9 authored by Adrien Béraud's avatar Adrien Béraud
Browse files

conversation: mutex is History

Change-Id: I178b437e99cf31c13622df136d635ff2147a04a9
parent 0bd34020
No related branches found
No related tags found
No related merge requests found
...@@ -122,6 +122,12 @@ using MessageList = std::list<std::shared_ptr<libjami::SwarmMessage>>; ...@@ -122,6 +122,12 @@ using MessageList = std::list<std::shared_ptr<libjami::SwarmMessage>>;
struct History struct History
{ {
// While loading the history, we need to avoid:
// - reloading history (can just be ignored)
// - adding new commits (should wait for history to be loaded)
std::mutex mutex {};
std::condition_variable cv {};
bool loading {false};
MessageList messageList {}; MessageList messageList {};
std::map<std::string, std::shared_ptr<libjami::SwarmMessage>> quickAccess {}; std::map<std::string, std::shared_ptr<libjami::SwarmMessage>> quickAccess {};
std::map<std::string, std::list<std::shared_ptr<libjami::SwarmMessage>>> pendingEditions {}; std::map<std::string, std::list<std::shared_ptr<libjami::SwarmMessage>>> pendingEditions {};
...@@ -670,19 +676,12 @@ public: ...@@ -670,19 +676,12 @@ public:
/** /**
* Loaded history represents the linearized history to show for clients * Loaded history represents the linearized history to show for clients
*/ */
mutable std::mutex loadingMtx_;
mutable History loadedHistory_ {}; mutable History loadedHistory_ {};
std::vector<std::shared_ptr<libjami::SwarmMessage>> addToHistory( std::vector<std::shared_ptr<libjami::SwarmMessage>> addToHistory(
const std::vector<std::map<std::string, std::string>>& commits, const std::vector<std::map<std::string, std::string>>& commits,
bool messageReceived = false, bool messageReceived = false,
bool commitFromSelf = false, bool commitFromSelf = false,
History* history = nullptr) const; History* history = nullptr) const;
// While loading the history, we need to avoid:
// - reloading history (can just be ignored)
// - adding new commits (should wait for history to be loaded)
bool isLoadingHistory_ {false};
mutable std::mutex historyMtx_ {};
mutable std::condition_variable historyCv_ {};
void handleReaction(History& history, void handleReaction(History& history,
const std::shared_ptr<libjami::SwarmMessage>& sharedCommit) const; const std::shared_ptr<libjami::SwarmMessage>& sharedCommit) const;
...@@ -879,15 +878,13 @@ Conversation::Impl::loadMessages(const LogOptions& options) ...@@ -879,15 +878,13 @@ Conversation::Impl::loadMessages(const LogOptions& options)
std::vector<libjami::SwarmMessage> std::vector<libjami::SwarmMessage>
Conversation::Impl::loadMessages2(const LogOptions& options, History* optHistory) Conversation::Impl::loadMessages2(const LogOptions& options, History* optHistory)
{ {
auto history = optHistory ? optHistory : &loadedHistory_;
std::lock_guard lk(loadingMtx_); // history->mutex is locked by the caller
if (!repository_ || history->loading) {
if (!optHistory) { return {};
std::lock_guard lock(historyMtx_);
if (!repository_ || isLoadingHistory_)
return {};
isLoadingHistory_ = true;
} }
history->loading = true;
// By convention, if options.nbOfCommits is zero, then we // By convention, if options.nbOfCommits is zero, then we
// don't impose a limit on the number of commits returned. // don't impose a limit on the number of commits returned.
...@@ -956,17 +953,17 @@ Conversation::Impl::loadMessages2(const LogOptions& options, History* optHistory ...@@ -956,17 +953,17 @@ Conversation::Impl::loadMessages2(const LogOptions& options, History* optHistory
replies.erase(it); replies.erase(it);
} }
std::shared_ptr<libjami::SwarmMessage> firstMsg; std::shared_ptr<libjami::SwarmMessage> firstMsg;
if (!optHistory && msgList.empty() && !loadedHistory_.messageList.empty()) { if ((history == &loadedHistory_) && msgList.empty() && !loadedHistory_.messageList.empty()) {
firstMsg = *loadedHistory_.messageList.rbegin(); firstMsg = *loadedHistory_.messageList.rbegin();
} }
auto added = addToHistory({message}, false, false, optHistory); auto added = addToHistory({message}, false, false, history);
if (!added.empty() && firstMsg) { if (!added.empty() && firstMsg) {
emitSignal<libjami::ConversationSignal::SwarmMessageUpdated>(accountId_, emitSignal<libjami::ConversationSignal::SwarmMessageUpdated>(accountId_,
repository_->id(), repository_->id(),
*firstMsg); *firstMsg);
} }
msgList.insert(msgList.end(), added.begin(), added.end()); msgList.insert(msgList.end(), added.begin(), added.end());
}, },
/* postCondition */ /* postCondition */
[&](auto, auto, auto) { [&](auto, auto, auto) {
// Stop logging if there was a limit set on the number of commits // Stop logging if there was a limit set on the number of commits
...@@ -979,18 +976,15 @@ Conversation::Impl::loadMessages2(const LogOptions& options, History* optHistory ...@@ -979,18 +976,15 @@ Conversation::Impl::loadMessages2(const LogOptions& options, History* optHistory
options.from, options.from,
options.logIfNotFound); options.logIfNotFound);
history->loading = false;
history->cv.notify_all();
// Convert for client (remove ptr) // Convert for client (remove ptr)
std::vector<libjami::SwarmMessage> ret; std::vector<libjami::SwarmMessage> ret;
ret.reserve(msgList.size()); ret.reserve(msgList.size());
for (const auto& msg: msgList) { for (const auto& msg: msgList) {
ret.emplace_back(*msg); ret.emplace_back(*msg);
} }
if (!optHistory) {
std::lock_guard lock(historyMtx_);
isLoadingHistory_ = false;
historyCv_.notify_all();
}
return ret; return ret;
} }
...@@ -1180,9 +1174,9 @@ Conversation::Impl::addToHistory(const std::vector<std::map<std::string, std::st ...@@ -1180,9 +1174,9 @@ Conversation::Impl::addToHistory(const std::vector<std::map<std::string, std::st
if (!acc) if (!acc)
return {}; return {};
auto username = acc->getUsername(); auto username = acc->getUsername();
if (messageReceived && (!optHistory && isLoadingHistory_)) { if (messageReceived && (optHistory == &loadedHistory_ && optHistory->loading)) {
std::unique_lock lk(historyMtx_); std::unique_lock lk(optHistory->mutex);
historyCv_.wait(lk, [&] { return !isLoadingHistory_; }); optHistory->cv.wait(lk, [&] { return !optHistory->loading; });
} }
std::vector<std::shared_ptr<libjami::SwarmMessage>> messages; std::vector<std::shared_ptr<libjami::SwarmMessage>> messages;
auto addCommit = [&](const auto& commit) { auto addCommit = [&](const auto& commit) {
...@@ -1200,7 +1194,7 @@ Conversation::Impl::addToHistory(const std::vector<std::map<std::string, std::st ...@@ -1200,7 +1194,7 @@ Conversation::Impl::addToHistory(const std::vector<std::map<std::string, std::st
auto sharedCommit = std::make_shared<libjami::SwarmMessage>(); auto sharedCommit = std::make_shared<libjami::SwarmMessage>();
sharedCommit->fromMapStringString(commit); sharedCommit->fromMapStringString(commit);
// Set message status based on cache (only on history for client) // Set message status based on cache (only on history for client)
if (!commitFromSelf && optHistory == nullptr) { if (!commitFromSelf && optHistory == &loadedHistory_) {
std::lock_guard lk(messageStatusMtx_); std::lock_guard lk(messageStatusMtx_);
for (const auto& member: repository_->members()) { for (const auto& member: repository_->members()) {
// If we have a status cached, use it // If we have a status cached, use it
...@@ -1708,6 +1702,7 @@ Conversation::loadMessages2(const OnLoadMessages2& cb, const LogOptions& options ...@@ -1708,6 +1702,7 @@ Conversation::loadMessages2(const OnLoadMessages2& cb, const LogOptions& options
return; return;
dht::ThreadPool::io().run([w = weak(), cb = std::move(cb), options] { dht::ThreadPool::io().run([w = weak(), cb = std::move(cb), options] {
if (auto sthis = w.lock()) { if (auto sthis = w.lock()) {
std::lock_guard lk(sthis->pimpl_->loadedHistory_.mutex);
cb(sthis->pimpl_->loadMessages2(options)); cb(sthis->pimpl_->loadMessages2(options));
} }
}); });
...@@ -1716,7 +1711,7 @@ Conversation::loadMessages2(const OnLoadMessages2& cb, const LogOptions& options ...@@ -1716,7 +1711,7 @@ Conversation::loadMessages2(const OnLoadMessages2& cb, const LogOptions& options
void void
Conversation::clearCache() Conversation::clearCache()
{ {
std::lock_guard lk(pimpl_->loadingMtx_); std::lock_guard lk(pimpl_->loadedHistory_.mutex);
pimpl_->loadedHistory_.messageList.clear(); pimpl_->loadedHistory_.messageList.clear();
pimpl_->loadedHistory_.quickAccess.clear(); pimpl_->loadedHistory_.quickAccess.clear();
pimpl_->loadedHistory_.pendingEditions.clear(); pimpl_->loadedHistory_.pendingEditions.clear();
...@@ -1730,17 +1725,16 @@ Conversation::clearCache() ...@@ -1730,17 +1725,16 @@ Conversation::clearCache()
std::string std::string
Conversation::lastCommitId() const Conversation::lastCommitId() const
{ {
LogOptions options;
options.nbOfCommits = 1;
options.skipMerge = true;
History optHistory;
{ {
std::lock_guard lk(pimpl_->historyMtx_); std::lock_guard lk(pimpl_->loadedHistory_.mutex);
if (!pimpl_->loadedHistory_.messageList.empty()) if (!pimpl_->loadedHistory_.messageList.empty())
return (*pimpl_->loadedHistory_.messageList.begin())->id; return (*pimpl_->loadedHistory_.messageList.begin())->id;
} }
LogOptions options;
std::lock_guard lk(pimpl_->writeMtx_); options.nbOfCommits = 1;
options.skipMerge = true;
History optHistory;
std::scoped_lock lock(pimpl_->writeMtx_, optHistory.mutex);
auto res = pimpl_->loadMessages2(options, &optHistory); auto res = pimpl_->loadMessages2(options, &optHistory);
if (res.empty()) if (res.empty())
return {}; return {};
...@@ -2241,7 +2235,7 @@ Conversation::Impl::updateStatus(const std::string& uri, ...@@ -2241,7 +2235,7 @@ Conversation::Impl::updateStatus(const std::string& uri,
options.logIfNotFound = false; options.logIfNotFound = false;
options.fastLog = true; options.fastLog = true;
History optHistory; History optHistory;
std::lock_guard lk(historyMtx_); // Avoid to announce messages while updating status. std::lock_guard lk(optHistory.mutex); // Avoid to announce messages while updating status.
auto res = loadMessages2(options, &optHistory); auto res = loadMessages2(options, &optHistory);
if (res.size() == 0) { if (res.size() == 0) {
// In this case, commit is not received yet, so we cache it // In this case, commit is not received yet, so we cache it
...@@ -2562,6 +2556,7 @@ Conversation::countInteractions(const std::string& toId, ...@@ -2562,6 +2556,7 @@ Conversation::countInteractions(const std::string& toId,
options.logIfNotFound = false; options.logIfNotFound = false;
options.fastLog = true; options.fastLog = true;
History history; History history;
std::lock_guard lk(history.mutex);
auto res = pimpl_->loadMessages2(options, &history); auto res = pimpl_->loadMessages2(options, &history);
return res.size(); return res.size();
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment