diff --git a/src/jamidht/conversation.cpp b/src/jamidht/conversation.cpp index dab156d9f88159ac39974f17260567cfea282705..2baf06bd686e0043f11b551a66a4b66ced1793a4 100644 --- a/src/jamidht/conversation.cpp +++ b/src/jamidht/conversation.cpp @@ -443,8 +443,8 @@ public: } } - if (announceMember) { - shared->saveMembers(convId, repository_->memberUris("", {})); + if (announceMember && onMembersChanged_) { + onMembersChanged_(repository_->memberUris("", {})); } } } @@ -624,6 +624,7 @@ public: mutable std::mutex lastDisplayedMtx_ {}; // for lastDisplayed_ mutable std::map<std::string, std::string> lastDisplayed_ {}; std::function<void(const std::string&, const std::string&)> lastDisplayedUpdatedCb_ {}; + OnMembersChanged onMembersChanged_ {}; // Manage hosted calls on this device std::string hostedCallsPath_ {}; @@ -1921,6 +1922,12 @@ Conversation::onLastDisplayedUpdated( pimpl_->lastDisplayedUpdatedCb_ = std::move(lastDisplayedUpdatedCb); } +void +Conversation::onMembersChanged(OnMembersChanged&& cb) +{ + pimpl_->onMembersChanged_ = std::move(cb); +} + void Conversation::onNeedSocket(NeedSocketCb needSocket) { diff --git a/src/jamidht/conversation.h b/src/jamidht/conversation.h index 25aeae18d611ed4d1c8f24fd7511d45df2c56549..953856294d35f87b3191683a17694d6267ec046c 100644 --- a/src/jamidht/conversation.h +++ b/src/jamidht/conversation.h @@ -111,7 +111,13 @@ struct ConvInfo std::string lastDisplayed {}; ConvInfo() = default; - ConvInfo(const Json::Value& json); + ConvInfo(const ConvInfo&) = default; + ConvInfo(ConvInfo&&) = default; + ConvInfo(const std::string& id) : id(id) {}; + explicit ConvInfo(const Json::Value& json); + + ConvInfo& operator=(const ConvInfo&) = default; + ConvInfo& operator=(ConvInfo&&) = default; Json::Value toJson() const; @@ -129,6 +135,7 @@ using OnLoadMessages using OnCommitCb = std::function<void(const std::string&)>; using OnDoneCb = std::function<void(bool, const std::string&)>; using OnMultiDoneCb = std::function<void(const std::vector<std::string>&)>; +using OnMembersChanged = std::function<void(const std::vector<std::string>&)>; using DeviceId = dht::PkId; using GitSocketList = std::map<DeviceId, std::shared_ptr<dhtnet::ChannelSocket>>; using ChannelCb = std::function<bool(const std::shared_ptr<dhtnet::ChannelSocket>&)>; @@ -184,6 +191,8 @@ public: void onLastDisplayedUpdated( std::function<void(const std::string&, const std::string&)>&& lastDisplayedUpdatedCb); + void onMembersChanged(OnMembersChanged&& cb); + /** * Set the callback that will be called whenever a new socket will be needed * @param cb diff --git a/src/jamidht/conversation_module.cpp b/src/jamidht/conversation_module.cpp index 0502f7641d7977d5e61f673270ad26ea311e16bd..7a7b3311873d6ac54c2cd1facd870f94610a75ec 100644 --- a/src/jamidht/conversation_module.cpp +++ b/src/jamidht/conversation_module.cpp @@ -51,6 +51,66 @@ struct PendingConversationFetch std::shared_ptr<dhtnet::ChannelSocket> socket {}; }; +struct SyncedConversation { + std::mutex mtx; + ConvInfo info; + std::unique_ptr<PendingConversationFetch> pending; + std::shared_ptr<Conversation> conversation; + + SyncedConversation(const std::string& convId) + : info {convId} + {} + SyncedConversation(const ConvInfo& info) + : info {info} + {} + + bool startFetch(const std::string& deviceId, bool checkIfConv = false) + { + // conversation mtx must be locked + if (checkIfConv && conversation) + return false; // Already a conversation + if (pending) { + if (pending->ready) + return false; // Already doing stuff + // if (pending->deviceId == deviceId) + // return false; // Already fetching + if (pending->connectingTo.find(deviceId) != pending->connectingTo.end()) + return false; // Already connecting to this device + } else { + pending = std::make_unique<PendingConversationFetch>(); + pending->connectingTo.insert(deviceId); + return true; + } + return true; + } + + void stopFetch(const std::string& deviceId) + { + // conversation mtx must be locked + if (!pending) + return; + pending->connectingTo.erase(deviceId); + if (pending->connectingTo.empty()) + pending.reset(); + } + + std::vector<std::map<std::string, std::string>> + getMembers(bool includeBanned = false) const + { + // conversation mtx must be locked + if (conversation) + return conversation->getMembers(true, true, includeBanned); + // If we're cloning, we can return the initial members + std::vector<std::map<std::string, std::string>> result; + result.reserve(info.members.size()); + for (const auto& uri : info.members) { + result.emplace_back(std::map<std::string, std::string> {{"uri", uri}}); + } + return result; + } + +}; + class ConversationModule::Impl : public std::enable_shared_from_this<Impl> { public: @@ -62,6 +122,30 @@ public: UpdateConvReq&& updateConvReqCb, OneToOneRecvCb&& oneToOneRecvCb); + template <typename S, typename T> + inline auto withConv(const S& convId, T&& cb) const + { + if (auto conv = getConversation(convId)) { + std::lock_guard<std::mutex> lk(conv->mtx); + return cb(*conv); + } else { + JAMI_WARNING("Conversation {} not found", convId); + } + return decltype(cb(std::declval<SyncedConversation&>()))(); + } + template <typename S, typename T> + inline auto withConversation(const S& convId, T&& cb) + { + if (auto conv = getConversation(convId)) { + std::lock_guard<std::mutex> lk(conv->mtx); + if (conv->conversation) + return cb(*conv->conversation); + } else { + JAMI_WARNING("Conversation {} not found", convId); + } + return decltype(cb(std::declval<Conversation&>()))(); + } + // Retrieving recent commits /** * Clone a conversation (initial) from device @@ -73,6 +157,10 @@ public: const std::string& peer, const std::string& convId, const std::string& lastDisplayed = ""); + void cloneConversation(const std::string& deviceId, + const std::string& peer, + const std::shared_ptr<SyncedConversation>& conv, + const std::string& lastDisplayed = ""); /** * Pull remote device @@ -102,6 +190,7 @@ public: */ std::vector<std::map<std::string, std::string>> getConversationMembers( const std::string& conversationId, bool includeBanned = false) const; + void setConversationMembers(const std::string& convId, const std::vector<std::string>& members); /** * Remove a repository and all files @@ -110,11 +199,13 @@ public: * @param force True if ignore the removing flag */ void removeRepository(const std::string& convId, bool sync, bool force = false); + void removeRepositoryImpl(SyncedConversation& conv, bool sync, bool force = false); /** * Remove a conversation * @param conversationId */ bool removeConversation(const std::string& conversationId); + bool removeConversationImpl(SyncedConversation& conv); /** * Send a message notification to all members @@ -138,16 +229,17 @@ public: bool isConversation(const std::string& convId) const { std::lock_guard<std::mutex> lk(conversationsMtx_); - return conversations_.find(convId) != conversations_.end(); + auto c = conversations_.find(convId); + return c != conversations_.end() && c->second; } + /** * @return if a convId is an accepted conversation */ bool isAcceptedConversation(const std::string& convId) const { - std::lock_guard<std::mutex> lk(conversationsMtx_); - auto itConv = convInfos_.find(convId); - return itConv != convInfos_.end() && !itConv->second.removed; + auto conv = getConversation(convId); + return conv && !conv->info.removed; } void addConvInfo(const ConvInfo& info) @@ -162,13 +254,10 @@ public: */ void onLastDisplayedUpdated(const std::string& convId, const std::string& lastId) { - // must not lock as used in callback from a conversation, - // so convInfos_ cannot change for convId - auto itConv = convInfos_.find(convId); - if (itConv != convInfos_.end()) - itConv->second.lastDisplayed = lastId; - saveConvInfos(); - + withConv(convId, [&](auto& conv) { + conv.info.lastDisplayed = lastId; + addConvInfo(conv.info); + }); // Updates info for client emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>( accountId_, @@ -180,58 +269,29 @@ public: std::string getOneToOneConversation(const std::string& uri) const noexcept; - std::weak_ptr<JamiAccount> account_; - NeedsSyncingCb needsSyncingCb_; - SengMsgCb sendMsgCb_; - NeedSocketCb onNeedSocket_; - NeedSocketCb onNeedSwarmSocket_; - UpdateConvReq updateConvReqCb_; - OneToOneRecvCb oneToOneRecvCb_; - - std::string accountId_ {}; - std::string deviceId_ {}; - std::string username_ {}; - - // Requests - mutable std::mutex conversationsRequestsMtx_; - std::map<std::string, ConversationRequest> conversationsRequests_; - - // Conversations - mutable std::mutex conversationsMtx_ {}; - std::map<std::string, std::shared_ptr<Conversation>, std::less<>> conversations_; - std::map<std::string, PendingConversationFetch, std::less<>> pendingConversationsFetch_; - - bool startFetch(const std::string& convId, const std::string& deviceId, bool checkIfConv = false) - { - // conversationsMtx_ must be locked - if (checkIfConv && conversations_.find(convId) != conversations_.end()) - return false; // Already a conversation - auto it = pendingConversationsFetch_.find(convId); - if (it == pendingConversationsFetch_.end()) { - PendingConversationFetch pf; - pf.connectingTo.insert(deviceId); - pendingConversationsFetch_[convId] = std::move(pf); - return true; - } - auto& pf = it->second; - if (pf.ready) - return false; // Already doing stuff - if (pf.connectingTo.find(deviceId) != pf.connectingTo.end()) - return false; // Already connecting to this device - pf.connectingTo.insert(deviceId); - return true; + std::shared_ptr<SyncedConversation> getConversation(std::string_view convId) const { + std::lock_guard<std::mutex> lk(conversationsMtx_); + auto c = conversations_.find(convId); + return c != conversations_.end() ? c->second : nullptr; } - - void stopFetch(const std::string& convId, const std::string& deviceId) - { - // conversationsMtx_ must be locked - auto it = pendingConversationsFetch_.find(convId); - if (it == pendingConversationsFetch_.end()) - return; - auto& pf = it->second; - pf.connectingTo.erase(deviceId); - if (pf.connectingTo.empty()) - pendingConversationsFetch_.erase(it); + std::shared_ptr<SyncedConversation> getConversation(std::string_view convId) { + std::lock_guard<std::mutex> lk(conversationsMtx_); + auto c = conversations_.find(convId); + return c != conversations_.end() ? c->second : nullptr; + } + std::shared_ptr<SyncedConversation> startConversation(const std::string& convId) { + std::lock_guard<std::mutex> lk(conversationsMtx_); + auto& c = conversations_[convId]; + if (!c) + c = std::make_shared<SyncedConversation>(convId); + return c; + } + std::shared_ptr<SyncedConversation> startConversation(const ConvInfo& info) { + std::lock_guard<std::mutex> lk(conversationsMtx_); + auto& c = conversations_[info.id]; + if (!c) + c = std::make_shared<SyncedConversation>(info); + return c; } // Message send/load @@ -256,9 +316,6 @@ public: void bootstrapCb(const std::string& convId); - // The following informations are stored on the disk - mutable std::mutex convInfosMtx_; // Note, should be locked after conversationsMtx_ if needed - std::map<std::string, ConvInfo> convInfos_; // The following methods modify what is stored on the disk /** * @note convInfosMtx_ should be locked @@ -297,6 +354,30 @@ public: saveConvRequests(); } + std::weak_ptr<JamiAccount> account_; + NeedsSyncingCb needsSyncingCb_; + SengMsgCb sendMsgCb_; + NeedSocketCb onNeedSocket_; + NeedSocketCb onNeedSwarmSocket_; + UpdateConvReq updateConvReqCb_; + OneToOneRecvCb oneToOneRecvCb_; + + std::string accountId_ {}; + std::string deviceId_ {}; + std::string username_ {}; + + // Requests + mutable std::mutex conversationsRequestsMtx_; + std::map<std::string, ConversationRequest> conversationsRequests_; + + // Conversations + mutable std::mutex conversationsMtx_ {}; + std::map<std::string, std::shared_ptr<SyncedConversation>, std::less<>> conversations_; + + // The following informations are stored on the disk + mutable std::mutex convInfosMtx_; // Note, should be locked after conversationsMtx_ if needed + std::map<std::string, ConvInfo> convInfos_; + // When sending a new message, we need to send the notification to some peers of the // conversation However, the conversation may be not bootstraped, so the list will be empty. // notSyncedNotification_ will store the notifiaction to announce until we have peers to sync @@ -348,126 +429,133 @@ ConversationModule::Impl::cloneConversation(const std::string& deviceId, const std::string& convId, const std::string& lastDisplayed) { - JAMI_DBG("[Account %s] Clone conversation on device %s", accountId_.c_str(), deviceId.c_str()); + JAMI_DEBUG("[Account {}] Clone conversation on device {}", accountId_, deviceId); + + auto conv = startConversation(convId); + std::unique_lock<std::mutex> lk(conv->mtx); + cloneConversation(deviceId, peerUri, conv, lastDisplayed); +} - if (!isConversation(convId)) { +void +ConversationModule::Impl::cloneConversation(const std::string& deviceId, + const std::string& peerUri, + const std::shared_ptr<SyncedConversation>& conv, + const std::string& lastDisplayed) +{ + // conv->mtx must be locked + if (!conv->conversation) { // Note: here we don't return and connect to all members // the first that will successfully connect will be used for // cloning. // This avoid the case when we try to clone from convInfos + sync message // at the same time. - { - std::lock_guard<std::mutex> lk(conversationsMtx_); - if (!startFetch(convId, deviceId, true)) { - JAMI_WARN("[Account %s] Already fetching %s", accountId_.c_str(), convId.c_str()); - std::lock_guard<std::mutex> lk(convInfosMtx_); - auto ci = convInfos_.find(convId); - if (ci != convInfos_.end() && ci->second.lastDisplayed.empty()) { - // If fetchNewCommits called before sync - ci->second.lastDisplayed = lastDisplayed; - saveConvInfos(); - } - return; + if (!conv->startFetch(deviceId, true)) { + JAMI_WARNING("[Account {}] Already fetching {}", accountId_, conv->info.id); + if (conv->info.lastDisplayed.empty()) { + // If fetchNewCommits called before sync + conv->info.lastDisplayed = lastDisplayed; + addConvInfo(conv->info); } + return; } onNeedSocket_( - convId, + conv->info.id, deviceId, [=](const auto& channel) { - auto acc = account_.lock(); - std::lock_guard<std::mutex> lk(conversationsMtx_); - auto it = pendingConversationsFetch_.find(convId); - if (it != pendingConversationsFetch_.end() && !it->second.ready) { + std::lock_guard<std::mutex> lk(conv->mtx); + if (conv->pending && !conv->pending->ready) { if (channel) { - it->second.ready = true; - it->second.deviceId = channel->deviceId().toString(); - it->second.socket = channel; - if (!it->second.cloning) { - it->second.cloning = true; + conv->pending->ready = true; + conv->pending->deviceId = channel->deviceId().toString(); + conv->pending->socket = channel; + if (!conv->pending->cloning) { + conv->pending->cloning = true; dht::ThreadPool::io().run( - [w = weak(), convId, deviceId = it->second.deviceId]() { + [w = weak(), convId=conv->info.id, deviceId = conv->pending->deviceId]() { if (auto sthis = w.lock()) sthis->handlePendingConversation(convId, deviceId); }); } return true; } else { - stopFetch(convId, deviceId); + conv->stopFetch(deviceId); } } return false; }, - "application/im-gitmessage-id"); - - JAMI_INFO("[Account %s] New conversation detected: %s. Ask device %s to clone it", - accountId_.c_str(), - convId.c_str(), - deviceId.c_str()); - ConvInfo info; - info.id = convId; - info.created = std::time(nullptr); - info.members.emplace_back(username_); - info.lastDisplayed = lastDisplayed; + MIME_TYPE_GIT); + + JAMI_LOG("[Account {}] New conversation detected: {}. Ask device {} to clone it", + accountId_, + conv->info.id, + deviceId); + conv->info.created = std::time(nullptr); + conv->info.members.emplace_back(username_); + conv->info.lastDisplayed = lastDisplayed; if (peerUri != username_) - info.members.emplace_back(peerUri); - - std::lock_guard<std::mutex> lk(convInfosMtx_); - convInfos_[info.id] = std::move(info); - saveConvInfos(); + conv->info.members.emplace_back(peerUri); + addConvInfo(conv->info); } else { - std::unique_lock<std::mutex> lk(conversationsMtx_); - auto conversation = conversations_.find(convId); - if (conversation != conversations_.end() && conversation->second) - conversation->second->updateLastDisplayed(lastDisplayed); - JAMI_INFO("[Account %s] Already have conversation %s", accountId_.c_str(), convId.c_str()); + conv->conversation->updateLastDisplayed(lastDisplayed); + JAMI_DEBUG("[Account {}] Already have conversation {}", accountId_, conv->info.id); } } + void ConversationModule::Impl::fetchNewCommits(const std::string& peer, const std::string& deviceId, const std::string& conversationId, const std::string& commitId) { - JAMI_DBG("[Account %s] fetch commits for peer %s on device %s", - accountId_.c_str(), - peer.c_str(), - deviceId.c_str()); + JAMI_LOG("[Account {}] fetch commits for peer {} on device {}", + accountId_, + peer, + deviceId); - std::unique_lock<std::mutex> lk(conversationsMtx_); + auto conv = getConversation(conversationId); + if (!conv) { - auto conversation = conversations_.find(conversationId); - if (conversation != conversations_.end() && conversation->second) { + JAMI_WARNING("[Account {}] Could not find conversation {}, ask for an invite", + accountId_, + conversationId); + sendMsgCb_(peer, + {}, + std::map<std::string, std::string> {{MIME_TYPE_INVITE, conversationId}}, + 0); + return; + } + std::unique_lock<std::mutex> lk(conv->mtx); + + if (conv->conversation) { // Check if we already have the commit - if (not commitId.empty() && conversation->second->getCommit(commitId) != std::nullopt) { + if (not commitId.empty() && conv->conversation->getCommit(commitId) != std::nullopt) { return; } - if (!conversation->second->isMember(peer, true)) { - JAMI_WARN("[Account %s] %s is not a member of %s", - accountId_.c_str(), - peer.c_str(), - conversationId.c_str()); + if (!conv->conversation->isMember(peer, true)) { + JAMI_WARNING("[Account {}] {} is not a member of {}", + accountId_, + peer, + conversationId); return; } - if (conversation->second->isBanned(deviceId)) { - JAMI_WARN("[Account %s] %s is a banned device in conversation %s", - accountId_.c_str(), - deviceId.c_str(), - conversationId.c_str()); + if (conv->conversation->isBanned(deviceId)) { + JAMI_WARNING("[Account {}] {} is a banned device in conversation {}", + accountId_, + deviceId, + conversationId); return; } // Retrieve current last message - auto lastMessageId = conversation->second->lastCommitId(); + auto lastMessageId = conv->conversation->lastCommitId(); if (lastMessageId.empty()) { - JAMI_ERR("[Account %s] No message detected. This is a bug", accountId_.c_str()); + JAMI_ERROR("[Account {}] No message detected. This is a bug", accountId_); return; } - if (!startFetch(conversationId, deviceId)) { - JAMI_WARN("[Account %s] Already fetching %s", - accountId_.c_str(), - conversationId.c_str()); + if (!conv->startFetch(deviceId)) { + JAMI_WARNING("[Account {}] Already fetching {}", accountId_, conversationId); return; } @@ -476,25 +564,25 @@ ConversationModule::Impl::fetchNewCommits(const std::string& peer, conversationId, deviceId, [this, + conv, conversationId = std::move(conversationId), peer = std::move(peer), deviceId = std::move(deviceId), commitId = std::move(commitId)](const auto& channel) { - std::lock_guard<std::mutex> lk(conversationsMtx_); - auto conversation = conversations_.find(conversationId); + std::lock_guard<std::mutex> lk(conv->mtx); + //auto conversation = conversations_.find(conversationId); auto acc = account_.lock(); - if (!channel || !acc || conversation == conversations_.end() - || !conversation->second) { - stopFetch(conversationId, deviceId); + if (!channel || !acc || !conv->conversation) { + conv->stopFetch(deviceId); syncCnt.fetch_sub(1); return false; } - conversation->second->addGitSocket(channel->deviceId(), channel); - - conversation->second->sync( + conv->conversation->addGitSocket(channel->deviceId(), channel); + conv->conversation->sync( peer, deviceId, [w = weak(), + conv, conversationId = std::move(conversationId), peer = std::move(peer), deviceId = std::move(deviceId), @@ -514,13 +602,14 @@ ConversationModule::Impl::fetchNewCommits(const std::string& peer, deviceId, conversationId); } + { - std::lock_guard<std::mutex> lk(shared->conversationsMtx_); - shared->pendingConversationsFetch_.erase(conversationId); - } - // Notify peers that a new commit is there (DRT) - if (not commitId.empty()) { - shared->sendMessageNotification(conversationId, false, commitId, deviceId); + std::lock_guard<std::mutex> lk(conv->mtx); + conv->pending.reset(); + // Notify peers that a new commit is there (DRT) + if (not commitId.empty()) { + shared->sendMessageNotification(*conv->conversation, false, commitId, deviceId); + } } if (shared->syncCnt.fetch_sub(1) == 1) { if (auto account = shared->account_.lock()) @@ -535,25 +624,20 @@ ConversationModule::Impl::fetchNewCommits(const std::string& peer, } else { if (getRequest(conversationId) != std::nullopt) return; - if (pendingConversationsFetch_.find(conversationId) != pendingConversationsFetch_.end()) + if (conv->pending) return; - bool clone = false; - { - std::lock_guard<std::mutex> lkCi(convInfosMtx_); - auto convIt = convInfos_.find(conversationId); - clone = convIt != convInfos_.end() && !convIt->second.removed; - } - lk.unlock(); + bool clone = !conv->info.removed; if (clone) { - cloneConversation(deviceId, peer, conversationId); + cloneConversation(deviceId, peer, conv); return; } - JAMI_WARN("[Account %s] Could not find conversation %s, ask for an invite", - accountId_.c_str(), - conversationId.c_str()); + lk.unlock(); + JAMI_WARNING("[Account {}] Could not find conversation {}, ask for an invite", + accountId_, + conversationId); sendMsgCb_(peer, {}, - std::map<std::string, std::string> {{"application/invite", conversationId}}, + std::map<std::string, std::string> {{MIME_TYPE_INVITE, conversationId}}, 0); } } @@ -569,53 +653,61 @@ ConversationModule::Impl::handlePendingConversation(const std::string& conversat std::vector<DeviceId> kd; for (const auto& [id, _] : acc->getKnownDevices()) kd.emplace_back(id); + + auto conv = getConversation(conversationId); + if (!conv) + return; + std::unique_lock<std::mutex> lk(conv->mtx, std::defer_lock); auto erasePending = [&] { std::string toRm; - { - std::lock_guard<std::mutex> lk(conversationsMtx_); - auto oldFetch = pendingConversationsFetch_.find(conversationId); - if (oldFetch != pendingConversationsFetch_.end() && !oldFetch->second.removeId.empty()) - toRm = oldFetch->second.removeId; - pendingConversationsFetch_.erase(conversationId); - } + if (conv->pending && !conv->pending->removeId.empty()) + toRm = std::move(conv->pending->removeId); + conv->pending.reset(); + lk.unlock(); if (!toRm.empty()) removeConversation(toRm); }; try { auto conversation = std::make_shared<Conversation>(acc, deviceId, conversationId); - { - std::lock_guard<std::mutex> lk(conversationsMtx_); - auto oldFetch = pendingConversationsFetch_.find(conversationId); - if (oldFetch != pendingConversationsFetch_.end() && oldFetch->second.socket) - conversation->addGitSocket(DeviceId(deviceId), std::move(oldFetch->second.socket)); - } conversation->onLastDisplayedUpdated( - [&](auto convId, auto lastId) { onLastDisplayedUpdated(convId, lastId); }); + [&](const auto& convId, const auto& lastId) { onLastDisplayedUpdated(convId, lastId); }); + conversation->onMembersChanged( + [this, conversationId](const auto& members) { setConversationMembers(conversationId, members); }); + conversation->onNeedSocket(onNeedSwarmSocket_); if (!conversation->isMember(username_, true)) { JAMI_ERR("Conversation cloned but doesn't seems to be a valid member"); conversation->erase(); + lk.lock(); erasePending(); return; } - conversation->onNeedSocket(onNeedSwarmSocket_); + + lk.lock(); + + if (conv->pending && conv->pending->socket) + conversation->addGitSocket(DeviceId(deviceId), std::move(conv->pending->socket)); auto removeRepo = false; - { - std::lock_guard<std::mutex> lk(conversationsMtx_); - // Note: a removeContact while cloning. In this case, the conversation - // must not be announced and removed. - auto itConv = convInfos_.find(conversationId); - if (itConv != convInfos_.end() && itConv->second.removed) - removeRepo = true; - if (itConv != convInfos_.end() && !itConv->second.lastDisplayed.empty()) { - conversation->updateLastDisplayed(itConv->second.lastDisplayed); - } - conversations_.emplace(conversationId, conversation); + // Note: a removeContact while cloning. In this case, the conversation + // must not be announced and removed. + if (conv->info.removed) + removeRepo = true; + std::string lastDisplayedInfo; + std::map<std::string, std::string> lastDisplayed; + std::map<std::string, std::string> preferences; + if (!conv->info.lastDisplayed.empty()) { + lastDisplayedInfo = conv->info.lastDisplayed; + } + if (conv->pending) { + preferences = std::move(conv->pending->preferences); + lastDisplayed = std::move(conv->pending->lastDisplayed); } + conv->conversation = conversation; if (removeRepo) { - removeRepository(conversationId, false, true); + removeRepositoryImpl(*conv, false, true); erasePending(); return; } + auto commitId = conversation->join(); std::vector<std::map<std::string, std::string>> messages; { @@ -627,7 +719,8 @@ ConversationModule::Impl::handlePendingConversation(const std::string& conversat } } if (!commitId.empty()) - sendMessageNotification(conversationId, false, commitId); + sendMessageNotification(*conversation, false, commitId); + lk.unlock(); #ifdef LIBJAMI_TESTABLE conversation->onBootstrapStatus(bootstrapCbTest_); @@ -635,20 +728,13 @@ ConversationModule::Impl::handlePendingConversation(const std::string& conversat conversation->bootstrap( std::bind(&ConversationModule::Impl::bootstrapCb, this, conversation->id()), kd); - std::map<std::string, std::string> preferences; - std::map<std::string, std::string> lastDisplayed; - { - std::lock_guard<std::mutex> lk(conversationsMtx_); - auto itFetch = pendingConversationsFetch_.find(conversationId); - if (itFetch != pendingConversationsFetch_.end()) { - preferences = std::move(itFetch->second.preferences); - lastDisplayed = std::move(itFetch->second.lastDisplayed); - } - } + if (!lastDisplayedInfo.empty()) + conversation->updateLastDisplayed(lastDisplayedInfo); if (!preferences.empty()) conversation->updatePreferences(preferences); if (!lastDisplayed.empty()) conversation->updateLastDisplayed(lastDisplayed); + // Inform user that the conversation is ready emitSignal<libjami::ConversationSignal::ConversationReady>(accountId_, conversationId); needsSyncingCb_({}); @@ -740,40 +826,32 @@ std::vector<std::map<std::string, std::string>> ConversationModule::Impl::getConversationMembers(const std::string& conversationId, bool includeBanned) const { - std::unique_lock<std::mutex> lk(conversationsMtx_); - auto conversation = conversations_.find(conversationId); - if (conversation != conversations_.end() && conversation->second) - return conversation->second->getMembers(true, true, includeBanned); - - lk.unlock(); - std::lock_guard<std::mutex> lkCI(convInfosMtx_); - auto convIt = convInfos_.find(conversationId); - if (convIt != convInfos_.end()) { - std::vector<std::map<std::string, std::string>> result; - result.reserve(convIt->second.members.size()); - for (const auto& uri : convIt->second.members) { - result.emplace_back(std::map<std::string, std::string> {{"uri", uri}}); - } - return result; - } - return {}; + return withConv(conversationId, [&](const auto& conv) { return conv.getMembers(includeBanned); }); } void ConversationModule::Impl::removeRepository(const std::string& conversationId, bool sync, bool force) { - std::unique_lock<std::mutex> lk(conversationsMtx_); - auto it = conversations_.find(conversationId); - if (it != conversations_.end() && it->second && (force || it->second->isRemoving())) { - JAMI_DBG() << "Remove conversation: " << conversationId; + auto conv = getConversation(conversationId); + if (!conv) + return; + std::unique_lock<std::mutex> lk(conv->mtx); + removeRepositoryImpl(*conv, sync, force); +} + +void +ConversationModule::Impl::removeRepositoryImpl(SyncedConversation& conv, bool sync, bool force) +{ + if (conv.conversation && (force || conv.conversation->isRemoving())) { + JAMI_LOG("Remove conversation: {}", conv.info.id); try { - if (it->second->mode() == ConversationMode::ONE_TO_ONE) { + if (conv.conversation->mode() == ConversationMode::ONE_TO_ONE) { auto account = account_.lock(); - for (const auto& member : it->second->getInitialMembers()) { + for (const auto& member : conv.conversation->getInitialMembers()) { if (member != account->getUsername()) { // Note: this can happen while re-adding a contact. // In this case, check that we are removing the linked conversation. - if (conversationId == getOneToOneConversation(member)) { + if (conv.info.id == getOneToOneConversation(member)) { account->accountManager()->removeContactConversation(member); } } @@ -782,61 +860,55 @@ ConversationModule::Impl::removeRepository(const std::string& conversationId, bo } catch (const std::exception& e) { JAMI_ERR() << e.what(); } - it->second->erase(); - conversations_.erase(it); - lk.unlock(); + conv.conversation->erase(); + conv.conversation.reset(); if (!sync) return; - std::lock_guard<std::mutex> lkCi(convInfosMtx_); - auto convIt = convInfos_.find(conversationId); - if (convIt != convInfos_.end()) { - convIt->second.erased = std::time(nullptr); - needsSyncingCb_({}); - } - saveConvInfos(); + + conv.info.erased = std::time(nullptr); + needsSyncingCb_({}); + addConvInfo(conv.info); } } bool ConversationModule::Impl::removeConversation(const std::string& conversationId) { - auto members = getConversationMembers(conversationId); - std::unique_lock<std::mutex> lk(conversationsMtx_); - // Update convInfos - std::unique_lock<std::mutex> lockCi(convInfosMtx_); - auto itConv = convInfos_.find(conversationId); - if (itConv == convInfos_.end()) { - JAMI_ERR("Conversation %s doesn't exist", conversationId.c_str()); - return false; - } - auto it = conversations_.find(conversationId); - auto isSyncing = it == conversations_.end(); + return withConv(conversationId, [this](auto& conv) { + return removeConversationImpl(conv); + }); +} + +bool +ConversationModule::Impl::removeConversationImpl(SyncedConversation& conv) +{ + auto members = conv.getMembers(); + auto isSyncing = !conv.conversation; auto hasMembers = !isSyncing && !(members.size() == 1 && username_ == members[0]["uri"]); - itConv->second.removed = std::time(nullptr); + conv.info.removed = std::time(nullptr); if (isSyncing) - itConv->second.erased = std::time(nullptr); + conv.info.erased = std::time(nullptr); // Sync now, because it can take some time to really removes the datas if (hasMembers) needsSyncingCb_({}); - saveConvInfos(); - lockCi.unlock(); - emitSignal<libjami::ConversationSignal::ConversationRemoved>(accountId_, conversationId); + addConvInfo(conv.info); + emitSignal<libjami::ConversationSignal::ConversationRemoved>(accountId_, conv.info.id); if (isSyncing) return true; - if (it->second->mode() != ConversationMode::ONE_TO_ONE) { + if (conv.conversation->mode() != ConversationMode::ONE_TO_ONE) { // For one to one, we do not notify the leave. The other can still generate request // and this is managed by the banned part. If we re-accept, the old conversation will be // retrieved - auto commitId = it->second->leave(); + auto commitId = conv.conversation->leave(); if (hasMembers) { - JAMI_DBG() << "Wait that someone sync that user left conversation " << conversationId; + JAMI_LOG("Wait that someone sync that user left conversation {}", conv.info.id); // Commit that we left if (!commitId.empty()) { // Do not sync as it's synched by convInfos - sendMessageNotification(*it->second, false, commitId); + sendMessageNotification(*conv.conversation, false, commitId); } else { - JAMI_ERR("Failed to send message to conversation %s", conversationId.c_str()); + JAMI_ERROR("Failed to send message to conversation {}", conv.info.id); } // In this case, we wait that another peer sync the conversation // to definitely remove it from the device. This is to inform the @@ -847,11 +919,10 @@ ConversationModule::Impl::removeConversation(const std::string& conversationId) } else { for (const auto& m : members) if (username_ != m.at("uri") && updateConvReqCb_) - updateConvReqCb_(conversationId, m.at("uri"), false); + updateConvReqCb_(conv.info.id, m.at("uri"), false); } - lk.unlock(); // Else we are the last member, so we can remove - removeRepository(conversationId, true); + removeRepositoryImpl(conv, true); return true; } @@ -861,10 +932,10 @@ ConversationModule::Impl::sendMessageNotification(const std::string& conversatio const std::string& commitId, const std::string& deviceId) { - std::lock_guard<std::mutex> lk(conversationsMtx_); - auto it = conversations_.find(conversationId); - if (it != conversations_.end() && it->second) { - sendMessageNotification(*it->second, sync, commitId, deviceId); + if (auto conv = getConversation(conversationId)) { + std::lock_guard<std::mutex> lk(conv->mtx); + if (conv->conversation) + sendMessageNotification(*conv->conversation, sync, commitId, deviceId); } } @@ -893,7 +964,7 @@ ConversationModule::Impl::sendMessageNotification(Conversation& conversation, refreshMessage[username_] = sendMsgCb_(username_, {}, std::map<std::string, std::string> { - {"application/im-gitmessage-id", text}}, + {MIME_TYPE_GIT, text}}, refreshMessage[username_]); } @@ -938,7 +1009,7 @@ ConversationModule::Impl::sendMessageNotification(Conversation& conversation, refreshMessage[member] = sendMsgCb_(member, {}, std::map<std::string, std::string> { - {"application/im-gitmessage-id", text}}, + {MIME_TYPE_GIT, text}}, refreshMessage[member]); } @@ -951,7 +1022,7 @@ ConversationModule::Impl::sendMessageNotification(Conversation& conversation, refreshMessage[deviceIdStr] = sendMsgCb_(memberUri, device, std::map<std::string, std::string> { - {"application/im-gitmessage-id", text}}, + {MIME_TYPE_GIT, text}}, refreshMessage[deviceIdStr]); } } @@ -984,24 +1055,24 @@ ConversationModule::Impl::sendMessage(const std::string& conversationId, OnCommitCb&& onCommit, OnDoneCb&& cb) { - std::lock_guard<std::mutex> lk(conversationsMtx_); - auto conversation = conversations_.find(conversationId); - if (conversation != conversations_.end() && conversation->second) { - conversation->second->sendMessage( - std::move(value), - replyTo, - std::move(onCommit), - [this, conversationId, announce, cb = std::move(cb)](bool ok, - const std::string& commitId) { - if (cb) - cb(ok, commitId); - if (!announce) - return; - if (ok) - sendMessageNotification(conversationId, true, commitId); - else - JAMI_ERR("Failed to send message to conversation %s", conversationId.c_str()); - }); + if (auto conv = getConversation(conversationId)) { + std::lock_guard<std::mutex> lk(conv->mtx); + if (conv->conversation) + conv->conversation->sendMessage( + std::move(value), + replyTo, + std::move(onCommit), + [this, conversationId, announce, cb = std::move(cb)](bool ok, + const std::string& commitId) { + if (cb) + cb(ok, commitId); + if (!announce) + return; + if (ok) + sendMessageNotification(conversationId, true, commitId); + else + JAMI_ERR("Failed to send message to conversation %s", conversationId.c_str()); + }); } } @@ -1012,11 +1083,10 @@ ConversationModule::Impl::editMessage(const std::string& conversationId, { // Check that editedId is a valid commit, from ourself and plain/text auto validCommit = false; - { - std::lock_guard<std::mutex> lk(conversationsMtx_); - auto conversation = conversations_.find(conversationId); - if (conversation != conversations_.end() && conversation->second) { - auto commit = conversation->second->getCommit(editedId); + if (auto conv = getConversation(conversationId)) { + std::lock_guard<std::mutex> lk(conv->mtx); + if (conv->conversation) { + auto commit = conv->conversation->getCommit(editedId); if (commit != std::nullopt) { validCommit = commit->at("author") == username_ && commit->at("type") == "text/plain"; @@ -1048,7 +1118,7 @@ ConversationModule::Impl::bootstrapCb(const std::string& convId) } } JAMI_DEBUG("[Conversation {}] Resend last message notification", convId); - dht::ThreadPool::io().run([w = weak(), convId, commitId] { + dht::ThreadPool::io().run([w = weak(), convId, commitId = std::move(commitId)] { if (auto sthis = w.lock()) sthis->sendMessageNotification(convId, true, commitId); }); @@ -1117,9 +1187,9 @@ ConversationModule::onBootstrapStatus( { pimpl_->bootstrapCbTest_ = cb; std::unique_lock<std::mutex> lk(pimpl_->conversationsMtx_); - for (auto& [_, conv] : pimpl_->conversations_) - if (conv) - conv->onBootstrapStatus(pimpl_->bootstrapCbTest_); + for (auto& [_, c] : pimpl_->conversations_) + if (c && c->conversation) + c->conversation->onBootstrapStatus(pimpl_->bootstrapCbTest_); } #endif @@ -1130,19 +1200,28 @@ ConversationModule::loadConversations() if (!acc) return; auto uri = acc->getUsername(); - JAMI_INFO("[Account %s] Start loading conversations…", pimpl_->accountId_.c_str()); + JAMI_LOG("[Account {}] Start loading conversations…", pimpl_->accountId_); auto conversationsRepositories = fileutils::readDirectory( fileutils::get_data_dir() + DIR_SEPARATOR_STR + pimpl_->accountId_ + DIR_SEPARATOR_STR + "conversations"); std::unique_lock<std::mutex> lk(pimpl_->conversationsMtx_); + std::unique_lock<std::mutex> ilk(pimpl_->convInfosMtx_); pimpl_->convInfos_ = convInfos(pimpl_->accountId_); pimpl_->conversations_.clear(); std::set<std::string> toRm; for (const auto& repository : conversationsRepositories) { try { + auto sconv = std::make_shared<SyncedConversation>(repository); + auto conv = std::make_shared<Conversation>(acc, repository); conv->onLastDisplayedUpdated( - [&](auto convId, auto lastId) { pimpl_->onLastDisplayedUpdated(convId, lastId); }); + [w=pimpl_->weak_from_this()](auto convId, auto lastId) { + if (auto p=w.lock()) p->onLastDisplayedUpdated(convId, lastId); + }); + conv->onMembersChanged( + [w=pimpl_->weak_from_this(), repository](const auto& members) { + if (auto p=w.lock()) p->setConversationMembers(repository, members); + }); conv->onNeedSocket(pimpl_->onNeedSwarmSocket_); auto members = conv->memberUris(uri, {}); // NOTE: The following if is here to protect against any incorrect state @@ -1168,21 +1247,22 @@ ConversationModule::loadConversations() } } } - conv->onLastDisplayedUpdated( - [&](auto convId, auto lastId) { pimpl_->onLastDisplayedUpdated(convId, lastId); }); auto convInfo = pimpl_->convInfos_.find(repository); if (convInfo == pimpl_->convInfos_.end()) { - JAMI_ERR() << "Missing conv info for " << repository << ". This is a bug!"; - ConvInfo info; - info.id = repository; - info.created = std::time(nullptr); - info.members = std::move(members); - info.lastDisplayed = conv->infos()[ConversationMapKeys::LAST_DISPLAYED]; - addConvInfo(info); - } else if (convInfo->second.removed) { - // A conversation was removed, but repository still exists - conv->setRemovingFlag(); - toRm.insert(repository); + JAMI_ERROR("Missing conv info for {}. This is a bug!", repository); + sconv->info.created = std::time(nullptr); + sconv->info.members = std::move(members); + sconv->info.lastDisplayed = conv->infos()[ConversationMapKeys::LAST_DISPLAYED]; + // convInfosMtx_ is already locked + pimpl_->convInfos_[repository] = sconv->info; + pimpl_->saveConvInfos(); + } else { + sconv->info = convInfo->second; + if (convInfo->second.removed) { + // A conversation was removed, but repository still exists + conv->setRemovingFlag(); + toRm.insert(repository); + } } auto commits = conv->commitsEndedCalls(); if (!commits.empty()) { @@ -1191,7 +1271,8 @@ ConversationModule::loadConversations() // Notify other in the conversation that the call is finished pimpl_->sendMessageNotification(*conv, true, *commits.rbegin()); } - pimpl_->conversations_.emplace(repository, std::move(conv)); + sconv->conversation = conv; + pimpl_->conversations_.emplace(repository, std::move(sconv)); } catch (const std::logic_error& e) { JAMI_WARN("[Account %s] Conversations not loaded : %s", pimpl_->accountId_.c_str(), @@ -1212,8 +1293,8 @@ ConversationModule::loadConversations() if (info.removed) removed.insert(info.id); auto itConv = pimpl_->conversations_.find(info.id); - if (itConv != pimpl_->conversations_.end() && info.removed) - itConv->second->setRemovingFlag(); + if (itConv != pimpl_->conversations_.end() && itConv->second && itConv->second->conversation && info.removed) + itConv->second->conversation->setRemovingFlag(); if (!info.removed && itConv == pimpl_->conversations_.end()) { // In this case, the conversation is not synced and we only know ourself if (info.members.size() == 1 && info.members.at(0) == uri) { @@ -1236,6 +1317,7 @@ ConversationModule::loadConversations() if (oldConvInfosSize != pimpl_->convInfos_.size()) pimpl_->saveConvInfos(); + ilk.unlock(); lk.unlock(); //////////////////////////////////////////////////////////////// @@ -1296,27 +1378,30 @@ ConversationModule::bootstrap(const std::string& convId) } }; std::vector<std::string> toClone; - std::unique_lock<std::mutex> lk(pimpl_->conversationsMtx_); - std::unique_lock<std::mutex> lkCi(pimpl_->convInfosMtx_); if (convId.empty()) { - for (const auto& [key, ci] : pimpl_->convInfos_) { - auto it = pimpl_->conversations_.find(key); - if (it != pimpl_->conversations_.end() && it->second) { - bootstrap(it->second); - } else if (!ci.removed) { + std::lock_guard<std::mutex> lk(pimpl_->convInfosMtx_); + for (const auto& [conversationId, convInfo] : pimpl_->convInfos_) { + auto conv = pimpl_->getConversation(conversationId); + if (!conv) { + // convInfos_ can contain a conversation that is not yet cloned + // so we need to add it there. + conv = pimpl_->startConversation(convInfo); + } + if ((!conv->conversation && !conv->info.removed)) { // Because we're not tracking contact presence in order to sync now, // we need to ask to clone requests when bootstraping all conversations // else it can stay syncing - toClone.emplace_back(key); + toClone.emplace_back(conversationId); + } else if (conv->conversation) { + bootstrap(conv->conversation); } } - } else { - auto it = pimpl_->conversations_.find(convId); - if (it != pimpl_->conversations_.end()) - bootstrap(it->second); + } else if (auto conv = pimpl_->getConversation(convId)) { + std::lock_guard<std::mutex> lk(conv->mtx); + if (conv->conversation) + bootstrap(conv->conversation); } - lkCi.unlock(); - lk.unlock(); + for (const auto& cid: toClone) { auto members = getConversationMembers(cid); for (const auto& member : members) { @@ -1330,8 +1415,8 @@ ConversationModule::monitor() { std::unique_lock<std::mutex> lk(pimpl_->conversationsMtx_); for (auto& [_, conv] : pimpl_->conversations_) { - if (conv) { - conv->monitor(); + if (conv && conv->conversation) { + conv->conversation->monitor(); } } } @@ -1339,16 +1424,19 @@ ConversationModule::monitor() void ConversationModule::clearPendingFetch() { - if (!pimpl_->pendingConversationsFetch_.empty()) { - // Note: This is a fallback. convModule() is kept if account is disabled/re-enabled. - // iOS uses setAccountActive() a lot, and if for some reason the previous pending fetch - // is not erased (callback not called), it will block the new messages as it will not - // sync. The best way to debug this is to get logs from the last ICE connection for - // syncing the conversation. It may have been killed in some un-expected way avoiding to - // call the callbacks. This should never happen, but if it's the case, this will allow - // new messages to be synced correctly. - JAMI_ERR("This is a bug, seems to still fetch to some device on initializing"); - pimpl_->pendingConversationsFetch_.clear(); + // Note: This is a workaround. convModule() is kept if account is disabled/re-enabled. + // iOS uses setAccountActive() a lot, and if for some reason the previous pending fetch + // is not erased (callback not called), it will block the new messages as it will not + // sync. The best way to debug this is to get logs from the last ICE connection for + // syncing the conversation. It may have been killed in some un-expected way avoiding to + // call the callbacks. This should never happen, but if it's the case, this will allow + // new messages to be synced correctly. + std::unique_lock<std::mutex> lk(pimpl_->conversationsMtx_); + for (auto& [_, conv] : pimpl_->conversations_) { + if (conv && conv->pending) { + JAMI_ERR("This is a bug, seems to still fetch to some device on initializing"); + conv->pending.reset(); + } } } @@ -1448,8 +1536,9 @@ ConversationModule::onConversationRequest(const std::string& from, const Json::V req.from = from; // Already accepted request, do nothing - if (pimpl_->isAcceptedConversation(convId)) + if (pimpl_->isAcceptedConversation(convId)) { return; + } if (pimpl_->getRequest(convId) != std::nullopt) { JAMI_INFO("[Account %s] Received a request for a conversation already existing. " "Ignore", @@ -1485,23 +1574,14 @@ void ConversationModule::onNeedConversationRequest(const std::string& from, const std::string& conversationId) { - // Check if the conversation exists - std::unique_lock<std::mutex> lk(pimpl_->conversationsMtx_); - auto itConv = pimpl_->conversations_.find(conversationId); - if (itConv != pimpl_->conversations_.end() && !itConv->second->isRemoving()) { - if (!itConv->second->isMember(from, true)) { - JAMI_WARN("%s is asking a new invite for %s, but not a member", - from.c_str(), - conversationId.c_str()); + pimpl_->withConversation(conversationId, [&](auto& conversation) { + if (!conversation.isMember(from, true)) { + JAMI_WARNING("{} is asking a new invite for {}, but not a member", from, conversationId); return; } - - // Send new invite - auto invite = itConv->second->generateInvitation(); - lk.unlock(); - JAMI_DBG("%s is asking a new invite for %s", from.c_str(), conversationId.c_str()); - pimpl_->sendMsgCb_(from, {}, std::move(invite), 0); - } + JAMI_LOG("{} is asking a new invite for {}", from, conversationId); + pimpl_->sendMsgCb_(from, {}, conversation.generateInvitation(), 0); + }); } void @@ -1550,6 +1630,8 @@ ConversationModule::startConversation(ConversationMode mode, const std::string& conversation = std::make_shared<Conversation>(acc, mode, otherMember); conversation->onLastDisplayedUpdated( [&](auto convId, auto lastId) { pimpl_->onLastDisplayedUpdated(convId, lastId); }); + conversation->onMembersChanged( + [this, conversationId=conversation->id()](const auto& members) { pimpl_->setConversationMembers(conversationId, members); }); conversation->onNeedSocket(pimpl_->onNeedSwarmSocket_); #ifdef LIBJAMI_TESTABLE conversation->onBootstrapStatus(pimpl_->bootstrapCbTest_); @@ -1563,19 +1645,13 @@ ConversationModule::startConversation(ConversationMode mode, const std::string& return {}; } auto convId = conversation->id(); - { - std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); - pimpl_->conversations_[convId] = std::move(conversation); - } - - // Update convInfo - ConvInfo info; - info.id = convId; - info.created = std::time(nullptr); - info.members.emplace_back(pimpl_->username_); + auto conv = pimpl_->startConversation(convId); + conv->info.created = std::time(nullptr); + conv->info.members.emplace_back(pimpl_->username_); if (!otherMember.empty()) - info.members.emplace_back(otherMember); - addConvInfo(info); + conv->info.members.emplace_back(otherMember); + conv->conversation = conversation; + addConvInfo(conv->info); pimpl_->needsSyncingCb_({}); @@ -1591,64 +1667,64 @@ ConversationModule::cloneConversationFrom(const std::string& conversationId, auto acc = pimpl_->account_.lock(); auto memberHash = dht::InfoHash(uri); if (!acc || !memberHash) { - JAMI_WARN("Invalid member detected: %s", uri.c_str()); + JAMI_WARNING("Invalid member detected: {}", uri); return; } + auto conv = pimpl_->startConversation(conversationId); + conv->info = {}; + conv->info.id = conversationId; + conv->info.created = std::time(nullptr); + conv->info.members.emplace_back(pimpl_->username_); + conv->info.members.emplace_back(uri); + + std::lock_guard<std::mutex> lk(conv->mtx); acc->forEachDevice(memberHash, - [w = pimpl_->weak(), conversationId, oldConvId]( - const std::shared_ptr<dht::crypto::PublicKey>& pk) { - auto sthis = w.lock(); - auto deviceId = pk->getLongId().toString(); - if (!sthis or deviceId == sthis->deviceId_) - return; - { - std::lock_guard<std::mutex> lk(sthis->conversationsMtx_); - if (!sthis->startFetch(conversationId, deviceId, true)) { - JAMI_WARN("[Account %s] Already fetching %s", - sthis->accountId_.c_str(), - conversationId.c_str()); - return; + [w = pimpl_->weak(), conv, conversationId, oldConvId]( + const std::shared_ptr<dht::crypto::PublicKey>& pk) { + auto sthis = w.lock(); + auto deviceId = pk->getLongId().toString(); + if (!sthis or deviceId == sthis->deviceId_) + return; + + + std::lock_guard<std::mutex> lk(conv->mtx); + if (!conv->startFetch(deviceId, true)) { + JAMI_WARNING("[Account {}] Already fetching {}", sthis->accountId_, conversationId); + return; + } + + // We need a onNeedSocket_ with old logic. + sthis->onNeedSocket_( + conversationId, + pk->getLongId().toString(), + [sthis, conv, conversationId, oldConvId, deviceId](const auto& channel) { + auto acc = sthis->account_.lock(); + std::lock_guard<std::mutex> lk(conv->mtx); + if (conv->pending && !conv->pending->ready) { + conv->pending->removeId = oldConvId; + if (channel) { + conv->pending->ready = true; + conv->pending->deviceId = channel->deviceId().toString(); + conv->pending->socket = channel; + if (!conv->pending->cloning) { + conv->pending->cloning = true; + dht::ThreadPool::io().run( + [w = sthis->weak(), conversationId, deviceId = conv->pending->deviceId]() { + if (auto sthis = w.lock()) + sthis->handlePendingConversation(conversationId, deviceId); + }); } + return true; + } else { + conv->stopFetch(deviceId); } - - // We need a onNeedSocket_ with old logic. - sthis->onNeedSocket_( - conversationId, - pk->getLongId().toString(), - [=](const auto& channel) { - auto acc = sthis->account_.lock(); - std::lock_guard<std::mutex> lk(sthis->conversationsMtx_); - auto it = sthis->pendingConversationsFetch_.find(conversationId); - if (it != sthis->pendingConversationsFetch_.end() && !it->second.ready) { - it->second.removeId = oldConvId; - if (channel) { - it->second.ready = true; - it->second.deviceId = channel->deviceId().toString(); - it->second.socket = channel; - if (!it->second.cloning) { - it->second.cloning = true; - dht::ThreadPool::io().run( - [w = sthis->weak(), conversationId, deviceId = it->second.deviceId]() { - if (auto sthis = w.lock()) - sthis->handlePendingConversation(conversationId, deviceId); - }); - } - return true; - } else { - sthis->stopFetch(conversationId, deviceId); - } - } - return false; - }, - "application/im-gitmessage-id"); - }); - ConvInfo info; - info.id = conversationId; - info.created = std::time(nullptr); - info.members.emplace_back(pimpl_->username_); - info.members.emplace_back(uri); - addConvInfo(info); + } + return false; + }, + MIME_TYPE_GIT); + }); + addConvInfo(conv->info); } // Message send/load @@ -1727,17 +1803,18 @@ ConversationModule::onMessageDisplayed(const std::string& peer, const std::string& conversationId, const std::string& interactionId) { - std::unique_lock<std::mutex> lk(pimpl_->conversationsMtx_); - auto conversation = pimpl_->conversations_.find(conversationId); - if (conversation != pimpl_->conversations_.end() && conversation->second) { - if (conversation->second->setMessageDisplayed(peer, interactionId)) { - auto msg = std::make_shared<SyncMsg>(); - std::map<std::string, std::map<std::string, std::string>> ld; - ld[conversationId] = conversation->second->displayed(); - msg->ld = std::move(ld); + if (auto conv = pimpl_->getConversation(conversationId)) { + std::unique_lock<std::mutex> lk(conv->mtx); + if (auto conversation = conv->conversation) { lk.unlock(); - pimpl_->needsSyncingCb_(std::move(msg)); - return true; + if (conversation->setMessageDisplayed(peer, interactionId)) { + auto msg = std::make_shared<SyncMsg>(); + msg->ld = { + {conversationId, conversation->displayed()} + }; + pimpl_->needsSyncingCb_(std::move(msg)); + return true; + } } } return false; @@ -1749,8 +1826,8 @@ ConversationModule::convDisplayed() const std::map<std::string, std::map<std::string, std::string>> displayed; std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); for (const auto& [id, conv] : pimpl_->conversations_) { - if (conv) { - auto d = conv->displayed(); + if (conv && conv->conversation) { + auto d = conv->conversation->displayed(); if (!d.empty()) displayed[id] = std::move(d); } @@ -1763,23 +1840,24 @@ ConversationModule::loadConversationMessages(const std::string& conversationId, const std::string& fromMessage, size_t n) { - std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); auto acc = pimpl_->account_.lock(); - auto conversation = pimpl_->conversations_.find(conversationId); - if (acc && conversation != pimpl_->conversations_.end() && conversation->second) { - const uint32_t id = std::uniform_int_distribution<uint32_t> {}(acc->rand); - LogOptions options; - options.from = fromMessage; - options.nbOfCommits = n; - conversation->second->loadMessages( - [accountId = pimpl_->accountId_, conversationId, id](auto&& messages) { - emitSignal<libjami::ConversationSignal::ConversationLoaded>(id, - accountId, - conversationId, - messages); - }, - options); - return id; + if (auto conv = pimpl_->getConversation(conversationId)) { + std::lock_guard<std::mutex> lk(conv->mtx); + if (conv->conversation) { + const uint32_t id = std::uniform_int_distribution<uint32_t> {}(acc->rand); + LogOptions options; + options.from = fromMessage; + options.nbOfCommits = n; + conv->conversation->loadMessages( + [accountId = pimpl_->accountId_, conversationId, id](auto&& messages) { + emitSignal<libjami::ConversationSignal::ConversationLoaded>(id, + accountId, + conversationId, + messages); + }, + options); + return id; + } } return 0; } @@ -1789,36 +1867,35 @@ ConversationModule::loadConversationUntil(const std::string& conversationId, const std::string& fromMessage, const std::string& toMessage) { - std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); auto acc = pimpl_->account_.lock(); - auto conversation = pimpl_->conversations_.find(conversationId); - if (acc && conversation != pimpl_->conversations_.end() && conversation->second) { - const uint32_t id = std::uniform_int_distribution<uint32_t> {}(acc->rand); - LogOptions options; - options.from = fromMessage; - options.to = toMessage; - options.includeTo = true; - conversation->second->loadMessages( - [accountId = pimpl_->accountId_, conversationId, id](auto&& messages) { - emitSignal<libjami::ConversationSignal::ConversationLoaded>(id, - accountId, - conversationId, - messages); - }, - options); - return id; + if (auto conv = pimpl_->getConversation(conversationId)) { + std::lock_guard<std::mutex> lk(conv->mtx); + if (conv->conversation) { + const uint32_t id = std::uniform_int_distribution<uint32_t> {}(acc->rand); + LogOptions options; + options.from = fromMessage; + options.to = toMessage; + options.includeTo = true; + conv->conversation->loadMessages( + [accountId = pimpl_->accountId_, conversationId, id](auto&& messages) { + emitSignal<libjami::ConversationSignal::ConversationLoaded>(id, + accountId, + conversationId, + messages); + }, + options); + return id; + } } return 0; } std::shared_ptr<TransferManager> -ConversationModule::dataTransfer(const std::string& id) const +ConversationModule::dataTransfer(const std::string& conversationId) const { - std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); - auto conversation = pimpl_->conversations_.find(id); - if (conversation != pimpl_->conversations_.end() && conversation->second) - return conversation->second->dataTransfer(); - return {}; + return pimpl_->withConversation(conversationId, [](auto& conversation) { + return conversation.dataTransfer(); + }); } bool @@ -1827,10 +1904,11 @@ ConversationModule::onFileChannelRequest(const std::string& conversationId, const std::string& fileId, bool verifyShaSum) const { - std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); - auto conversation = pimpl_->conversations_.find(conversationId); - if (conversation != pimpl_->conversations_.end() && conversation->second) - return conversation->second->onFileChannelRequest(member, fileId, verifyShaSum); + if (auto conv = pimpl_->getConversation(conversationId)) { + std::lock_guard<std::mutex> lk(conv->mtx); + if (conv->conversation) + return conv->conversation->onFileChannelRequest(member, fileId, verifyShaSum); + } return false; } @@ -1842,12 +1920,12 @@ ConversationModule::downloadFile(const std::string& conversationId, size_t start, size_t end) { - std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); - auto conversation = pimpl_->conversations_.find(conversationId); - if (conversation == pimpl_->conversations_.end() || !conversation->second) - return false; - - return conversation->second->downloadFile(interactionId, fileId, path, "", "", start, end); + if (auto conv = pimpl_->getConversation(conversationId)) { + std::lock_guard<std::mutex> lk(conv->mtx); + if (conv->conversation) + return conv->conversation->downloadFile(interactionId, fileId, path, "", "", start, end); + } + return false; } void @@ -1857,22 +1935,21 @@ ConversationModule::syncConversations(const std::string& peer, const std::string std::set<std::string> toFetch; std::set<std::string> toClone; { - std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); - std::lock_guard<std::mutex> lkCI(pimpl_->convInfosMtx_); - for (const auto& [key, ci] : pimpl_->convInfos_) { - auto it = pimpl_->conversations_.find(key); - if (it != pimpl_->conversations_.end() && it->second) { - if (!it->second->isRemoving() && it->second->isMember(peer, false)) { + std::lock_guard<std::mutex> lkCI(pimpl_->conversationsMtx_); + for (const auto& [key, conv] : pimpl_->conversations_) { + std::lock_guard<std::mutex> lk(conv->mtx); + if (conv->conversation) { + if (!conv->conversation->isRemoving() && conv->conversation->isMember(peer, false)) { toFetch.emplace(key); - if (!it->second->hasSwarmChannel(deviceId)) { - if (auto acc = pimpl_->account_.lock()) { - // TODO connect to Swarm - } - } + // TODO connect to Swarm + // if (!conv->conversation->hasSwarmChannel(deviceId)) { + // if (auto acc = pimpl_->account_.lock()) { + // } + // } } - } else if (!ci.removed - && std::find(ci.members.begin(), ci.members.end(), peer) - != ci.members.end()) { + } else if (!conv->info.removed + && std::find(conv->info.members.begin(), conv->info.members.end(), peer) + != conv->info.members.end()) { // In this case the conversation was never cloned (can be after an import) toClone.emplace(key); } @@ -1895,57 +1972,41 @@ ConversationModule::onSyncData(const SyncMsg& msg, const std::string& deviceId) { for (const auto& [key, convInfo] : msg.c) { - auto convId = convInfo.id; - auto removed = convInfo.removed; + const auto& convId = convInfo.id; pimpl_->rmConversationRequest(convId); - if (not removed) { + + auto conv = pimpl_->startConversation(convInfo); + std::unique_lock<std::mutex> lk(conv->mtx); + if (not convInfo.removed) { // If multi devices, it can detect a conversation that was already // removed, so just check if the convinfo contains a removed conv - auto itConv = pimpl_->convInfos_.find(convId); - if (itConv != pimpl_->convInfos_.end() && itConv->second.removed) { - if (itConv->second.removed > convInfo.created) { + if (conv->info.removed) { + if (conv->info.removed > convInfo.created) { // Only reclone if re-added, else the peer is not synced yet (could be // offline before) continue; } JAMI_DEBUG("Re-add previously removed conversation {:s}", convId); } - auto clone = false; - { - - std::unique_lock<std::mutex> lk(pimpl_->conversationsMtx_); - auto conversation = pimpl_->conversations_.find(convId); - clone = conversation == pimpl_->conversations_.end() || !conversation->second; - } - if (clone) - pimpl_->cloneConversation(deviceId, peerId, convId, convInfo.lastDisplayed); + conv->info = convInfo; + if (!conv->conversation) + pimpl_->cloneConversation(deviceId, peerId, conv, convInfo.lastDisplayed); } else { - { - std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); - auto itConv = pimpl_->conversations_.find(convId); - if (itConv != pimpl_->conversations_.end() && !itConv->second->isRemoving()) { - emitSignal<libjami::ConversationSignal::ConversationRemoved>(pimpl_->accountId_, - convId); - itConv->second->setRemovingFlag(); - } + if (conv->conversation && !conv->conversation->isRemoving()) { + emitSignal<libjami::ConversationSignal::ConversationRemoved>(pimpl_->accountId_, convId); + conv->conversation->setRemovingFlag(); } - std::unique_lock<std::mutex> lk(pimpl_->convInfosMtx_); - auto& ci = pimpl_->convInfos_; - auto itConv = ci.find(convId); - if (itConv != ci.end()) { - auto update = false; - if (!itConv->second.removed) { - update = true; - itConv->second.removed = std::time(nullptr); - } - if (convInfo.erased && !itConv->second.erased) { - itConv->second.erased = std::time(nullptr); - pimpl_->saveConvInfos(); - lk.unlock(); - pimpl_->removeRepository(convId, false); - } else if (update) { - pimpl_->saveConvInfos(); - } + auto update = false; + if (!conv->info.removed) { + update = true; + conv->info.removed = std::time(nullptr); + } + if (convInfo.erased && !conv->info.erased) { + conv->info.erased = std::time(nullptr); + pimpl_->addConvInfo(conv->info); + pimpl_->removeRepositoryImpl(*conv, false); + } else if (update) { + pimpl_->addConvInfo(conv->info); } } } @@ -1983,28 +2044,31 @@ ConversationModule::onSyncData(const SyncMsg& msg, } // Updates preferences for conversations - std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); for (const auto& [convId, p] : msg.p) { - auto itConv = pimpl_->conversations_.find(convId); - if (itConv != pimpl_->conversations_.end() && itConv->second) { - itConv->second->updatePreferences(p); - continue; + if (auto conv = pimpl_->getConversation(convId)) { + std::unique_lock<std::mutex> lk(conv->mtx); + if (conv->conversation) { + auto conversation = conv->conversation; + lk.unlock(); + conversation->updatePreferences(p); + } else if (conv->pending) { + conv->pending->preferences = p; + } } - auto itFetch = pimpl_->pendingConversationsFetch_.find(convId); - if (itFetch != pimpl_->pendingConversationsFetch_.end()) - itFetch->second.preferences = p; } // Updates displayed for conversations for (const auto& [convId, ld] : msg.ld) { - auto itConv = pimpl_->conversations_.find(convId); - if (itConv != pimpl_->conversations_.end() && itConv->second) { - itConv->second->updateLastDisplayed(ld); - continue; + if (auto conv = pimpl_->getConversation(convId)) { + std::unique_lock<std::mutex> lk(conv->mtx); + if (conv->conversation) { + auto conversation = conv->conversation; + lk.unlock(); + conversation->updateLastDisplayed(ld); + } else if (conv->pending) { + conv->pending->lastDisplayed = ld; + } } - auto itFetch = pimpl_->pendingConversationsFetch_.find(convId); - if (itFetch != pimpl_->pendingConversationsFetch_.end()) - itFetch->second.lastDisplayed = ld; } } @@ -2013,15 +2077,14 @@ ConversationModule::needsSyncingWith(const std::string& memberUri, const std::st { // Check if a conversation needs to fetch remote or to be cloned std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); - std::lock_guard<std::mutex> lkCI(pimpl_->convInfosMtx_); - for (const auto& [key, ci] : pimpl_->convInfos_) { - auto it = pimpl_->conversations_.find(key); - if (it != pimpl_->conversations_.end() && it->second) { - if (!it->second->isRemoving() && it->second->isMember(memberUri, false)) + for (const auto& [key, ci] : pimpl_->conversations_) { + std::lock_guard<std::mutex> lk(ci->mtx); + if (ci->conversation) { + if (ci->conversation->isRemoving() && ci->conversation->isMember(memberUri, false)) return true; - } else if (!ci.removed - && std::find(ci.members.begin(), ci.members.end(), memberUri) - != ci.members.end()) { + } else if (!ci->info.removed + && std::find(ci->info.members.begin(), ci->info.members.end(), memberUri) + != ci->info.members.end()) { // In this case the conversation was never cloned (can be after an import) return true; } @@ -2034,17 +2097,15 @@ ConversationModule::setFetched(const std::string& conversationId, const std::string& deviceId, const std::string& commitId) { - auto remove = false; - { - std::unique_lock<std::mutex> lk(pimpl_->conversationsMtx_); - auto it = pimpl_->conversations_.find(conversationId); - if (it != pimpl_->conversations_.end() && it->second) { - remove = it->second->isRemoving(); - it->second->hasFetched(deviceId, commitId); + if (auto conv = pimpl_->getConversation(conversationId)) { + std::lock_guard<std::mutex> lk(conv->mtx); + if (conv->conversation) { + bool remove = conv->conversation->isRemoving(); + conv->conversation->hasFetched(deviceId, commitId); + if (remove) + pimpl_->removeRepositoryImpl(*conv, true); } } - if (remove) - pimpl_->removeRepository(conversationId, true); } void @@ -2053,7 +2114,7 @@ ConversationModule::onNewCommit(const std::string& peer, const std::string& conversationId, const std::string& commitId) { - std::unique_lock<std::mutex> lk(pimpl_->conversationsMtx_); + std::unique_lock<std::mutex> lk(pimpl_->convInfosMtx_); auto itConv = pimpl_->convInfos_.find(conversationId); if (itConv != pimpl_->convInfos_.end() && itConv->second.removed) { // If the conversation is removed and we receives a new commit, @@ -2065,7 +2126,7 @@ ConversationModule::onNewCommit(const std::string& peer, pimpl_->sendMsgCb_(peer, {}, std::map<std::string, std::string> { - {"application/invite", conversationId}}, + {MIME_TYPE_INVITE, conversationId}}, 0); return; } @@ -2083,40 +2144,36 @@ ConversationModule::addConversationMember(const std::string& conversationId, const std::string& contactUri, bool sendRequest) { - std::unique_lock<std::mutex> lk(pimpl_->conversationsMtx_); - // Add a new member in the conversation - auto it = pimpl_->conversations_.find(conversationId); - if (it == pimpl_->conversations_.end()) { + auto conv = pimpl_->getConversation(conversationId); + if (not conv || not conv->conversation) { JAMI_ERROR("Conversation {:s} doesn't exist", conversationId); return; } + std::unique_lock<std::mutex> lk(conv->mtx); - if (it->second->isMember(contactUri, true)) { + if (conv->conversation->isMember(contactUri, true)) { JAMI_DEBUG("{:s} is already a member of {:s}, resend invite", contactUri, conversationId); // Note: This should not be necessary, but if for whatever reason the other side didn't // join we should not forbid new invites - auto invite = it->second->generateInvitation(); + auto invite = conv->conversation->generateInvitation(); lk.unlock(); pimpl_->sendMsgCb_(contactUri, {}, std::move(invite), 0); return; } - it->second + conv->conversation ->addMember(contactUri, - [this, conversationId, sendRequest, contactUri](bool ok, + [this, conv, conversationId, sendRequest, contactUri](bool ok, const std::string& commitId) { if (ok) { - std::unique_lock<std::mutex> lk(pimpl_->conversationsMtx_); - auto it = pimpl_->conversations_.find(conversationId); - if (it != pimpl_->conversations_.end() && it->second) { - pimpl_->sendMessageNotification(*it->second, - true, - commitId); // For the other members - if (sendRequest) { - auto invite = it->second->generateInvitation(); - lk.unlock(); - pimpl_->sendMsgCb_(contactUri, {}, std::move(invite), 0); - } + std::unique_lock<std::mutex> lk(conv->mtx); + pimpl_->sendMessageNotification(*conv->conversation, + true, + commitId); // For the other members + if (sendRequest) { + auto invite = conv->conversation->generateInvitation(); + lk.unlock(); + pimpl_->sendMsgCb_(contactUri, {}, std::move(invite), 0); } } }); @@ -2127,18 +2184,17 @@ ConversationModule::removeConversationMember(const std::string& conversationId, const std::string& contactUri, bool isDevice) { - std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); - auto it = pimpl_->conversations_.find(conversationId); - if (it != pimpl_->conversations_.end() && it->second) { - it->second->removeMember(contactUri, - isDevice, - [this, conversationId](bool ok, const std::string& commitId) { - if (ok) { - pimpl_->sendMessageNotification(conversationId, - true, - commitId); - } - }); + if (auto conv = pimpl_->getConversation(conversationId)) { + std::lock_guard<std::mutex> lk(conv->mtx); + if (conv->conversation) + return conv->conversation->removeMember(contactUri, isDevice, + [this, conversationId](bool ok, const std::string& commitId) { + if (ok) { + pimpl_->sendMessageNotification(conversationId, + true, + commitId); + } + }); } } @@ -2155,10 +2211,10 @@ ConversationModule::countInteractions(const std::string& convId, const std::string& fromId, const std::string& authorUri) const { - std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); - auto conversation = pimpl_->conversations_.find(convId); - if (conversation != pimpl_->conversations_.end() && conversation->second) { - return conversation->second->countInteractions(toId, fromId, authorUri); + if (auto conv = pimpl_->getConversation(convId)) { + std::lock_guard<std::mutex> lk(conv->mtx); + if (conv->conversation) + return conv->conversation->countInteractions(toId, fromId, authorUri); } return 0; } @@ -2166,20 +2222,27 @@ ConversationModule::countInteractions(const std::string& convId, void ConversationModule::search(uint32_t req, const std::string& convId, const Filter& filter) const { - std::unique_lock<std::mutex> lk(pimpl_->conversationsMtx_); - auto finishedFlag = std::make_shared<std::atomic_int>(pimpl_->conversations_.size()); - for (const auto& [cid, conversation] : pimpl_->conversations_) { - if (!conversation || (!convId.empty() && convId != cid)) { - if ((*finishedFlag)-- == 1) { - emitSignal<libjami::ConversationSignal::MessagesFound>( - req, - pimpl_->accountId_, - std::string {}, - std::vector<std::map<std::string, std::string>> {}); + if (convId.empty()) { + auto finishedFlag = std::make_shared<std::atomic_int>(pimpl_->conversations_.size()); + std::unique_lock<std::mutex> lk(pimpl_->conversationsMtx_); + for (const auto& [cid, conv] : pimpl_->conversations_) { + std::lock_guard<std::mutex> lk(conv->mtx); + if (!conv->conversation) { + if ((*finishedFlag)-- == 1) { + emitSignal<libjami::ConversationSignal::MessagesFound>( + req, + pimpl_->accountId_, + std::string {}, + std::vector<std::map<std::string, std::string>> {}); + } + } else { + conv->conversation->search(req, filter, finishedFlag); } - continue; } - conversation->search(req, filter, finishedFlag); + } else if (auto conv = pimpl_->getConversation(convId)) { + std::lock_guard<std::mutex> lk(conv->mtx); + if (conv->conversation) + conv->conversation->search(req, filter, std::make_shared<std::atomic_int>(1)); } } @@ -2188,20 +2251,19 @@ ConversationModule::updateConversationInfos(const std::string& conversationId, const std::map<std::string, std::string>& infos, bool sync) { - std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); - auto it = pimpl_->conversations_.find(conversationId); - if (it == pimpl_->conversations_.end()) { + auto conv = pimpl_->getConversation(conversationId); + if (not conv or not conv->conversation) { JAMI_ERROR("Conversation {:s} doesn't exist", conversationId); return; } - - it->second->updateInfos(infos, - [this, conversationId, sync](bool ok, const std::string& commitId) { - if (ok && sync) { - pimpl_->sendMessageNotification(conversationId, true, commitId); - } else if (sync) - JAMI_WARNING("Couldn't update infos on {:s}", conversationId); - }); + std::lock_guard<std::mutex> lk(conv->mtx); + conv->conversation->updateInfos(infos, + [this, conversationId, sync](bool ok, const std::string& commitId) { + if (ok && sync) { + pimpl_->sendMessageNotification(conversationId, true, commitId); + } else if (sync) + JAMI_WARNING("Couldn't update infos on {:s}", conversationId); + }); } std::map<std::string, std::string> @@ -2213,51 +2275,48 @@ ConversationModule::conversationInfos(const std::string& conversationId) const if (itReq != pimpl_->conversationsRequests_.end()) return itReq->second.metadatas; } - std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); - auto it = pimpl_->conversations_.find(conversationId); - if (it == pimpl_->conversations_.end() or not it->second) { - std::lock_guard<std::mutex> lkCi(pimpl_->convInfosMtx_); - auto itConv = pimpl_->convInfos_.find(conversationId); - if (itConv == pimpl_->convInfos_.end()) { - JAMI_ERROR("Conversation {:s} doesn't exist", conversationId); - return {}; - } - return {{"syncing", "true"}, {"created", std::to_string(itConv->second.created)}}; + if (auto conv = pimpl_->getConversation(conversationId)) { + std::lock_guard<std::mutex> lk(conv->mtx); + if (conv->conversation) + return conv->conversation->infos(); + else + return {{"syncing", "true"}, {"created", std::to_string(conv->info.created)}}; } - - return it->second->infos(); + JAMI_ERROR("Conversation {:s} doesn't exist", conversationId); + return {}; } void ConversationModule::setConversationPreferences(const std::string& conversationId, const std::map<std::string, std::string>& prefs) { - std::unique_lock<std::mutex> lk(pimpl_->conversationsMtx_); - auto it = pimpl_->conversations_.find(conversationId); - if (it == pimpl_->conversations_.end()) { - JAMI_ERROR("Conversation {:s} doesn't exist", conversationId); - return; + if (auto conv = pimpl_->getConversation(conversationId)) { + std::unique_lock<std::mutex> lk(conv->mtx); + if (not conv->conversation) { + JAMI_ERROR("Conversation {:s} doesn't exist", conversationId); + return; + } + auto conversation = conv->conversation; + lk.unlock(); + conversation->updatePreferences(prefs); + auto msg = std::make_shared<SyncMsg>(); + msg->p = { + {conversationId, conversation->preferences(true)} + }; + pimpl_->needsSyncingCb_(std::move(msg)); } - - it->second->updatePreferences(prefs); - auto msg = std::make_shared<SyncMsg>(); - std::map<std::string, std::map<std::string, std::string>> p; - p[conversationId] = it->second->preferences(true); - msg->p = std::move(p); - lk.unlock(); - pimpl_->needsSyncingCb_(std::move(msg)); } std::map<std::string, std::string> ConversationModule::getConversationPreferences(const std::string& conversationId, bool includeCreated) const { - std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); - auto it = pimpl_->conversations_.find(conversationId); - if (it == pimpl_->conversations_.end() or not it->second) - return {}; - - return it->second->preferences(includeCreated); + if (auto conv = pimpl_->getConversation(conversationId)) { + std::lock_guard<std::mutex> lk(conv->mtx); + if (conv->conversation) + return conv->conversation->preferences(includeCreated); + } + return {}; } std::map<std::string, std::map<std::string, std::string>> @@ -2266,8 +2325,8 @@ ConversationModule::convPreferences() const std::map<std::string, std::map<std::string, std::string>> p; std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); for (const auto& [id, conv] : pimpl_->conversations_) { - if (conv) { - auto prefs = conv->preferences(true); + if (conv && conv->conversation) { + auto prefs = conv->conversation->preferences(true); if (!prefs.empty()) p[id] = std::move(prefs); } @@ -2278,28 +2337,24 @@ ConversationModule::convPreferences() const std::vector<uint8_t> ConversationModule::conversationVCard(const std::string& conversationId) const { - std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); - // Add a new member in the conversation - auto it = pimpl_->conversations_.find(conversationId); - if (it == pimpl_->conversations_.end() || !it->second) { - JAMI_ERROR("Conversation {:s} doesn't exist", conversationId); - return {}; + if (auto conv = pimpl_->getConversation(conversationId)) { + std::lock_guard<std::mutex> lk(conv->mtx); + if (conv->conversation) + return conv->conversation->vCard(); } - - return it->second->vCard(); + JAMI_ERROR("Conversation {:s} doesn't exist", conversationId); + return {}; } bool ConversationModule::isBanned(const std::string& convId, const std::string& uri) const { - { - std::unique_lock<std::mutex> lk(pimpl_->conversationsMtx_); - auto conversation = pimpl_->conversations_.find(convId); - if (conversation == pimpl_->conversations_.end() || !conversation->second) + if (auto conv = pimpl_->getConversation(convId)) { + std::lock_guard<std::mutex> lk(conv->mtx); + if (!conv->conversation) return true; - if (conversation->second->mode() != ConversationMode::ONE_TO_ONE) { - return conversation->second->isBanned(uri); - } + if (conv->conversation->mode() != ConversationMode::ONE_TO_ONE) + return conv->conversation->isBanned(uri); } // If 1:1 we check the certificate status if (auto acc = pimpl_->account_.lock()) { @@ -2334,7 +2389,6 @@ ConversationModule::removeContact(const std::string& uri, bool banned) // Remove related conversation auto isSelf = uri == pimpl_->username_; std::vector<std::string> toRm; - std::vector<std::string> updated; auto updateClient = [&](const auto& convId) { if (pimpl_->updateConvReqCb_) pimpl_->updateConvReqCb_(convId, uri, false); @@ -2342,38 +2396,36 @@ ConversationModule::removeContact(const std::string& uri, bool banned) }; { std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); - std::lock_guard<std::mutex> lkCi(pimpl_->convInfosMtx_); - for (auto& [convId, conv] : pimpl_->convInfos_) { - auto itConv = pimpl_->conversations_.find(convId); - if (itConv != pimpl_->conversations_.end() && itConv->second) { + for (auto& [convId, conv] : pimpl_->conversations_) { + std::lock_guard<std::mutex> lk(conv->mtx); + if (conv->conversation) { try { // Note it's important to check getUsername(), else // removing self can remove all conversations - if (itConv->second->mode() == ConversationMode::ONE_TO_ONE) { - auto initMembers = itConv->second->getInitialMembers(); + if (conv->conversation->mode() == ConversationMode::ONE_TO_ONE) { + auto initMembers = conv->conversation->getInitialMembers(); if ((isSelf && initMembers.size() == 1) || (!isSelf && std::find(initMembers.begin(), initMembers.end(), uri) != initMembers.end())) { // Mark as removed - conv.removed = std::time(nullptr); + conv->info.removed = std::time(nullptr); toRm.emplace_back(convId); updateClient(convId); + pimpl_->addConvInfo(conv->info); } } } catch (const std::exception& e) { JAMI_WARN("%s", e.what()); } - } else if (std::find(conv.members.begin(), conv.members.end(), uri) - != conv.members.end()) { + } else if (std::find(conv->info.members.begin(), conv->info.members.end(), uri) + != conv->info.members.end()) { // It's syncing with uri, mark as removed! - conv.removed = std::time(nullptr); - updated.emplace_back(convId); + conv->info.removed = std::time(nullptr); updateClient(convId); + pimpl_->addConvInfo(conv->info); } } - if (!updated.empty() || !toRm.empty()) - pimpl_->saveConvInfos(); } // Note, if we ban the device, we don't send the leave cause the other peer will just // never got the notifications, so just erase the datas @@ -2390,39 +2442,38 @@ ConversationModule::removeConversation(const std::string& conversationId) void ConversationModule::initReplay(const std::string& oldConvId, const std::string& newConvId) { - std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); - auto acc = pimpl_->account_.lock(); - auto conversation = pimpl_->conversations_.find(oldConvId); - if (acc && conversation != pimpl_->conversations_.end() && conversation->second) { - std::promise<bool> waitLoad; - std::future<bool> fut = waitLoad.get_future(); - // we should wait for loadMessage, because it will be deleted after this. - conversation->second->loadMessages( - [&](auto&& messages) { - std::reverse(messages.begin(), - messages.end()); // Log is inverted as we want to replay - std::lock_guard<std::mutex> lk(pimpl_->replayMtx_); - pimpl_->replay_[newConvId] = std::move(messages); - waitLoad.set_value(true); - }, - {}); - fut.wait(); + if (auto conv = pimpl_->getConversation(oldConvId)) { + std::lock_guard<std::mutex> lk(conv->mtx); + if (conv->conversation) { + std::promise<bool> waitLoad; + std::future<bool> fut = waitLoad.get_future(); + // we should wait for loadMessage, because it will be deleted after this. + conv->conversation->loadMessages( + [&](auto&& messages) { + std::reverse(messages.begin(), + messages.end()); // Log is inverted as we want to replay + std::lock_guard<std::mutex> lk(pimpl_->replayMtx_); + pimpl_->replay_[newConvId] = std::move(messages); + waitLoad.set_value(true); + }, + {}); + fut.wait(); + } } } bool ConversationModule::isHosting(const std::string& conversationId, const std::string& confId) const { - std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); if (conversationId.empty()) { + std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); return std::find_if(pimpl_->conversations_.cbegin(), pimpl_->conversations_.cend(), - [&](const auto& conv) { return conv.second->isHosting(confId); }) + [&](const auto& conv) { return conv.second->conversation->isHosting(confId); }) != pimpl_->conversations_.cend(); - } else { - auto conversation = pimpl_->conversations_.find(conversationId); - if (conversation != pimpl_->conversations_.end() && conversation->second) { - return conversation->second->isHosting(confId); + } else if (auto conv = pimpl_->getConversation(conversationId)) { + if (conv->conversation) { + return conv->conversation->isHosting(confId); } } return false; @@ -2431,13 +2482,9 @@ ConversationModule::isHosting(const std::string& conversationId, const std::stri std::vector<std::map<std::string, std::string>> ConversationModule::getActiveCalls(const std::string& conversationId) const { - std::unique_lock<std::mutex> lk(pimpl_->conversationsMtx_); - auto conversation = pimpl_->conversations_.find(conversationId); - if (conversation == pimpl_->conversations_.end() || !conversation->second) { - JAMI_ERR("Conversation %s not found", conversationId.c_str()); - return {}; - } - return conversation->second->currentCalls(); + return pimpl_->withConversation(conversationId, [](const auto& conversation) { + return conversation.currentCalls(); + }); } void @@ -2483,9 +2530,10 @@ ConversationModule::call(const std::string& url, cb(callUri, DeviceId(deviceId)); }; - std::unique_lock<std::mutex> lk(pimpl_->conversationsMtx_); - auto conversation = pimpl_->conversations_.find(conversationId); - if (conversation == pimpl_->conversations_.end() || !conversation->second) { + auto conv = pimpl_->getConversation(conversationId); + if (!conv) return; + std::unique_lock<std::mutex> lk(conv->mtx); + if (!conv->conversation) { JAMI_ERROR("Conversation {:s} not found", conversationId); return; } @@ -2493,9 +2541,8 @@ ConversationModule::call(const std::string& url, // Check if we want to join a specific conference // So, if confId is specified or if there is some activeCalls // or if we are the default host. - auto& conv = conversation->second; - auto activeCalls = conv->currentCalls(); - auto infos = conv->infos(); + auto activeCalls = conv->conversation->currentCalls(); + auto infos = conv->conversation->infos(); auto itRdvAccount = infos.find("rdvAccount"); auto itRdvDevice = infos.find("rdvDevice"); auto sendCallRequest = false; @@ -2582,20 +2629,20 @@ ConversationModule::hostConference(const std::string& conversationId, return; } - std::unique_lock<std::mutex> lk(pimpl_->conversationsMtx_); - auto conversation = pimpl_->conversations_.find(conversationId); - if (conversation == pimpl_->conversations_.end() || !conversation->second) { - JAMI_ERR("Conversation %s not found", conversationId.c_str()); + auto conv = pimpl_->getConversation(conversationId); + if (!conv) return; + std::unique_lock<std::mutex> lk(conv->mtx); + if (!conv->conversation) { + JAMI_ERROR("Conversation {} not found", conversationId); return; } - auto& conv = conversation->second; // Add commit to conversation Json::Value value; value["uri"] = pimpl_->username_; value["device"] = pimpl_->deviceId_; value["confId"] = confId; value["type"] = "application/call-history+json"; - conv->hostConference(std::move(value), + conv->conversation->hostConference(std::move(value), [w = pimpl_->weak(), conversationId](bool ok, const std::string& commitId) { if (ok) { if (auto shared = w.lock()) @@ -2610,7 +2657,7 @@ ConversationModule::hostConference(const std::string& conversationId, // Master call, so when it's stopped, the conference will be stopped (as we use the hold // state for detaching the call) conf->onShutdown( - [w = pimpl_->weak(), accountUri = pimpl_->username_, confId, conversationId, call]( + [w = pimpl_->weak(), accountUri = pimpl_->username_, confId, conversationId, call, conv]( int duration) { auto shared = w.lock(); if (shared) { @@ -2621,26 +2668,22 @@ ConversationModule::hostConference(const std::string& conversationId, value["type"] = "application/call-history+json"; value["duration"] = std::to_string(duration); - std::unique_lock<std::mutex> lk(shared->conversationsMtx_); - auto conversation = shared->conversations_.find(conversationId); - if (conversation == shared->conversations_.end() || !conversation->second) { - JAMI_ERR("Conversation %s not found", conversationId.c_str()); - return true; + std::lock_guard<std::mutex> lk(conv->mtx); + if (!conv->conversation) { + JAMI_ERROR("Conversation {} not found", conversationId); + return; } - auto& conv = conversation->second; - conv->removeActiveConference( + conv->conversation->removeActiveConference( std::move(value), [w, conversationId](bool ok, const std::string& commitId) { if (ok) { if (auto shared = w.lock()) { shared->sendMessageNotification(conversationId, true, commitId); } } else { - JAMI_ERR("Failed to send message to conversation %s", - conversationId.c_str()); + JAMI_ERROR("Failed to send message to conversation {}", conversationId); } }); } - return true; }); } @@ -2701,37 +2744,36 @@ ConversationModule::addConvInfo(const ConvInfo& info) } void -ConversationModule::setConversationMembers(const std::string& convId, +ConversationModule::Impl::setConversationMembers(const std::string& convId, const std::vector<std::string>& members) { - std::lock_guard<std::mutex> lk(pimpl_->convInfosMtx_); - auto convIt = pimpl_->convInfos_.find(convId); - if (convIt != pimpl_->convInfos_.end()) { - convIt->second.members = members; - pimpl_->saveConvInfos(); + if (auto conv = getConversation(convId)) { + std::lock_guard<std::mutex> lk(conv->mtx); + conv->info.members = members; + addConvInfo(conv->info); } } std::shared_ptr<Conversation> ConversationModule::getConversation(const std::string& convId) { - std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); - auto convIt = pimpl_->conversations_.find(convId); - if (convIt != pimpl_->conversations_.end()) - return convIt->second; + if (auto conv = pimpl_->getConversation(convId)) { + std::lock_guard<std::mutex> lk(conv->mtx); + return conv->conversation; + } return nullptr; } std::shared_ptr<dhtnet::ChannelSocket> ConversationModule::gitSocket(std::string_view deviceId, std::string_view convId) const { - std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); - auto convIt = pimpl_->conversations_.find(convId); - if (convIt != pimpl_->conversations_.end()) - return convIt->second->gitSocket(DeviceId(deviceId)); - auto it = pimpl_->pendingConversationsFetch_.find(convId); - if (it != pimpl_->pendingConversationsFetch_.end()) - return it->second.socket; + if (auto conv = pimpl_->getConversation(convId)) { + std::lock_guard<std::mutex> lk(conv->mtx); + if (conv->conversation) + return conv->conversation->gitSocket(DeviceId(deviceId)); + else if (conv->pending) + return conv->pending->socket; + } return nullptr; } @@ -2740,47 +2782,48 @@ ConversationModule::addGitSocket(std::string_view deviceId, std::string_view convId, const std::shared_ptr<dhtnet::ChannelSocket>& channel) { - std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); - auto convIt = pimpl_->conversations_.find(convId); - if (convIt != pimpl_->conversations_.end()) - convIt->second->addGitSocket(DeviceId(deviceId), channel); - else + if (auto conv = pimpl_->getConversation(convId)) { + std::lock_guard<std::mutex> lk(conv->mtx); + conv->conversation->addGitSocket(DeviceId(deviceId), channel); + } else JAMI_WARNING("addGitSocket: can't find conversation {:s}", convId); } void ConversationModule::removeGitSocket(std::string_view deviceId, std::string_view convId) { - std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); - auto convIt = pimpl_->conversations_.find(convId); - if (convIt != pimpl_->conversations_.end()) - return convIt->second->removeGitSocket(DeviceId(deviceId)); + pimpl_->withConversation(convId, [&](auto& conv) { + conv.removeGitSocket(DeviceId(deviceId)); + }); } + void ConversationModule::shutdownConnections() { std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); - for (auto& [k, conversation] : pimpl_->conversations_) - conversation->shutdownConnections(); - for (auto& [k, pending] : pimpl_->pendingConversationsFetch_) - pending.socket = {}; + for (auto& [k, c] : pimpl_->conversations_) { + if (c->conversation) + c->conversation->shutdownConnections(); + if (c->pending) + c->pending->socket = {}; + } } void ConversationModule::addSwarmChannel(const std::string& conversationId, std::shared_ptr<dhtnet::ChannelSocket> channel) { - std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); - auto convIt = pimpl_->conversations_.find(conversationId); - if (convIt != pimpl_->conversations_.end()) - convIt->second->addSwarmChannel(std::move(channel)); + pimpl_->withConversation(conversationId, [&](auto& conv) { + conv.addSwarmChannel(std::move(channel)); + }); } void ConversationModule::connectivityChanged() { std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); - for (auto& [k, conversation] : pimpl_->conversations_) { - conversation->connectivityChanged(); + for (auto& [k, c] : pimpl_->conversations_) { + if (c->conversation) + c->conversation->connectivityChanged(); } } } // namespace jami diff --git a/src/jamidht/conversation_module.h b/src/jamidht/conversation_module.h index fa0693499f3455c438da6fa74753af548120e11d..701b6a6f0614aea95fa65a1b3517b326efbd5b54 100644 --- a/src/jamidht/conversation_module.h +++ b/src/jamidht/conversation_module.h @@ -31,6 +31,8 @@ #include <msgpack.hpp> namespace jami { +static constexpr const char MIME_TYPE_INVITE[] {"application/invite"}; +static constexpr const char MIME_TYPE_GIT[] {"application/im-gitmessage-id"}; class SIPCall; @@ -447,7 +449,6 @@ public: static std::map<std::string, ConversationRequest> convRequests(const std::string& accountId); static std::map<std::string, ConversationRequest> convRequestsFromPath(const std::string& path); void addConvInfo(const ConvInfo& info); - void setConversationMembers(const std::string& convId, const std::vector<std::string>& members); /** * Get a conversation diff --git a/src/jamidht/jamiaccount.cpp b/src/jamidht/jamiaccount.cpp index c371148a8d4dacce8c579e8a37ae5b8b086b9144..ea0fc7483a984ad629f3ce0001d827a9849e7286 100644 --- a/src/jamidht/jamiaccount.cpp +++ b/src/jamidht/jamiaccount.cpp @@ -106,9 +106,7 @@ namespace jami { constexpr pj_str_t STR_MESSAGE_ID = jami::sip_utils::CONST_PJ_STR("Message-ID"); static constexpr const char MIME_TYPE_IMDN[] {"message/imdn+xml"}; static constexpr const char MIME_TYPE_IM_COMPOSING[] {"application/im-iscomposing+xml"}; -static constexpr const char MIME_TYPE_INVITE[] {"application/invite"}; static constexpr const char MIME_TYPE_INVITE_JSON[] {"application/invite+json"}; -static constexpr const char MIME_TYPE_GIT[] {"application/im-gitmessage-id"}; static constexpr const char FILE_URI[] {"file://"}; static constexpr const char VCARD_URI[] {"vcard://"}; static constexpr const char DATA_TRANSFER_URI[] {"data-transfer://"}; @@ -2833,7 +2831,7 @@ JamiAccount::updateConvForContact(const std::string& uri, auto details = getContactDetails(uri); auto itDetails = details.find(libjami::Account::TrustRequest::CONVERSATIONID); if (itDetails != details.end() && itDetails->second != oldConv) { - JAMI_DBG("Old conversation is not found in details %s", oldConv.c_str()); + JAMI_DEBUG("Old conversation is not found in details {} - found: {}", oldConv, itDetails->second); return false; } info->contacts->updateConversation(urih, newConv); @@ -3435,13 +3433,6 @@ JamiAccount::setActiveCodecs(const std::vector<unsigned>& list) config_->activeCodecs = getActiveCodecs(MEDIA_ALL); } -// Member management -void -JamiAccount::saveMembers(const std::string& convId, const std::vector<std::string>& members) -{ - convModule()->setConversationMembers(convId, members); -} - void JamiAccount::sendInstantMessage(const std::string& convId, const std::map<std::string, std::string>& msg) diff --git a/src/jamidht/jamiaccount.h b/src/jamidht/jamiaccount.h index 4f147a81906f056b4ed0f533cef2eda52a578b16..093315e25636b12c8bb1839e9c6560964e12f6ee 100644 --- a/src/jamidht/jamiaccount.h +++ b/src/jamidht/jamiaccount.h @@ -446,10 +446,6 @@ public: std::string_view currentDeviceId() const; - // Member management - void saveMembers(const std::string& convId, - const std::vector<std::string>& members); // Save confInfos - // Received a new commit notification bool handleMessage(const std::string& from,