Skip to content
Snippets Groups Projects
Select Git revision
  • master default protected
  • release/202005
  • release/202001
  • release/201912
  • release/201911
  • release/releaseWindowsTestOne
  • release/windowsReleaseTest
  • release/releaseTest
  • release/releaseWindowsTest
  • release/201910
  • release/qt/201910
  • release/windows-test/201910
  • release/201908
  • release/201906
  • release/201905
  • release/201904
  • release/201903
  • release/201902
  • release/201901
  • release/201812
  • 4.0.0
  • 2.2.0
  • 2.1.0
  • 2.0.1
  • 2.0.0
  • 1.4.1
  • 1.4.0
  • 1.3.0
  • 1.2.0
  • 1.1.0
30 results

turn_transport.cpp

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    turn_transport.cpp 13.40 KiB
    /*
     *  Copyright (C) 2004-2020 Savoir-faire Linux Inc.
     *
     *  Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com>
     *
     *  This program is free software; you can redistribute it and/or modify
     *  it under the terms of the GNU General Public License as published by
     *  the Free Software Foundation; either version 3 of the License, or
     *  (at your option) any later version.
     *
     *  This program is distributed in the hope that it will be useful,
     *  but WITHOUT ANY WARRANTY; without even the implied warranty of
     *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     *  GNU General Public License for more details.
     *
     *  You should have received a copy of the GNU General Public License
     *  along with this program; if not, write to the Free Software
     *  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA.
     */
    
    #include "turn_transport.h"
    #include "transport/peer_channel.h"
    
    #include "logger.h"
    #include "ip_utils.h"
    #include "sip/sip_utils.h"
    #include "map_utils.h"
    
    #include <pjnath.h>
    #include <pjlib-util.h>
    #include <pjlib.h>
    
    #include <future>
    #include <atomic>
    #include <thread>
    #include <vector>
    #include <iterator>
    #include <mutex>
    #include <sstream>
    #include <limits>
    #include <map>
    #include <condition_variable>
    
    namespace jami {
    
    /// Lock on a std::mutex that can be released before the end of its scope.
    using MutexLock = std::unique_lock<std::mutex>;

    /// Strictly scope-bound lock on a std::mutex.
    using MutexGuard = std::lock_guard<std::mutex>;
    
    inline namespace {

    /// Coarse state of the TURN relay as tracked by this file.
    enum class RelayState {
        NONE = 0, ///< initial value, no state change recorded yet
        READY,    ///< the TURN session reported PJ_TURN_STATE_READY
        DOWN,     ///< the TURN session moved past the READY state
    };

    } // inline anonymous namespace
    
    //==============================================================================
    
    /// Invoke a PJSIP-style function that reports its outcome as a status code.
    ///
    /// @param func  callable invoked as func(args...)
    /// @param args  arguments forwarded (by value) to func
    /// @throws sip_utils::PjsipFailure carrying the status when it differs from PJ_SUCCESS
    template <class Callable, class... Args>
    inline void
    PjsipCall(Callable& func, Args... args)
    {
        const auto rc = func(args...);
        if (rc == PJ_SUCCESS)
            return;
        throw sip_utils::PjsipFailure(rc);
    }
    
    /// Invoke a PJSIP-style function whose result is "falsy" on failure
    /// (e.g. a null pointer) and hand that result back to the caller.
    ///
    /// @param func  callable invoked as func(args...)
    /// @param args  arguments forwarded (by value) to func
    /// @return the value produced by func when it converts to true
    /// @throws sip_utils::PjsipFailure (default-constructed) when the result converts to false
    template <class Callable, class... Args>
    inline auto
    PjsipCallReturn(const Callable& func, Args... args) -> decltype(func(args...))
    {
        auto result = func(args...);
        if (result)
            return result;
        throw sip_utils::PjsipFailure();
    }
    
    //==============================================================================
    
    /// Private implementation state of TurnTransport (held through TurnTransport::pimpl_).
    ///
    /// Owns the PJSIP resources (pool cache, pool, STUN config, TURN socket), the
    /// per-peer channels and the worker thread that polls the ioqueue/timer heap.
    /// A pointer to this object is registered as the TURN socket user data so that
    /// the PJNATH callbacks can be routed to the on*() methods below.
    class TurnTransportPimpl
    {
    public:
        TurnTransportPimpl() = default;
        ~TurnTransportPimpl();
    
        /// TURN session state change notification.
        void onTurnState(pj_turn_state_t old_state, pj_turn_state_t new_state);
        /// Data received from a peer through the relay.
        void onRxData(const uint8_t* pkt, unsigned pkt_len, const pj_sockaddr_t* peer_addr, unsigned addr_len);
        /// Result of a peer connection attempt.
        void onPeerConnection(pj_uint32_t conn_id, const pj_sockaddr_t* peer_addr, unsigned addr_len, pj_status_t status);
        /// Body of ioWorker: polls the ioqueue and timer heap until ioJobQuit is set.
        void ioJob();
    
        std::mutex apiMutex_; // locked around every access to peerChannels_ in this file
    
        std::map<IpAddr, PeerChannel> peerChannels_; // one channel per connected peer address
    
        TurnTransportParams settings; // copy of the parameters given to TurnTransport's constructor
        pj_caching_pool poolCache {};
        pj_pool_t* pool {nullptr}; // created from poolCache in TurnTransport's constructor
        pj_stun_config stunConfig {}; // carries the ioqueue and timer heap polled by ioJob()
        pj_turn_sock* relay {nullptr}; // TURN socket, created in TurnTransport's constructor
        pj_str_t relayAddr {}; // textual TURN server address, duplicated into pool
        IpAddr peerRelayAddr; // address where peers should connect to
        IpAddr mappedAddr; // mapped_addr reported by pj_turn_sock_get_info() once READY
    
        std::atomic<RelayState> state {RelayState::NONE};
        std::atomic_bool ioJobQuit {false}; // set by the destructor to stop ioJob()
        std::thread ioWorker; // runs ioJob(); joined by the destructor
    };
    
    /// Tear down the TURN socket, stop the I/O worker and release PJSIP resources.
    ///
    /// The order matters: the worker thread polls stunConfig.ioqueue and
    /// stunConfig.timer_heap, so it is joined before those are destroyed, and the
    /// caching pool is destroyed last.
    TurnTransportPimpl::~TurnTransportPimpl()
    {
        // The socket is only destroyed here when the relay has not been flagged DOWN.
        // NOTE(review): presumably PJNATH has already torn the socket down in the
        // DOWN case -- confirm against pj_turn_sock semantics.
        if (relay && state.load() != RelayState::DOWN) {
            try {
                pj_turn_sock_destroy(relay);
            } catch (...) {
                JAMI_ERR() << "exception during pj_turn_sock_destroy() call (ignored)";
            }
        }
        // Ask ioJob() to leave its polling loop, then wait for the thread.
        ioJobQuit = true;
        if (ioWorker.joinable())
            ioWorker.join();
        // These may still be null if the TurnTransport constructor threw early.
        if (stunConfig.ioqueue)
            pj_ioqueue_destroy(stunConfig.ioqueue);
        if (stunConfig.timer_heap)
            pj_timer_heap_destroy(stunConfig.timer_heap);
        pj_caching_pool_destroy(&poolCache);
    }
    
    /// React to a state change of the TURN session.
    ///
    /// When the session becomes READY, the relayed and mapped addresses are read
    /// from the socket and the relay is flagged READY. When the session goes from
    /// a state up to READY to a state past READY, the relay is flagged DOWN and all
    /// peer channels are dropped. Any other transition is ignored.
    void
    TurnTransportPimpl::onTurnState(pj_turn_state_t old_state, pj_turn_state_t new_state)
    {
        if (new_state == PJ_TURN_STATE_READY) {
            pj_turn_session_info sessionInfo;
            pj_turn_sock_get_info(relay, &sessionInfo);
            peerRelayAddr = IpAddr {sessionInfo.relay_addr};
            mappedAddr = IpAddr {sessionInfo.mapped_addr};
            JAMI_DBG("TURN server ready, peer relay address: %s", peerRelayAddr.toString(true, true).c_str());
            // Published last so that readers checking the state see the addresses.
            state = RelayState::READY;
            return;
        }

        const bool wentDown = old_state <= PJ_TURN_STATE_READY and new_state > PJ_TURN_STATE_READY;
        if (not wentDown)
            return;

        JAMI_WARN("TURN server disconnected (%s)", pj_turn_state_name(new_state));
        state = RelayState::DOWN;
        MutexGuard lk {apiMutex_};
        peerChannels_.clear();
    }
    
    void
    TurnTransportPimpl::onRxData(const uint8_t* pkt, unsigned pkt_len,
                                 const pj_sockaddr_t* addr, unsigned addr_len)
    {
        IpAddr peer_addr (*static_cast<const pj_sockaddr*>(addr), addr_len);
        {
            MutexGuard lk {apiMutex_};
            auto channel_it = peerChannels_.find(peer_addr);
            if (channel_it == std::end(peerChannels_))
                return;
            std::error_code ec;
            auto ret = channel_it->second.write((const char*)pkt, pkt_len, ec);
            if (ret < 0) {
                JAMI_ERR("TURN rx: channel is closed");
            }
        }
    }
    
    /// Handle the outcome of a peer connection attempt.
    ///
    /// On success a channel is created for the peer (if none exists yet). The
    /// user callback from the settings, when set, is always invoked with the
    /// connection id, the peer address and the success flag.
    ///
    /// @param conn_id   connection identifier given by PJNATH
    /// @param addr      socket address of the peer
    /// @param addr_len  size of the socket address
    /// @param status    PJ_SUCCESS when the connection attempt succeeded
    void
    TurnTransportPimpl::onPeerConnection(pj_uint32_t conn_id,
                                         const pj_sockaddr_t* addr, unsigned addr_len,
                                         pj_status_t status)
    {
        IpAddr peer_addr (*static_cast<const pj_sockaddr*>(addr), addr_len);
        const bool connected = (status == PJ_SUCCESS);

        if (connected) {
            JAMI_DBG() << "Received connection attempt from "
                        << peer_addr.toString(true, true) << ", id=" << std::hex
                        << conn_id;

            // operator[] default-constructs the channel when the peer is new.
            MutexGuard lk {apiMutex_};
            peerChannels_[peer_addr];
        }

        if (settings.onPeerConnection)
            settings.onPeerConnection(conn_id, peer_addr, connected);
    }
    
    void
    TurnTransportPimpl::ioJob()
    {
        sip_utils::register_thread();
    
        while (!ioJobQuit.load()) {
            const pj_time_val delay = {0, 10};
            pj_ioqueue_poll(stunConfig.ioqueue, &delay);
            pj_timer_heap_poll(stunConfig.timer_heap, nullptr);
        }
    }
    
    //==============================================================================
    
    /// Create the TURN transport and start the allocation on the server.
    ///
    /// Sets up the PJSIP pool, STUN config, timer heap and ioqueue, starts the
    /// polling thread, creates the TURN socket (TCP to the server) and requests
    /// the allocation. The constructor returns before the relay is READY; use
    /// isReady()/waitServerReady() to observe that.
    ///
    /// @param params  transport settings (server, credentials, callbacks, ...)
    /// @throws std::invalid_argument when the server address is unspecified
    /// @throws sip_utils::PjsipFailure when a PJSIP setup call fails
    ///         (a failure of pj_turn_sock_alloc is only logged, not thrown)
    TurnTransport::TurnTransport(const TurnTransportParams& params)
    {
        sip_utils::register_thread();
    
        // Work on a copy so that a default port can be applied.
        auto server = params.server;
        if (!server.getPort())
            server.setPort(PJ_STUN_PORT);
    
        if (server.isUnspecified())
            throw std::invalid_argument("invalid turn server address");
    
        pimpl_ = std::unique_ptr<TurnTransportPimpl>(new TurnTransportPimpl);
        // Note: the stored settings keep params.server as given (without the default port).
        pimpl_->settings = params;
    
        // PJSIP memory pool
        pj_caching_pool_init(&pimpl_->poolCache, &pj_pool_factory_default_policy, 0);
        pimpl_->pool = PjsipCallReturn(pj_pool_create, &pimpl_->poolCache.factory,
                                       "RgTurnTr", 512, 512, nullptr);
    
        // STUN config
        pj_stun_config_init(&pimpl_->stunConfig, &pimpl_->poolCache.factory, 0, nullptr, nullptr);
    
        // create global timer heap
        PjsipCall(pj_timer_heap_create, pimpl_->pool, 1000, &pimpl_->stunConfig.timer_heap);
    
        // create global ioqueue
        PjsipCall(pj_ioqueue_create, pimpl_->pool, 16, &pimpl_->stunConfig.ioqueue);
    
        // run a thread that handles timer/ioqueue events
        pimpl_->ioWorker = std::thread([this]{ pimpl_->ioJob(); });
    
        // TURN callbacks
        // Each callback is a captureless lambda; the TurnTransportPimpl instance is
        // recovered from the socket user data registered in pj_turn_sock_create below.
        pj_turn_sock_cb relay_cb;
        pj_bzero(&relay_cb, sizeof(relay_cb));
        relay_cb.on_rx_data = [](pj_turn_sock* relay, void* pkt, unsigned pkt_len,
                                 const pj_sockaddr_t* peer_addr, unsigned addr_len) {
            auto pimpl = static_cast<TurnTransportPimpl*>(pj_turn_sock_get_user_data(relay));
            pimpl->onRxData(reinterpret_cast<uint8_t*>(pkt), pkt_len, peer_addr, addr_len);
        };
        relay_cb.on_state = [](pj_turn_sock* relay, pj_turn_state_t old_state,
                               pj_turn_state_t new_state) {
            auto pimpl = static_cast<TurnTransportPimpl*>(pj_turn_sock_get_user_data(relay));
            pimpl->onTurnState(old_state, new_state);
        };
        relay_cb.on_connection_status = [](pj_turn_sock *relay,
                                            pj_status_t status,
                                            pj_uint32_t conn_id,
                                            const pj_sockaddr_t *peer_addr,
                                            unsigned addr_len) {
          auto pimpl = static_cast<TurnTransportPimpl *>(pj_turn_sock_get_user_data(relay));
          pimpl->onPeerConnection(conn_id, peer_addr, addr_len, status);
        };
    
        // TURN socket config
        pj_turn_sock_cfg turn_sock_cfg;
        pj_turn_sock_cfg_default(&turn_sock_cfg);
        turn_sock_cfg.max_pkt_size = params.maxPacketSize;
    
        // TURN socket creation
        // The raw TurnTransportPimpl pointer is passed as the socket user data.
        PjsipCall(pj_turn_sock_create,
                  &pimpl_->stunConfig, server.getFamily(), PJ_TURN_TP_TCP,
                  &relay_cb, &turn_sock_cfg, &*this->pimpl_, &pimpl_->relay);
    
        // TURN allocation setup
        pj_turn_alloc_param turn_alloc_param;
        pj_turn_alloc_param_default(&turn_alloc_param);
        // Only override the requested address family when one was explicitly given.
        if (params.authorized_family != 0)
            turn_alloc_param.af = params.authorized_family; // RFC 6156!!!
    
        if (params.isPeerConnection)
            turn_alloc_param.peer_conn_type = PJ_TURN_TP_TCP; // RFC 6062!!!
    
        // Static plain-text credentials; the pj_str_t fields point into the strings
        // owned by pimpl_->settings (no copy is made here).
        pj_stun_auth_cred cred;
        pj_bzero(&cred, sizeof(cred));
        cred.type = PJ_STUN_AUTH_CRED_STATIC;
        pj_strset(&cred.data.static_cred.realm, (char*)pimpl_->settings.realm.c_str(), pimpl_->settings.realm.size());
        pj_strset(&cred.data.static_cred.username, (char*)pimpl_->settings.username.c_str(), pimpl_->settings.username.size());
        cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN;
        pj_strset(&cred.data.static_cred.data, (char*)pimpl_->settings.password.c_str(), pimpl_->settings.password.size());
    
        // Server address as text, duplicated into the pool so it outlives this scope.
        pimpl_->relayAddr = pj_strdup3(pimpl_->pool, server.toString().c_str());
    
        // TURN connection/allocation
        JAMI_DBG() << "Connecting to TURN " << server.toString(true, true);
        try {
            PjsipCall(pj_turn_sock_alloc,
                    pimpl_->relay, &pimpl_->relayAddr, server.getPort(),
                    nullptr, &cred, &turn_alloc_param);
        } catch (const sip_utils::PjsipFailure& e) {
            // Deliberately not rethrown: the failure is logged and the object stays
            // constructed with the relay state left unchanged by this function.
            JAMI_ERR("pj_turn_sock_alloc failed: %s", e.what());
        }
    }
    
    /// Defaulted here, where TurnTransportPimpl is a complete type; all cleanup
    /// is performed by ~TurnTransportPimpl() through pimpl_.
    TurnTransport::~TurnTransport() = default;
    
    void
    TurnTransport::shutdown(const IpAddr& addr)
    {
        MutexLock lk {pimpl_->apiMutex_};
        auto& channel = pimpl_->peerChannels_.at(addr);
        channel.stop();
    }
    
    /// @return true when the server address stored in the settings evaluates to false.
    /// NOTE(review): the constructor rejects an unspecified server address, so
    /// confirm whether this can actually return true for a constructed object.
    bool
    TurnTransport::isInitiator() const
    {
        return !pimpl_->settings.server;
    }
    
    /// Install a TURN permission for a peer address on the relay.
    ///
    /// @param addr  peer address to authorize; must be specified and of the same
    ///              family as the relayed address
    /// @throws std::invalid_argument when addr is unspecified or of another family
    /// @throws sip_utils::PjsipFailure when pj_turn_sock_set_perm fails
    void
    TurnTransport::permitPeer(const IpAddr& addr)
    {
        if (addr.isUnspecified())
            throw std::invalid_argument("invalid peer address");

        const auto relayFamily = pimpl_->peerRelayAddr.getFamily();
        if (addr.getFamily() != relayFamily)
            throw std::invalid_argument("mismatching peer address family");

        sip_utils::register_thread();

        // One address is passed; the trailing 1 is the PJNATH options argument
        // (NOTE(review): documented as "auto-refresh" -- confirm for the PJNATH version in use).
        PjsipCall(pj_turn_sock_set_perm, pimpl_->relay, 1, addr.pjPtr(), 1);
    }
    
    /// @return true when the relay state is currently READY (atomic read, non-blocking).
    bool
    TurnTransport::isReady() const
    {
        return pimpl_->state.load() == RelayState::READY;
    }
    
    /// Block the calling thread until the relay state is READY.
    ///
    /// The state is polled every 250 ms.
    /// NOTE(review): there is no timeout and no exit on RelayState::DOWN, so this
    /// never returns if the relay never becomes READY -- confirm callers cope.
    void
    TurnTransport::waitServerReady()
    {
        const std::chrono::milliseconds pollPeriod {250};

        for (;;) {
            if (pimpl_->state.load() == RelayState::READY)
                return;
            std::this_thread::sleep_for(pollPeriod);
        }
    }
    
    /// @return the relayed address (where peers should connect to), as recorded
    ///         when the TURN session became READY; default-constructed before that.
    const IpAddr&
    TurnTransport::peerRelayAddr() const
    {
        return pimpl_->peerRelayAddr;
    }
    
    /// @return the mapped address reported by the TURN session when it became
    ///         READY; default-constructed before that.
    const IpAddr&
    TurnTransport::mappedAddr() const
    {
        return pimpl_->mappedAddr;
    }
    
    bool
    TurnTransport::sendto(const IpAddr& peer, const char* const buffer, std::size_t length)
    {
        sip_utils::register_thread();
        auto status = pj_turn_sock_sendto(pimpl_->relay,
                                          reinterpret_cast<const pj_uint8_t*>(buffer), length,
                                          peer.pjPtr(), peer.getLength());
        if (status != PJ_SUCCESS && status != PJ_EPENDING && status != PJ_EBUSY)
            throw sip_utils::PjsipFailure(PJ_STATUS_TO_OS(status));
    
        return status != PJ_EBUSY;
    }
    
    bool
    TurnTransport::sendto(const IpAddr& peer, const std::vector<char>& buffer)
    {
        return sendto(peer, &buffer[0], buffer.size());
    }
    
    ssize_t
    TurnTransport::recvfrom(const IpAddr& peer, char* buffer, std::size_t size, std::error_code& ec)
    {
        MutexLock lk {pimpl_->apiMutex_};
        auto& channel = pimpl_->peerChannels_.at(peer);
        lk.unlock();
        return channel.read(buffer, size, ec);
    }
    
    /// @return the addresses of all peers that currently have a channel
    ///         (keys of the channel map, extracted under the API mutex).
    std::vector<IpAddr>
    TurnTransport::peerAddresses() const
    {
        MutexLock lk {pimpl_->apiMutex_};
        return map_utils::extractKeys(pimpl_->peerChannels_);
    }
    
    int
    TurnTransport::waitForData(const IpAddr& peer, std::chrono::milliseconds timeout, std::error_code& ec) const
    {
        (void)ec; ///< \todo handle errors
        MutexLock lk {pimpl_->apiMutex_};
        auto& channel = pimpl_->peerChannels_.at(peer);
        lk.unlock();
        return channel.wait(timeout, ec);
    }
    
    //==============================================================================
    
    /// Bind a TurnTransport to a single peer address.
    ///
    /// @param turn  transport used for all operations (stored in turn_)
    /// @param peer  peer address used for all operations (stored in peer_)
    ConnectedTurnTransport::ConnectedTurnTransport(TurnTransport& turn, const IpAddr& peer)
        : turn_ {turn}
        , peer_ {peer}
    {}
    
    /// Stop the channel of the bound peer (forwards to TurnTransport::shutdown).
    void
    ConnectedTurnTransport::shutdown()
    {
        turn_.shutdown(peer_);
    }
    
    /// Wait for data from the bound peer (forwards to TurnTransport::waitForData).
    ///
    /// @param timeout  maximum time to wait
    /// @param ec       error code, forwarded to the underlying wait
    /// @return the value returned by TurnTransport::waitForData
    int
    ConnectedTurnTransport::waitForData(std::chrono::milliseconds timeout, std::error_code& ec) const
    {
        return turn_.waitForData(peer_, timeout, ec);
    }
    
    std::size_t
    ConnectedTurnTransport::write(const ValueType* buf, std::size_t size, std::error_code& ec)
    {
        try {
            auto success = turn_.sendto(peer_, reinterpret_cast<const char*>(buf), size);
            if (!success) {
                // if !success, pj_turn_sock_sendto returned EBUSY
                // So, we should retry to send this later
                ec.assign(EAGAIN, std::generic_category());
                return 0;
            }
        } catch (const sip_utils::PjsipFailure& ex) {
            ec = ex.code();
            return 0;
        }
    
        ec.clear();
        return size;
    }
    
    /// Read data received from the bound peer.
    ///
    /// @param buf   destination buffer
    /// @param size  capacity of buf in bytes; 0 returns immediately
    /// @param ec    cleared when size is 0; otherwise set by the underlying read,
    ///              and guaranteed to hold an error when that read fails
    /// @return the number of bytes read, or 0 on failure (with ec set), which is
    ///         the same convention as write()
    /// @throws std::out_of_range when the peer has no channel (from TurnTransport::recvfrom)
    std::size_t
    ConnectedTurnTransport::read(ValueType* buf, std::size_t size, std::error_code& ec)
    {
        if (size == 0) {
            ec.clear();
            return 0;
        }

        const auto received = turn_.recvfrom(peer_, reinterpret_cast<char*>(buf), size, ec);
        if (received < 0) {
            // recvfrom() returns a signed count: without this check a negative
            // value would be converted to a huge std::size_t "byte count".
            if (!ec)
                ec.assign(EIO, std::generic_category());
            return 0;
        }

        return static_cast<std::size_t>(received);
    }
    
    } // namespace jami