diff --git a/src/multiplexed_socket.cpp b/src/multiplexed_socket.cpp index db4b0cba4d369ed2ed9c501bb273dc09010a98e7..e064186c63b40a2fe1c2c347f866ea70ee861419 100644 --- a/src/multiplexed_socket.cpp +++ b/src/multiplexed_socket.cpp @@ -445,42 +445,38 @@ MultiplexedSocket::Impl::onRequest(const std::string& name, uint16_t channel) void MultiplexedSocket::Impl::handleControlPacket(std::vector<uint8_t>&& pkt) { - // Run this on dedicated thread because some callbacks can take time - dht::ThreadPool::io().run([w = parent_.weak(), pkt = std::move(pkt)]() { - auto shared = w.lock(); - if (!shared) - return; - auto& pimpl = *shared->pimpl_; - try { - size_t off = 0; - while (off != pkt.size()) { - msgpack::unpacked result; - msgpack::unpack(result, (const char*) pkt.data(), pkt.size(), off); - auto object = result.get(); - if (pimpl.handleProtocolMsg(object)) - continue; - auto req = object.as<ChannelRequest>(); - if (req.state == ChannelRequestState::REQUEST) { - pimpl.onRequest(req.name, req.channel); - } - else if (req.state == ChannelRequestState::ACCEPT) { - pimpl.onAccept(req.name, req.channel); - } else { - // DECLINE or unknown - std::lock_guard<std::mutex> lkSockets(pimpl.socketsMutex); - auto channel = pimpl.sockets.find(req.channel); - if (channel != pimpl.sockets.end()) { - channel->second->ready(false); - channel->second->stop(); - pimpl.sockets.erase(channel); - } + try { + size_t off = 0; + while (off != pkt.size()) { + msgpack::unpacked result; + msgpack::unpack(result, (const char*) pkt.data(), pkt.size(), off); + auto object = result.get(); + if (handleProtocolMsg(object)) + continue; + auto req = object.as<ChannelRequest>(); + if (req.state == ChannelRequestState::REQUEST) { + dht::ThreadPool::io().run([w = parent_.weak(), req = std::move(req)]() { + if (auto shared = w.lock()) + shared->pimpl_->onRequest(req.name, req.channel); + }); + } + else if (req.state == ChannelRequestState::ACCEPT) { + onAccept(req.name, req.channel); + } else { + // DECLINE or unknown + std::lock_guard<std::mutex> lkSockets(socketsMutex); + auto channel = sockets.find(req.channel); + if (channel != sockets.end()) { + channel->second->ready(false); + channel->second->stop(); + sockets.erase(channel); } } - } catch (const std::exception& e) { - if (pimpl.logger_) - pimpl.logger_->error("Error on the control channel: {}", e.what()); } - }); + } catch (const std::exception& e) { + if (logger_) + logger_->error("Error on the control channel: {}", e.what()); + } } void