Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
peer_connection.cpp 25.65 KiB
/*
 *  Copyright (C) 2017-2019 Savoir-faire Linux Inc.
 *
 *  Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com>
 *  Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA.
 */

#include "peer_connection.h"

#include "data_transfer.h"
#include "manager.h"
#include "jamidht/jamiaccount.h"
#include "string_utils.h"
#include "channel.h"
#include "turn_transport.h"
#include "security/tls_session.h"

#include <algorithm>
#include <future>
#include <vector>
#include <atomic>
#include <stdexcept>
#include <istream>
#include <ostream>
#include <unistd.h>
#include <cstdio>

#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#else
#include <sys/select.h>
#endif

#ifndef _MSC_VER
#include <sys/time.h>
#endif

namespace jami {

int
init_crt(gnutls_session_t session, dht::crypto::Certificate& crt)
{
    // Support only x509 format
    if (gnutls_certificate_type_get(session) != GNUTLS_CRT_X509) {
        return GNUTLS_E_CERTIFICATE_ERROR;
    }

    // Store verification status
    unsigned int status = 0;
    auto ret = gnutls_certificate_verify_peers2(session, &status);
    if (ret < 0 or (status & GNUTLS_CERT_SIGNATURE_FAILURE) != 0) {
        return GNUTLS_E_CERTIFICATE_ERROR;
    }

    unsigned int cert_list_size = 0;
    auto cert_list = gnutls_certificate_get_peers(session, &cert_list_size);
    if (cert_list == nullptr) {
        return GNUTLS_E_CERTIFICATE_ERROR;
    }

    // Check if received peer certificate is awaited
    std::vector<std::pair<uint8_t *, uint8_t *>> crt_data;
    crt_data.reserve(cert_list_size);
    for (unsigned i = 0; i < cert_list_size; i++)
        crt_data.emplace_back(cert_list[i].data,
                            cert_list[i].data + cert_list[i].size);
    crt = dht::crypto::Certificate{crt_data};

    return GNUTLS_E_SUCCESS;
}

using lock = std::lock_guard<std::mutex>;

static constexpr std::size_t IO_BUFFER_SIZE {8192}; ///< Size of char buffer used by IO operations

//==============================================================================

class TlsTurnEndpoint::Impl
{
public:
    static constexpr auto TLS_TIMEOUT = std::chrono::seconds(20);

    Impl(ConnectedTurnTransport& tr,
         std::function<bool(const dht::crypto::Certificate&)>&& cert_check)
        : turn {tr}, peerCertificateCheckFunc {std::move(cert_check)} {}

    ~Impl();

    // TLS callbacks
    int verifyCertificate(gnutls_session_t);
    void onTlsStateChange(tls::TlsSessionState);
    void onTlsRxData(std::vector<uint8_t>&&);
    void onTlsCertificatesUpdate(const gnutls_datum_t*, const gnutls_datum_t*, unsigned int);

    std::unique_ptr<tls::TlsSession> tls;
    ConnectedTurnTransport& turn;
    std::function<bool(const dht::crypto::Certificate&)> peerCertificateCheckFunc;
    dht::crypto::Certificate peerCertificate;
};

// Declaration at namespace scope is necessary (until C++17)
constexpr std::chrono::seconds TlsTurnEndpoint::Impl::TLS_TIMEOUT;

TlsTurnEndpoint::Impl::~Impl()
{}

int
TlsTurnEndpoint::Impl::verifyCertificate(gnutls_session_t session)
{
    dht::crypto::Certificate crt;
    auto verified = init_crt(session, crt);
    if (verified != GNUTLS_E_SUCCESS) return verified;

    if (!peerCertificateCheckFunc(crt))
        return GNUTLS_E_CERTIFICATE_ERROR;

    peerCertificate = std::move(crt);

    return GNUTLS_E_SUCCESS;
}

void
TlsTurnEndpoint::Impl::onTlsStateChange(tls::TlsSessionState)
{}
void
TlsTurnEndpoint::Impl::onTlsRxData(UNUSED std::vector<uint8_t>&& buf)
{
    JAMI_ERR() << "[TLS-TURN] rx " << buf.size() << " (but not implemented)";
}

void
TlsTurnEndpoint::Impl::onTlsCertificatesUpdate(UNUSED const gnutls_datum_t* local_raw,
                                               UNUSED const gnutls_datum_t* remote_raw,
                                               UNUSED unsigned int remote_count)
{}

TlsTurnEndpoint::TlsTurnEndpoint(ConnectedTurnTransport& turn_ep,
                                 const Identity& local_identity,
                                 const std::shared_future<tls::DhParams>& dh_params,
                                 std::function<bool(const dht::crypto::Certificate&)>&& cert_check)
    : pimpl_ { std::make_unique<Impl>(turn_ep, std::move(cert_check)) }
{
    // Add TLS over TURN
    tls::TlsSession::TlsSessionCallbacks tls_cbs = {
        /*.onStateChange = */[this](tls::TlsSessionState state){ pimpl_->onTlsStateChange(state); },
        /*.onRxData = */[this](std::vector<uint8_t>&& buf){ pimpl_->onTlsRxData(std::move(buf)); },
        /*.onCertificatesUpdate = */[this](const gnutls_datum_t* l, const gnutls_datum_t* r,
                                           unsigned int n){ pimpl_->onTlsCertificatesUpdate(l, r, n); },
        /*.verifyCertificate = */[this](gnutls_session_t session){ return pimpl_->verifyCertificate(session); }
    };
    tls::TlsParams tls_param = {
        /*.ca_list = */     "",
        /*.peer_ca = */     nullptr,
        /*.cert = */        local_identity.second,
        /*.cert_key = */    local_identity.first,
        /*.dh_params = */   dh_params,
        /*.timeout = */     Impl::TLS_TIMEOUT,
        /*.cert_check = */  nullptr,
    };
    pimpl_->tls = std::make_unique<tls::TlsSession>(turn_ep, tls_param, tls_cbs);
}

TlsTurnEndpoint::~TlsTurnEndpoint() = default;

void
TlsTurnEndpoint::shutdown()
{
    pimpl_->tls->shutdown();
}

bool
TlsTurnEndpoint::isInitiator() const
{
    return pimpl_->tls->isInitiator();
}

void
TlsTurnEndpoint::waitForReady(const std::chrono::steady_clock::duration& timeout)
{
    pimpl_->tls->waitForReady(timeout);
}

int
TlsTurnEndpoint::maxPayload() const
{
    return pimpl_->tls->maxPayload();
}

std::size_t
TlsTurnEndpoint::read(ValueType* buf, std::size_t len, std::error_code& ec)
{
    return pimpl_->tls->read(buf, len, ec);
}
std::size_t
TlsTurnEndpoint::write(const ValueType* buf, std::size_t len, std::error_code& ec)
{
    return pimpl_->tls->write(buf, len, ec);
}

const dht::crypto::Certificate&
TlsTurnEndpoint::peerCertificate() const
{
    return pimpl_->peerCertificate;
}

int
TlsTurnEndpoint::waitForData(std::chrono::milliseconds timeout, std::error_code& ec) const
{
    return pimpl_->tls->waitForData(timeout, ec);
}

//==============================================================================

TcpSocketEndpoint::TcpSocketEndpoint(const IpAddr& addr)
    : addr_ {addr}
    , sock_{ static_cast<int>(::socket(addr.getFamily(), SOCK_STREAM, 0)) }
{
    if (sock_ < 0)
        std::system_error(errno, std::generic_category());
    auto bound = ip_utils::getAnyHostAddr(addr.getFamily());
    if (::bind(sock_, bound, bound.getLength()) < 0)
        std::system_error(errno, std::generic_category());
}

TcpSocketEndpoint::~TcpSocketEndpoint()
{
#ifndef _MSC_VER
    ::close(sock_);
#else
    ::closesocket(sock_);
#endif
}

void
TcpSocketEndpoint::connect(const std::chrono::milliseconds& timeout)
{
    int ms =  timeout.count();
    setsockopt(sock_, SOL_SOCKET, SO_RCVTIMEO, (const char *)&ms, sizeof(ms));
    setsockopt(sock_, SOL_SOCKET, SO_SNDTIMEO, (const char *)&ms, sizeof(ms));

    if ((::connect(sock_, addr_, addr_.getLength())) < 0)
        throw std::system_error(errno, std::generic_category());
}

int
TcpSocketEndpoint::waitForData(std::chrono::milliseconds timeout, std::error_code& ec) const
{
    for (;;) {
        struct timeval tv;
        tv.tv_sec = timeout.count() / 1000;
        tv.tv_usec = (timeout.count() % 1000) * 1000;

        fd_set read_fds;
        FD_ZERO(&read_fds);
        FD_SET(sock_, &read_fds);

        auto res = ::select(sock_ + 1, &read_fds, nullptr, nullptr, &tv);
        if (res < 0)
            break;
        if (res == 0)
            return 0; // timeout
        if (FD_ISSET(sock_, &read_fds))
            return 1;
    }

    ec.assign(errno, std::generic_category());
    return -1;
}

std::size_t
TcpSocketEndpoint::read(ValueType* buf, std::size_t len, std::error_code& ec)
{
    // NOTE: recv buf args is a void* on POSIX compliant system, but it's a char* on mingw
    auto res = ::recv(sock_, reinterpret_cast<char*>(buf), len, 0);
    if (res < 0)
        ec.assign(errno, std::generic_category());
    else
        ec.clear();
    return (res >= 0) ? res : 0;
}

std::size_t
TcpSocketEndpoint::write(const ValueType* buf, std::size_t len, std::error_code& ec)
{
    // NOTE: recv buf args is a void* on POSIX compliant system, but it's a char* on mingw
    auto res = ::send(sock_, reinterpret_cast<const char*>(buf), len, 0);
    if (res < 0)
        ec.assign(errno, std::generic_category());
    else
        ec.clear();
    return (res >= 0) ? res : 0;
}

//==============================================================================

IceSocketEndpoint::IceSocketEndpoint(std::shared_ptr<IceTransport> ice, bool isSender)
    : ice_(std::move(ice)), iceIsSender(isSender)
{}

IceSocketEndpoint::~IceSocketEndpoint()
{
    shutdown();
}

void
IceSocketEndpoint::shutdown() {
    if (ice_) {
        // Sometimes the other peer never send any packet
        // So, we cancel pending read to avoid to have
        // any blocking operation.
        ice_->cancelOperations();
        ice_->stop();
    }
}

int
IceSocketEndpoint::waitForData(std::chrono::milliseconds timeout, std::error_code& ec) const
{
    if (ice_) {
        if (!ice_->isRunning()) return -1;
        return iceIsSender ? ice_->isDataAvailable(compId_) : ice_->waitForData(compId_, timeout, ec);
    }
    return -1;
}

std::size_t
IceSocketEndpoint::read(ValueType* buf, std::size_t len, std::error_code& ec)
{
    if (ice_) {
        if (!ice_->isRunning()) return 0;
        try {
          auto res = ice_->recvfrom(compId_, reinterpret_cast<char *>(buf), len, ec);
          return (res >= 0) ? res : 0;
        } catch (const std::exception &e) {
          JAMI_ERR("IceSocketEndpoint::read exception: %s", e.what());
        }
        return 0;
    }
    return -1;
}

std::size_t
IceSocketEndpoint::write(const ValueType* buf, std::size_t len, std::error_code& ec)
{
    if (ice_) {
        if (!ice_->isRunning()) return 0;
        auto res = 0;
        res = ice_->send(compId_, reinterpret_cast<const unsigned char *>(buf), len);
        if (res < 0) {
            ec.assign(errno, std::generic_category());
        } else {
            ec.clear();
        }
        return (res >= 0) ? res : 0;
    }
    return -1;
}

//==============================================================================

class TlsSocketEndpoint::Impl
{
public:
    static constexpr auto TLS_TIMEOUT = std::chrono::seconds(20);

    Impl(AbstractSocketEndpoint& ep, const dht::crypto::Certificate& peer_cert)
        : tr {ep}, peerCertificate {peer_cert} {}

    Impl(AbstractSocketEndpoint &ep,
         std::function<bool(const dht::crypto::Certificate &)> &&cert_check)
        : tr{ep}, peerCertificateCheckFunc{std::make_unique<std::function<bool(const dht::crypto::Certificate &)>>(std::move(cert_check))}, peerCertificate {null_cert} {}

    // TLS callbacks
    int verifyCertificate(gnutls_session_t);
    void onTlsStateChange(tls::TlsSessionState);
    void onTlsRxData(std::vector<uint8_t>&&);
    void onTlsCertificatesUpdate(const gnutls_datum_t*, const gnutls_datum_t*, unsigned int);

    std::unique_ptr<tls::TlsSession> tls;
    AbstractSocketEndpoint& tr;
    const dht::crypto::Certificate& peerCertificate;
    dht::crypto::Certificate null_cert;
    std::unique_ptr<std::function<bool(const dht::crypto::Certificate &)>> peerCertificateCheckFunc;
};

// Declaration at namespace scope is necessary (until C++17)
constexpr std::chrono::seconds TlsSocketEndpoint::Impl::TLS_TIMEOUT;

int
TlsSocketEndpoint::Impl::verifyCertificate(gnutls_session_t session)
{
    dht::crypto::Certificate crt;
    auto verified = init_crt(session, crt);
    if (verified != GNUTLS_E_SUCCESS) return verified;
    if (peerCertificateCheckFunc) {
        if (!(*peerCertificateCheckFunc)(crt)) {
          JAMI_ERR() << "[TLS-SOCKET] Unexpected peer certificate";
          return GNUTLS_E_CERTIFICATE_ERROR;
        }

        null_cert = std::move(crt);
    } else {
        if (crt.getPacked() != peerCertificate.getPacked()) {
            JAMI_ERR() << "[TLS-SOCKET] Unexpected peer certificate";
            return GNUTLS_E_CERTIFICATE_ERROR;
        }
    }

    return GNUTLS_E_SUCCESS;
}

void
TlsSocketEndpoint::Impl::onTlsStateChange(UNUSED tls::TlsSessionState state)
{}

void
TlsSocketEndpoint::Impl::onTlsRxData(UNUSED std::vector<uint8_t>&& buf)
{}

void
TlsSocketEndpoint::Impl::onTlsCertificatesUpdate(UNUSED const gnutls_datum_t* local_raw,
                                                UNUSED const gnutls_datum_t* remote_raw,
                                                UNUSED unsigned int remote_count)
{}

TlsSocketEndpoint::TlsSocketEndpoint(AbstractSocketEndpoint& tr,
                                     const Identity& local_identity,
                                     const std::shared_future<tls::DhParams>& dh_params,
                                     const dht::crypto::Certificate& peer_cert)
    : pimpl_ { std::make_unique<Impl>(tr, peer_cert) }
{
    // Add TLS over TURN
    tls::TlsSession::TlsSessionCallbacks tls_cbs = {
        /*.onStateChange = */[this](tls::TlsSessionState state){ pimpl_->onTlsStateChange(state); },
        /*.onRxData = */[this](std::vector<uint8_t>&& buf){ pimpl_->onTlsRxData(std::move(buf)); },
        /*.onCertificatesUpdate = */[this](const gnutls_datum_t* l, const gnutls_datum_t* r,
                                           unsigned int n){ pimpl_->onTlsCertificatesUpdate(l, r, n); },
        /*.verifyCertificate = */[this](gnutls_session_t session){ return pimpl_->verifyCertificate(session); }
    };
    tls::TlsParams tls_param = {
        /*.ca_list = */     "",
        /*.peer_ca = */     nullptr,
        /*.cert = */        local_identity.second,
        /*.cert_key = */    local_identity.first,
        /*.dh_params = */   dh_params,
        /*.timeout = */     Impl::TLS_TIMEOUT,
        /*.cert_check = */  nullptr,
    };
    pimpl_->tls = std::make_unique<tls::TlsSession>(tr, tls_param, tls_cbs);
}

TlsSocketEndpoint::TlsSocketEndpoint(AbstractSocketEndpoint& tr,
                                    const Identity& local_identity,
                                    const std::shared_future<tls::DhParams>& dh_params,
                                    std::function<bool(const dht::crypto::Certificate&)>&& cert_check)
    : pimpl_ { std::make_unique<Impl>(tr, std::move(cert_check)) }
{
    // Add TLS over TURN
    tls::TlsSession::TlsSessionCallbacks tls_cbs = {
        /*.onStateChange = */[this](tls::TlsSessionState state){ pimpl_->onTlsStateChange(state); },
        /*.onRxData = */[this](std::vector<uint8_t>&& buf){ pimpl_->onTlsRxData(std::move(buf)); },
        /*.onCertificatesUpdate = */[this](const gnutls_datum_t* l, const gnutls_datum_t* r,
                                           unsigned int n){ pimpl_->onTlsCertificatesUpdate(l, r, n); },
        /*.verifyCertificate = */[this](gnutls_session_t session){ return pimpl_->verifyCertificate(session); }
    };
    tls::TlsParams tls_param = {
        /*.ca_list = */     "",
        /*.peer_ca = */     nullptr,
        /*.cert = */        local_identity.second,
        /*.cert_key = */    local_identity.first,
        /*.dh_params = */   dh_params,
        /*.timeout = */     Impl::TLS_TIMEOUT,
        /*.cert_check = */  nullptr,
    };
    pimpl_->tls = std::make_unique<tls::TlsSession>(tr, tls_param, tls_cbs);
}


TlsSocketEndpoint::~TlsSocketEndpoint() = default;

bool
TlsSocketEndpoint::isInitiator() const
{
    return pimpl_->tls->isInitiator();
}

int
TlsSocketEndpoint::maxPayload() const
{
  return pimpl_->tls->maxPayload();
}

std::size_t
TlsSocketEndpoint::read(ValueType* buf, std::size_t len, std::error_code& ec)
{
    return pimpl_->tls->read(buf, len, ec);
}

std::size_t
TlsSocketEndpoint::write(const ValueType* buf, std::size_t len, std::error_code& ec)
{
    return pimpl_->tls->write(buf, len, ec);
}

void
TlsSocketEndpoint::waitForReady(const std::chrono::milliseconds& timeout)
{
    pimpl_->tls->waitForReady(timeout);
}

int
TlsSocketEndpoint::waitForData(std::chrono::milliseconds timeout, std::error_code& ec) const
{
    return pimpl_->tls->waitForData(timeout, ec);
}

//==============================================================================

// following namespace prevents an ODR violation with definitions in p2p.cpp
namespace
{

enum class CtrlMsgType
{
    STOP,
    ATTACH_INPUT,
    ATTACH_OUTPUT,
};

struct CtrlMsg
{
    virtual CtrlMsgType type() const = 0;
    virtual ~CtrlMsg() = default;
};

struct StopCtrlMsg final : CtrlMsg
{
    explicit StopCtrlMsg() {}
    CtrlMsgType type() const override { return CtrlMsgType::STOP; }
};

struct AttachInputCtrlMsg final : CtrlMsg
{
    explicit AttachInputCtrlMsg(const std::shared_ptr<Stream>& stream)
        : stream {stream} {}
    CtrlMsgType type() const override { return CtrlMsgType::ATTACH_INPUT; }
    const std::shared_ptr<Stream> stream;
};

struct AttachOutputCtrlMsg final : CtrlMsg
{
    explicit AttachOutputCtrlMsg(const std::shared_ptr<Stream>& stream)
        : stream {stream} {}
    CtrlMsgType type() const override { return CtrlMsgType::ATTACH_OUTPUT; }
    const std::shared_ptr<Stream> stream;
};

} // namespace <anonymous>

//==============================================================================

class PeerConnection::PeerConnectionImpl
{
public:
    PeerConnectionImpl(std::function<void()>&& done,
                       const std::string& peer_uri,
                       std::unique_ptr<SocketType> endpoint)
        : peer_uri {peer_uri}
        , endpoint_ {std::move(endpoint)}
        , eventLoopFut_ {std::async(std::launch::async, [this, done=std::move(done)] {
                try {
                    eventLoop();
                } catch (const std::exception& e) {
                    JAMI_ERR() << "[CNX] peer connection event loop failure: " << e.what();
                    done();
                }
            })} {}

    ~PeerConnectionImpl() {
        ctrlChannel << std::make_unique<StopCtrlMsg>();
        endpoint_->shutdown();
    }

    bool hasStreamWithId(const DRing::DataTransferId& id) {
        auto isInInput = std::any_of(inputs_.begin(), inputs_.end(),
                                     [&id](const std::shared_ptr<Stream>& str) {
                                         return str && str->getId() == id; });
        if (isInInput) return true;
        auto isInOutput =
            std::any_of(outputs_.begin(), outputs_.end(),
                        [&id](const std::shared_ptr<Stream> &str) {
                          return str && str->getId() == id;
                        });
        return isInOutput;
    }

    const std::string peer_uri;
    Channel<std::unique_ptr<CtrlMsg>> ctrlChannel;

private:
    std::unique_ptr<SocketType> endpoint_;
    std::vector<std::shared_ptr<Stream>> inputs_;
    std::vector<std::shared_ptr<Stream>> outputs_;
    std::future<void> eventLoopFut_;
    std::vector<uint8_t> bufferPool_; // will store non rattached buffers

    void eventLoop();

    template <typename L, typename C>
    void handle_stream_list(L& stream_list, const C& callable) {
        if (stream_list.empty())
            return;
        const auto& item = std::begin(stream_list);
        auto& stream = *item;
        try {
            if (callable(stream))
                return;
            JAMI_DBG() << "EOF on stream #" << stream->getId();
        } catch (const std::system_error& e) {
            JAMI_WARN() << "Stream #" << stream->getId()
                        << " IO failed with code = " << e.code();
        } catch (const std::exception& e) {
            JAMI_ERR() << "Unexpected exception during IO with stream #"
                       << stream->getId()
                       << ": " << e.what();
        }
        stream->close();
        stream_list.erase(item);
    }
};

void
PeerConnection::PeerConnectionImpl::eventLoop()
{
    JAMI_DBG() << "[CNX] Peer connection to " << peer_uri << " ready";
    while (true) {
        // Process ctrl orders first
        while (true) {
            std::unique_ptr<CtrlMsg> msg;
            if (outputs_.empty() and inputs_.empty()) {
                if (!ctrlChannel.empty()) {
                    msg = ctrlChannel.receive();
                } else {
                    std::error_code ec;
                    if (endpoint_->waitForData(std::chrono::milliseconds(100), ec) > 0) {
                        std::vector<uint8_t> buf(IO_BUFFER_SIZE);
                        JAMI_DBG("A good buffer arrived before any input or output attachment");
                        auto size = endpoint_->read(buf, ec);
                        if (ec)
                            throw std::system_error(ec);
                        // If it's a good read, we should store the buffer somewhere
                        // and give it to the next input or output.
                        if (size < IO_BUFFER_SIZE)
                            bufferPool_.insert(bufferPool_.end(), buf.begin(), buf.begin() + size);
                    }
                    break;
                }
            } else if (!ctrlChannel.empty()) {
                msg = ctrlChannel.receive();
            } else
                break;

            switch (msg->type()) {
                case CtrlMsgType::ATTACH_INPUT:
                {
                    auto& input_msg = static_cast<AttachInputCtrlMsg&>(*msg);
                    inputs_.emplace_back(std::move(input_msg.stream));
                }
                break;

                case CtrlMsgType::ATTACH_OUTPUT:
                {
                    auto& output_msg = static_cast<AttachOutputCtrlMsg&>(*msg);
                    outputs_.emplace_back(std::move(output_msg.stream));
                }
                break;

                case CtrlMsgType::STOP:
                  return;

                default: JAMI_ERR("BUG: got unhandled control msg!");  break;
            }
        }

        // Then handles IO streams
        std::vector<uint8_t> buf;
        std::error_code ec;

        bool sleep = true;

        // sending loop
        handle_stream_list(inputs_, [&] (auto& stream) {
                if (!stream) return false;
                buf.resize(IO_BUFFER_SIZE);
                if (stream->read(buf)) {
                    if (not buf.empty()) {
                      endpoint_->write(buf, ec);
                      if (ec)
                        throw std::system_error(ec);
                      sleep = false;
                    }
                } else {
                    // EOF on outgoing stream => finished
                    return false;
                }
                if (!bufferPool_.empty()) {
                  stream->write(bufferPool_);
                  bufferPool_.clear();
                } else if (endpoint_->waitForData(std::chrono::milliseconds(0), ec) > 0) {
                  buf.resize(IO_BUFFER_SIZE);
                  endpoint_->read(buf, ec);
                  if (ec)
                    throw std::system_error(ec);
                  return stream->write(buf);
                } else if (ec)
                    throw std::system_error(ec);
                return true;
            });

        // receiving loop
        handle_stream_list(outputs_, [&] (auto& stream) {
                if (!stream) return false;
                buf.resize(IO_BUFFER_SIZE);
                auto eof = stream->read(buf);
                // if eof we let a chance to send a reply before leaving
                if (not buf.empty()) {
                    endpoint_->write(buf, ec);
                    if (ec)
                        throw std::system_error(ec);
                }
                if (not eof)
                    return false;

                if (!bufferPool_.empty()) {
                    stream->write(bufferPool_);
                    bufferPool_.clear();
                } else if (endpoint_->waitForData(std::chrono::milliseconds(0), ec) > 0) {
                  buf.resize(IO_BUFFER_SIZE);
                  endpoint_->read(buf, ec);
                  if (ec)
                    throw std::system_error(ec);
                  sleep = false;
                  return stream->write(buf);
                } else if (ec)
                  throw std::system_error(ec);
                return true;
            });

        if (sleep)
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}
//==============================================================================

PeerConnection::PeerConnection(std::function<void()>&& done,
                               const std::string& peer_uri,
                               std::unique_ptr<GenericSocket<uint8_t>> endpoint)
    : pimpl_(std::make_unique<PeerConnectionImpl>(std::move(done), peer_uri, std::move(endpoint)))
{}

PeerConnection::~PeerConnection()
{}

void
PeerConnection::attachInputStream(const std::shared_ptr<Stream>& stream)
{
    pimpl_->ctrlChannel << std::make_unique<AttachInputCtrlMsg>(stream);
}

void
PeerConnection::attachOutputStream(const std::shared_ptr<Stream>& stream)
{
    pimpl_->ctrlChannel << std::make_unique<AttachOutputCtrlMsg>(stream);
}

bool
PeerConnection::hasStreamWithId(const DRing::DataTransferId& id)
{
    return pimpl_->hasStreamWithId(id);
}

std::string
PeerConnection::getPeerUri() const
{
    return pimpl_->peer_uri;
}

} // namespace jami