Skip to content
Snippets Groups Projects
Select Git revision
  • master default protected
  • release/202005
  • release/202001
  • release/201912
  • release/201911
  • release/releaseWindowsTestOne
  • release/windowsReleaseTest
  • release/releaseTest
  • release/releaseWindowsTest
  • release/201910
  • release/qt/201910
  • release/windows-test/201910
  • release/201908
  • release/201906
  • release/201905
  • release/201904
  • release/201903
  • release/201902
  • release/201901
  • release/201812
  • 4.0.0
  • 2.2.0
  • 2.1.0
  • 2.0.1
  • 2.0.0
  • 1.4.1
  • 1.4.0
  • 1.3.0
  • 1.2.0
  • 1.1.0
30 results

ice_transport.cpp

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    ice_transport.cpp 26.14 KiB
    /*
     *  Copyright (C) 2004-2015 Savoir-Faire Linux Inc.
     *  Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com>
     *
     *  This program is free software; you can redistribute it and/or modify
     *  it under the terms of the GNU General Public License as published by
     *  the Free Software Foundation; either version 3 of the License, or
     *  (at your option) any later version.
     *  This program is distributed in the hope that it will be useful,
     *  but WITHOUT ANY WARRANTY; without even the implied warranty of
     *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     *  GNU General Public License for more details.
     *
     *  You should have received a copy of the GNU General Public License
     *  along with this program; if not, write to the Free Software
     *  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA.
     *
     *  Additional permission under GNU GPL version 3 section 7:
     *
     *  If you modify this program, or any covered work, by linking or
     *  combining it with the OpenSSL project's OpenSSL library (or a
     *  modified version of that library), containing parts covered by the
     *  terms of the OpenSSL or SSLeay licenses, Savoir-Faire Linux Inc.
     *  grants you additional permission to convey the resulting work.
     *  Corresponding Source for a non-source form of such a combination
     *  shall include the source code for the parts of OpenSSL used as well
     *  as that of the covered work.
     */
    
    #include "ice_transport.h"
    #include "ice_socket.h"
    #include "logger.h"
    #include "sip/sip_utils.h"
    #include "manager.h"
    #include "upnp/upnp_control.h"
    
    #include <pjlib.h>
    
    #include <utility>
    #include <algorithm>
    #include <sstream>
    #include <chrono>
    #include <thread>
    
    #define TRY(ret) do {      \
            if ((ret) != PJ_SUCCESS)                             \
                throw std::runtime_error(#ret " failed");      \
        } while (0)
    
    namespace ring {
    
    static void
    register_thread()
    {
        // We have to register the external thread so it could access the pjsip frameworks
        if (!pj_thread_is_registered()) {
    #if __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)
            static thread_local pj_thread_desc desc;
            static thread_local pj_thread_t *this_thread;
    #else
            static __thread pj_thread_desc desc;
            static __thread pj_thread_t *this_thread;
    #endif
            pj_thread_register(NULL, desc, &this_thread);
            RING_DBG("Registered thread %p (0x%X)", this_thread, pj_getpid());
        }
    }
    
    IceTransport::Packet::Packet(void *pkt, pj_size_t size)
        : data(new char[size]), datalen(size)
    {
        std::copy_n(reinterpret_cast<char*>(pkt), size, data.get());
    }
    
    void
    IceTransport::cb_on_rx_data(pj_ice_strans* ice_st,
                                unsigned comp_id,
                                void *pkt, pj_size_t size,
                                const pj_sockaddr_t* /*src_addr*/,
                                unsigned /*src_addr_len*/)
    {
        if (auto tr = static_cast<IceTransport*>(pj_ice_strans_get_user_data(ice_st)))
            tr->onReceiveData(comp_id, pkt, size);
        else
            RING_WARN("null IceTransport");
    }
    
    void
    IceTransport::cb_on_ice_complete(pj_ice_strans* ice_st,
                                     pj_ice_strans_op op,
                                     pj_status_t status)
    {
        if (auto tr = static_cast<IceTransport*>(pj_ice_strans_get_user_data(ice_st)))
            tr->onComplete(ice_st, op, status);
        else
            RING_WARN("null IceTransport");
    }
    
    IceTransport::IceTransport(const char* name, int component_count,
                               bool master, bool upnp_enabled,
                               IceTransportCompleteCb on_initdone_cb,
                               IceTransportCompleteCb on_negodone_cb)
        : pool_(nullptr, pj_pool_release)
        , on_initdone_cb_(on_initdone_cb)
        , on_negodone_cb_(on_negodone_cb)
        , component_count_(component_count)
        , compIO_(component_count)
        , initiator_session_(master)
    {
        if (upnp_enabled)
            upnp_.reset(new upnp::Controller());
    
        auto& iceTransportFactory = Manager::instance().getIceTransportFactory();
    
        pool_.reset(pj_pool_create(iceTransportFactory.getPoolFactory(),
                                   "IceTransport.pool", 512, 512, NULL));
        if (not pool_)
            throw std::runtime_error("pj_pool_create() failed");
    
        pj_ice_strans_cb icecb;
        pj_bzero(&icecb, sizeof(icecb));
        icecb.on_rx_data = cb_on_rx_data;
        icecb.on_ice_complete = cb_on_ice_complete;
    
        pj_ice_strans* icest = nullptr;
        pj_status_t status = pj_ice_strans_create(name,
                                                  iceTransportFactory.getIceCfg(),
                                                  component_count, this, &icecb,
                                                  &icest);
        if (status != PJ_SUCCESS || icest == nullptr)
            throw std::runtime_error("pj_ice_strans_create() failed");
    }
    
    IceTransport::~IceTransport()
    {
        icest_.reset(); // must be done first to invalid callbacks
    }
    
    void
    IceTransport::onComplete(pj_ice_strans* ice_st, pj_ice_strans_op op,
                             pj_status_t status)
    {
        const char *opname =
            op == PJ_ICE_STRANS_OP_INIT? "initialization" :
            op == PJ_ICE_STRANS_OP_NEGOTIATION ? "negotiation" : "unknown_op";
    
        if (!icest_.get())
            icest_.reset(ice_st);
    
        const bool done = status == PJ_SUCCESS;
        RING_DBG("ICE %s with %s", opname, done?"success":"error");
    
        if (!done) {
            char errmsg[PJ_ERR_MSG_SIZE];
            pj_strerror(status, errmsg, sizeof(errmsg));
            RING_ERR("ICE %s failed: %s", opname, errmsg);
        }
    
        {
            std::lock_guard<std::mutex> lk(iceMutex_);
            if (op == PJ_ICE_STRANS_OP_INIT) {
                iceTransportInitDone_ = done;
                if (iceTransportInitDone_) {
                    if (initiator_session_)
                        setInitiatorSession();
                    else
                        setSlaveSession();
                    selectUPnPIceCandidates();
                }
            } else if (op == PJ_ICE_STRANS_OP_NEGOTIATION) {
                iceTransportNegoDone_ = done;
            }
        }
    
        if (op == PJ_ICE_STRANS_OP_INIT and on_initdone_cb_)
            on_initdone_cb_(*this, done);
        else if (op == PJ_ICE_STRANS_OP_NEGOTIATION and on_negodone_cb_)
            on_negodone_cb_(*this, done);
    
        iceCV_.notify_all();
    }
    
    void
    IceTransport::getUFragPwd()
    {
        pj_str_t local_ufrag, local_pwd;
        pj_ice_strans_get_ufrag_pwd(icest_.get(), &local_ufrag, &local_pwd, NULL, NULL);
        local_ufrag_.assign(local_ufrag.ptr, local_ufrag.slen);
        local_pwd_.assign(local_pwd.ptr, local_pwd.slen);
    }
    
    void
    IceTransport::getDefaultCanditates()
    {
        for (unsigned i=0; i < component_count_; ++i)
            pj_ice_strans_get_def_cand(icest_.get(), i+1, &cand_[i]);
    }
    
    bool
    IceTransport::createIceSession(pj_ice_sess_role role)
    {
        if (pj_ice_strans_init_ice(icest_.get(), role, NULL, NULL) != PJ_SUCCESS) {
            RING_ERR("pj_ice_strans_init_ice() failed");
            return false;
        }
    
        // Fetch some information on local configuration
        getUFragPwd();
        getDefaultCanditates();
        RING_DBG("ICE [local] ufrag=%s, pwd=%s", local_ufrag_.c_str(), local_pwd_.c_str());
        return true;
    }
    
    bool
    IceTransport::setInitiatorSession()
    {
        RING_DBG("ICE as master");
        initiator_session_ = true;
        if (isInitialized()) {
            auto status = pj_ice_strans_change_role(icest_.get(), PJ_ICE_SESS_ROLE_CONTROLLING);
            if (status != PJ_SUCCESS) {
                RING_ERR("ICE role change failed");
                sip_utils::sip_strerror(status);
                return false;
            }
            return true;
        }
        return createIceSession(PJ_ICE_SESS_ROLE_CONTROLLING);
    }
    
    bool
    IceTransport::setSlaveSession()
    {
        RING_DBG("ICE as slave");
        initiator_session_ = false;
        if (isInitialized()) {
            auto status = pj_ice_strans_change_role(icest_.get(), PJ_ICE_SESS_ROLE_CONTROLLED);
            if (status != PJ_SUCCESS) {
                RING_ERR("ICE role change failed");
                sip_utils::sip_strerror(status);
                return false;
            }
            return true;
        }
        return createIceSession(PJ_ICE_SESS_ROLE_CONTROLLED);
    }
    
    bool
    IceTransport::isInitiator() const
    {
        if (isInitialized())
            return pj_ice_strans_get_role(icest_.get()) == PJ_ICE_SESS_ROLE_CONTROLLING;
        return initiator_session_;
    }
    
    bool
    IceTransport::start(const Attribute& rem_attrs,
                        const std::vector<IceCandidate>& rem_candidates)
    {
        if (not isInitialized()) {
            RING_ERR("ICE: not initialized transport");
            return false;
        }
    
        // pj_ice_strans_start_ice crashes if remote candidates array is empty
        if (rem_candidates.empty()) {
            RING_ERR("ICE start failed: no remote candidates");
            return false;
        }
    
        pj_str_t ufrag, pwd;
        RING_DBG("ICE negotiation starting (%u remote candidates)", rem_candidates.size());
        auto status = pj_ice_strans_start_ice(icest_.get(),
                                              pj_cstr(&ufrag, rem_attrs.ufrag.c_str()),
                                              pj_cstr(&pwd, rem_attrs.pwd.c_str()),
                                              rem_candidates.size(),
                                              rem_candidates.data());
        if (status != PJ_SUCCESS) {
            RING_ERR("ICE start failed");
            sip_utils::sip_strerror(status);
            return false;
        }
        return true;
    }
    
    std::string
    IceTransport::unpackLine(std::vector<uint8_t>::const_iterator& begin,
                             std::vector<uint8_t>::const_iterator& end)
    {
        if (std::distance(begin, end) <= 0)
            return {};
    
        // Search for EOL
        std::vector<uint8_t>::const_iterator line_end(begin);
        while (line_end != end && *line_end != NEW_LINE && *line_end)
            ++line_end;
    
        if (std::distance(begin, line_end) <= 0)
            return {};
    
        std::string str(begin, line_end);
    
        // Consume the new line character
        if (std::distance(line_end, end) > 0)
            ++line_end;
    
        begin = line_end;
        return str;
    }
    
    bool
    IceTransport::start(const std::vector<uint8_t>& rem_data)
    {
        auto begin = rem_data.cbegin();
        auto end = rem_data.cend();
        auto rem_ufrag = unpackLine(begin, end);
        auto rem_pwd = unpackLine(begin, end);
        if (rem_pwd.empty() or rem_pwd.empty()) {
            RING_ERR("ICE remote attributes parsing error");
            return false;
        }
        std::vector<IceCandidate> rem_candidates;
        try {
            while (true) {
                IceCandidate candidate;
                const auto line = unpackLine(begin, end);
                if (line.empty())
                    break;
                if (getCandidateFromSDP(line, candidate))
                    rem_candidates.push_back(candidate);
            }
        } catch (std::exception& e) {
            RING_ERR("ICE remote candidates parsing error");
            return false;
        }
        return start({rem_ufrag, rem_pwd}, rem_candidates);
    }
    
    bool
    IceTransport::stop()
    {
        if (not isInitialized()) {
            RING_ERR("Session not created yet");
            return false;
        }
    
        auto status = pj_ice_strans_stop_ice(icest_.get());
        if (status != PJ_SUCCESS) {
            RING_ERR("ICE start failed");
            sip_utils::sip_strerror(status);
            return false;
        }
    
        return true;
    }
    
    bool
    IceTransport::isInitialized() const
    {
        if (auto icest = icest_.get())
            return pj_ice_strans_has_sess(icest);
        return false;
    }
    
    bool
    IceTransport::isStarted() const
    {
        return pj_ice_strans_sess_is_running(icest_.get());
    }
    
    bool
    IceTransport::isCompleted() const
    {
        return pj_ice_strans_sess_is_complete(icest_.get());
    }
    
    bool
    IceTransport::isRunning() const
    {
        return isInitialized() and pj_ice_strans_get_state(icest_.get()) == PJ_ICE_STRANS_STATE_RUNNING;
    }
    
    bool
    IceTransport::isFailed() const
    {
        return isInitialized() and pj_ice_strans_get_state(icest_.get()) == PJ_ICE_STRANS_STATE_FAILED;
    }
    
    IpAddr
    IceTransport::getLocalAddress(unsigned comp_id) const
    {
        if (isInitialized())
            return cand_[comp_id].addr;
        return {};
    }
    
    IpAddr
    IceTransport::getRemoteAddress(unsigned comp_id) const
    {
        if (isInitialized()) {
            if (auto sess = pj_ice_strans_get_valid_pair(icest_.get(), comp_id+1))
                return sess->rcand->addr;
        }
        return {};
    }
    
    const IceTransport::Attribute
    IceTransport::getLocalAttributes() const
    {
        return {local_ufrag_, local_pwd_};
    }
    
    std::vector<std::string>
    IceTransport::getLocalCandidates(unsigned comp_id) const
    {
        std::vector<std::string> res;
        pj_ice_sess_cand cand[PJ_ARRAY_SIZE(cand_)];
        unsigned cand_cnt = PJ_ARRAY_SIZE(cand);
    
        if (pj_ice_strans_enum_cands(icest_.get(), comp_id+1, &cand_cnt, cand) != PJ_SUCCESS) {
            RING_ERR("pj_ice_strans_enum_cands() failed");
            return res;
        }
    
        for (unsigned i=0; i<cand_cnt; ++i) {
            std::ostringstream val;
            char ipaddr[PJ_INET6_ADDRSTRLEN];
    
            val << std::string(cand[i].foundation.ptr, cand[i].foundation.slen);
            val << " " << (unsigned)cand[i].comp_id << " UDP " << cand[i].prio;
            val << " " << pj_sockaddr_print(&cand[i].addr, ipaddr, sizeof(ipaddr), 0);
            val << " " << (unsigned)pj_sockaddr_get_port(&cand[i].addr);
            val << " typ " << pj_ice_get_cand_type_name(cand[i].type);
    
            res.push_back(val.str());
        }
    
        return res;
    }
    
    std::vector<IpAddr>
    IceTransport::getLocalCandidatesAddr(unsigned comp_id) const
    {
        std::vector<IpAddr> cand_addrs;
        pj_ice_sess_cand cand[PJ_ARRAY_SIZE(cand_)];
        unsigned cand_cnt = PJ_ARRAY_SIZE(cand);
    
        if (pj_ice_strans_enum_cands(icest_.get(), comp_id, &cand_cnt, cand) != PJ_SUCCESS) {
            RING_ERR("pj_ice_strans_enum_cands() failed");
            return cand_addrs;
        }
    
        for (unsigned i=0; i<cand_cnt; ++i)
            cand_addrs.push_back(cand[i].addr);
    
        return cand_addrs;
    }
    
    void
    IceTransport::addCandidate(int comp_id, const IpAddr& localAddr,
                               const IpAddr& publicAddr)
    {
        pj_ice_sess_cand cand;
    
        cand.type = PJ_ICE_CAND_TYPE_SRFLX;
        cand.status = PJ_EPENDING; /* not used */
        cand.comp_id = comp_id;
        cand.transport_id = 1; /* 1 = STUN */
        cand.local_pref = 65535; /* host */
        /* cand.foundation = ? */
        /* cand.prio = calculated by ice session */
        /* make base and addr the same since we're not going through a server */
        pj_sockaddr_cp(&cand.base_addr, localAddr.pjPtr());
        pj_sockaddr_cp(&cand.addr, publicAddr.pjPtr());
        pj_sockaddr_cp(&cand.rel_addr, &cand.base_addr);
        pj_ice_calc_foundation(pool_.get(), &cand.foundation, cand.type, &cand.base_addr);
    
        auto ret = pj_ice_sess_add_cand(pj_ice_strans_get_ice_sess(icest_.get()),
            cand.comp_id,
            cand.transport_id,
            cand.type,
            cand.local_pref,
            &cand.foundation,
            &cand.addr,
            &cand.base_addr,
            &cand.rel_addr,
            pj_sockaddr_get_len(&cand.addr),
            NULL);
    
        if (ret != PJ_SUCCESS) {
            RING_ERR("fail to add candidate for comp_id=%d : %s : %s", comp_id
                     , localAddr.toString().c_str()
                     , publicAddr.toString().c_str());
            sip_utils::sip_strerror(ret);
        } else {
            RING_DBG("success to add candidate for comp_id=%d : %s : %s", comp_id
                    , localAddr.toString().c_str()
                    , publicAddr.toString().c_str());
        }
    }
    
    void
    IceTransport::selectUPnPIceCandidates()
    {
        /* use upnp to open ports and add the proper candidates */
        if (upnp_) {
            /* for every component, get the candidate(s)
             * create a port mapping either with that port, or with an available port
             * add candidate with that port and public IP
             */
            if (auto publicIP = upnp_->getExternalIP()) {
                /* comp_id start at 1 */
                for (unsigned comp_id = 1; comp_id <= component_count_; ++comp_id) {
                    RING_DBG("UPnP: Opening port(s) for ICE comp %d and adding candidate with public IP",
                             comp_id);
                    auto candidates = getLocalCandidatesAddr(comp_id);
                    for (IpAddr addr : candidates) {
                        auto localIP = upnp_->getLocalIP();
                        localIP.setPort(addr.getPort());
                        if (addr != localIP)
                            continue;
                        uint16_t port = addr.getPort();
                        uint16_t port_used;
                        if (upnp_->addAnyMapping(port, upnp::PortType::UDP, true, &port_used)) {
                            publicIP.setPort(port_used);
                            addCandidate(comp_id, addr, publicIP);
                        } else
                            RING_WARN("UPnP: Could not create a port mapping for the ICE candide");
                    }
                }
            } else {
                RING_WARN("UPnP: Could not determine public IP for ICE candidates");
            }
        }
    }
    
    std::vector<uint8_t>
    IceTransport::getLocalAttributesAndCandidates() const
    {
        std::stringstream ss;
        ss << local_ufrag_ << NEW_LINE;
        ss << local_pwd_ << NEW_LINE;
        for  (unsigned i=0; i<component_count_; i++) {
            const auto& candidates = getLocalCandidates(i);
            for (const auto& c : candidates)
                ss << c << NEW_LINE;
        }
        auto str(ss.str());
        std::vector<uint8_t> ret(str.begin(), str.end());
        return ret;
    }
    
    void
    IceTransport::onReceiveData(unsigned comp_id, void *pkt, pj_size_t size)
    {
        if (!comp_id or comp_id > component_count_) {
            RING_ERR("rx: invalid comp_id (%u)", comp_id);
            return;
        }
        if (!size)
            return;
        auto& io = compIO_[comp_id-1];
        std::unique_lock<std::mutex> lk(io.mutex);
        if (io.cb) {
            io.cb((uint8_t*)pkt, size);
        } else {
            io.queue.emplace_back(pkt, size);
            io.cv.notify_one();
        }
    }
    
    bool
    IceTransport::getCandidateFromSDP(const std::string& line, IceCandidate& cand)
    {
        char foundation[33], transport[13], ipaddr[81], type[33];
        pj_str_t tmpaddr;
        int af, comp_id, prio, port;
        int cnt = sscanf(line.c_str(), "%32s %d %12s %d %80s %d typ %32s",
                         foundation,
                         &comp_id,
                         transport,
                         &prio,
                         ipaddr,
                         &port,
                         type);
    
        if (cnt != 7) {
            RING_WARN("ICE: invalid remote candidate line");
            return false;
        }
    
        pj_bzero(&cand, sizeof(IceCandidate));
    
        if (strcmp(type, "host")==0)
            cand.type = PJ_ICE_CAND_TYPE_HOST;
        else if (strcmp(type, "srflx")==0)
            cand.type = PJ_ICE_CAND_TYPE_SRFLX;
        else if (strcmp(type, "relay")==0)
            cand.type = PJ_ICE_CAND_TYPE_RELAYED;
        else {
            RING_WARN("ICE: invalid remote candidate type '%s'", type);
            return false;
        }
    
        cand.comp_id = (pj_uint8_t)comp_id;
        cand.prio = prio;
    
        if (strchr(ipaddr, ':'))
            af = pj_AF_INET6();
        else
            af = pj_AF_INET();
    
        tmpaddr = pj_str(ipaddr);
        pj_sockaddr_init(af, &cand.addr, NULL, 0);
        auto status = pj_sockaddr_set_str_addr(af, &cand.addr, &tmpaddr);
        if (status != PJ_SUCCESS) {
            RING_ERR("ICE: invalid remote IP address '%s'", ipaddr);
            return false;
        }
    
        pj_sockaddr_set_port(&cand.addr, (pj_uint16_t)port);
        pj_strdup2(pool_.get(), &cand.foundation, foundation);
    
        return true;
    }
    
    ssize_t
    IceTransport::recv(int comp_id, unsigned char* buf, size_t len)
    {
        register_thread();
        auto& io = compIO_[comp_id];
        std::unique_lock<std::mutex> lk(io.mutex);
    
        if (io.queue.empty())
            return 0;
    
        auto& packet = io.queue.front();
        const auto count = std::min(len, packet.datalen);
        std::copy_n(packet.data.get(), count, buf);
        io.queue.pop_front();
    
        return count;
    }
    
    void
    IceTransport::setOnRecv(unsigned comp_id, IceRecvCb cb)
    {
        auto& io = compIO_[comp_id];
        std::unique_lock<std::mutex> lk(io.mutex);
        io.cb = cb;
    
        if (cb) {
            // Flush existing queue using the callback
            for (const auto& packet : io.queue)
                io.cb((uint8_t*)packet.data.get(), packet.datalen);
            io.queue.clear();
        }
    }
    
    ssize_t
    IceTransport::send(int comp_id, const unsigned char* buf, size_t len)
    {
        if (not isInitialized()) {
            RING_ERR("ICE: not initialized transport");
            return -1;
        }
    
        register_thread();
        auto remote = getRemoteAddress(comp_id);
        if (!remote) {
            RING_ERR("Can't find remote address for component %d", comp_id);
            return -1;
        }
        auto status = pj_ice_strans_sendto(icest_.get(), comp_id+1, buf, len, remote.pjPtr(), remote.getLength());
        if (status != PJ_SUCCESS) {
            sip_utils::sip_strerror(status);
            RING_ERR("send failed");
            return -1;
        }
        return len;
    }
    
    ssize_t
    IceTransport::getNextPacketSize(int comp_id)
    {
        auto& io = compIO_[comp_id];
        std::unique_lock<std::mutex> lk(io.mutex);
        if (io.queue.empty()) {
            return 0;
        }
        return io.queue.front().datalen;
    }
    
    int
    IceTransport::waitForInitialization(unsigned timeout)
    {
        std::unique_lock<std::mutex> lk(iceMutex_);
        if (!iceCV_.wait_for(lk, std::chrono::seconds(timeout),
                             [this]{ return iceTransportInitDone_; })) {
            RING_WARN("waitForInitialization: timeout");
            return -1;
        }
        RING_DBG("waitForInitialization: %u", iceTransportInitDone_);
        return iceTransportInitDone_;
    }
    
    int
    IceTransport::waitForNegotiation(unsigned timeout)
    {
        std::unique_lock<std::mutex> lk(iceMutex_);
        if (!iceCV_.wait_for(lk, std::chrono::seconds(timeout),
                             [this]{ return iceTransportNegoDone_; })) {
            RING_WARN("waitForIceNegotiation: timeout");
            return -1;
        }
        RING_DBG("waitForNegotiation: %u", iceTransportNegoDone_);
        return iceTransportNegoDone_;
    }
    
    ssize_t
    IceTransport::waitForData(int comp_id, unsigned int timeout)
    {
        auto& io = compIO_[comp_id];
        std::unique_lock<std::mutex> lk(io.mutex);
        if (!io.cv.wait_for(lk, std::chrono::milliseconds(timeout),
                            [&io]{ return !io.queue.empty(); })) {
            return 0;
        }
        return io.queue.front().datalen;
    }
    
    // TODO: C++14 ? remove me and use std::min
    template< class T >
    static constexpr const T& min( const T& a, const T& b ) {
        return (b < a) ? b : a;
    }
    
    IceTransportFactory::IceTransportFactory()
        : cp_()
        , pool_(nullptr, pj_pool_release)
        , thread_()
        , ice_cfg_()
    {
        pj_caching_pool_init(&cp_, NULL, 0);
        pool_.reset(pj_pool_create(&cp_.factory, "IceTransportFactory.pool",
                                   512, 512, NULL));
        if (not pool_)
            throw std::runtime_error("pj_pool_create() failed");
    
        pj_ice_strans_cfg_default(&ice_cfg_);
        ice_cfg_.stun_cfg.pf = &cp_.factory;
    
        static constexpr auto IOQUEUE_MAX_HANDLES = min(PJ_IOQUEUE_MAX_HANDLES, 64);
        TRY( pj_timer_heap_create(pool_.get(), 100, &ice_cfg_.stun_cfg.timer_heap) );
        TRY( pj_ioqueue_create(pool_.get(), IOQUEUE_MAX_HANDLES, &ice_cfg_.stun_cfg.ioqueue) );
    
        thread_ = std::thread(std::bind(&IceTransportFactory::processThread, this));
    
        ice_cfg_.af = pj_AF_INET();
    
        ice_cfg_.stun.cfg.max_pkt_size = 8192;
        ice_cfg_.turn.cfg.max_pkt_size = 8192;
        //ice_cfg_.stun.max_host_cands = icedemo.opt.max_host;
        ice_cfg_.opt.aggressive = PJ_FALSE;
    
        // TODO: STUN server candidate
    
        // TODO: TURN server candidate
    }
    
    IceTransportFactory::~IceTransportFactory()
    {
        thread_quit_flag_ = PJ_TRUE;
        if (thread_.joinable())
            thread_.join();
    
        if (ice_cfg_.stun_cfg.ioqueue)
            pj_ioqueue_destroy(ice_cfg_.stun_cfg.ioqueue);
    
        if (ice_cfg_.stun_cfg.timer_heap)
            pj_timer_heap_destroy(ice_cfg_.stun_cfg.timer_heap);
    
        pool_.reset();
        pj_caching_pool_destroy(&cp_);
    }
    
    static void
    handleIOEvents(pj_ice_strans_cfg& cfg, unsigned max_msec)
    {
        // By tests, never seen more than two events per 500ms
        static constexpr auto MAX_NET_EVENTS = 2;
    
        pj_time_val max_timeout = {0, 0};
        pj_time_val timeout = {0, 0};
        unsigned net_event_count = 0;
    
        max_timeout.msec = max_msec;
    
        timeout.sec = timeout.msec = 0;
        pj_timer_heap_poll(cfg.stun_cfg.timer_heap, &timeout);
    
        // timeout limitation
        if (timeout.msec >= 1000)
            timeout.msec = 999;
        if (PJ_TIME_VAL_GT(timeout, max_timeout))
            timeout = max_timeout;
    
        do {
            auto n_events = pj_ioqueue_poll(cfg.stun_cfg.ioqueue, &timeout);
    
            // timeout
            if (not n_events)
                return;
    
            // error
            if (n_events < 0) {
                const auto err = pj_get_os_error();
                char err_msg[128];
                pj_strerror(err, err_msg, sizeof(err_msg));
                err_msg[sizeof(err_msg)-1] = '\0';
                // Kept as debug as some errors are "normal" in regular context
                RING_DBG("IceIOQueue: error %d - %s", err, err_msg);
                std::this_thread::sleep_for(std::chrono::milliseconds(PJ_TIME_VAL_MSEC(timeout)));
                return;
            }
    
            net_event_count += n_events;
            timeout.sec = timeout.msec = 0;
        } while (net_event_count < MAX_NET_EVENTS);
    }
    
    int
    IceTransportFactory::processThread()
    {
        register_thread();
        while (!thread_quit_flag_) {
            handleIOEvents(ice_cfg_, 500); // limit polling to 500ms
        }
    
        return 0;
    }
    
    std::shared_ptr<IceTransport>
    IceTransportFactory::createTransport(const char* name,
                                         int component_count,
                                         bool master,
                                         bool upnp_enabled,
                                         IceTransportCompleteCb&& on_initdone_cb,
                                         IceTransportCompleteCb&& on_negodone_cb)
    {
        try {
            return std::make_shared<IceTransport>(name, component_count, master, upnp_enabled,
                                                  std::forward<IceTransportCompleteCb>(on_initdone_cb),
                                                  std::forward<IceTransportCompleteCb>(on_negodone_cb));
        } catch(const std::exception& e) {
            RING_ERR("%s",e.what());
            return nullptr;
        }
    }
    
    void
    IceSocket::close()
    {
        ice_transport_.reset();
    }
    
    ssize_t
    IceSocket::recv(unsigned char* buf, size_t len)
    {
        if (!ice_transport_.get())
            return -1;
        return ice_transport_->recv(compId_, buf, len);
    }
    
    ssize_t
    IceSocket::send(const unsigned char* buf, size_t len)
    {
        if (!ice_transport_.get())
            return -1;
        return ice_transport_->send(compId_, buf, len);
    }
    
    ssize_t
    IceSocket::getNextPacketSize() const
    {
        if (!ice_transport_.get())
            return -1;
        return ice_transport_->getNextPacketSize(compId_);
    }
    
    ssize_t
    IceSocket::waitForData(unsigned int timeout)
    {
        if (!ice_transport_.get())
            return -1;
    
        return ice_transport_->waitForData(compId_, timeout);
    }
    
    } // namespace ring