diff --git a/src/jamidht/connectionmanager.cpp b/src/jamidht/connectionmanager.cpp index ebe89d2f299c3df3fd702022e001cfb895599a2e..86bd08b3b5873ad4be43b4d7947a25ad00a9f071 100644 --- a/src/jamidht/connectionmanager.cpp +++ b/src/jamidht/connectionmanager.cpp @@ -122,6 +122,11 @@ public: removeUnusedConnections(); } + void connectDeviceStartIce(const std::string& deviceId, const dht::Value::Id& vid); + void connectDeviceOnNegoDone(const std::string& deviceId, + const std::string& name, + const dht::Value::Id& vid, + const std::shared_ptr<dht::crypto::Certificate>& cert); void connectDevice(const std::string& deviceId, const std::string& uri, ConnectCallback cb); /** * Send a ChannelRequest on the TLS socket. Triggers cb when ready @@ -137,8 +142,12 @@ public: /** * Triggered when a PeerConnectionRequest comes from the DHT */ + void answerTo(IceTransport& ice, const dht::Value::Id& id, const dht::InfoHash& from); + void onRequestStartIce(const PeerConnectionRequest& req); + void onRequestOnNegoDone(const PeerConnectionRequest& req); void onDhtPeerRequest(const PeerConnectionRequest& req, const std::shared_ptr<dht::crypto::Certificate>& cert); + void addNewMultiplexedSocket(const std::string& deviceId, const dht::Value::Id& vid, std::unique_ptr<TlsSocketEndpoint>&& tlsSocket); @@ -203,6 +212,176 @@ public: std::atomic_bool isDestroying_ {false}; }; +void +ConnectionManager::Impl::connectDeviceStartIce(const std::string& deviceId, + const dht::Value::Id& vid) +{ + auto tit = connectionsInfos_.find(deviceId); + if (tit == connectionsInfos_.end()) + return; + auto it = tit->second.find(vid); + if (it == tit->second.end()) + return; + + std::pair<std::string, dht::Value::Id> cbId(deviceId, vid); + std::unique_lock<std::mutex> lk {it->second.mutex_}; + auto& ice = it->second.ice_; + + auto onError = [&]() { + ice.reset(); + std::lock_guard<std::mutex> lk(connectCbsMtx_); + auto cbIt = pendingCbs_.find(cbId); + if (cbIt != pendingCbs_.end()) { + if (cbIt->second) + cbIt->second(nullptr); + pendingCbs_.erase(cbIt); + } + }; + + if (!ice) { + JAMI_ERR("No ICE detected"); + onError(); + return; + } + + account.registerDhtAddress(*ice); + + auto iceAttributes = ice->getLocalAttributes(); + std::stringstream icemsg; + icemsg << iceAttributes.ufrag << "\n"; + icemsg << iceAttributes.pwd << "\n"; + for (const auto& addr : ice->getLocalCandidates(0)) { + icemsg << addr << "\n"; + } + + // Prepare connection request as a DHT message + PeerConnectionRequest val; + + val.id = vid; /* Random id for the message unicity */ + val.ice_msg = icemsg.str(); + auto value = std::make_shared<dht::Value>(std::move(val)); + value->user_type = "peer_request"; + + // Send connection request through DHT + JAMI_DBG() << account << "Request connection to " << deviceId; + account.dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix + deviceId), + dht::InfoHash(deviceId), + value); + + // Wait for call to onResponse() operated by DHT + if (isDestroying_) + return; // This avoid to wait new negotiation when destroying + it->second.responseCv_.wait_for(lk, DHT_MSG_TIMEOUT); + if (isDestroying_) + return; // The destructor can wake a pending wait here. + if (!it->second.responseReceived_) { + JAMI_ERR("no response from DHT to E2E request."); + onError(); + return; + } + + auto& response = it->second.response_; + if (!ice) + return; + auto sdp = IceTransport::parse_SDP(response.ice_msg, *ice); + auto hasPubIp = hasPublicIp(sdp); + if (!hasPubIp) + ice->setInitiatorSession(); + if (not ice->start({sdp.rem_ufrag, sdp.rem_pwd}, sdp.rem_candidates)) { + JAMI_WARN("[Account:%s] start ICE failed", account.getAccountID().c_str()); + onError(); + } +} + +void +ConnectionManager::Impl::connectDeviceOnNegoDone( + const std::string& deviceId, + const std::string& name, + const dht::Value::Id& vid, + const std::shared_ptr<dht::crypto::Certificate>& cert) +{ + auto tit = connectionsInfos_.find(deviceId); + if (tit == connectionsInfos_.end()) + return; + auto it = tit->second.find(vid); + if (it == tit->second.end()) + return; + + std::pair<std::string, dht::Value::Id> cbId(deviceId, vid); + std::unique_lock<std::mutex> lk {it->second.mutex_}; + auto& ice = it->second.ice_; + + auto onError = [&]() { + std::lock_guard<std::mutex> lk(connectCbsMtx_); + auto cbIt = pendingCbs_.find(cbId); + if (cbIt != pendingCbs_.end()) { + if (cbIt->second) + cbIt->second(nullptr); + pendingCbs_.erase(cbIt); + } + }; + + if (!ice || !ice->isRunning()) { + JAMI_ERR("No ICE detected or not running"); + onError(); + return; + } + + // Build socket + std::lock_guard<std::mutex> lknrs(nonReadySocketsMutex_); + auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>( + std::move(ice)), + true); + + // Negotiate a TLS session + JAMI_DBG() << account << "Start TLS session"; + auto tlsSocket = std::make_unique<TlsSocketEndpoint>(std::move(endpoint), + account.identity(), + account.dhParams(), + *cert); + + auto& nonReadyIt = nonReadySockets_[deviceId][vid]; + nonReadyIt = std::move(tlsSocket); + nonReadyIt->setOnReady([w = weak(), + deviceId = std::move(deviceId), + vid = std::move(vid), + name = std::move(name)](bool ok) { + auto sthis = w.lock(); + if (!sthis) + return; + auto mSockIt = sthis->multiplexedSockets_[deviceId]; + if (mSockIt.find(vid) != mSockIt.end()) + return; + if (!ok) { + JAMI_ERR() << "TLS connection failure for peer " << deviceId; + std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_); + std::pair<std::string, dht::Value::Id> cbId(deviceId, vid); + auto cbIt = sthis->pendingCbs_.find(cbId); + if (cbIt != sthis->pendingCbs_.end()) { + if (cbIt->second) + cbIt->second(nullptr); + sthis->pendingCbs_.erase(cbIt); + } + } else { + // The socket is ready, store it in multiplexedSockets_ + std::lock_guard<std::mutex> lkmSockets(sthis->msocketsMutex_); + std::lock_guard<std::mutex> lknrs(sthis->nonReadySocketsMutex_); + auto nonReadyIt = sthis->nonReadySockets_.find(deviceId); + if (nonReadyIt != sthis->nonReadySockets_.end()) { + sthis->addNewMultiplexedSocket(deviceId, vid, std::move(nonReadyIt->second[vid])); + nonReadyIt->second.erase(vid); + if (nonReadyIt->second.empty()) { + sthis->nonReadySockets_.erase(nonReadyIt); + } + } + // Finally, open the channel + auto mxSockIt = sthis->multiplexedSockets_.at(deviceId); + if (!mxSockIt.empty()) + sthis->sendChannelRequest(mxSockIt.rbegin()->second, name, deviceId, vid); + } + }); +} + void ConnectionManager::Impl::connectDevice(const std::string& deviceId, const std::string& name, @@ -223,9 +402,11 @@ ConnectionManager::Impl::connectDevice(const std::string& deviceId, } // Avoid dht operation in a DHT callback to avoid deadlocks - // TODO use runOnMainThread instead but first, this needs to make the - // TLSSession and ICETransport async. - dht::ThreadPool::io().run([w, deviceId, name, cert, cb = std::move(cb)] { + runOnMainThread([w, + deviceId = std::move(deviceId), + name = std::move(name), + cert = std::move(cert), + cb = std::move(cb)] { auto sthis = w.lock(); if (!sthis || sthis->isDestroying_) { cb(nullptr); @@ -258,6 +439,68 @@ ConnectionManager::Impl::connectDevice(const std::string& deviceId, auto& iceTransportFactory = Manager::instance().getIceTransportFactory(); auto ice_config = sthis->account.getIceOptions(); ice_config.tcpEnable = true; + ice_config.onInitDone = [w, + cbId, + deviceId = std::move(deviceId), + name = std::move(name), + cert = std::move(cert), + vid](bool ok) { + auto sthis = w.lock(); + if (!sthis) + return; + if (!ok) { + JAMI_ERR("Cannot initialize ICE session."); + std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_); + auto cbIt = sthis->pendingCbs_.find(cbId); + if (cbIt != sthis->pendingCbs_.end()) { + if (cbIt->second) + cbIt->second(nullptr); + sthis->pendingCbs_.erase(cbIt); + } + return; + } + + dht::ThreadPool::io().run( + [w = std::move(w), deviceId = std::move(deviceId), vid = std::move(vid)] { + auto sthis = w.lock(); + if (!sthis) + return; + sthis->connectDeviceStartIce(deviceId, vid); + }); + }; + ice_config.onNegoDone = [w, + cbId, + deviceId = std::move(deviceId), + name = std::move(name), + cert = std::move(cert), + vid](bool ok) { + auto sthis = w.lock(); + if (!sthis) + return; + if (!ok) { + JAMI_ERR("ICE negotiation failed."); + std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_); + auto cbIt = sthis->pendingCbs_.find(cbId); + if (cbIt != sthis->pendingCbs_.end()) { + if (cbIt->second) + cbIt->second(nullptr); + sthis->pendingCbs_.erase(cbIt); + } + return; + } + + dht::ThreadPool::io().run([w = std::move(w), + deviceId = std::move(deviceId), + name = std::move(name), + cert = std::move(cert), + vid = std::move(vid)] { + auto sthis = w.lock(); + if (!sthis) + return; + sthis->connectDeviceOnNegoDone(deviceId, name, vid, cert); + }); + }; + auto& connectionInfo = sthis->connectionsInfos_[deviceId][vid]; std::unique_lock<std::mutex> lk {connectionInfo.mutex_}; connectionInfo.ice_ = iceTransportFactory @@ -265,55 +508,9 @@ ConnectionManager::Impl::connectDevice(const std::string& deviceId, 1, false, ice_config); - auto& ice = connectionInfo.ice_; - if (ice->waitForInitialization(ICE_INIT_TIMEOUT) <= 0) { + if (!connectionInfo.ice_) { JAMI_ERR("Cannot initialize ICE session."); - ice.reset(); - std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_); - auto cbIt = sthis->pendingCbs_.find(cbId); - if (cbIt != sthis->pendingCbs_.end()) { - if (cbIt->second) - cbIt->second(nullptr); - sthis->pendingCbs_.erase(cbIt); - } - return; - } - - sthis->account.registerDhtAddress(*ice); - - auto iceAttributes = ice->getLocalAttributes(); - std::stringstream icemsg; - icemsg << iceAttributes.ufrag << "\n"; - icemsg << iceAttributes.pwd << "\n"; - for (const auto& addr : ice->getLocalCandidates(0)) { - icemsg << addr << "\n"; - } - - // Prepare connection request as a DHT message - PeerConnectionRequest val; - - val.id = vid; /* Random id for the message unicity */ - val.ice_msg = icemsg.str(); - auto value = std::make_shared<dht::Value>(std::move(val)); - value->user_type = "peer_request"; - - // Send connection request through DHT - JAMI_DBG() << sthis->account << "Request connection to " << deviceId; - sthis->account.dht()->putEncrypted(dht::InfoHash::get( - PeerConnectionRequest::key_prefix + deviceId), - dht::InfoHash(deviceId), - value); - - // Wait for call to onResponse() operated by DHT - if (sthis->isDestroying_) - return; // This avoid to wait new negotiation when destroying - connectionInfo.responseCv_.wait_for(lk, DHT_MSG_TIMEOUT); - if (sthis->isDestroying_) - return; // The destructor can wake a pending wait here. - if (!connectionInfo.responseReceived_) { - JAMI_ERR("no response from DHT to E2E request."); - ice.reset(); std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_); auto cbIt = sthis->pendingCbs_.find(cbId); if (cbIt != sthis->pendingCbs_.end()) { @@ -323,102 +520,6 @@ ConnectionManager::Impl::connectDevice(const std::string& deviceId, } return; } - - auto& response = connectionInfo.response_; - if (!ice) - return; - auto sdp = IceTransport::parse_SDP(response.ice_msg, *ice); - auto hasPubIp = sthis->hasPublicIp(sdp); - if (!hasPubIp) - ice->setInitiatorSession(); - if (not ice->start({sdp.rem_ufrag, sdp.rem_pwd}, sdp.rem_candidates)) { - JAMI_WARN("[Account:%s] start ICE failed", - sthis->account.getAccountID().c_str()); - ice.reset(); - std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_); - auto cbIt = sthis->pendingCbs_.find(cbId); - if (cbIt != sthis->pendingCbs_.end()) { - if (cbIt->second) - cbIt->second(nullptr); - sthis->pendingCbs_.erase(cbIt); - } - return; - } - - ice->waitForNegotiation(ICE_NEGOTIATION_TIMEOUT); - - if (!ice->isRunning()) { - JAMI_ERR("[Account:%s] ICE negotation failed", - sthis->account.getAccountID().c_str()); - ice.reset(); - std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_); - auto cbIt = sthis->pendingCbs_.find(cbId); - if (cbIt != sthis->pendingCbs_.end()) { - if (cbIt->second) - cbIt->second(nullptr); - sthis->pendingCbs_.erase(cbIt); - } - return; - } - - // Build socket - std::lock_guard<std::mutex> lknrs(sthis->nonReadySocketsMutex_); - auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>( - std::move(ice)), - true); - - // Negotiate a TLS session - JAMI_DBG() << sthis->account << "Start TLS session"; - auto tlsSocket = std::make_unique<TlsSocketEndpoint>(std::move(endpoint), - sthis->account.identity(), - sthis->account.dhParams(), - *cert); - - auto& nonReadyIt = sthis->nonReadySockets_[deviceId][vid]; - nonReadyIt = std::move(tlsSocket); - nonReadyIt->setOnReady([w, - deviceId = std::move(deviceId), - vid = std::move(vid), - name = std::move(name)](bool ok) { - auto sthis = w.lock(); - if (!sthis) - return; - auto mSockIt = sthis->multiplexedSockets_[deviceId]; - if (mSockIt.find(vid) != mSockIt.end()) - return; - if (!ok) { - JAMI_ERR() << "TLS connection failure for peer " << deviceId; - std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_); - std::pair<std::string, dht::Value::Id> cbId(deviceId, vid); - auto cbIt = sthis->pendingCbs_.find(cbId); - if (cbIt != sthis->pendingCbs_.end()) { - if (cbIt->second) - cbIt->second(nullptr); - sthis->pendingCbs_.erase(cbIt); - } - } else { - // The socket is ready, store it in multiplexedSockets_ - std::lock_guard<std::mutex> lkmSockets(sthis->msocketsMutex_); - std::lock_guard<std::mutex> lknrs(sthis->nonReadySocketsMutex_); - auto nonReadyIt = sthis->nonReadySockets_.find(deviceId); - if (nonReadyIt != sthis->nonReadySockets_.end()) { - sthis->addNewMultiplexedSocket(deviceId, - vid, - std::move(nonReadyIt->second[vid])); - nonReadyIt->second.erase(vid); - if (nonReadyIt->second.empty()) { - sthis->nonReadySockets_.erase(nonReadyIt); - } - } - // Finally, open the channel - auto mxSockIt = sthis->multiplexedSockets_.at(deviceId); - if (!mxSockIt.empty()) - sthis->sendChannelRequest(mxSockIt.rbegin()->second, - name, - deviceId, - vid); - } - }); }); }); } @@ -506,12 +607,7 @@ ConnectionManager::Impl::onDhtConnected(const std::string& deviceId) return; dht::InfoHash peer_h; if (AccountManager::foundPeerDevice(cert, peer_h)) { - dht::ThreadPool::io().run([w, req, cert] { - auto shared = w.lock(); - if (!shared) - return; - shared->onDhtPeerRequest(req, cert); - }); + shared->onDhtPeerRequest(req, cert); } else { JAMI_WARN() << shared->account << "Rejected untrusted connection request from " @@ -525,55 +621,50 @@ ConnectionManager::Impl::onDhtConnected(const std::string& deviceId) } void -ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req, - const std::shared_ptr<dht::crypto::Certificate>& cert) +ConnectionManager::Impl::answerTo(IceTransport& ice, + const dht::Value::Id& id, + const dht::InfoHash& from) { - auto vid = req.id; - auto deviceId = req.from.toString(); - JAMI_INFO() << account << "New connection requested by " << deviceId.c_str(); - if (!iceReqCb_ || !iceReqCb_(deviceId)) { - JAMI_INFO("[Account:%s] refuse connection from %s", - account.getAccountID().c_str(), - deviceId.c_str()); - return; + // NOTE: This is a shortest version of a real SDP message to save some bits + auto iceAttributes = ice.getLocalAttributes(); + std::stringstream icemsg; + icemsg << iceAttributes.ufrag << "\n"; + icemsg << iceAttributes.pwd << "\n"; + for (const auto& addr : ice.getLocalCandidates(0)) { + icemsg << addr << "\n"; } - auto crt = cert; // This copy the shared_ptr for gcc 6 - certMap_.emplace(cert->getId(), std::make_pair(crt, dht::InfoHash(deviceId))); - - // Because the connection is accepted, create an ICE socket. - auto& iceTransportFactory = Manager::instance().getIceTransportFactory(); - struct IceReady - { - std::mutex mtx {}; - std::condition_variable cv {}; - bool ready {false}; - }; - auto iceReady = std::make_shared<IceReady>(); - auto ice_config = account.getIceOptions(); - ice_config.tcpEnable = true; - ice_config.onRecvReady = [iceReady]() { - auto& ir = *iceReady; - std::lock_guard<std::mutex> lk {ir.mtx}; - ir.ready = true; - ir.cv.notify_one(); - }; + // Send PeerConnection response + PeerConnectionRequest val; + val.id = id; + val.ice_msg = icemsg.str(); + val.isAnswer = true; + auto value = std::make_shared<dht::Value>(std::move(val)); + value->user_type = "peer_request"; - // 1. Create a new Multiplexed Socket + JAMI_DBG() << account << "[CNX] connection accepted, DHT reply to " << from; + account.dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix + + from.toString()), + from, + value); +} - // Negotiate a new ICE socket - auto& connectionInfo = connectionsInfos_[deviceId][req.id]; - connectionInfo.ice_ = iceTransportFactory.createUTransport(account.getAccountID().c_str(), - 1, - true, - ice_config); - auto& ice = connectionInfo.ice_; +void +ConnectionManager::Impl::onRequestStartIce(const PeerConnectionRequest& req) +{ + auto tit = connectionsInfos_.find(req.from.toString()); + if (tit == connectionsInfos_.end()) + return; + auto it = tit->second.find(req.id); + if (it == tit->second.end()) + return; - if (ice->waitForInitialization(ICE_INIT_TIMEOUT) <= 0) { - JAMI_ERR("Cannot initialize ICE session."); - ice = nullptr; + std::unique_lock<std::mutex> lk {it->second.mutex_}; + auto& ice = it->second.ice_; + if (!ice) { + JAMI_ERR("No ICE detected"); if (connReadyCb_) - connReadyCb_(deviceId, "", nullptr); + connReadyCb_(req.from.toString(), "", nullptr); return; } @@ -585,60 +676,39 @@ ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req, JAMI_ERR("[Account:%s] start ICE failed - fallback to TURN", account.getAccountID().c_str()); ice = nullptr; if (connReadyCb_) - connReadyCb_(deviceId, "", nullptr); + connReadyCb_(req.from.toString(), "", nullptr); return; } - if (!hasPubIp) { - ice->waitForNegotiation(ICE_NEGOTIATION_TIMEOUT); - if (ice->isRunning()) { - JAMI_DBG("[Account:%s] ICE negotiation succeed. Answering with local SDP", - account.getAccountID().c_str()); - } else { - JAMI_ERR("[Account:%s] ICE negotation failed", account.getAccountID().c_str()); - ice = nullptr; - if (connReadyCb_) - connReadyCb_(deviceId, "", nullptr); - return; - } - } - - // NOTE: This is a shortest version of a real SDP message to save some bits - auto iceAttributes = ice->getLocalAttributes(); - std::stringstream icemsg; - icemsg << iceAttributes.ufrag << "\n"; - icemsg << iceAttributes.pwd << "\n"; - for (const auto& addr : ice->getLocalCandidates(0)) { - icemsg << addr << "\n"; - } - - // Send PeerConnection response - PeerConnectionRequest val; - val.id = req.id; - val.ice_msg = icemsg.str(); - val.isAnswer = true; - auto value = std::make_shared<dht::Value>(std::move(val)); - value->user_type = "peer_request"; + if (hasPubIp) + answerTo(*ice, req.id, req.from); +} - JAMI_DBG() << account << "[CNX] connection accepted, DHT reply to " << req.from; - account.dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix + deviceId), - req.from, - value); +void +ConnectionManager::Impl::onRequestOnNegoDone(const PeerConnectionRequest& req) +{ + auto deviceId = req.from.toString(); + auto tit = connectionsInfos_.find(deviceId); + if (tit == connectionsInfos_.end()) + return; + auto it = tit->second.find(req.id); + if (it == tit->second.end()) + return; - if (hasPubIp) { - ice->waitForNegotiation(ICE_NEGOTIATION_TIMEOUT); - if (ice->isRunning()) { - JAMI_DBG("[Account:%s] ICE negotiation succeed. Answering with local SDP", - account.getAccountID().c_str()); - } else { - JAMI_ERR("[Account:%s] ICE negotation failed", account.getAccountID().c_str()); - ice = nullptr; - if (connReadyCb_) - connReadyCb_(deviceId, "", nullptr); - return; - } + std::unique_lock<std::mutex> lk {it->second.mutex_}; + auto& ice = it->second.ice_; + if (!ice) { + JAMI_ERR("No ICE detected"); + if (connReadyCb_) + connReadyCb_(deviceId, "", nullptr); + return; } + auto sdp = IceTransport::parse_SDP(req.ice_msg, *ice); + auto hasPubIp = hasPublicIp(sdp); + if (!hasPubIp) + answerTo(*ice, req.id, req.from); + // Build socket std::lock_guard<std::mutex> lknrs(nonReadySocketsMutex_); auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>( @@ -659,9 +729,9 @@ ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req, return shared->validatePeerCertificate(cert, peer_h) && peer_h == ph; }); - auto& nonReadyIt = nonReadySockets_[deviceId][vid]; + auto& nonReadyIt = nonReadySockets_[deviceId][req.id]; nonReadyIt = std::move(tlsSocket); - nonReadyIt->setOnReady([w = weak(), deviceId, vid = std::move(vid)](bool ok) { + nonReadyIt->setOnReady([w = weak(), deviceId, vid = std::move(req.id)](bool ok) { auto shared = w.lock(); if (!shared) return; @@ -689,6 +759,94 @@ ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req, }); } +void +ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req, + const std::shared_ptr<dht::crypto::Certificate>& cert) +{ + auto deviceId = req.from.toString(); + JAMI_INFO() << account << "New connection requested by " << deviceId.c_str(); + if (!iceReqCb_ || !iceReqCb_(deviceId)) { + JAMI_INFO("[Account:%s] refuse connection from %s", + account.getAccountID().c_str(), + deviceId.c_str()); + return; + } + + auto crt = cert; // This copy the shared_ptr for gcc 6 + certMap_.emplace(cert->getId(), std::make_pair(crt, dht::InfoHash(deviceId))); + + // Because the connection is accepted, create an ICE socket. + auto& iceTransportFactory = Manager::instance().getIceTransportFactory(); + struct IceReady + { + std::mutex mtx {}; + std::condition_variable cv {}; + bool ready {false}; + }; + auto iceReady = std::make_shared<IceReady>(); + auto ice_config = account.getIceOptions(); + ice_config.tcpEnable = true; + ice_config.onRecvReady = [iceReady]() { + auto& ir = *iceReady; + std::lock_guard<std::mutex> lk {ir.mtx}; + ir.ready = true; + ir.cv.notify_one(); + }; + ice_config.onInitDone = [w = weak(), deviceId, req](bool ok) { + auto shared = w.lock(); + if (!shared) + return; + if (!ok) { + JAMI_ERR("Cannot initialize ICE session."); + if (shared->connReadyCb_) + shared->connReadyCb_(deviceId, "", nullptr); + return; + } + + dht::ThreadPool::io().run([w = std::move(w), deviceId, req = std::move(req)] { + auto shared = w.lock(); + if (!shared) + return; + shared->onRequestStartIce(req); + }); + }; + + ice_config.onNegoDone = [w = weak(), deviceId, req](bool ok) { + auto shared = w.lock(); + if (!shared) + return; + if (!ok) { + JAMI_ERR("ICE negotiation failed"); + if (shared->connReadyCb_) + shared->connReadyCb_(deviceId, "", nullptr); + return; + } + + dht::ThreadPool::io().run([w = std::move(w), deviceId, req = std::move(req)] { + auto shared = w.lock(); + if (!shared) + return; + shared->onRequestOnNegoDone(req); + }); + }; + + // 1. Create a new Multiplexed Socket + + // Negotiate a new ICE socket + auto& connectionInfo = connectionsInfos_[deviceId][req.id]; + std::unique_lock<std::mutex> lk {connectionInfo.mutex_}; + connectionInfo.ice_ = iceTransportFactory.createUTransport(account.getAccountID().c_str(), + 1, + true, + ice_config); + if (not connectionInfo.ice_) { + JAMI_ERR("Cannot initialize ICE session."); + if (connReadyCb_) + connReadyCb_(deviceId, "", nullptr); + return; + } +} + void ConnectionManager::Impl::addNewMultiplexedSocket(const std::string& deviceId, const dht::Value::Id& vid,