diff --git a/src/jamidht/conversation.cpp b/src/jamidht/conversation.cpp index 47469580c32333bcf4ec21d8678377836282d94e..8111151419481cc466861d8a5d981c0e2bc5fc17 100644 --- a/src/jamidht/conversation.cpp +++ b/src/jamidht/conversation.cpp @@ -146,7 +146,7 @@ public: if (!repository_) { throw std::logic_error("Couldn't create repository"); } - init(*account); + init(account); } Impl(const std::shared_ptr<JamiAccount>& account, const std::string& conversationId) @@ -159,7 +159,7 @@ public: if (!repository_) { throw std::logic_error("Couldn't create repository"); } - init(*account); + init(account); } Impl(const std::shared_ptr<JamiAccount>& account, @@ -188,10 +188,10 @@ public: activeCallsPath_ = conversationDataPath_ / ConversationMapKeys::ACTIVE_CALLS; for (const auto& c : repository_->convCommitsToMap(commits)) updateActiveCalls(c); - init(*account); + init(account); } - void init(JamiAccount& account) { + void init(const std::shared_ptr<JamiAccount>& account) { ioContext_ = Manager::instance().ioContext(); fallbackTimer_ = std::make_unique<asio::steady_timer>(*ioContext_); swarmManager_ @@ -203,7 +203,7 @@ public: } return false; }); - swarmManager_->setMobility(account.isMobile()); + swarmManager_->setMobility(account->isMobile()); transferManager_ = std::make_shared<TransferManager>(accountId_, "", @@ -219,7 +219,7 @@ public: hostedCallsPath_ = conversationDataPath_ / ConversationMapKeys::HOSTED_CALLS; loadActiveCalls(); loadStatus(); - typers_ = std::make_shared<Typers>(shared, repository_->id()); + typers_ = std::make_shared<Typers>(account, repository_->id()); } const std::string& toString() const diff --git a/src/jamidht/conversation_module.cpp b/src/jamidht/conversation_module.cpp index 83e74033ca727c723534eb424708990b4d3c0ae9..1a9076ea1c66f08ff67a62116c96c8e046d7795e 100644 --- a/src/jamidht/conversation_module.cpp +++ b/src/jamidht/conversation_module.cpp @@ -122,7 +122,8 @@ struct SyncedConversation class ConversationModule::Impl : public std::enable_shared_from_this<Impl> { public: - Impl(std::weak_ptr<JamiAccount>&& account, + Impl(std::shared_ptr<JamiAccount>&& account, + std::shared_ptr<AccountManager>&& accountManager, NeedsSyncingCb&& needsSyncingCb, SengMsgCb&& sendMsgCb, NeedSocketCb&& onNeedSocket, @@ -367,6 +368,7 @@ public: } std::weak_ptr<JamiAccount> account_; + std::shared_ptr<AccountManager> accountManager_; NeedsSyncingCb needsSyncingCb_; SengMsgCb sendMsgCb_; NeedSocketCb onNeedSocket_; @@ -449,7 +451,8 @@ public: } }; -ConversationModule::Impl::Impl(std::weak_ptr<JamiAccount>&& account, +ConversationModule::Impl::Impl(std::shared_ptr<JamiAccount>&& account, + std::shared_ptr<AccountManager>&& accountManager, NeedsSyncingCb&& needsSyncingCb, SengMsgCb&& sendMsgCb, NeedSocketCb&& onNeedSocket, @@ -457,20 +460,20 @@ ConversationModule::Impl::Impl(std::weak_ptr<JamiAccount>&& account, UpdateConvReq&& updateConvReqCb, OneToOneRecvCb&& oneToOneRecvCb) : account_(account) + , accountManager_(accountManager) , needsSyncingCb_(needsSyncingCb) , sendMsgCb_(sendMsgCb) , onNeedSocket_(onNeedSocket) , onNeedSwarmSocket_(onNeedSwarmSocket) , updateConvReqCb_(updateConvReqCb) , oneToOneRecvCb_(oneToOneRecvCb) + , accountId_(account->getAccountID()) { - if (auto shared = account.lock()) { - accountId_ = shared->getAccountID(); - deviceId_ = shared->currentDeviceId(); - if (auto accm = shared->accountManager()) - if (const auto* info = accm->getInfo()) - username_ = info->accountId; - } + if (auto accm = account->accountManager()) + if (const auto* info = accm->getInfo()) { + deviceId_ = info->deviceId; + username_ = info->accountId; + } conversationsRequests_ = convRequests(accountId_); loadMetadatas(); } @@ -507,7 +510,7 @@ ConversationModule::Impl::cloneConversation(const std::string& deviceId, onNeedSocket_( conv->info.id, deviceId, - [=](const auto& channel) { + [w = weak(), conv, deviceId](const auto& channel) { std::lock_guard lk(conv->mtx); if (conv->pending && !conv->pending->ready) { if (channel) { @@ -516,7 +519,7 @@ ConversationModule::Impl::cloneConversation(const std::string& deviceId, conv->pending->socket = channel; if (!conv->pending->cloning) { conv->pending->cloning = true; - dht::ThreadPool::io().run([w = weak(), + dht::ThreadPool::io().run([w, convId = conv->info.id, deviceId = conv->pending->deviceId]() { if (auto sthis = w.lock()) @@ -686,9 +689,7 @@ ConversationModule::Impl::fetchNewCommits(const std::string& peer, } } if (shared->syncCnt.fetch_sub(1) == 1) { - if (auto account = shared->account_.lock()) - emitSignal<libjami::ConversationSignal::ConversationSyncFinished>( - account->getAccountID().c_str()); + emitSignal<libjami::ConversationSignal::ConversationSyncFinished>(shared->accountId_); } }, commitId); @@ -725,9 +726,13 @@ ConversationModule::Impl::handlePendingConversation(const std::string& conversat if (!acc) return; std::vector<DeviceId> kd; - for (const auto& [id, _] : acc->getKnownDevices()) - kd.emplace_back(id); - + { + std::unique_lock lk(conversationsMtx_); + const auto& devices = accountManager_->getKnownDevices(); + kd.reserve(devices.size()); + for (const auto& [id, _] : devices) + kd.emplace_back(id); + } auto conv = getConversation(conversationId); if (!conv) return; @@ -844,17 +849,13 @@ ConversationModule::Impl::handlePendingConversation(const std::string& conversat auto askForProfile = isOneOne; if (!isOneOne) { // If not 1:1 only download profiles from self (to avoid non checked files) - if (auto acc = account_.lock()) { - auto cert = acc->certStore().getCertificate(deviceId); - askForProfile = cert && cert->issuer - && cert->issuer->getId().toString() == username_; - } + auto cert = acc->certStore().getCertificate(deviceId); + askForProfile = cert && cert->issuer + && cert->issuer->getId().toString() == username_; } if (askForProfile) { - if (auto acc = account_.lock()) { - for (const auto& member : conversation->memberUris(username_)) { - acc->askForProfile(conversationId, deviceId, member); - } + for (const auto& member : conversation->memberUris(username_)) { + acc->askForProfile(conversationId, deviceId, member); } } } catch (const std::exception& e) { @@ -887,10 +888,7 @@ ConversationModule::Impl::getRequest(const std::string& id) const std::string ConversationModule::Impl::getOneToOneConversation(const std::string& uri) const noexcept { - auto acc = account_.lock(); - if (!acc) - return {}; - auto details = acc->getContactDetails(uri); + auto details = accountManager_->getContactDetails(uri); auto itRemoved = details.find("removed"); // If contact is removed there is no conversation if (itRemoved != details.end() && itRemoved->second != "0") { @@ -955,13 +953,11 @@ ConversationModule::Impl::removeRepositoryImpl(SyncedConversation& conv, bool sy try { if (conv.conversation->mode() == ConversationMode::ONE_TO_ONE) { for (const auto& member : conv.conversation->getInitialMembers()) { - auto account = account_.lock(); - if (member != account->getUsername()) { + if (member != username_) { // Note: this can happen while re-adding a contact. // In this case, check that we are removing the linked conversation. if (conv.info.id == getOneToOneConversation(member)) { - if (auto am = account->accountManager()) - am->removeContactConversation(member); + accountManager_->removeContactConversation(member); } } } @@ -1092,12 +1088,10 @@ ConversationModule::Impl::sendMessageNotification(Conversation& conversation, auto members = conversation.memberUris(username_, {MemberRole::BANNED}); std::vector<std::string> connectedMembers; // print all members - if (auto acc = account_.lock()) { - for (const auto& device : devices) { - auto cert = acc->certStore().getCertificate(device.toString()); - if (cert && cert->issuer) - connectedMembers.emplace_back(cert->issuer->getId().toString()); - } + for (const auto& device : devices) { + auto cert = acc->certStore().getCertificate(device.toString()); + if (cert && cert->issuer) + connectedMembers.emplace_back(cert->issuer->getId().toString()); } std::sort(std::begin(connectedMembers), std::end(connectedMembers)); std::set_difference(members.begin(), @@ -1320,9 +1314,7 @@ ConversationModule::Impl::cloneConversationFrom(const std::shared_ptr<SyncedConv onNeedSocket_( conversationId, deviceId, - [sthis = shared_from_this(), conv, conversationId, oldConvId, deviceId]( - const auto& channel) { - auto acc = sthis->account_.lock(); + [wthis=weak_from_this(), conv, conversationId, oldConvId, deviceId](const auto& channel) { std::lock_guard lk(conv->mtx); if (conv->pending && !conv->pending->ready) { conv->pending->removeId = oldConvId; @@ -1332,15 +1324,15 @@ ConversationModule::Impl::cloneConversationFrom(const std::shared_ptr<SyncedConv conv->pending->socket = channel; if (!conv->pending->cloning) { conv->pending->cloning = true; - dht::ThreadPool::io().run([w = sthis->weak(), - conversationId, - deviceId = conv->pending->deviceId]() { - if (auto sthis = w.lock()) + dht::ThreadPool::io().run([wthis, + conversationId, + deviceId = conv->pending->deviceId]() { + if (auto sthis = wthis.lock()) sthis->handlePendingConversation(conversationId, deviceId); }); } return true; - } else { + } else if (auto sthis = wthis.lock()) { conv->stopFetch(deviceId); JAMI_WARNING("Clone failed. Re-clone in {}s", conv->fallbackTimer.count()); conv->fallbackClone->expires_at(std::chrono::steady_clock::now() + conv->fallbackTimer); @@ -1379,10 +1371,13 @@ void ConversationModule::Impl::bootstrap(const std::string& convId) { std::vector<DeviceId> kd; - if (auto acc = account_.lock()) - for (const auto& [id, _] : acc->getKnownDevices()) + { + std::unique_lock lk(conversationsMtx_); + const auto& devices = accountManager_->getKnownDevices(); + kd.reserve(devices.size()); + for (const auto& [id, _] : devices) kd.emplace_back(id); - + } auto bootstrap = [&](auto& conv) { if (conv) { #ifdef LIBJAMI_TESTABLE @@ -1427,9 +1422,8 @@ ConversationModule::Impl::cloneConversationFrom(const std::string& conversationI const std::string& uri, const std::string& oldConvId) { - auto acc = account_.lock(); auto memberHash = dht::InfoHash(uri); - if (!acc || !memberHash) { + if (!memberHash) { JAMI_WARNING("Invalid member detected: {}", uri); return; } @@ -1439,15 +1433,16 @@ ConversationModule::Impl::cloneConversationFrom(const std::string& conversationI conv->info.created = std::time(nullptr); conv->info.members.emplace(username_); conv->info.members.emplace(uri); - acc->forEachDevice(memberHash, - [w = weak(), conv, conversationId, oldConvId]( - const std::shared_ptr<dht::crypto::PublicKey>& pk) { - auto sthis = w.lock(); - auto deviceId = pk->getLongId().toString(); - if (!sthis or deviceId == sthis->deviceId_) - return; - sthis->cloneConversationFrom(conv, deviceId, oldConvId); - }); + accountManager_->forEachDevice( + memberHash, + [w = weak(), conv, conversationId, oldConvId]( + const std::shared_ptr<dht::crypto::PublicKey>& pk) { + auto sthis = w.lock(); + auto deviceId = pk->getLongId().toString(); + if (!sthis or deviceId == sthis->deviceId_) + return; + sthis->cloneConversationFrom(conv, deviceId, oldConvId); + }); addConvInfo(conv->info); } @@ -1490,7 +1485,8 @@ ConversationModule::saveConvInfosToPath(const std::filesystem::path& path, //////////////////////////////////////////////////////////////// -ConversationModule::ConversationModule(std::weak_ptr<JamiAccount>&& account, +ConversationModule::ConversationModule(std::shared_ptr<JamiAccount> account, + std::shared_ptr<AccountManager> accountManager, NeedsSyncingCb&& needsSyncingCb, SengMsgCb&& sendMsgCb, NeedSocketCb&& onNeedSocket, @@ -1499,6 +1495,7 @@ ConversationModule::ConversationModule(std::weak_ptr<JamiAccount>&& account, OneToOneRecvCb&& oneToOneRecvCb, bool autoLoadConversations) : pimpl_ {std::make_unique<Impl>(std::move(account), + std::move(accountManager), std::move(needsSyncingCb), std::move(sendMsgCb), std::move(onNeedSocket), @@ -1511,6 +1508,13 @@ ConversationModule::ConversationModule(std::weak_ptr<JamiAccount>&& account, } } +void +ConversationModule::setAccountManager(std::shared_ptr<AccountManager> accountManager) +{ + std::unique_lock lk(pimpl_->conversationsMtx_); + pimpl_->accountManager_ = accountManager; +} + #ifdef LIBJAMI_TESTABLE void ConversationModule::onBootstrapStatus( @@ -1532,9 +1536,8 @@ ConversationModule::loadConversations() auto conversationsRepositories = dhtnet::fileutils::readDirectory( fileutils::get_data_dir() / pimpl_->accountId_ / "conversations"); - auto contacts = acc->getContacts( - true); // Avoid to lock configurationMtx while conv Mtx is locked std::unique_lock lk(pimpl_->conversationsMtx_); + auto contacts = pimpl_->accountManager_->getContacts(true); // Avoid to lock configurationMtx while conv Mtx is locked std::unique_lock ilk(pimpl_->convInfosMtx_); pimpl_->convInfos_ = convInfos(pimpl_->accountId_); pimpl_->conversations_.clear(); @@ -1546,19 +1549,18 @@ ConversationModule::loadConversations() std::mutex toRmMtx; std::set<std::string> toRm; std::mutex convMtx; - std::atomic_int convNb; + size_t convNb; std::vector<std::map<std::string, std::string>> contacts; std::vector<std::tuple<std::string, std::string, std::string>> updateContactConv; }; auto ctx = std::make_shared<Ctx>(); - ctx->convNb = conversationsRepositories.empty() ? 0 : conversationsRepositories.size(); + ctx->convNb = conversationsRepositories.size(); ctx->contacts = std::move(contacts); - for (auto repository : conversationsRepositories) { - dht::ThreadPool::io().run([this, ctx, repository, acc] { + for (auto&& r : conversationsRepositories) { + dht::ThreadPool::io().run([this, ctx, repository=std::move(r), acc] { try { auto sconv = std::make_shared<SyncedConversation>(repository); - auto conv = std::make_shared<Conversation>(acc, repository); conv->onMessageStatusChanged([this, repository](const auto& status) { auto msg = std::make_shared<SyncMsg>(); @@ -1593,7 +1595,7 @@ ConversationModule::loadConversations() ctx->cv.notify_all(); return; } - std::string convFromDetails = itContact->at("conversationId"); + const std::string& convFromDetails = itContact->at("conversationId"); auto removed = std::stoul(itContact->at("removed")); auto added = std::stoul(itContact->at("added")); auto isRemoved = removed > added; @@ -1668,8 +1670,8 @@ ConversationModule::loadConversations() }); } - std::unique_lock lkCv {ctx->cvMtx}; - ctx->cv.wait(lkCv, [&] { return ctx->convNb.load() == 0; }); + std::unique_lock lkCv(ctx->cvMtx); + ctx->cv.wait(lkCv, [&] { return ctx->convNb == 0; }); // Prune any invalid conversations without members and // set the removed flag if needed @@ -1903,11 +1905,10 @@ void ConversationModule::onConversationRequest(const std::string& from, const Json::Value& value) { ConversationRequest req(value); - auto acc = pimpl_->account_.lock(); auto isOneToOne = req.isOneToOne(); std::string oldConv; - if (acc && isOneToOne) { - oldConv = getOneToOneConversation(from); + if (isOneToOne) { + oldConv = pimpl_->getOneToOneConversation(from); } std::unique_lock lk(pimpl_->conversationsRequestsMtx_); JAMI_DEBUG("[Account {}] Receive a new conversation request for conversation {} from {}", @@ -2058,9 +2059,8 @@ ConversationModule::startConversation(ConversationMode mode, const dht::InfoHash conversationId), kd); } catch (const std::exception& e) { - JAMI_ERR("[Account %s] Error while generating a conversation %s", - pimpl_->accountId_.c_str(), - e.what()); + JAMI_ERROR("[Account {}] Error while generating a conversation {}", + pimpl_->accountId_, e.what()); return {}; } auto convId = conversation->id(); @@ -2370,11 +2370,8 @@ ConversationModule::syncConversations(const std::string& peer, const std::string pimpl_->fetchNewCommits(peer, deviceId, cid); for (const auto& cid : toClone) pimpl_->cloneConversation(deviceId, peer, cid); - if (pimpl_->syncCnt.load() == 0) { - if (auto acc = pimpl_->account_.lock()) - emitSignal<libjami::ConversationSignal::ConversationSyncFinished>( - acc->getAccountID().c_str()); - } + if (pimpl_->syncCnt.load() == 0) + emitSignal<libjami::ConversationSignal::ConversationSyncFinished>(pimpl_->accountId_); } void @@ -2781,12 +2778,8 @@ ConversationModule::isBanned(const std::string& convId, const std::string& uri) return conv->conversation->isBanned(uri); } // If 1:1 we check the certificate status - if (auto acc = pimpl_->account_.lock()) { - if (auto am = acc->accountManager()) - return am->getCertificateStatus(uri) - == dhtnet::tls::TrustStore::PermissionStatus::BANNED; - } - return true; + std::lock_guard lk(pimpl_->conversationsMtx_); + return pimpl_->accountManager_->getCertificateStatus(uri) == dhtnet::tls::TrustStore::PermissionStatus::BANNED; } void diff --git a/src/jamidht/conversation_module.h b/src/jamidht/conversation_module.h index acd2ea75ef488ab9e9db28521ca6985a181ac36c..3f89f1b30d1088104914a39cf813b85edc345e59 100644 --- a/src/jamidht/conversation_module.h +++ b/src/jamidht/conversation_module.h @@ -72,7 +72,8 @@ using OneToOneRecvCb = std::function<void(const std::string&, const std::string& class ConversationModule { public: - ConversationModule(std::weak_ptr<JamiAccount>&& account, + ConversationModule(std::shared_ptr<JamiAccount> account, + std::shared_ptr<AccountManager> accountManager, NeedsSyncingCb&& needsSyncingCb, SengMsgCb&& sendMsgCb, NeedSocketCb&& onNeedSocket, @@ -82,6 +83,8 @@ public: bool autoLoadConversations = true); ~ConversationModule() = default; + void setAccountManager(std::shared_ptr<AccountManager> accountManager); + /** * Refresh informations about conversations */ diff --git a/src/jamidht/jamiaccount.cpp b/src/jamidht/jamiaccount.cpp index 9eabdee2a6421c4d6b7810c4055a2d587b4db9c9..5638828da09647310b67a71e661dbdc861b06983 100644 --- a/src/jamidht/jamiaccount.cpp +++ b/src/jamidht/jamiaccount.cpp @@ -1219,6 +1219,7 @@ JamiAccount::loadAccount(const std::string& archive_password_scheme, const auto& conf = config(); try { + auto oldIdentity = id_.first ? id_.first->getPublicKey().getLongId() : DeviceId(); if (conf.managerUri.empty()) { accountManager_ = std::make_shared<ArchiveAccountManager>( getPath(), @@ -1246,8 +1247,15 @@ JamiAccount::loadAccount(const std::string& archive_password_scheme, id_ = std::move(id); config_->username = info->accountId; JAMI_WARNING("[Account {:s}] loaded account identity", getAccountID()); + if (info->identity.first->getPublicKey().getLongId() != oldIdentity) { + JAMI_WARNING("[Account {:s}] identity changed", getAccountID()); + std::lock_guard lk(moduleMtx_); + convModule_.reset(); + } else { + convModule_->setAccountManager(accountManager_); + } + convModule(); // Init conv module if (not isEnabled()) { - convModule(); // Init conv module setRegistrationState(RegistrationState::UNREGISTERED); } } else if (isEnabled()) { @@ -1349,6 +1357,10 @@ JamiAccount::loadAccount(const std::string& archive_password_scheme, conf.fromMap(config); }); id_ = std::move(id); + { + std::lock_guard lk(moduleMtx_); + convModule_.reset(); + } if (migrating) { Migration::setState(getAccountID(), Migration::State::SUCCESS); } @@ -2150,7 +2162,8 @@ JamiAccount::convModule(bool noCreation) std::lock_guard lk(moduleMtx_); if (!convModule_) { convModule_ = std::make_unique<ConversationModule>( - weak(), + shared(), + accountManager_, [this](auto&& syncMsg) { dht::ThreadPool::io().run([w = weak(), syncMsg] { if (auto shared = w.lock()) {