diff --git a/src/jamidht/multiplexed_socket.cpp b/src/jamidht/multiplexed_socket.cpp index 4af6ba90cab8eb4f6dd0aa45f2b99bf625e9be50..81daa399378877fcc9640aabcb8d502626694c30 100644 --- a/src/jamidht/multiplexed_socket.cpp +++ b/src/jamidht/multiplexed_socket.cpp @@ -67,17 +67,18 @@ public: stop.store(true); isShutdown_ = true; if (onShutdown_) onShutdown_(); - endpoint->setOnStateChange({}); - endpoint->shutdown(); - { - std::lock_guard<std::mutex> lkSockets(socketsMutex); - for (auto& socket : sockets) { - // Just trigger onShutdown() to make client know - // No need to write the EOF for the channel, the write will fail because endpoint is already shutdown - if (socket.second) socket.second->stop(); - } - sockets.clear(); + if (endpoint) { + endpoint->setOnStateChange({}); + std::unique_lock<std::mutex> lk(writeMtx); + endpoint->shutdown(); } + std::lock_guard<std::mutex> lkSockets(socketsMutex); + for (auto& socket : sockets) { + // Just trigger onShutdown() to make client know + // No need to write the EOF for the channel, the write will fail because endpoint is already shutdown + if (socket.second) socket.second->stop(); + } + sockets.clear(); } /** @@ -123,6 +124,8 @@ public: std::mutex channelCbsMtx_ {}; std::map<uint16_t, GenericSocket<uint8_t>::RecvCb> channelCbs_ {}; std::atomic_bool isShutdown_ {false}; + + std::mutex writeMtx; }; void @@ -160,11 +163,10 @@ MultiplexedSocket::Impl::eventLoop() try { auto msg = oh.get().as<ChanneledMessage>(); - if (msg.channel == 0) { + if (msg.channel == 0) handleControlPacket(std::move(msg.data)); - } else { + else handleChannelPacket(msg.channel, std::move(msg.data)); - } } catch (const msgpack::unpack_error &e) { JAMI_WARN("Error when decoding msgpack message: %s", e.what()); } @@ -273,7 +275,7 @@ MultiplexedSocket::Impl::handleChannelPacket(uint16_t channel, const std::vector auto cb = channelCbs_.find(channel); if (cb != channelCbs_.end()) { lk.unlock(); - cb->second(&pkt[0], pkt.size()); + if (cb->second) cb->second(&pkt[0], pkt.size()); return; } lk.unlock(); @@ -413,7 +415,9 @@ MultiplexedSocket::write(const uint16_t& channel, const uint8_t* buf, std::size_ pk.pack_bin(len); pk.pack_bin_body((const char*)buf, len); + std::unique_lock<std::mutex> lk(pimpl_->writeMtx); int res = pimpl_->endpoint->write((const unsigned char*)buffer.data(), buffer.size(), ec); + lk.unlock(); if (res < 0) { if (ec) JAMI_ERR("Error when writing on socket: %s", ec.message().c_str()); @@ -481,7 +485,7 @@ public: Impl(std::weak_ptr<MultiplexedSocket> endpoint, const std::string& name, const uint16_t& channel) : name(name), channel(channel), endpoint(std::move(endpoint)) {} - ~Impl() {} + ~Impl() { } OnShutdownCb shutdownCb_; std::atomic_bool isShutdown_ {false}; @@ -564,13 +568,16 @@ ChannelSocket::underlyingICE() const void ChannelSocket::stop() { + if (pimpl_->isShutdown_) return; pimpl_->isShutdown_ = true; - if (pimpl_->shutdownCb_) pimpl_->shutdownCb_(); + if (pimpl_->shutdownCb_) + pimpl_->shutdownCb_(); } void ChannelSocket::shutdown() { + if (pimpl_->isShutdown_) return; stop(); if (auto ep = pimpl_->endpoint.lock()) { std::error_code ec; diff --git a/src/jamidht/multiplexed_socket.h b/src/jamidht/multiplexed_socket.h index 4f4767ef6fb8d35cf9ea948d2c5aef2a50f3f60d..101f50bb51087b0b6796c0a13f6a4168e1c11107 100644 --- a/src/jamidht/multiplexed_socket.h +++ b/src/jamidht/multiplexed_socket.h @@ -158,6 +158,11 @@ public: std::size_t write(const ValueType* buf, std::size_t len, std::error_code& ec) override; int waitForData(std::chrono::milliseconds timeout, std::error_code&) const override; + /** + * set a callback when receiving data + * @note: this callback should take a little time and not block + * but you can move it in a thread + */ void setOnRecv(RecvCb&&) override; std::shared_ptr<IceTransport> underlyingICE() const;