diff --git a/src/jamidht/conversation.cpp b/src/jamidht/conversation.cpp index c0785671d3c6ac15b9fb7b2a8b780c6e2c481b2d..c5984fa47f842af7257827cd50a2fab24bfda7a1 100644 --- a/src/jamidht/conversation.cpp +++ b/src/jamidht/conversation.cpp @@ -559,10 +559,6 @@ std::vector<std::map<std::string, std::string>> Conversation::getMembers(bool includeInvited, bool includeLeft) const { std::vector<std::map<std::string, std::string>> result; - - auto shared = pimpl_->account_.lock(); - if (!shared) - return result; auto members = pimpl_->repository_->members(); std::lock_guard<std::mutex> lk(pimpl_->lastDisplayedMtx_); for (const auto& member : members) { diff --git a/src/jamidht/conversation_module.cpp b/src/jamidht/conversation_module.cpp index b15e29234737ef4ab83ef4ad86dbd85bf2f982c2..a93d37fac476a2b5b9384ae97b7de360e6d62f50 100644 --- a/src/jamidht/conversation_module.cpp +++ b/src/jamidht/conversation_module.cpp @@ -34,6 +34,8 @@ namespace jami { +using ConvInfoMap = std::map<std::string, ConvInfo>; + struct PendingConversationFetch { bool ready {false}; @@ -77,6 +79,7 @@ public: */ void checkConversationsEvents(); bool handlePendingConversations(); + void handlePendingConversation(const std::string& conversationId, const std::string& deviceId); // Requests std::optional<ConversationRequest> getRequest(const std::string& id) const; @@ -404,8 +407,8 @@ ConversationModule::Impl::fetchNewCommits(const std::string& peer, void ConversationModule::Impl::checkConversationsEvents() { - bool hasHandler = conversationsEventHandler and not conversationsEventHandler->isCancelled(); std::lock_guard<std::mutex> lk(pendingConversationsFetchMtx_); + bool hasHandler = conversationsEventHandler and not conversationsEventHandler->isCancelled(); if (not pendingConversationsFetch_.empty() and not hasHandler) { conversationsEventHandler = Manager::instance().scheduler().scheduleAtFixedRate( [w = weak()] { @@ -420,6 +423,94 @@ ConversationModule::Impl::checkConversationsEvents() } } +// Clone and store conversation +void +ConversationModule::Impl::handlePendingConversation(const std::string& conversationId, + const std::string& deviceId) +{ + auto erasePending = [&] { + std::lock_guard<std::mutex> lk(pendingConversationsFetchMtx_); + pendingConversationsFetch_.erase(conversationId); + }; + try { + auto conversation = std::make_shared<Conversation>(account_, deviceId, conversationId); + if (!conversation->isMember(username_, true)) { + JAMI_ERR("Conversation cloned but doesn't seems to be a valid member"); + conversation->erase(); + erasePending(); + return; + } + auto removeRepo = false; + { + std::lock_guard<std::mutex> lk(conversationsMtx_); + // Note: a removeContact while cloning. In this case, the conversation + // must not be announced and removed. + auto itConv = convInfos_.find(conversationId); + if (itConv != convInfos_.end() && itConv->second.removed) + removeRepo = true; + conversations_.emplace(conversationId, conversation); + } + if (removeRepo) { + removeRepository(conversationId, false, true); + erasePending(); + return; + } + auto commitId = conversation->join(); + std::vector<std::map<std::string, std::string>> messages; + { + std::lock_guard<std::mutex> lk(replayMtx_); + auto replayIt = replay_.find(conversationId); + if (replayIt != replay_.end()) { + messages = std::move(replayIt->second); + replay_.erase(replayIt); + } + } + if (!commitId.empty()) + sendMessageNotification(conversationId, commitId, false); + // Inform user that the conversation is ready + emitSignal<DRing::ConversationSignal::ConversationReady>(accountId_, conversationId); + needsSyncingCb_(); + std::vector<Json::Value> values; + values.reserve(messages.size()); + for (const auto& message : messages) { + // For now, only replay text messages. + // File transfers will need more logic, and don't care about calls for now. + if (message.at("type") == "text/plain" && message.at("author") == username_) { + Json::Value json; + json["body"] = message.at("body"); + json["type"] = "text/plain"; + values.emplace_back(std::move(json)); + } + } + if (!values.empty()) + conversation->sendMessages(std::move(values), + "", + [w = weak(), conversationId](const auto& commits) { + auto shared = w.lock(); + if (shared and not commits.empty()) + shared->sendMessageNotification(conversationId, + *commits.rbegin(), + true); + }); + // Download members profile on first sync + if (auto cert = tls::CertificateStore::instance().getCertificate(deviceId)) { + if (cert->issuer && cert->issuer->getId().toString() == username_) { + if (auto acc = account_.lock()) { + for (const auto& member : conversation->memberUris(username_)) + acc->askForProfile(conversationId, deviceId, member); + } + } + } + } catch (const std::exception& e) { + emitSignal<DRing::ConversationSignal::OnConversationError>(accountId_, + conversationId, + EFETCH, + e.what()); + JAMI_WARN("Something went wrong when cloning conversation: %s", e.what()); + } + erasePending(); +} + bool ConversationModule::Impl::handlePendingConversations() { @@ -427,105 +518,11 @@ ConversationModule::Impl::handlePendingConversations() for (auto it = pendingConversationsFetch_.begin(); it != pendingConversationsFetch_.end();) { if (it->second.ready && !it->second.cloning) { it->second.cloning = true; - dht::ThreadPool::io().run([w = weak(), - conversationId = it->first, - deviceId = it->second.deviceId]() { - auto sthis = w.lock(); - if (!sthis) - return; - // Clone and store conversation - auto erasePending = [&] { - std::lock_guard<std::mutex> lk(sthis->pendingConversationsFetchMtx_); - sthis->pendingConversationsFetch_.erase(conversationId); - }; - try { - auto conversation = std::make_shared<Conversation>(sthis->account_, - deviceId, - conversationId); - if (!conversation->isMember(sthis->username_, true)) { - JAMI_ERR("Conversation cloned but doesn't seems to be a valid member"); - conversation->erase(); - erasePending(); - return; - } - if (conversation) { - auto removeRepo = false; - { - std::lock_guard<std::mutex> lk(sthis->conversationsMtx_); - // Note: a removeContact while cloning. In this case, the conversation - // must not be announced and removed. - auto itConv = sthis->convInfos_.find(conversationId); - if (itConv != sthis->convInfos_.end() && itConv->second.removed) - removeRepo = true; - sthis->conversations_.emplace(conversationId, conversation); - } - if (removeRepo) { - sthis->removeRepository(conversationId, false, true); - erasePending(); - return; - } - auto commitId = conversation->join(); - std::vector<std::map<std::string, std::string>> messages; - { - std::lock_guard<std::mutex> lk(sthis->replayMtx_); - auto replayIt = sthis->replay_.find(conversationId); - if (replayIt != sthis->replay_.end()) { - messages = std::move(replayIt->second); - sthis->replay_.erase(replayIt); - } - } - if (!commitId.empty()) - sthis->sendMessageNotification(conversationId, commitId, false); - // Inform user that the conversation is ready - emitSignal<DRing::ConversationSignal::ConversationReady>(sthis->accountId_, - conversationId); - sthis->needsSyncingCb_(); - std::vector<Json::Value> values; - values.reserve(messages.size()); - for (const auto& message : messages) { - // For now, only replay text messages. - // File transfers will need more logic, and don't care about calls for now. - if (message.at("type") == "text/plain" - && message.at("author") == sthis->username_) { - Json::Value json; - json["body"] = message.at("body"); - json["type"] = "text/plain"; - values.emplace_back(std::move(json)); - } - } - if (!values.empty()) - conversation->sendMessages( - std::move(values), "", [w, conversationId](const auto& commits) { - auto shared = w.lock(); - if (!shared || !commits.empty()) - shared->sendMessageNotification(conversationId, - *commits.rbegin(), - true); - }); - // Download members profile on first sync - if (auto cert = tls::CertificateStore::instance().getCertificate(deviceId)) { - if (cert->issuer - && cert->issuer->getId().toString() == sthis->username_) { - if (auto acc = sthis->account_.lock()) { - for (const auto& member : conversation->getMembers()) { - if (member.at("uri") != sthis->username_) - acc->askForProfile(conversationId, - deviceId, - member.at("uri")); - } - } - } - } - } - } catch (const std::exception& e) { - emitSignal<DRing::ConversationSignal::OnConversationError>(sthis->accountId_, - conversationId, - EFETCH, - e.what()); - JAMI_WARN("Something went wrong when cloning conversation: %s", e.what()); - } - erasePending(); - }); + dht::ThreadPool::io().run( + [w = weak(), conversationId = it->first, deviceId = it->second.deviceId]() { + if (auto sthis = w.lock()) + sthis->handlePendingConversation(conversationId, deviceId); + }); } ++it; } @@ -600,13 +597,10 @@ ConversationModule::Impl::sendMessageNotification(const Conversation& conversati message["deviceId"] = deviceId_; Json::StreamWriterBuilder builder; const auto text = Json::writeString(builder, message); - for (const auto& members : conversation.getMembers()) { - auto uri = members.at("uri"); - // Do not send to ourself, it's synced via convInfos - if (!sync && username_.find(uri) != std::string::npos) - continue; + for (const auto& member : conversation.memberUris(username_)) { // Announce to all members that a new message is sent - sendMsgCb_(uri, std::map<std::string, std::string> {{"application/im-gitmessage-id", text}}); + sendMsgCb_(member, + std::map<std::string, std::string> {{"application/im-gitmessage-id", text}}); } } @@ -672,16 +666,14 @@ ConversationModule::saveConvRequestsToPath( } void -ConversationModule::saveConvInfos(const std::string& accountId, - const std::map<std::string, ConvInfo>& conversations) +ConversationModule::saveConvInfos(const std::string& accountId, const ConvInfoMap& conversations) { auto path = fileutils::get_data_dir() + DIR_SEPARATOR_STR + accountId; saveConvInfosToPath(path, conversations); } void -ConversationModule::saveConvInfosToPath(const std::string& path, - const std::map<std::string, ConvInfo>& conversations) +ConversationModule::saveConvInfosToPath(const std::string& path, const ConvInfoMap& conversations) { std::ofstream file(path + DIR_SEPARATOR_STR + "convInfo", std::ios::trunc | std::ios::binary); msgpack::pack(file, conversations); diff --git a/src/jamidht/conversationrepository.cpp b/src/jamidht/conversationrepository.cpp index 37a0870e0b9a08b175f7b36b7dbeaa33562aed93..25ddb5de1b4275beb02048555b9f73dbda8817fe 100644 --- a/src/jamidht/conversationrepository.cpp +++ b/src/jamidht/conversationrepository.cpp @@ -104,6 +104,7 @@ public: const std::string& parentId) const; bool add(const std::string& path); + void addUserDevice(); std::string commit(const std::string& msg); ConversationMode mode() const; @@ -1224,10 +1225,10 @@ ConversationRepository::Impl::checkInitialCommit(const std::string& userDevice, } auto hasDevice = false, hasAdmin = false; - std::string adminsFile = std::string("admins") + "/" + userUri + ".crt"; - std::string deviceFile = std::string("devices") + "/" + userDevice + ".crt"; - std::string crlFile = std::string("CRLs") + "/" + userUri; - std::string invitedFile = std::string("invited") + "/" + invited; + std::string adminsFile = fmt::format("admins/{}.crt", userUri); + std::string deviceFile = fmt::format("devices/{}.crt", userDevice); + std::string crlFile = fmt::format("CRLs/{}", userUri); + std::string invitedFile = fmt::format("invited/{}", invited); // Check that admin cert is added // Check that device cert is added // Check CRLs added @@ -2378,40 +2379,54 @@ ConversationRepository::remoteHead(const std::string& remoteDeviceId, return commit_str; } -std::string -ConversationRepository::commitMessage(const std::string& msg) +void +ConversationRepository::Impl::addUserDevice() { - auto account = pimpl_->account_.lock(); + auto account = account_.lock(); if (!account) - return {}; - auto deviceId = std::string(account->currentDeviceId()); + return; // First, we need to add device file to the repository if not present - auto repo = pimpl_->repository(); + auto repo = repository(); if (!repo) - return {}; - std::string repoPath = git_repository_workdir(repo.get()); + return; // NOTE: libgit2 uses / for files - std::string path = std::string("devices") + "/" + deviceId + ".crt"; - std::string devicePath = repoPath + path; + std::string path = fmt::format("devices/{}.crt", account->currentDeviceId()); + std::string devicePath = git_repository_workdir(repo.get()) + path; if (!fileutils::isFile(devicePath)) { auto file = fileutils::ofstream(devicePath, std::ios::trunc | std::ios::binary); if (!file.is_open()) { JAMI_ERR("Could not write data to %s", devicePath.c_str()); - return {}; + return; } auto cert = account->identity().second; auto deviceCert = cert->toString(false); file << deviceCert; file.close(); - if (!pimpl_->add(path)) + if (!add(path)) JAMI_WARN("Couldn't add file %s", devicePath.c_str()); } +} +std::string +ConversationRepository::commitMessage(const std::string& msg) +{ + pimpl_->addUserDevice(); return pimpl_->commit(msg); } +std::vector<std::string> +ConversationRepository::commitMessages(const std::vector<std::string>& msgs) +{ + pimpl_->addUserDevice(); + std::vector<std::string> ret; + ret.reserve(msgs.size()); + for (const auto& msg : msgs) + ret.emplace_back(pimpl_->commit(msg)); + return ret; +} + std::vector<ConversationCommit> ConversationRepository::logN(const std::string& last, unsigned n, bool logIfNotFound) const { diff --git a/src/jamidht/conversationrepository.h b/src/jamidht/conversationrepository.h index 33eabeb9fbebdfc85c88efcbc922aedc503e30c5..65a1c18a5377f5529727aa75ad2253e3d8a71a7e 100644 --- a/src/jamidht/conversationrepository.h +++ b/src/jamidht/conversationrepository.h @@ -175,6 +175,8 @@ public: */ std::string commitMessage(const std::string& msg); + std::vector<std::string> commitMessages(const std::vector<std::string>& msgs); + /** * Amend a commit message * @param id The commit to amend