From 0f032559b18100859640cbfa0bdd1e3e50d303db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20B=C3=A9raud?= <adrien.beraud@savoirfairelinux.com> Date: Wed, 27 Sep 2023 00:48:40 -0400 Subject: [PATCH] conversation: reduce lock contention Change-Id: I566a66b7405afe680bacd0e343ea7ac0901b912c --- src/jamidht/conversation.cpp | 25 ++++++++++++---------- src/jamidht/conversation_module.cpp | 27 ++++++++++++++++++------ src/jamidht/jamiaccount.cpp | 31 +++++++++------------------- src/jamidht/sync_channel_handler.cpp | 7 ++++++- 4 files changed, 51 insertions(+), 39 deletions(-) diff --git a/src/jamidht/conversation.cpp b/src/jamidht/conversation.cpp index dfaaaed0ba..2b02d579c1 100644 --- a/src/jamidht/conversation.cpp +++ b/src/jamidht/conversation.cpp @@ -1356,18 +1356,21 @@ Conversation::sync(const std::string& member, { JAMI_INFO() << "Sync " << id() << " with " << deviceId; pull(deviceId, std::move(cb), commitId); - if (auto account = pimpl_->account_.lock()) { - // For waiting request, downloadFile - for (const auto& wr : dataTransfer()->waitingRequests()) { - auto path = fileutils::get_data_dir() / account->getAccountID() - / "conversation_data" / id() / wr.fileId; - auto start = fileutils::size(path.string()); - if (start < 0) - start = 0; - downloadFile(wr.interactionId, wr.fileId, wr.path, member, deviceId, start); + dht::ThreadPool::io().run([member, deviceId, a = pimpl_->account_, w=weak_from_this()]{ + auto sthis = w.lock(); + if (auto account = a.lock()) { + // For waiting request, downloadFile + for (const auto& wr : sthis->dataTransfer()->waitingRequests()) { + auto path = fileutils::get_data_dir() / account->getAccountID() + / "conversation_data" / sthis->id() / wr.fileId; + auto start = fileutils::size(path.string()); + if (start < 0) + start = 0; + sthis->downloadFile(wr.interactionId, wr.fileId, wr.path, member, deviceId, start); + } + account->sendProfile(sthis->id(), member, deviceId); } - account->sendProfile(id(), member, deviceId); - } + }); } std::map<std::string, std::string> diff --git a/src/jamidht/conversation_module.cpp b/src/jamidht/conversation_module.cpp index 97cd9ec72c..8ee12157ff 100644 --- a/src/jamidht/conversation_module.cpp +++ b/src/jamidht/conversation_module.cpp @@ -2816,8 +2816,16 @@ ConversationModule::removeGitSocket(std::string_view deviceId, std::string_view void ConversationModule::shutdownConnections() { - std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); - for (auto& [k, c] : pimpl_->conversations_) { + std::vector<std::shared_ptr<SyncedConversation>> conversations; + { + std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); + conversations.reserve(pimpl_->conversations_.size()); + for (auto& [k, c]: pimpl_->conversations_) { + conversations.emplace_back(c); + } + } + for (const auto& c: conversations) { + std::lock_guard<std::mutex> lkc(c->mtx); if (c->conversation) c->conversation->shutdownConnections(); if (c->pending) @@ -2835,10 +2843,17 @@ ConversationModule::addSwarmChannel(const std::string& conversationId, void ConversationModule::connectivityChanged() { - std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); - for (auto& [k, c] : pimpl_->conversations_) { - if (c->conversation) - c->conversation->connectivityChanged(); + std::vector<std::shared_ptr<Conversation>> conversations; + { + std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); + conversations.reserve(pimpl_->conversations_.size()); + for (const auto& [k, c]: pimpl_->conversations_) { + std::lock_guard<std::mutex> lkc(c->mtx); + if (c->conversation) + conversations.emplace_back(c->conversation); + } } + for (const auto& conv: conversations) + conv->connectivityChanged(); } } // namespace jami diff --git a/src/jamidht/jamiaccount.cpp b/src/jamidht/jamiaccount.cpp index 05eda8ce7f..f381f704d3 100644 --- a/src/jamidht/jamiaccount.cpp +++ b/src/jamidht/jamiaccount.cpp @@ -1923,12 +1923,8 @@ JamiAccount::doRegister_() auto res = accountManager_->onPeerCertificate(cert, dhtPublicInCalls_, peer_account_id); - if (res) - JAMI_INFO("Accepting ICE request from account %s", - peer_account_id.toString().c_str()); - else - JAMI_INFO("Discarding ICE request from account %s", - peer_account_id.toString().c_str()); + JAMI_LOG("{} ICE request from {}", + res ? "Accepting" : "Discarding", peer_account_id); accept.set_value(res); }); fut.wait(); @@ -2139,8 +2135,9 @@ JamiAccount::convModule() cb({}); return; } - std::lock_guard<std::mutex> lkCM(shared->connManagerMtx_); + std::unique_lock<std::mutex> lkCM(shared->connManagerMtx_); if (!shared->connectionManager_) { + lkCM.unlock(); cb({}); return; } @@ -3625,14 +3622,10 @@ JamiAccount::requestSIPConnection(const std::string& peerId, // however, this will still ask for multiple channels, so only ask // if there is no pending request if (!forceNewConnection && connectionManager_->isConnecting(deviceId, "sip")) { - JAMI_INFO("[Account %s] Already connecting to %s", - getAccountID().c_str(), - deviceId.to_c_str()); + JAMI_LOG("[Account {}] Already connecting to {}", getAccountID(), deviceId); return; } - JAMI_INFO("[Account %s] Ask %s for a new SIP channel", - getAccountID().c_str(), - deviceId.to_c_str()); + JAMI_LOG("[Account {}] Ask {} for a new SIP channel", getAccountID(), deviceId); connectionManager_->connectDevice( deviceId, "sip", @@ -3832,8 +3825,7 @@ JamiAccount::cacheSIPConnection(std::shared_ptr<dhtnet::ChannelSocket>&& socket, return v.channel == socket; }); if (conn != connections.end()) { - JAMI_WARN("[Account %s] Channel socket already cached with this peer", - getAccountID().c_str()); + JAMI_WARNING("[Account {}] Channel socket already cached with this peer", getAccountID()); return; } @@ -3860,8 +3852,7 @@ JamiAccount::cacheSIPConnection(std::shared_ptr<dhtnet::ChannelSocket>&& socket, // Store the connection connections.emplace_back(SipConnection {sip_tr, socket}); JAMI_WARNING("[Account {:s}] New SIP channel opened with {:s}", - getAccountID(), - deviceId.to_c_str()); + getAccountID(), deviceId); lk.unlock(); dht::ThreadPool::io().run([w = weak(), peerId, deviceId] { @@ -4045,14 +4036,12 @@ JamiAccount::transferFile(const std::string& conversationId, uint64_t lastWriteTime, std::function<void()> onFinished) { - auto fid = fileId; std::string modified; if (lastWriteTime != 0) { modified = fmt::format("&modified={}", lastWriteTime); } - if (fid == "profile.vcf") { - fid = fmt::format("profile.vcf?sha3={}{}", sha3Sum, modified); - } + auto fid = fileId == "profile.vcf" ? + fmt::format("profile.vcf?sha3={}{}", sha3Sum, modified) : fileId; auto channelName = conversationId.empty() ? fmt::format("{}profile.vcf?sha3={}{}", DATA_TRANSFER_URI, sha3Sum, modified) diff --git a/src/jamidht/sync_channel_handler.cpp b/src/jamidht/sync_channel_handler.cpp index b07de0f994..9a00cb5ab6 100644 --- a/src/jamidht/sync_channel_handler.cpp +++ b/src/jamidht/sync_channel_handler.cpp @@ -20,6 +20,8 @@ #include "jamidht/sync_channel_handler.h" +#include <opendht/thread_pool.h> + static constexpr const char SYNC_URI[] {"sync://"}; namespace jami { @@ -64,11 +66,14 @@ SyncChannelHandler::onReady(const std::shared_ptr<dht::crypto::Certificate>& cer auto acc = account_.lock(); if (!cert || !cert->issuer || !acc) return; - acc->sendProfile("", acc->getUsername(), channel->deviceId().toString()); if (auto sm = acc->syncModule()) sm->cacheSyncConnection(std::move(channel), cert->issuer->getId().toString(), cert->getLongId()); + dht::ThreadPool::io().run([account=account_, channel]() { + if (auto acc = account.lock()) + acc->sendProfile("", acc->getUsername(), channel->deviceId().toString()); + }); } } // namespace jami \ No newline at end of file -- GitLab