Skip to content
Snippets Groups Projects
Commit f175ed5f authored by Sébastien Blin's avatar Sébastien Blin Committed by Adrien Béraud
Browse files

connectionManager: make async

Callbacks MUST be used. This patch remove the usage of waitForInitialization
and waitForNegotiation in connectionManager

Change-Id: I2be7e3b931e3d6ab7bcd71c51eca63f48668264c
parent 6dabec9a
No related branches found
No related tags found
No related merge requests found
......@@ -122,6 +122,11 @@ public:
removeUnusedConnections();
}
void connectDeviceStartIce(const std::string& deviceId, const dht::Value::Id& vid);
void connectDeviceOnNegoDone(const std::string& deviceId,
const std::string& name,
const dht::Value::Id& vid,
const std::shared_ptr<dht::crypto::Certificate>& cert);
void connectDevice(const std::string& deviceId, const std::string& uri, ConnectCallback cb);
/**
* Send a ChannelRequest on the TLS socket. Triggers cb when ready
......@@ -137,8 +142,12 @@ public:
/**
* Triggered when a PeerConnectionRequest comes from the DHT
*/
void answerTo(IceTransport& ice, const dht::Value::Id& id, const dht::InfoHash& from);
void onRequestStartIce(const PeerConnectionRequest& req);
void onRequestOnNegoDone(const PeerConnectionRequest& req);
void onDhtPeerRequest(const PeerConnectionRequest& req,
const std::shared_ptr<dht::crypto::Certificate>& cert);
void addNewMultiplexedSocket(const std::string& deviceId,
const dht::Value::Id& vid,
std::unique_ptr<TlsSocketEndpoint>&& tlsSocket);
......@@ -203,6 +212,176 @@ public:
std::atomic_bool isDestroying_ {false};
};
void
ConnectionManager::Impl::connectDeviceStartIce(const std::string& deviceId,
const dht::Value::Id& vid)
{
auto tit = connectionsInfos_.find(deviceId);
if (tit == connectionsInfos_.end())
return;
auto it = tit->second.find(vid);
if (it == tit->second.end())
return;
std::pair<std::string, dht::Value::Id> cbId(deviceId, vid);
std::unique_lock<std::mutex> lk {it->second.mutex_};
auto& ice = it->second.ice_;
auto onError = [&]() {
ice.reset();
std::lock_guard<std::mutex> lk(connectCbsMtx_);
auto cbIt = pendingCbs_.find(cbId);
if (cbIt != pendingCbs_.end()) {
if (cbIt->second)
cbIt->second(nullptr);
pendingCbs_.erase(cbIt);
}
};
if (!ice) {
JAMI_ERR("No ICE detected");
onError();
return;
}
account.registerDhtAddress(*ice);
auto iceAttributes = ice->getLocalAttributes();
std::stringstream icemsg;
icemsg << iceAttributes.ufrag << "\n";
icemsg << iceAttributes.pwd << "\n";
for (const auto& addr : ice->getLocalCandidates(0)) {
icemsg << addr << "\n";
}
// Prepare connection request as a DHT message
PeerConnectionRequest val;
val.id = vid; /* Random id for the message unicity */
val.ice_msg = icemsg.str();
auto value = std::make_shared<dht::Value>(std::move(val));
value->user_type = "peer_request";
// Send connection request through DHT
JAMI_DBG() << account << "Request connection to " << deviceId;
account.dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix + deviceId),
dht::InfoHash(deviceId),
value);
// Wait for call to onResponse() operated by DHT
if (isDestroying_)
return; // This avoid to wait new negotiation when destroying
it->second.responseCv_.wait_for(lk, DHT_MSG_TIMEOUT);
if (isDestroying_)
return; // The destructor can wake a pending wait here.
if (!it->second.responseReceived_) {
JAMI_ERR("no response from DHT to E2E request.");
onError();
return;
}
auto& response = it->second.response_;
if (!ice)
return;
auto sdp = IceTransport::parse_SDP(response.ice_msg, *ice);
auto hasPubIp = hasPublicIp(sdp);
if (!hasPubIp)
ice->setInitiatorSession();
if (not ice->start({sdp.rem_ufrag, sdp.rem_pwd}, sdp.rem_candidates)) {
JAMI_WARN("[Account:%s] start ICE failed", account.getAccountID().c_str());
onError();
}
}
void
ConnectionManager::Impl::connectDeviceOnNegoDone(
const std::string& deviceId,
const std::string& name,
const dht::Value::Id& vid,
const std::shared_ptr<dht::crypto::Certificate>& cert)
{
auto tit = connectionsInfos_.find(deviceId);
if (tit == connectionsInfos_.end())
return;
auto it = tit->second.find(vid);
if (it == tit->second.end())
return;
std::pair<std::string, dht::Value::Id> cbId(deviceId, vid);
std::unique_lock<std::mutex> lk {it->second.mutex_};
auto& ice = it->second.ice_;
auto onError = [&]() {
std::lock_guard<std::mutex> lk(connectCbsMtx_);
auto cbIt = pendingCbs_.find(cbId);
if (cbIt != pendingCbs_.end()) {
if (cbIt->second)
cbIt->second(nullptr);
pendingCbs_.erase(cbIt);
}
};
if (!ice || !ice->isRunning()) {
JAMI_ERR("No ICE detected or not running");
onError();
return;
}
// Build socket
std::lock_guard<std::mutex> lknrs(nonReadySocketsMutex_);
auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
std::move(ice)),
true);
// Negotiate a TLS session
JAMI_DBG() << account << "Start TLS session";
auto tlsSocket = std::make_unique<TlsSocketEndpoint>(std::move(endpoint),
account.identity(),
account.dhParams(),
*cert);
auto& nonReadyIt = nonReadySockets_[deviceId][vid];
nonReadyIt = std::move(tlsSocket);
nonReadyIt->setOnReady([w = weak(),
deviceId = std::move(deviceId),
vid = std::move(vid),
name = std::move(name)](bool ok) {
auto sthis = w.lock();
if (!sthis)
return;
auto mSockIt = sthis->multiplexedSockets_[deviceId];
if (mSockIt.find(vid) != mSockIt.end())
return;
if (!ok) {
JAMI_ERR() << "TLS connection failure for peer " << deviceId;
std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
std::pair<std::string, dht::Value::Id> cbId(deviceId, vid);
auto cbIt = sthis->pendingCbs_.find(cbId);
if (cbIt != sthis->pendingCbs_.end()) {
if (cbIt->second)
cbIt->second(nullptr);
sthis->pendingCbs_.erase(cbIt);
}
} else {
// The socket is ready, store it in multiplexedSockets_
std::lock_guard<std::mutex> lkmSockets(sthis->msocketsMutex_);
std::lock_guard<std::mutex> lknrs(sthis->nonReadySocketsMutex_);
auto nonReadyIt = sthis->nonReadySockets_.find(deviceId);
if (nonReadyIt != sthis->nonReadySockets_.end()) {
sthis->addNewMultiplexedSocket(deviceId, vid, std::move(nonReadyIt->second[vid]));
nonReadyIt->second.erase(vid);
if (nonReadyIt->second.empty()) {
sthis->nonReadySockets_.erase(nonReadyIt);
}
}
// Finally, open the channel
auto mxSockIt = sthis->multiplexedSockets_.at(deviceId);
if (!mxSockIt.empty())
sthis->sendChannelRequest(mxSockIt.rbegin()->second, name, deviceId, vid);
}
});
}
void
ConnectionManager::Impl::connectDevice(const std::string& deviceId,
const std::string& name,
......@@ -223,9 +402,11 @@ ConnectionManager::Impl::connectDevice(const std::string& deviceId,
}
// Avoid dht operation in a DHT callback to avoid deadlocks
// TODO use runOnMainThread instead but first, this needs to make the
// TLSSession and ICETransport async.
dht::ThreadPool::io().run([w, deviceId, name, cert, cb = std::move(cb)] {
runOnMainThread([w,
deviceId = std::move(deviceId),
name = std::move(name),
cert = std::move(cert),
cb = std::move(cb)] {
auto sthis = w.lock();
if (!sthis || sthis->isDestroying_) {
cb(nullptr);
......@@ -258,6 +439,68 @@ ConnectionManager::Impl::connectDevice(const std::string& deviceId,
auto& iceTransportFactory = Manager::instance().getIceTransportFactory();
auto ice_config = sthis->account.getIceOptions();
ice_config.tcpEnable = true;
ice_config.onInitDone = [w,
cbId,
deviceId = std::move(deviceId),
name = std::move(name),
cert = std::move(cert),
vid](bool ok) {
auto sthis = w.lock();
if (!sthis)
return;
if (!ok) {
JAMI_ERR("Cannot initialize ICE session.");
std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
auto cbIt = sthis->pendingCbs_.find(cbId);
if (cbIt != sthis->pendingCbs_.end()) {
if (cbIt->second)
cbIt->second(nullptr);
sthis->pendingCbs_.erase(cbIt);
}
return;
}
dht::ThreadPool::io().run(
[w = std::move(w), deviceId = std::move(deviceId), vid = std::move(vid)] {
auto sthis = w.lock();
if (!sthis)
return;
sthis->connectDeviceStartIce(deviceId, vid);
});
};
ice_config.onNegoDone = [w,
cbId,
deviceId = std::move(deviceId),
name = std::move(name),
cert = std::move(cert),
vid](bool ok) {
auto sthis = w.lock();
if (!sthis)
return;
if (!ok) {
JAMI_ERR("ICE negotiation failed.");
std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
auto cbIt = sthis->pendingCbs_.find(cbId);
if (cbIt != sthis->pendingCbs_.end()) {
if (cbIt->second)
cbIt->second(nullptr);
sthis->pendingCbs_.erase(cbIt);
}
return;
}
dht::ThreadPool::io().run([w = std::move(w),
deviceId = std::move(deviceId),
name = std::move(name),
cert = std::move(cert),
vid = std::move(vid)] {
auto sthis = w.lock();
if (!sthis)
return;
sthis->connectDeviceOnNegoDone(deviceId, name, vid, cert);
});
};
auto& connectionInfo = sthis->connectionsInfos_[deviceId][vid];
std::unique_lock<std::mutex> lk {connectionInfo.mutex_};
connectionInfo.ice_ = iceTransportFactory
......@@ -265,55 +508,9 @@ ConnectionManager::Impl::connectDevice(const std::string& deviceId,
1,
false,
ice_config);
auto& ice = connectionInfo.ice_;
if (ice->waitForInitialization(ICE_INIT_TIMEOUT) <= 0) {
if (!connectionInfo.ice_) {
JAMI_ERR("Cannot initialize ICE session.");
ice.reset();
std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
auto cbIt = sthis->pendingCbs_.find(cbId);
if (cbIt != sthis->pendingCbs_.end()) {
if (cbIt->second)
cbIt->second(nullptr);
sthis->pendingCbs_.erase(cbIt);
}
return;
}
sthis->account.registerDhtAddress(*ice);
auto iceAttributes = ice->getLocalAttributes();
std::stringstream icemsg;
icemsg << iceAttributes.ufrag << "\n";
icemsg << iceAttributes.pwd << "\n";
for (const auto& addr : ice->getLocalCandidates(0)) {
icemsg << addr << "\n";
}
// Prepare connection request as a DHT message
PeerConnectionRequest val;
val.id = vid; /* Random id for the message unicity */
val.ice_msg = icemsg.str();
auto value = std::make_shared<dht::Value>(std::move(val));
value->user_type = "peer_request";
// Send connection request through DHT
JAMI_DBG() << sthis->account << "Request connection to " << deviceId;
sthis->account.dht()->putEncrypted(dht::InfoHash::get(
PeerConnectionRequest::key_prefix + deviceId),
dht::InfoHash(deviceId),
value);
// Wait for call to onResponse() operated by DHT
if (sthis->isDestroying_)
return; // This avoid to wait new negotiation when destroying
connectionInfo.responseCv_.wait_for(lk, DHT_MSG_TIMEOUT);
if (sthis->isDestroying_)
return; // The destructor can wake a pending wait here.
if (!connectionInfo.responseReceived_) {
JAMI_ERR("no response from DHT to E2E request.");
ice.reset();
std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
auto cbIt = sthis->pendingCbs_.find(cbId);
if (cbIt != sthis->pendingCbs_.end()) {
......@@ -323,102 +520,6 @@ ConnectionManager::Impl::connectDevice(const std::string& deviceId,
}
return;
}
auto& response = connectionInfo.response_;
if (!ice)
return;
auto sdp = IceTransport::parse_SDP(response.ice_msg, *ice);
auto hasPubIp = sthis->hasPublicIp(sdp);
if (!hasPubIp)
ice->setInitiatorSession();
if (not ice->start({sdp.rem_ufrag, sdp.rem_pwd}, sdp.rem_candidates)) {
JAMI_WARN("[Account:%s] start ICE failed",
sthis->account.getAccountID().c_str());
ice.reset();
std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
auto cbIt = sthis->pendingCbs_.find(cbId);
if (cbIt != sthis->pendingCbs_.end()) {
if (cbIt->second)
cbIt->second(nullptr);
sthis->pendingCbs_.erase(cbIt);
}
return;
}
ice->waitForNegotiation(ICE_NEGOTIATION_TIMEOUT);
if (!ice->isRunning()) {
JAMI_ERR("[Account:%s] ICE negotation failed",
sthis->account.getAccountID().c_str());
ice.reset();
std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
auto cbIt = sthis->pendingCbs_.find(cbId);
if (cbIt != sthis->pendingCbs_.end()) {
if (cbIt->second)
cbIt->second(nullptr);
sthis->pendingCbs_.erase(cbIt);
}
return;
}
// Build socket
std::lock_guard<std::mutex> lknrs(sthis->nonReadySocketsMutex_);
auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
std::move(ice)),
true);
// Negotiate a TLS session
JAMI_DBG() << sthis->account << "Start TLS session";
auto tlsSocket = std::make_unique<TlsSocketEndpoint>(std::move(endpoint),
sthis->account.identity(),
sthis->account.dhParams(),
*cert);
auto& nonReadyIt = sthis->nonReadySockets_[deviceId][vid];
nonReadyIt = std::move(tlsSocket);
nonReadyIt->setOnReady([w,
deviceId = std::move(deviceId),
vid = std::move(vid),
name = std::move(name)](bool ok) {
auto sthis = w.lock();
if (!sthis)
return;
auto mSockIt = sthis->multiplexedSockets_[deviceId];
if (mSockIt.find(vid) != mSockIt.end())
return;
if (!ok) {
JAMI_ERR() << "TLS connection failure for peer " << deviceId;
std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
std::pair<std::string, dht::Value::Id> cbId(deviceId, vid);
auto cbIt = sthis->pendingCbs_.find(cbId);
if (cbIt != sthis->pendingCbs_.end()) {
if (cbIt->second)
cbIt->second(nullptr);
sthis->pendingCbs_.erase(cbIt);
}
} else {
// The socket is ready, store it in multiplexedSockets_
std::lock_guard<std::mutex> lkmSockets(sthis->msocketsMutex_);
std::lock_guard<std::mutex> lknrs(sthis->nonReadySocketsMutex_);
auto nonReadyIt = sthis->nonReadySockets_.find(deviceId);
if (nonReadyIt != sthis->nonReadySockets_.end()) {
sthis->addNewMultiplexedSocket(deviceId,
vid,
std::move(nonReadyIt->second[vid]));
nonReadyIt->second.erase(vid);
if (nonReadyIt->second.empty()) {
sthis->nonReadySockets_.erase(nonReadyIt);
}
}
// Finally, open the channel
auto mxSockIt = sthis->multiplexedSockets_.at(deviceId);
if (!mxSockIt.empty())
sthis->sendChannelRequest(mxSockIt.rbegin()->second,
name,
deviceId,
vid);
}
});
});
});
}
......@@ -506,12 +607,7 @@ ConnectionManager::Impl::onDhtConnected(const std::string& deviceId)
return;
dht::InfoHash peer_h;
if (AccountManager::foundPeerDevice(cert, peer_h)) {
dht::ThreadPool::io().run([w, req, cert] {
auto shared = w.lock();
if (!shared)
return;
shared->onDhtPeerRequest(req, cert);
});
shared->onDhtPeerRequest(req, cert);
} else {
JAMI_WARN()
<< shared->account << "Rejected untrusted connection request from "
......@@ -525,55 +621,50 @@ ConnectionManager::Impl::onDhtConnected(const std::string& deviceId)
}
void
ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
const std::shared_ptr<dht::crypto::Certificate>& cert)
ConnectionManager::Impl::answerTo(IceTransport& ice,
const dht::Value::Id& id,
const dht::InfoHash& from)
{
auto vid = req.id;
auto deviceId = req.from.toString();
JAMI_INFO() << account << "New connection requested by " << deviceId.c_str();
if (!iceReqCb_ || !iceReqCb_(deviceId)) {
JAMI_INFO("[Account:%s] refuse connection from %s",
account.getAccountID().c_str(),
deviceId.c_str());
return;
// NOTE: This is a shortest version of a real SDP message to save some bits
auto iceAttributes = ice.getLocalAttributes();
std::stringstream icemsg;
icemsg << iceAttributes.ufrag << "\n";
icemsg << iceAttributes.pwd << "\n";
for (const auto& addr : ice.getLocalCandidates(0)) {
icemsg << addr << "\n";
}
auto crt = cert; // This copy the shared_ptr for gcc 6
certMap_.emplace(cert->getId(), std::make_pair(crt, dht::InfoHash(deviceId)));
// Because the connection is accepted, create an ICE socket.
auto& iceTransportFactory = Manager::instance().getIceTransportFactory();
struct IceReady
{
std::mutex mtx {};
std::condition_variable cv {};
bool ready {false};
};
auto iceReady = std::make_shared<IceReady>();
auto ice_config = account.getIceOptions();
ice_config.tcpEnable = true;
ice_config.onRecvReady = [iceReady]() {
auto& ir = *iceReady;
std::lock_guard<std::mutex> lk {ir.mtx};
ir.ready = true;
ir.cv.notify_one();
};
// Send PeerConnection response
PeerConnectionRequest val;
val.id = id;
val.ice_msg = icemsg.str();
val.isAnswer = true;
auto value = std::make_shared<dht::Value>(std::move(val));
value->user_type = "peer_request";
// 1. Create a new Multiplexed Socket
JAMI_DBG() << account << "[CNX] connection accepted, DHT reply to " << from;
account.dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
+ from.toString()),
from,
value);
}
// Negotiate a new ICE socket
auto& connectionInfo = connectionsInfos_[deviceId][req.id];
connectionInfo.ice_ = iceTransportFactory.createUTransport(account.getAccountID().c_str(),
1,
true,
ice_config);
auto& ice = connectionInfo.ice_;
void
ConnectionManager::Impl::onRequestStartIce(const PeerConnectionRequest& req)
{
auto tit = connectionsInfos_.find(req.from.toString());
if (tit == connectionsInfos_.end())
return;
auto it = tit->second.find(req.id);
if (it == tit->second.end())
return;
if (ice->waitForInitialization(ICE_INIT_TIMEOUT) <= 0) {
JAMI_ERR("Cannot initialize ICE session.");
ice = nullptr;
std::unique_lock<std::mutex> lk {it->second.mutex_};
auto& ice = it->second.ice_;
if (!ice) {
JAMI_ERR("No ICE detected");
if (connReadyCb_)
connReadyCb_(deviceId, "", nullptr);
connReadyCb_(req.from.toString(), "", nullptr);
return;
}
......@@ -585,60 +676,39 @@ ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
JAMI_ERR("[Account:%s] start ICE failed - fallback to TURN", account.getAccountID().c_str());
ice = nullptr;
if (connReadyCb_)
connReadyCb_(deviceId, "", nullptr);
connReadyCb_(req.from.toString(), "", nullptr);
return;
}
if (!hasPubIp) {
ice->waitForNegotiation(ICE_NEGOTIATION_TIMEOUT);
if (ice->isRunning()) {
JAMI_DBG("[Account:%s] ICE negotiation succeed. Answering with local SDP",
account.getAccountID().c_str());
} else {
JAMI_ERR("[Account:%s] ICE negotation failed", account.getAccountID().c_str());
ice = nullptr;
if (connReadyCb_)
connReadyCb_(deviceId, "", nullptr);
return;
}
}
// NOTE: This is a shortest version of a real SDP message to save some bits
auto iceAttributes = ice->getLocalAttributes();
std::stringstream icemsg;
icemsg << iceAttributes.ufrag << "\n";
icemsg << iceAttributes.pwd << "\n";
for (const auto& addr : ice->getLocalCandidates(0)) {
icemsg << addr << "\n";
}
// Send PeerConnection response
PeerConnectionRequest val;
val.id = req.id;
val.ice_msg = icemsg.str();
val.isAnswer = true;
auto value = std::make_shared<dht::Value>(std::move(val));
value->user_type = "peer_request";
if (hasPubIp)
answerTo(*ice, req.id, req.from);
}
JAMI_DBG() << account << "[CNX] connection accepted, DHT reply to " << req.from;
account.dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix + deviceId),
req.from,
value);
void
ConnectionManager::Impl::onRequestOnNegoDone(const PeerConnectionRequest& req)
{
auto deviceId = req.from.toString();
auto tit = connectionsInfos_.find(deviceId);
if (tit == connectionsInfos_.end())
return;
auto it = tit->second.find(req.id);
if (it == tit->second.end())
return;
if (hasPubIp) {
ice->waitForNegotiation(ICE_NEGOTIATION_TIMEOUT);
if (ice->isRunning()) {
JAMI_DBG("[Account:%s] ICE negotiation succeed. Answering with local SDP",
account.getAccountID().c_str());
} else {
JAMI_ERR("[Account:%s] ICE negotation failed", account.getAccountID().c_str());
ice = nullptr;
if (connReadyCb_)
connReadyCb_(deviceId, "", nullptr);
return;
}
std::unique_lock<std::mutex> lk {it->second.mutex_};
auto& ice = it->second.ice_;
if (!ice) {
JAMI_ERR("No ICE detected");
if (connReadyCb_)
connReadyCb_(deviceId, "", nullptr);
return;
}
auto sdp = IceTransport::parse_SDP(req.ice_msg, *ice);
auto hasPubIp = hasPublicIp(sdp);
if (!hasPubIp)
answerTo(*ice, req.id, req.from);
// Build socket
std::lock_guard<std::mutex> lknrs(nonReadySocketsMutex_);
auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
......@@ -659,9 +729,9 @@ ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
return shared->validatePeerCertificate(cert, peer_h) && peer_h == ph;
});
auto& nonReadyIt = nonReadySockets_[deviceId][vid];
auto& nonReadyIt = nonReadySockets_[deviceId][req.id];
nonReadyIt = std::move(tlsSocket);
nonReadyIt->setOnReady([w = weak(), deviceId, vid = std::move(vid)](bool ok) {
nonReadyIt->setOnReady([w = weak(), deviceId, vid = std::move(req.id)](bool ok) {
auto shared = w.lock();
if (!shared)
return;
......@@ -689,6 +759,94 @@ ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
});
}
void
ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
const std::shared_ptr<dht::crypto::Certificate>& cert)
{
auto deviceId = req.from.toString();
JAMI_INFO() << account << "New connection requested by " << deviceId.c_str();
if (!iceReqCb_ || !iceReqCb_(deviceId)) {
JAMI_INFO("[Account:%s] refuse connection from %s",
account.getAccountID().c_str(),
deviceId.c_str());
return;
}
auto crt = cert; // This copy the shared_ptr for gcc 6
certMap_.emplace(cert->getId(), std::make_pair(crt, dht::InfoHash(deviceId)));
// Because the connection is accepted, create an ICE socket.
auto& iceTransportFactory = Manager::instance().getIceTransportFactory();
struct IceReady
{
std::mutex mtx {};
std::condition_variable cv {};
bool ready {false};
};
auto iceReady = std::make_shared<IceReady>();
auto ice_config = account.getIceOptions();
ice_config.tcpEnable = true;
ice_config.onRecvReady = [iceReady]() {
auto& ir = *iceReady;
std::lock_guard<std::mutex> lk {ir.mtx};
ir.ready = true;
ir.cv.notify_one();
};
ice_config.onInitDone = [w = weak(), deviceId, req](bool ok) {
auto shared = w.lock();
if (!shared)
return;
if (!ok) {
JAMI_ERR("Cannot initialize ICE session.");
if (shared->connReadyCb_)
shared->connReadyCb_(deviceId, "", nullptr);
return;
}
dht::ThreadPool::io().run([w = std::move(w), deviceId, req = std::move(req)] {
auto shared = w.lock();
if (!shared)
return;
shared->onRequestStartIce(req);
});
};
ice_config.onNegoDone = [w = weak(), deviceId, req](bool ok) {
auto shared = w.lock();
if (!shared)
return;
if (!ok) {
JAMI_ERR("ICE negotiation failed");
if (shared->connReadyCb_)
shared->connReadyCb_(deviceId, "", nullptr);
return;
}
dht::ThreadPool::io().run([w = std::move(w), deviceId, req = std::move(req)] {
auto shared = w.lock();
if (!shared)
return;
shared->onRequestOnNegoDone(req);
});
};
// 1. Create a new Multiplexed Socket
// Negotiate a new ICE socket
auto& connectionInfo = connectionsInfos_[deviceId][req.id];
std::unique_lock<std::mutex> lk {connectionInfo.mutex_};
connectionInfo.ice_ = iceTransportFactory.createUTransport(account.getAccountID().c_str(),
1,
true,
ice_config);
if (not connectionInfo.ice_) {
JAMI_ERR("Cannot initialize ICE session.");
if (connReadyCb_)
connReadyCb_(deviceId, "", nullptr);
return;
}
}
void
ConnectionManager::Impl::addNewMultiplexedSocket(const std::string& deviceId,
const dht::Value::Id& vid,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment