/* * Copyright (C) 2004-2019 Savoir-faire Linux Inc. * * Author: Guillaume Roguez * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #include "ice_transport.h" #include "ice_socket.h" #include "logger.h" #include "sip/sip_utils.h" #include "manager.h" #include "upnp/upnp_control.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define TRY(ret) do { \ if ((ret) != PJ_SUCCESS) \ throw std::runtime_error(#ret " failed"); \ } while (0) namespace jami { static constexpr unsigned STUN_MAX_PACKET_SIZE {8192}; static constexpr uint16_t IPV6_HEADER_SIZE = 40; ///< Size in bytes of IPV6 packet header static constexpr uint16_t IPV4_HEADER_SIZE = 20; ///< Size in bytes of IPV4 packet header static constexpr int MAX_CANDIDATES {32}; //============================================================================== using MutexGuard = std::lock_guard; using MutexLock = std::unique_lock; namespace { struct IceSTransDeleter { void operator ()(pj_ice_strans* ptr) { pj_ice_strans_stop_ice(ptr); pj_ice_strans_destroy(ptr); } }; class PeerChannel { public: PeerChannel() {} ~PeerChannel() { stop(); } PeerChannel(PeerChannel &&o) { MutexGuard lk{o.mutex_}; stream_ = std::move(o.stream_); } PeerChannel &operator=(PeerChannel &&o) { std::lock(mutex_, o.mutex_); MutexGuard lk1{mutex_, std::adopt_lock}; MutexGuard lk2{o.mutex_, std::adopt_lock}; stream_ = std::move(o.stream_); return *this; } void operator<<(const std::string &data) { MutexGuard lk{mutex_}; stream_.clear(); stream_ << data; notified_ = true; cv_.notify_one(); } ssize_t isDataAvailable() { MutexGuard lk{mutex_}; auto pos = stream_.tellg(); stream_.seekg(0, std::ios_base::end); auto available = (stream_.tellg() - pos); stream_.seekg(pos); return available; } template bool wait(Duration timeout) { std::lock(apiMutex_, mutex_); MutexGuard lk_api{apiMutex_, std::adopt_lock}; MutexLock lk{mutex_, std::adopt_lock}; auto a = cv_.wait_for(lk, timeout, [this] { return stop_ or /*(data.size() != 0)*/ !stream_.eof(); }); return a; } std::size_t read(char *output, std::size_t size) { std::lock(apiMutex_, mutex_); MutexGuard lk_api{apiMutex_, std::adopt_lock}; MutexLock lk{mutex_, std::adopt_lock}; cv_.wait(lk, [&, this] { if (stop_) return true; stream_.read(&output[0], size); return stream_.gcount() > 0; }); return stop_ ? 0 : stream_.gcount(); } void stop() noexcept { { MutexGuard lk{mutex_}; if (stop_) return; stop_ = true; } cv_.notify_all(); // Make sure that no thread is blocked into read() or wait() methods MutexGuard lk_api{apiMutex_}; } private: PeerChannel(const PeerChannel &o) = delete; PeerChannel &operator=(const PeerChannel &o) = delete; std::mutex apiMutex_{}; std::mutex mutex_{}; std::condition_variable cv_{}; std::stringstream stream_{}; bool stop_{false}; bool notified_{false}; std::vector data; friend void operator<<(std::vector &, PeerChannel &); }; } // namespace //============================================================================== class IceTransport::Impl { public: Impl(const char* name, int component_count, bool master, const IceTransportOptions& options); ~Impl(); void onComplete(pj_ice_strans* ice_st, pj_ice_strans_op op, pj_status_t status); void onReceiveData(unsigned comp_id, void *pkt, pj_size_t size); /** * Set/change transport role as initiator. * Should be called before start method. */ bool setInitiatorSession(); /** * Set/change transport role as slave. * Should be called before start method. */ bool setSlaveSession(); bool createIceSession(pj_ice_sess_role role); void getUFragPwd(); void getDefaultCanditates(); // Non-mutex protected of public versions bool _isInitialized() const; bool _isStarted() const; bool _isRunning() const; bool _isFailed() const; IpAddr getLocalAddress(unsigned comp_id) const; IpAddr getRemoteAddress(unsigned comp_id) const; std::unique_ptr> pool_; IceTransportCompleteCb on_initdone_cb_; IceTransportCompleteCb on_negodone_cb_; IceRecvInfo on_recv_cb_; std::unique_ptr icest_; unsigned component_count_; pj_ice_sess_cand cand_[MAX_CANDIDATES] {}; std::string local_ufrag_; std::string local_pwd_; pj_sockaddr remoteAddr_; std::condition_variable iceCV_ {}; mutable std::mutex iceMutex_ {}; pj_ice_strans_cfg config_; std::string last_errmsg_; struct Packet { Packet(void *pkt, pj_size_t size) : data{reinterpret_cast(pkt), reinterpret_cast(pkt) + size} { } std::vector data; }; std::vector peerChannels_; std::mutex apiMutex_; struct ComponentIO { std::mutex mutex; std::condition_variable cv; std::deque queue; IceRecvCb cb; }; std::vector compIO_; std::atomic_bool initiatorSession_ {true}; /** * Returns the IP of each candidate for a given component in the ICE session */ struct LocalCandidate { IpAddr addr; pj_ice_cand_transport transport; }; std::vector getLocalICECandidates(unsigned comp_id) const; /** * Adds a reflective candidate to ICE session * Must be called before negotiation */ void addReflectiveCandidate(int comp_id, const IpAddr &base, const IpAddr &addr, const pj_ice_cand_transport& transport); /** * Creates UPnP port mappings and adds ICE candidates based on those mappings */ void selectUPnPIceCandidates(); std::unique_ptr upnp_; bool onlyIPv4Private_ {true}; // IO/Timer events are handled by following thread std::thread thread_; std::atomic_bool threadTerminateFlags_ {false}; void handleEvents(unsigned max_msec); // Wait data on components std::vector lastReadLen_; std::condition_variable waitDataCv_ = {}; }; //============================================================================== /** * Add stun/turn servers or default host as candidates */ static void add_stun_server(pj_ice_strans_cfg& cfg, int af) { if (cfg.stun_tp_cnt >= PJ_ICE_MAX_STUN) throw std::runtime_error("Too many STUN servers"); auto& stun = cfg.stun_tp[cfg.stun_tp_cnt++]; pj_ice_strans_stun_cfg_default(&stun); stun.cfg.max_pkt_size = STUN_MAX_PACKET_SIZE; stun.af = af; stun.conn_type = cfg.stun.conn_type; JAMI_DBG("[ice] added host stun server"); } static void add_stun_server(pj_pool_t& pool, pj_ice_strans_cfg& cfg, const StunServerInfo& info) { if (cfg.stun_tp_cnt >= PJ_ICE_MAX_STUN) throw std::runtime_error("Too many STUN servers"); IpAddr ip {info.uri}; // Given URI cannot be DNS resolved or not IPv4 or IPv6? // This prevents a crash into PJSIP when ip.toString() is called. if (ip.getFamily() == AF_UNSPEC) { JAMI_WARN("[ice] STUN server '%s' not used, unresolvable address", info.uri.c_str()); return; } auto& stun = cfg.stun_tp[cfg.stun_tp_cnt++]; pj_ice_strans_stun_cfg_default(&stun); pj_strdup2_with_null(&pool, &stun.server, ip.toString().c_str()); stun.af = ip.getFamily(); stun.port = PJ_STUN_PORT; stun.cfg.max_pkt_size = STUN_MAX_PACKET_SIZE; stun.conn_type = cfg.stun.conn_type; JAMI_DBG("[ice] added stun server '%s', port %d", pj_strbuf(&stun.server), stun.port); } static void add_turn_server(pj_pool_t& pool, pj_ice_strans_cfg& cfg, const TurnServerInfo& info) { if (cfg.turn_tp_cnt >= PJ_ICE_MAX_TURN) throw std::runtime_error("Too many TURN servers"); IpAddr ip {info.uri}; // Same comment as add_stun_server() if (ip.getFamily() == AF_UNSPEC) { JAMI_WARN("[ice] TURN server '%s' not used, unresolvable address", info.uri.c_str()); return; } auto& turn = cfg.turn_tp[cfg.turn_tp_cnt++]; pj_ice_strans_turn_cfg_default(&turn); pj_strdup2_with_null(&pool, &turn.server, ip.toString().c_str()); turn.af = ip.getFamily(); turn.port = PJ_STUN_PORT; turn.cfg.max_pkt_size = STUN_MAX_PACKET_SIZE; turn.conn_type = cfg.turn.conn_type; // Authorization (only static plain password supported yet) if (not info.password.empty()) { turn.auth_cred.type = PJ_STUN_AUTH_CRED_STATIC; turn.auth_cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN; pj_strset(&turn.auth_cred.data.static_cred.realm, (char*)info.realm.c_str(), info.realm.size()); pj_strset(&turn.auth_cred.data.static_cred.username, (char*)info.username.c_str(), info.username.size()); pj_strset(&turn.auth_cred.data.static_cred.data, (char*)info.password.c_str(), info.password.size()); } JAMI_DBG("[ice] added turn server '%s', port %d", pj_strbuf(&turn.server), turn.port); } //============================================================================== IceTransport::Impl::Impl(const char* name, int component_count, bool master, const IceTransportOptions& options) : pool_(nullptr, [](pj_pool_t* pool) { sip_utils::register_thread(); pj_pool_release(pool); }) , on_initdone_cb_(options.onInitDone) , on_negodone_cb_(options.onNegoDone) , on_recv_cb_(options.onRecvReady) , component_count_(component_count) , compIO_(component_count) , initiatorSession_(master) , thread_() { if (options.upnpEnable) upnp_.reset(new upnp::Controller()); auto &iceTransportFactory = Manager::instance().getIceTransportFactory(); config_ = iceTransportFactory.getIceCfg(); // config copy if (options.tcpEnable) { config_.protocol = PJ_ICE_TP_TCP; config_.stun.conn_type = PJ_STUN_TP_TCP; config_.turn.conn_type = PJ_TURN_TP_TCP; } else { config_.protocol = PJ_ICE_TP_UDP; config_.stun.conn_type = PJ_STUN_TP_UDP; config_.turn.conn_type = PJ_TURN_TP_UDP; } if (options.aggressive) { config_.opt.aggressive = PJ_TRUE; } else { config_.opt.aggressive = PJ_FALSE; } peerChannels_.resize(component_count_ + 1); lastReadLen_.resize(component_count_); // Add local hosts (IPv4, IPv6) as stun candidates add_stun_server(config_, pj_AF_INET6()); add_stun_server(config_, pj_AF_INET()); sip_utils::register_thread(); pool_.reset(pj_pool_create(iceTransportFactory.getPoolFactory(), "IceTransport.pool", 512, 512, NULL)); if (not pool_) throw std::runtime_error("pj_pool_create() failed"); pj_ice_strans_cb icecb; pj_bzero(&icecb, sizeof(icecb)); icecb.on_rx_data = \ [] (pj_ice_strans* ice_st, unsigned comp_id, void *pkt, pj_size_t size, const pj_sockaddr_t* /*src_addr*/, unsigned /*src_addr_len*/) { if (auto* tr = static_cast(pj_ice_strans_get_user_data(ice_st))) tr->onReceiveData(comp_id, pkt, size); else JAMI_WARN("null IceTransport"); }; icecb.on_ice_complete = \ [] (pj_ice_strans* ice_st, pj_ice_strans_op op, pj_status_t status) { if (auto* tr = static_cast(pj_ice_strans_get_user_data(ice_st))) tr->onComplete(ice_st, op, status); else JAMI_WARN("null IceTransport"); }; icecb.on_data_sent = [](pj_ice_strans* ice_st, unsigned comp_id, pj_ssize_t size) { if (auto* tr = static_cast(pj_ice_strans_get_user_data(ice_st))) { if (comp_id > 0 && comp_id - 1 < tr->lastReadLen_.size()) { tr->lastReadLen_[comp_id - 1] = size; tr->waitDataCv_.notify_all(); } } else JAMI_WARN("null IceTransport"); }; // Add STUN servers for (auto& server : options.stunServers) add_stun_server(*pool_, config_, server); // Add TURN servers for (auto& server : options.turnServers) add_turn_server(*pool_, config_, server); static constexpr auto IOQUEUE_MAX_HANDLES = std::min(PJ_IOQUEUE_MAX_HANDLES, 64); TRY( pj_timer_heap_create(pool_.get(), 100, &config_.stun_cfg.timer_heap) ); TRY( pj_ioqueue_create(pool_.get(), IOQUEUE_MAX_HANDLES, &config_.stun_cfg.ioqueue) ); pj_ice_strans* icest = nullptr; pj_status_t status = pj_ice_strans_create(name, &config_, component_count, this, &icecb, &icest); if (status != PJ_SUCCESS || icest == nullptr) { throw std::runtime_error("pj_ice_strans_create() failed"); } // Must be created after any potential failure thread_ = std::thread([this]{ sip_utils::register_thread(); while (not threadTerminateFlags_) { handleEvents(500); // limit polling to 500ms } }); } IceTransport::Impl::~Impl() { sip_utils::register_thread(); threadTerminateFlags_ = true; if (thread_.joinable()) thread_.join(); icest_.reset(); // must be done before ioqueue/timer destruction if (config_.stun_cfg.ioqueue) pj_ioqueue_destroy(config_.stun_cfg.ioqueue); if (config_.stun_cfg.timer_heap) pj_timer_heap_destroy(config_.stun_cfg.timer_heap); } bool IceTransport::Impl::_isInitialized() const { if (auto icest = icest_.get()) { auto state = pj_ice_strans_get_state(icest); return state >= PJ_ICE_STRANS_STATE_SESS_READY and state != PJ_ICE_STRANS_STATE_FAILED; } return false; } bool IceTransport::Impl::_isStarted() const { if (auto icest = icest_.get()) { auto state = pj_ice_strans_get_state(icest); return state >= PJ_ICE_STRANS_STATE_NEGO and state != PJ_ICE_STRANS_STATE_FAILED; } return false; } bool IceTransport::Impl::_isRunning() const { if (auto icest = icest_.get()) { auto state = pj_ice_strans_get_state(icest); return state >= PJ_ICE_STRANS_STATE_RUNNING and state != PJ_ICE_STRANS_STATE_FAILED; } return false; } bool IceTransport::Impl::_isFailed() const { if (auto icest = icest_.get()) return pj_ice_strans_get_state(icest) == PJ_ICE_STRANS_STATE_FAILED; return false; } void IceTransport::Impl::handleEvents(unsigned max_msec) { // By tests, never seen more than two events per 500ms static constexpr auto MAX_NET_EVENTS = 2; pj_time_val max_timeout = {0, 0}; pj_time_val timeout = {0, 0}; unsigned net_event_count = 0; max_timeout.msec = max_msec; timeout.sec = timeout.msec = 0; pj_timer_heap_poll(config_.stun_cfg.timer_heap, &timeout); // timeout limitation if (timeout.msec >= 1000) timeout.msec = 999; if (PJ_TIME_VAL_GT(timeout, max_timeout)) timeout = max_timeout; do { auto n_events = pj_ioqueue_poll(config_.stun_cfg.ioqueue, &timeout); // timeout if (not n_events) return; // error if (n_events < 0) { const auto err = pj_get_os_error(); // Kept as debug as some errors are "normal" in regular context last_errmsg_ = sip_utils::sip_strerror(err); JAMI_DBG("[ice:%p] ioqueue error %d: %s", this, err, last_errmsg_.c_str()); std::this_thread::sleep_for(std::chrono::milliseconds(PJ_TIME_VAL_MSEC(timeout))); return; } net_event_count += n_events; timeout.sec = timeout.msec = 0; } while (net_event_count < MAX_NET_EVENTS); } void IceTransport::Impl::onComplete(pj_ice_strans* ice_st, pj_ice_strans_op op, pj_status_t status) { const char *opname = op == PJ_ICE_STRANS_OP_INIT ? "initialization" : op == PJ_ICE_STRANS_OP_NEGOTIATION ? "negotiation" : "unknown_op"; const bool done = status == PJ_SUCCESS; if (done) { JAMI_DBG("[ice:%p] %s success", this, opname); } else { last_errmsg_ = sip_utils::sip_strerror(status); JAMI_ERR("[ice:%p] %s failed: %s", this, opname, last_errmsg_.c_str()); } { std::lock_guard lk(iceMutex_); if (!icest_.get()) icest_.reset(ice_st); } if (done and op == PJ_ICE_STRANS_OP_INIT) { if (initiatorSession_) setInitiatorSession(); else setSlaveSession(); selectUPnPIceCandidates(); } if (op == PJ_ICE_STRANS_OP_INIT and on_initdone_cb_) on_initdone_cb_(done); else if (op == PJ_ICE_STRANS_OP_NEGOTIATION) { if (done) { // Dump of connection pairs std::stringstream out; for (unsigned i=0; i < component_count_; ++i) { auto laddr = getLocalAddress(i); auto raddr = getRemoteAddress(i); if (laddr and raddr) { out << " [" << i << "] " << laddr.toString(true, true) << " <-> " << raddr.toString(true, true) << '\n'; } else { out << " [" << i << "] disabled\n"; } } JAMI_DBG("[ice:%p] connection pairs (local <-> remote):\n%s", this, out.str().c_str()); } if (on_negodone_cb_) on_negodone_cb_(done); } // Unlock waitForXXX APIs iceCV_.notify_all(); } bool IceTransport::Impl::setInitiatorSession() { JAMI_DBG("ICE as master"); initiatorSession_ = true; if (_isInitialized()) { auto status = pj_ice_strans_change_role(icest_.get(), PJ_ICE_SESS_ROLE_CONTROLLING); if (status != PJ_SUCCESS) { last_errmsg_ = sip_utils::sip_strerror(status); JAMI_ERR("[ice:%p] role change failed: %s", this, last_errmsg_.c_str()); return false; } return true; } return createIceSession(PJ_ICE_SESS_ROLE_CONTROLLING); } bool IceTransport::Impl::setSlaveSession() { JAMI_DBG("ICE as slave"); initiatorSession_ = false; if (_isInitialized()) { auto status = pj_ice_strans_change_role(icest_.get(), PJ_ICE_SESS_ROLE_CONTROLLED); if (status != PJ_SUCCESS) { last_errmsg_ = sip_utils::sip_strerror(status); JAMI_ERR("[ice:%p] role change failed: %s", this, last_errmsg_.c_str()); return false; } return true; } return createIceSession(PJ_ICE_SESS_ROLE_CONTROLLED); } IpAddr IceTransport::Impl::getLocalAddress(unsigned comp_id) const { // Return the local IP of negotiated connection pair if (_isRunning()) { if (auto sess = pj_ice_strans_get_valid_pair(icest_.get(), comp_id+1)) return sess->lcand->addr; else return {}; // disabled component } else JAMI_WARN("[ice:%p] bad call: non-negotiated transport", this); // Return the default IP (could be not nominated and valid after negotiation) if (_isInitialized()) return cand_[comp_id].addr; JAMI_ERR("[ice:%p] bad call: non-initialized transport", this); return {}; } IpAddr IceTransport::Impl::getRemoteAddress(unsigned comp_id) const { // Return the remote IP of negotiated connection pair if (_isRunning()) { if (auto sess = pj_ice_strans_get_valid_pair(icest_.get(), comp_id+1)) return sess->rcand->addr; else return {}; // disabled component } else JAMI_WARN("[ice:%p] bad call: non-negotiated transport", this); JAMI_ERR("[ice:%p] bad call: non-negotiated transport", this); return {}; } void IceTransport::Impl::getUFragPwd() { pj_str_t local_ufrag, local_pwd; pj_ice_strans_get_ufrag_pwd(icest_.get(), &local_ufrag, &local_pwd, nullptr, nullptr); local_ufrag_.assign(local_ufrag.ptr, local_ufrag.slen); local_pwd_.assign(local_pwd.ptr, local_pwd.slen); } void IceTransport::Impl::getDefaultCanditates() { for (unsigned i=0; i < component_count_; ++i) pj_ice_strans_get_def_cand(icest_.get(), i+1, &cand_[i]); } bool IceTransport::Impl::createIceSession(pj_ice_sess_role role) { if (pj_ice_strans_init_ice(icest_.get(), role, nullptr, nullptr) != PJ_SUCCESS) { JAMI_ERR("[ice:%p] pj_ice_strans_init_ice() failed", this); return false; } // Fetch some information on local configuration getUFragPwd(); getDefaultCanditates(); JAMI_DBG("[ice:%p] (local) ufrag=%s, pwd=%s", this, local_ufrag_.c_str(), local_pwd_.c_str()); return true; } std::vector IceTransport::Impl::getLocalICECandidates(unsigned comp_id) const { std::vector cand_addrs; pj_ice_sess_cand cand[PJ_ARRAY_SIZE(cand_)]; unsigned cand_cnt = PJ_ARRAY_SIZE(cand); if (pj_ice_strans_enum_cands(icest_.get(), comp_id, &cand_cnt, cand) != PJ_SUCCESS) { JAMI_ERR("[ice:%p] pj_ice_strans_enum_cands() failed", this); return cand_addrs; } for (unsigned i=0; i