diff --git a/src/jamidht/jamiaccount.cpp b/src/jamidht/jamiaccount.cpp index 525aaedee9d5dd35d3bea2eb1437d9d4c2232783..944c5eed028992e4b6d18154580ee01e6b138177 100644 --- a/src/jamidht/jamiaccount.cpp +++ b/src/jamidht/jamiaccount.cpp @@ -550,7 +550,7 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std:: call->addSubCall(*dev_call); { std::lock_guard<std::mutex> lk(pendingCallsMutex_); - pendingCalls_[deviceId.toString()].emplace_back(dev_call); + pendingCalls_[deviceId].emplace_back(dev_call); } JAMI_WARN("[call %s] No channeled socket with this peer. Send request", @@ -601,15 +601,8 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std:: [w = weak(), deviceId = key.second](Call::CallState, Call::ConnectionState state, int) { if (state != Call::ConnectionState::PROGRESSING and state != Call::ConnectionState::TRYING) { - if (auto shared = w.lock()) { - std::lock_guard<std::mutex> lk(shared->onConnectionClosedMtx_); - auto it = shared->onConnectionClosed_.find(deviceId); - if (it != shared->onConnectionClosed_.end()) { - if (it->second) - it->second(dht::InfoHash(deviceId), true); - shared->onConnectionClosed_.erase(it); - } - } + if (auto shared = w.lock()) + shared->callConnectionClosed(deviceId, true); } }); @@ -631,10 +624,14 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std:: // Find listening devices for this account accountManager_->forEachDevice( peer_account, - [this, toUri, devices = std::move(devices), call, sendRequest](const dht::InfoHash& dev) { + [this, devices = std::move(devices), sendRequest](const DeviceId& dev) { // Test if already sent via a SIP transport if (devices.find(dev) != devices.end()) return; + { + std::lock_guard<std::mutex> lk(onConnectionClosedMtx_); + onConnectionClosed_[dev] = sendRequest; + } sendRequest(dev, false); }, [wCall](bool ok) { @@ -1765,6 +1762,20 @@ JamiAccount::handlePendingCall(PendingCall& pc, bool incoming) return true; } +void +JamiAccount::forEachPendingCall(const DeviceId& deviceId, + const std::function<void(const std::shared_ptr<SIPCall>&)>& cb) +{ + std::vector<std::shared_ptr<SIPCall>> pc; + { + std::lock_guard<std::mutex> lk(pendingCallsMutex_); + pc = std::move(pendingCalls_[deviceId]); + } + for (const auto& pendingCall : pc) { + cb(pendingCall); + } +} + void JamiAccount::registerAsyncOps() { @@ -3501,7 +3512,23 @@ JamiAccount::cacheTurnServers() } void -JamiAccount::requestSIPConnection(const std::string& peerId, const dht::InfoHash& deviceId) +JamiAccount::callConnectionClosed(const DeviceId& deviceId, bool eraseDummy) +{ + std::function<void(const dht::InfoHash&, bool)> cb; + { + std::lock_guard<std::mutex> lk(onConnectionClosedMtx_); + auto it = onConnectionClosed_.find(deviceId); + if (it != onConnectionClosed_.end()) { + cb = std::move(it->second); + onConnectionClosed_.erase(it); + } + } + if (cb) + cb(deviceId, eraseDummy); +} + +void +JamiAccount::requestSIPConnection(const std::string& peerId, const DeviceId& deviceId) { // If a connection already exists or is in progress, no need to do this std::lock_guard<std::mutex> lk(sipConnsMtx_); @@ -3515,22 +3542,29 @@ JamiAccount::requestSIPConnection(const std::string& peerId, const dht::InfoHash JAMI_INFO("Ask %s for a new SIP channel", deviceId.to_c_str()); if (!connectionManager_) return; - connectionManager_->connectDevice(deviceId, - "sip", - [w = weak(), id](std::shared_ptr<ChannelSocket>) { - auto shared = w.lock(); - if (!shared) - return; - // NOTE: No need to cache Connection there. - // OnConnectionReady is called before this callback, so - // the socket is already cached if succeed. We just need - // to remove the pending request. - std::lock_guard<std::mutex> lk(shared->sipConnsMtx_); - auto it = shared->sipConns_.find(id); - if (it != shared->sipConns_.end() && it->second.empty()) { - shared->sipConns_.erase(it); - } - }); + connectionManager_ + ->connectDevice(deviceId, "sip", [w = weak(), id](std::shared_ptr<ChannelSocket> socket) { + auto shared = w.lock(); + if (!shared) + return; + // NOTE: No need to cache Connection there. + // OnConnectionReady is called before this callback, so + // the socket is already cached if succeed. We just need + // to remove the pending request. + if (!socket) { + // If this is triggered, this means that the connectDevice + // didn't get any response from the DHT. + // Stop searching pending call. + shared->callConnectionClosed(id.second, true); + shared->forEachPendingCall(id.second, [](const auto& pc) { pc->onFailure(); }); + } + + std::lock_guard<std::mutex> lk(shared->sipConnsMtx_); + auto it = shared->sipConns_.find(id); + if (it != shared->sipConns_.end() && it->second.empty()) { + shared->sipConns_.erase(it); + } + }); } bool @@ -3676,9 +3710,8 @@ JamiAccount::sendProfile(const std::string& deviceId) void JamiAccount::cacheSIPConnection(std::shared_ptr<ChannelSocket>&& socket, const std::string& peerId, - const dht::InfoHash& deviceId) + const DeviceId& deviceId) { - auto deviceIdStr = deviceId.toString(); std::unique_lock<std::mutex> lk(sipConnsMtx_); // Verify that the connection is not already cached SipConnectionKey key(peerId, deviceId); @@ -3719,18 +3752,7 @@ JamiAccount::cacheSIPConnection(std::shared_ptr<ChannelSocket>&& socket, // The connection can be closed during the SIP initialization, so // if this happens, the request should be re-sent to ask for a new // SIP channel to make the call pass through - std::function<void(const dht::InfoHash&, bool)> cb; - { - std::lock_guard<std::mutex> lk(shared->onConnectionClosedMtx_); - if (shared->onConnectionClosed_[key.second]) { - cb = std::move(shared->onConnectionClosed_[key.second]); - shared->onConnectionClosed_.erase(key.second); - } - } - if (cb) { - JAMI_WARN("An outgoing call was in progress while shutdown, relaunch the request"); - cb(key.second, false); - } + shared->callConnectionClosed(key.second, false); }; auto sip_tr = link_.sipTransportBroker->getChanneledTransport(socket, std::move(onShutdown)); // Store the connection @@ -3738,37 +3760,31 @@ JamiAccount::cacheSIPConnection(std::shared_ptr<ChannelSocket>&& socket, JAMI_WARN("New SIP channel opened with %s", deviceId.to_c_str()); lk.unlock(); - sendProfile(deviceIdStr); + sendProfile(deviceId.toString()); // Retry messages messageEngine_.onPeerOnline(peerId); // Connect pending calls - std::vector<std::shared_ptr<SIPCall>> pc; - { - std::lock_guard<std::mutex> lk(pendingCallsMutex_); - pc = std::move(pendingCalls_[deviceIdStr]); - } - for (auto& pendingCall : pc) { - if (pendingCall->getConnectionState() != Call::ConnectionState::TRYING - and pendingCall->getConnectionState() != Call::ConnectionState::PROGRESSING) - continue; - pendingCall->setTransport(sip_tr); - pendingCall->setState(Call::ConnectionState::PROGRESSING); + forEachPendingCall(deviceId, [&](const auto& pc) { + if (pc->getConnectionState() != Call::ConnectionState::TRYING + and pc->getConnectionState() != Call::ConnectionState::PROGRESSING) + return; + pc->setTransport(sip_tr); + pc->setState(Call::ConnectionState::PROGRESSING); if (auto ice = socket->underlyingICE()) { auto remoted_address = ice->getRemoteAddress(ICE_COMP_SIP_TRANSPORT); try { - onConnectedOutgoingCall(pendingCall, peerId, remoted_address); + onConnectedOutgoingCall(pc, peerId, remoted_address); } catch (const VoipLinkException&) { // In this case, the main scenario is that SIPStartCall failed because // the ICE is dead and the TLS session didn't send any packet on that dead // link (connectivity change, killed by the os, etc) // Here, we don't need to do anything, the TLS will fail and will delete // the cached transport - continue; } } - } + }); } } // namespace jami diff --git a/src/jamidht/jamiaccount.h b/src/jamidht/jamiaccount.h index 91c3af9356d673b29070242dd835a1cb45b9b4d0..2c7a97427bacf76ce7397ca2303c897ae5c779e0 100644 --- a/src/jamidht/jamiaccount.h +++ b/src/jamidht/jamiaccount.h @@ -253,13 +253,11 @@ public: #ifndef _MSC_VER template<class T = SIPCall> std::shared_ptr<enable_if_base_of<T, SIPCall>> newOutgoingCall( - std::string_view toUrl, - const std::map<std::string, std::string>& volatileCallDetails = {}); + std::string_view toUrl, const std::map<std::string, std::string>& volatileCallDetails = {}); #else template<class T> std::shared_ptr<T> newOutgoingCall( - std::string_view toUrl, - const std::map<std::string, std::string>& volatileCallDetails = {}); + std::string_view toUrl, const std::map<std::string, std::string>& volatileCallDetails = {}); #endif /** @@ -532,6 +530,8 @@ private: void checkPendingCall(const std::string& callId); bool handlePendingCall(PendingCall& pc, bool incoming); + void forEachPendingCall(const DeviceId& deviceId, + const std::function<void(const std::shared_ptr<SIPCall>&)>& cb); void loadAccount(const std::string& archive_password = {}, const std::string& archive_pin = {}, @@ -728,10 +728,17 @@ private: std::map<SipConnectionKey, std::vector<SipConnection>> sipConns_; std::mutex pendingCallsMutex_; - std::map<std::string, std::vector<std::shared_ptr<SIPCall>>> pendingCalls_; + std::map<DeviceId, std::vector<std::shared_ptr<SIPCall>>> pendingCalls_; std::mutex onConnectionClosedMtx_ {}; std::map<DeviceId, std::function<void(const DeviceId&, bool)>> onConnectionClosed_ {}; + /** + * onConnectionClosed contains callbacks that need to be called if a sub call is failing + * @param deviceId The device we are calling + * @param eraseDummy Erase the dummy call (a temporary subcall that must be stop when we will + * not create new subcalls) + */ + void callConnectionClosed(const DeviceId& deviceId, bool eraseDummy); /** * Ask a device to open a channeled SIP socket diff --git a/test/unitTest/call/call.cpp b/test/unitTest/call/call.cpp index 5f1446cc544aa75c2e49d0d3b8cb7a76aa55a8b7..857e075adba1ad6960346af11b3c39fa2558b237 100644 --- a/test/unitTest/call/call.cpp +++ b/test/unitTest/call/call.cpp @@ -56,10 +56,12 @@ public: private: void testCall(); void testCachedCall(); + void testStopSearching(); CPPUNIT_TEST_SUITE(CallTest); CPPUNIT_TEST(testCall); CPPUNIT_TEST(testCachedCall); + CPPUNIT_TEST(testStopSearching); CPPUNIT_TEST_SUITE_END(); }; @@ -247,6 +249,44 @@ CallTest::testCachedCall() CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { return callStopped == 2; })); } +void +CallTest::testStopSearching() +{ + JAMI_INFO("Waiting...."); + // TODO remove. This sleeps is because it take some time for the DHT to be connected + // and account announced + std::this_thread::sleep_for(std::chrono::seconds(5)); + auto aliceAccount = Manager::instance().getAccount<JamiAccount>(aliceId); + auto bobAccount = Manager::instance().getAccount<JamiAccount>(bobId); + auto bobUri = bobAccount->getAccountDetails()[ConfProperties::USERNAME]; + auto aliceUri = aliceAccount->getAccountDetails()[ConfProperties::USERNAME]; + + Manager::instance().sendRegister(bobId, false); + + std::mutex mtx; + std::unique_lock<std::mutex> lk {mtx}; + std::condition_variable cv; + std::map<std::string, std::shared_ptr<DRing::CallbackWrapperBase>> confHandlers; + std::atomic_bool callStopped {false}; + // Watch signals + confHandlers.insert(DRing::exportable_callback<DRing::CallSignal::StateChange>( + [&](const std::string&, const std::string& state, signed) { + if (state == "OVER") { + callStopped = true; + cv.notify_one(); + } + })); + DRing::registerSignalHandlers(confHandlers); + + JAMI_INFO("Start call between alice and Bob"); + auto call = aliceAccount->newOutgoingCall(bobUri, {}); + + // Bob not there, so we should get a SEARCHING STATUS + JAMI_INFO("Wait OVER state"); + // Then wait for the DHT no answer. this can take some times + CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] { return callStopped.load(); })); +} + } // namespace test } // namespace jami