Select Git revision
turn_transport.cpp
-
Adrien Béraud authored
Change-Id: Iaad641846fdd69b4f421302bcbfb1a1df7878238
Adrien Béraud authoredChange-Id: Iaad641846fdd69b4f421302bcbfb1a1df7878238
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
turn_transport.cpp 13.40 KiB
/*
* Copyright (C) 2004-2020 Savoir-faire Linux Inc.
*
* Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "turn_transport.h"
#include "transport/peer_channel.h"
#include "logger.h"
#include "ip_utils.h"
#include "sip/sip_utils.h"
#include "map_utils.h"
#include <pjnath.h>
#include <pjlib-util.h>
#include <pjlib.h>
#include <future>
#include <atomic>
#include <thread>
#include <vector>
#include <iterator>
#include <mutex>
#include <sstream>
#include <limits>
#include <map>
#include <condition_variable>
namespace jami {
using MutexGuard = std::lock_guard<std::mutex>;
using MutexLock = std::unique_lock<std::mutex>;
inline
namespace {
enum class RelayState
{
NONE,
READY,
DOWN,
};
}
//==============================================================================
template <class Callable, class... Args>
inline void
PjsipCall(Callable& func, Args... args)
{
auto status = func(args...);
if (status != PJ_SUCCESS)
throw sip_utils::PjsipFailure(status);
}
template <class Callable, class... Args>
inline auto
PjsipCallReturn(const Callable& func, Args... args) -> decltype(func(args...))
{
auto res = func(args...);
if (!res)
throw sip_utils::PjsipFailure();
return res;
}
//==============================================================================
class TurnTransportPimpl
{
public:
TurnTransportPimpl() = default;
~TurnTransportPimpl();
void onTurnState(pj_turn_state_t old_state, pj_turn_state_t new_state);
void onRxData(const uint8_t* pkt, unsigned pkt_len, const pj_sockaddr_t* peer_addr, unsigned addr_len);
void onPeerConnection(pj_uint32_t conn_id, const pj_sockaddr_t* peer_addr, unsigned addr_len, pj_status_t status);
void ioJob();
std::mutex apiMutex_;
std::map<IpAddr, PeerChannel> peerChannels_;
TurnTransportParams settings;
pj_caching_pool poolCache {};
pj_pool_t* pool {nullptr};
pj_stun_config stunConfig {};
pj_turn_sock* relay {nullptr};
pj_str_t relayAddr {};
IpAddr peerRelayAddr; // address where peers should connect to
IpAddr mappedAddr;
std::atomic<RelayState> state {RelayState::NONE};
std::atomic_bool ioJobQuit {false};
std::thread ioWorker;
};
TurnTransportPimpl::~TurnTransportPimpl()
{
if (relay && state.load() != RelayState::DOWN) {
try {
pj_turn_sock_destroy(relay);
} catch (...) {
JAMI_ERR() << "exception during pj_turn_sock_destroy() call (ignored)";
}
}
ioJobQuit = true;
if (ioWorker.joinable())
ioWorker.join();
if (stunConfig.ioqueue)
pj_ioqueue_destroy(stunConfig.ioqueue);
if (stunConfig.timer_heap)
pj_timer_heap_destroy(stunConfig.timer_heap);
pj_caching_pool_destroy(&poolCache);
}
void
TurnTransportPimpl::onTurnState(pj_turn_state_t old_state, pj_turn_state_t new_state)
{
if (new_state == PJ_TURN_STATE_READY) {
pj_turn_session_info info;
pj_turn_sock_get_info(relay, &info);
peerRelayAddr = IpAddr {info.relay_addr};
mappedAddr = IpAddr {info.mapped_addr};
JAMI_DBG("TURN server ready, peer relay address: %s", peerRelayAddr.toString(true, true).c_str());
state = RelayState::READY;
} else if (old_state <= PJ_TURN_STATE_READY and new_state > PJ_TURN_STATE_READY) {
JAMI_WARN("TURN server disconnected (%s)", pj_turn_state_name(new_state));
state = RelayState::DOWN;
MutexGuard lk {apiMutex_};
peerChannels_.clear();
}
}
void
TurnTransportPimpl::onRxData(const uint8_t* pkt, unsigned pkt_len,
const pj_sockaddr_t* addr, unsigned addr_len)
{
IpAddr peer_addr (*static_cast<const pj_sockaddr*>(addr), addr_len);
{
MutexGuard lk {apiMutex_};
auto channel_it = peerChannels_.find(peer_addr);
if (channel_it == std::end(peerChannels_))
return;
std::error_code ec;
auto ret = channel_it->second.write((const char*)pkt, pkt_len, ec);
if (ret < 0) {
JAMI_ERR("TURN rx: channel is closed");
}
}
}
void
TurnTransportPimpl::onPeerConnection(pj_uint32_t conn_id,
const pj_sockaddr_t* addr, unsigned addr_len,
pj_status_t status)
{
IpAddr peer_addr (*static_cast<const pj_sockaddr*>(addr), addr_len);
if (status == PJ_SUCCESS) {
JAMI_DBG() << "Received connection attempt from "
<< peer_addr.toString(true, true) << ", id=" << std::hex
<< conn_id;
{
MutexGuard lk {apiMutex_};
peerChannels_[peer_addr];
}
}
if (settings.onPeerConnection)
settings.onPeerConnection(conn_id, peer_addr, status == PJ_SUCCESS);
}
void
TurnTransportPimpl::ioJob()
{
sip_utils::register_thread();
while (!ioJobQuit.load()) {
const pj_time_val delay = {0, 10};
pj_ioqueue_poll(stunConfig.ioqueue, &delay);
pj_timer_heap_poll(stunConfig.timer_heap, nullptr);
}
}
//==============================================================================
TurnTransport::TurnTransport(const TurnTransportParams& params)
{
sip_utils::register_thread();
auto server = params.server;
if (!server.getPort())
server.setPort(PJ_STUN_PORT);
if (server.isUnspecified())
throw std::invalid_argument("invalid turn server address");
pimpl_ = std::unique_ptr<TurnTransportPimpl>(new TurnTransportPimpl);
pimpl_->settings = params;
// PJSIP memory pool
pj_caching_pool_init(&pimpl_->poolCache, &pj_pool_factory_default_policy, 0);
pimpl_->pool = PjsipCallReturn(pj_pool_create, &pimpl_->poolCache.factory,
"RgTurnTr", 512, 512, nullptr);
// STUN config
pj_stun_config_init(&pimpl_->stunConfig, &pimpl_->poolCache.factory, 0, nullptr, nullptr);
// create global timer heap
PjsipCall(pj_timer_heap_create, pimpl_->pool, 1000, &pimpl_->stunConfig.timer_heap);
// create global ioqueue
PjsipCall(pj_ioqueue_create, pimpl_->pool, 16, &pimpl_->stunConfig.ioqueue);
// run a thread to handles timer/ioqueue events
pimpl_->ioWorker = std::thread([this]{ pimpl_->ioJob(); });
// TURN callbacks
pj_turn_sock_cb relay_cb;
pj_bzero(&relay_cb, sizeof(relay_cb));
relay_cb.on_rx_data = [](pj_turn_sock* relay, void* pkt, unsigned pkt_len,
const pj_sockaddr_t* peer_addr, unsigned addr_len) {
auto pimpl = static_cast<TurnTransportPimpl*>(pj_turn_sock_get_user_data(relay));
pimpl->onRxData(reinterpret_cast<uint8_t*>(pkt), pkt_len, peer_addr, addr_len);
};
relay_cb.on_state = [](pj_turn_sock* relay, pj_turn_state_t old_state,
pj_turn_state_t new_state) {
auto pimpl = static_cast<TurnTransportPimpl*>(pj_turn_sock_get_user_data(relay));
pimpl->onTurnState(old_state, new_state);
};
relay_cb.on_connection_status = [](pj_turn_sock *relay,
pj_status_t status,
pj_uint32_t conn_id,
const pj_sockaddr_t *peer_addr,
unsigned addr_len) {
auto pimpl = static_cast<TurnTransportPimpl *>(pj_turn_sock_get_user_data(relay));
pimpl->onPeerConnection(conn_id, peer_addr, addr_len, status);
};
// TURN socket config
pj_turn_sock_cfg turn_sock_cfg;
pj_turn_sock_cfg_default(&turn_sock_cfg);
turn_sock_cfg.max_pkt_size = params.maxPacketSize;
// TURN socket creation
PjsipCall(pj_turn_sock_create,
&pimpl_->stunConfig, server.getFamily(), PJ_TURN_TP_TCP,
&relay_cb, &turn_sock_cfg, &*this->pimpl_, &pimpl_->relay);
// TURN allocation setup
pj_turn_alloc_param turn_alloc_param;
pj_turn_alloc_param_default(&turn_alloc_param);
if (params.authorized_family != 0)
turn_alloc_param.af = params.authorized_family; // RFC 6156!!!
if (params.isPeerConnection)
turn_alloc_param.peer_conn_type = PJ_TURN_TP_TCP; // RFC 6062!!!
pj_stun_auth_cred cred;
pj_bzero(&cred, sizeof(cred));
cred.type = PJ_STUN_AUTH_CRED_STATIC;
pj_strset(&cred.data.static_cred.realm, (char*)pimpl_->settings.realm.c_str(), pimpl_->settings.realm.size());
pj_strset(&cred.data.static_cred.username, (char*)pimpl_->settings.username.c_str(), pimpl_->settings.username.size());
cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN;
pj_strset(&cred.data.static_cred.data, (char*)pimpl_->settings.password.c_str(), pimpl_->settings.password.size());
pimpl_->relayAddr = pj_strdup3(pimpl_->pool, server.toString().c_str());
// TURN connection/allocation
JAMI_DBG() << "Connecting to TURN " << server.toString(true, true);
try {
PjsipCall(pj_turn_sock_alloc,
pimpl_->relay, &pimpl_->relayAddr, server.getPort(),
nullptr, &cred, &turn_alloc_param);
} catch (const sip_utils::PjsipFailure& e) {
JAMI_ERR("pj_turn_sock_alloc failed: %s", e.what());
}
}
TurnTransport::~TurnTransport() = default;
void
TurnTransport::shutdown(const IpAddr& addr)
{
MutexLock lk {pimpl_->apiMutex_};
auto& channel = pimpl_->peerChannels_.at(addr);
channel.stop();
}
bool
TurnTransport::isInitiator() const
{
return !pimpl_->settings.server;
}
void
TurnTransport::permitPeer(const IpAddr& addr)
{
if (addr.isUnspecified())
throw std::invalid_argument("invalid peer address");
if (addr.getFamily() != pimpl_->peerRelayAddr.getFamily())
throw std::invalid_argument("mismatching peer address family");
sip_utils::register_thread();
PjsipCall(pj_turn_sock_set_perm, pimpl_->relay, 1, addr.pjPtr(), 1);
}
bool
TurnTransport::isReady() const
{
return pimpl_->state.load() == RelayState::READY;
}
void
TurnTransport::waitServerReady()
{
while (pimpl_->state.load() != RelayState::READY) {
std::this_thread::sleep_for(std::chrono::milliseconds(250));
}
}
const IpAddr&
TurnTransport::peerRelayAddr() const
{
return pimpl_->peerRelayAddr;
}
const IpAddr&
TurnTransport::mappedAddr() const
{
return pimpl_->mappedAddr;
}
bool
TurnTransport::sendto(const IpAddr& peer, const char* const buffer, std::size_t length)
{
sip_utils::register_thread();
auto status = pj_turn_sock_sendto(pimpl_->relay,
reinterpret_cast<const pj_uint8_t*>(buffer), length,
peer.pjPtr(), peer.getLength());
if (status != PJ_SUCCESS && status != PJ_EPENDING && status != PJ_EBUSY)
throw sip_utils::PjsipFailure(PJ_STATUS_TO_OS(status));
return status != PJ_EBUSY;
}
bool
TurnTransport::sendto(const IpAddr& peer, const std::vector<char>& buffer)
{
return sendto(peer, &buffer[0], buffer.size());
}
ssize_t
TurnTransport::recvfrom(const IpAddr& peer, char* buffer, std::size_t size, std::error_code& ec)
{
MutexLock lk {pimpl_->apiMutex_};
auto& channel = pimpl_->peerChannels_.at(peer);
lk.unlock();
return channel.read(buffer, size, ec);
}
std::vector<IpAddr>
TurnTransport::peerAddresses() const
{
MutexLock lk {pimpl_->apiMutex_};
return map_utils::extractKeys(pimpl_->peerChannels_);
}
int
TurnTransport::waitForData(const IpAddr& peer, std::chrono::milliseconds timeout, std::error_code& ec) const
{
(void)ec; ///< \todo handle errors
MutexLock lk {pimpl_->apiMutex_};
auto& channel = pimpl_->peerChannels_.at(peer);
lk.unlock();
return channel.wait(timeout, ec);
}
//==============================================================================
ConnectedTurnTransport::ConnectedTurnTransport(TurnTransport& turn, const IpAddr& peer)
: turn_ {turn}
, peer_ {peer}
{}
void
ConnectedTurnTransport::shutdown()
{
turn_.shutdown(peer_);
}
int
ConnectedTurnTransport::waitForData(std::chrono::milliseconds timeout, std::error_code& ec) const
{
return turn_.waitForData(peer_, timeout, ec);
}
std::size_t
ConnectedTurnTransport::write(const ValueType* buf, std::size_t size, std::error_code& ec)
{
try {
auto success = turn_.sendto(peer_, reinterpret_cast<const char*>(buf), size);
if (!success) {
// if !success, pj_turn_sock_sendto returned EBUSY
// So, we should retry to send this later
ec.assign(EAGAIN, std::generic_category());
return 0;
}
} catch (const sip_utils::PjsipFailure& ex) {
ec = ex.code();
return 0;
}
ec.clear();
return size;
}
std::size_t
ConnectedTurnTransport::read(ValueType* buf, std::size_t size, std::error_code& ec)
{
if (size > 0) {
return turn_.recvfrom(peer_, reinterpret_cast<char*>(buf), size, ec);
}
ec.clear();
return size;
}
} // namespace jami