diff --git a/src/connectivity/connectionmanager.cpp b/src/connectivity/connectionmanager.cpp index 1d15b21b4f66eaa83f61f742fe4862c8fc66035b..67aaef92b660bc7f5c162609ad5490e87823f8ba 100644 --- a/src/connectivity/connectionmanager.cpp +++ b/src/connectivity/connectionmanager.cpp @@ -23,6 +23,7 @@ #include "peer_connection.h" #include "logger.h" +#include <asio.hpp> #include <opendht/crypto.h> #include <opendht/thread_pool.h> #include <opendht/value.h> @@ -49,7 +50,6 @@ struct ConnectionInfo } std::mutex mutex_ {}; - std::condition_variable responseCv_ {}; bool responseReceived_ {false}; PeerConnectionRequest response_ {}; std::unique_ptr<IceTransport> ice_ {nullptr}; @@ -57,6 +57,10 @@ struct ConnectionInfo std::unique_ptr<TlsSocketEndpoint> tls_ {nullptr}; std::shared_ptr<MultiplexedSocket> socket_ {}; std::set<CallbackId> cbIds_ {}; + + std::function<void(bool)> onConnected_; + std::unique_ptr<asio::steady_timer> waitForAnswer_ {}; + }; class ConnectionManager::Impl : public std::enable_shared_from_this<ConnectionManager::Impl> @@ -80,7 +84,8 @@ public: info->socket_->shutdown(); if (info->ice_) info->ice_->cancelOperations(); - info->responseCv_.notify_all(); + if (info->waitForAnswer_) + info->waitForAnswer_->cancel(); erased = true; it = infos_.erase(it); } @@ -114,9 +119,11 @@ public: dht::Value::Id vid; }; - bool connectDeviceStartIce(const std::shared_ptr<dht::crypto::PublicKey>& devicePk, + void connectDeviceStartIce(const std::shared_ptr<dht::crypto::PublicKey>& devicePk, const dht::Value::Id& vid, - const std::string& connType); + const std::string& connType, + std::function<void(bool)> onConnected); + void onResponse(const asio::error_code& ec, const DeviceId& deviceId, const dht::Value::Id& vid); bool connectDeviceOnNegoDone(const DeviceId& deviceId, const std::string& name, const dht::Value::Id& vid, @@ -276,23 +283,27 @@ public: std::atomic_bool isDestroying_ {false}; }; -bool +void ConnectionManager::Impl::connectDeviceStartIce( const std::shared_ptr<dht::crypto::PublicKey>& devicePk, const dht::Value::Id& vid, - const std::string& connType) + const std::string& connType, + std::function<void(bool)> onConnected) { auto deviceId = devicePk->getLongId(); auto info = getInfo(deviceId, vid); - if (!info) - return false; + if (!info) { + onConnected(false); + return; + } std::unique_lock<std::mutex> lk(info->mutex_); auto& ice = info->ice_; if (!ice) { JAMI_ERR("No ICE detected"); - return false; + onConnected(false); + return; } auto iceAttributes = ice->getLocalAttributes(); @@ -327,28 +338,53 @@ ConnectionManager::Impl::connectDeviceStartIce( (ok ? "ok" : "failed")); }); // Wait for call to onResponse() operated by DHT - if (isDestroying_) - return true; // This avoid to wait new negotiation when destroying - info->responseCv_.wait_for(lk, DHT_MSG_TIMEOUT); - if (isDestroying_) - return true; // The destructor can wake a pending wait here. + if (isDestroying_) { + onConnected(true); // This avoid to wait new negotiation when destroying + return; + } + + info->onConnected_ = std::move(onConnected); + info->waitForAnswer_ = std::make_unique<asio::steady_timer>(*Manager::instance().ioContext(), std::chrono::steady_clock::now() + DHT_MSG_TIMEOUT); + info->waitForAnswer_->async_wait(std::bind(&ConnectionManager::Impl::onResponse, this, std::placeholders::_1, deviceId, vid)); +} + +void +ConnectionManager::Impl::onResponse(const asio::error_code& ec, const DeviceId& deviceId, const dht::Value::Id& vid) +{ + if (ec == asio::error::operation_aborted) + return; + auto info = getInfo(deviceId, vid); + if (!info) + return; + + std::unique_lock<std::mutex> lk(info->mutex_); + auto& ice = info->ice_; + if (isDestroying_) { + info->onConnected_(true); // The destructor can wake a pending wait here. + return; + } if (!info->responseReceived_) { JAMI_ERR("no response from DHT to E2E request."); - return false; + info->onConnected_(false); + return; } - if (!ice) - return false; + if (!info->ice_) { + info->onConnected_(false); + return; + } auto sdp = ice->parseIceCandidates(info->response_.ice_msg); if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) { JAMI_WARN("[Account:%s] start ICE failed", account.getAccountID().c_str()); - return false; + info->onConnected_(false); + return; } - return true; + info->onConnected_(true); } + bool ConnectionManager::Impl::connectDeviceOnNegoDone( const DeviceId& deviceId, @@ -553,8 +589,15 @@ ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certif eraseInfo, connType] { auto sthis = w.lock(); - if (!sthis || !sthis->connectDeviceStartIce(devicePk, vid, connType)) + if (!sthis) { runOnMainThread([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); + return; + } + sthis->connectDeviceStartIce(devicePk, vid, connType, [=](bool ok) { + if (!ok) { + runOnMainThread([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); + } + }); }); }; ice_config.onNegoDone = [w, @@ -657,7 +700,8 @@ ConnectionManager::Impl::onPeerResponse(const PeerConnectionRequest& req) std::lock_guard<std::mutex> lk {info->mutex_}; info->responseReceived_ = true; info->response_ = std::move(req); - info->responseCv_.notify_one(); + info->waitForAnswer_->expires_at(std::chrono::steady_clock::now()); + info->waitForAnswer_->async_wait(std::bind(&ConnectionManager::Impl::onResponse, this, std::placeholders::_1, device, req.id)); } else { JAMI_WARN() << account << " respond received, but cannot find request"; } @@ -666,8 +710,9 @@ ConnectionManager::Impl::onPeerResponse(const PeerConnectionRequest& req) void ConnectionManager::Impl::onDhtConnected(const dht::crypto::PublicKey& devicePk) { - if (!account.dht()) + if (!account.dht()) { return; + } account.dht()->listen<PeerConnectionRequest>( dht::InfoHash::get(PeerConnectionRequest::key_prefix + devicePk.getId().toString()), [w = weak()](PeerConnectionRequest&& req) { @@ -1098,7 +1143,8 @@ ConnectionManager::closeConnectionsWith(const std::string& peerUri) info->ice_->cancelOperations(); if (info->socket_) info->socket_->shutdown(); - info->responseCv_.notify_all(); + if (info->waitForAnswer_) + info->waitForAnswer_->cancel(); if (info->ice_) { std::unique_lock<std::mutex> lk {info->mutex_}; dht::ThreadPool::io().run( diff --git a/test/unitTest/connectionManager/connectionManager.cpp b/test/unitTest/connectionManager/connectionManager.cpp index 2b4728e2c75a768f053cadea700374994cefc7db..a8d1983d0905cdc870adc1e30c0c8b6c086dd37b 100644 --- a/test/unitTest/connectionManager/connectionManager.cpp +++ b/test/unitTest/connectionManager/connectionManager.cpp @@ -750,7 +750,6 @@ ConnectionManagerTest::testShutdownCallbacks() auto bobAccount = Manager::instance().getAccount<JamiAccount>(bobId); auto bobDeviceId = DeviceId(std::string(bobAccount->currentDeviceId())); auto aliceUri = aliceAccount->getUsername(); - auto aliceDeviceId = DeviceId(std::string(aliceAccount->currentDeviceId())); bobAccount->connectionManager().onICERequest([](const DeviceId&) { return true; }); aliceAccount->connectionManager().onICERequest([](const DeviceId&) { return true; });