Skip to content
Snippets Groups Projects
Commit 52ce8c85 authored by Sébastien Blin's avatar Sébastien Blin
Browse files

multiplexed_socket: cleanup ChannelSocketTest

Change-Id: I7f975da75d845e02b981f17fe6819752db317885
parent ddb36d0f
No related branches found
No related tags found
No related merge requests found
/*
* Copyright (C) 2019 Savoir-faire Linux Inc.
* Copyright (C) 2019-2023 Savoir-faire Linux Inc.
* Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
......@@ -23,8 +23,9 @@
#include "ice_transport.h"
#include "connectivity/security/certstore.h"
#include <deque>
#include <opendht/thread_pool.h>
#include <asio/io_context.hpp>
#include <deque>
static constexpr std::size_t IO_BUFFER_SIZE {8192}; ///< Size of char buffer used by IO operations
static constexpr int MULTIPLEXED_SOCKET_VERSION {1};
......@@ -787,31 +788,18 @@ ChannelSocketTest::ChannelSocketTest(const DeviceId& deviceId,
: pimpl_deviceId(deviceId)
, pimpl_name(name)
, pimpl_channel(channel)
, eventLoopThread_ {[this] {
try {
eventLoop();
} catch (const std::exception& e) {
JAMI_ERR() << "[CNX] peer connection event loop failure: " << e.what();
shutdown();
}
}}
, ioCtx_(*Manager::instance().ioContext())
{}
ChannelSocketTest::~ChannelSocketTest()
{
eventLoopThread_.join();
}
{}
void
ChannelSocketTest::link(const std::weak_ptr<ChannelSocketTest>& socket1,
const std::weak_ptr<ChannelSocketTest>& socket2)
ChannelSocketTest::link(const std::shared_ptr<ChannelSocketTest>& socket1,
const std::shared_ptr<ChannelSocketTest>& socket2)
{
if (auto peer = socket1.lock()) {
peer->remote = socket2;
}
if (auto peer = socket2.lock()) {
peer->remote = socket1;
}
socket1->remote = socket2;
socket2->remote = socket1;
}
DeviceId
......@@ -835,14 +823,17 @@ ChannelSocketTest::channel() const
void
ChannelSocketTest::shutdown()
{
if (!isShutdown_) {
isShutdown_ = true;
{
std::unique_lock<std::mutex> lk {mutex};
if (!isShutdown_.exchange(true)) {
lk.unlock();
shutdownCb_();
}
cv.notify_all();
}
if (auto peer = remote.lock()) {
if (!peer->isShutdown_) {
peer->isShutdown_ = true;
if (!peer->isShutdown_.exchange(true)) {
peer->shutdownCb_();
}
peer->cv.notify_all();
......@@ -852,15 +843,17 @@ ChannelSocketTest::shutdown()
std::size_t
ChannelSocketTest::read(ValueType* buf, std::size_t len, std::error_code& ec)
{
std::lock_guard<std::mutex> lkSockets(mutex);
std::size_t size = std::min(len, this->buf.size());
std::size_t size = std::min(len, this->rx_buf.size());
for (std::size_t i = 0; i < size; ++i)
buf[i] = this->buf[i];
buf[i] = this->rx_buf[i];
this->buf.erase(this->buf.begin(), this->buf.begin() + size);
if (size == this->rx_buf.size()) {
this->rx_buf.clear();
} else
this->rx_buf.erase(this->rx_buf.begin(), this->rx_buf.begin() + size);
return size;
};
}
std::size_t
ChannelSocketTest::write(const ValueType* buf, std::size_t len, std::error_code& ec)
......@@ -869,29 +862,21 @@ ChannelSocketTest::write(const ValueType* buf, std::size_t len, std::error_code&
ec = std::make_error_code(std::errc::broken_pipe);
return -1;
}
if (auto peer = remote.lock()) {
std::vector<uint8_t> bufToSend(buf, buf + len);
std::size_t sent = 0;
do {
std::size_t lenToSend = std::min(static_cast<std::size_t>(UINT16_MAX), len - sent);
peer->buf.insert(peer->buf.end(),
bufToSend.begin() + sent,
bufToSend.begin() + sent + lenToSend);
sent += lenToSend;
peer->cv.notify_all();
} while (sent < len);
return sent;
}
ec = std::make_error_code(std::errc::broken_pipe);
return -1;
ec = {};
dht::ThreadPool::computation().run(
[r = remote, data = std::vector<uint8_t>(buf, buf + len)]() mutable {
if (auto peer = r.lock())
peer->onRecv(std::move(data));
});
return len;
}
int
ChannelSocketTest::waitForData(std::chrono::milliseconds timeout, std::error_code& ec) const
{
std::unique_lock<std::mutex> lk {mutex};
cv.wait_for(lk, timeout, [&] { return !buf.empty() or isShutdown_; });
return buf.size();
cv.wait_for(lk, timeout, [&] { return !rx_buf.empty() or isShutdown_; });
return rx_buf.size();
}
void
......@@ -899,9 +884,9 @@ ChannelSocketTest::setOnRecv(RecvCb&& cb)
{
std::lock_guard<std::mutex> lkSockets(mutex);
this->cb = std::move(cb);
if (!buf.empty() && this->cb) {
this->cb(buf.data(), buf.size());
buf.clear();
if (!rx_buf.empty() && this->cb) {
this->cb(rx_buf.data(), rx_buf.size());
rx_buf.clear();
}
}
......@@ -910,10 +895,13 @@ ChannelSocketTest::onRecv(std::vector<uint8_t>&& pkt)
{
std::lock_guard<std::mutex> lkSockets(mutex);
if (cb) {
cb(&pkt[0], pkt.size());
cb(pkt.data(), pkt.size());
return;
}
buf.insert(buf.end(), std::make_move_iterator(pkt.begin()), std::make_move_iterator(pkt.end()));
rx_buf.insert(rx_buf.end(),
std::make_move_iterator(pkt.begin()),
std::make_move_iterator(pkt.end()));
cv.notify_all();
}
void
......@@ -923,38 +911,12 @@ ChannelSocketTest::onReady(ChannelReadyCb&& cb)
void
ChannelSocketTest::onShutdown(OnShutdownCb&& cb)
{
std::unique_lock<std::mutex> lk {mutex};
shutdownCb_ = std::move(cb);
if (isShutdown_) {
shutdownCb_();
}
}
void
ChannelSocketTest::eventLoop()
{
std::error_code ec;
std::vector<uint8_t> buf(IO_BUFFER_SIZE);
while (!isShutdown_) {
// wait for new data before reading
std::unique_lock<std::mutex> lk {mutex};
cv.wait(lk, [&] { return !this->buf.empty() or isShutdown_; });
if (isShutdown_) {
lk.unlock();
int size = read(reinterpret_cast<uint8_t*>(buf.data()), IO_BUFFER_SIZE, ec);
if (size < 0) {
if (ec)
JAMI_ERR("Read error detected: %s", ec.message().c_str());
break;
}
if (size == 0) {
shutdown();
}
if (size != 0) {
onRecv(std::move(buf));
}
shutdownCb_();
}
}
......
......@@ -23,6 +23,10 @@
#include "connectivity/generic_io.h"
#include <condition_variable>
namespace asio {
class io_context;
}
namespace jami {
class IceTransport;
......@@ -206,8 +210,8 @@ public:
ChannelSocketTest(const DeviceId& deviceId, const std::string& name, const uint16_t& channel);
~ChannelSocketTest();
static void link(const std::weak_ptr<ChannelSocketTest>& socket1,
const std::weak_ptr<ChannelSocketTest>& socket2);
static void link(const std::shared_ptr<ChannelSocketTest>& socket1,
const std::shared_ptr<ChannelSocketTest>& socket2);
DeviceId deviceId() const override;
std::string name() const override;
......@@ -219,14 +223,9 @@ public:
void shutdown() override;
std::size_t read(ValueType* buf,
std::size_t len,
std::error_code& ec) override;
std::size_t write(const ValueType* buf,
std::size_t len,
std::error_code& ec) override;
int waitForData(std::chrono::milliseconds timeout,
std::error_code&) const override;
std::size_t read(ValueType* buf, std::size_t len, std::error_code& ec) override;
std::size_t write(const ValueType* buf, std::size_t len, std::error_code& ec) override;
int waitForData(std::chrono::milliseconds timeout, std::error_code&) const override;
void setOnRecv(RecvCb&&) override;
void onRecv(std::vector<uint8_t>&& pkt) override;
......@@ -240,7 +239,7 @@ public:
*/
void onShutdown(OnShutdownCb&& cb) override;
std::vector<uint8_t> buf {};
std::vector<uint8_t> rx_buf {};
mutable std::mutex mutex {};
mutable std::condition_variable cv {};
GenericSocket<uint8_t>::RecvCb cb {};
......@@ -249,18 +248,17 @@ private:
const DeviceId pimpl_deviceId;
const std::string pimpl_name;
const uint16_t pimpl_channel;
asio::io_context& ioCtx_;
std::weak_ptr<ChannelSocketTest> remote;
OnShutdownCb shutdownCb_ { [&] {} };
OnShutdownCb shutdownCb_ {[&] {
}};
std::atomic_bool isShutdown_ {false};
void eventLoop();
std::thread eventLoopThread_ {};
};
/**
* Represents a channel of the multiplexed socket (channel, name)
*/
class ChannelSocket : ChannelSocketInterface
class ChannelSocket : public ChannelSocketInterface
{
public:
ChannelSocket(std::weak_ptr<MultiplexedSocket> endpoint,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment