Skip to content
Snippets Groups Projects
Commit e921a0df authored by Sébastien Blin's avatar Sébastien Blin Committed by Adrien Béraud
Browse files

connectionmanager: avoid double lock

removeUnusedConnection can happen and will lock infosMtx_ then cbMtx_
and at the same moment onTlsNegotiationDone can happen and lock
cbMtx_ then infosMtx_ causing a potential double lock.
This patch avoid this case by avoiding the infosMtx_ to be locked
if destroying.

Change-Id: If21d35757e9dbd667ef3e20e8c13025479494333
parent e4a08dcd
No related branches found
No related tags found
No related merge requests found
...@@ -71,12 +71,22 @@ public: ...@@ -71,12 +71,22 @@ public:
~Impl() {} ~Impl() {}
void removeUnusedConnections(const DeviceId& deviceId = {}) void removeUnusedConnections(const DeviceId& deviceId = {})
{
std::vector<std::shared_ptr<ConnectionInfo>> unused {};
{ {
std::lock_guard<std::mutex> lk(infosMtx_); std::lock_guard<std::mutex> lk(infosMtx_);
for (auto it = infos_.begin(); it != infos_.end();) { for (auto it = infos_.begin(); it != infos_.end();) {
auto& [key, info] = *it; auto& [key, info] = *it;
bool erased = false;
if (info && (!deviceId || key.first == deviceId)) { if (info && (!deviceId || key.first == deviceId)) {
unused.emplace_back(std::move(info));
it = infos_.erase(it);
} else {
++it;
}
}
}
for (auto& info: unused) {
if (info->tls_) if (info->tls_)
info->tls_->shutdown(); info->tls_->shutdown();
if (info->socket_) if (info->socket_)
...@@ -85,21 +95,15 @@ public: ...@@ -85,21 +95,15 @@ public:
info->ice_->cancelOperations(); info->ice_->cancelOperations();
if (info->waitForAnswer_) if (info->waitForAnswer_)
info->waitForAnswer_->cancel(); info->waitForAnswer_->cancel();
erased = true;
it = infos_.erase(it);
}
if (!erased)
++it;
} }
if (!deviceId) if (!unused.empty())
dht::ThreadPool::io().run([infos = std::move(infos_)]() mutable { infos.clear(); }); dht::ThreadPool::io().run([infos = std::move(unused)]() mutable { infos.clear(); });
} }
void shutdown() void shutdown()
{ {
if (isDestroying_) if (isDestroying_.exchange(true))
return; return;
isDestroying_ = true;
{ {
std::lock_guard<std::mutex> lk(connectCbsMtx_); std::lock_guard<std::mutex> lk(connectCbsMtx_);
// Call all pending callbacks that channel is not ready // Call all pending callbacks that channel is not ready
...@@ -161,7 +165,7 @@ public: ...@@ -161,7 +165,7 @@ public:
void onDhtPeerRequest(const PeerConnectionRequest& req, void onDhtPeerRequest(const PeerConnectionRequest& req,
const std::shared_ptr<dht::crypto::Certificate>& cert); const std::shared_ptr<dht::crypto::Certificate>& cert);
void addNewMultiplexedSocket(const DeviceId& deviceId, const dht::Value::Id& vid); void addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info);
void onPeerResponse(const PeerConnectionRequest& req); void onPeerResponse(const PeerConnectionRequest& req);
void onDhtConnected(const dht::crypto::PublicKey& devicePk); void onDhtConnected(const dht::crypto::PublicKey& devicePk);
...@@ -782,8 +786,7 @@ ConnectionManager::Impl::onTlsNegotiationDone(bool ok, ...@@ -782,8 +786,7 @@ ConnectionManager::Impl::onTlsNegotiationDone(bool ok,
const dht::Value::Id& vid, const dht::Value::Id& vid,
const std::string& name) const std::string& name)
{ {
auto info = getInfo(deviceId, vid); if (isDestroying_)
if (!info)
return; return;
// Note: only handle pendingCallbacks here for TLS initied by connectDevice() // Note: only handle pendingCallbacks here for TLS initied by connectDevice()
// Note: if not initied by connectDevice() the channel name will be empty (because no channel // Note: if not initied by connectDevice() the channel name will be empty (because no channel
...@@ -812,7 +815,9 @@ ConnectionManager::Impl::onTlsNegotiationDone(bool ok, ...@@ -812,7 +815,9 @@ ConnectionManager::Impl::onTlsNegotiationDone(bool ok,
<< " - Initied by connectDevice(). Initied by channel: " << name << " - Initied by connectDevice(). Initied by channel: " << name
<< " - vid: " << vid; << " - vid: " << vid;
} }
addNewMultiplexedSocket(deviceId, vid);
auto info = getInfo(deviceId, vid);
addNewMultiplexedSocket({deviceId, vid}, info);
// Finally, open the channel and launch pending callbacks // Finally, open the channel and launch pending callbacks
if (info->socket_) { if (info->socket_) {
// Note: do not remove pending there it's done in sendChannelRequest // Note: do not remove pending there it's done in sendChannelRequest
...@@ -995,7 +1000,7 @@ ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req, ...@@ -995,7 +1000,7 @@ ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
return; return;
if (!ok) { if (!ok) {
JAMI_ERR("ICE negotiation failed"); JAMI_ERR("ICE negotiation failed");
runOnMainThread([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
return; return;
} }
...@@ -1003,7 +1008,7 @@ ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req, ...@@ -1003,7 +1008,7 @@ ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
[w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] { [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
if (auto shared = w.lock()) if (auto shared = w.lock())
if (!shared->onRequestOnNegoDone(req)) if (!shared->onRequestOnNegoDone(req))
runOnMainThread([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); eraseInfo();
}); });
}; };
...@@ -1029,19 +1034,16 @@ ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req, ...@@ -1029,19 +1034,16 @@ ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
} }
// We need to detect any shutdown if the ice session is destroyed before going to the TLS session; // We need to detect any shutdown if the ice session is destroyed before going to the TLS session;
info->ice_->setOnShutdown([eraseInfo]() { info->ice_->setOnShutdown([eraseInfo]() {
runOnMainThread([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
}); });
info->ice_->initIceInstance(ice_config); info->ice_->initIceInstance(ice_config);
}); });
} }
void void
ConnectionManager::Impl::addNewMultiplexedSocket(const DeviceId& deviceId, const dht::Value::Id& vid) ConnectionManager::Impl::addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info)
{ {
auto info = getInfo(deviceId, vid); info->socket_ = std::make_shared<MultiplexedSocket>(id.first, std::move(info->tls_));
if (!info)
return;
info->socket_ = std::make_shared<MultiplexedSocket>(deviceId, std::move(info->tls_));
info->socket_->setOnReady( info->socket_->setOnReady(
[w = weak()](const DeviceId& deviceId, const std::shared_ptr<ChannelSocket>& socket) { [w = weak()](const DeviceId& deviceId, const std::shared_ptr<ChannelSocket>& socket) {
if (auto sthis = w.lock()) if (auto sthis = w.lock())
...@@ -1056,7 +1058,7 @@ ConnectionManager::Impl::addNewMultiplexedSocket(const DeviceId& deviceId, const ...@@ -1056,7 +1058,7 @@ ConnectionManager::Impl::addNewMultiplexedSocket(const DeviceId& deviceId, const
return sthis->channelReqCb_(peer, name); return sthis->channelReqCb_(peer, name);
return false; return false;
}); });
info->socket_->onShutdown([w = weak(), deviceId, vid]() { info->socket_->onShutdown([w = weak(), deviceId=id.first, vid=id.second]() {
// Cancel current outgoing connections // Cancel current outgoing connections
dht::ThreadPool::io().run([w, deviceId, vid] { dht::ThreadPool::io().run([w, deviceId, vid] {
auto sthis = w.lock(); auto sthis = w.lock();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment