diff --git a/src/call.cpp b/src/call.cpp index 919e6f830aec2098f1ef0c71cdf41a34ecc93b37..a5d123668e90e3b43a821b05ea4b85fd0ed0a95f 100644 --- a/src/call.cpp +++ b/src/call.cpp @@ -91,65 +91,40 @@ Call::Call(Account& account, { updateDetails(details); - addStateListener([this](Call::CallState call_state, - Call::ConnectionState cnx_state, - UNUSED int code) { - if (cnx_state == ConnectionState::PROGRESSING && startFallback_.exchange(false) - && not isSubcall()) { - // If the other peer lose the connectivity during the progressing - // this means that the other peer had a connectivity change we didn't - // detect for now. - // In this case, we let two secs before sending a request via the DHT - // just to bypass the CONNECTING status - + addStateListener( + [this](Call::CallState call_state, Call::ConnectionState cnx_state, UNUSED int code) { + checkPendingIM(); std::weak_ptr<Call> callWkPtr = shared_from_this(); - Manager::instance().scheduler().scheduleIn( - [callWkPtr] { - if (auto callShPtr = callWkPtr.lock()) { - if (callShPtr->getConnectionState() == Call::ConnectionState::PROGRESSING) { - JAMI_WARN("Call %s is still connecting after timeout, sending fallback " - "request", - callShPtr->getCallId().c_str()); - if (callShPtr->onNeedFallback_) - callShPtr->onNeedFallback_(); - } - } - }, - std::chrono::seconds(2)); - } - - checkPendingIM(); - std::weak_ptr<Call> callWkPtr = shared_from_this(); - runOnMainThread([callWkPtr] { - if (auto call = callWkPtr.lock()) - call->checkAudio(); - }); - - // if call just started ringing, schedule call timeout - if (type_ == CallType::INCOMING and cnx_state == ConnectionState::RINGING) { - auto timeout = Manager::instance().getRingingTimeout(); - JAMI_DBG("Scheduling call timeout in %d seconds", timeout); + runOnMainThread([callWkPtr] { + if (auto call = callWkPtr.lock()) + call->checkAudio(); + }); - std::weak_ptr<Call> callWkPtr = shared_from_this(); - Manager::instance().scheduler().scheduleIn( - [callWkPtr] { - if (auto callShPtr = callWkPtr.lock()) { - if (callShPtr->getConnectionState() == Call::ConnectionState::RINGING) { - JAMI_DBG( - "Call %s is still ringing after timeout, setting state to BUSY", - callShPtr->getCallId().c_str()); - callShPtr->hangup(PJSIP_SC_BUSY_HERE); - Manager::instance().callFailure(*callShPtr); + // if call just started ringing, schedule call timeout + if (type_ == CallType::INCOMING and cnx_state == ConnectionState::RINGING) { + auto timeout = Manager::instance().getRingingTimeout(); + JAMI_DBG("Scheduling call timeout in %d seconds", timeout); + + std::weak_ptr<Call> callWkPtr = shared_from_this(); + Manager::instance().scheduler().scheduleIn( + [callWkPtr] { + if (auto callShPtr = callWkPtr.lock()) { + if (callShPtr->getConnectionState() == Call::ConnectionState::RINGING) { + JAMI_DBG( + "Call %s is still ringing after timeout, setting state to BUSY", + callShPtr->getCallId().c_str()); + callShPtr->hangup(PJSIP_SC_BUSY_HERE); + Manager::instance().callFailure(*callShPtr); + } } - } - }, - std::chrono::seconds(timeout)); - } + }, + std::chrono::seconds(timeout)); + } - // kill pending subcalls at disconnect - if (call_state == CallState::OVER) - hangupCalls(safePopSubcalls(), 0); - }); + // kill pending subcalls at disconnect + if (call_state == CallState::OVER) + hangupCalls(safePopSubcalls(), 0); + }); time(×tamp_start_); account_.attachCall(id_); diff --git a/src/call.h b/src/call.h index a306144933fd462af808210c28daf9f4bb40be91..1488892028b9a269a6f0c9fbcc6de98a5d33d3b6 100644 --- a/src/call.h +++ b/src/call.h @@ -63,7 +63,6 @@ class Call : public Recordable, public std::enable_shared_from_this<Call> { public: using SubcallSet = std::set<std::shared_ptr<Call>, std::owner_less<std::shared_ptr<Call>>>; - using OnNeedFallbackCb = std::function<void()>; using OnReadyCb = std::function<void(bool)>; static const char* const DEFAULT_ID; @@ -272,8 +271,6 @@ public: return parent_ != nullptr; } - void setOnNeedFallback(OnNeedFallbackCb&& cb) { onNeedFallback_ = std::move(cb); } - public: // media management virtual bool toggleRecording(); @@ -412,10 +409,6 @@ private: ///< MultiDevice: message received by subcall to merged yet MsgList pendingInMessages_; - // If the call is blocked during the progressing state - OnNeedFallbackCb onNeedFallback_; - std::atomic_bool startFallback_ {true}; - mutable std::mutex confInfoMutex_ {}; mutable ConfInfo confInfo_ {}; }; diff --git a/src/jamidht/jamiaccount.cpp b/src/jamidht/jamiaccount.cpp index 3821441f85bb79ea3381c521aa8e2deabd26273e..3ce567b2f2a7d3378dbb8deefdb4c9c2f73d8c68 100644 --- a/src/jamidht/jamiaccount.cpp +++ b/src/jamidht/jamiaccount.cpp @@ -508,160 +508,31 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std:: #endif dht::InfoHash peer_account(toUri); - auto sendDhtRequest = [this, wCall, toUri, peer_account](const std::string& deviceId) { + + auto sendRequest = [this, wCall, toUri](const std::string& deviceId) { auto call = wCall.lock(); if (not call) return; - JAMI_DBG("[call %s] calling device %s", call->getCallId().c_str(), deviceId.c_str()); - - auto& manager = Manager::instance(); - auto dev_call = manager.callFactory.newCall<SIPCall, JamiAccount>(*this, - manager.getNewCallID(), - Call::CallType::OUTGOING, - call->getDetails()); + auto state = call->getConnectionState(); + if (state > Call::ConnectionState::PROGRESSING) + return; - auto callId = dev_call->getCallId(); - auto onNegoDone = [callId, w = weak()](bool) { - runOnMainThread([callId, w]() { - if (auto shared = w.lock()) - shared->checkPendingCall(callId); - }); - }; + auto dev_call = Manager::instance().callFactory.newCall<SIPCall, JamiAccount>( + *this, Manager::instance().getNewCallID(), Call::CallType::OUTGOING, call->getDetails()); - std::weak_ptr<SIPCall> weak_dev_call = dev_call; - auto iceOptions = getIceOptions(); - iceOptions.onNegoDone = onNegoDone; dev_call->setIPToIP(true); dev_call->setSecure(isTlsEnabled()); - auto ice = createIceTransport(("sip:" + dev_call->getCallId()).c_str(), - ICE_COMPONENTS, - true, - iceOptions); - if (not ice) { - JAMI_WARN("[call %s] Can't create ICE", call->getCallId().c_str()); - dev_call->removeCall(); - return; - } - - iceOptions.tcpEnable = true; - auto ice_tcp = createIceTransport(("sip:" + dev_call->getCallId()).c_str(), - ICE_COMPONENTS, - true, - iceOptions); - if (not ice_tcp) { - JAMI_WARN("Can't create ICE over TCP, will only use UDP"); - } + dev_call->setState(Call::ConnectionState::TRYING); call->addSubCall(*dev_call); + { + std::lock_guard<std::mutex> lk(pendingCallsMutex_); + pendingCalls_[deviceId].emplace_back(dev_call); + } - manager.addTask([w = weak(), weak_dev_call, ice, ice_tcp, deviceId, toUri, peer_account] { - auto sthis = w.lock(); - if (not sthis) { - dht::ThreadPool::io().run([ice = std::move(ice), ice_tcp = std::move(ice_tcp)]() {}); - return false; - } - auto call = weak_dev_call.lock(); - - // call aborted? - if (not call) { - dht::ThreadPool::io().run([ice = std::move(ice), ice_tcp = std::move(ice_tcp)]() {}); - return false; - } - - if (ice->isFailed()) { - JAMI_ERR("[call:%s] ice init failed", call->getCallId().c_str()); - call->onFailure(EIO); - dht::ThreadPool::io().run([ice = std::move(ice), ice_tcp = std::move(ice_tcp)]() {}); - return false; - } - - if (ice_tcp && ice_tcp->isFailed()) { - JAMI_WARN("[call:%s] ice tcp init failed, will only use UDP", - call->getCallId().c_str()); - } - - // Loop until ICE transport is initialized. - // Note: we suppose that ICE init routine has a an internal timeout (bounded in time) - // and we let upper layers decide when the call shall be aborded (our first check upper). - if ((not ice->isInitialized()) || (ice_tcp && !ice_tcp->isInitialized())) - return true; - - sthis->registerDhtAddress(*ice); - if (ice_tcp) - sthis->registerDhtAddress(*ice_tcp); - // Next step: sent the ICE data to peer through DHT - const dht::Value::Id callvid = ValueIdDist()(sthis->rand); - const auto callkey = dht::InfoHash::get("callto:" + deviceId); - auto blob = ice->packIceMsg(); - if (ice_tcp) { - auto ice_tcp_msg = ice_tcp->packIceMsg(2); - blob.insert(blob.end(), ice_tcp_msg.begin(), ice_tcp_msg.end()); - } - dht::Value val {dht::IceCandidates(callvid, blob)}; - - dht::InfoHash dev(deviceId); - sthis->dht_->putEncrypted(callkey, - dev, - std::move(val), - [weak_dev_call](bool ok) { // Put complete callback - if (!ok) { - JAMI_WARN("Can't put ICE descriptor on DHT"); - if (auto call = weak_dev_call.lock()) - call->onFailure(); - } else - JAMI_DBG("Successfully put ICE descriptor on DHT"); - }); - - auto listenKey = sthis->dht_->listen<dht::IceCandidates>( - callkey, [weak_dev_call, ice, ice_tcp, callvid, deviceId](dht::IceCandidates&& msg) { - if (msg.id != callvid or msg.from.toString() != deviceId) - return true; - auto call = weak_dev_call.lock(); - if (!call) - return false; - // remove unprintable characters - auto iceData = std::string(msg.ice_data.cbegin(), msg.ice_data.cend()); - iceData.erase(std::remove_if(iceData.begin(), - iceData.end(), - [](unsigned char c) { - return !std::isprint(c) && !std::isspace(c); - }), - iceData.end()); - JAMI_WARN("ICE request for call %s replied from DHT peer %s\nData: %s", - call->getCallId().c_str(), - deviceId.c_str(), - iceData.c_str()); - call->setState(Call::ConnectionState::PROGRESSING); - - auto udp_failed = true, tcp_failed = true; - initICE(msg.ice_data, ice, ice_tcp, udp_failed, tcp_failed); - if (udp_failed && tcp_failed) { - call->onFailure(); - return true; - } - return false; - }); - - std::lock_guard<std::mutex> lock(sthis->callsMutex_); - sthis->pendingCalls_ - .emplace(call->getCallId(), - PendingCall {std::chrono::steady_clock::now(), - std::move(ice), - std::move(ice_tcp), - weak_dev_call, - std::move(listenKey), - callkey, - dev, - peer_account, - tls::CertificateStore::instance().getCertificate(toUri)}); - - Manager::instance().scheduleTask( - [w, callId = call->getCallId()]() { - if (auto shared = w.lock()) - shared->checkPendingCall(callId); - }, - std::chrono::steady_clock::now() + ICE_NEGOTIATION_TIMEOUT); - return false; - }); + JAMI_WARN("[call %s] No channeled socket with this peer. Send request", + call->getCallId().c_str()); + // Else, ask for a channel (for future calls/text messages) + requestSIPConnection(toUri, deviceId); }; // Call connected devices @@ -708,6 +579,23 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std:: // and avoid to get an active call in a TRYING state. dev_call->setState(Call::ConnectionState::PROGRESSING); + { + std::lock_guard<std::mutex> lk(onConnectionClosedMtx_); + onConnectionClosed_[deviceConnIt->first] = sendRequest; + } + + call->addStateListener( + [w = weak(), + deviceId = deviceConnIt->first](Call::CallState, Call::ConnectionState state, int) { + if (state >= Call::ConnectionState::PROGRESSING) { + if (auto shared = w.lock()) { + JAMI_ERR("@@@ ERASE"); + std::lock_guard<std::mutex> lk(shared->onConnectionClosedMtx_); + shared->onConnectionClosed_.erase(deviceId); + } + } + }); + auto remoted_address = it.channel->underlyingICE()->getRemoteAddress(ICE_COMP_SIP_TRANSPORT); try { onConnectedOutgoingCall(*dev_call, toUri, remoted_address); @@ -720,26 +608,16 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std:: continue; } devices.emplace(deviceConnIt->first); - - call->setOnNeedFallback( - [sendDhtRequest, deviceId = deviceConnIt->first]() { sendDhtRequest(deviceId); }); } // Find listening devices for this account accountManager_->forEachDevice( peer_account, - [this, toUri, devices, sendDhtRequest, callId = call->getCallId()]( - const dht::InfoHash& dev) { + [this, toUri, devices, call, sendRequest](const dht::InfoHash& dev) { // Test if already sent via a SIP transport if (devices.find(dev.toString()) != devices.end()) return; - - JAMI_WARN("[call %s] No channeled socket with this peer. Send request + DHT request", - callId.c_str()); - // Else, ask for a channel (for future calls/text messages) and send a DHT message - requestSIPConnection(toUri, dev.toString()); - - sendDhtRequest(dev.toString()); + sendRequest(dev.toString()); }, [wCall, dummyCall](bool ok) { // Mark the temp call as failed to stop the main call if necessary @@ -1618,8 +1496,8 @@ JamiAccount::checkPendingCall(const std::string& callId) // Note only one check at a time. In fact, the UDP and TCP negotiation // can finish at the same time and we need to avoid potential race conditions. std::lock_guard<std::mutex> lk(callsMutex_); - auto it = pendingCalls_.find(callId); - if (it == pendingCalls_.end()) + auto it = pendingCallsDht_.find(callId); + if (it == pendingCallsDht_.end()) return; bool incoming = !it->second.call_key; @@ -1636,7 +1514,7 @@ JamiAccount::checkPendingCall(const std::string& callId) // Cancel pending listen (outgoing call) dht_->cancelListen(it->second.call_key, std::move(it->second.listen_key)); } - pendingCalls_.erase(it); + pendingCallsDht_.erase(it); } } @@ -2535,16 +2413,16 @@ JamiAccount::replyToIncomingIceMsg(const std::shared_ptr<SIPCall>& call, // Let the call handled by the PendingCall handler loop std::lock_guard<std::mutex> lock(callsMutex_); - pendingCalls_.emplace(call->getCallId(), - PendingCall {/*.start = */ started_time, - /*.ice_sp = */ udp_failed ? nullptr : ice, - /*.ice_tcp_sp = */ tcp_failed ? nullptr : ice_tcp, - /*.call = */ wcall, - /*.listen_key = */ {}, - /*.call_key = */ {}, - /*.from = */ peer_ice_msg.from, - /*.from_account = */ from_id, - /*.from_cert = */ from_cert}); + pendingCallsDht_.emplace(call->getCallId(), + PendingCall {/*.start = */ started_time, + /*.ice_sp = */ udp_failed ? nullptr : ice, + /*.ice_tcp_sp = */ tcp_failed ? nullptr : ice_tcp, + /*.call = */ wcall, + /*.listen_key = */ {}, + /*.call_key = */ {}, + /*.from = */ peer_ice_msg.from, + /*.from_account = */ from_id, + /*.from_cert = */ from_cert}); Manager::instance().scheduleTask( [w = weak(), callId = call->getCallId()]() { @@ -2573,10 +2451,15 @@ JamiAccount::doUnregister(std::function<void(bool)> released_cb) { std::lock_guard<std::mutex> lock(callsMutex_); - pendingCalls_.clear(); + pendingCallsDht_.clear(); pendingSipCalls_.clear(); } + { + std::lock_guard<std::mutex> lk(pendingCallsMutex_); + pendingCalls_.clear(); + } + dht_->join(); if (upnp_) @@ -3793,19 +3676,36 @@ JamiAccount::cacheSIPConnection(std::shared_ptr<ChannelSocket>&& socket, auto shared = w.lock(); if (!shared) return; - std::lock_guard<std::mutex> lk(shared->sipConnectionsMtx_); - auto& connections = shared->sipConnections_[peerId][deviceId]; - auto conn = connections.begin(); - while (conn != connections.end()) { - if (conn->channel == socket) - conn = connections.erase(conn); - else - conn++; + { + std::lock_guard<std::mutex> lk(shared->sipConnectionsMtx_); + auto& connections = shared->sipConnections_[peerId][deviceId]; + auto conn = connections.begin(); + while (conn != connections.end()) { + if (conn->channel == socket) + conn = connections.erase(conn); + else + conn++; + } + } + // The connection can be closed during the SIP initialization, so + // if this happens, the request should be re-sent to ask for a new + // SIP channel to make the call pass through + std::function<void(const std::string&)> cb; + { + std::lock_guard<std::mutex> lk(shared->onConnectionClosedMtx_); + if (shared->onConnectionClosed_[deviceId]) { + cb = std::move(shared->onConnectionClosed_[deviceId]); + shared->onConnectionClosed_.erase(deviceId); + } + } + if (cb) { + JAMI_WARN("An outgoing call was in progress while shutdown, relaunch the request"); + cb(deviceId); } }; auto sip_tr = link_.sipTransportBroker->getChanneledTransport(socket, std::move(onShutdown)); // Store the connection - sipConnections_[peerId][deviceId].emplace_back(SipConnection {std::move(sip_tr), socket}); + sipConnections_[peerId][deviceId].emplace_back(SipConnection {sip_tr, socket}); JAMI_WARN("New SIP channel opened with %s", deviceId.c_str()); lk.unlock(); @@ -3813,6 +3713,30 @@ JamiAccount::cacheSIPConnection(std::shared_ptr<ChannelSocket>&& socket, // Retry messages messageEngine_.onPeerOnline(peerId); + + // Connect pending calls + std::vector<std::shared_ptr<SIPCall>> pc; + { + std::lock_guard<std::mutex> lk(pendingCallsMutex_); + pc = std::move(pendingCalls_[deviceId]); + } + for (auto& pendingCall : pc) { + pendingCall->setTransport(sip_tr); + pendingCall->setState(Call::ConnectionState::PROGRESSING); + if (auto ice = socket->underlyingICE()) { + auto remoted_address = ice->getRemoteAddress(ICE_COMP_SIP_TRANSPORT); + try { + onConnectedOutgoingCall(*pendingCall, peerId, remoted_address); + } catch (const VoipLinkException&) { + // In this case, the main scenario is that SIPStartCall failed because + // the ICE is dead and the TLS session didn't send any packet on that dead + // link (connectivity change, killed by the os, etc) + // Here, we don't need to do anything, the TLS will fail and will delete + // the cached transport + continue; + } + } + } } } // namespace jami diff --git a/src/jamidht/jamiaccount.h b/src/jamidht/jamiaccount.h index caca25bd87476f7fe6fa0258f15b2a4aa7d8f198..072cbdc1e92a81cedefe3f5d08a0150ff10a110e 100644 --- a/src/jamidht/jamiaccount.h +++ b/src/jamidht/jamiaccount.h @@ -602,7 +602,7 @@ private: /** * DHT calls waiting for ICE negotiation */ - std::map<std::string, PendingCall> pendingCalls_; + std::map<std::string, PendingCall> pendingCallsDht_; /** * Incoming DHT calls that are not yet actual SIP calls. @@ -729,6 +729,12 @@ private: std::set<std::pair<std::string /* accountId */, std::string /* deviceId */>> pendingSipConnections_ {}; + std::mutex pendingCallsMutex_; + std::map<std::string, std::vector<std::shared_ptr<SIPCall>>> pendingCalls_; + + std::mutex onConnectionClosedMtx_ {}; + std::map<std::string, std::function<void(const std::string&)>> onConnectionClosed_ {}; + /** * Ask a device to open a channeled SIP socket * @param peerId The contact who owns the device