Commit 09b6db3a authored by Adrien Béraud's avatar Adrien Béraud

peer channel: cleanup, refactor

Change-Id: If3609715ffe43f4a4bac391301a48cba7cdb33c3
parent 4325a016
......@@ -145,7 +145,8 @@ libring_la_SOURCES = \
ftp_server.h \
generic_io.h \
scheduled_executor.h \
scheduled_executor.cpp
scheduled_executor.cpp \
transport/peer_channel.h
if HAVE_WIN32
libring_la_SOURCES += \
......
......@@ -24,6 +24,7 @@
#include "sip/sip_utils.h"
#include "manager.h"
#include "upnp/upnp_control.h"
#include "transport/peer_channel.h"
#include <pjlib.h>
......@@ -69,75 +70,6 @@ struct IceSTransDeleter
}
};
class PeerChannel {
public:
PeerChannel() {}
~PeerChannel() { stop(); }
PeerChannel(PeerChannel &&o) {
MutexGuard lk{o.mutex_};
stream_ = std::move(o.stream_);
}
PeerChannel &operator=(PeerChannel &&o) {
std::lock(mutex_, o.mutex_);
MutexGuard lk1{mutex_, std::adopt_lock};
MutexGuard lk2{o.mutex_, std::adopt_lock};
stream_ = std::move(o.stream_);
return *this;
}
void operator<<(const std::string &data) {
MutexGuard lk{mutex_};
stream_.clear();
stream_ << data;
cv_.notify_one();
}
ssize_t isDataAvailable() {
MutexGuard lk{mutex_};
auto pos = stream_.tellg();
stream_.seekg(0, std::ios_base::end);
auto available = (stream_.tellg() - pos);
stream_.seekg(pos);
return available;
}
template <typename Duration> bool wait(Duration timeout) {
MutexLock lk{mutex_};
auto a = cv_.wait_for(lk, timeout,
[this] { return stop_ or !stream_.eof(); });
return a;
}
std::size_t read(char *output, std::size_t size) {
MutexLock lk{mutex_};
if (stream_.eof()) return 0;
cv_.wait(lk, [&, this] {
if (stop_)
return true;
stream_.read(&output[0], size);
return stream_.gcount() > 0;
});
return stop_ ? 0 : stream_.gcount();
}
void stop() noexcept {
stop_ = true;
cv_.notify_all();
}
private:
PeerChannel(const PeerChannel &o) = delete;
PeerChannel &operator=(const PeerChannel &o) = delete;
std::mutex mutex_{};
std::condition_variable cv_{};
std::stringstream stream_{};
bool stop_{false};
friend void operator<<(std::vector<char> &, PeerChannel &);
};
} // namespace <anonymous>
//==============================================================================
......@@ -850,13 +782,13 @@ IceTransport::Impl::onReceiveData(unsigned comp_id, void *pkt, pj_size_t size)
auto& io = compIO_[comp_id-1];
std::unique_lock<std::mutex> lk(io.mutex);
if (on_recv_cb_) {
on_recv_cb_();
on_recv_cb_();
}
if (io.cb) {
io.cb((uint8_t*)pkt, size);
} else {
peerChannels_.at(comp_id-1) << std::string(reinterpret_cast<const char *>(pkt), size);
peerChannels_.at(comp_id-1).write((char*)pkt, size);
}
}
......
......@@ -697,10 +697,13 @@ DhtPeerConnector::Impl::answerToRequest(PeerConnectionMsg&& request,
certMap_.emplace(cert->getId(), std::make_pair(cert, peer_h));
auto sendRelayV4 = false, sendRelayV6 = false, sendIce = false, hasPubIp = false;
std::shared_ptr<bool> iceReady = std::make_shared<bool>(false);
std::shared_ptr<std::condition_variable> cv =
std::make_shared < std::condition_variable>();
struct IceReady {
std::mutex mtx {};
std::condition_variable cv {};
bool ready {false};
};
auto iceReady = std::make_shared<IceReady>();
for (auto& ip: request.addresses) {
try {
if (IpAddr(ip).isIpv4()) {
......@@ -720,9 +723,11 @@ DhtPeerConnector::Impl::answerToRequest(PeerConnectionMsg&& request,
auto ice_config = account.getIceOptions();
ice_config.tcpEnable = true;
ice_config.aggressive = true;
ice_config.onRecvReady = [this, cv, iceReady]() {
if (iceReady) *iceReady = true;
if (cv) cv->notify_one();
ice_config.onRecvReady = [iceReady]() {
auto& ir = *iceReady;
std::lock_guard<std::mutex> lk{ir.mtx};
ir.ready = true;
ir.cv.notify_one();
};
ice_ = iceTransportFactory.createTransport(account.getAccountID().c_str(), 1, true, ice_config);
......@@ -804,12 +809,10 @@ DhtPeerConnector::Impl::answerToRequest(PeerConnectionMsg&& request,
}
}
std::mutex mtx;
std::unique_lock<std::mutex> lk{mtx};
if (!*iceReady) {
if (not iceReady->ready) {
if (!hasPubIp) ice_->setSlaveSession();
cv->wait_for(lk, ICE_READY_TIMEOUT);
if (!*iceReady) {
std::unique_lock<std::mutex> lk {iceReady->mtx};
if (not iceReady->cv.wait_for(lk, ICE_READY_TIMEOUT, [&]{ return iceReady->ready; })) {
// This will fallback on TURN if ICE is not ready
return;
}
......
......@@ -41,7 +41,7 @@ template <typename T>
class Observable
{
public:
Observable() : observers_(), mutex_() {}
Observable() : mutex_(), observers_() {}
virtual ~Observable() {
std::lock_guard<std::mutex> lk(mutex_);
for (auto& o : observers_)
......@@ -66,22 +66,23 @@ public:
return false;
}
void notify(T data) {
int getObserversCount() {
std::lock_guard<std::mutex> lk(mutex_);
for (auto observer : observers_)
observer->update(this, data);
return observers_.size();
}
int getObserversCount() {
protected:
void notify(T data) {
std::lock_guard<std::mutex> lk(mutex_);
return observers_.size();
for (auto observer : observers_)
observer->update(this, data);
}
private:
NON_COPYABLE(Observable<T>);
std::set<Observer<T>*> observers_;
std::mutex mutex_; // lock observers_
std::set<Observer<T>*> observers_;
};
/*=== Observer =============================================================*/
......
......@@ -86,7 +86,7 @@ init_crt(gnutls_session_t session, dht::crypto::Certificate& crt)
using lock = std::lock_guard<std::mutex>;
static constexpr std::size_t IO_BUFFER_SIZE {3000}; ///< Size of char buffer used by IO operations
static constexpr std::size_t IO_BUFFER_SIZE {8192}; ///< Size of char buffer used by IO operations
//==============================================================================
......
/*
* Copyright (C) 2019 Savoir-faire Linux Inc.
* Authors: Adrien Béraud <adrien.beraud@savoirfairelinux.com>
* Guillaume Roguez <guillaume.roguez@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include <mutex>
#include <condition_variable>
#include <sstream>
namespace jami {
class PeerChannel
{
public:
PeerChannel() {}
~PeerChannel() {
stop();
}
PeerChannel(PeerChannel&& o) {
std::lock_guard<std::mutex> lk(o.mutex_);
stream_ = std::move(o.stream_);
stop_ = o.stop_;
}
ssize_t isDataAvailable() {
std::lock_guard<std::mutex> lk{mutex_};
auto pos = stream_.tellg();
stream_.seekg(0, std::ios_base::end);
auto available = (stream_.tellg() - pos);
stream_.seekg(pos);
return available;
}
template <typename Duration>
bool wait(Duration timeout) {
std::unique_lock<std::mutex> lk {mutex_};
return cv_.wait_for(lk, timeout, [this]{ return stop_ or !stream_.eof(); });
}
std::size_t read(char* output, std::size_t size) {
std::unique_lock<std::mutex> lk {mutex_};
cv_.wait(lk, [&, this]{
if (stop_)
return true;
stream_.read(&output[0], size);
return stream_.gcount() > 0;
});
return stop_ ? 0 : stream_.gcount();
}
void write(const char* data, std::size_t size) {
std::lock_guard<std::mutex> lk {mutex_};
stream_.clear();
stream_.write(data, size);
cv_.notify_one();
}
void stop() noexcept {
std::lock_guard<std::mutex> lk {mutex_};
if (stop_)
return;
stop_ = true;
cv_.notify_all();
}
private:
PeerChannel(const PeerChannel& o) = delete;
PeerChannel& operator=(const PeerChannel& o) = delete;
PeerChannel& operator=(PeerChannel&& o) = delete;
std::mutex mutex_ {};
std::condition_variable cv_ {};
std::stringstream stream_ {};
bool stop_ {false};
friend void operator <<(std::vector<char>&, PeerChannel&);
};
}
......@@ -19,6 +19,7 @@
*/
#include "turn_transport.h"
#include "transport/peer_channel.h"
#include "logger.h"
#include "ip_utils.h"
......@@ -55,80 +56,6 @@ enum class RelayState
DOWN,
};
class PeerChannel
{
public:
PeerChannel() {}
~PeerChannel() {
stop();
}
PeerChannel(PeerChannel&&o) {
MutexGuard lk {o.mutex_};
stream_ = std::move(o.stream_);
}
PeerChannel& operator =(PeerChannel&& o) {
std::lock(mutex_, o.mutex_);
MutexGuard lk1 {mutex_, std::adopt_lock};
MutexGuard lk2 {o.mutex_, std::adopt_lock};
stream_ = std::move(o.stream_);
return *this;
}
void operator <<(const std::string& data) {
MutexGuard lk {mutex_};
stream_.clear();
stream_ << data;
cv_.notify_one();
}
template <typename Duration>
bool wait(Duration timeout) {
std::lock(apiMutex_, mutex_);
MutexGuard lk_api {apiMutex_, std::adopt_lock};
MutexLock lk {mutex_, std::adopt_lock};
return cv_.wait_for(lk, timeout, [this]{ return stop_ or !stream_.eof(); });
}
std::size_t read(char* output, std::size_t size) {
std::lock(apiMutex_, mutex_);
MutexGuard lk_api {apiMutex_, std::adopt_lock};
MutexLock lk {mutex_, std::adopt_lock};
cv_.wait(lk, [&, this]{
if (stop_)
return true;
stream_.read(&output[0], size);
return stream_.gcount() > 0;
});
return stop_ ? 0 : stream_.gcount();
}
void stop() noexcept {
{
MutexGuard lk {mutex_};
if (stop_)
return;
stop_ = true;
}
cv_.notify_all();
// Make sure that no thread is blocked into read() or wait() methods
MutexGuard lk_api {apiMutex_};
}
private:
PeerChannel(const PeerChannel&o) = delete;
PeerChannel& operator =(const PeerChannel& o) = delete;
std::mutex apiMutex_ {};
std::mutex mutex_ {};
std::condition_variable cv_ {};
std::stringstream stream_ {};
bool stop_ {false};
friend void operator <<(std::vector<char>&, PeerChannel&);
};
}
//==============================================================================
......@@ -169,7 +96,6 @@ public:
std::map<IpAddr, PeerChannel> peerChannels_;
GenericSocket<uint8_t>::RecvCb onRxDataCb;
TurnTransportParams settings;
pj_caching_pool poolCache {};
pj_pool_t* pool {nullptr};
......@@ -223,19 +149,13 @@ TurnTransportPimpl::onRxData(const uint8_t* pkt, unsigned pkt_len,
const pj_sockaddr_t* addr, unsigned addr_len)
{
IpAddr peer_addr (*static_cast<const pj_sockaddr*>(addr), addr_len);
decltype(peerChannels_)::iterator channel_it;
{
MutexGuard lk {apiMutex_};
channel_it = peerChannels_.find(peer_addr);
auto channel_it = peerChannels_.find(peer_addr);
if (channel_it == std::end(peerChannels_))
return;
channel_it->second.write((const char*)pkt, pkt_len);
}
if (onRxDataCb)
onRxDataCb(pkt, pkt_len);
else
(channel_it->second) << std::string(reinterpret_cast<const char*>(pkt), pkt_len);
}
void
......@@ -250,7 +170,7 @@ TurnTransportPimpl::onPeerConnection(pj_uint32_t conn_id,
<< conn_id;
{
MutexGuard lk {apiMutex_};
peerChannels_.emplace(peer_addr, PeerChannel {});
peerChannels_[peer_addr];
}
}
......@@ -368,7 +288,6 @@ TurnTransport::shutdown(const IpAddr& addr)
{
MutexLock lk {pimpl_->apiMutex_};
auto& channel = pimpl_->peerChannels_.at(addr);
lk.unlock();
channel.stop();
}
......
......@@ -168,7 +168,6 @@ public:
private:
TurnTransport& turn_;
const IpAddr peer_;
RecvCb onRxDataCb_;
};
} // namespace jami
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment