From c879752f01bea829fbd85593b8d287e887f37ac5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Blin?= <sebastien.blin@savoirfairelinux.com> Date: Tue, 6 Oct 2020 13:36:27 -0400 Subject: [PATCH] jamiaccount: use only a map for sipConnections Change-Id: I5beb8f45ad50baf2d2dd5bf1be4dbd6cbb1e3cc5 --- src/jamidht/jamiaccount.cpp | 184 ++++++++++++++++++------------------ src/jamidht/jamiaccount.h | 13 +-- 2 files changed, 98 insertions(+), 99 deletions(-) diff --git a/src/jamidht/jamiaccount.cpp b/src/jamidht/jamiaccount.cpp index 022503d2ae..adc4557acf 100644 --- a/src/jamidht/jamiaccount.cpp +++ b/src/jamidht/jamiaccount.cpp @@ -115,7 +115,7 @@ struct TextMessageCtx { std::weak_ptr<JamiAccount> acc; std::string to; - std::string deviceId; + DeviceId deviceId; uint64_t id; bool retryOnTimeout; std::shared_ptr<PendingConfirmation> confirmation; @@ -329,9 +329,8 @@ JamiAccount::shutdownConnections() { connectionManager_.reset(); dhtPeerConnector_.reset(); - std::lock_guard<std::mutex> lk(sipConnectionsMtx_); - sipConnections_.clear(); - pendingSipConnections_.clear(); + std::lock_guard<std::mutex> lk(sipConnsMtx_); + sipConns_.clear(); } void @@ -353,13 +352,13 @@ JamiAccount::newIncomingCall(const std::string& from, std::lock_guard<std::mutex> lock(callsMutex_); if (sipTr) { - std::unique_lock<std::mutex> lk(sipConnectionsMtx_); - auto sipConnIt = sipConnections_.find(from); - if (sipConnIt != sipConnections_.end() && !sipConnIt->second.empty()) { - for (auto dit = sipConnIt->second.rbegin(); dit != sipConnIt->second.rend(); ++dit) { - for (auto it = dit->second.rbegin(); it != dit->second.rend(); ++it) { + std::unique_lock<std::mutex> lk(sipConnsMtx_); + for (auto& [key, value] : sipConns_) { + if (key.first == from) { + // For each SipConnection of the device + for (auto cit = value.rbegin(); cit != value.rend(); ++cit) { // Search linked Sip Transport - if (it->transport != sipTr) + if (cit->transport != sipTr) continue; auto call = Manager::instance().callFactory.newCall<SIPCall, JamiAccount>( @@ -509,9 +508,8 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std:: dht::InfoHash peer_account(toUri); // Call connected devices - std::set<std::string> devices; - std::unique_lock<std::mutex> lk(sipConnectionsMtx_); - auto& sipConns = sipConnections_[toUri]; + std::set<DeviceId> devices; + std::unique_lock<std::mutex> lk(sipConnsMtx_); // NOTE: dummyCall is a call used to avoid to mark the call as failed if the // cached connection is failing with ICE (close event still not detected). auto& manager = Manager::instance(); @@ -558,13 +556,16 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std:: requestSIPConnection(toUri, deviceId); }; - for (auto deviceConnIt = sipConns.begin(); deviceConnIt != sipConns.end(); ++deviceConnIt) { - if (deviceConnIt->second.empty()) + for (auto it = sipConns_.begin(); it != sipConns_.end(); ++it) { + auto& [key, value] = *it; + if (key.first != toUri) continue; - auto& it = deviceConnIt->second.back(); + if (value.empty()) + continue; + auto& sipConn = value.back(); - auto transport = it.transport; - if (!it.channel->underlyingICE()) { + auto transport = sipConn.transport; + if (!sipConn.channel->underlyingICE()) { JAMI_WARN("A SIP transport exists without Channel, this is a bug. Please report"); continue; } @@ -590,12 +591,11 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std:: { std::lock_guard<std::mutex> lk(onConnectionClosedMtx_); - onConnectionClosed_[deviceConnIt->first] = sendRequest; + onConnectionClosed_[key.second] = sendRequest; } call->addStateListener( - [w = weak(), - deviceId = deviceConnIt->first](Call::CallState, Call::ConnectionState state, int) { + [w = weak(), deviceId = key.second](Call::CallState, Call::ConnectionState state, int) { if (state != Call::ConnectionState::PROGRESSING and state != Call::ConnectionState::TRYING) { if (auto shared = w.lock()) { @@ -610,7 +610,8 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std:: } }); - auto remoted_address = it.channel->underlyingICE()->getRemoteAddress(ICE_COMP_SIP_TRANSPORT); + auto remoted_address = sipConn.channel->underlyingICE()->getRemoteAddress( + ICE_COMP_SIP_TRANSPORT); try { onConnectedOutgoingCall(dev_call, toUri, remoted_address); } catch (const VoipLinkException&) { @@ -621,15 +622,15 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std:: // the cached transport continue; } - devices.emplace(deviceConnIt->first); + devices.emplace(key.second); } // Find listening devices for this account accountManager_->forEachDevice( peer_account, - [this, toUri, devices, call, sendRequest](const dht::InfoHash& dev) { + [this, toUri, devices = std::move(devices), call, sendRequest](const dht::InfoHash& dev) { // Test if already sent via a SIP transport - if (devices.find(dev.toString()) != devices.end()) + if (devices.find(dev) != devices.end()) return; sendRequest(dev, false); }, @@ -1875,27 +1876,24 @@ JamiAccount::trackBuddyPresence(const std::string& buddy_id, bool track) auto h = dht::InfoHash(buddyUri); if (!track && dht_ && dht_->isRunning()) { - std::unique_lock<std::mutex> lk(sipConnectionsMtx_); - std::set<std::string> devices; - for (const auto& deviceConn : sipConnections_[buddy_id]) { - devices.emplace(deviceConn.first); - } - sipConnections_.erase(buddy_id); - - for (auto pendingIt = pendingSipConnections_.begin(); - pendingIt != pendingSipConnections_.end();) { - if (buddy_id == pendingIt->first) { - devices.emplace(pendingIt->second); - pendingIt = pendingSipConnections_.erase(pendingIt); - } else { - ++pendingIt; + // Remove current connections with contact + std::set<DeviceId> devices; + { + std::unique_lock<std::mutex> lk(sipConnsMtx_); + for (auto it = sipConns_.begin(); it != sipConns_.end();) { + const auto& [key, value] = *it; + if (key.first == buddyUri) { + devices.emplace(key.second); + it = sipConns_.erase(it); + } else { + ++it; + } } } - lk.unlock(); for (const auto& device : devices) { if (connectionManager_) - connectionManager_->closeConnectionsWith(DeviceId(device)); + connectionManager_->closeConnectionsWith(device); } } @@ -2902,28 +2900,23 @@ JamiAccount::removeContact(const std::string& uri, bool ban) } // Remove current connections with contact - std::set<std::string> devices; + std::set<DeviceId> devices; { - std::unique_lock<std::mutex> lk(sipConnectionsMtx_); - for (const auto& deviceConn : sipConnections_[uri]) { - devices.emplace(deviceConn.first); - } - sipConnections_.erase(uri); - - for (auto pendingIt = pendingSipConnections_.begin(); - pendingIt != pendingSipConnections_.end();) { - if (uri == pendingIt->first) { - devices.emplace(pendingIt->second); - pendingIt = pendingSipConnections_.erase(pendingIt); + std::unique_lock<std::mutex> lk(sipConnsMtx_); + for (auto it = sipConns_.begin(); it != sipConns_.end();) { + const auto& [key, value] = *it; + if (key.first == uri) { + devices.emplace(key.second); + it = sipConns_.erase(it); } else { - ++pendingIt; + ++it; } } } for (const auto& device : devices) { if (connectionManager_) - connectionManager_->closeConnectionsWith(DeviceId(device)); + connectionManager_->closeConnectionsWith(device); } } @@ -3049,30 +3042,29 @@ JamiAccount::sendTextMessage(const std::string& to, auto confirm = std::make_shared<PendingConfirmation>(); - std::set<std::string> devices; - std::unique_lock<std::mutex> lk(sipConnectionsMtx_); + std::set<DeviceId> devices; + std::unique_lock<std::mutex> lk(sipConnsMtx_); sip_utils::register_thread(); - auto& sipConns = sipConnections_[to]; - auto deviceConnIt = sipConns.begin(); - while (deviceConnIt != sipConns.end()) { - if (deviceConnIt->second.empty()) { - ++deviceConnIt; + for (auto it = sipConns_.begin(); it != sipConns_.end();) { + auto& [key, value] = *it; + if (key.first != to or value.empty()) { + ++it; continue; } - auto& it = deviceConnIt->second.back(); + auto& conn = value.back(); // Set input token into callback std::unique_ptr<TextMessageCtx> ctx {std::make_unique<TextMessageCtx>()}; ctx->acc = weak(); ctx->to = to; - ctx->deviceId = deviceConnIt->first; + ctx->deviceId = key.second; ctx->id = token; ctx->retryOnTimeout = retryOnTimeout; ctx->confirmation = confirm; try { auto res = sendSIPMessage( - it, to, ctx.release(), token, payloads, [](void* token, pjsip_event* event) { + conn, to, ctx.release(), token, payloads, [](void* token, pjsip_event* event) { std::unique_ptr<TextMessageCtx> c {(TextMessageCtx*) token}; auto code = event->body.tsx_state.tsx->status_code; auto acc = c->acc.lock(); @@ -3087,10 +3079,10 @@ JamiAccount::sendTextMessage(const std::string& to, } else { JAMI_WARN("Timeout when send a message, close current connection"); { - std::unique_lock<std::mutex> lk(acc->sipConnectionsMtx_); - acc->sipConnections_[c->to].erase(c->deviceId); + std::unique_lock<std::mutex> lk(acc->sipConnsMtx_); + acc->sipConns_.erase(std::make_pair(c->to, c->deviceId)); } - acc->connectionManager_->closeConnectionsWith(DeviceId(c->deviceId)); + acc->connectionManager_->closeConnectionsWith(c->deviceId); // This MUST be done after closing the connection to avoid race condition // with messageEngine_ acc->messageEngine_.onMessageSent(c->to, c->id, false); @@ -3104,19 +3096,19 @@ JamiAccount::sendTextMessage(const std::string& to, }); if (!res) { messageEngine_.onMessageSent(to, token, false); - ++deviceConnIt; + ++it; continue; } } catch (const std::runtime_error& ex) { JAMI_WARN("%s", ex.what()); messageEngine_.onMessageSent(to, token, false); // Remove connection in incorrect state - deviceConnIt = sipConns.erase(deviceConnIt); + it = sipConns_.erase(it); continue; } - devices.emplace(deviceConnIt->first); - ++deviceConnIt; + devices.emplace(key.second); + ++it; } lk.unlock(); @@ -3126,7 +3118,7 @@ JamiAccount::sendTextMessage(const std::string& to, [this, confirm, to, token, payloads, now, devices {std::move(devices)}]( const dht::InfoHash& dev) { // Test if already sent - if (devices.find(dev.toString()) != devices.end()) { + if (devices.find(dev) != devices.end()) { return; } @@ -3505,14 +3497,13 @@ void JamiAccount::requestSIPConnection(const std::string& peerId, const dht::InfoHash& deviceId) { // If a connection already exists or is in progress, no need to do this - std::lock_guard<std::mutex> lk(sipConnectionsMtx_); - auto id = std::make_pair<std::string, std::string>(std::string(peerId), deviceId.toString()); - if (!sipConnections_[peerId][deviceId.toString()].empty() - || pendingSipConnections_.find(id) != pendingSipConnections_.end()) { + std::lock_guard<std::mutex> lk(sipConnsMtx_); + auto id = std::make_pair(peerId, deviceId); + if (sipConns_.find(id) != sipConns_.end()) { JAMI_DBG("A SIP connection with %s already exists", deviceId.to_c_str()); return; } - pendingSipConnections_.emplace(id); + sipConns_[id] = {}; // If not present, create it JAMI_INFO("Ask %s for a new SIP channel", deviceId.to_c_str()); if (!connectionManager_) @@ -3527,8 +3518,11 @@ JamiAccount::requestSIPConnection(const std::string& peerId, const dht::InfoHash // OnConnectionReady is called before this callback, so // the socket is already cached if succeed. We just need // to remove the pending request. - std::lock_guard<std::mutex> lk(shared->sipConnectionsMtx_); - shared->pendingSipConnections_.erase(id); + std::lock_guard<std::mutex> lk(shared->sipConnsMtx_); + auto it = shared->sipConns_.find(id); + if (it != shared->sipConns_.end() && it->second.empty()) { + shared->sipConns_.erase(it); + } }); } @@ -3678,9 +3672,13 @@ JamiAccount::cacheSIPConnection(std::shared_ptr<ChannelSocket>&& socket, const dht::InfoHash& deviceId) { auto deviceIdStr = deviceId.toString(); - std::unique_lock<std::mutex> lk(sipConnectionsMtx_); + std::unique_lock<std::mutex> lk(sipConnsMtx_); // Verify that the connection is not already cached - auto& connections = sipConnections_[peerId][deviceIdStr]; + SipConnectionKey key(peerId, deviceId); + auto it = sipConns_.find(key); + if (it == sipConns_.end()) + it = sipConns_.insert(it, std::make_pair(key, std::vector<SipConnection> {})); + auto& connections = it->second; auto conn = std::find_if(connections.begin(), connections.end(), [socket](auto v) { return v.channel == socket; }); @@ -3691,14 +3689,16 @@ JamiAccount::cacheSIPConnection(std::shared_ptr<ChannelSocket>&& socket, // Convert to SIP transport sip_utils::register_thread(); - auto onShutdown = [w = weak(), peerId, deviceId, socket]() { - auto deviceIdStr = deviceId.toString(); + auto onShutdown = [w = weak(), peerId, key, socket]() { auto shared = w.lock(); if (!shared) return; { - std::lock_guard<std::mutex> lk(shared->sipConnectionsMtx_); - auto& connections = shared->sipConnections_[peerId][deviceIdStr]; + std::lock_guard<std::mutex> lk(shared->sipConnsMtx_); + auto it = shared->sipConns_.find(key); + if (it == shared->sipConns_.end()) + return; + auto& connections = it->second; auto conn = connections.begin(); while (conn != connections.end()) { if (conn->channel == socket) @@ -3706,6 +3706,8 @@ JamiAccount::cacheSIPConnection(std::shared_ptr<ChannelSocket>&& socket, else conn++; } + if (connections.empty()) + shared->sipConns_.erase(it); } // The connection can be closed during the SIP initialization, so // if this happens, the request should be re-sent to ask for a new @@ -3713,19 +3715,19 @@ JamiAccount::cacheSIPConnection(std::shared_ptr<ChannelSocket>&& socket, std::function<void(const dht::InfoHash&, bool)> cb; { std::lock_guard<std::mutex> lk(shared->onConnectionClosedMtx_); - if (shared->onConnectionClosed_[deviceIdStr]) { - cb = std::move(shared->onConnectionClosed_[deviceIdStr]); - shared->onConnectionClosed_.erase(deviceIdStr); + if (shared->onConnectionClosed_[key.second]) { + cb = std::move(shared->onConnectionClosed_[key.second]); + shared->onConnectionClosed_.erase(key.second); } } if (cb) { JAMI_WARN("An outgoing call was in progress while shutdown, relaunch the request"); - cb(deviceId, false); + cb(key.second, false); } }; auto sip_tr = link_.sipTransportBroker->getChanneledTransport(socket, std::move(onShutdown)); // Store the connection - sipConnections_[peerId][deviceIdStr].emplace_back(SipConnection {sip_tr, socket}); + connections.emplace_back(SipConnection {sip_tr, socket}); JAMI_WARN("New SIP channel opened with %s", deviceId.to_c_str()); lk.unlock(); diff --git a/src/jamidht/jamiaccount.h b/src/jamidht/jamiaccount.h index 5c98a65460..eb6ce70d33 100644 --- a/src/jamidht/jamiaccount.h +++ b/src/jamidht/jamiaccount.h @@ -78,6 +78,8 @@ struct AccountInfo; class SipTransport; class ChanneledOutgoingTransfer; +using SipConnectionKey = std::pair<std::string /* accountId */, DeviceId>; + /** * @brief Ring Account is build on top of SIPAccountBase and uses DHT to handle call connectivity. */ @@ -712,7 +714,7 @@ private: std::set<std::shared_ptr<dht::http::Request>> requests_; - std::mutex sipConnectionsMtx_ {}; + std::mutex sipConnsMtx_ {}; struct SipConnection { std::shared_ptr<SipTransport> transport; @@ -723,18 +725,13 @@ private: // NOTE: here we use a vector to avoid race conditions. In fact the contact // can ask for a SIP channel when we are creating a new SIP Channel with this // peer too. - std::map<std::string /* accountId */, - std::map<std::string /* deviceId */, std::vector<SipConnection>>> - sipConnections_ {}; - // However, we only negotiate one socket from our side - std::set<std::pair<std::string /* accountId */, std::string /* deviceId */>> - pendingSipConnections_ {}; + std::map<SipConnectionKey, std::vector<SipConnection>> sipConns_; std::mutex pendingCallsMutex_; std::map<std::string, std::vector<std::shared_ptr<SIPCall>>> pendingCalls_; std::mutex onConnectionClosedMtx_ {}; - std::map<std::string, std::function<void(const DeviceId&, bool)>> onConnectionClosed_ {}; + std::map<DeviceId, std::function<void(const DeviceId&, bool)>> onConnectionClosed_ {}; /** * Ask a device to open a channeled SIP socket -- GitLab