Skip to content
Snippets Groups Projects
Commit f1819432 authored by Guillaume Roguez's avatar Guillaume Roguez
Browse files

add pjnath TurnTransport class


TurnTransport class is a C++ wrapper on PJNATH turn socket API.
Bring an easy to use socket connected to a TURN server.
Support TCP (even with peer) and UDP connection types.

Requires rfc6062 support into pjproject 2.6, given by patch:
88c820ee / contrib: implement rfc6062 in pjnath

Include fonctional tests working with turn.ring.cx.
Run tests as usual with "make check" command.

Change-Id: Idf14c2ea192cab2fccf99b142492086284920a6b
Reviewed-by: default avatarOlivier Soldano <olivier.soldano@savoirfairelinux.com>
parent 5bdbead7
Branches
No related tags found
No related merge requests found
...@@ -704,7 +704,8 @@ AC_CONFIG_FILES([Makefile \ ...@@ -704,7 +704,8 @@ AC_CONFIG_FILES([Makefile \
src/upnp/Makefile \ src/upnp/Makefile \
ringtones/Makefile \ ringtones/Makefile \
test/Makefile\ test/Makefile\
test/sip/Makefile \ test/sip/Makefile
test/turn/Makefile \
test/unitTest/Makefile \ test/unitTest/Makefile \
man/Makefile \ man/Makefile \
......
...@@ -140,7 +140,9 @@ libring_la_SOURCES = \ ...@@ -140,7 +140,9 @@ libring_la_SOURCES = \
smartools.cpp \ smartools.cpp \
smartools.h \ smartools.h \
base64.h \ base64.h \
base64.cpp base64.cpp \
turn_transport.h \
turn_transport.cpp
if HAVE_WIN32 if HAVE_WIN32
libring_la_SOURCES += \ libring_la_SOURCES += \
......
/*
* Copyright (C) 2017 Savoir-faire Linux Inc.
*
* Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "turn_transport.h"
#include "logger.h"
#include "ip_utils.h"
#include "sip/sip_utils.h"
#include <pjnath.h>
#include <pjlib-util.h>
#include <pjlib.h>
#include <stdexcept>
#include <future>
#include <atomic>
#include <thread>
#include <vector>
#include <iterator>
#include <mutex>
namespace ring {
enum class RelayState {
NONE,
READY,
DOWN,
};
class TurnTransportPimpl {
public:
TurnTransportPimpl() = default;
~TurnTransportPimpl();
void onTurnState(pj_turn_state_t old_state, pj_turn_state_t new_state);
void onRxData(uint8_t* pkt, unsigned pkt_len, const pj_sockaddr_t* peer_addr, unsigned addr_len);
void onPeerConnection(pj_uint32_t conn_id, const pj_sockaddr_t* peer_addr, unsigned addr_len);
void ioJob();
TurnTransportParams settings;
pj_caching_pool poolCache {};
pj_pool_t* pool {nullptr};
pj_stun_config stunConfig {};
pj_turn_sock* relay {nullptr};
pj_str_t relayAddr {};
IpAddr peerRelayAddr; // address where peers should connect to
IpAddr mappedAddr;
std::map<IpAddr, std::vector<char>> streams;
std::mutex streamsMutex;
std::atomic<RelayState> state {RelayState::NONE};
std::atomic_bool ioJobQuit {false};
std::thread ioWorker;
};
TurnTransportPimpl::~TurnTransportPimpl()
{
if (relay)
pj_turn_sock_destroy(relay);
ioJobQuit = true;
if (ioWorker.joinable())
ioWorker.join();
if (pool)
pj_pool_release(pool);
pj_caching_pool_destroy(&poolCache);
}
void
TurnTransportPimpl::onTurnState(pj_turn_state_t old_state, pj_turn_state_t new_state)
{
if (new_state == PJ_TURN_STATE_READY) {
pj_turn_session_info info;
pj_turn_sock_get_info(relay, &info);
peerRelayAddr = IpAddr {info.relay_addr};
mappedAddr = IpAddr {info.mapped_addr};
RING_DBG("TURN server ready, peer relay address: %s", peerRelayAddr.toString(true, true).c_str());
state = RelayState::READY;
} else if (old_state <= PJ_TURN_STATE_READY and new_state > PJ_TURN_STATE_READY) {
RING_WARN("TURN server disconnected (%s)", pj_turn_state_name(new_state));
state = RelayState::DOWN;
}
}
void
TurnTransportPimpl::onRxData(uint8_t* pkt, unsigned pkt_len,
const pj_sockaddr_t* addr, unsigned addr_len)
{
IpAddr peer_addr ( *static_cast<const pj_sockaddr*>(addr), addr_len );
std::lock_guard<std::mutex> lk {streamsMutex};
auto& vec = streams[peer_addr];
vec.insert(vec.cend(), pkt, pkt + pkt_len);
}
void
TurnTransportPimpl::onPeerConnection(pj_uint32_t conn_id,
const pj_sockaddr_t* addr, unsigned addr_len)
{
IpAddr peer_addr ( *static_cast<const pj_sockaddr*>(addr), addr_len );
RING_DBG("Received connection attempt from %s, id=%x",
peer_addr.toString(true, true).c_str(), conn_id);
{
std::lock_guard<std::mutex> lk {streamsMutex};
streams[peer_addr].clear();
}
pj_turn_connect_peer(relay, conn_id, addr, addr_len);
if (settings.onPeerConnection)
settings.onPeerConnection(conn_id, peer_addr);
}
void
TurnTransportPimpl::ioJob()
{
sip_utils::register_thread();
while (!ioJobQuit.load()) {
const pj_time_val delay = {0, 10};
pj_ioqueue_poll(stunConfig.ioqueue, &delay);
pj_timer_heap_poll(stunConfig.timer_heap, nullptr);
}
}
class PjsipError final : public std::exception {
public:
PjsipError() = default;
explicit PjsipError(pj_status_t st) : std::exception() {
char err_msg[PJ_ERR_MSG_SIZE];
pj_strerror(st, err_msg, sizeof(err_msg));
what_msg_ += ": ";
what_msg_ += err_msg;
}
const char* what() const noexcept override {
return what_msg_.c_str();
};
private:
std::string what_msg_ {"PJSIP api error"};
};
template <class Callable, class... Args>
inline void PjsipCall(Callable& func, Args... args)
{
auto status = func(args...);
if (status != PJ_SUCCESS)
throw PjsipError(status);
}
template <class Callable, class... Args>
inline auto PjsipCallReturn(const Callable& func, Args... args) -> decltype(func(args...))
{
auto res = func(args...);
if (!res)
throw PjsipError();
return res;
}
//==================================================================================================
TurnTransport::TurnTransport(const TurnTransportParams& params)
: pimpl_ {new TurnTransportPimpl}
{
auto server = params.server;
if (!server.getPort())
server.setPort(PJ_STUN_PORT);
if (server.isUnspecified())
throw std::invalid_argument("invalid turn server address");
pimpl_->settings = params;
// PJSIP memory pool
pj_caching_pool_init(&pimpl_->poolCache, &pj_pool_factory_default_policy, 0);
pimpl_->pool = PjsipCallReturn(pj_pool_create, &pimpl_->poolCache.factory,
"RgTurnTr", 512, 512, nullptr);
// STUN config
pj_stun_config_init(&pimpl_->stunConfig, &pimpl_->poolCache.factory, 0, nullptr, nullptr);
// create global timer heap
PjsipCall(pj_timer_heap_create, pimpl_->pool, 1000, &pimpl_->stunConfig.timer_heap);
// create global ioqueue
PjsipCall(pj_ioqueue_create, pimpl_->pool, 16, &pimpl_->stunConfig.ioqueue);
// run a thread to handles timer/ioqueue events
pimpl_->ioWorker = std::thread([this]{ pimpl_->ioJob(); });
// TURN callbacks
pj_turn_sock_cb relay_cb;
pj_bzero(&relay_cb, sizeof(relay_cb));
relay_cb.on_rx_data = [](pj_turn_sock* relay, void* pkt, unsigned pkt_len,
const pj_sockaddr_t* peer_addr, unsigned addr_len) {
auto tr = static_cast<TurnTransport*>(pj_turn_sock_get_user_data(relay));
tr->pimpl_->onRxData(reinterpret_cast<uint8_t*>(pkt), pkt_len, peer_addr, addr_len);
};
relay_cb.on_state = [](pj_turn_sock* relay, pj_turn_state_t old_state,
pj_turn_state_t new_state) {
auto tr = static_cast<TurnTransport*>(pj_turn_sock_get_user_data(relay));
tr->pimpl_->onTurnState(old_state, new_state);
};
relay_cb.on_peer_connection = [](pj_turn_sock* relay, pj_uint32_t conn_id,
const pj_sockaddr_t* peer_addr, unsigned addr_len){
auto tr = static_cast<TurnTransport*>(pj_turn_sock_get_user_data(relay));
tr->pimpl_->onPeerConnection(conn_id, peer_addr, addr_len);
};
// TURN socket config
pj_turn_sock_cfg turn_sock_cfg;
pj_turn_sock_cfg_default(&turn_sock_cfg);
turn_sock_cfg.max_pkt_size = params.maxPacketSize;
// TURN socket creation
PjsipCall(pj_turn_sock_create,
&pimpl_->stunConfig, server.getFamily(), PJ_TURN_TP_TCP,
&relay_cb, &turn_sock_cfg, this, &pimpl_->relay);
// TURN allocation setup
pj_turn_alloc_param turn_alloc_param;
pj_turn_alloc_param_default(&turn_alloc_param);
if (params.isPeerConnection)
turn_alloc_param.peer_conn_type = PJ_TURN_TP_TCP; // RFC 6062!!!
pj_stun_auth_cred cred;
pj_bzero(&cred, sizeof(cred));
cred.type = PJ_STUN_AUTH_CRED_STATIC;
pj_cstr(&cred.data.static_cred.realm, params.realm.c_str());
pj_cstr(&cred.data.static_cred.username, params.username.c_str());
cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN;
pj_cstr(&cred.data.static_cred.data, params.password.c_str());
pimpl_->relayAddr = pj_strdup3(pimpl_->pool, server.toString().c_str());
// TURN connection/allocation
RING_DBG() << "Connecting to TURN " << server.toString(true, true);
PjsipCall(pj_turn_sock_alloc,
pimpl_->relay, &pimpl_->relayAddr, server.getPort(),
nullptr, &cred, &turn_alloc_param);
}
TurnTransport::~TurnTransport()
{}
void
TurnTransport::permitPeer(const IpAddr& addr)
{
if (addr.isUnspecified())
throw std::invalid_argument("invalid peer address");
PjsipCall(pj_turn_sock_set_perm, pimpl_->relay, 1, addr.pjPtr(), 1);
}
bool
TurnTransport::isReady() const
{
return pimpl_->state.load() == RelayState::READY;
}
void
TurnTransport::waitServerReady()
{
while (pimpl_->state.load() != RelayState::READY) {
std::this_thread::sleep_for(std::chrono::milliseconds(250));
}
}
const IpAddr&
TurnTransport::peerRelayAddr() const
{
return pimpl_->peerRelayAddr;
}
const IpAddr&
TurnTransport::mappedAddr() const
{
return pimpl_->mappedAddr;
}
bool
TurnTransport::sendto(const IpAddr& peer, const std::vector<char>& buffer)
{
auto status = pj_turn_sock_sendto(pimpl_->relay,
reinterpret_cast<const pj_uint8_t*>(buffer.data()), buffer.size(),
peer.pjPtr(), peer.getLength());
if (status != PJ_SUCCESS && status != PJ_EPENDING)
throw PjsipError(status);
return status == PJ_SUCCESS;
}
void
TurnTransport::recvfrom(std::map<IpAddr, std::vector<char>>& streams)
{
std::lock_guard<std::mutex> lk {pimpl_->streamsMutex};
streams = std::move(pimpl_->streams);
pimpl_->streams.clear();
}
} // namespace ring
/*
* Copyright (C) 2017 Savoir-faire Linux Inc.
*
* Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#pragma once
#include "ip_utils.h"
#include <string>
#include <memory>
#include <functional>
#include <map>
namespace ring {
class TurnTransportPimpl;
struct TurnTransportParams {
IpAddr server;
// Plain Credentials
std::string realm;
std::string username;
std::string password;
bool isPeerConnection {false};
uint32_t connectionId {0};
std::function<void(uint32_t conn_id, const IpAddr& peer_addr)> onPeerConnection;
std::size_t maxPacketSize {3000}; ///< size of one "logical" packet
};
class TurnTransport {
public:
///
/// Constructs a TurnTransport connected by TCP to given server.
///
/// Throw std::invalid_argument of peer address is invalid.
///
/// \param param parameters to setup the transport
///
/// \note If TURN server port is not set, the default TURN port 3478 (RFC5766) is used.
///
TurnTransport(const TurnTransportParams& param);
~TurnTransport();
///
/// Wait for successful connection on the TURN server.
///
/// TurnTransport constructor connects asynchronously on the TURN server.
/// You need to wait the READY state before calling any other APIs.
///
void waitServerReady();
bool isReady() const;
const IpAddr& peerRelayAddr() const;
const IpAddr& mappedAddr() const;
///
/// Gives server access permission to given peer by its address.
///
/// Throw std::invalid_argument of peer address is invalid.
/// Throw std::runtime_error if case of backend errors.
///
/// \param addr peer address
///
/// \note The peer address family must be same as the turn server.
/// \note Must be called only if server is ready.
/// \see waitServerReady
///
void permitPeer(const IpAddr& addr);
///
/// Collect pending data.
///
void recvfrom(std::map<IpAddr, std::vector<char>>& streams);
///
/// Send data to a given peer through the TURN tunnel.
///
bool sendto(const IpAddr& peer, const std::vector<char>& data);
public:
// Move semantic
TurnTransport(TurnTransport&&) = default;
TurnTransport& operator=(TurnTransport&&) = default;
private:
TurnTransport() = delete;
std::unique_ptr<TurnTransportPimpl> pimpl_;
};
} // namespace ring
...@@ -2,7 +2,6 @@ ...@@ -2,7 +2,6 @@
# test binaries # test binaries
ut_* ut_*
test_*
# test result files # test result files
*.log *.log
......
SUBDIRS = sip SUBDIRS = sip turn
SUBDIRS += unitTest SUBDIRS += unitTest
# Rules for the test code (use `make check` to execute)
include $(top_srcdir)/globals.mk
AM_CXXFLAGS = -I$(top_srcdir)/src
AM_LDFLAGS = $(CPPUNIT_LIBS) $(top_builddir)/src/libring.la
check_PROGRAMS = test_turn
test_turn_SOURCES = main.cpp test_turn.h test_TURN.cpp
TESTS = $(check_PROGRAMS)
/*
* Copyright (C) 2017 Savoir-Faire Linux Inc.
* Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include <cppunit/ui/text/TestRunner.h>
#include <cppunit/extensions/TestFactoryRegistry.h>
#include <cppunit/CompilerOutputter.h>
#include "dring.h"
#include <stdexcept>
void init_daemon()
{
DRing::init(DRing::InitFlag(DRing::DRING_FLAG_DEBUG | DRing::DRING_FLAG_CONSOLE_LOG));
DRing::start("dring-sample.yml");
}
int main()
{
init_daemon();
CppUnit::TextUi::TestRunner runner;
// Register all tests
auto& registry = CppUnit::TestFactoryRegistry::getRegistry();
runner.addTest(registry.makeTest());
// Use a compiler error format outputter for results and output into stderr
runner.setOutputter(new CppUnit::CompilerOutputter(&runner.result(), std::cerr ));
bool ret;
try {
// Run tests
ret = !runner.run("", false);
} catch (const std::exception& e) {
std::cerr << "Exception catched during tests: " << e.what() << '\n';
ret = 1;
}
DRing::fini();
return ret;
}
/*
* Copyright (C) 2017 Savoir-Faire Linux Inc.
* Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "test_TURN.h"
#include "turn_transport.h"
#include <sys/socket.h>
#include <sys/unistd.h>
#include <stdexcept>
#include <thread>
#include <chrono>
#include <vector>
using namespace ring;
CPPUNIT_TEST_SUITE_REGISTRATION( test_TURN );
class TCPSocket {
public:
TCPSocket(int fa) {
sock_ = ::socket(fa, SOCK_STREAM, 0);
if (sock_ < 0)
throw std::system_error(errno, std::system_category());
IpAddr bound {"0.0.0.0"};
::bind(sock_, bound, bound.getLength());
}
~TCPSocket() {
if (sock_ >= 0)
::close(sock_);
}
void connect(const IpAddr& addr) {
if (::connect(sock_, addr, addr.getLength()) < 0)
throw std::system_error(errno, std::system_category());
}
void send(const std::string& pkt) {
if (::send(sock_, pkt.data(), pkt.size(), 0) < 0)
throw std::system_error(errno, std::system_category());
}
void send(const std::vector<char>& pkt) {
if (::send(sock_, pkt.data(), pkt.size(), 0) < 0)
throw std::system_error(errno, std::system_category());
}
std::vector<char> recv(std::size_t max_len) {
std::vector<char> pkt(max_len);
auto rs = ::recv(sock_, pkt.data(), pkt.size(), 0);
if (rs < 0)
throw std::system_error(errno, std::system_category());
pkt.resize(rs);
pkt.shrink_to_fit();
return pkt;
}
private:
int sock_ {-1};
};
void
test_TURN::testSimpleConnection()
{
TurnTransportParams param;
param.server = IpAddr {"turn.ring.cx"};
param.realm = "ring";
param.username = "ring";
param.password = "ring";
param.isPeerConnection = true;
TurnTransport turn {param};
turn.waitServerReady();
TCPSocket sock = {param.server.getFamily()};
// Permit myself
turn.permitPeer(turn.mappedAddr());
sock.connect(turn.peerRelayAddr());
std::string test_data = "Hello, World!";
sock.send(test_data);
std::this_thread::sleep_for(std::chrono::seconds(1));
std::map<IpAddr, std::vector<char>> streams;
turn.recvfrom(streams);
CPPUNIT_ASSERT(streams.size() == 1);
auto peer_addr = std::begin(streams)->first;
const auto& vec = std::begin(streams)->second;
CPPUNIT_ASSERT(std::string(std::begin(vec), std::end(vec)) == test_data);
turn.recvfrom(streams);
CPPUNIT_ASSERT(streams.size() == 0);
turn.sendto(peer_addr, std::vector<char>{1, 2, 3, 4});
std::this_thread::sleep_for(std::chrono::seconds(1));
auto res = sock.recv(1000);
CPPUNIT_ASSERT(res.size() == 4);
#if 0
// DISABLED SECTION
// This code higly load the network and can be long to execute.
// Only kept for manual testing purpose.
std::vector<char> big(100000);
using clock = std::chrono::high_resolution_clock;
auto t1 = clock::now();
sock.send(big);
auto t2 = clock::now();
auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(t2-t1).count();
std::cout << "T= " << duration << "ns"
<< ", V= " << (8000. * big.size() / duration) << "Mb/s"
<< " / " << (1000. * big.size() / duration) << "MB/s"
<< '\n';
std::this_thread::sleep_for(std::chrono::seconds(5));
#endif
}
/*
* Copyright (C) 2017 Savoir-Faire Linux Inc.
* Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
*/
#pragma once
// Cppunit import
#include <cppunit/extensions/HelperMacros.h>
#include <cppunit/TestCaller.h>
#include <cppunit/TestCase.h>
#include <cppunit/TestSuite.h>
/*
* @file test_TURN.h
* @brief Regroups unitary tests related to the TURN transport class
*/
class test_TURN : public CppUnit::TestFixture
{
private:
void testSimpleConnection(void);
/**
* Use cppunit library macros to add unit test to the factory
*/
CPPUNIT_TEST_SUITE(test_TURN);
CPPUNIT_TEST(testSimpleConnection);
CPPUNIT_TEST_SUITE_END();
};
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment