diff --git a/tests/connectionManager.cpp b/tests/connectionManager.cpp index 83914425824be5b2b31ddc0785b008761ffcfdfa..0bbe5bfd648ee6bff152e1c52bef32897d0a963f 100644 --- a/tests/connectionManager.cpp +++ b/tests/connectionManager.cpp @@ -83,6 +83,7 @@ private: void testConnectDevice(); void testAcceptConnection(); + void testManyChannels(); void testMultipleChannels(); void testMultipleChannelsOneDeclined(); void testMultipleChannelsSameName(); @@ -109,6 +110,7 @@ private: CPPUNIT_TEST(testIsConnecting); CPPUNIT_TEST(testAcceptConnection); CPPUNIT_TEST(testDeclineConnection); + CPPUNIT_TEST(testManyChannels); CPPUNIT_TEST(testMultipleChannels); CPPUNIT_TEST(testMultipleChannelsOneDeclined); CPPUNIT_TEST(testMultipleChannelsSameName); @@ -332,6 +334,140 @@ ConnectionManagerTest::testDeclineConnection() CPPUNIT_ASSERT(!receiverConnected); } + +void +ConnectionManagerTest::testManyChannels() +{ + bob->connectionManager->onICERequest([](const DeviceId&) { return true; }); + alice->connectionManager->onICERequest([](const DeviceId&) { return true; }); + + std::condition_variable cv; + size_t successfullyConnected = 0; + size_t accepted = 0; + size_t receiverConnected = 0; + size_t successfullyReceived = 0; + size_t shutdownCount = 0; + + auto acceptAll = [&](const std::shared_ptr<dht::crypto::Certificate>&, const std::string& name) { + if (name.empty()) return false; + std::lock_guard<std::mutex> lk {mtx}; + accepted++; + cv.notify_one(); + return true; + }; + bob->connectionManager->onChannelRequest(acceptAll); + alice->connectionManager->onChannelRequest(acceptAll); + + auto onReady = [&](const DeviceId&, const std::string& name, std::shared_ptr<ChannelSocket> socket) { + if (not socket or name.empty()) return; + if (socket->isInitiator()) + return; + socket->setOnRecv([rxbuf = std::make_shared<std::vector<uint8_t>>(), w = std::weak_ptr(socket)](const uint8_t* data, size_t size) { + rxbuf->insert(rxbuf->end(), data, data + size); + if (rxbuf->size() == 32) { + if (auto socket = w.lock()) { + std::error_code ec; + socket->write(rxbuf->data(), rxbuf->size(), ec); + CPPUNIT_ASSERT(!ec); + socket->shutdown(); + } + } + return size; + }); + std::lock_guard<std::mutex> lk {mtx}; + receiverConnected++; + cv.notify_one(); + }; + bob->connectionManager->onConnectionReady(onReady); + alice->connectionManager->onConnectionReady(onReady); + + // max supported number of channels per side (64k - 2 reserved channels) + static constexpr size_t N = 1024 * 32 - 1; + + auto onConnect = [&](std::shared_ptr<ChannelSocket> socket, const DeviceId&) { + CPPUNIT_ASSERT(socket); + if (socket) { + std::lock_guard<std::mutex> lk {mtx}; + successfullyConnected++; + cv.notify_one(); + } + auto data_sent = dht::PkId::get(socket->name()); + socket->setOnRecv([&, data_sent, rxbuf = std::make_shared<std::vector<uint8_t>>()](const uint8_t* data, size_t size) { + rxbuf->insert(rxbuf->end(), data, data + size); + if (rxbuf->size() == 32) { + CPPUNIT_ASSERT(!std::memcmp(data_sent.data(), rxbuf->data(), data_sent.size())); + std::lock_guard<std::mutex> lk {mtx}; + successfullyReceived++; + cv.notify_one(); + } + return size; + }); + socket->onShutdown([&]() { + std::lock_guard<std::mutex> lk {mtx}; + shutdownCount++; + cv.notify_one(); + }); + std::error_code ec; + socket->write(data_sent.data(), data_sent.size(), ec); + CPPUNIT_ASSERT(!ec); + }; + + for (size_t i = 0; i < N; ++i) { + alice->connectionManager->connectDevice(bob->id.second, + fmt::format("git://{}", i+1), + onConnect); + + bob->connectionManager->connectDevice(alice->id.second, + fmt::format("sip://{}", i+1), + onConnect); + + if (i % 128 == 0) + std::this_thread::sleep_for(5ms); + } + + std::unique_lock<std::mutex> lk {mtx}; + cv.wait_for(lk, 30s, [&] { return successfullyConnected == N * 2; }); + CPPUNIT_ASSERT_EQUAL(N * 2, successfullyConnected); + cv.wait_for(lk, 30s, [&] { return accepted == N * 2; }); + CPPUNIT_ASSERT_EQUAL(N * 2, accepted); + cv.wait_for(lk, 20s, [&] { return receiverConnected == N * 2; }); + CPPUNIT_ASSERT_EQUAL(N * 2, receiverConnected); + cv.wait_for(lk, 60s, [&] { return successfullyReceived == N * 2; }); + CPPUNIT_ASSERT_EQUAL(N * 2, successfullyReceived); + cv.wait_for(lk, 60s, [&] { return shutdownCount == N * 2; }); + CPPUNIT_ASSERT_EQUAL(N * 2, shutdownCount); + lk.unlock(); + + // Wait a bit to let at least some channels shutdown + std::this_thread::sleep_for(10ms); + + // Second time to make sure we can re-use the channels after shutdown + for (size_t i = 0; i < N; ++i) { + alice->connectionManager->connectDevice(bob->id.second, + fmt::format("git://{}", N+i+1), + onConnect); + + bob->connectionManager->connectDevice(alice->id.second, + fmt::format("sip://{}", N+i+1), + onConnect); + + if (i % 128 == 0) + std::this_thread::sleep_for(5ms); + } + + lk.lock(); + cv.wait_for(lk, 30s, [&] { return successfullyConnected == N * 4; }); + CPPUNIT_ASSERT_EQUAL(N * 4, successfullyConnected); + cv.wait_for(lk, 30s, [&] { return accepted == N * 4; }); + CPPUNIT_ASSERT_EQUAL(N * 4, accepted); + cv.wait_for(lk, 20s, [&] { return receiverConnected == N * 4; }); + CPPUNIT_ASSERT_EQUAL(N * 4, receiverConnected); + cv.wait_for(lk, 60s, [&] { return successfullyReceived == N * 4; }); + CPPUNIT_ASSERT_EQUAL(N * 4, successfullyReceived); + cv.wait_for(lk, 60s, [&] { return shutdownCount == N * 4; }); + CPPUNIT_ASSERT_EQUAL(N * 4, shutdownCount); +} + void ConnectionManagerTest::testMultipleChannels() {