Skip to content
Snippets Groups Projects
Commit 5edab08f authored by Guillaume Roguez's avatar Guillaume Roguez
Browse files

ice: move IO thread per transport

IO events were handled by a unique thread owned by IceTransportFactory.
This patch changes that by creating a thread per IceTransport instance.

This improves a bit (on multiple-threaded architectures at least)
events handling as not all events handler run at the same speed
(SIP events are slow, Media IOs need low overhead).

Issue: #79692
Change-Id: Ie742373176f9447a14286910e6af562427cecdbd
parent 6151e58d
Branches
Tags
No related merge requests found
...@@ -50,6 +50,12 @@ ...@@ -50,6 +50,12 @@
namespace ring { namespace ring {
// TODO: C++14 ? remove me and use std::min
template< class T >
static constexpr const T& min( const T& a, const T& b ) {
return (b < a) ? b : a;
}
static void static void
register_thread() register_thread()
{ {
...@@ -106,6 +112,7 @@ IceTransport::IceTransport(const char* name, int component_count, bool master, ...@@ -106,6 +112,7 @@ IceTransport::IceTransport(const char* name, int component_count, bool master,
, component_count_(component_count) , component_count_(component_count)
, compIO_(component_count) , compIO_(component_count)
, initiator_session_(master) , initiator_session_(master)
, thread_()
{ {
if (options.upnpEnable) if (options.upnpEnable)
upnp_.reset(new upnp::Controller()); upnp_.reset(new upnp::Controller());
...@@ -168,6 +175,17 @@ IceTransport::IceTransport(const char* name, int component_count, bool master, ...@@ -168,6 +175,17 @@ IceTransport::IceTransport(const char* name, int component_count, bool master,
} else } else
config_.turn.port = 0; config_.turn.port = 0;
static constexpr auto IOQUEUE_MAX_HANDLES = min(PJ_IOQUEUE_MAX_HANDLES, 64);
TRY( pj_timer_heap_create(pool_.get(), 100, &config_.stun_cfg.timer_heap) );
TRY( pj_ioqueue_create(pool_.get(), IOQUEUE_MAX_HANDLES, &config_.stun_cfg.ioqueue) );
thread_ = std::thread([this]{
register_thread();
while (not threadTerminateFlags_) {
handleEvents(500); // limit polling to 500ms
}
});
pj_ice_strans* icest = nullptr; pj_ice_strans* icest = nullptr;
pj_status_t status = pj_ice_strans_create(name, &config_, component_count, pj_status_t status = pj_ice_strans_create(name, &config_, component_count,
this, &icecb, &icest); this, &icecb, &icest);
...@@ -177,7 +195,61 @@ IceTransport::IceTransport(const char* name, int component_count, bool master, ...@@ -177,7 +195,61 @@ IceTransport::IceTransport(const char* name, int component_count, bool master,
IceTransport::~IceTransport() IceTransport::~IceTransport()
{ {
icest_.reset(); // must be done first to invalid callbacks register_thread();
threadTerminateFlags_ = true;
if (thread_.joinable())
thread_.join();
icest_.reset(); // must be done before ioqueue/timer destruction
if (config_.stun_cfg.ioqueue)
pj_ioqueue_destroy(config_.stun_cfg.ioqueue);
if (config_.stun_cfg.timer_heap)
pj_timer_heap_destroy(config_.stun_cfg.timer_heap);
}
void
IceTransport::handleEvents(unsigned max_msec)
{
// By tests, never seen more than two events per 500ms
static constexpr auto MAX_NET_EVENTS = 2;
pj_time_val max_timeout = {0, 0};
pj_time_val timeout = {0, 0};
unsigned net_event_count = 0;
max_timeout.msec = max_msec;
timeout.sec = timeout.msec = 0;
pj_timer_heap_poll(config_.stun_cfg.timer_heap, &timeout);
// timeout limitation
if (timeout.msec >= 1000)
timeout.msec = 999;
if (PJ_TIME_VAL_GT(timeout, max_timeout))
timeout = max_timeout;
do {
auto n_events = pj_ioqueue_poll(config_.stun_cfg.ioqueue, &timeout);
// timeout
if (not n_events)
return;
// error
if (n_events < 0) {
const auto err = pj_get_os_error();
// Kept as debug as some errors are "normal" in regular context
RING_DBG("IceIOQueue: error %d - %s", err, sip_utils::sip_strerror(err).c_str());
std::this_thread::sleep_for(std::chrono::milliseconds(PJ_TIME_VAL_MSEC(timeout)));
return;
}
net_event_count += n_events;
timeout.sec = timeout.msec = 0;
} while (net_event_count < MAX_NET_EVENTS);
} }
void void
...@@ -798,16 +870,9 @@ IceTransport::waitForData(int comp_id, unsigned int timeout) ...@@ -798,16 +870,9 @@ IceTransport::waitForData(int comp_id, unsigned int timeout)
return io.queue.front().datalen; return io.queue.front().datalen;
} }
// TODO: C++14 ? remove me and use std::min
template< class T >
static constexpr const T& min( const T& a, const T& b ) {
return (b < a) ? b : a;
}
IceTransportFactory::IceTransportFactory() IceTransportFactory::IceTransportFactory()
: cp_() : cp_()
, pool_(nullptr, pj_pool_release) , pool_(nullptr, pj_pool_release)
, thread_()
, ice_cfg_() , ice_cfg_()
{ {
pj_caching_pool_init(&cp_, NULL, 0); pj_caching_pool_init(&cp_, NULL, 0);
...@@ -819,12 +884,6 @@ IceTransportFactory::IceTransportFactory() ...@@ -819,12 +884,6 @@ IceTransportFactory::IceTransportFactory()
pj_ice_strans_cfg_default(&ice_cfg_); pj_ice_strans_cfg_default(&ice_cfg_);
ice_cfg_.stun_cfg.pf = &cp_.factory; ice_cfg_.stun_cfg.pf = &cp_.factory;
static constexpr auto IOQUEUE_MAX_HANDLES = min(PJ_IOQUEUE_MAX_HANDLES, 64);
TRY( pj_timer_heap_create(pool_.get(), 100, &ice_cfg_.stun_cfg.timer_heap) );
TRY( pj_ioqueue_create(pool_.get(), IOQUEUE_MAX_HANDLES, &ice_cfg_.stun_cfg.ioqueue) );
thread_ = std::thread(std::bind(&IceTransportFactory::processThread, this));
ice_cfg_.af = pj_AF_INET(); ice_cfg_.af = pj_AF_INET();
ice_cfg_.stun.cfg.max_pkt_size = 8192; ice_cfg_.stun.cfg.max_pkt_size = 8192;
...@@ -835,73 +894,10 @@ IceTransportFactory::IceTransportFactory() ...@@ -835,73 +894,10 @@ IceTransportFactory::IceTransportFactory()
IceTransportFactory::~IceTransportFactory() IceTransportFactory::~IceTransportFactory()
{ {
thread_quit_flag_ = PJ_TRUE;
if (thread_.joinable())
thread_.join();
if (ice_cfg_.stun_cfg.ioqueue)
pj_ioqueue_destroy(ice_cfg_.stun_cfg.ioqueue);
if (ice_cfg_.stun_cfg.timer_heap)
pj_timer_heap_destroy(ice_cfg_.stun_cfg.timer_heap);
pool_.reset(); pool_.reset();
pj_caching_pool_destroy(&cp_); pj_caching_pool_destroy(&cp_);
} }
static void
handleIOEvents(pj_ice_strans_cfg& cfg, unsigned max_msec)
{
// By tests, never seen more than two events per 500ms
static constexpr auto MAX_NET_EVENTS = 2;
pj_time_val max_timeout = {0, 0};
pj_time_val timeout = {0, 0};
unsigned net_event_count = 0;
max_timeout.msec = max_msec;
timeout.sec = timeout.msec = 0;
pj_timer_heap_poll(cfg.stun_cfg.timer_heap, &timeout);
// timeout limitation
if (timeout.msec >= 1000)
timeout.msec = 999;
if (PJ_TIME_VAL_GT(timeout, max_timeout))
timeout = max_timeout;
do {
auto n_events = pj_ioqueue_poll(cfg.stun_cfg.ioqueue, &timeout);
// timeout
if (not n_events)
return;
// error
if (n_events < 0) {
const auto err = pj_get_os_error();
// Kept as debug as some errors are "normal" in regular context
RING_DBG("IceIOQueue: error %d - %s", err, sip_utils::sip_strerror(err).c_str());
std::this_thread::sleep_for(std::chrono::milliseconds(PJ_TIME_VAL_MSEC(timeout)));
return;
}
net_event_count += n_events;
timeout.sec = timeout.msec = 0;
} while (net_event_count < MAX_NET_EVENTS);
}
int
IceTransportFactory::processThread()
{
register_thread();
while (!thread_quit_flag_) {
handleIOEvents(ice_cfg_, 500); // limit polling to 500ms
}
return 0;
}
std::shared_ptr<IceTransport> std::shared_ptr<IceTransport>
IceTransportFactory::createTransport(const char* name, int component_count, IceTransportFactory::createTransport(const char* name, int component_count,
bool master, bool master,
......
...@@ -259,6 +259,11 @@ class IceTransport { ...@@ -259,6 +259,11 @@ class IceTransport {
void selectUPnPIceCandidates(); void selectUPnPIceCandidates();
std::unique_ptr<upnp::Controller> upnp_; std::unique_ptr<upnp::Controller> upnp_;
// IO/Timer events are handled by following thread
std::thread thread_;
std::atomic_bool threadTerminateFlags_ {false};
void handleEvents(unsigned max_msec);
}; };
class IceTransportFactory { class IceTransportFactory {
...@@ -271,8 +276,6 @@ class IceTransportFactory { ...@@ -271,8 +276,6 @@ class IceTransportFactory {
bool master, bool master,
const IceTransportOptions& options = {}); const IceTransportOptions& options = {});
int processThread();
/** /**
* PJSIP specifics * PJSIP specifics
*/ */
...@@ -282,9 +285,7 @@ class IceTransportFactory { ...@@ -282,9 +285,7 @@ class IceTransportFactory {
private: private:
pj_caching_pool cp_; pj_caching_pool cp_;
std::unique_ptr<pj_pool_t, decltype(pj_pool_release)&> pool_; std::unique_ptr<pj_pool_t, decltype(pj_pool_release)&> pool_;
std::thread thread_;
pj_ice_strans_cfg ice_cfg_; pj_ice_strans_cfg ice_cfg_;
pj_bool_t thread_quit_flag_ {PJ_FALSE};
}; };
}; };
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment