diff --git a/src/jamidht/multiplexed_socket.cpp b/src/jamidht/multiplexed_socket.cpp index 2b8d605be9ec68a0ab1f38155d5e1c3989c4150a..929243e0924f49db240b103359acb7f1d9063847 100644 --- a/src/jamidht/multiplexed_socket.cpp +++ b/src/jamidht/multiplexed_socket.cpp @@ -305,10 +305,13 @@ MultiplexedSocket::Impl::handleChannelPacket(uint16_t channel, const std::vector cb(&pkt[0], pkt.size()); return; } - dataIt->second->buf.insert(dataIt->second->buf.end(), - std::make_move_iterator(pkt.begin()), - std::make_move_iterator(pkt.end())); - dataIt->second->cv.notify_all(); + { + std::lock_guard<std::mutex> lkSockets(dataIt->second->mutex); + dataIt->second->buf.insert(dataIt->second->buf.end(), + std::make_move_iterator(pkt.begin()), + std::make_move_iterator(pkt.end())); + dataIt->second->cv.notify_all(); + } } } else { JAMI_WARN("Non existing channel: %u", channel); @@ -407,14 +410,17 @@ MultiplexedSocket::read(const uint16_t& channel, uint8_t* buf, std::size_t len, ec = std::make_error_code(std::errc::broken_pipe); return -1; } - auto& chanBuf = dataIt->second->buf; - auto size = std::min(len, chanBuf.size()); + std::size_t size; + { + std::lock_guard<std::mutex> lkSockets(dataIt->second->mutex); + auto& chanBuf = dataIt->second->buf; + size = std::min(len, chanBuf.size()); - for (std::size_t i = 0; i < size; ++i) { - buf[i] = chanBuf[i]; - } + for (std::size_t i = 0; i < size; ++i) + buf[i] = chanBuf[i]; - chanBuf.erase(chanBuf.begin(), chanBuf.begin() + size); + chanBuf.erase(chanBuf.begin(), chanBuf.begin() + size); + } return size; } @@ -637,10 +643,17 @@ std::size_t ChannelSocket::write(const ValueType* buf, std::size_t len, std::error_code& ec) { if (auto ep = pimpl_->endpoint.lock()) { - int res = ep->write(pimpl_->channel, buf, len, ec); - if (ec) - JAMI_ERR("Error when writing on channel: %s", ec.message().c_str()); - return res; + std::size_t sent = 0; + do { + std::size_t toSend = std::min(static_cast<std::size_t>(UINT16_MAX), len - sent); + auto res = ep->write(pimpl_->channel, buf + sent, toSend, ec); + if (ec) { + JAMI_ERR("Error when writing on channel: %s", ec.message().c_str()); + return res; + } + sent += toSend; + } while (sent < len); + return sent; } ec = std::make_error_code(std::errc::broken_pipe); return -1;