diff --git a/src/jamidht/connectionmanager.cpp b/src/jamidht/connectionmanager.cpp index 72932db684006a980fe58dd8656f0d9cc63d5ef9..9d6b542ca752cff8752510b246c5ad86ea3a70d6 100644 --- a/src/jamidht/connectionmanager.cpp +++ b/src/jamidht/connectionmanager.cpp @@ -117,6 +117,7 @@ public: const dht::Value::Id& vid, const std::shared_ptr<dht::crypto::Certificate>& cert); void connectDevice(const DeviceId& deviceId, const std::string& uri, ConnectCallback cb); + void connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert, const std::string& name, ConnectCallback cb); /** * Send a ChannelRequest on the TLS socket. Triggers cb when ready * @param sock socket used to send the request @@ -394,149 +395,156 @@ ConnectionManager::Impl::connectDevice(const DeviceId& deviceId, cb(nullptr, deviceId); return; } + if (auto shared = w.lock()) { + shared->connectDevice(cert, name, std::move(cb)); + } + }); +} - // Avoid dht operation in a DHT callback to avoid deadlocks - runOnMainThread([w, - deviceId = std::move(deviceId), - name = std::move(name), - cert = std::move(cert), - cb = std::move(cb)] { - auto sthis = w.lock(); - if (!sthis || sthis->isDestroying_) { - cb(nullptr, deviceId); - return; - } - dht::Value::Id vid; - auto tentatives = 0; - do { - vid = ValueIdDist(1, DRING_ID_MAX_VAL)(sthis->account.rand); - --tentatives; - } while (sthis->getPendingCallbacks(deviceId, vid).size() != 0 - && tentatives != MAX_TENTATIVES); - if (tentatives == MAX_TENTATIVES) { - JAMI_ERR("Couldn't get a current random channel number"); - cb(nullptr, deviceId); - return; - } - auto isConnectingToDevice = false; - { - std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_); - // Check if already connecting - auto pendingsIt = sthis->pendingCbs_.find(deviceId); - isConnectingToDevice = pendingsIt != sthis->pendingCbs_.end(); - // Save current request for sendChannelRequest. - // Note: do not return here, cause we can be in a state where first - // socket is negotiated and first channel is pending - // so return only after we checked the info - if (isConnectingToDevice) - pendingsIt->second.emplace_back(PendingCb {name, std::move(cb), vid}); - else - sthis->pendingCbs_[deviceId] = {{name, std::move(cb), vid}}; - } +void +ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert, const std::string& name, ConnectCallback cb) +{ + // Avoid dht operation in a DHT callback to avoid deadlocks + runOnMainThread([w=weak(), + name = std::move(name), + cert = std::move(cert), + cb = std::move(cb)] { + auto deviceId = cert->getId(); + auto sthis = w.lock(); + if (!sthis || sthis->isDestroying_) { + cb(nullptr, deviceId); + return; + } + dht::Value::Id vid; + auto tentatives = 0; + do { + vid = ValueIdDist(1, DRING_ID_MAX_VAL)(sthis->account.rand); + --tentatives; + } while (sthis->getPendingCallbacks(deviceId, vid).size() != 0 + && tentatives != MAX_TENTATIVES); + if (tentatives == MAX_TENTATIVES) { + JAMI_ERR("Couldn't get a current random channel number"); + cb(nullptr, deviceId); + return; + } + auto isConnectingToDevice = false; + { + std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_); + // Check if already connecting + auto pendingsIt = sthis->pendingCbs_.find(deviceId); + isConnectingToDevice = pendingsIt != sthis->pendingCbs_.end(); + // Save current request for sendChannelRequest. + // Note: do not return here, cause we can be in a state where first + // socket is negotiated and first channel is pending + // so return only after we checked the info + if (isConnectingToDevice) + pendingsIt->second.emplace_back(PendingCb {name, std::move(cb), vid}); + else + sthis->pendingCbs_[deviceId] = {{name, std::move(cb), vid}}; + } - // Check if already negotiated - CallbackId cbId(deviceId, vid); - if (auto info = sthis->getInfo(deviceId)) { - std::lock_guard<std::mutex> lk(info->mutex_); - if (info->socket_) { - JAMI_DBG("Peer already connected to %s. Add a new channel", - deviceId.to_c_str()); - info->cbIds_.emplace(cbId); - sthis->sendChannelRequest(info->socket_, name, deviceId, vid); - return; - } - } + // Check if already negotiated + CallbackId cbId(deviceId, vid); + if (auto info = sthis->getInfo(deviceId)) { + std::lock_guard<std::mutex> lk(info->mutex_); + if (info->socket_) { + JAMI_DBG("Peer already connected to %s. Add a new channel", + deviceId.to_c_str()); + info->cbIds_.emplace(cbId); + sthis->sendChannelRequest(info->socket_, name, deviceId, vid); + return; + } + } - if (isConnectingToDevice) { - JAMI_DBG("Already connecting to %s, wait for the ICE negotiation", - deviceId.to_c_str()); - return; - } + if (isConnectingToDevice) { + JAMI_DBG("Already connecting to %s, wait for the ICE negotiation", + deviceId.to_c_str()); + return; + } - // Note: used when the ice negotiation fails to erase - // all stored structures. - auto eraseInfo = [w, cbId] { - if (auto shared = w.lock()) { - std::lock_guard<std::mutex> lk(shared->infosMtx_); - shared->infos_.erase(cbId); - } - }; - - // If no socket exists, we need to initiate an ICE connection. - auto ice_config = sthis->account.getIceOptions(); - ice_config.tcpEnable = true; - ice_config.onInitDone = [w, - cbId, - deviceId = std::move(deviceId), - name = std::move(name), - cert = std::move(cert), - vid, - eraseInfo](bool ok) { - auto sthis = w.lock(); - if (!sthis) - return; - if (!ok) { - JAMI_ERR("Cannot initialize ICE session."); - for (const auto& pending : sthis->extractPendingCallbacks(deviceId)) - pending.cb(nullptr, deviceId); - runOnMainThread([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); - return; - } - - dht::ThreadPool::io().run( - [w = std::move(w), deviceId = std::move(deviceId), vid = std::move(vid)] { - if (auto sthis = w.lock()) - sthis->connectDeviceStartIce(deviceId, vid); - }); - }; - ice_config.onNegoDone = [w, - cbId, - deviceId = std::move(deviceId), - name = std::move(name), - cert = std::move(cert), - vid, - eraseInfo](bool ok) { - auto sthis = w.lock(); - if (!sthis) - return; - if (!ok) { - JAMI_ERR("ICE negotiation failed."); - for (const auto& pending : sthis->extractPendingCallbacks(deviceId)) - pending.cb(nullptr, deviceId); - runOnMainThread([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); - return; - } - - dht::ThreadPool::io().run([w = std::move(w), - deviceId = std::move(deviceId), - name = std::move(name), - cert = std::move(cert), - vid = std::move(vid)] { - auto sthis = w.lock(); - if (!sthis) - return; - sthis->connectDeviceOnNegoDone(deviceId, name, vid, cert); - }); - }; - - auto info = std::make_shared<ConnectionInfo>(); - { - std::lock_guard<std::mutex> lk(sthis->infosMtx_); - sthis->infos_[{deviceId, vid}] = info; - } - std::unique_lock<std::mutex> lk {info->mutex_}; - info->ice_ = Manager::instance().getIceTransportFactory().createUTransport( - sthis->account.getAccountID().c_str(), 1, false, ice_config); - - if (!info->ice_) { - JAMI_ERR("Cannot initialize ICE session."); - for (const auto& pending : sthis->extractPendingCallbacks(deviceId)) - pending.cb(nullptr, deviceId); - eraseInfo(); + // Note: used when the ice negotiation fails to erase + // all stored structures. + auto eraseInfo = [w, cbId] { + if (auto shared = w.lock()) { + std::lock_guard<std::mutex> lk(shared->infosMtx_); + shared->infos_.erase(cbId); + } + }; + + // If no socket exists, we need to initiate an ICE connection. + auto ice_config = sthis->account.getIceOptions(); + ice_config.tcpEnable = true; + ice_config.onInitDone = [w, + cbId, + deviceId = std::move(deviceId), + name = std::move(name), + cert = std::move(cert), + vid, + eraseInfo](bool ok) { + auto sthis = w.lock(); + if (!sthis) + return; + if (!ok) { + JAMI_ERR("Cannot initialize ICE session."); + for (const auto& pending : sthis->extractPendingCallbacks(deviceId)) + pending.cb(nullptr, deviceId); + runOnMainThread([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); + return; + } + + dht::ThreadPool::io().run( + [w = std::move(w), deviceId = std::move(deviceId), vid = std::move(vid)] { + if (auto sthis = w.lock()) + sthis->connectDeviceStartIce(deviceId, vid); + }); + }; + ice_config.onNegoDone = [w, + cbId, + deviceId = std::move(deviceId), + name = std::move(name), + cert = std::move(cert), + vid, + eraseInfo](bool ok) { + auto sthis = w.lock(); + if (!sthis) + return; + if (!ok) { + JAMI_ERR("ICE negotiation failed."); + for (const auto& pending : sthis->extractPendingCallbacks(deviceId)) + pending.cb(nullptr, deviceId); + runOnMainThread([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); + return; + } + + dht::ThreadPool::io().run([w = std::move(w), + deviceId = std::move(deviceId), + name = std::move(name), + cert = std::move(cert), + vid = std::move(vid)] { + auto sthis = w.lock(); + if (!sthis) return; - } + sthis->connectDeviceOnNegoDone(deviceId, name, vid, cert); }); - }); + }; + + auto info = std::make_shared<ConnectionInfo>(); + { + std::lock_guard<std::mutex> lk(sthis->infosMtx_); + sthis->infos_[{deviceId, vid}] = info; + } + std::unique_lock<std::mutex> lk {info->mutex_}; + info->ice_ = Manager::instance().getIceTransportFactory().createUTransport( + sthis->account.getAccountID().c_str(), 1, false, ice_config); + + if (!info->ice_) { + JAMI_ERR("Cannot initialize ICE session."); + for (const auto& pending : sthis->extractPendingCallbacks(deviceId)) + pending.cb(nullptr, deviceId); + eraseInfo(); + return; + } + }); } void @@ -945,6 +953,14 @@ ConnectionManager::connectDevice(const DeviceId& deviceId, pimpl_->connectDevice(deviceId, name, std::move(cb)); } +void +ConnectionManager::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert, + const std::string& name, + ConnectCallback cb) +{ + pimpl_->connectDevice(cert, name, std::move(cb)); +} + bool ConnectionManager::isConnecting(const DeviceId& deviceId, const std::string& name) const { diff --git a/src/jamidht/connectionmanager.h b/src/jamidht/connectionmanager.h index 8549e2016f688047f9eefad57105312f91faafcb..81732d5f8ac9ccc11106efcc5b78bbe08fb40e65 100644 --- a/src/jamidht/connectionmanager.h +++ b/src/jamidht/connectionmanager.h @@ -86,6 +86,7 @@ public: * @param cb Callback called when socket is ready ready */ void connectDevice(const DeviceId& deviceId, const std::string& name, ConnectCallback cb); + void connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert, const std::string& name, ConnectCallback cb); /** * Check if we are already connecting to a device with a specific name diff --git a/src/jamidht/jamiaccount.cpp b/src/jamidht/jamiaccount.cpp index e47989b5405b981f4b6477ba63140a0180d77e78..a74361ed601fa5dcc8dda77c517a152c03ee29fc 100644 --- a/src/jamidht/jamiaccount.cpp +++ b/src/jamidht/jamiaccount.cpp @@ -2408,30 +2408,23 @@ JamiAccount::doRegister_() if (accountManager_->getInfo()->deviceId == deviceId) return; - { - // Avoid to create multiple sync channels with a device - std::lock_guard<std::mutex> lk(syncConnectionsMtx_); - auto syncConn = syncConnections_.find(deviceId); - if ((syncConn != syncConnections_.end() and not syncConn->second.empty()) - or pendingSync_.find(deviceId) != pendingSync_.end()) - return; // Already syncing - pendingSync_.emplace(deviceId); - } - std::lock_guard<std::mutex> lk(connManagerMtx_); if (!connectionManager_) return; - connectionManager_->connectDevice(crt->getId(), - "sync://" + deviceId, + + auto channelName = "sync://" + deviceId; + if (connectionManager_->isConnecting(crt->getId(), channelName)) { + JAMI_INFO("[Account %s] Already connecting to %s", + getAccountID().c_str(), + deviceId.c_str()); + return; + } + connectionManager_->connectDevice(crt, + channelName, [this](std::shared_ptr<ChannelSocket> socket, const DeviceId& deviceId) { if (socket) syncWith(deviceId.toString(), socket); - { - std::lock_guard<std::mutex> lk( - syncConnectionsMtx_); - pendingSync_.erase(deviceId.toString()); - } }); }); diff --git a/src/jamidht/jamiaccount.h b/src/jamidht/jamiaccount.h index 14dab4237068b61c085662eb293012d87546bbd5..b136cc5ca52de2a19e40c277c1106d615bebc0b8 100644 --- a/src/jamidht/jamiaccount.h +++ b/src/jamidht/jamiaccount.h @@ -872,7 +872,6 @@ private: // Sync connections std::mutex syncConnectionsMtx_; - std::set<std::string> pendingSync_ {}; std::map<std::string /* deviceId */, std::vector<std::shared_ptr<ChannelSocket>>> syncConnections_;