diff --git a/src/connectivity/multiplexed_socket.cpp b/src/connectivity/multiplexed_socket.cpp index 3ab3eb35db352464de8c8cc689dd6eadaf462a6c..2670f716264fb4571f18468c9f31e8dfb62b927f 100644 --- a/src/connectivity/multiplexed_socket.cpp +++ b/src/connectivity/multiplexed_socket.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2019 Savoir-faire Linux Inc. + * Copyright (C) 2019-2023 Savoir-faire Linux Inc. * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify @@ -23,8 +23,9 @@ #include "ice_transport.h" #include "connectivity/security/certstore.h" -#include <deque> #include <opendht/thread_pool.h> +#include <asio/io_context.hpp> +#include <deque> static constexpr std::size_t IO_BUFFER_SIZE {8192}; ///< Size of char buffer used by IO operations static constexpr int MULTIPLEXED_SOCKET_VERSION {1}; @@ -663,7 +664,7 @@ MultiplexedSocket::monitor() const JAMI_DEBUG("- Socket with device: {:s} - account: {:s}", deviceId().to_c_str(), userUri); auto now = clock::now(); JAMI_DEBUG("- Duration: {}", - std::chrono::duration_cast<std::chrono::milliseconds>(now - pimpl_->start_)); + std::chrono::duration_cast<std::chrono::milliseconds>(now - pimpl_->start_)); pimpl_->endpoint->monitor(); std::lock_guard<std::mutex> lk(pimpl_->socketsMutex); for (const auto& [_, channel] : pimpl_->sockets) { @@ -787,31 +788,18 @@ ChannelSocketTest::ChannelSocketTest(const DeviceId& deviceId, : pimpl_deviceId(deviceId) , pimpl_name(name) , pimpl_channel(channel) - , eventLoopThread_ {[this] { - try { - eventLoop(); - } catch (const std::exception& e) { - JAMI_ERR() << "[CNX] peer connection event loop failure: " << e.what(); - shutdown(); - } - }} + , ioCtx_(*Manager::instance().ioContext()) {} ChannelSocketTest::~ChannelSocketTest() -{ - eventLoopThread_.join(); -} +{} void -ChannelSocketTest::link(const std::weak_ptr<ChannelSocketTest>& socket1, - const std::weak_ptr<ChannelSocketTest>& socket2) +ChannelSocketTest::link(const std::shared_ptr<ChannelSocketTest>& socket1, + const std::shared_ptr<ChannelSocketTest>& socket2) { - if (auto peer = socket1.lock()) { - peer->remote = socket2; - } - if (auto peer = socket2.lock()) { - peer->remote = socket1; - } + socket1->remote = socket2; + socket2->remote = socket1; } DeviceId @@ -835,14 +823,17 @@ ChannelSocketTest::channel() const void ChannelSocketTest::shutdown() { - if (!isShutdown_) { - isShutdown_ = true; - shutdownCb_(); + { + std::unique_lock<std::mutex> lk {mutex}; + if (!isShutdown_.exchange(true)) { + lk.unlock(); + shutdownCb_(); + } + cv.notify_all(); } - cv.notify_all(); + if (auto peer = remote.lock()) { - if (!peer->isShutdown_) { - peer->isShutdown_ = true; + if (!peer->isShutdown_.exchange(true)) { peer->shutdownCb_(); } peer->cv.notify_all(); @@ -852,15 +843,17 @@ ChannelSocketTest::shutdown() std::size_t ChannelSocketTest::read(ValueType* buf, std::size_t len, std::error_code& ec) { - std::lock_guard<std::mutex> lkSockets(mutex); - std::size_t size = std::min(len, this->buf.size()); + std::size_t size = std::min(len, this->rx_buf.size()); for (std::size_t i = 0; i < size; ++i) - buf[i] = this->buf[i]; + buf[i] = this->rx_buf[i]; - this->buf.erase(this->buf.begin(), this->buf.begin() + size); + if (size == this->rx_buf.size()) { + this->rx_buf.clear(); + } else + this->rx_buf.erase(this->rx_buf.begin(), this->rx_buf.begin() + size); return size; -}; +} std::size_t ChannelSocketTest::write(const ValueType* buf, std::size_t len, std::error_code& ec) @@ -869,29 +862,21 @@ ChannelSocketTest::write(const ValueType* buf, std::size_t len, std::error_code& ec = std::make_error_code(std::errc::broken_pipe); return -1; } - if (auto peer = remote.lock()) { - std::vector<uint8_t> bufToSend(buf, buf + len); - std::size_t sent = 0; - do { - std::size_t lenToSend = std::min(static_cast<std::size_t>(UINT16_MAX), len - sent); - peer->buf.insert(peer->buf.end(), - bufToSend.begin() + sent, - bufToSend.begin() + sent + lenToSend); - sent += lenToSend; - peer->cv.notify_all(); - } while (sent < len); - return sent; - } - ec = std::make_error_code(std::errc::broken_pipe); - return -1; + ec = {}; + dht::ThreadPool::computation().run( + [r = remote, data = std::vector<uint8_t>(buf, buf + len)]() mutable { + if (auto peer = r.lock()) + peer->onRecv(std::move(data)); + }); + return len; } int ChannelSocketTest::waitForData(std::chrono::milliseconds timeout, std::error_code& ec) const { std::unique_lock<std::mutex> lk {mutex}; - cv.wait_for(lk, timeout, [&] { return !buf.empty() or isShutdown_; }); - return buf.size(); + cv.wait_for(lk, timeout, [&] { return !rx_buf.empty() or isShutdown_; }); + return rx_buf.size(); } void @@ -899,9 +884,9 @@ ChannelSocketTest::setOnRecv(RecvCb&& cb) { std::lock_guard<std::mutex> lkSockets(mutex); this->cb = std::move(cb); - if (!buf.empty() && this->cb) { - this->cb(buf.data(), buf.size()); - buf.clear(); + if (!rx_buf.empty() && this->cb) { + this->cb(rx_buf.data(), rx_buf.size()); + rx_buf.clear(); } } @@ -910,10 +895,13 @@ ChannelSocketTest::onRecv(std::vector<uint8_t>&& pkt) { std::lock_guard<std::mutex> lkSockets(mutex); if (cb) { - cb(&pkt[0], pkt.size()); + cb(pkt.data(), pkt.size()); return; } - buf.insert(buf.end(), std::make_move_iterator(pkt.begin()), std::make_move_iterator(pkt.end())); + rx_buf.insert(rx_buf.end(), + std::make_move_iterator(pkt.begin()), + std::make_move_iterator(pkt.end())); + cv.notify_all(); } void @@ -923,38 +911,12 @@ ChannelSocketTest::onReady(ChannelReadyCb&& cb) void ChannelSocketTest::onShutdown(OnShutdownCb&& cb) { + std::unique_lock<std::mutex> lk {mutex}; shutdownCb_ = std::move(cb); - if (isShutdown_) { - shutdownCb_(); - } -} -void -ChannelSocketTest::eventLoop() -{ - std::error_code ec; - std::vector<uint8_t> buf(IO_BUFFER_SIZE); - - while (!isShutdown_) { - // wait for new data before reading - std::unique_lock<std::mutex> lk {mutex}; - cv.wait(lk, [&] { return !this->buf.empty() or isShutdown_; }); + if (isShutdown_) { lk.unlock(); - - int size = read(reinterpret_cast<uint8_t*>(buf.data()), IO_BUFFER_SIZE, ec); - if (size < 0) { - if (ec) - JAMI_ERR("Read error detected: %s", ec.message().c_str()); - break; - } - - if (size == 0) { - shutdown(); - } - - if (size != 0) { - onRecv(std::move(buf)); - } + shutdownCb_(); } } diff --git a/src/connectivity/multiplexed_socket.h b/src/connectivity/multiplexed_socket.h index 6f1bad9c89125d81fd034d34eab0513012dd4c3a..b110547c4d80cdfdaaf089963fc616c7898c561f 100644 --- a/src/connectivity/multiplexed_socket.h +++ b/src/connectivity/multiplexed_socket.h @@ -23,6 +23,10 @@ #include "connectivity/generic_io.h" #include <condition_variable> +namespace asio { +class io_context; +} + namespace jami { class IceTransport; @@ -206,8 +210,8 @@ public: ChannelSocketTest(const DeviceId& deviceId, const std::string& name, const uint16_t& channel); ~ChannelSocketTest(); - static void link(const std::weak_ptr<ChannelSocketTest>& socket1, - const std::weak_ptr<ChannelSocketTest>& socket2); + static void link(const std::shared_ptr<ChannelSocketTest>& socket1, + const std::shared_ptr<ChannelSocketTest>& socket2); DeviceId deviceId() const override; std::string name() const override; @@ -219,14 +223,9 @@ public: void shutdown() override; - std::size_t read(ValueType* buf, - std::size_t len, - std::error_code& ec) override; - std::size_t write(const ValueType* buf, - std::size_t len, - std::error_code& ec) override; - int waitForData(std::chrono::milliseconds timeout, - std::error_code&) const override; + std::size_t read(ValueType* buf, std::size_t len, std::error_code& ec) override; + std::size_t write(const ValueType* buf, std::size_t len, std::error_code& ec) override; + int waitForData(std::chrono::milliseconds timeout, std::error_code&) const override; void setOnRecv(RecvCb&&) override; void onRecv(std::vector<uint8_t>&& pkt) override; @@ -240,7 +239,7 @@ public: */ void onShutdown(OnShutdownCb&& cb) override; - std::vector<uint8_t> buf {}; + std::vector<uint8_t> rx_buf {}; mutable std::mutex mutex {}; mutable std::condition_variable cv {}; GenericSocket<uint8_t>::RecvCb cb {}; @@ -249,18 +248,17 @@ private: const DeviceId pimpl_deviceId; const std::string pimpl_name; const uint16_t pimpl_channel; + asio::io_context& ioCtx_; std::weak_ptr<ChannelSocketTest> remote; - OnShutdownCb shutdownCb_ { [&] {} }; + OnShutdownCb shutdownCb_ {[&] { + }}; std::atomic_bool isShutdown_ {false}; - - void eventLoop(); - std::thread eventLoopThread_ {}; }; /** * Represents a channel of the multiplexed socket (channel, name) */ -class ChannelSocket : ChannelSocketInterface +class ChannelSocket : public ChannelSocketInterface { public: ChannelSocket(std::weak_ptr<MultiplexedSocket> endpoint,