diff --git a/src/jamidht/multiplexed_socket.cpp b/src/jamidht/multiplexed_socket.cpp index 0ef8c7859148c00239ca8022570bd9fe42c34b45..fdf217b53c7afbf92017db3999c0aa49b9dcadcc 100644 --- a/src/jamidht/multiplexed_socket.cpp +++ b/src/jamidht/multiplexed_socket.cpp @@ -724,21 +724,27 @@ MultiplexedSocket::waitForData(const uint16_t& channel, void MultiplexedSocket::setOnRecv(const uint16_t& channel, GenericSocket<uint8_t>::RecvCb&& cb) { - std::deque<uint8_t> recv; - { - // NOTE: here socketsMtx is already locked via onAccept - std::lock_guard<std::mutex> lk(pimpl_->channelCbsMtx_); - pimpl_->channelCbs_[channel] = cb; - - auto dataIt = pimpl_->channelDatas_.find(channel); - if (dataIt != pimpl_->channelDatas_.end() && dataIt->second) { - std::lock_guard<std::mutex> lk(dataIt->second->mutex); - recv = std::move(dataIt->second->buf); + // Re run on ioPool, socketsMtx can be locked here (via onAccept), so retrigger + // to avoid double lock + dht::ThreadPool::io().run([w = weak(), channel, cb = std::move(cb)]() { + if (auto shared = w.lock()) { + std::lock_guard<std::mutex> lkSockets(shared->pimpl_->socketsMutex); + std::deque<uint8_t> recv; + { + std::lock_guard<std::mutex> lk(shared->pimpl_->channelCbsMtx_); + shared->pimpl_->channelCbs_[channel] = cb; + + auto dataIt = shared->pimpl_->channelDatas_.find(channel); + if (dataIt != shared->pimpl_->channelDatas_.end() && dataIt->second) { + std::lock_guard<std::mutex> lk(dataIt->second->mutex); + recv = std::move(dataIt->second->buf); + } + } + if (!recv.empty() && cb) { + cb(&recv[0], recv.size()); + } } - } - if (!recv.empty() && cb) { - cb(&recv[0], recv.size()); - } + }); } void