Skip to content
Snippets Groups Projects
Commit 591cdc17 authored by Sébastien Blin's avatar Sébastien Blin Committed by Adrien Béraud
Browse files

connectionManager: add some tests

Change-Id: I30883d1d43dd1c0c36be424b0937ecccb2100928
parent 6f0a3f8a
Branches
No related tags found
No related merge requests found
......@@ -66,6 +66,8 @@ private:
void testChannelSenderShutdown();
void testCloseConnectionWithDevice();
void testShutdownCallbacks();
void testFloodSocket();
void testDestroyWhileSending();
CPPUNIT_TEST_SUITE(ConnectionManagerTest);
CPPUNIT_TEST(testConnectDevice);
......@@ -80,6 +82,8 @@ private:
CPPUNIT_TEST(testChannelSenderShutdown);
CPPUNIT_TEST(testCloseConnectionWithDevice);
CPPUNIT_TEST(testShutdownCallbacks);
CPPUNIT_TEST(testFloodSocket);
CPPUNIT_TEST(testDestroyWhileSending);
CPPUNIT_TEST_SUITE_END();
};
......@@ -142,8 +146,7 @@ ConnectionManagerTest::tearDown()
std::unique_lock<std::mutex> lk {mtx};
std::condition_variable cv;
auto currentAccSize = Manager::instance().getAccountList().size();
confHandlers.insert(
DRing::exportable_callback<DRing::ConfigurationSignal::AccountsChanged>([&]() {
confHandlers.insert(DRing::exportable_callback<DRing::ConfigurationSignal::AccountsChanged>([&] {
if (Manager::instance().getAccountList().size() <= currentAccSize - 2) {
cv.notify_one();
}
......@@ -190,10 +193,9 @@ ConnectionManagerTest::testConnectDevice()
}
cv.notify_one();
});
cvReceive.wait_for(lk, std::chrono::seconds(30));
CPPUNIT_ASSERT(successfullyReceive);
cv.wait_for(lk, std::chrono::seconds(30));
CPPUNIT_ASSERT(successfullyConnected);
CPPUNIT_ASSERT(
cvReceive.wait_for(lk, std::chrono::seconds(60), [&] { return successfullyReceive; }));
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] { return successfullyConnected; }));
}
void
......@@ -236,10 +238,9 @@ ConnectionManagerTest::testAcceptConnection()
cv.notify_one();
});
cv.wait_for(lk, std::chrono::seconds(30));
CPPUNIT_ASSERT(successfullyReceive);
CPPUNIT_ASSERT(successfullyConnected);
CPPUNIT_ASSERT(receiverConnected);
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] {
return successfullyReceive && successfullyConnected && receiverConnected;
}));
}
void
......@@ -290,11 +291,9 @@ ConnectionManagerTest::testMultipleChannels()
cv.notify_one();
});
cv.wait_for(lk, std::chrono::seconds(30));
cv.wait_for(lk, std::chrono::seconds(30));
CPPUNIT_ASSERT(successfullyConnected);
CPPUNIT_ASSERT(successfullyConnected2);
CPPUNIT_ASSERT(receiverConnected == 2);
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] {
return successfullyConnected && successfullyConnected2 && receiverConnected == 2;
}));
}
void
......@@ -346,11 +345,9 @@ ConnectionManagerTest::testMultipleChannelsSameName()
cv.notify_one();
});
cv.wait_for(lk, std::chrono::seconds(30));
cv.wait_for(lk, std::chrono::seconds(30));
CPPUNIT_ASSERT(successfullyConnected);
CPPUNIT_ASSERT(successfullyConnected2);
CPPUNIT_ASSERT(receiverConnected == 2);
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] {
return successfullyConnected && successfullyConnected2 && receiverConnected == 2;
}));
}
void
......@@ -422,14 +419,10 @@ ConnectionManagerTest::testSendReceiveData()
cv.notify_one();
});
auto expiration = std::chrono::system_clock::now() + std::chrono::seconds(10);
cv.wait_until(lk, expiration, [&events]() { return events == 4; });
CPPUNIT_ASSERT(successfullyReceive);
CPPUNIT_ASSERT(successfullyConnected);
CPPUNIT_ASSERT(successfullyConnected2);
CPPUNIT_ASSERT(receiverConnected);
CPPUNIT_ASSERT(dataOk);
CPPUNIT_ASSERT(dataOk2);
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] {
return events == 4 && successfullyReceive && successfullyConnected && successfullyConnected2
&& dataOk && dataOk2;
}));
}
void
......@@ -518,10 +511,9 @@ ConnectionManagerTest::testAcceptsICERequest()
cv.notify_one();
});
cv.wait_for(lk, std::chrono::seconds(30));
CPPUNIT_ASSERT(successfullyReceive);
CPPUNIT_ASSERT(successfullyConnected);
CPPUNIT_ASSERT(receiverConnected);
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] {
return successfullyReceive && successfullyConnected && receiverConnected;
}));
}
void
......@@ -605,7 +597,7 @@ ConnectionManagerTest::testChannelRcvShutdown()
"git://*",
[&](std::shared_ptr<ChannelSocket> socket) {
if (socket) {
socket->onShutdown([&]() {
socket->onShutdown([&] {
shutdownReceived = true;
scv.notify_one();
});
......@@ -648,7 +640,7 @@ ConnectionManagerTest::testChannelSenderShutdown()
bobAccount->connectionManager().onConnectionReady(
[&](const DeviceId&, const std::string& name, std::shared_ptr<ChannelSocket> socket) {
if (socket) {
socket->onShutdown([&]() {
socket->onShutdown([&] {
shutdownReceived = true;
scv.notify_one();
});
......@@ -701,7 +693,7 @@ ConnectionManagerTest::testCloseConnectionWithDevice()
bobAccount->connectionManager().onConnectionReady(
[&](const DeviceId&, const std::string& name, std::shared_ptr<ChannelSocket> socket) {
if (socket) {
socket->onShutdown([&]() {
socket->onShutdown([&] {
events += 1;
scv.notify_one();
});
......@@ -713,7 +705,7 @@ ConnectionManagerTest::testCloseConnectionWithDevice()
"git://*",
[&](std::shared_ptr<ChannelSocket> socket) {
if (socket) {
socket->onShutdown([&]() {
socket->onShutdown([&] {
events += 1;
scv.notify_one();
});
......@@ -725,12 +717,9 @@ ConnectionManagerTest::testCloseConnectionWithDevice()
rcv.wait_for(lk, std::chrono::seconds(30));
// This should trigger onShutdown
aliceAccount->connectionManager().closeConnectionsWith(bobDeviceId);
auto expiration = std::chrono::system_clock::now() + std::chrono::seconds(10);
scv.wait_until(lk, expiration, [&events]() { return events == 2; });
CPPUNIT_ASSERT(events == 2);
CPPUNIT_ASSERT(successfullyReceive);
CPPUNIT_ASSERT(successfullyConnected);
CPPUNIT_ASSERT(receiverConnected);
CPPUNIT_ASSERT(scv.wait_for(lk, std::chrono::seconds(60), [&] {
return events == 2 && successfullyReceive && successfullyConnected && receiverConnected;
}));
}
void
......@@ -777,10 +766,9 @@ ConnectionManagerTest::testShutdownCallbacks()
}
});
// Connect first channel. This will initiate a mx sock
rcv.wait_for(lk, std::chrono::seconds(30));
CPPUNIT_ASSERT(successfullyReceive);
CPPUNIT_ASSERT(successfullyConnected);
CPPUNIT_ASSERT(receiverConnected);
CPPUNIT_ASSERT(rcv.wait_for(lk, std::chrono::seconds(30), [&] {
return successfullyReceive && successfullyConnected && receiverConnected;
}));
// Connect another channel, but close the connection
bool channel2NotConnected = false;
......@@ -794,8 +782,199 @@ ConnectionManagerTest::testShutdownCallbacks()
// This should trigger onShutdown for second callback
bobAccount->connectionManager().closeConnectionsWith(aliceDeviceId);
rcv.wait_for(lk, std::chrono::seconds(30));
CPPUNIT_ASSERT(channel2NotConnected);
CPPUNIT_ASSERT(rcv.wait_for(lk, std::chrono::seconds(30), [&] { return channel2NotConnected; }));
}
void
ConnectionManagerTest::testFloodSocket()
{
auto aliceAccount = Manager::instance().getAccount<JamiAccount>(aliceId);
auto bobAccount = Manager::instance().getAccount<JamiAccount>(bobId);
auto bobDeviceId = DeviceId(bobAccount->getAccountDetails()[ConfProperties::RING_DEVICE_ID]);
bobAccount->connectionManager().onICERequest([](const DeviceId&) { return true; });
aliceAccount->connectionManager().onICERequest([](const DeviceId&) { return true; });
std::mutex mtx;
std::unique_lock<std::mutex> lk {mtx};
std::condition_variable cv;
bool successfullyConnected = false;
bool successfullyReceive = false;
bool receiverConnected = false;
std::shared_ptr<ChannelSocket> rcvSock1, rcvSock2, rcvSock3, sendSock, sendSock2, sendSock3;
bobAccount->connectionManager().onChannelRequest(
[&successfullyReceive](const DeviceId&, const std::string& name) {
successfullyReceive = name == "1";
return true;
});
bobAccount->connectionManager().onConnectionReady(
[&](const DeviceId&, const std::string& name, std::shared_ptr<ChannelSocket> socket) {
receiverConnected = socket != nullptr;
if (name == "1")
rcvSock1 = socket;
else if (name == "2")
rcvSock2 = socket;
else if (name == "3")
rcvSock3 = socket;
});
aliceAccount->connectionManager().connectDevice(bobDeviceId,
"1",
[&](std::shared_ptr<ChannelSocket> socket) {
if (socket) {
sendSock = socket;
successfullyConnected = true;
}
cv.notify_one();
});
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] {
return successfullyReceive && successfullyConnected && receiverConnected;
}));
CPPUNIT_ASSERT(receiverConnected);
successfullyConnected = false;
receiverConnected = false;
aliceAccount->connectionManager().connectDevice(bobDeviceId,
"2",
[&](std::shared_ptr<ChannelSocket> socket) {
if (socket) {
sendSock2 = socket;
successfullyConnected = true;
}
cv.notify_one();
});
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] {
return successfullyConnected && receiverConnected;
}));
successfullyConnected = false;
receiverConnected = false;
aliceAccount->connectionManager().connectDevice(bobDeviceId,
"3",
[&](std::shared_ptr<ChannelSocket> socket) {
if (socket) {
sendSock3 = socket;
successfullyConnected = true;
}
cv.notify_one();
});
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] {
return successfullyConnected && receiverConnected;
}));
std::mutex mtxRcv {};
std::string alphabet, shouldRcv, rcv1, rcv2, rcv3;
for (int i = 0; i < 100; ++i)
alphabet += "QWERTYUIOPASDFGHJKLZXCVBNM";
rcvSock1->setOnRecv([&](const uint8_t* buf, size_t len) {
rcv1 += std::string(buf, buf + len);
return len;
});
rcvSock2->setOnRecv([&](const uint8_t* buf, size_t len) {
rcv2 += std::string(buf, buf + len);
return len;
});
rcvSock3->setOnRecv([&](const uint8_t* buf, size_t len) {
rcv3 += std::string(buf, buf + len);
return len;
});
for (uint64_t i = 0; i < alphabet.size(); ++i) {
auto send = std::string(8000, alphabet[i]);
shouldRcv += send;
std::error_code ec;
sendSock->write(reinterpret_cast<unsigned char*>(send.data()), send.size(), ec);
sendSock2->write(reinterpret_cast<unsigned char*>(send.data()), send.size(), ec);
sendSock3->write(reinterpret_cast<unsigned char*>(send.data()), send.size(), ec);
CPPUNIT_ASSERT(!ec);
}
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] {
return shouldRcv == rcv1 && shouldRcv == rcv2 && shouldRcv == rcv3;
}));
}
void
ConnectionManagerTest::testDestroyWhileSending()
{
// Same as test before, but destroy the accounts while sending.
// This test if a segfault occurs
auto aliceAccount = Manager::instance().getAccount<JamiAccount>(aliceId);
auto bobAccount = Manager::instance().getAccount<JamiAccount>(bobId);
auto bobDeviceId = DeviceId(bobAccount->getAccountDetails()[ConfProperties::RING_DEVICE_ID]);
bobAccount->connectionManager().onICERequest([](const DeviceId&) { return true; });
aliceAccount->connectionManager().onICERequest([](const DeviceId&) { return true; });
std::mutex mtx;
std::unique_lock<std::mutex> lk {mtx};
std::condition_variable cv;
bool successfullyConnected = false;
bool successfullyReceive = false;
bool receiverConnected = false;
std::shared_ptr<ChannelSocket> rcvSock1, rcvSock2, rcvSock3, sendSock, sendSock2, sendSock3;
bobAccount->connectionManager().onChannelRequest(
[&successfullyReceive](const DeviceId&, const std::string& name) {
successfullyReceive = name == "1";
return true;
});
bobAccount->connectionManager().onConnectionReady(
[&](const DeviceId&, const std::string& name, std::shared_ptr<ChannelSocket> socket) {
receiverConnected = socket != nullptr;
if (name == "1")
rcvSock1 = socket;
else if (name == "2")
rcvSock2 = socket;
else if (name == "3")
rcvSock3 = socket;
});
aliceAccount->connectionManager().connectDevice(bobDeviceId,
"1",
[&](std::shared_ptr<ChannelSocket> socket) {
if (socket) {
sendSock = socket;
successfullyConnected = true;
}
cv.notify_one();
});
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] {
return successfullyReceive && successfullyConnected && receiverConnected;
}));
successfullyConnected = false;
receiverConnected = false;
aliceAccount->connectionManager().connectDevice(bobDeviceId,
"2",
[&](std::shared_ptr<ChannelSocket> socket) {
if (socket) {
sendSock2 = socket;
successfullyConnected = true;
}
cv.notify_one();
});
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] {
return successfullyConnected && receiverConnected;
}));
successfullyConnected = false;
receiverConnected = false;
aliceAccount->connectionManager().connectDevice(bobDeviceId,
"3",
[&](std::shared_ptr<ChannelSocket> socket) {
if (socket) {
sendSock3 = socket;
successfullyConnected = true;
}
cv.notify_one();
});
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] {
return successfullyConnected && receiverConnected;
}));
std::mutex mtxRcv {};
std::string alphabet;
for (int i = 0; i < 100; ++i)
alphabet += "QWERTYUIOPASDFGHJKLZXCVBNM";
rcvSock1->setOnRecv([&](const uint8_t* buf, size_t len) { return len; });
rcvSock2->setOnRecv([&](const uint8_t* buf, size_t len) { return len; });
rcvSock3->setOnRecv([&](const uint8_t* buf, size_t len) { return len; });
for (uint64_t i = 0; i < alphabet.size(); ++i) {
auto send = std::string(8000, alphabet[i]);
std::error_code ec;
sendSock->write(reinterpret_cast<unsigned char*>(send.data()), send.size(), ec);
sendSock2->write(reinterpret_cast<unsigned char*>(send.data()), send.size(), ec);
sendSock3->write(reinterpret_cast<unsigned char*>(send.data()), send.size(), ec);
CPPUNIT_ASSERT(!ec);
}
// No need to wait, immediately destroy, no segfault must occurs
}
} // namespace test
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment