Skip to content
Snippets Groups Projects
Commit 36c25224 authored by Guillaume Roguez's avatar Guillaume Roguez Committed by Olivier SOLDANO
Browse files

add Channel class


New class serving as inter-thread object queue.
Mimic Python "queue" module and/or Go "Channel" type.

Basic unit-test provided.

Change-Id: Ibd9bcb265ff6fe45aec1917ad152c2cd01935ddf
Reviewed-by: default avatarOlivier Soldano <olivier.soldano@savoirfairelinux.com>
parent c6b859da
No related branches found
No related tags found
No related merge requests found
...@@ -142,7 +142,8 @@ libring_la_SOURCES = \ ...@@ -142,7 +142,8 @@ libring_la_SOURCES = \
base64.h \ base64.h \
base64.cpp \ base64.cpp \
turn_transport.h \ turn_transport.h \
turn_transport.cpp turn_transport.cpp \
channel.h
if HAVE_WIN32 if HAVE_WIN32
libring_la_SOURCES += \ libring_la_SOURCES += \
......
/*
* Copyright (C) 2017 Savoir-faire Linux Inc.
*
* Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#pragma once
#include <mutex>
#include <queue>
#include <condition_variable>
#include <stdexcept>
#include "logger.h"
///
/// \file Channel is a synchronized queue to share data between threads.
///
/// This is a C++11-ish class that mimic Python "queue" module and/or Go "Channel" type.
///
namespace ring
{
class ChannelEmpty : public std::exception{
public:
const char* what() const noexcept { return "channel empty"; }
};
class ChannelFull : public std::exception {
public:
const char* what() const noexcept { return "channel full"; }
};
namespace detail {
template <typename T, std::size_t N=0>
class _ChannelBase {
public:
using value_type = T;
const std::size_t max_size = N; ///< maximal size of the Channel, 0 means no size limits
// Pop operations
template <typename Duration>
T receive(Duration timeout) {
std::unique_lock<std::mutex> lk {mutex_};
if (!cv_.wait_for(lk, timeout, [this]{ return !queue_.empty(); }))
throw ChannelEmpty();
auto value = std::move(queue_.front());
queue_.pop();
return value;
}
template <typename Duration>
void receive(T& value, Duration timeout) {
value = receive(timeout);
}
T receive() {
std::unique_lock<std::mutex> lk {mutex_};
if (queue_.empty())
throw ChannelEmpty();
auto value = std::move(queue_.front());
queue_.pop();
return value;
}
T receive_wait() {
std::unique_lock<std::mutex> lk {mutex_};
cv_.wait(lk, [this]{ return !queue_.empty(); });
auto value = std::move(queue_.front());
queue_.pop();
return value;
}
void receive_wait(T& value) {
value = receive_wait();
}
void wait() {
std::unique_lock<std::mutex> lk {mutex_};
cv_.wait(lk, [this]{ return !queue_.empty(); });
}
void operator >>(T& value) {
receive_wait(value);
}
std::queue<T> flush() {
std::unique_lock<std::mutex> lk {mutex_};
std::queue<T> result;
std::swap(queue_, result);
return result;
}
// Capacity operation
std::size_t size() const {
std::lock_guard<std::mutex> lk {mutex_};
return queue_.size();
}
bool empty() const {
std::lock_guard<std::mutex> lk {mutex_};
return queue_.empty();
}
protected:
mutable std::mutex mutex_;
std::condition_variable cv_;
std::queue<T> queue_;
};
} // namespace detail
///
/// Generic implementation
///
template <typename T, std::size_t N=0>
class Channel : public detail::_ChannelBase<T, N> {
public:
using base = detail::_ChannelBase<T, N>;
using base::mutex_;
using base::cv_;
using base::queue_;
template <typename U>
void send(U&& value) {
std::lock_guard<std::mutex> lk {mutex_};
if (queue_.size() < N) {
queue_.push(std::forward<U>(value));
cv_.notify_one();
return;
}
throw ChannelFull();
}
template <typename... Args>
void send_emplace(Args&&... args) {
std::lock_guard<std::mutex> lk {mutex_};
if (queue_.size() < N) {
queue_.emplace(std::forward<Args>(args)...);
cv_.notify_one();
return;
}
throw ChannelFull();
}
void operator <<(const T& value) {
push(value);
}
void operator <<(T&& value) {
push(std::move(value));
}
};
///
/// Optimized implementations for unlimited channel (N=0)
///
template <typename T>
class Channel<T> : public detail::_ChannelBase<T> {
public:
using base = detail::_ChannelBase<T>;
using base::mutex_;
using base::cv_;
using base::queue_;
template <typename U>
void send(U&& value) {
std::lock_guard<std::mutex> lk {mutex_};
queue_.push(std::forward<U>(value));
cv_.notify_one();
}
template <typename... Args>
void send_emplace(Args&&... args) {
std::lock_guard<std::mutex> lk {mutex_};
queue_.emplace(std::forward<Args>(args)...);
cv_.notify_one();
}
/// /note This method exists only for unlimited channel
void send(const T* data, std::size_t len) {
std::lock_guard<std::mutex> lk {mutex_};
while (len > 0) {
queue_.push(*(data++));
--len;
}
cv_.notify_one();
}
};
} // namespace ring
...@@ -19,6 +19,12 @@ ut_account_factory_SOURCES = account_factory/testAccount_factory.cpp ...@@ -19,6 +19,12 @@ ut_account_factory_SOURCES = account_factory/testAccount_factory.cpp
check_PROGRAMS += ut_base64 check_PROGRAMS += ut_base64
ut_base64_SOURCES = base64/base64.cpp ut_base64_SOURCES = base64/base64.cpp
#
# channel
#
check_PROGRAMS += ut_channel
ut_channel_SOURCES = channel/testChannel.cpp
# #
# map_utils # map_utils
# #
......
/*
* Copyright (C) 2017 Savoir-faire Linux Inc.
*
* Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include <cppunit/TestAssert.h>
#include <cppunit/TestFixture.h>
#include <cppunit/extensions/HelperMacros.h>
#include <future>
#include <algorithm>
#include "test_runner.h"
#include "channel.h"
namespace ring { namespace test {
// During a receive operation we should not wait more than this timeout,
// otherwise it would indicate a thread starvation.
static const std::chrono::seconds timeout {1};
class ChannelTest : public CppUnit::TestFixture {
public:
static std::string name() { return "channel"; }
private:
void emptyStateTest();
void sendTest();
void receiveTest();
void simpleUnlimitedTest();
void flushTest();
void dinningPhilosophersTest();
CPPUNIT_TEST_SUITE(ChannelTest);
CPPUNIT_TEST(emptyStateTest);
CPPUNIT_TEST(sendTest);
CPPUNIT_TEST(receiveTest);
CPPUNIT_TEST(simpleUnlimitedTest);
CPPUNIT_TEST(flushTest);
CPPUNIT_TEST(dinningPhilosophersTest);
CPPUNIT_TEST_SUITE_END();
};
CPPUNIT_TEST_SUITE_NAMED_REGISTRATION(ChannelTest, ChannelTest::name());
//==============================================================================
void
ChannelTest::emptyStateTest()
{
Channel<int, 1> limited_channel;
Channel<int> unlimited_channel;
CPPUNIT_ASSERT(limited_channel.empty());
CPPUNIT_ASSERT(limited_channel.size() == 0);
CPPUNIT_ASSERT(unlimited_channel.empty());
CPPUNIT_ASSERT(unlimited_channel.size() == 0);
CPPUNIT_ASSERT_THROW(limited_channel.receive(timeout), ring::ChannelEmpty);
CPPUNIT_ASSERT_THROW(unlimited_channel.receive(timeout), ring::ChannelEmpty);
}
void
ChannelTest::sendTest()
{
Channel<int, 2> limited_channel;
Channel<int> unlimited_channel;
// First insert
limited_channel.send(42);
CPPUNIT_ASSERT(!limited_channel.empty());
CPPUNIT_ASSERT_EQUAL(1ul, limited_channel.size());
unlimited_channel.send(999);
CPPUNIT_ASSERT(!unlimited_channel.empty());
CPPUNIT_ASSERT_EQUAL(1ul, unlimited_channel.size());
// Second insert
limited_channel.send(123456789);
CPPUNIT_ASSERT(!limited_channel.empty());
CPPUNIT_ASSERT_EQUAL(2ul, limited_channel.size());
unlimited_channel.send(-1);
CPPUNIT_ASSERT(!unlimited_channel.empty());
CPPUNIT_ASSERT_EQUAL(2ul, unlimited_channel.size());
}
void
ChannelTest::receiveTest()
{
Channel<int> channel;
int first = 1, second = 2, third = 3;
channel.send(first);
channel.send(second);
CPPUNIT_ASSERT(!channel.empty() && 2ul == channel.size());
int v = 0;
CPPUNIT_ASSERT_NO_THROW(channel.receive(v, timeout));
CPPUNIT_ASSERT_EQUAL(v, first);
CPPUNIT_ASSERT_EQUAL(1ul, channel.size());
CPPUNIT_ASSERT(!channel.empty());
CPPUNIT_ASSERT_NO_THROW(channel.receive(v, timeout));
CPPUNIT_ASSERT_EQUAL(v, second);
CPPUNIT_ASSERT_EQUAL(0ul, channel.size());
CPPUNIT_ASSERT(channel.empty());
channel.send(third);
CPPUNIT_ASSERT(!channel.empty() && 1ul == channel.size());
v = channel.receive();
CPPUNIT_ASSERT_EQUAL(v, third);
CPPUNIT_ASSERT_EQUAL(0ul, channel.size());
CPPUNIT_ASSERT(channel.empty());
}
void
ChannelTest::simpleUnlimitedTest()
{
Channel<int> c;
std::array<int, 3> values = {1, 2, 3};
c.send(&values[0], values.size()); // this API exists only for unlimited-version
}
void
ChannelTest::flushTest()
{
Channel<int> c;
c.send(1); c.send(1); c.send(1);
CPPUNIT_ASSERT(!c.empty() && 3ul == c.size());
auto queue = c.flush();
bool correct = true;
while (!queue.empty()) {
correct &= queue.front() == 1;
queue.pop();
}
CPPUNIT_ASSERT_MESSAGE("invalid items returned by Channel::flush()", correct);
CPPUNIT_ASSERT(c.empty());
CPPUNIT_ASSERT_EQUAL(0ul, c.size());
}
//==============================================================================
// Classical concurrential problem: dinning philosophers
// https://en.wikipedia.org/wiki/Dining_philosophers_problem
template <int N>
struct DinningTable
{
// Per-fork pick-up order channel (entry = philosopher #)
Channel<int> pickUpChannels[N];
// Per-fork put-down order channel (entry = philosopher #)
Channel<int> putDownChannels[N];
};
template <int N>
void
philosopher_job(DinningTable<N>& table, int i)
{
// Think -> pick-up Left+Right forks -> eat -> put-down forks
if (i) {
table.pickUpChannels[i].send_emplace(i);
table.pickUpChannels[(i + 1) % N].send_emplace(i);
} else {
// For the fist one, we swap fork picking order
table.pickUpChannels[(i + 1) % N].send_emplace(i);
table.pickUpChannels[i].send_emplace(i);
}
table.putDownChannels[i].send_emplace(i);
table.putDownChannels[(i + 1) % N].send_emplace(i);
}
template <int N>
void
fork_job(DinningTable<N>& table, int i)
{
// A fork wait to be pick-up first, then to be put-down
table.pickUpChannels[i].receive(timeout);
table.putDownChannels[i].receive(timeout);
// In this configuration a fork could be used twice at least
table.pickUpChannels[i].receive(timeout);
table.putDownChannels[i].receive(timeout);
}
void
ChannelTest::dinningPhilosophersTest()
{
constexpr int N = 5; // For the classical description of the problem
DinningTable<N> table;
// Dress-up the table first :-)
std::future<void> forks[N];
for (int i = 0; i < N; i++)
forks[i] = std::async(std::launch::async,
[&table, i]{ fork_job<N>(table, i); });
// Then invite philosophers...
std::future<void> philosophers[N];
for (int i = 0; i < N; i++)
philosophers[i] = std::async(std::launch::async,
[&table, i]{ philosopher_job<N>(table, i); });
}
}} // namespace ring::test
RING_TEST_RUNNER(ring::test::ChannelTest::name());
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment