Skip to content
Snippets Groups Projects
Commit 35e4cfce authored by Sébastien Blin's avatar Sébastien Blin
Browse files

multiplexed_socket: fix race condition on acceptation

Because some callbacks can take a long time (accepting a SIP channel
will starts to sync profiles, ask for new channel, etc), control packets
are handled in their own thread. However, if the peer accepts a request,
it can starts to use it and send data on it. When finished, it will close
the socket and both peer will remove the channel.
In some case, the channel can be removed before
MultiplexedSocket::Impl::onAccept, causing upper layers to handle a
bad channel. In this patch, we removes the channel only when upper layers
know that the channel is accepted and ready to use. Also, onAccept()
doesn't create wrong channels (which was a bug, where isInitiator() was
in the wrong state).

GitLab: #659
Change-Id: I4f45feacc2624ce0135ace09f7e0188b56fcf81f
parent 81d75c92
No related branches found
No related tags found
No related merge requests found
......@@ -258,10 +258,20 @@ MultiplexedSocket::Impl::onAccept(const std::string& name, uint16_t channel)
{
std::lock_guard<std::mutex> lkSockets(socketsMutex);
auto& socket = sockets[channel];
if (!socket)
socket = makeSocket(name, channel);
if (!socket) {
JAMI_ERR() << "Receiving an answer for a non existing channel. This is a bug.";
return;
}
onChannelReady_(deviceId, socket);
socket->ready();
// Due to the callbacks that can take some time, onAccept can arrive after
// receiving all the data. In this case, the socket should be removed here
// as handle by onChannelReady_
if (socket->isRemovable())
sockets.erase(channel);
else
socket->answered();
}
void
......@@ -433,7 +443,11 @@ MultiplexedSocket::Impl::handleChannelPacket(uint16_t channel, std::vector<uint8
if (channel > 0 && sockIt->second) {
if (pkt.size() == 0) {
sockIt->second->stop();
sockets.erase(sockIt);
if (sockIt->second->isAnswered())
sockets.erase(sockIt);
else
sockIt->second->removable(); // This means that onAccept didn't happen yet, will be
// removed later.
} else {
sockIt->second->onRecv(std::move(pkt));
}
......@@ -517,7 +531,8 @@ MultiplexedSocket::addChannel(const std::string& name)
if (c == CONTROL_CHANNEL || c == PROTOCOL_CHANNEL
|| pimpl_->sockets.find(c) != pimpl_->sockets.end())
continue;
return pimpl_->makeSocket(name, c, true);
auto channel = pimpl_->makeSocket(name, c, true);
return channel;
}
return {};
}
......@@ -732,6 +747,9 @@ public:
std::weak_ptr<MultiplexedSocket> endpoint {};
bool isInitiator_ {false};
bool isAnswered_ {false};
bool isRemovable_ {false};
std::vector<uint8_t> buf {};
std::mutex mutex {};
std::condition_variable cv {};
......@@ -839,6 +857,30 @@ ChannelSocket::underlyingSocket() const
}
#endif
void
ChannelSocket::answered()
{
pimpl_->isAnswered_ = true;
}
void
ChannelSocket::removable()
{
pimpl_->isRemovable_ = true;
}
bool
ChannelSocket::isRemovable() const
{
return pimpl_->isRemovable_;
}
bool
ChannelSocket::isAnswered() const
{
return pimpl_->isAnswered_;
}
void
ChannelSocket::ready()
{
......
......@@ -28,8 +28,10 @@ class ChannelSocket;
class TlsSocketEndpoint;
using DeviceId = dht::PkId;
using OnConnectionRequestCb = std::function<
bool(const std::shared_ptr<dht::crypto::Certificate>& /* peer */, const uint16_t& /* id */, const std::string& /* name */)>;
using OnConnectionRequestCb
= std::function<bool(const std::shared_ptr<dht::crypto::Certificate>& /* peer */,
const uint16_t& /* id */,
const std::string& /* name */)>;
using OnConnectionReadyCb
= std::function<void(const DeviceId& /* deviceId */, const std::shared_ptr<ChannelSocket>&)>;
using ChannelReadyCb = std::function<void(void)>;
......@@ -247,6 +249,14 @@ public:
std::shared_ptr<MultiplexedSocket> underlyingSocket() const;
#endif
// Note: When a channel is accepted, it can receives data ASAP and when finished will be removed
// however, onAccept is it's own thread due to the callbacks. In this case, the channel must be
// deleted in the onAccept.
void answered();
bool isAnswered() const;
void removable();
bool isRemovable() const;
private:
class Impl;
std::unique_ptr<Impl> pimpl_;
......
......@@ -101,6 +101,8 @@ SyncHistoryTest::setUp()
void
SyncHistoryTest::tearDown()
{
auto aliceArchive = std::filesystem::current_path().string() + "/alice.gz";
std::remove(aliceArchive.c_str());
if (alice2Id.empty()) {
wait_for_removal_of({aliceId, bobId});
} else {
......@@ -117,7 +119,6 @@ SyncHistoryTest::testCreateConversationThenSync()
// Now create alice2
auto aliceArchive = std::filesystem::current_path().string() + "/alice.gz";
std::remove(aliceArchive.c_str());
aliceAccount->exportArchive(aliceArchive);
std::map<std::string, std::string> details = DRing::getAccountTemplate("RING");
details[ConfProperties::TYPE] = "RING";
......@@ -155,7 +156,6 @@ SyncHistoryTest::testCreateConversationThenSync()
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] {
return alice2Ready && conversationReady;
}));
std::remove(aliceArchive.c_str());
DRing::unregisterSignalHandlers();
}
......@@ -166,7 +166,6 @@ SyncHistoryTest::testCreateConversationWithOnlineDevice()
// Now create alice2
auto aliceArchive = std::filesystem::current_path().string() + "/alice.gz";
std::remove(aliceArchive.c_str());
aliceAccount->exportArchive(aliceArchive);
std::map<std::string, std::string> details = DRing::getAccountTemplate("RING");
details[ConfProperties::TYPE] = "RING";
......@@ -208,7 +207,6 @@ SyncHistoryTest::testCreateConversationWithOnlineDevice()
return alice2Ready && conversationReady;
}));
DRing::unregisterSignalHandlers();
std::remove(aliceArchive.c_str());
}
void
......@@ -253,7 +251,6 @@ SyncHistoryTest::testCreateConversationWithMessagesThenAddDevice()
// Now create alice2
auto aliceArchive = std::filesystem::current_path().string() + "/alice.gz";
std::remove(aliceArchive.c_str());
aliceAccount->exportArchive(aliceArchive);
std::map<std::string, std::string> details = DRing::getAccountTemplate("RING");
details[ConfProperties::TYPE] = "RING";
......@@ -289,7 +286,6 @@ SyncHistoryTest::testCreateConversationWithMessagesThenAddDevice()
CPPUNIT_ASSERT(messages[0]["body"] == "Message 3");
CPPUNIT_ASSERT(messages[1]["body"] == "Message 2");
CPPUNIT_ASSERT(messages[2]["body"] == "Message 1");
std::remove(aliceArchive.c_str());
}
void
......@@ -319,7 +315,6 @@ SyncHistoryTest::testCreateMultipleConversationThenAddDevice()
// Now create alice2
auto aliceArchive = std::filesystem::current_path().string() + "/alice.gz";
std::remove(aliceArchive.c_str());
aliceAccount->exportArchive(aliceArchive);
std::map<std::string, std::string> details = DRing::getAccountTemplate("RING");
details[ConfProperties::TYPE] = "RING";
......@@ -350,7 +345,6 @@ SyncHistoryTest::testCreateMultipleConversationThenAddDevice()
CPPUNIT_ASSERT(
cv.wait_for(lk, std::chrono::seconds(60), [&]() { return conversationReady == 4; }));
DRing::unregisterSignalHandlers();
std::remove(aliceArchive.c_str());
}
void
......@@ -360,7 +354,6 @@ SyncHistoryTest::testReceivesInviteThenAddDevice()
// Export alice
auto aliceArchive = std::filesystem::current_path().string() + "/alice.gz";
std::remove(aliceArchive.c_str());
aliceAccount->exportArchive(aliceArchive);
auto bobAccount = Manager::instance().getAccount<JamiAccount>(bobId);
......@@ -429,7 +422,6 @@ SyncHistoryTest::testReceivesInviteThenAddDevice()
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { return requestReceived; }));
DRing::unregisterSignalHandlers();
std::remove(aliceArchive.c_str());
}
void
......@@ -439,7 +431,6 @@ SyncHistoryTest::testRemoveConversationOnAllDevices()
// Now create alice2
auto aliceArchive = std::filesystem::current_path().string() + "/alice.gz";
std::remove(aliceArchive.c_str());
aliceAccount->exportArchive(aliceArchive);
std::map<std::string, std::string> details = DRing::getAccountTemplate("RING");
details[ConfProperties::TYPE] = "RING";
......@@ -493,7 +484,6 @@ SyncHistoryTest::testRemoveConversationOnAllDevices()
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { return conversationRemoved; }));
DRing::unregisterSignalHandlers();
std::remove(aliceArchive.c_str());
}
void
......@@ -506,7 +496,6 @@ SyncHistoryTest::testSyncCreateAccountExportDeleteReimportOldBackup()
// Backup alice before start conversation, worst scenario for invites
auto aliceArchive = std::filesystem::current_path().string() + "/alice.gz";
std::remove(aliceArchive.c_str());
aliceAccount->exportArchive(aliceArchive);
// Start conversation
......@@ -599,7 +588,6 @@ SyncHistoryTest::testSyncCreateAccountExportDeleteReimportOldBackup()
messageBobReceived = 0;
DRing::sendMessage(alice2Id, convId, std::string("hi"), "");
cv.wait_for(lk, std::chrono::seconds(30), [&]() { return messageBobReceived == 1; });
std::remove(aliceArchive.c_str());
DRing::unregisterSignalHandlers();
}
......@@ -686,7 +674,6 @@ SyncHistoryTest::testSyncCreateAccountExportDeleteReimportWithConvId()
// Backup alice after startConversation with member
auto aliceArchive = std::filesystem::current_path().string() + "/alice.gz";
std::remove(aliceArchive.c_str());
aliceAccount->exportArchive(aliceArchive);
// disable account (same as removed)
......@@ -711,7 +698,6 @@ SyncHistoryTest::testSyncCreateAccountExportDeleteReimportWithConvId()
messageBobReceived = 0;
DRing::sendMessage(alice2Id, convId, std::string("hi"), "");
cv.wait_for(lk, std::chrono::seconds(30), [&]() { return messageBobReceived == 1; });
std::remove(aliceArchive.c_str());
DRing::unregisterSignalHandlers();
}
......@@ -782,7 +768,6 @@ SyncHistoryTest::testSyncCreateAccountExportDeleteReimportWithConvReq()
// Backup alice after startConversation with member
auto aliceArchive = std::filesystem::current_path().string() + "/alice.gz";
std::remove(aliceArchive.c_str());
aliceAccount->exportArchive(aliceArchive);
// disable account (same as removed)
......@@ -806,7 +791,6 @@ SyncHistoryTest::testSyncCreateAccountExportDeleteReimportWithConvReq()
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&]() {
return conversationReady && messageBobReceived == 1;
}));
std::remove(aliceArchive.c_str());
DRing::unregisterSignalHandlers();
}
......@@ -848,7 +832,6 @@ SyncHistoryTest::testSyncOneToOne()
// Now create alice2
auto aliceArchive = std::filesystem::current_path().string() + "/alice.gz";
std::remove(aliceArchive.c_str());
aliceAccount->exportArchive(aliceArchive);
std::map<std::string, std::string> details = DRing::getAccountTemplate("RING");
details[ConfProperties::TYPE] = "RING";
......@@ -863,7 +846,6 @@ SyncHistoryTest::testSyncOneToOne()
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] {
return alice2Ready && conversationReady;
}));
std::remove(aliceArchive.c_str());
DRing::unregisterSignalHandlers();
}
......@@ -874,7 +856,6 @@ SyncHistoryTest::testConversationRequestRemoved()
// Export alice
auto aliceArchive = std::filesystem::current_path().string() + "/alice.gz";
std::remove(aliceArchive.c_str());
aliceAccount->exportArchive(aliceArchive);
auto bobAccount = Manager::instance().getAccount<JamiAccount>(bobId);
......@@ -951,7 +932,6 @@ SyncHistoryTest::testConversationRequestRemoved()
}));
DRing::unregisterSignalHandlers();
std::remove(aliceArchive.c_str());
}
void
......@@ -964,7 +944,6 @@ SyncHistoryTest::testProfileReceivedMultiDevice()
// Export alice
auto aliceArchive = std::filesystem::current_path().string() + "/alice.gz";
std::remove(aliceArchive.c_str());
aliceAccount->exportArchive(aliceArchive);
// Set VCards
......@@ -1058,7 +1037,6 @@ END:VCARD";
details[ConfProperties::ARCHIVE_PATH] = aliceArchive;
bobProfileReceived = false, aliceProfileReceived = false;
alice2Id = Manager::instance().addAccount(details);
std::remove(aliceArchive.c_str());
CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&] {
return aliceProfileReceived && bobProfileReceived;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment