diff --git a/src/jamidht/connectionmanager.cpp b/src/jamidht/connectionmanager.cpp index 3c1a4389887f75c006bc5731541e0f6b47a00c5c..437bbf667c47f8714b110a0d0645d9d84539f508 100644 --- a/src/jamidht/connectionmanager.cpp +++ b/src/jamidht/connectionmanager.cpp @@ -578,6 +578,14 @@ ConnectionManager::Impl::sendChannelRequest(std::shared_ptr<MultiplexedSocket>& const dht::Value::Id& vid) { auto channelSock = sock->addChannel(name); + channelSock->onReady([wSock = std::weak_ptr<ChannelSocket>(channelSock), name, deviceId, vid, w = weak()]() { + auto shared = w.lock(); + auto channelSock = wSock.lock(); + if (shared and channelSock) + for (const auto& pending : shared->extractPendingCallbacks(deviceId, vid)) + pending.cb(channelSock, deviceId); + }); + ChannelRequest val; val.name = channelSock->name(); val.state = ChannelRequestState::REQUEST; @@ -585,11 +593,6 @@ ConnectionManager::Impl::sendChannelRequest(std::shared_ptr<MultiplexedSocket>& msgpack::sbuffer buffer(256); msgpack::pack(buffer, val); - sock->setOnChannelReady(channelSock->channel(), [channelSock, name, deviceId, vid, w = weak()]() { - if (auto shared = w.lock()) - for (const auto& pending : shared->extractPendingCallbacks(deviceId, vid)) - pending.cb(channelSock, deviceId); - }); std::error_code ec; int res = sock->write(CONTROL_CHANNEL, reinterpret_cast<const uint8_t*>(buffer.data()), diff --git a/src/jamidht/multiplexed_socket.cpp b/src/jamidht/multiplexed_socket.cpp index 80dada485f2f0df8b17759ad4d7e31789e90130d..b72adee32489cc58aeb1a68c35c6f0e4b6892e91 100644 --- a/src/jamidht/multiplexed_socket.cpp +++ b/src/jamidht/multiplexed_socket.cpp @@ -122,6 +122,10 @@ public: auto& channelSocket = sockets[channel]; if (not channelSocket) channelSocket = std::make_shared<ChannelSocket>(parent_.weak(), name, channel); + else { + JAMI_WARN("A channel is already present on that socket, accepting " + "the request will close the previous one"); + } return channelSocket; } @@ -167,9 +171,6 @@ public: std::mutex socketsMutex {}; std::map<uint16_t, std::shared_ptr<ChannelSocket>> sockets {}; - // Contains callback triggered when a channel is ready - std::mutex channelCbsMutex {}; - std::map<uint16_t, onChannelReadyCb> channelCbs {}; // Main loop to parse incoming packets std::atomic_bool stop {false}; @@ -247,12 +248,9 @@ void MultiplexedSocket::Impl::onAccept(const std::string& name, uint16_t channel) { std::lock_guard<std::mutex> lkSockets(socketsMutex); - onChannelReady_(deviceId, makeSocket(name, channel)); - std::lock_guard<std::mutex> lk(channelCbsMutex); - auto channelCbsIt = channelCbs.find(channel); - if (channelCbsIt != channelCbs.end()) { - (channelCbsIt->second)(); - } + auto socket = makeSocket(name, channel); + onChannelReady_(deviceId, socket); + socket->ready(); } void @@ -375,11 +373,7 @@ MultiplexedSocket::Impl::onRequest(const std::string& name, uint16_t channel) if (accept) { onChannelReady_(deviceId, channelSocket); - std::lock_guard<std::mutex> lk(channelCbsMutex); - auto channelCbsIt = channelCbs.find(channel); - if (channelCbsIt != channelCbs.end()) { - channelCbsIt->second(); - } + channelSocket->ready(); } } @@ -534,12 +528,6 @@ MultiplexedSocket::setOnRequest(OnConnectionRequestCb&& cb) pimpl_->onRequest_ = std::move(cb); } -void -MultiplexedSocket::setOnChannelReady(uint16_t channel, onChannelReadyCb&& cb) -{ - pimpl_->channelCbs[channel] = std::move(cb); -} - bool MultiplexedSocket::isReliable() const { @@ -708,6 +696,7 @@ public: ~Impl() {} + ChannelReadyCb readyCb_ {}; OnShutdownCb shutdownCb_ {}; std::atomic_bool isShutdown_ {false}; std::string name {}; @@ -819,6 +808,13 @@ ChannelSocket::underlyingSocket() const } #endif +void +ChannelSocket::ready() +{ + if (pimpl_->readyCb_) + pimpl_->readyCb_(); +} + void ChannelSocket::stop() { @@ -899,6 +895,11 @@ ChannelSocket::onShutdown(OnShutdownCb&& cb) } } +void +ChannelSocket::onReady(ChannelReadyCb&& cb){ + pimpl_->readyCb_ = std::move(cb); +} + void ChannelSocket::sendBeacon(const std::chrono::milliseconds& timeout) { diff --git a/src/jamidht/multiplexed_socket.h b/src/jamidht/multiplexed_socket.h index fd793e477392bec8dc09e4b9e08928d27d288cdf..bbee47f4d67d41103fddc7f65a9c9f66c9932194 100644 --- a/src/jamidht/multiplexed_socket.h +++ b/src/jamidht/multiplexed_socket.h @@ -32,7 +32,7 @@ using OnConnectionRequestCb = std::function< bool(const DeviceId& /* device id */, const uint16_t& /* id */, const std::string& /* name */)>; using OnConnectionReadyCb = std::function<void(const DeviceId& /* device id */, const std::shared_ptr<ChannelSocket>&)>; -using onChannelReadyCb = std::function<void(void)>; +using ChannelReadyCb = std::function<void(void)>; using OnShutdownCb = std::function<void(void)>; static constexpr auto SEND_BEACON_TIMEOUT = std::chrono::milliseconds(3000); @@ -97,11 +97,6 @@ public: * Will be triggered when the peer asks for a new channel */ void setOnRequest(OnConnectionRequestCb&& cb); - /** - * Triggered when a specific channel is ready - * Used by ConnectionManager::connectDevice() - */ - void setOnChannelReady(uint16_t channel, onChannelReadyCb&& cb); std::size_t write(const uint16_t& channel, const uint8_t* buf, @@ -197,6 +192,13 @@ public: * Will trigger onShutdown's callback */ void shutdown() override; + + void ready(); + /** + * Triggered when a specific channel is ready + * Used by ConnectionManager::connectDevice() + */ + void onReady(ChannelReadyCb&& cb); /** * Will trigger that callback when shutdown() is called */