Skip to content
Snippets Groups Projects
Commit cd7a7bd4 authored by Adrien Béraud's avatar Adrien Béraud
Browse files

MultiplexedSocket: handleControlPacket on processing loop

Avoids potential processing ordering issues

Change-Id: I480872ffecb80439620a8442610b845f790b9db6
parent 40371791
Branches
No related tags found
No related merge requests found
...@@ -445,42 +445,38 @@ MultiplexedSocket::Impl::onRequest(const std::string& name, uint16_t channel) ...@@ -445,42 +445,38 @@ MultiplexedSocket::Impl::onRequest(const std::string& name, uint16_t channel)
void void
MultiplexedSocket::Impl::handleControlPacket(std::vector<uint8_t>&& pkt) MultiplexedSocket::Impl::handleControlPacket(std::vector<uint8_t>&& pkt)
{ {
// Run this on dedicated thread because some callbacks can take time
dht::ThreadPool::io().run([w = parent_.weak(), pkt = std::move(pkt)]() {
auto shared = w.lock();
if (!shared)
return;
auto& pimpl = *shared->pimpl_;
try { try {
size_t off = 0; size_t off = 0;
while (off != pkt.size()) { while (off != pkt.size()) {
msgpack::unpacked result; msgpack::unpacked result;
msgpack::unpack(result, (const char*) pkt.data(), pkt.size(), off); msgpack::unpack(result, (const char*) pkt.data(), pkt.size(), off);
auto object = result.get(); auto object = result.get();
if (pimpl.handleProtocolMsg(object)) if (handleProtocolMsg(object))
continue; continue;
auto req = object.as<ChannelRequest>(); auto req = object.as<ChannelRequest>();
if (req.state == ChannelRequestState::REQUEST) { if (req.state == ChannelRequestState::REQUEST) {
pimpl.onRequest(req.name, req.channel); dht::ThreadPool::io().run([w = parent_.weak(), req = std::move(req)]() {
if (auto shared = w.lock())
shared->pimpl_->onRequest(req.name, req.channel);
});
} }
else if (req.state == ChannelRequestState::ACCEPT) { else if (req.state == ChannelRequestState::ACCEPT) {
pimpl.onAccept(req.name, req.channel); onAccept(req.name, req.channel);
} else { } else {
// DECLINE or unknown // DECLINE or unknown
std::lock_guard<std::mutex> lkSockets(pimpl.socketsMutex); std::lock_guard<std::mutex> lkSockets(socketsMutex);
auto channel = pimpl.sockets.find(req.channel); auto channel = sockets.find(req.channel);
if (channel != pimpl.sockets.end()) { if (channel != sockets.end()) {
channel->second->ready(false); channel->second->ready(false);
channel->second->stop(); channel->second->stop();
pimpl.sockets.erase(channel); sockets.erase(channel);
} }
} }
} }
} catch (const std::exception& e) { } catch (const std::exception& e) {
if (pimpl.logger_) if (logger_)
pimpl.logger_->error("Error on the control channel: {}", e.what()); logger_->error("Error on the control channel: {}", e.what());
} }
});
} }
void void
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment