diff --git a/src/Makefile.am b/src/Makefile.am index f39d30bfb99fe980ac1514707fa5b8aebc44e466..679e2bc1ee26220a56bcc07793fa887968e47215 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -142,7 +142,8 @@ libring_la_SOURCES = \ base64.h \ base64.cpp \ turn_transport.h \ - turn_transport.cpp + turn_transport.cpp \ + channel.h if HAVE_WIN32 libring_la_SOURCES += \ diff --git a/src/channel.h b/src/channel.h new file mode 100644 index 0000000000000000000000000000000000000000..aa2fcf6c504f26fb67f7b9c8702a84c74ac01edf --- /dev/null +++ b/src/channel.h @@ -0,0 +1,208 @@ +/* + * Copyright (C) 2017 Savoir-faire Linux Inc. + * + * Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#pragma once + +#include <mutex> +#include <queue> +#include <condition_variable> +#include <stdexcept> +#include "logger.h" + +/// +/// \file Channel is a synchronized queue to share data between threads. +/// +/// This is a C++11-ish class that mimic Python "queue" module and/or Go "Channel" type. +/// + +namespace ring +{ + +class ChannelEmpty : public std::exception{ +public: + const char* what() const noexcept { return "channel empty"; } +}; + +class ChannelFull : public std::exception { +public: + const char* what() const noexcept { return "channel full"; } +}; + +namespace detail { + +template <typename T, std::size_t N=0> +class _ChannelBase { +public: + using value_type = T; + const std::size_t max_size = N; ///< maximal size of the Channel, 0 means no size limits + + // Pop operations + + template <typename Duration> + T receive(Duration timeout) { + std::unique_lock<std::mutex> lk {mutex_}; + if (!cv_.wait_for(lk, timeout, [this]{ return !queue_.empty(); })) + throw ChannelEmpty(); + auto value = std::move(queue_.front()); + queue_.pop(); + return value; + } + + template <typename Duration> + void receive(T& value, Duration timeout) { + value = receive(timeout); + } + + T receive() { + std::unique_lock<std::mutex> lk {mutex_}; + if (queue_.empty()) + throw ChannelEmpty(); + auto value = std::move(queue_.front()); + queue_.pop(); + return value; + } + + T receive_wait() { + std::unique_lock<std::mutex> lk {mutex_}; + cv_.wait(lk, [this]{ return !queue_.empty(); }); + auto value = std::move(queue_.front()); + queue_.pop(); + return value; + } + + void receive_wait(T& value) { + value = receive_wait(); + } + + void wait() { + std::unique_lock<std::mutex> lk {mutex_}; + cv_.wait(lk, [this]{ return !queue_.empty(); }); + } + + void operator >>(T& value) { + receive_wait(value); + } + + std::queue<T> flush() { + std::unique_lock<std::mutex> lk {mutex_}; + std::queue<T> result; + std::swap(queue_, result); + return result; + } + + // Capacity operation + + std::size_t size() const { + std::lock_guard<std::mutex> lk {mutex_}; + return queue_.size(); + } + + bool empty() const { + std::lock_guard<std::mutex> lk {mutex_}; + return queue_.empty(); + } + +protected: + mutable std::mutex mutex_; + std::condition_variable cv_; + std::queue<T> queue_; +}; + +} // namespace detail + +/// +/// Generic implementation +/// +template <typename T, std::size_t N=0> +class Channel : public detail::_ChannelBase<T, N> { +public: + using base = detail::_ChannelBase<T, N>; + using base::mutex_; + using base::cv_; + using base::queue_; + + template <typename U> + void send(U&& value) { + std::lock_guard<std::mutex> lk {mutex_}; + if (queue_.size() < N) { + queue_.push(std::forward<U>(value)); + cv_.notify_one(); + return; + } + throw ChannelFull(); + } + + template <typename... Args> + void send_emplace(Args&&... args) { + std::lock_guard<std::mutex> lk {mutex_}; + if (queue_.size() < N) { + queue_.emplace(std::forward<Args>(args)...); + cv_.notify_one(); + return; + } + throw ChannelFull(); + } + + void operator <<(const T& value) { + push(value); + } + + void operator <<(T&& value) { + push(std::move(value)); + } +}; + +/// +/// Optimized implementations for unlimited channel (N=0) +/// +template <typename T> +class Channel<T> : public detail::_ChannelBase<T> { +public: + using base = detail::_ChannelBase<T>; + using base::mutex_; + using base::cv_; + using base::queue_; + + template <typename U> + void send(U&& value) { + std::lock_guard<std::mutex> lk {mutex_}; + queue_.push(std::forward<U>(value)); + cv_.notify_one(); + } + + template <typename... Args> + void send_emplace(Args&&... args) { + std::lock_guard<std::mutex> lk {mutex_}; + queue_.emplace(std::forward<Args>(args)...); + cv_.notify_one(); + } + + /// /note This method exists only for unlimited channel + void send(const T* data, std::size_t len) { + std::lock_guard<std::mutex> lk {mutex_}; + while (len > 0) { + queue_.push(*(data++)); + --len; + } + cv_.notify_one(); + } +}; + +} // namespace ring diff --git a/test/unitTest/Makefile.am b/test/unitTest/Makefile.am index 8125878d057b20ea26d2f6bd9ed14b53c29c1144..51e8ed8c1cc911449b27ed5a42b6ae4ce42c852f 100644 --- a/test/unitTest/Makefile.am +++ b/test/unitTest/Makefile.am @@ -19,6 +19,12 @@ ut_account_factory_SOURCES = account_factory/testAccount_factory.cpp check_PROGRAMS += ut_base64 ut_base64_SOURCES = base64/base64.cpp +# +# channel +# +check_PROGRAMS += ut_channel +ut_channel_SOURCES = channel/testChannel.cpp + # # map_utils # diff --git a/test/unitTest/channel/testChannel.cpp b/test/unitTest/channel/testChannel.cpp new file mode 100644 index 0000000000000000000000000000000000000000..fe19f515ef8027a5a821a2f90bf4bf60b7cec90b --- /dev/null +++ b/test/unitTest/channel/testChannel.cpp @@ -0,0 +1,226 @@ +/* + * Copyright (C) 2017 Savoir-faire Linux Inc. + * + * Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include <cppunit/TestAssert.h> +#include <cppunit/TestFixture.h> +#include <cppunit/extensions/HelperMacros.h> + +#include <future> +#include <algorithm> + +#include "test_runner.h" + +#include "channel.h" + +namespace ring { namespace test { + +// During a receive operation we should not wait more than this timeout, +// otherwise it would indicate a thread starvation. +static const std::chrono::seconds timeout {1}; + +class ChannelTest : public CppUnit::TestFixture { +public: + static std::string name() { return "channel"; } + +private: + void emptyStateTest(); + void sendTest(); + void receiveTest(); + void simpleUnlimitedTest(); + void flushTest(); + void dinningPhilosophersTest(); + + CPPUNIT_TEST_SUITE(ChannelTest); + CPPUNIT_TEST(emptyStateTest); + CPPUNIT_TEST(sendTest); + CPPUNIT_TEST(receiveTest); + CPPUNIT_TEST(simpleUnlimitedTest); + CPPUNIT_TEST(flushTest); + CPPUNIT_TEST(dinningPhilosophersTest); + CPPUNIT_TEST_SUITE_END(); +}; + +CPPUNIT_TEST_SUITE_NAMED_REGISTRATION(ChannelTest, ChannelTest::name()); + +//============================================================================== + +void +ChannelTest::emptyStateTest() +{ + Channel<int, 1> limited_channel; + Channel<int> unlimited_channel; + + CPPUNIT_ASSERT(limited_channel.empty()); + CPPUNIT_ASSERT(limited_channel.size() == 0); + CPPUNIT_ASSERT(unlimited_channel.empty()); + CPPUNIT_ASSERT(unlimited_channel.size() == 0); + + CPPUNIT_ASSERT_THROW(limited_channel.receive(timeout), ring::ChannelEmpty); + CPPUNIT_ASSERT_THROW(unlimited_channel.receive(timeout), ring::ChannelEmpty); +} + +void +ChannelTest::sendTest() +{ + Channel<int, 2> limited_channel; + Channel<int> unlimited_channel; + + // First insert + limited_channel.send(42); + CPPUNIT_ASSERT(!limited_channel.empty()); + CPPUNIT_ASSERT_EQUAL(1ul, limited_channel.size()); + + unlimited_channel.send(999); + CPPUNIT_ASSERT(!unlimited_channel.empty()); + CPPUNIT_ASSERT_EQUAL(1ul, unlimited_channel.size()); + + // Second insert + limited_channel.send(123456789); + CPPUNIT_ASSERT(!limited_channel.empty()); + CPPUNIT_ASSERT_EQUAL(2ul, limited_channel.size()); + + unlimited_channel.send(-1); + CPPUNIT_ASSERT(!unlimited_channel.empty()); + CPPUNIT_ASSERT_EQUAL(2ul, unlimited_channel.size()); +} + +void +ChannelTest::receiveTest() +{ + Channel<int> channel; + int first = 1, second = 2, third = 3; + + channel.send(first); + channel.send(second); + CPPUNIT_ASSERT(!channel.empty() && 2ul == channel.size()); + + int v = 0; + CPPUNIT_ASSERT_NO_THROW(channel.receive(v, timeout)); + CPPUNIT_ASSERT_EQUAL(v, first); + CPPUNIT_ASSERT_EQUAL(1ul, channel.size()); + CPPUNIT_ASSERT(!channel.empty()); + CPPUNIT_ASSERT_NO_THROW(channel.receive(v, timeout)); + CPPUNIT_ASSERT_EQUAL(v, second); + CPPUNIT_ASSERT_EQUAL(0ul, channel.size()); + CPPUNIT_ASSERT(channel.empty()); + + channel.send(third); + CPPUNIT_ASSERT(!channel.empty() && 1ul == channel.size()); + v = channel.receive(); + CPPUNIT_ASSERT_EQUAL(v, third); + CPPUNIT_ASSERT_EQUAL(0ul, channel.size()); + CPPUNIT_ASSERT(channel.empty()); +} + +void +ChannelTest::simpleUnlimitedTest() +{ + Channel<int> c; + std::array<int, 3> values = {1, 2, 3}; + c.send(&values[0], values.size()); // this API exists only for unlimited-version +} + +void +ChannelTest::flushTest() +{ + Channel<int> c; + c.send(1); c.send(1); c.send(1); + CPPUNIT_ASSERT(!c.empty() && 3ul == c.size()); + + auto queue = c.flush(); + bool correct = true; + while (!queue.empty()) { + correct &= queue.front() == 1; + queue.pop(); + } + CPPUNIT_ASSERT_MESSAGE("invalid items returned by Channel::flush()", correct); + CPPUNIT_ASSERT(c.empty()); + CPPUNIT_ASSERT_EQUAL(0ul, c.size()); +} + +//============================================================================== + +// Classical concurrential problem: dinning philosophers +// https://en.wikipedia.org/wiki/Dining_philosophers_problem + +template <int N> +struct DinningTable +{ + // Per-fork pick-up order channel (entry = philosopher #) + Channel<int> pickUpChannels[N]; + + // Per-fork put-down order channel (entry = philosopher #) + Channel<int> putDownChannels[N]; +}; + +template <int N> +void +philosopher_job(DinningTable<N>& table, int i) +{ + // Think -> pick-up Left+Right forks -> eat -> put-down forks + + if (i) { + table.pickUpChannels[i].send_emplace(i); + table.pickUpChannels[(i + 1) % N].send_emplace(i); + } else { + // For the fist one, we swap fork picking order + table.pickUpChannels[(i + 1) % N].send_emplace(i); + table.pickUpChannels[i].send_emplace(i); + } + + table.putDownChannels[i].send_emplace(i); + table.putDownChannels[(i + 1) % N].send_emplace(i); +} + +template <int N> +void +fork_job(DinningTable<N>& table, int i) +{ + // A fork wait to be pick-up first, then to be put-down + table.pickUpChannels[i].receive(timeout); + table.putDownChannels[i].receive(timeout); + + // In this configuration a fork could be used twice at least + table.pickUpChannels[i].receive(timeout); + table.putDownChannels[i].receive(timeout); +} + +void +ChannelTest::dinningPhilosophersTest() +{ + constexpr int N = 5; // For the classical description of the problem + DinningTable<N> table; + + // Dress-up the table first :-) + std::future<void> forks[N]; + for (int i = 0; i < N; i++) + forks[i] = std::async(std::launch::async, + [&table, i]{ fork_job<N>(table, i); }); + + // Then invite philosophers... + std::future<void> philosophers[N]; + for (int i = 0; i < N; i++) + philosophers[i] = std::async(std::launch::async, + [&table, i]{ philosopher_job<N>(table, i); }); +} + +}} // namespace ring::test + +RING_TEST_RUNNER(ring::test::ChannelTest::name());