Unverified Commit d1fac179 authored by Sébastien Blin's avatar Sébastien Blin
Browse files

connectionmanager: replace wait_for by async task

Change-Id: Ic5597f0713b9abbc6cfa903f13f9811b9a6f03b4
parent b2937e7a
......@@ -23,6 +23,7 @@
#include "peer_connection.h"
#include "logger.h"
#include <asio.hpp>
#include <opendht/crypto.h>
#include <opendht/thread_pool.h>
#include <opendht/value.h>
......@@ -49,7 +50,6 @@ struct ConnectionInfo
}
std::mutex mutex_ {};
std::condition_variable responseCv_ {};
bool responseReceived_ {false};
PeerConnectionRequest response_ {};
std::unique_ptr<IceTransport> ice_ {nullptr};
......@@ -57,6 +57,10 @@ struct ConnectionInfo
std::unique_ptr<TlsSocketEndpoint> tls_ {nullptr};
std::shared_ptr<MultiplexedSocket> socket_ {};
std::set<CallbackId> cbIds_ {};
std::function<void(bool)> onConnected_;
std::unique_ptr<asio::steady_timer> waitForAnswer_ {};
};
class ConnectionManager::Impl : public std::enable_shared_from_this<ConnectionManager::Impl>
......@@ -80,7 +84,8 @@ public:
info->socket_->shutdown();
if (info->ice_)
info->ice_->cancelOperations();
info->responseCv_.notify_all();
if (info->waitForAnswer_)
info->waitForAnswer_->cancel();
erased = true;
it = infos_.erase(it);
}
......@@ -114,9 +119,11 @@ public:
dht::Value::Id vid;
};
bool connectDeviceStartIce(const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
void connectDeviceStartIce(const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
const dht::Value::Id& vid,
const std::string& connType);
const std::string& connType,
std::function<void(bool)> onConnected);
void onResponse(const asio::error_code& ec, const DeviceId& deviceId, const dht::Value::Id& vid);
bool connectDeviceOnNegoDone(const DeviceId& deviceId,
const std::string& name,
const dht::Value::Id& vid,
......@@ -276,23 +283,27 @@ public:
std::atomic_bool isDestroying_ {false};
};
bool
void
ConnectionManager::Impl::connectDeviceStartIce(
const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
const dht::Value::Id& vid,
const std::string& connType)
const std::string& connType,
std::function<void(bool)> onConnected)
{
auto deviceId = devicePk->getLongId();
auto info = getInfo(deviceId, vid);
if (!info)
return false;
if (!info) {
onConnected(false);
return;
}
std::unique_lock<std::mutex> lk(info->mutex_);
auto& ice = info->ice_;
if (!ice) {
JAMI_ERR("No ICE detected");
return false;
onConnected(false);
return;
}
auto iceAttributes = ice->getLocalAttributes();
......@@ -327,28 +338,53 @@ ConnectionManager::Impl::connectDeviceStartIce(
(ok ? "ok" : "failed"));
});
// Wait for call to onResponse() operated by DHT
if (isDestroying_)
return true; // This avoid to wait new negotiation when destroying
info->responseCv_.wait_for(lk, DHT_MSG_TIMEOUT);
if (isDestroying_)
return true; // The destructor can wake a pending wait here.
if (isDestroying_) {
onConnected(true); // This avoid to wait new negotiation when destroying
return;
}
info->onConnected_ = std::move(onConnected);
info->waitForAnswer_ = std::make_unique<asio::steady_timer>(*Manager::instance().ioContext(), std::chrono::steady_clock::now() + DHT_MSG_TIMEOUT);
info->waitForAnswer_->async_wait(std::bind(&ConnectionManager::Impl::onResponse, this, std::placeholders::_1, deviceId, vid));
}
void
ConnectionManager::Impl::onResponse(const asio::error_code& ec, const DeviceId& deviceId, const dht::Value::Id& vid)
{
if (ec == asio::error::operation_aborted)
return;
auto info = getInfo(deviceId, vid);
if (!info)
return;
std::unique_lock<std::mutex> lk(info->mutex_);
auto& ice = info->ice_;
if (isDestroying_) {
info->onConnected_(true); // The destructor can wake a pending wait here.
return;
}
if (!info->responseReceived_) {
JAMI_ERR("no response from DHT to E2E request.");
return false;
info->onConnected_(false);
return;
}
if (!ice)
return false;
if (!info->ice_) {
info->onConnected_(false);
return;
}
auto sdp = ice->parseIceCandidates(info->response_.ice_msg);
if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
JAMI_WARN("[Account:%s] start ICE failed", account.getAccountID().c_str());
return false;
info->onConnected_(false);
return;
}
return true;
info->onConnected_(true);
}
bool
ConnectionManager::Impl::connectDeviceOnNegoDone(
const DeviceId& deviceId,
......@@ -553,8 +589,15 @@ ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certif
eraseInfo,
connType] {
auto sthis = w.lock();
if (!sthis || !sthis->connectDeviceStartIce(devicePk, vid, connType))
if (!sthis) {
runOnMainThread([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
return;
}
sthis->connectDeviceStartIce(devicePk, vid, connType, [=](bool ok) {
if (!ok) {
runOnMainThread([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
}
});
});
};
ice_config.onNegoDone = [w,
......@@ -657,7 +700,8 @@ ConnectionManager::Impl::onPeerResponse(const PeerConnectionRequest& req)
std::lock_guard<std::mutex> lk {info->mutex_};
info->responseReceived_ = true;
info->response_ = std::move(req);
info->responseCv_.notify_one();
info->waitForAnswer_->expires_at(std::chrono::steady_clock::now());
info->waitForAnswer_->async_wait(std::bind(&ConnectionManager::Impl::onResponse, this, std::placeholders::_1, device, req.id));
} else {
JAMI_WARN() << account << " respond received, but cannot find request";
}
......@@ -666,8 +710,9 @@ ConnectionManager::Impl::onPeerResponse(const PeerConnectionRequest& req)
void
ConnectionManager::Impl::onDhtConnected(const dht::crypto::PublicKey& devicePk)
{
if (!account.dht())
if (!account.dht()) {
return;
}
account.dht()->listen<PeerConnectionRequest>(
dht::InfoHash::get(PeerConnectionRequest::key_prefix + devicePk.getId().toString()),
[w = weak()](PeerConnectionRequest&& req) {
......@@ -1098,7 +1143,8 @@ ConnectionManager::closeConnectionsWith(const std::string& peerUri)
info->ice_->cancelOperations();
if (info->socket_)
info->socket_->shutdown();
info->responseCv_.notify_all();
if (info->waitForAnswer_)
info->waitForAnswer_->cancel();
if (info->ice_) {
std::unique_lock<std::mutex> lk {info->mutex_};
dht::ThreadPool::io().run(
......
......@@ -750,7 +750,6 @@ ConnectionManagerTest::testShutdownCallbacks()
auto bobAccount = Manager::instance().getAccount<JamiAccount>(bobId);
auto bobDeviceId = DeviceId(std::string(bobAccount->currentDeviceId()));
auto aliceUri = aliceAccount->getUsername();
auto aliceDeviceId = DeviceId(std::string(aliceAccount->currentDeviceId()));
bobAccount->connectionManager().onICERequest([](const DeviceId&) { return true; });
aliceAccount->connectionManager().onICERequest([](const DeviceId&) { return true; });
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment