Skip to content
Snippets Groups Projects
Commit a760dd17 authored by Adrien Béraud's avatar Adrien Béraud
Browse files

cleanup: remove channel.h

Change-Id: Iffce994fb894b0bf8612004526ae9c6d57a49f23
parent 64d5a670
No related branches found
No related tags found
No related merge requests found
......@@ -20,7 +20,6 @@ list (APPEND Source_Files
"${CMAKE_CURRENT_SOURCE_DIR}/call.h"
"${CMAKE_CURRENT_SOURCE_DIR}/call_factory.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/call_factory.h"
"${CMAKE_CURRENT_SOURCE_DIR}/channel.h"
"${CMAKE_CURRENT_SOURCE_DIR}/compiler_intrinsics.h"
"${CMAKE_CURRENT_SOURCE_DIR}/conference.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/conference.h"
......
......@@ -164,7 +164,6 @@ libring_la_SOURCES = \
smartools.h \
base64.h \
base64.cpp \
channel.h \
peer_connection.cpp \
peer_connection.h \
data_transfer.cpp \
......
/*
* Copyright (C) 2004-2020 Savoir-faire Linux Inc.
*
* Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#pragma once
#include <mutex>
#include <queue>
#include <condition_variable>
#include <stdexcept>
#include "logger.h"
///
/// \file Channel is a synchronized queue to share data between threads.
///
/// This is a C++11-ish class that mimic Python "queue" module and/or Go "Channel" type.
///
namespace jami {
class ChannelEmpty : public std::exception
{
public:
const char* what() const noexcept { return "channel empty"; }
};
class ChannelFull : public std::exception
{
public:
const char* what() const noexcept { return "channel full"; }
};
namespace detail {
template<typename T, std::size_t N = 0>
class _ChannelBase
{
public:
using value_type = T;
const std::size_t max_size = N; ///< maximal size of the Channel, 0 means no size limits
// Pop operations
template<typename Duration>
T receive(Duration timeout)
{
std::unique_lock<std::mutex> lk {mutex_};
if (!cv_.wait_for(lk, timeout, [this] { return !queue_.empty(); }))
throw ChannelEmpty();
auto value = std::move(queue_.front());
queue_.pop();
return value;
}
template<typename Duration>
void receive(T& value, Duration timeout)
{
value = receive(timeout);
}
T receive()
{
std::unique_lock<std::mutex> lk {mutex_};
if (queue_.empty())
throw ChannelEmpty();
auto value = std::move(queue_.front());
queue_.pop();
return value;
}
T receive_wait()
{
std::unique_lock<std::mutex> lk {mutex_};
cv_.wait(lk, [this] { return !queue_.empty(); });
auto value = std::move(queue_.front());
queue_.pop();
return value;
}
void receive_wait(T& value) { value = receive_wait(); }
void wait()
{
std::unique_lock<std::mutex> lk {mutex_};
cv_.wait(lk, [this] { return !queue_.empty(); });
}
void operator>>(T& value) { receive_wait(value); }
std::queue<T> flush()
{
std::unique_lock<std::mutex> lk {mutex_};
std::queue<T> result;
std::swap(queue_, result);
return result;
}
// Capacity operation
std::size_t size() const
{
std::lock_guard<std::mutex> lk {mutex_};
return queue_.size();
}
bool empty() const
{
std::lock_guard<std::mutex> lk {mutex_};
return queue_.empty();
}
protected:
mutable std::mutex mutex_;
std::condition_variable cv_;
std::queue<T> queue_;
};
} // namespace detail
///
/// Generic implementation
///
template<typename T, std::size_t N = 0>
class Channel : public detail::_ChannelBase<T, N>
{
public:
using base = detail::_ChannelBase<T, N>;
using base::mutex_;
using base::cv_;
using base::queue_;
template<typename U>
void send(U&& value)
{
std::lock_guard<std::mutex> lk {mutex_};
if (queue_.size() < N) {
queue_.push(std::forward<U>(value));
cv_.notify_one();
return;
}
throw ChannelFull();
}
template<typename... Args>
void send_emplace(Args&&... args)
{
std::lock_guard<std::mutex> lk {mutex_};
if (queue_.size() < N) {
queue_.emplace(std::forward<Args>(args)...);
cv_.notify_one();
return;
}
throw ChannelFull();
}
template<typename U>
void operator<<(U&& value)
{
send(std::forward<U>(value));
}
};
///
/// Optimized implementations for unlimited channel (N=0)
///
template<typename T>
class Channel<T> : public detail::_ChannelBase<T>
{
public:
using base = detail::_ChannelBase<T>;
using base::mutex_;
using base::cv_;
using base::queue_;
template<typename U>
void send(U&& value)
{
std::lock_guard<std::mutex> lk {mutex_};
queue_.push(std::forward<U>(value));
cv_.notify_one();
}
template<typename... Args>
void send_emplace(Args&&... args)
{
std::lock_guard<std::mutex> lk {mutex_};
queue_.emplace(std::forward<Args>(args)...);
cv_.notify_one();
}
/// \note This method exists only for unlimited channel
void send(const T* data, std::size_t len)
{
std::lock_guard<std::mutex> lk {mutex_};
while (len > 0) {
queue_.push(*(data++));
--len;
}
cv_.notify_one();
}
template<typename U>
void operator<<(U&& value)
{
send(std::forward<U>(value));
}
};
} // namespace jami
......@@ -23,7 +23,6 @@
#include "account_schema.h"
#include "jamiaccount.h"
#include "channel.h"
#include "ice_transport.h"
#include "ftp_server.h"
#include "manager.h"
......
......@@ -25,7 +25,6 @@
#include "manager.h"
#include "jamidht/jamiaccount.h"
#include "string_utils.h"
#include "channel.h"
#include "security/tls_session.h"
#include <algorithm>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment