Skip to content
Snippets Groups Projects
Select Git revision
  • master default protected
  • release/202005
  • release/202001
  • release/201912
  • release/201911
  • release/releaseWindowsTestOne
  • release/windowsReleaseTest
  • release/releaseTest
  • release/releaseWindowsTest
  • release/201910
  • release/qt/201910
  • release/windows-test/201910
  • release/201908
  • release/201906
  • release/201905
  • release/201904
  • release/201903
  • release/201902
  • release/201901
  • release/201812
  • 4.0.0
  • 2.2.0
  • 2.1.0
  • 2.0.1
  • 2.0.0
  • 1.4.1
  • 1.4.0
  • 1.3.0
  • 1.2.0
  • 1.1.0
30 results

p2p.cpp

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    p2p.cpp 29.62 KiB
    /*
     *  Copyright (C) 2017-2018 Savoir-faire Linux Inc.
     *
     *  Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com>
     *
     *  This program is free software; you can redistribute it and/or modify
     *  it under the terms of the GNU General Public License as published by
     *  the Free Software Foundation; either version 3 of the License, or
     *  (at your option) any later version.
     *
     *  This program is distributed in the hope that it will be useful,
     *  but WITHOUT ANY WARRANTY; without even the implied warranty of
     *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     *  GNU General Public License for more details.
     *
     *  You should have received a copy of the GNU General Public License
     *  along with this program; if not, write to the Free Software
     *  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA.
     */
    
    #include "p2p.h"
    
    #include "account_schema.h"
    #include "ringaccount.h"
    #include "peer_connection.h"
    #include "turn_transport.h"
    #include "ftp_server.h"
    #include "channel.h"
    #include "security/tls_session.h"
    
    #include <opendht/default_types.h>
    #include <opendht/rng.h>
    
    #include <memory>
    #include <map>
    #include <vector>
    #include <chrono>
    #include <array>
    #include <future>
    #include <algorithm>
    #include <type_traits>
    
    namespace ring {
    
    static constexpr auto DHT_MSG_TIMEOUT = std::chrono::seconds(20);
    static constexpr auto NET_CONNECTION_TIMEOUT = std::chrono::seconds(10);
    
    using Clock = std::chrono::system_clock;
    using ValueIdDist = std::uniform_int_distribution<dht::Value::Id>;
    
    //==============================================================================
    
    // This namespace prevents a nasty ODR violation with definitions in peer_connection.cpp
    inline namespace
    {
    
    template <typename CT>
    class Timeout
    {
    public:
        using clock = CT;
        using duration = typename CT::duration;
        using time_point = typename CT::time_point;
    
        explicit Timeout(const duration& delay) : delay {delay} {}
    
        void start() {
            start_ = clock::now();
        }
    
        explicit operator bool() const {
            return (clock::now() - start_) >= delay;
        }
    
        const duration delay {duration::zero()};
    
    private:
        time_point start_ {};
    };
    
    //==============================================================================
    
    /**
     * DHT message to convey a end2end connection request to a peer
     */
    class PeerConnectionMsg : public dht::EncryptedValue<PeerConnectionMsg>
    {
    public:
        static constexpr const dht::ValueType& TYPE = dht::ValueType::USER_DATA;
        static constexpr uint32_t protocol_version = 0x01000002; ///< Supported protocol
        static constexpr const char* key_prefix = "peer:"; ///< base to compute the DHT listen key
    
        dht::Value::Id id = dht::Value::INVALID_ID;
        uint32_t protocol {protocol_version}; ///< Protocol identification. First bit reserved to indicate a request (0) or a response (1)
        std::vector<std::string> addresses; ///< Request: public addresses for TURN permission. Response: TURN relay addresses (only 1 in current implementation)
        MSGPACK_DEFINE_MAP(id, protocol, addresses)
    
        PeerConnectionMsg() = default;
        PeerConnectionMsg(dht::Value::Id id, uint32_t aprotocol, const std::string& arelay)
            : id {id}, protocol {aprotocol}, addresses {{arelay}} {}
        PeerConnectionMsg(dht::Value::Id id, uint32_t aprotocol, const std::vector<std::string>& asrelay)
            : id {id}, protocol {aprotocol}, addresses {asrelay} {}
    
        bool isRequest() const noexcept { return (protocol & 1) == 0; }
    
        PeerConnectionMsg respond(const IpAddr& relay) const {
            return {id, protocol|1, relay.toString(true, true)};
        }
    
        PeerConnectionMsg respond(const std::vector<std::string>& addresses) const {
            return {id, protocol|1, addresses};
        }
    };
    
    //==============================================================================
    
    enum class CtrlMsgType
    {
        STOP,
        CANCEL,
        TURN_PEER_CONNECT,
        TURN_PEER_DISCONNECT,
        DHT_REQUEST,
        DHT_RESPONSE,
        ADD_DEVICE,
    };
    
    struct CtrlMsgBase
    {
        CtrlMsgBase() = delete;
        explicit CtrlMsgBase(CtrlMsgType id) : id_ {id} {}
        virtual ~CtrlMsgBase() = default;
        CtrlMsgType type() const noexcept { return id_; }
    private:
        const CtrlMsgType id_;
    };
    
    template <class... Args>
    using DataTypeSet = std::tuple<Args...>;
    
    template <CtrlMsgType id, typename DataTypeSet=void>
    struct CtrlMsg : CtrlMsgBase
    {
        template <class... Args>
        explicit CtrlMsg(const Args&... args) : CtrlMsgBase(id), data {args...} {}
    
        DataTypeSet data;
    };
    
    template <CtrlMsgType id, class... Args>
    auto makeMsg(const Args&... args)
    {
        return std::make_unique<CtrlMsg<id, DataTypeSet<Args...>>>(args...);
    }
    
    template <std::size_t N, CtrlMsgType id, typename R, typename T>
    auto msgData(const T& msg)
    {
        using MsgType = typename std::tuple_element<std::size_t(id), R>::type;
        auto& x = static_cast<const MsgType&>(msg);
        return std::get<N>(x.data);
    }
    
    //==============================================================================
    
    using DhtInfoHashMsgData = DataTypeSet<dht::InfoHash, DRing::DataTransferId>;
    using TurnConnectMsgData = DataTypeSet<IpAddr>;
    using PeerCnxMsgData = DataTypeSet<PeerConnectionMsg>;
    using AddDeviceMsgData = DataTypeSet<dht::InfoHash,
                                         DRing::DataTransferId,
                                         std::shared_ptr<dht::crypto::Certificate>,
                                         std::vector<std::string>,
                                         std::function<void(PeerConnection*)>>;
    
    using AllCtrlMsg = DataTypeSet<CtrlMsg<CtrlMsgType::STOP>,
                                   CtrlMsg<CtrlMsgType::CANCEL, DhtInfoHashMsgData>,
                                   CtrlMsg<CtrlMsgType::TURN_PEER_CONNECT, TurnConnectMsgData>,
                                   CtrlMsg<CtrlMsgType::TURN_PEER_DISCONNECT, TurnConnectMsgData>,
                                   CtrlMsg<CtrlMsgType::DHT_REQUEST, PeerCnxMsgData>,
                                   CtrlMsg<CtrlMsgType::DHT_RESPONSE, PeerCnxMsgData>,
                                   CtrlMsg<CtrlMsgType::ADD_DEVICE, AddDeviceMsgData>>;
    
    template <CtrlMsgType id, std::size_t N=0, typename T>
    auto ctrlMsgData(const T& msg)
    {
        return msgData<N, id, AllCtrlMsg>(msg);
    }
    
    } // namespace <anonymous>
    
    //==============================================================================
    
    class DhtPeerConnector::Impl
    {
    public:
        class ClientConnector;
    
        explicit Impl(RingAccount& account)
            : account {account}
            , loopFut_ {std::async(std::launch::async, [this]{ eventLoop(); })} {}
    
        ~Impl() {
            servers_.clear();
            clients_.clear();
            turnAuthv4_.reset();
            turnAuthv6_.reset();
            ctrl << makeMsg<CtrlMsgType::STOP>();
        }
    
        RingAccount& account;
        Channel<std::unique_ptr<CtrlMsgBase>> ctrl;
    
    private:
        std::map<IpAddr, std::unique_ptr<ConnectedTurnTransport>> turnEndpoints_;
        std::unique_ptr<TurnTransport> turnAuthv4_;
        std::unique_ptr<TurnTransport> turnAuthv6_;
    
        // key: Stored certificate PublicKey id (normaly it's the DeviceId)
        // value: pair of shared_ptr<Certificate> and associated RingId
        std::map<dht::InfoHash, std::pair<std::shared_ptr<dht::crypto::Certificate>, dht::InfoHash>> certMap_;
        std::map<IpAddr, dht::InfoHash> connectedPeers_;
    
    protected:
        std::map<std::pair<dht::InfoHash, IpAddr>, std::unique_ptr<PeerConnection>> servers_;
        std::map<std::pair<dht::InfoHash, DRing::DataTransferId>, std::unique_ptr<ClientConnector>> clients_;
        std::mutex clientsMutex_;
    
    private:
        void onTurnPeerConnection(const IpAddr&);
        void onTurnPeerDisconnection(const IpAddr&);
        void onRequestMsg(PeerConnectionMsg&&);
        void onTrustedRequestMsg(PeerConnectionMsg&&, const std::shared_ptr<dht::crypto::Certificate>&,
                                 const dht::InfoHash&);
        void onResponseMsg(PeerConnectionMsg&&);
        void onAddDevice(const dht::InfoHash&,
                         const DRing::DataTransferId&,
                         const std::shared_ptr<dht::crypto::Certificate>&,
                         const std::vector<std::string>&,
                         const std::function<void(PeerConnection*)>&);
        void turnConnect();
        void eventLoop();
        bool validatePeerCertificate(const dht::crypto::Certificate&, dht::InfoHash&);
    
        std::future<void> loopFut_; // keep it last member
    };
    
    //==============================================================================
    
    /// This class is responsible of connection to a specific peer.
    /// The connected peer acting as server (responsible of the TURN session).
    /// When the TURN session is created and your IP is permited, we'll connect it
    /// using a system socket. Later the TLS session is negotiated on this socket.
    class DhtPeerConnector::Impl::ClientConnector
    {
    public:
        using ListenerFunction = std::function<void(PeerConnection*)>;
    
        ClientConnector(Impl& parent,
                        const dht::InfoHash& peer_h,
                        const std::shared_ptr<dht::crypto::Certificate>& peer_cert,
                        const std::vector<std::string>& public_addresses,
                        const ListenerFunction& connect_cb)
            : parent_ {parent}
            , peer_ {peer_h}
            , publicAddresses_ {public_addresses}
            , peerCertificate_ {peer_cert} {
                addListener(connect_cb);
                processTask_ = std::async(
                    std::launch::async,
                    [this] {
                        try { process(); }
                        catch (const std::exception& e) {
                            RING_ERR() << "[CNX] exception during client processing: " << e.what();
                            cancel();
                        }
                    });
            }
    
        ~ClientConnector() {
            for (auto& cb: listeners_)
                cb(nullptr);
            connection_.reset();
        }
    
        bool hasAlreadyAResponse() {
            return responseReceived_;
        }
    
        void addListener(const ListenerFunction& cb) {
            if (!connected_) {
                std::lock_guard<std::mutex> lk {listenersMutex_};
                listeners_.push_back(cb);
            } else {
                cb(connection_.get());
            }
        }
    
        void cancel() {
            parent_.ctrl << makeMsg<CtrlMsgType::CANCEL>(peer_);
        }
    
        void onDhtResponse(PeerConnectionMsg&& response) {
            if (responseReceived_) return;
            response_ = std::move(response);
            responseReceived_ = true;
        }
    
    private:
        void process() {
            // Prepare connection request as a DHT message
            PeerConnectionMsg request;
            request.addresses = std::move(publicAddresses_);
            request.id = ValueIdDist()(parent_.account.rand); /* Random id for the message unicity */
    
            // Send connection request through DHT
            RING_DBG() << parent_.account << "[CNX] request connection to " << peer_;
            parent_.account.dht().putEncrypted(
                dht::InfoHash::get(PeerConnectionMsg::key_prefix + peer_.toString()), peer_, request);
    
            // Wait for call to onResponse() operated by DHT
            Timeout<Clock> dhtMsgTimeout {DHT_MSG_TIMEOUT};
            dhtMsgTimeout.start();
            while (!responseReceived_) {
                if (dhtMsgTimeout)
                    throw std::runtime_error("no response from DHT to E2E request");
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
            }
    
            // Check response validity
            std::unique_ptr<TcpSocketEndpoint> peer_ep;
            if (response_.from != peer_ or
                response_.id != request.id or
                response_.addresses.empty())
                throw std::runtime_error("invalid connection reply");
    
            IpAddr relay_addr;
            for (const auto& address: response_.addresses) {
                if (!(relay_addr = address))
                    throw std::runtime_error("invalid connection reply");
                try {
                    // Connect to TURN peer using a raw socket
                    RING_DBG() << parent_.account << "[CNX] connecting to TURN relay "
                               << relay_addr.toString(true, true);
                    peer_ep = std::make_unique<TcpSocketEndpoint>(relay_addr);
                    peer_ep->connect(); // IMPROVE_ME: socket timeout?
                    break;
                } catch (std::system_error&) {
                    RING_DBG() << parent_.account << "[CNX] Failed to connect to TURN relay "
                               << relay_addr.toString(true, true);
                }
            }
    
            // Negotiate a TLS session
            RING_DBG() << parent_.account << "[CNX] start TLS session";
            auto tls_ep = std::make_unique<TlsSocketEndpoint>(*peer_ep,
                                                              parent_.account.identity(),
                                                              parent_.account.dhParams(),
                                                              *peerCertificate_);
            tls_ep->connect();
    
            // Connected!
            connection_ = std::make_unique<PeerConnection>([this] { cancel(); }, parent_.account,
                                                           peer_.toString(), std::move(tls_ep));
            peer_ep_ = std::move(peer_ep);
    
            connected_ = true;
            for (auto& cb: listeners_) {
                cb(connection_.get());
            }
        }
    
        Impl& parent_;
        const dht::InfoHash peer_;
    
        std::vector<std::string> publicAddresses_;
        std::atomic_bool responseReceived_ {false};
        PeerConnectionMsg response_;
        std::shared_ptr<dht::crypto::Certificate> peerCertificate_;
        std::unique_ptr<TcpSocketEndpoint> peer_ep_;
        std::unique_ptr<PeerConnection> connection_;
    
        std::atomic_bool connected_ {false};
        std::mutex listenersMutex_;
        std::mutex turnMutex_;
        std::vector<ListenerFunction> listeners_;
    
        std::future<void> processTask_;
    };
    
    //==============================================================================
    
    /// Synchronous TCP connect to a TURN server
    /// \note TCP peer connection mode is enabled for reliable data transfer.
    void
    DhtPeerConnector::Impl::turnConnect()
    {
        std::lock_guard<std::mutex> lock(clientsMutex_);
        // Don't retry to reconnect to the TURN server if already connected
        if (turnAuthv4_ && turnAuthv4_->isReady()
            && turnAuthv6_ && turnAuthv6_->isReady())
            return;
    
        auto details = account.getAccountDetails();
        auto server = details[Conf::CONFIG_TURN_SERVER];
        auto realm = details[Conf::CONFIG_TURN_SERVER_REALM];
        auto username = details[Conf::CONFIG_TURN_SERVER_UNAME];
        auto password = details[Conf::CONFIG_TURN_SERVER_PWD];
    
        auto turn_param_v4 = TurnTransportParams {};
        turn_param_v4.server = IpAddr {server.empty() ? "turn.ring.cx" : server};
        turn_param_v4.realm = realm.empty() ? "ring" : realm;
        turn_param_v4.username = username.empty() ? "ring" : username;
        turn_param_v4.password = password.empty() ? "ring" : password;
        turn_param_v4.isPeerConnection = true; // Request for TCP peer connections, not UDP
        turn_param_v4.onPeerConnection = [this](uint32_t conn_id, const IpAddr& peer_addr, bool connected) {
            (void)conn_id;
            if (connected)
                ctrl << makeMsg<CtrlMsgType::TURN_PEER_CONNECT>(peer_addr);
            else
                ctrl << makeMsg<CtrlMsgType::TURN_PEER_DISCONNECT>(peer_addr);
        };
    
        // If a previous turn server exists, but is not ready, we should try to reconnect
        if (turnAuthv4_ && !turnAuthv4_->isReady())
            turnAuthv4_.release();
        if (turnAuthv6_ && !turnAuthv6_->isReady())
            turnAuthv6_.release();
    
        if (!turnAuthv4_ || !turnAuthv4_->isReady()) {
            turn_param_v4.authorized_family = PJ_AF_INET;
            turnAuthv4_ = std::make_unique<TurnTransport>(turn_param_v4);
        }
    
        if (!turnAuthv6_ || !turnAuthv6_->isReady()) {
            auto turn_param_v6 = turn_param_v4;
            turn_param_v6.authorized_family = PJ_AF_INET6;
            turnAuthv6_ = std::make_unique<TurnTransport>(turn_param_v6);
        }
    
        // Wait until TURN server READY state (or timeout)
        Timeout<Clock> timeout {NET_CONNECTION_TIMEOUT};
        timeout.start();
        while (!turnAuthv4_->isReady() && !turnAuthv6_->isReady()) {
            if (timeout)
                throw std::runtime_error("no response from TURN");
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
    }
    
    /// Find who is connected by using connection certificate
    bool
    DhtPeerConnector::Impl::validatePeerCertificate(const dht::crypto::Certificate& cert,
                                                    dht::InfoHash& peer_h)
    {
        const auto& iter = certMap_.find(cert.getId());
        if (iter != std::cend(certMap_)) {
            if (iter->second.first->getPacked() == cert.getPacked()) {
                peer_h = iter->second.second;
                return true;
            }
        }
        return false;
    }
    
    /// Negotiate a TLS session over a TURN socket this method does [yoda].
    /// At this stage both endpoints has a dedicated TCP connection on each other.
    void
    DhtPeerConnector::Impl::onTurnPeerConnection(const IpAddr& peer_addr)
    {
        RING_DBG() << account << "[CNX] TURN connection attempt from "
                   << peer_addr.toString(true, true);
    
        auto turn_ep = std::unique_ptr<ConnectedTurnTransport>(nullptr);
        if (peer_addr.isIpv4())
            turn_ep = std::make_unique<ConnectedTurnTransport>(*turnAuthv4_, peer_addr);
        else
            turn_ep = std::make_unique<ConnectedTurnTransport>(*turnAuthv6_, peer_addr);
    
        RING_DBG() << account << "[CNX] start TLS session over TURN socket";
        dht::InfoHash peer_h;
        auto tls_ep = std::make_unique<TlsTurnEndpoint>(
            *turn_ep, account.identity(), account.dhParams(),
            [&, this] (const dht::crypto::Certificate& cert) { return validatePeerCertificate(cert, peer_h); });
    
        // block until TLS is negotiated (must throw in case of error)
        try {
            tls_ep->connect();
        } catch (...) {
            RING_WARN() << "[CNX] TLS connection failure from peer " << peer_addr.toString(true, true);
            return;
        }
    
        RING_DBG() << account << "[CNX] Accepted TLS-TURN connection from RingID " << peer_h;
        connectedPeers_.emplace(peer_addr, tls_ep->peerCertificate().getId());
        auto connection = std::make_unique<PeerConnection>([] {}, account, peer_addr.toString(),
                                                           std::move(tls_ep));
        connection->attachOutputStream(std::make_shared<FtpServer>(account.getAccountID(), peer_h.toString()));
        servers_.emplace(std::make_pair(peer_h, peer_addr), std::move(connection));
    
        // note: operating this way let endpoint to be deleted safely in case of exceptions
        turnEndpoints_.emplace(std::make_pair(peer_addr, std::move(turn_ep)));
    }
    
    void
    DhtPeerConnector::Impl::onTurnPeerDisconnection(const IpAddr& peer_addr)
    {
        auto it = std::find_if(servers_.begin(), servers_.end(),
                    [&peer_addr](const auto& element) {
                        return element.first.second == peer_addr;});
        if (it == servers_.end()) return;
        RING_WARN() << account << "[CNX] disconnection from peer " << peer_addr.toString(true, true);
        servers_.erase(it);
        connectedPeers_.erase(peer_addr);
        turnEndpoints_.erase(peer_addr);
    }
    
    void
    DhtPeerConnector::Impl::onRequestMsg(PeerConnectionMsg&& request)
    {
        RING_DBG() << account << "[CNX] rx DHT request from " << request.from;
    
        // Asynch certificate checking -> trig onTrustedRequestMsg when trusted certificate is found
        account.findCertificate(
            request.from,
            [this, request=std::move(request)] (const std::shared_ptr<dht::crypto::Certificate>& cert) mutable {
                dht::InfoHash peer_h;
                if (account.foundPeerDevice(cert, peer_h))
                    onTrustedRequestMsg(std::move(request), cert, peer_h);
                else
                    RING_WARN() << account << "[CNX] rejected untrusted connection request from "
                                << request.from;
        });
    }
    
    void
    DhtPeerConnector::Impl::onTrustedRequestMsg(PeerConnectionMsg&& request,
                                                const std::shared_ptr<dht::crypto::Certificate>& cert,
                                                const dht::InfoHash& peer_h)
    {
        turnConnect(); // start a TURN client connection on first pass, next ones just add new peer cnx handlers
    
        // Save peer certificate for later TLS session (MUST BE DONE BEFORE TURN PEER AUTHORIZATION)
        certMap_.emplace(cert->getId(), std::make_pair(cert, peer_h));
    
        auto sendRelayV4 = false, sendRelayV6 = false;
    
        for (auto& ip: request.addresses) {
            try {
                if (IpAddr(ip).isIpv4()) {
                    sendRelayV4 = true;
                    turnAuthv4_->permitPeer(ip);
                    RING_DBG() << account << "[CNX] authorized peer connection from " << ip;
                } else if (IpAddr(ip).isIpv6()) {
                    sendRelayV6 = true;
                    turnAuthv6_->permitPeer(ip);
                    RING_DBG() << account << "[CNX] authorized peer connection from " << ip;
                } else {
                    RING_DBG() << account << "Unknown family type: " << ip;
                }
            } catch (const std::exception& e) {
                RING_WARN() << account << "[CNX] ignored peer connection '" << ip << "', " << e.what();
            }
        }
    
        std::vector<std::string> addresses;
        if (sendRelayV4)
            addresses.emplace_back(turnAuthv4_->peerRelayAddr().toString(true, true));
        if (sendRelayV6)
            addresses.emplace_back(turnAuthv6_->peerRelayAddr().toString(true, true));
        if (addresses.empty()) {
            RING_DBG() << account << "[CNX] connection aborted, no family address found";
            return;
        }
        RING_DBG() << account << "[CNX] connection accepted, DHT reply to " << request.from;
        account.dht().putEncrypted(
            dht::InfoHash::get(PeerConnectionMsg::key_prefix + request.from.toString()),
            request.from, request.respond(addresses));
    
        // Now wait for a TURN connection from peer (see onTurnPeerConnection)
    }
    
    void
    DhtPeerConnector::Impl::onResponseMsg(PeerConnectionMsg&& response)
    {
        RING_DBG() << account << "[CNX] rx DHT reply from " << response.from;
        std::lock_guard<std::mutex> lock(clientsMutex_);
        for (auto& client: clients_) {
            // NOTE We can receives multiple files from one peer. So fill unanswered clients.
            if (client.first.first == response.from
                && client.second && !client.second->hasAlreadyAResponse()) {
                client.second->onDhtResponse(std::move(response));
                break;
            }
        }
    }
    
    void
    DhtPeerConnector::Impl::onAddDevice(const dht::InfoHash& dev_h,
                                        const DRing::DataTransferId& tid,
                                        const std::shared_ptr<dht::crypto::Certificate>& peer_cert,
                                        const std::vector<std::string>& public_addresses,
                                        const std::function<void(PeerConnection*)>& connect_cb)
    {
        auto client = std::make_pair(dev_h, tid);
        std::lock_guard<std::mutex> lock(clientsMutex_);
        const auto& iter = clients_.find(client);
        if (iter == std::end(clients_)) {
            clients_.emplace(
                client,
                std::make_unique<Impl::ClientConnector>(*this, dev_h, peer_cert, public_addresses, connect_cb));
        } else {
            iter->second->addListener(connect_cb);
        }
    }
    
    void
    DhtPeerConnector::Impl::eventLoop()
    {
        // Loop until STOP msg
        while (true) {
            std::unique_ptr<CtrlMsgBase> msg;
            ctrl >> msg;
            switch (msg->type()) {
                case CtrlMsgType::STOP:
                    return;
    
                case CtrlMsgType::TURN_PEER_CONNECT:
                    onTurnPeerConnection(ctrlMsgData<CtrlMsgType::TURN_PEER_CONNECT>(*msg));
                    break;
    
                case CtrlMsgType::TURN_PEER_DISCONNECT:
                    onTurnPeerDisconnection(ctrlMsgData<CtrlMsgType::TURN_PEER_DISCONNECT>(*msg));
                    break;
    
                case CtrlMsgType::CANCEL:
                    {
                        auto dev_h = ctrlMsgData<CtrlMsgType::CANCEL, 0>(*msg);
                        auto id = ctrlMsgData<CtrlMsgType::CANCEL, 1>(*msg);
                        // Cancel outgoing files
                        std::lock_guard<std::mutex> lock(clientsMutex_);
                        clients_.erase(std::make_pair(dev_h, id));
                        // Cancel incoming files
                        auto it = std::find_if(servers_.begin(), servers_.end(),
                                    [&dev_h, &id](const auto& element) {
                                        return (element.first.first == dev_h
                                                && element.second
                                                && element.second->hasStreamWithId(id));});
                        if (it == servers_.end()) break;
                        servers_.erase(it);
                        connectedPeers_.erase(it->first.second);
                        turnEndpoints_.erase(it->first.second);
                    }
                    break;
    
                case CtrlMsgType::DHT_REQUEST:
                    onRequestMsg(ctrlMsgData<CtrlMsgType::DHT_REQUEST>(*msg));
                    break;
    
                case CtrlMsgType::DHT_RESPONSE:
                    onResponseMsg(ctrlMsgData<CtrlMsgType::DHT_RESPONSE>(*msg));
                    break;
    
                case CtrlMsgType::ADD_DEVICE:
                    onAddDevice(ctrlMsgData<CtrlMsgType::ADD_DEVICE, 0>(*msg),
                                ctrlMsgData<CtrlMsgType::ADD_DEVICE, 1>(*msg),
                                ctrlMsgData<CtrlMsgType::ADD_DEVICE, 2>(*msg),
                                ctrlMsgData<CtrlMsgType::ADD_DEVICE, 3>(*msg),
                                ctrlMsgData<CtrlMsgType::ADD_DEVICE, 4>(*msg));
                    break;
    
                default: RING_ERR("BUG: got unhandled control msg!"); break;
            }
        }
    }
    
    //==============================================================================
    
    DhtPeerConnector::DhtPeerConnector(RingAccount& account)
        : pimpl_ {new Impl {account}}
    {}
    
    DhtPeerConnector::~DhtPeerConnector() = default;
    
    /// Called by a RingAccount when it's DHT is connected
    /// Install a DHT LISTEN operation on given device to receive data connection requests and replies
    /// The DHT key is Hash(PeerConnectionMsg::key_prefix + device_id), where '+' is the string concatenation.
    void
    DhtPeerConnector::onDhtConnected(const std::string& device_id)
    {
        pimpl_->account.dht().listen<PeerConnectionMsg>(
            dht::InfoHash::get(PeerConnectionMsg::key_prefix + device_id),
            [this](PeerConnectionMsg&& msg) {
                if (msg.from == pimpl_->account.dht().getId())
                    return true;
                if (!pimpl_->account.isMessageTreated(msg.id)) {
                    if (msg.isRequest()) {
                        // TODO: filter-out request from non trusted peer
                        pimpl_->ctrl << makeMsg<CtrlMsgType::DHT_REQUEST>(std::move(msg));
                    } else
                        pimpl_->ctrl << makeMsg<CtrlMsgType::DHT_RESPONSE>(std::move(msg));
                }
                return true;
            });
    }
    
    void
    DhtPeerConnector::requestConnection(const std::string& peer_id,
                                        const DRing::DataTransferId& tid,
                                        const std::function<void(PeerConnection*)>& connect_cb)
    {
        const auto peer_h = dht::InfoHash(peer_id);
    
        // Notes for reader:
        // 1) dht.getPublicAddress() suffers of a non-usability into forEachDevice() callbacks.
        //    If you call it in forEachDevice callbacks, it'll never not return...
        //    Seems that getPublicAddress() and forEachDevice() need to process into the same thread
        //    (here the one where dht_ loop runs).
        // 2) anyway its good to keep this processing here in case of multiple device
        //    as the result is the same for each device.
        auto addresses = pimpl_->account.publicAddresses();
    
        // Add local addresses
        // XXX: is it really needed? use-case? a local TURN server?
        //addresses.emplace_back(ip_utils::getLocalAddr(AF_INET));
        //addresses.emplace_back(ip_utils::getLocalAddr(AF_INET6));
    
        // TODO: bypass DHT devices lookup if connection already exist
    
        pimpl_->account.forEachDevice(
            peer_h,
            [this, addresses, connect_cb, tid](const std::shared_ptr<RingAccount>& account,
                                          const dht::InfoHash& dev_h) {
                if (dev_h == account->dht().getId()) {
                    RING_ERR() << account << "[CNX] no connection to yourself, bad person!";
                    return;
                }
    
                account->findCertificate(
                    dev_h,
                    [this, dev_h, addresses, connect_cb, tid] (const std::shared_ptr<dht::crypto::Certificate>& cert) {
                        pimpl_->ctrl << makeMsg<CtrlMsgType::ADD_DEVICE>(dev_h, tid, cert, addresses, connect_cb);
                    });
            },
    
            [this, peer_h, connect_cb](bool found) {
                if (!found) {
                    RING_WARN() << pimpl_->account << "[CNX] aborted, no devices for " << peer_h;
                    connect_cb(nullptr);
                }
            });
    }
    
    void
    DhtPeerConnector::closeConnection(const std::string& peer_id, const DRing::DataTransferId& tid) {
        // NOTE: see TODO in previous function.
    
        // NOTE: There is two ways to store current PeerConnection. Indeed, if it is
        // an incoming data transfer, it will be stored in servers, and will be
        // determined via the pair (peer_h, tid), because we receives the file from
        // a contact. If it's an outgoing file, we send the file to a device, so,
        // will be determined via the pair (dev_h, tid).
        // That's why we need to call CANCEL for the peer_h and each dev_h.
        const auto peer_h = dht::InfoHash(peer_id);
    
        pimpl_->ctrl << makeMsg<CtrlMsgType::CANCEL>(peer_h, tid);
        pimpl_->account.forEachDevice(
            peer_h,
            [this, tid](const std::shared_ptr<RingAccount>& account,
                              const dht::InfoHash& dev_h) {
                if (dev_h == account->dht().getId()) {
                    RING_ERR() << account << "[CNX] no connection to yourself, bad person!";
                    return;
                }
                pimpl_->ctrl << makeMsg<CtrlMsgType::CANCEL>(dev_h, tid);
            },
    
            [this, peer_h](bool found) {
                if (!found) {
                    RING_WARN() << pimpl_->account << "[CNX] aborted, no devices for " << peer_h;
                }
            });
    }
    
    } // namespace ring