diff --git a/src/jamidht/connectionmanager.cpp b/src/jamidht/connectionmanager.cpp index 0abc7872185125d8e3a8ef678458173477e94fd6..72932db684006a980fe58dd8656f0d9cc63d5ef9 100644 --- a/src/jamidht/connectionmanager.cpp +++ b/src/jamidht/connectionmanager.cpp @@ -945,6 +945,13 @@ ConnectionManager::connectDevice(const DeviceId& deviceId, pimpl_->connectDevice(deviceId, name, std::move(cb)); } +bool +ConnectionManager::isConnecting(const DeviceId& deviceId, const std::string& name) const +{ + auto pending = pimpl_->getPendingCallbacks(deviceId); + return std::find_if(pending.begin(), pending.end(), [&](auto p) { return p.name == name; }) != pending.end(); +} + void ConnectionManager::closeConnectionsWith(const DeviceId& deviceId) { diff --git a/src/jamidht/connectionmanager.h b/src/jamidht/connectionmanager.h index fe0fa33c5dc93b9273d1967fbedd569d601791cc..8549e2016f688047f9eefad57105312f91faafcb 100644 --- a/src/jamidht/connectionmanager.h +++ b/src/jamidht/connectionmanager.h @@ -87,6 +87,15 @@ public: */ void connectDevice(const DeviceId& deviceId, const std::string& name, ConnectCallback cb); + /** + * Check if we are already connecting to a device with a specific name + * @param deviceId Remote device + * @param name Name of the channel + * @return if connecting + * @note isConnecting is not true just after connectDevice() as connectDevice is full async + */ + bool isConnecting(const DeviceId& deviceId, const std::string& name) const; + /** * Close all connections with a current device * @param deviceId Remote device diff --git a/src/jamidht/jamiaccount.cpp b/src/jamidht/jamiaccount.cpp index cbf1c3a3460812d667fceeb60853555761cb3591..e47989b5405b981f4b6477ba63140a0180d77e78 100644 --- a/src/jamidht/jamiaccount.cpp +++ b/src/jamidht/jamiaccount.cpp @@ -124,6 +124,7 @@ struct TextMessageCtx DeviceId deviceId; uint64_t id; bool retryOnTimeout; + std::shared_ptr<ChannelSocket> channel; std::shared_ptr<PendingConfirmation> confirmation; }; @@ -3482,6 +3483,7 @@ JamiAccount::sendTextMessage(const std::string& to, continue; } auto& conn = value.back(); + auto& channel = conn.channel; // Set input token into callback std::unique_ptr<TextMessageCtx> ctx {std::make_unique<TextMessageCtx>()}; @@ -3490,6 +3492,7 @@ JamiAccount::sendTextMessage(const std::string& to, ctx->deviceId = key.second; ctx->id = token; ctx->retryOnTimeout = retryOnTimeout; + ctx->channel = channel; ctx->confirmation = confirm; try { @@ -3508,15 +3511,7 @@ JamiAccount::sendTextMessage(const std::string& to, acc->messageEngine_.onMessageSent(c->to, c->id, true); } else { JAMI_WARN("Timeout when send a message, close current connection"); - { - std::unique_lock<std::mutex> lk(acc->sipConnsMtx_); - acc->sipConns_.erase(std::make_pair(c->to, c->deviceId)); - } - { - std::lock_guard<std::mutex> lk(acc->connManagerMtx_); - if (acc->connectionManager_) - acc->connectionManager_->closeConnectionsWith(c->deviceId); - } + acc->shutdownSIPConnection(c->channel, c->to, c->deviceId); // This MUST be done after closing the connection to avoid race condition // with messageEngine_ acc->messageEngine_.onMessageSent(c->to, c->id, false); @@ -3536,8 +3531,9 @@ JamiAccount::sendTextMessage(const std::string& to, } catch (const std::runtime_error& ex) { JAMI_WARN("%s", ex.what()); messageEngine_.onMessageSent(to, token, false); + ++it; // Remove connection in incorrect state - it = sipConns_.erase(it); + shutdownSIPConnection(channel, to, key.second); continue; } @@ -4612,41 +4608,38 @@ JamiAccount::requestSIPConnection(const std::string& peerId, const DeviceId& dev deviceId.to_c_str()); return; } - sipConns_[id] = {}; // If not present, create it - JAMI_INFO("[Account %s] Ask %s for a new SIP channel", - getAccountID().c_str(), - deviceId.to_c_str()); std::lock_guard<std::mutex> lkCM(connManagerMtx_); if (!connectionManager_) return; + // Note, Even if we send 50 "sip" request, the connectionManager_ will only use one socket. + // however, this will still ask for multiple channels, so only ask + // if there is no pending request + if (connectionManager_->isConnecting(deviceId, "sip")) { + JAMI_INFO("[Account %s] Already connecting to %s", + getAccountID().c_str(), + deviceId.to_c_str()); + return; + } + JAMI_INFO("[Account %s] Ask %s for a new SIP channel", + getAccountID().c_str(), + deviceId.to_c_str()); connectionManager_->connectDevice(deviceId, "sip", [w = weak(), id](std::shared_ptr<ChannelSocket> socket, const DeviceId&) { - auto shared = w.lock(); - if (!shared) - return; - // NOTE: No need to cache Connection there. - // OnConnectionReady is called before this callback, so - // the socket is already cached if succeed. We just need - // to remove the pending request. - if (!socket) { - // If this is triggered, this means that the - // connectDevice didn't get any response from the DHT. - // Stop searching pending call. - shared->callConnectionClosed(id.second, true); - shared->forEachPendingCall(id.second, - [](const auto& pc) { - pc->onFailure(); - }); - } - - std::lock_guard<std::mutex> lk(shared->sipConnsMtx_); - auto it = shared->sipConns_.find(id); - if (it != shared->sipConns_.end() && it->second.empty()) { - shared->sipConns_.erase(it); - } + if (socket) return; + auto shared = w.lock(); + if (!shared) + return; + // If this is triggered, this means that the + // connectDevice didn't get any response from the DHT. + // Stop searching pending call. + shared->callConnectionClosed(id.second, true); + shared->forEachPendingCall(id.second, + [](const auto& pc) { + pc->onFailure(); + }); }); } @@ -4816,22 +4809,7 @@ JamiAccount::cacheSIPConnection(std::shared_ptr<ChannelSocket>&& socket, auto shared = w.lock(); if (!shared) return; - { - std::lock_guard<std::mutex> lk(shared->sipConnsMtx_); - auto it = shared->sipConns_.find(key); - if (it == shared->sipConns_.end()) - return; - auto& connections = it->second; - auto conn = connections.begin(); - while (conn != connections.end()) { - if (conn->channel == socket) - conn = connections.erase(conn); - else - conn++; - } - if (connections.empty()) - shared->sipConns_.erase(it); - } + shared->shutdownSIPConnection(socket, key.first, key.second); // The connection can be closed during the SIP initialization, so // if this happens, the request should be re-sent to ask for a new // SIP channel to make the call pass through @@ -4873,6 +4851,26 @@ JamiAccount::cacheSIPConnection(std::shared_ptr<ChannelSocket>&& socket, }); } +void +JamiAccount::shutdownSIPConnection(const std::shared_ptr<ChannelSocket>& channel, const std::string& peerId, const DeviceId& deviceId) +{ + std::unique_lock<std::mutex> lk(sipConnsMtx_); + SipConnectionKey key(peerId, deviceId); + auto it = sipConns_.find(key); + if (it != sipConns_.end()) { + auto& conns = it->second; + conns.erase(std::remove_if(conns.begin(), conns.end(), + [&](auto v) { + return v.channel == channel; + }), conns.end()); + if (conns.empty()) + sipConns_.erase(it); + } + lk.unlock(); + // Shutdown after removal to let the callbacks do stuff if needed + if (channel) channel->shutdown(); +} + std::string_view JamiAccount::currentDeviceId() const { diff --git a/src/jamidht/jamiaccount.h b/src/jamidht/jamiaccount.h index 434e43f804c89a41a3572d51e0215546e2d843ca..14dab4237068b61c085662eb293012d87546bbd5 100644 --- a/src/jamidht/jamiaccount.h +++ b/src/jamidht/jamiaccount.h @@ -885,13 +885,23 @@ private: void requestSIPConnection(const std::string& peerId, const DeviceId& deviceId); /** * Store a new SIP connection into sipConnections_ - * @param socket The new sip channel + * @param channel The new sip channel * @param peerId The contact who owns the device * @param deviceId Device linked to that transport */ - void cacheSIPConnection(std::shared_ptr<ChannelSocket>&& socket, + void cacheSIPConnection(std::shared_ptr<ChannelSocket>&& channel, const std::string& peerId, const DeviceId& deviceId); + /** + * Shutdown a SIP connection + * @param channel The channel to close + * @param peerId The contact who owns the device + * @param deviceId Device linked to that transport + */ + void shutdownSIPConnection(const std::shared_ptr<ChannelSocket>& channel, + const std::string& peerId, + const DeviceId& deviceId); + /** * Store a new Sync connection * @param socket The new sync channel diff --git a/test/unitTest/connectionManager/connectionManager.cpp b/test/unitTest/connectionManager/connectionManager.cpp index b1fc2e36485ca3342ff29cad3d4451f3cc14fe7f..69a939c924ce131e152007286475c401e3c54389 100644 --- a/test/unitTest/connectionManager/connectionManager.cpp +++ b/test/unitTest/connectionManager/connectionManager.cpp @@ -68,6 +68,7 @@ private: void testShutdownCallbacks(); void testFloodSocket(); void testDestroyWhileSending(); + void testIsConnecting(); CPPUNIT_TEST_SUITE(ConnectionManagerTest); CPPUNIT_TEST(testConnectDevice); @@ -84,6 +85,7 @@ private: CPPUNIT_TEST(testShutdownCallbacks); CPPUNIT_TEST(testFloodSocket); CPPUNIT_TEST(testDestroyWhileSending); + CPPUNIT_TEST(testIsConnecting); CPPUNIT_TEST_SUITE_END(); }; @@ -1000,6 +1002,52 @@ ConnectionManagerTest::testDestroyWhileSending() // No need to wait, immediately destroy, no segfault must occurs } +void +ConnectionManagerTest::testIsConnecting() +{ + auto aliceAccount = Manager::instance().getAccount<JamiAccount>(aliceId); + auto bobAccount = Manager::instance().getAccount<JamiAccount>(bobId); + auto bobDeviceId = DeviceId(std::string(bobAccount->currentDeviceId())); + + bobAccount->connectionManager().onICERequest([](const DeviceId&) { return true; }); + aliceAccount->connectionManager().onICERequest([](const DeviceId&) { return true; }); + + std::mutex mtx; + std::unique_lock<std::mutex> lk {mtx}; + std::condition_variable cv; + bool successfullyConnected = false, successfullyReceive = false; + + bobAccount->connectionManager().onChannelRequest( + [&](const DeviceId&, const std::string&) { + successfullyReceive = true; + cv.notify_one(); + std::this_thread::sleep_for(std::chrono::seconds(2)); + return true; + }); + + CPPUNIT_ASSERT(!aliceAccount->connectionManager().isConnecting(bobDeviceId, "sip")); + aliceAccount->connectionManager() + .connectDevice(bobDeviceId, + "sip", + [&cv, &successfullyConnected](std::shared_ptr<ChannelSocket> socket, + const DeviceId&) { + if (socket) { + successfullyConnected = true; + } + cv.notify_one(); + }); + // connectDevice is full async, so isConnecting will be true after a few ms. + CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(10), [&] { + return successfullyReceive; + })); + CPPUNIT_ASSERT(aliceAccount->connectionManager().isConnecting(bobDeviceId, "sip")); + CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] { + return successfullyConnected; + })); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Just to wait for the callback to finish + CPPUNIT_ASSERT(!aliceAccount->connectionManager().isConnecting(bobDeviceId, "sip")); +} + } // namespace test } // namespace jami