Skip to content
Snippets Groups Projects
Commit 8d787734 authored by Adrien Béraud's avatar Adrien Béraud
Browse files

tests: add testManyChannels

Change-Id: I768da190ea9dc891aba059ab9b44664897a9fffe
parent 5fd233f0
No related branches found
No related tags found
No related merge requests found
...@@ -83,6 +83,7 @@ private: ...@@ -83,6 +83,7 @@ private:
void testConnectDevice(); void testConnectDevice();
void testAcceptConnection(); void testAcceptConnection();
void testManyChannels();
void testMultipleChannels(); void testMultipleChannels();
void testMultipleChannelsOneDeclined(); void testMultipleChannelsOneDeclined();
void testMultipleChannelsSameName(); void testMultipleChannelsSameName();
...@@ -109,6 +110,7 @@ private: ...@@ -109,6 +110,7 @@ private:
CPPUNIT_TEST(testIsConnecting); CPPUNIT_TEST(testIsConnecting);
CPPUNIT_TEST(testAcceptConnection); CPPUNIT_TEST(testAcceptConnection);
CPPUNIT_TEST(testDeclineConnection); CPPUNIT_TEST(testDeclineConnection);
CPPUNIT_TEST(testManyChannels);
CPPUNIT_TEST(testMultipleChannels); CPPUNIT_TEST(testMultipleChannels);
CPPUNIT_TEST(testMultipleChannelsOneDeclined); CPPUNIT_TEST(testMultipleChannelsOneDeclined);
CPPUNIT_TEST(testMultipleChannelsSameName); CPPUNIT_TEST(testMultipleChannelsSameName);
...@@ -332,6 +334,140 @@ ConnectionManagerTest::testDeclineConnection() ...@@ -332,6 +334,140 @@ ConnectionManagerTest::testDeclineConnection()
CPPUNIT_ASSERT(!receiverConnected); CPPUNIT_ASSERT(!receiverConnected);
} }
void
ConnectionManagerTest::testManyChannels()
{
bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
std::condition_variable cv;
size_t successfullyConnected = 0;
size_t accepted = 0;
size_t receiverConnected = 0;
size_t successfullyReceived = 0;
size_t shutdownCount = 0;
auto acceptAll = [&](const std::shared_ptr<dht::crypto::Certificate>&, const std::string& name) {
if (name.empty()) return false;
std::lock_guard<std::mutex> lk {mtx};
accepted++;
cv.notify_one();
return true;
};
bob->connectionManager->onChannelRequest(acceptAll);
alice->connectionManager->onChannelRequest(acceptAll);
auto onReady = [&](const DeviceId&, const std::string& name, std::shared_ptr<ChannelSocket> socket) {
if (not socket or name.empty()) return;
if (socket->isInitiator())
return;
socket->setOnRecv([rxbuf = std::make_shared<std::vector<uint8_t>>(), w = std::weak_ptr(socket)](const uint8_t* data, size_t size) {
rxbuf->insert(rxbuf->end(), data, data + size);
if (rxbuf->size() == 32) {
if (auto socket = w.lock()) {
std::error_code ec;
socket->write(rxbuf->data(), rxbuf->size(), ec);
CPPUNIT_ASSERT(!ec);
socket->shutdown();
}
}
return size;
});
std::lock_guard<std::mutex> lk {mtx};
receiverConnected++;
cv.notify_one();
};
bob->connectionManager->onConnectionReady(onReady);
alice->connectionManager->onConnectionReady(onReady);
// max supported number of channels per side (64k - 2 reserved channels)
static constexpr size_t N = 1024 * 32 - 1;
auto onConnect = [&](std::shared_ptr<ChannelSocket> socket, const DeviceId&) {
CPPUNIT_ASSERT(socket);
if (socket) {
std::lock_guard<std::mutex> lk {mtx};
successfullyConnected++;
cv.notify_one();
}
auto data_sent = dht::PkId::get(socket->name());
socket->setOnRecv([&, data_sent, rxbuf = std::make_shared<std::vector<uint8_t>>()](const uint8_t* data, size_t size) {
rxbuf->insert(rxbuf->end(), data, data + size);
if (rxbuf->size() == 32) {
CPPUNIT_ASSERT(!std::memcmp(data_sent.data(), rxbuf->data(), data_sent.size()));
std::lock_guard<std::mutex> lk {mtx};
successfullyReceived++;
cv.notify_one();
}
return size;
});
socket->onShutdown([&]() {
std::lock_guard<std::mutex> lk {mtx};
shutdownCount++;
cv.notify_one();
});
std::error_code ec;
socket->write(data_sent.data(), data_sent.size(), ec);
CPPUNIT_ASSERT(!ec);
};
for (size_t i = 0; i < N; ++i) {
alice->connectionManager->connectDevice(bob->id.second,
fmt::format("git://{}", i+1),
onConnect);
bob->connectionManager->connectDevice(alice->id.second,
fmt::format("sip://{}", i+1),
onConnect);
if (i % 128 == 0)
std::this_thread::sleep_for(5ms);
}
std::unique_lock<std::mutex> lk {mtx};
cv.wait_for(lk, 30s, [&] { return successfullyConnected == N * 2; });
CPPUNIT_ASSERT_EQUAL(N * 2, successfullyConnected);
cv.wait_for(lk, 30s, [&] { return accepted == N * 2; });
CPPUNIT_ASSERT_EQUAL(N * 2, accepted);
cv.wait_for(lk, 20s, [&] { return receiverConnected == N * 2; });
CPPUNIT_ASSERT_EQUAL(N * 2, receiverConnected);
cv.wait_for(lk, 60s, [&] { return successfullyReceived == N * 2; });
CPPUNIT_ASSERT_EQUAL(N * 2, successfullyReceived);
cv.wait_for(lk, 60s, [&] { return shutdownCount == N * 2; });
CPPUNIT_ASSERT_EQUAL(N * 2, shutdownCount);
lk.unlock();
// Wait a bit to let at least some channels shutdown
std::this_thread::sleep_for(10ms);
// Second time to make sure we can re-use the channels after shutdown
for (size_t i = 0; i < N; ++i) {
alice->connectionManager->connectDevice(bob->id.second,
fmt::format("git://{}", N+i+1),
onConnect);
bob->connectionManager->connectDevice(alice->id.second,
fmt::format("sip://{}", N+i+1),
onConnect);
if (i % 128 == 0)
std::this_thread::sleep_for(5ms);
}
lk.lock();
cv.wait_for(lk, 30s, [&] { return successfullyConnected == N * 4; });
CPPUNIT_ASSERT_EQUAL(N * 4, successfullyConnected);
cv.wait_for(lk, 30s, [&] { return accepted == N * 4; });
CPPUNIT_ASSERT_EQUAL(N * 4, accepted);
cv.wait_for(lk, 20s, [&] { return receiverConnected == N * 4; });
CPPUNIT_ASSERT_EQUAL(N * 4, receiverConnected);
cv.wait_for(lk, 60s, [&] { return successfullyReceived == N * 4; });
CPPUNIT_ASSERT_EQUAL(N * 4, successfullyReceived);
cv.wait_for(lk, 60s, [&] { return shutdownCount == N * 4; });
CPPUNIT_ASSERT_EQUAL(N * 4, shutdownCount);
}
void void
ConnectionManagerTest::testMultipleChannels() ConnectionManagerTest::testMultipleChannels()
{ {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment