diff --git a/CMakeLists.txt b/CMakeLists.txt index 0939b6924dd9015980dfae3688073a0214f87578..22d1be10db03318d3309ea2af0ae3673d1ea2715 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -21,7 +21,6 @@ set (VERSION ${CMAKE_PROJECT_VERSION}) option(DHTNET_PUPNP "Enable UPnP support" ON) option(DHTNET_NATPMP "Enable NAT-PMP support" ON) -option(DHTNET_TESTABLE "Enable API for tests" ON) option(BUILD_TOOLS "Build tools" ON) option(BUILD_BENCHMARKS "Build benchmarks" ON) option(BUILD_DEPENDENCIES "Build dependencies" ON) diff --git a/src/connectionmanager.cpp b/src/connectionmanager.cpp index 544435e3e52a4d61f35b4c1a04bbc22ccafdd373..3fc8e9f46929ba0d1d50cfdaee9518beef7b8f1c 100644 --- a/src/connectionmanager.cpp +++ b/src/connectionmanager.cpp @@ -147,13 +147,18 @@ struct ConnectionInfo struct PendingCb { std::string name; + std::string connType; ConnectCallback cb; bool requested {false}; + /** Carry user preference and prevents retry mechanism to + * open a new connection if the channel request failed */ + bool noNewSocket {false}; }; struct DeviceInfo { const DeviceId deviceId; mutable std::mutex mtx_ {}; + std::shared_ptr<dht::crypto::Certificate> cert; std::map<dht::Value::Id, std::shared_ptr<ConnectionInfo>> info; std::map<dht::Value::Id, PendingCb> connecting; std::map<dht::Value::Id, PendingCb> waiting; @@ -162,6 +167,15 @@ struct DeviceInfo { inline bool isConnecting() const { return !connecting.empty() || !waiting.empty(); } + bool isConnecting(const std::string& name) const { + for (const auto& [id, pc]: connecting) + if (pc.name == name) + return true; + for (const auto& [id, pc]: waiting) + if (pc.name == name) + return true; + return false; + } inline bool empty() const { return info.empty() && connecting.empty() && waiting.empty(); @@ -225,6 +239,48 @@ struct DeviceInfo { return ret; } + /** + * A socket failed. Return failure callbacks and reset operations that can be retried. + * Sets noNewSocket to true for retryable operations, because we should never open more than one socket + * for a specific channel. + */ + std::pair<std::vector<PendingCb>, bool> resetPendingOperations(const std::set<dht::Value::Id>& ops) { + std::vector<PendingCb> ret; + bool retry = false; + if (ops.empty()) { + return {ret, retry}; + } + for (auto it = connecting.begin(); it != connecting.end();) { + auto& [vid, cb] = *it; + if (ops.find(vid) != ops.end()) { + if (cb.requested && !cb.noNewSocket) { + cb.requested = false; + cb.noNewSocket = true; + retry = true; + ++it; + } else { + ret.emplace_back(std::move(cb)); + it = connecting.erase(it); + } + } + } + for (auto it = waiting.begin(); it != waiting.end();) { + auto& [vid, cb] = *it; + if (ops.find(vid) != ops.end()) { + if (cb.requested && !cb.noNewSocket) { + cb.requested = false; + cb.noNewSocket = true; + retry = true; + ++it; + } else { + ret.emplace_back(std::move(cb)); + it = waiting.erase(it); + } + } + } + return {ret, retry}; + } + std::vector<std::shared_ptr<ConnectionInfo>> extractUnusedConnections() { std::vector<std::shared_ptr<ConnectionInfo>> unused {}; for (auto& [id, info] : info) @@ -244,15 +300,7 @@ struct DeviceInfo { executePendingOperations(lock, vid, sock, accepted); } - bool isConnecting(const std::string& name) const { - for (const auto& [id, pc]: connecting) - if (pc.name == name) - return true; - for (const auto& [id, pc]: waiting) - if (pc.name == name) - return true; - return false; - } + std::map<dht::Value::Id, std::string> requestPendingOps() { std::map<dht::Value::Id, std::string> ret; for (auto& [id, pc]: connecting) { @@ -459,6 +507,13 @@ public: bool noNewSocket = false, bool forceNewSocket = false, const std::string& connType = ""); + + void startConnection(const std::shared_ptr<DeviceInfo>& di, + const std::string& name, + dht::Value::Id vid, + const std::shared_ptr<dht::crypto::Certificate>& cert, + const std::string& connType); + /** * Send a ChannelRequest on the TLS socket. Triggers cb when ready * @param sock socket used to send the request @@ -498,7 +553,7 @@ public: void addNewMultiplexedSocket(const std::weak_ptr<DeviceInfo>& dinfo, const DeviceId& deviceId, const dht::Value::Id& vid, const std::shared_ptr<ConnectionInfo>& info); void onPeerResponse(PeerConnectionRequest&& req); void onDhtConnected(const dht::crypto::PublicKey& devicePk); - + void retryOnError(const std::shared_ptr<DeviceInfo>& deviceInfo); const std::shared_future<tls::DhParams> dhParams() const; tls::CertificateStore& certStore() const { return *config_->certStore; } @@ -862,6 +917,14 @@ ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certif } auto di = sthis->infos_.createDeviceInfo(deviceId); std::unique_lock lk(di->mtx_); + if (!di->cert) { + di->cert = cert; + } else if (di->cert->getLongId() != deviceId) { + if (sthis->config_->logger) + sthis->config_->logger->error("[device {}] Certificate mismatch", deviceId); + cb(nullptr, deviceId); + return; + } dht::Value::Id vid; { @@ -871,32 +934,31 @@ ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certif // Check if already connecting auto isConnectingToDevice = di->isConnecting(); + auto useExistingConnection = isConnectingToDevice && !forceNewSocket; // Note: we can be in a state where first // socket is negotiated and first channel is pending // so return only after we checked the info - auto& diw = (isConnectingToDevice && !forceNewSocket) + auto& diw = (useExistingConnection) ? di->waiting[vid] : di->connecting[vid]; - diw = PendingCb {name, std::move(cb)}; + diw = PendingCb {name, connType, std::move(cb), noNewSocket}; // Check if already negotiated if (auto info = di->getConnectedInfo()) { std::unique_lock lkc(info->mutex_); if (auto sock = info->socket_) { - if (sock->isRunning()) { - info->cbIds_.emplace(vid); - diw.requested = true; - lkc.unlock(); - lk.unlock(); - if (sthis->config_->logger) - sthis->config_->logger->debug("[device {}] Peer already connected. Add a new channel", deviceId); - sthis->sendChannelRequest(di, info, sock, name, vid); - return; - } + info->cbIds_.emplace(vid); + diw.requested = true; + lkc.unlock(); + lk.unlock(); + if (sthis->config_->logger) + sthis->config_->logger->debug("[device {}] Peer already connected. Add a new channel", deviceId); + sthis->sendChannelRequest(di, info, sock, name, vid); + return; } } - if (isConnectingToDevice && !forceNewSocket) { + if (useExistingConnection) { if (sthis->config_->logger) sthis->config_->logger->debug("[device {}] Already connecting, wait for ICE negotiation", deviceId); return; @@ -906,124 +968,130 @@ ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certif di->executePendingOperations(lk, vid, nullptr); return; } + sthis->startConnection(di, name, vid, cert, connType); + }); +} - // Note: used when the ice negotiation fails to erase - // all stored structures. - auto eraseInfo = [w, diw=std::weak_ptr(di), vid] { - if (auto di = diw.lock()) { - std::unique_lock lk(di->mtx_); - di->info.erase(vid); - auto ops = di->extractPendingOperations(vid, nullptr); - if (di->empty()) { - if (auto shared = w.lock()) - shared->infos_.removeDeviceInfo(di->deviceId); - } - lk.unlock(); - for (const auto& op: ops) - op.cb(nullptr, di->deviceId); - } - }; - - // If no socket exists, we need to initiate an ICE connection. - sthis->getIceOptions([w, - deviceId = std::move(deviceId), - devicePk = std::move(devicePk), - diw=std::weak_ptr(di), - name = std::move(name), - cert = std::move(cert), - vid, - connType, - eraseInfo](auto&& ice_config) { - auto sthis = w.lock(); - if (!sthis) { - dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); - return; +void +ConnectionManager::Impl::startConnection(const std::shared_ptr<DeviceInfo>& di, + const std::string& name, + dht::Value::Id vid, + const std::shared_ptr<dht::crypto::Certificate>& cert, + const std::string& connType) +{ + // Note: used when the ice negotiation fails to erase + // all stored structures. + auto eraseInfo = [w = weak_from_this(), diw=std::weak_ptr(di), vid] { + if (auto di = diw.lock()) { + std::unique_lock lk(di->mtx_); + di->info.erase(vid); + auto ops = di->extractPendingOperations(vid, nullptr); + if (di->empty()) { + if (auto shared = w.lock()) + shared->infos_.removeDeviceInfo(di->deviceId); } - auto info = std::make_shared<ConnectionInfo>(); - auto winfo = std::weak_ptr(info); - ice_config.tcpEnable = true; - ice_config.onInitDone = [w, - devicePk = std::move(devicePk), - name = std::move(name), - cert = std::move(cert), - diw, - winfo = std::weak_ptr(info), - vid, - connType, - eraseInfo](bool ok) { - dht::ThreadPool::io().run([w = std::move(w), - devicePk = std::move(devicePk), - vid, - winfo, - eraseInfo, - connType, ok] { - auto sthis = w.lock(); - if (!ok && sthis && sthis->config_->logger) - sthis->config_->logger->error("[device {}] Unable to initialize ICE session.", devicePk->getLongId()); - if (!sthis || !ok) { - eraseInfo(); - return; + lk.unlock(); + for (const auto& op: ops) + op.cb(nullptr, di->deviceId); + } + }; + + auto info = std::make_shared<ConnectionInfo>(); + di->info[vid] = info; + auto winfo = std::weak_ptr(info); + + getIceOptions([w = weak_from_this(), + deviceId = di->deviceId, + devicePk = cert->getSharedPublicKey(), + diw=std::weak_ptr(di), + winfo, + name = std::move(name), + cert, + vid, + connType, + eraseInfo](auto&& ice_config) { + auto sthis = w.lock(); + auto info = winfo.lock(); + if (!sthis || !info) { + dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); + return; + } + + ice_config.tcpEnable = true; + ice_config.onInitDone = [w, + devicePk = std::move(devicePk), + winfo, + vid, + connType, + eraseInfo](bool ok) { + dht::ThreadPool::io().run([w = std::move(w), + devicePk = std::move(devicePk), + vid, + winfo, + eraseInfo, + connType, ok] { + auto sthis = w.lock(); + if (!ok && sthis && sthis->config_->logger) + sthis->config_->logger->error("[device {}] Unable to initialize ICE session.", devicePk->getLongId()); + if (!sthis || !ok) { + eraseInfo(); + return; + } + sthis->connectDeviceStartIce(winfo.lock(), devicePk, vid, connType, [=](bool ok) { + if (!ok) { + dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); } - sthis->connectDeviceStartIce(winfo.lock(), devicePk, vid, connType, [=](bool ok) { - if (!ok) { - dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); - } - }); - }); - }; - ice_config.onNegoDone = [w, - deviceId, - name, - cert = std::move(cert), - diw, - winfo = std::weak_ptr(info), - vid, - eraseInfo](bool ok) { - dht::ThreadPool::io().run([w = std::move(w), - deviceId = std::move(deviceId), - name = std::move(name), - cert = std::move(cert), - diw = std::move(diw), - winfo = std::move(winfo), - vid = std::move(vid), - eraseInfo = std::move(eraseInfo), - ok] { - auto sthis = w.lock(); - if (!ok && sthis && sthis->config_->logger) - sthis->config_->logger->error("[device {}] ICE negotiation failed.", deviceId); - if (!sthis || !ok || !sthis->connectDeviceOnNegoDone(diw, winfo.lock(), deviceId, name, vid, cert)) - eraseInfo(); }); - }; - - if (auto di = diw.lock()) { - std::lock_guard lk(di->mtx_); - di->info[vid] = info; - } - std::unique_lock lk {info->mutex_}; - ice_config.master = false; - ice_config.streamsCount = 1; - ice_config.compCountPerStream = 1; - info->ice_ = sthis->config_->factory->createUTransport(""); - if (!info->ice_) { - if (sthis->config_->logger) - sthis->config_->logger->error("[device {}] Unable to initialize ICE session.", deviceId); - eraseInfo(); - return; - } - // We need to detect any shutdown if the ice session is destroyed before going to the - // TLS session; - info->ice_->setOnShutdown([eraseInfo]() { - dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); }); - try { - info->ice_->initIceInstance(ice_config); - } catch (const std::exception& e) { - if (sthis->config_->logger) - sthis->config_->logger->error("{}", e.what()); - dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); - } + }; + ice_config.onNegoDone = [w, + deviceId, + name, + cert = std::move(cert), + diw, + winfo, + vid, + eraseInfo](bool ok) { + dht::ThreadPool::io().run([w = std::move(w), + deviceId = std::move(deviceId), + name = std::move(name), + cert = std::move(cert), + diw = std::move(diw), + winfo = std::move(winfo), + vid = std::move(vid), + eraseInfo = std::move(eraseInfo), + ok] { + auto sthis = w.lock(); + if (!ok && sthis && sthis->config_->logger) + sthis->config_->logger->error("[device {}] ICE negotiation failed.", deviceId); + if (!sthis || !ok || !sthis->connectDeviceOnNegoDone(diw, winfo.lock(), deviceId, name, vid, cert)) { + eraseInfo(); + } + }); + }; + std::unique_lock lk {info->mutex_}; + ice_config.master = false; + ice_config.streamsCount = 1; + ice_config.compCountPerStream = 1; + info->ice_ = sthis->config_->factory->createUTransport(""); + if (!info->ice_) { + if (sthis->config_->logger) + sthis->config_->logger->error("[device {}] Unable to initialize ICE session.", deviceId); + eraseInfo(); + return; + } + // We need to detect any shutdown if the ice session is destroyed before going to the + // TLS session; + info->ice_->setOnShutdown([eraseInfo]() { + dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); }); + try { + info->ice_->initIceInstance(ice_config); + } catch (const std::exception& e) { + if (sthis->config_->logger) + sthis->config_->logger->error("{}", e.what()); + dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); + } }); } @@ -1042,10 +1110,6 @@ ConnectionManager::Impl::sendChannelRequest(const std::weak_ptr<DeviceInfo>& din info->executePendingOperations(vid, nullptr); return; } - channelSock->onShutdown([dinfow, name, vid] { - if (auto info = dinfow.lock()) - info->executePendingOperations(vid, nullptr); - }); channelSock->onReady( [dinfow, cinfow, wSock = std::weak_ptr(channelSock), name, vid](bool accepted) { if (auto dinfo = dinfow.lock()) { @@ -1500,38 +1564,58 @@ ConnectionManager::Impl::addNewMultiplexedSocket(const std::weak_ptr<DeviceInfo> return sthis->channelReqCb_(peer, name); return false; }); - info->socket_->onShutdown([dinfo, wi=std::weak_ptr(info), vid]() { + info->socket_->onShutdown([w = weak_from_this(), dinfo, wi=std::weak_ptr(info), vid] { // Cancel current outgoing connections - dht::ThreadPool::io().run([dinfo, wi, vid] { - std::set<dht::Value::Id> ids; + dht::ThreadPool::io().run([w, dinfo, wi, vid] { if (auto info = wi.lock()) { - std::lock_guard lk(info->mutex_); - if (info->socket_) { - ids = std::move(info->cbIds_); - info->socket_->shutdown(); - } - } - if (auto deviceInfo = dinfo.lock()) { - std::shared_ptr<ConnectionInfo> info; - std::vector<PendingCb> ops; - std::unique_lock lk(deviceInfo->mtx_); - auto it = deviceInfo->info.find(vid); - if (it != deviceInfo->info.end()) { - info = std::move(it->second); - deviceInfo->info.erase(it); - } - for (const auto& cbId : ids) { - auto po = deviceInfo->extractPendingOperations(cbId, nullptr); - ops.insert(ops.end(), po.begin(), po.end()); + if (auto deviceInfo = dinfo.lock()) { + std::unique_lock lkd(deviceInfo->mtx_); + std::unique_lock lkc(info->mutex_); + auto ids = std::move(info->cbIds_); + auto [ops, retry] = deviceInfo->resetPendingOperations(ids); + auto erased = deviceInfo->info.erase(vid); + if (!retry && deviceInfo->empty()) { + if (auto sthis = w.lock()) + sthis->infos_.removeDeviceInfo(deviceInfo->deviceId); + } + lkc.unlock(); + lkd.unlock(); + + for (auto& op : ops) + op.cb(nullptr, deviceInfo->deviceId); + if (retry) { + if (auto sthis = w.lock()) + sthis->retryOnError(deviceInfo); + } } - lk.unlock(); - for (auto& op : ops) - op.cb(nullptr, deviceInfo->deviceId); } }); }); } +void +ConnectionManager::Impl::retryOnError(const std::shared_ptr<DeviceInfo>& deviceInfo) +{ + std::unique_lock<std::mutex> lk(deviceInfo->mtx_); + if (not deviceInfo->isConnecting()) + return; + if (auto i = deviceInfo->getConnectedInfo()) { + auto ops = deviceInfo->requestPendingOps(); + lk.unlock(); + for (const auto& [id, name]: ops) + sendChannelRequest(deviceInfo, i, i->socket_, name, id); + } else { + if (deviceInfo->connecting.empty()) { + // move first waiting to connecting + auto it = deviceInfo->waiting.begin(); + deviceInfo->connecting[it->first] = std::move(it->second); + deviceInfo->waiting.erase(it); + } + auto it = deviceInfo->connecting.begin(); + startConnection(deviceInfo, it->second.name, it->first, deviceInfo->cert, it->second.connType); + } +} + const std::shared_future<tls::DhParams> ConnectionManager::Impl::dhParams() const { diff --git a/tests/connectionManager.cpp b/tests/connectionManager.cpp index b4d3af32fde61e574735ca232e301cdd7294382e..070ed83fa418ee397a204a21a321825f8cfde2a5 100644 --- a/tests/connectionManager.cpp +++ b/tests/connectionManager.cpp @@ -98,6 +98,7 @@ private: void testCloseConnectionWith(); void testShutdownCallbacks(); void testFloodSocket(); + void testDestroyWhileConnecting(); void testDestroyWhileSending(); void testIsConnecting(); void testIsConnected(); @@ -126,6 +127,7 @@ private: CPPUNIT_TEST(testCloseConnectionWith); CPPUNIT_TEST(testShutdownCallbacks); CPPUNIT_TEST(testFloodSocket); + CPPUNIT_TEST(testDestroyWhileConnecting); CPPUNIT_TEST(testDestroyWhileSending); CPPUNIT_TEST(testCanSendBeacon); CPPUNIT_TEST(testCannotSendBeacon); @@ -965,15 +967,20 @@ ConnectionManagerTest::testShutdownCallbacks() bool successfullyConnected = false; bool successfullyReceive = false; bool receiverConnected = false; + int requestCount = 0; bob->connectionManager->onChannelRequest( [&](const std::shared_ptr<dht::crypto::Certificate>&, const std::string& name) { if (name == "1") { - std::unique_lock lk {mtx}; + std::lock_guard lk {mtx}; successfullyReceive = true; rcv.notify_one(); } else { - chan2cv.notify_one(); + { + std::lock_guard lk {mtx}; + requestCount++; + chan2cv.notify_one(); + } // Do not return directly. Let the connection be closed std::this_thread::sleep_for(10s); } @@ -984,7 +991,7 @@ ConnectionManagerTest::testShutdownCallbacks() const std::string& name, std::shared_ptr<dhtnet::ChannelSocket> socket) { if (name == "1") { - std::unique_lock lk {mtx}; + std::lock_guard lk {mtx}; receiverConnected = (bool)socket; rcv.notify_one(); } @@ -995,7 +1002,7 @@ ConnectionManagerTest::testShutdownCallbacks() [&](std::shared_ptr<dhtnet::ChannelSocket> socket, const DeviceId&) { if (socket) { - std::unique_lock lk {mtx}; + std::lock_guard lk {mtx}; successfullyConnected = true; rcv.notify_one(); } @@ -1008,19 +1015,30 @@ ConnectionManagerTest::testShutdownCallbacks() })); // Connect another channel, but close the connection + int connectFinished = 0; bool channel2NotConnected = false; alice->connectionManager->connectDevice(bob->id.second, "2", [&](std::shared_ptr<dhtnet::ChannelSocket> socket, const DeviceId&) { + std::lock_guard lk {mtx}; + connectFinished++; channel2NotConnected = !socket; rcv.notify_one(); }); - chan2cv.wait_for(lk, 30s); - + // wait for requestCount == 1 + chan2cv.wait_for(lk, 30s, [&] { return requestCount == 1; }); + bob->connectionManager->closeConnectionsWith(aliceUri); + + chan2cv.wait_for(lk, 30s, [&] { return requestCount == 2; }); // This should trigger onShutdown for second callback bob->connectionManager->closeConnectionsWith(aliceUri); - CPPUNIT_ASSERT(rcv.wait_for(lk, 30s, [&] { return channel2NotConnected; })); + + CPPUNIT_ASSERT(rcv.wait_for(lk, 30s, [&] { return connectFinished; })); + + //CPPUNIT_ASSERT(rcv.wait_for(lk, 30s, [&] { return channel2NotConnected; })); + CPPUNIT_ASSERT_EQUAL(1, connectFinished); + CPPUNIT_ASSERT(channel2NotConnected); } void @@ -1147,6 +1165,103 @@ ConnectionManagerTest::testFloodSocket() } } +void +ConnectionManagerTest::testDestroyWhileConnecting() +{ + bob->connectionManager->onICERequest([](const DeviceId&) { return true; }); + alice->connectionManager->onICERequest([](const DeviceId&) { return true; }); + + auto bobUri = bob->id.second->issuer->getId().toString(); + std::condition_variable cv; + unsigned open_events(0); + unsigned close_events(0); + bool successfullyConnected = false; + bool successfullyReceive = false; + bool receiverConnected = false; + + bob->connectionManager->onChannelRequest( + [&](const std::shared_ptr<dht::crypto::Certificate>&, + const std::string& name) { + std::lock_guard lk {mtx}; + successfullyReceive = name == "test://test"; + return true; + }); + + std::shared_ptr<dhtnet::MultiplexedSocket> multiplexedSocket; + bob->connectionManager->onConnectionReady([&](const DeviceId&, + const std::string& name, + std::shared_ptr<dhtnet::ChannelSocket> socket) { + if (socket) { + socket->onShutdown([&] { + std::lock_guard lk {mtx}; + close_events++; + cv.notify_one(); + }); + multiplexedSocket = socket->underlyingSocket(); + } + if (not name.empty()) { + std::lock_guard lk {mtx}; + open_events++; + receiverConnected = socket && (name == "test://test"); + cv.notify_one(); + } + }); + + alice->connectionManager->connectDevice(bob->id.second, + "test://test", + [&](std::shared_ptr<dhtnet::ChannelSocket> socket, + const DeviceId&) { + if (socket) { + socket->onShutdown([&] { + std::lock_guard lk {mtx}; + close_events++; + cv.notify_one(); + }); + { + std::lock_guard lk {mtx}; + successfullyConnected = true; + open_events++; + cv.notify_one(); + } + } + }); + // connecting + { + std::unique_lock lk {mtx}; + CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { + return successfullyReceive && successfullyConnected && receiverConnected; + })); + } + + multiplexedSocket->shutdown(); + alice->connectionManager->connectDevice(bob->id.second, + "test://test", + [&](std::shared_ptr<dhtnet::ChannelSocket> socket, + const DeviceId&) { + if (socket) { + socket->onShutdown([&] { + std::lock_guard lk {mtx}; + close_events++; + cv.notify_one(); + }); + { + std::lock_guard lk {mtx}; + open_events++; + cv.notify_one(); + } + socket->shutdown(); + } + }); + + std::unique_lock lk {mtx}; + // === Shutdown + CPPUNIT_ASSERT(cv.wait_for(lk, 10s, [&] { return close_events == 2; })); + // === Open + CPPUNIT_ASSERT(cv.wait_for(lk, 20s, [&] { return open_events == 4; })); + // === Shutdown + CPPUNIT_ASSERT(cv.wait_for(lk, 10s, [&] { return close_events == 4; })); +} + void ConnectionManagerTest::testDestroyWhileSending() {