diff --git a/src/jamidht/connectionmanager.cpp b/src/jamidht/connectionmanager.cpp index 319baf0acf886dc363a1f160fda53a00f95dffd3..d9a21096e90c08419b548deaa8b5f814aa2f55a4 100644 --- a/src/jamidht/connectionmanager.cpp +++ b/src/jamidht/connectionmanager.cpp @@ -183,7 +183,20 @@ public: onICERequestCallback iceReqCb_ {}; std::mutex connectCbsMtx_ {}; - std::map<std::pair<std::string, dht::Value::Id>, ConnectCallback> pendingCbs_ {}; + using CallbackId = std::pair<std::string, dht::Value::Id>; + std::map<CallbackId, ConnectCallback> pendingCbs_ {}; + + ConnectCallback getPendingCallback(const CallbackId& cbId) + { + ConnectCallback ret; + std::lock_guard<std::mutex> lk(connectCbsMtx_); + auto cbIt = pendingCbs_.find(cbId); + if (cbIt != pendingCbs_.end()) { + ret = std::move(cbIt->second); + pendingCbs_.erase(cbIt); + } + return ret; + } std::shared_ptr<ConnectionManager::Impl> shared() { @@ -214,19 +227,13 @@ ConnectionManager::Impl::connectDeviceStartIce(const std::string& deviceId, return; } - std::pair<std::string, dht::Value::Id> cbId(deviceId, vid); std::unique_lock<std::mutex> lk(info->mutex_); auto& ice = info->ice_; auto onError = [&]() { ice.reset(); - std::lock_guard<std::mutex> lk(connectCbsMtx_); - auto cbIt = pendingCbs_.find(cbId); - if (cbIt != pendingCbs_.end()) { - if (cbIt->second) - cbIt->second(nullptr); - pendingCbs_.erase(cbIt); - } + if (auto cb = getPendingCallback({deviceId, vid})) + cb(nullptr); }; if (!ice) { @@ -295,30 +302,17 @@ ConnectionManager::Impl::connectDeviceOnNegoDone( if (!info) return; - std::pair<std::string, dht::Value::Id> cbId(deviceId, vid); std::unique_lock<std::mutex> lk {info->mutex_}; auto& ice = info->ice_; - - auto onError = [&]() { - std::lock_guard<std::mutex> lk(connectCbsMtx_); - auto cbIt = pendingCbs_.find(cbId); - if (cbIt != pendingCbs_.end()) { - if (cbIt->second) - cbIt->second(nullptr); - pendingCbs_.erase(cbIt); - } - }; - if (!ice || !ice->isRunning()) { JAMI_ERR("No ICE detected or not running"); - onError(); + if (auto cb = getPendingCallback({deviceId, vid})) + cb(nullptr); return; } // Build socket - auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>( - std::move(ice)), - true); + auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(std::move(ice)), true); // Negotiate a TLS session JAMI_DBG() << account << "Start TLS session"; @@ -338,14 +332,8 @@ ConnectionManager::Impl::connectDeviceOnNegoDone( return; if (!ok) { JAMI_ERR() << "TLS connection failure for peer " << deviceId; - std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_); - std::pair<std::string, dht::Value::Id> cbId(deviceId, vid); - auto cbIt = sthis->pendingCbs_.find(cbId); - if (cbIt != sthis->pendingCbs_.end()) { - if (cbIt->second) - cbIt->second(nullptr); - sthis->pendingCbs_.erase(cbIt); - } + if (auto cb = sthis->getPendingCallback({deviceId, vid})) + cb(nullptr); } else { // The socket is ready, store it sthis->addNewMultiplexedSocket(deviceId, vid); @@ -433,22 +421,15 @@ ConnectionManager::Impl::connectDevice(const std::string& deviceId, return; if (!ok) { JAMI_ERR("Cannot initialize ICE session."); - std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_); - auto cbIt = sthis->pendingCbs_.find(cbId); - if (cbIt != sthis->pendingCbs_.end()) { - if (cbIt->second) - cbIt->second(nullptr); - sthis->pendingCbs_.erase(cbIt); - } + if (auto cb = sthis->getPendingCallback(cbId)) + cb(nullptr); return; } dht::ThreadPool::io().run( [w = std::move(w), deviceId = std::move(deviceId), vid = std::move(vid)] { - auto sthis = w.lock(); - if (!sthis) - return; - sthis->connectDeviceStartIce(deviceId, vid); + if (auto sthis = w.lock()) + sthis->connectDeviceStartIce(deviceId, vid); }); }; ice_config.onNegoDone = [w, @@ -462,13 +443,8 @@ ConnectionManager::Impl::connectDevice(const std::string& deviceId, return; if (!ok) { JAMI_ERR("ICE negotiation failed."); - std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_); - auto cbIt = sthis->pendingCbs_.find(cbId); - if (cbIt != sthis->pendingCbs_.end()) { - if (cbIt->second) - cbIt->second(nullptr); - sthis->pendingCbs_.erase(cbIt); - } + if (auto cb = sthis->getPendingCallback(cbId)) + cb(nullptr); return; } @@ -498,13 +474,8 @@ ConnectionManager::Impl::connectDevice(const std::string& deviceId, if (!info->ice_) { JAMI_ERR("Cannot initialize ICE session."); - std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_); - auto cbIt = sthis->pendingCbs_.find(cbId); - if (cbIt != sthis->pendingCbs_.end()) { - if (cbIt->second) - cbIt->second(nullptr); - sthis->pendingCbs_.erase(cbIt); - } + if (auto cb = sthis->getPendingCallback(cbId)) + cb(nullptr); return; } }); @@ -526,16 +497,9 @@ ConnectionManager::Impl::sendChannelRequest(std::shared_ptr<MultiplexedSocket>& msgpack::pack(ss, val); auto toSend = ss.str(); sock->setOnChannelReady(channelSock->channel(), [channelSock, deviceId, vid, w = weak()]() { - auto shared = w.lock(); - if (!shared) - return; - std::lock_guard<std::mutex> lk(shared->connectCbsMtx_); - std::pair<std::string, dht::Value::Id> cbId(deviceId, vid); - auto cbIt = shared->pendingCbs_.find(cbId); - if (cbIt != shared->pendingCbs_.end()) { - if (cbIt->second) - cbIt->second(channelSock); - shared->pendingCbs_.erase(cbIt); + if (auto shared = w.lock()) { + if (auto cb = shared->getPendingCallback({deviceId, vid})) + cb(channelSock); } }); std::error_code ec; @@ -842,25 +806,13 @@ ConnectionManager::Impl::addNewMultiplexedSocket(const std::string& deviceId, if (!sthis) return; // Cancel current outgoing connections - { - std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_); - if (!sthis->pendingCbs_.empty()) { - auto it = sthis->pendingCbs_.begin(); - while (it != sthis->pendingCbs_.end()) { - if (it->first.first == deviceId && it->first.second == vid) { - it->second(nullptr); - it = sthis->pendingCbs_.erase(it); - } else { - ++it; - } - } - } - } - dht::ThreadPool::io().run([w, deviceId, vid] { + if (auto cb = sthis->getPendingCallback({deviceId, vid})) + cb(nullptr); + dht::ThreadPool::io().run([w, deviceId = dht::InfoHash(deviceId), vid] { auto sthis = w.lock(); if (!sthis) return; - auto info = sthis->getInfo(dht::InfoHash(deviceId), vid); + auto info = sthis->getInfo(deviceId, vid); if (!info) return; @@ -871,7 +823,7 @@ ConnectionManager::Impl::addNewMultiplexedSocket(const std::string& deviceId, info->ice_->cancelOperations(); std::lock_guard<std::mutex> lk(sthis->infosMtx_); - sthis->infos_.erase({dht::InfoHash(deviceId), vid}); + sthis->infos_.erase({deviceId, vid}); }); }); }