diff --git a/src/ice_transport.cpp b/src/ice_transport.cpp index 2cbeaae473bcce88ce6672c314693aa46141141a..3e074f316b10d9f09296cb8c31fd72aeab0cd9dd 100644 --- a/src/ice_transport.cpp +++ b/src/ice_transport.cpp @@ -43,6 +43,8 @@ #include <thread> #include <cerrno> +#include "pj/limits.h" + #define TRY(ret) do { \ if ((ret) != PJ_SUCCESS) \ throw std::runtime_error(#ret " failed"); \ @@ -54,6 +56,7 @@ static constexpr unsigned STUN_MAX_PACKET_SIZE {8192}; static constexpr uint16_t IPV6_HEADER_SIZE = 40; ///< Size in bytes of IPV6 packet header static constexpr uint16_t IPV4_HEADER_SIZE = 20; ///< Size in bytes of IPV4 packet header static constexpr int MAX_CANDIDATES {32}; +static constexpr int MAX_DESTRUCTION_TIMEOUT {3}; //============================================================================== @@ -179,7 +182,7 @@ public: // IO/Timer events are handled by following thread std::thread thread_; std::atomic_bool threadTerminateFlags_ {false}; - void handleEvents(unsigned max_msec); + bool handleEvents(unsigned max_msec); // Wait data on components std::vector<pj_ssize_t> lastReadLen_; @@ -368,8 +371,21 @@ IceTransport::Impl::Impl(const char* name, int component_count, bool master, thread_ = std::thread([this]{ sip_utils::register_thread(); while (not threadTerminateFlags_) { + // NOTE: handleEvents can return false in this case + // but here we don't care if there is event or not. handleEvents(500); // limit polling to 500ms } + // NOTE: This last handleEvents is necessary to close TURN socket. + // Because when destroying the TURN session pjproject creates a pj_timer + // to postpone the TURN destruction. This timer is only called if we poll + // the event queue. + auto started_destruction = std::chrono::system_clock::now(); + while (handleEvents(500)) { + if (std::chrono::system_clock::now() - started_destruction > std::chrono::seconds(MAX_DESTRUCTION_TIMEOUT)) { + // If the transport is not closed after 3 seconds, avoid blocking + break; + } + } }); } @@ -377,12 +393,12 @@ IceTransport::Impl::~Impl() { sip_utils::register_thread(); + icest_.reset(); // must be done before ioqueue/timer destruction + threadTerminateFlags_ = true; if (thread_.joinable()) thread_.join(); - icest_.reset(); // must be done before ioqueue/timer destruction - if (config_.stun_cfg.ioqueue) pj_ioqueue_destroy(config_.stun_cfg.ioqueue); @@ -430,7 +446,7 @@ IceTransport::Impl::_isFailed() const return false; } -void +bool IceTransport::Impl::handleEvents(unsigned max_msec) { // By tests, never seen more than two events per 500ms @@ -444,6 +460,7 @@ IceTransport::Impl::handleEvents(unsigned max_msec) timeout.sec = timeout.msec = 0; pj_timer_heap_poll(config_.stun_cfg.timer_heap, &timeout); + auto ret = timeout.msec != PJ_MAXINT32; // timeout limitation if (timeout.msec >= 1000) @@ -456,7 +473,7 @@ IceTransport::Impl::handleEvents(unsigned max_msec) // timeout if (not n_events) - return; + return ret; // error if (n_events < 0) { @@ -465,12 +482,13 @@ IceTransport::Impl::handleEvents(unsigned max_msec) last_errmsg_ = sip_utils::sip_strerror(err); JAMI_DBG("[ice:%p] ioqueue error %d: %s", this, err, last_errmsg_.c_str()); std::this_thread::sleep_for(std::chrono::milliseconds(PJ_TIME_VAL_MSEC(timeout))); - return; + return ret; } net_event_count += n_events; timeout.sec = timeout.msec = 0; } while (net_event_count < MAX_NET_EVENTS); + return ret; } void