diff --git a/src/ice_transport.cpp b/src/ice_transport.cpp index b2a38ad26340c99982449c9270ba1635e8136705..e44e557ef16c026a19c25ccf6f9ad79b2e9803c4 100644 --- a/src/ice_transport.cpp +++ b/src/ice_transport.cpp @@ -50,6 +50,12 @@ namespace ring { +// TODO: C++14 ? remove me and use std::min +template< class T > +static constexpr const T& min( const T& a, const T& b ) { + return (b < a) ? b : a; +} + static void register_thread() { @@ -106,6 +112,7 @@ IceTransport::IceTransport(const char* name, int component_count, bool master, , component_count_(component_count) , compIO_(component_count) , initiator_session_(master) + , thread_() { if (options.upnpEnable) upnp_.reset(new upnp::Controller()); @@ -168,6 +175,17 @@ IceTransport::IceTransport(const char* name, int component_count, bool master, } else config_.turn.port = 0; + static constexpr auto IOQUEUE_MAX_HANDLES = min(PJ_IOQUEUE_MAX_HANDLES, 64); + TRY( pj_timer_heap_create(pool_.get(), 100, &config_.stun_cfg.timer_heap) ); + TRY( pj_ioqueue_create(pool_.get(), IOQUEUE_MAX_HANDLES, &config_.stun_cfg.ioqueue) ); + + thread_ = std::thread([this]{ + register_thread(); + while (not threadTerminateFlags_) { + handleEvents(500); // limit polling to 500ms + } + }); + pj_ice_strans* icest = nullptr; pj_status_t status = pj_ice_strans_create(name, &config_, component_count, this, &icecb, &icest); @@ -177,7 +195,61 @@ IceTransport::IceTransport(const char* name, int component_count, bool master, IceTransport::~IceTransport() { - icest_.reset(); // must be done first to invalid callbacks + register_thread(); + + threadTerminateFlags_ = true; + if (thread_.joinable()) + thread_.join(); + + icest_.reset(); // must be done before ioqueue/timer destruction + + if (config_.stun_cfg.ioqueue) + pj_ioqueue_destroy(config_.stun_cfg.ioqueue); + + if (config_.stun_cfg.timer_heap) + pj_timer_heap_destroy(config_.stun_cfg.timer_heap); +} + +void +IceTransport::handleEvents(unsigned max_msec) +{ + // By tests, never seen more than two events per 500ms + static constexpr auto MAX_NET_EVENTS = 2; + + pj_time_val max_timeout = {0, 0}; + pj_time_val timeout = {0, 0}; + unsigned net_event_count = 0; + + max_timeout.msec = max_msec; + + timeout.sec = timeout.msec = 0; + pj_timer_heap_poll(config_.stun_cfg.timer_heap, &timeout); + + // timeout limitation + if (timeout.msec >= 1000) + timeout.msec = 999; + if (PJ_TIME_VAL_GT(timeout, max_timeout)) + timeout = max_timeout; + + do { + auto n_events = pj_ioqueue_poll(config_.stun_cfg.ioqueue, &timeout); + + // timeout + if (not n_events) + return; + + // error + if (n_events < 0) { + const auto err = pj_get_os_error(); + // Kept as debug as some errors are "normal" in regular context + RING_DBG("IceIOQueue: error %d - %s", err, sip_utils::sip_strerror(err).c_str()); + std::this_thread::sleep_for(std::chrono::milliseconds(PJ_TIME_VAL_MSEC(timeout))); + return; + } + + net_event_count += n_events; + timeout.sec = timeout.msec = 0; + } while (net_event_count < MAX_NET_EVENTS); } void @@ -798,16 +870,9 @@ IceTransport::waitForData(int comp_id, unsigned int timeout) return io.queue.front().datalen; } -// TODO: C++14 ? remove me and use std::min -template< class T > -static constexpr const T& min( const T& a, const T& b ) { - return (b < a) ? b : a; -} - IceTransportFactory::IceTransportFactory() : cp_() , pool_(nullptr, pj_pool_release) - , thread_() , ice_cfg_() { pj_caching_pool_init(&cp_, NULL, 0); @@ -819,12 +884,6 @@ IceTransportFactory::IceTransportFactory() pj_ice_strans_cfg_default(&ice_cfg_); ice_cfg_.stun_cfg.pf = &cp_.factory; - static constexpr auto IOQUEUE_MAX_HANDLES = min(PJ_IOQUEUE_MAX_HANDLES, 64); - TRY( pj_timer_heap_create(pool_.get(), 100, &ice_cfg_.stun_cfg.timer_heap) ); - TRY( pj_ioqueue_create(pool_.get(), IOQUEUE_MAX_HANDLES, &ice_cfg_.stun_cfg.ioqueue) ); - - thread_ = std::thread(std::bind(&IceTransportFactory::processThread, this)); - ice_cfg_.af = pj_AF_INET(); ice_cfg_.stun.cfg.max_pkt_size = 8192; @@ -835,73 +894,10 @@ IceTransportFactory::IceTransportFactory() IceTransportFactory::~IceTransportFactory() { - thread_quit_flag_ = PJ_TRUE; - if (thread_.joinable()) - thread_.join(); - - if (ice_cfg_.stun_cfg.ioqueue) - pj_ioqueue_destroy(ice_cfg_.stun_cfg.ioqueue); - - if (ice_cfg_.stun_cfg.timer_heap) - pj_timer_heap_destroy(ice_cfg_.stun_cfg.timer_heap); - pool_.reset(); pj_caching_pool_destroy(&cp_); } -static void -handleIOEvents(pj_ice_strans_cfg& cfg, unsigned max_msec) -{ - // By tests, never seen more than two events per 500ms - static constexpr auto MAX_NET_EVENTS = 2; - - pj_time_val max_timeout = {0, 0}; - pj_time_val timeout = {0, 0}; - unsigned net_event_count = 0; - - max_timeout.msec = max_msec; - - timeout.sec = timeout.msec = 0; - pj_timer_heap_poll(cfg.stun_cfg.timer_heap, &timeout); - - // timeout limitation - if (timeout.msec >= 1000) - timeout.msec = 999; - if (PJ_TIME_VAL_GT(timeout, max_timeout)) - timeout = max_timeout; - - do { - auto n_events = pj_ioqueue_poll(cfg.stun_cfg.ioqueue, &timeout); - - // timeout - if (not n_events) - return; - - // error - if (n_events < 0) { - const auto err = pj_get_os_error(); - // Kept as debug as some errors are "normal" in regular context - RING_DBG("IceIOQueue: error %d - %s", err, sip_utils::sip_strerror(err).c_str()); - std::this_thread::sleep_for(std::chrono::milliseconds(PJ_TIME_VAL_MSEC(timeout))); - return; - } - - net_event_count += n_events; - timeout.sec = timeout.msec = 0; - } while (net_event_count < MAX_NET_EVENTS); -} - -int -IceTransportFactory::processThread() -{ - register_thread(); - while (!thread_quit_flag_) { - handleIOEvents(ice_cfg_, 500); // limit polling to 500ms - } - - return 0; -} - std::shared_ptr<IceTransport> IceTransportFactory::createTransport(const char* name, int component_count, bool master, diff --git a/src/ice_transport.h b/src/ice_transport.h index ad2ee061a322b3638e7a515930e62b5a4bb0a9cb..109c8a126a1ef7d05035d3551ff803327afe31e4 100644 --- a/src/ice_transport.h +++ b/src/ice_transport.h @@ -259,6 +259,11 @@ class IceTransport { void selectUPnPIceCandidates(); std::unique_ptr<upnp::Controller> upnp_; + + // IO/Timer events are handled by following thread + std::thread thread_; + std::atomic_bool threadTerminateFlags_ {false}; + void handleEvents(unsigned max_msec); }; class IceTransportFactory { @@ -271,8 +276,6 @@ class IceTransportFactory { bool master, const IceTransportOptions& options = {}); - int processThread(); - /** * PJSIP specifics */ @@ -282,9 +285,7 @@ class IceTransportFactory { private: pj_caching_pool cp_; std::unique_ptr<pj_pool_t, decltype(pj_pool_release)&> pool_; - std::thread thread_; pj_ice_strans_cfg ice_cfg_; - pj_bool_t thread_quit_flag_ {PJ_FALSE}; }; };