diff --git a/src/account.cpp b/src/account.cpp index 0916b1af5ed4fb1e305ab76edcc2b1e4a47ed9a5..b0491d91917028030a062aa7b13cda02507c8a00 100644 --- a/src/account.cpp +++ b/src/account.cpp @@ -124,6 +124,7 @@ Account::Account(const std::string& accountID) , localModeratorsEnabled_(true) , allModeratorsEnabled_(true) , multiStreamEnabled_(false) + , iceCompIdRfc5245Compliant_(false) { // Initialize the codec order, used when creating a new account loadDefaultCodecs(); diff --git a/src/account.h b/src/account.h index 607a798d9a308d930dc55269dc2b1cf485571972..dbffd88b1dc4f4117fd45609309926635a28a906 100644 --- a/src/account.h +++ b/src/account.h @@ -354,6 +354,22 @@ public: bool isMultiStreamEnabled() const { return multiStreamEnabled_; } void enableMultiStream(bool enable) { multiStreamEnabled_ = enable; } + // Enable/disable compliancy with RFC-5245 for component IDs format. + // The ICE component IDs are enumerated relative to the SDP session, + // i.e., starts from 1 and incremented for each component. + // However, RFC-5245 requires that the ICE component IDs are enumerated + // relative to the media stream, e.g., component IDs 1 and 2 for audio, + // and component IDs 1 and 2 for video. This non-conformity can cause + // inter-operability issues. + // When the compliancy feature is enabled, the component ID in the + // generated SDP will be compliant to RFC-5245. This feature should be + // enabled only when the peer is compliant to RFC-5245 as well. + // The current version is able to correctly parse both formats. + // This feature is needed for backward compatiblity, and should be removed + // once the backward compatibility is no more required. + bool isIceCompIdRfc5245Compliant() const { return iceCompIdRfc5245Compliant_; } + void enableIceCompIdRfc5245Compliance(bool enable) { iceCompIdRfc5245Compliant_ = enable; } + public: // virtual methods that has to be implemented by concrete classes /** @@ -551,6 +567,7 @@ protected: bool allModeratorsEnabled_; bool multiStreamEnabled_; + bool iceCompIdRfc5245Compliant_; /** * private account codec searching functions diff --git a/src/ice_transport.cpp b/src/ice_transport.cpp index 0da5051dea3dcce490bddab16891bb78d448e49c..f779e3a2f1277895f30f7f35f16f489ba7e184e2 100644 --- a/src/ice_transport.cpp +++ b/src/ice_transport.cpp @@ -51,6 +51,13 @@ throw std::runtime_error(#ret " failed"); \ } while (0) +// Validate that the component ID is within the expected range +#define ASSERT_COMP_ID(compId, compCount) \ + do { \ + if ((compId) == 0 or (compId) > (compCount)) \ + throw std::runtime_error("Invalid component ID " + (std::to_string(compId))); \ + } while (0) + namespace jami { static constexpr unsigned STUN_MAX_PACKET_SIZE {8192}; @@ -83,7 +90,7 @@ struct IceSTransDeleter class IceTransport::Impl { public: - Impl(const char* name, int component_count, bool master, const IceTransportOptions& options); + Impl(const char* name, const IceTransportOptions& options); ~Impl(); void onComplete(pj_ice_strans* ice_st, pj_ice_strans_op op, pj_status_t status); @@ -101,13 +108,10 @@ public: * Should be called before start method. */ bool setSlaveSession(); - bool createIceSession(pj_ice_sess_role role); void getUFragPwd(); - void getDefaultCandidates(); - std::string link() const; // Non-mutex protected of public versions @@ -133,7 +137,7 @@ public: // Generate server reflexive candidates using UPNP mappings. std::vector<std::pair<IpAddr, IpAddr>> setupUpnpReflexiveCandidates(); void setDefaultRemoteAddress(unsigned comp_id, const IpAddr& addr); - const IpAddr& getDefaultRemoteAddress(unsigned comp_id) const; + const IpAddr getDefaultRemoteAddress(unsigned comp_id) const; bool handleEvents(unsigned max_msec); std::unique_ptr<pj_pool_t, std::function<void(pj_pool_t*)>> pool_ {}; @@ -142,8 +146,9 @@ public: IceRecvInfo on_recv_cb_ {}; mutable std::mutex iceMutex_ {}; std::unique_ptr<pj_ice_strans, IceSTransDeleter> icest_; - unsigned component_count_ {}; - pj_ice_sess_cand cand_[MAX_CANDIDATES] {}; + unsigned streamsCount_ {0}; + unsigned compCountPerStream_ {0}; + unsigned compCount_ {0}; std::string local_ufrag_ {}; std::string local_pwd_ {}; pj_sockaddr remoteAddr_ {}; @@ -161,8 +166,6 @@ public: std::vector<char> data {}; }; - std::vector<PeerChannel> peerChannels_ {}; - struct ComponentIO { std::mutex mutex; @@ -171,7 +174,12 @@ public: IceRecvCb cb; }; + // NOTE: Component IDs start from 1, while these three vectors + // are indexed from 0. Conversion from ID to vector index must + // be done properly. std::vector<ComponentIO> compIO_ {}; + std::vector<PeerChannel> peerChannels_ {}; + std::vector<IpAddr> iceDefaultRemoteAddr_; std::atomic_bool initiatorSession_ {true}; @@ -205,9 +213,6 @@ public: std::atomic_bool destroying_ {false}; onShutdownCb scb {}; - - // Default remote addresses - std::vector<IpAddr> iceDefaultRemoteAddr_; }; //============================================================================== @@ -296,10 +301,7 @@ add_turn_server(pj_pool_t& pool, pj_ice_strans_cfg& cfg, const TurnServerInfo& i //============================================================================== -IceTransport::Impl::Impl(const char* name, - int component_count, - bool master, - const IceTransportOptions& options) +IceTransport::Impl::Impl(const char* name, const IceTransportOptions& options) : pool_(nullptr, [](pj_pool_t* pool) { sip_utils::register_thread(); @@ -307,19 +309,22 @@ IceTransport::Impl::Impl(const char* name, }) , on_initdone_cb_(options.onInitDone) , on_negodone_cb_(options.onNegoDone) - , component_count_(component_count) - , compIO_(component_count) - , initiatorSession_(master) + , streamsCount_(options.streamsCount) + , compCountPerStream_(options.compCountPerStream) + , compCount_(streamsCount_ * compCountPerStream_) + , compIO_(compCount_) + , peerChannels_(compCount_) + , iceDefaultRemoteAddr_(compCount_) + , initiatorSession_(options.master) , accountLocalAddr_(std::move(options.accountLocalAddr)) , accountPublicAddr_(std::move(options.accountPublicAddr)) , thread_() - , iceDefaultRemoteAddr_(component_count) { JAMI_DBG("[ice:%p] Creating IceTransport session for \"%s\" - comp count %u - as a %s", this, name, - component_count, - master ? "master" : "slave"); + compCount_, + initiatorSession_ ? "master" : "slave"); sip_utils::register_thread(); if (options.upnpEnable) @@ -343,8 +348,6 @@ IceTransport::Impl::Impl(const char* name, config_.opt.aggressive = PJ_FALSE; } - peerChannels_.resize(component_count_ + 1); - addDefaultCandidates(); addServerReflexiveCandidates(setupGenericReflexiveCandidates()); @@ -416,7 +419,7 @@ IceTransport::Impl::Impl(const char* name, TRY(pj_ioqueue_create(pool_.get(), IOQUEUE_MAX_HANDLES, &config_.stun_cfg.ioqueue)); pj_ice_strans* icest = nullptr; - pj_status_t status = pj_ice_strans_create(name, &config_, component_count, this, &icecb, &icest); + pj_status_t status = pj_ice_strans_create(name, &config_, compCount_, this, &icecb, &icest); if (status != PJ_SUCCESS || icest == nullptr) { throw std::runtime_error("pj_ice_strans_create() failed"); @@ -445,7 +448,7 @@ IceTransport::Impl::Impl(const char* name, }); // Init to invalid addresses - iceDefaultRemoteAddr_.reserve(component_count); + iceDefaultRemoteAddr_.reserve(compCount_); } IceTransport::Impl::~Impl() @@ -593,25 +596,11 @@ IceTransport::Impl::onComplete(pj_ice_strans* ice_st, pj_ice_strans_op op, pj_st else if (op == PJ_ICE_STRANS_OP_NEGOTIATION) { if (done) { // Dump of connection pairs - std::ostringstream out; - for (unsigned i = 0; i < component_count_; ++i) { - auto laddr = getLocalAddress(i); - auto raddr = getRemoteAddress(i); - - if (laddr and raddr) { - out << " [" << i + 1 << "] " << laddr.toString(true, true) << " [" - << getCandidateType(getSelectedCandidate(i, false)) << "] " - << " <-> " << raddr.toString(true, true) << " [" - << getCandidateType(getSelectedCandidate(i, true)) << "] " << '\n'; - } else { - out << " [" << i + 1 << "] disabled\n"; - } - } - + auto out = link(); JAMI_DBG("[ice:%p] %s connection pairs ([comp id] local [type] <-> remote [type]):\n%s", this, (config_.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP"), - out.str().c_str()); + out.c_str()); } if (on_negodone_cb_) on_negodone_cb_(done); @@ -625,20 +614,20 @@ std::string IceTransport::Impl::link() const { std::ostringstream out; - for (unsigned i = 0; i < component_count_; ++i) { - auto laddr = getLocalAddress(i); - auto raddr = getRemoteAddress(i); - - if (laddr and raddr) { - out << " [" << i + 1 << "] " << laddr.toString(true, true) << " [" - << getCandidateType(getSelectedCandidate(i, false)) << "] " - << " <-> " << raddr.toString(true, true) << " [" - << getCandidateType(getSelectedCandidate(i, true)) << "] "; - } else { - out << " [" << i + 1 << "] disabled"; + for (unsigned strm = 0; strm < streamsCount_; strm++) { + for (unsigned i = 1; i <= compCountPerStream_; i++) { + auto laddr = getLocalAddress(i); + auto raddr = getRemoteAddress(i); + + if (laddr and raddr) { + out << " [" << i << "] " << laddr.toString(true, true) << " [" + << getCandidateType(getSelectedCandidate(i, false)) << "] " + << " <-> " << raddr.toString(true, true) << " [" + << getCandidateType(getSelectedCandidate(i, true)) << "] " << '\n'; + } else { + out << " [" << i << "] disabled\n"; + } } - if (i + 1 != component_count_) - out << "\n"; } return out.str(); } @@ -680,13 +669,15 @@ IceTransport::Impl::setSlaveSession() const pj_ice_sess_cand* IceTransport::Impl::getSelectedCandidate(unsigned comp_id, bool remote) const { + ASSERT_COMP_ID(comp_id, compCount_); + // Return the selected candidate pair. Might not be the nominated pair if // ICE has not concluded yet, but should be the nominated pair afterwards. if (not _isRunning()) { JAMI_ERR("[ice:%p] ICE transport is not running", this); return nullptr; } - const auto* sess = pj_ice_strans_get_valid_pair(icest_.get(), comp_id + 1); + const auto* sess = pj_ice_strans_get_valid_pair(icest_.get(), comp_id); if (sess == nullptr) { JAMI_ERR("[ice:%p] Component %i has no valid pair", this, comp_id); return nullptr; @@ -701,6 +692,8 @@ IceTransport::Impl::getSelectedCandidate(unsigned comp_id, bool remote) const IpAddr IceTransport::Impl::getLocalAddress(unsigned comp_id) const { + ASSERT_COMP_ID(comp_id, compCount_); + if (auto cand = getSelectedCandidate(comp_id, false)) return cand->addr; @@ -711,6 +704,8 @@ IceTransport::Impl::getLocalAddress(unsigned comp_id) const IpAddr IceTransport::Impl::getRemoteAddress(unsigned comp_id) const { + ASSERT_COMP_ID(comp_id, compCount_); + if (auto cand = getSelectedCandidate(comp_id, true)) return cand->addr; @@ -734,13 +729,6 @@ IceTransport::Impl::getUFragPwd() local_pwd_.assign(local_pwd.ptr, local_pwd.slen); } -void -IceTransport::Impl::getDefaultCandidates() -{ - for (unsigned i = 0; i < component_count_; ++i) - pj_ice_strans_get_def_cand(icest_.get(), i + 1, &cand_[i]); -} - bool IceTransport::Impl::createIceSession(pj_ice_sess_role role) { @@ -751,7 +739,6 @@ IceTransport::Impl::createIceSession(pj_ice_sess_role role) // Fetch some information on local configuration getUFragPwd(); - getDefaultCandidates(); JAMI_DBG("[ice:%p] (local) ufrag=%s, pwd=%s", this, local_ufrag_.c_str(), local_pwd_.c_str()); return true; } @@ -812,7 +799,7 @@ IceTransport::Impl::requestUpnpMappings() auto portType = transport == PJ_CAND_UDP ? PortType::UDP : PortType::TCP; // Request upnp mapping for each component. - for (unsigned compId = 1; compId <= component_count_; compId++) { + for (unsigned id = 1; id <= compCount_; id++) { // Set port number to 0 to get any available port. Mapping requestedMap(portType); @@ -843,18 +830,18 @@ IceTransport::Impl::requestUpnpMappings() bool IceTransport::Impl::hasUpnp() const { - return upnp_ and upnpMappings_.size() == component_count_; + return upnp_ and upnpMappings_.size() == compCount_; } void IceTransport::Impl::addServerReflexiveCandidates( const std::vector<std::pair<IpAddr, IpAddr>>& addrList) { - if (addrList.size() != component_count_) { + if (addrList.size() != compCount_) { JAMI_WARN("[ice:%p]: Provided addr list size %lu does not match component count %u", this, addrList.size(), - component_count_); + compCount_); return; } @@ -865,25 +852,26 @@ IceTransport::Impl::addServerReflexiveCandidates( assert(config_.stun_tp_cnt > 0 && config_.stun_tp_cnt < PJ_ICE_MAX_STUN); auto& stun = config_.stun_tp[config_.stun_tp_cnt - 1]; - for (unsigned compIdx = 0; compIdx < component_count_; compIdx++) { - auto& localAddr = addrList[compIdx].first; - auto& publicAddr = addrList[compIdx].second; + for (unsigned id = 1; id <= compCount_; id++) { + auto idx = id - 1; + auto& localAddr = addrList[idx].first; + auto& publicAddr = addrList[idx].second; - pj_sockaddr_cp(&stun.cfg.user_mapping[compIdx].local_addr, localAddr.pjPtr()); - pj_sockaddr_cp(&stun.cfg.user_mapping[compIdx].mapped_addr, publicAddr.pjPtr()); + pj_sockaddr_cp(&stun.cfg.user_mapping[idx].local_addr, localAddr.pjPtr()); + pj_sockaddr_cp(&stun.cfg.user_mapping[idx].mapped_addr, publicAddr.pjPtr()); if (isTcpEnabled()) { if (publicAddr.getPort() == 9) { - stun.cfg.user_mapping[compIdx].tp_type = PJ_CAND_TCP_ACTIVE; + stun.cfg.user_mapping[idx].tp_type = PJ_CAND_TCP_ACTIVE; } else { - stun.cfg.user_mapping[compIdx].tp_type = PJ_CAND_TCP_PASSIVE; + stun.cfg.user_mapping[idx].tp_type = PJ_CAND_TCP_PASSIVE; } } else { - stun.cfg.user_mapping[compIdx].tp_type = PJ_CAND_UDP; + stun.cfg.user_mapping[idx].tp_type = PJ_CAND_UDP; } } - stun.cfg.user_mapping_cnt = component_count_; + stun.cfg.user_mapping_cnt = compCount_; assert(stun.cfg.user_mapping_cnt < PJ_ICE_MAX_COMP); } @@ -900,8 +888,8 @@ IceTransport::Impl::setupGenericReflexiveCandidates() // candidates and set to active otherwise. if (accountLocalAddr_ and accountPublicAddr_) { - addrList.reserve(component_count_); - for (unsigned compIdx = 0; compIdx < component_count_; compIdx++) { + addrList.reserve(compCount_); + for (unsigned id = 1; id <= compCount_; id++) { // For TCP, the type is set to active, because most likely the incoming // connection will be blocked by the NAT. // For UDP use random port number. @@ -913,11 +901,10 @@ IceTransport::Impl::setupGenericReflexiveCandidates() accountPublicAddr_.setPort(port); addrList.emplace_back(accountLocalAddr_, accountPublicAddr_); - JAMI_DBG("[ice:%p]: Add generic local reflexive candidates [%s : %s] for comp %u", + JAMI_DBG("[ice:%p]: Add generic local reflexive candidates [%s : %s]", this, accountLocalAddr_.toString(true).c_str(), - accountPublicAddr_.toString(true).c_str(), - compIdx + 1); + accountPublicAddr_.toString(true).c_str()); } } @@ -933,11 +920,11 @@ IceTransport::Impl::setupUpnpReflexiveCandidates() std::lock_guard<std::mutex> lock(upnpMappingsMutex_); - if (static_cast<unsigned>(upnpMappings_.size()) < component_count_) { + if (static_cast<unsigned>(upnpMappings_.size()) < compCount_) { JAMI_WARN("[ice:%p]: Not enough mappings %lu. Expected %u", this, upnpMappings_.size(), - component_count_); + compCount_); return {}; } @@ -965,41 +952,40 @@ IceTransport::Impl::setupUpnpReflexiveCandidates() } void -IceTransport::Impl::setDefaultRemoteAddress(unsigned comp_id, const IpAddr& addr) +IceTransport::Impl::setDefaultRemoteAddress(unsigned compId, const IpAddr& addr) { - // Component ID must be valid. - assert(static_cast<unsigned>(comp_id) < component_count_); + ASSERT_COMP_ID(compId, compCount_); - iceDefaultRemoteAddr_[comp_id] = addr; + iceDefaultRemoteAddr_[compId - 1] = addr; // The port does not matter. Set it 0 to avoid confusion. - iceDefaultRemoteAddr_[comp_id].setPort(0); + iceDefaultRemoteAddr_[compId - 1].setPort(0); } -const IpAddr& -IceTransport::Impl::getDefaultRemoteAddress(unsigned comp_id) const +const IpAddr +IceTransport::Impl::getDefaultRemoteAddress(unsigned compId) const { - // Component ID must be valid. - assert(static_cast<unsigned>(comp_id) < component_count_); + ASSERT_COMP_ID(compId, compCount_); - return iceDefaultRemoteAddr_[comp_id]; + return iceDefaultRemoteAddr_[compId - 1]; } void IceTransport::Impl::onReceiveData(unsigned comp_id, void* pkt, pj_size_t size) { - if (!comp_id or comp_id > component_count_) { - JAMI_ERR("rx: invalid comp_id (%u)", comp_id); - return; - } - if (!size) + ASSERT_COMP_ID(comp_id, compCount_); + + if (size == 0) return; - auto& io = compIO_[comp_id - 1]; + + auto idx = comp_id - 1; + auto& io = compIO_[idx]; + std::unique_lock<std::mutex> lk(io.mutex); if (io.cb) { io.cb((uint8_t*) pkt, size); } else { std::error_code ec; - auto err = peerChannels_.at(comp_id - 1).write((const char*) pkt, size, ec); + auto err = peerChannels_.at(idx).write((const char*) pkt, size, ec); if (err < 0) { JAMI_ERR("[ice:%p] rx: channel is closed", this); } @@ -1008,11 +994,8 @@ IceTransport::Impl::onReceiveData(unsigned comp_id, void* pkt, pj_size_t size) //============================================================================== -IceTransport::IceTransport(const char* name, - int component_count, - bool master, - const IceTransportOptions& options) - : pimpl_ {std::make_unique<Impl>(name, component_count, master, options)} +IceTransport::IceTransport(const char* name, const IceTransportOptions& options) + : pimpl_ {std::make_unique<Impl>(name, options)} {} IceTransport::~IceTransport() {} @@ -1055,7 +1038,7 @@ IceTransport::isFailed() const unsigned IceTransport::getComponentCount() const { - return pimpl_->component_count_; + return pimpl_->compCount_; } bool @@ -1153,16 +1136,24 @@ IceTransport::startIce(const Attribute& rem_attrs, std::vector<IceCandidate>&& r bool IceTransport::startIce(const SDP& sdp) { + if (pimpl_->streamsCount_ != 1) { + JAMI_ERR("Expected exactly one stream per SDP (found %u streams)", pimpl_->streamsCount_); + return false; + } + if (not isInitialized()) { JAMI_ERR("[ice:%p] not initialized transport", pimpl_.get()); pimpl_->is_stopped_ = true; return false; } - for (unsigned i = 0; i < pimpl_->component_count_; i++) { - auto candVec = getLocalCandidates(i); + for (unsigned id = 1; id <= getComponentCount(); id++) { + auto candVec = getLocalCandidates(id); for (auto const& cand : candVec) { - JAMI_DBG("[ice:%p] Using local candidate %s for comp %u", pimpl_.get(), cand.c_str(), i); + JAMI_DBG("[ice:%p] Using local candidate %s for comp %u", + pimpl_.get(), + cand.c_str(), + id); } } @@ -1175,7 +1166,7 @@ IceTransport::startIce(const SDP& sdp) rem_candidates.reserve(sdp.candidates.size()); IceCandidate cand; for (const auto& line : sdp.candidates) { - if (getCandidateFromSDP(line, cand)) + if (parseIceAttributeLine(0, line, cand)) rem_candidates.emplace_back(cand); } std::lock_guard<std::mutex> lk {pimpl_->iceMutex_}; @@ -1254,8 +1245,9 @@ IceTransport::getLocalAttributes() const std::vector<std::string> IceTransport::getLocalCandidates(unsigned comp_id) const { + ASSERT_COMP_ID(comp_id, getComponentCount()); std::vector<std::string> res; - pj_ice_sess_cand cand[PJ_ARRAY_SIZE(pimpl_->cand_)]; + pj_ice_sess_cand cand[MAX_CANDIDATES]; unsigned cand_cnt = PJ_ARRAY_SIZE(cand); { @@ -1311,39 +1303,92 @@ IceTransport::getLocalCandidates(unsigned comp_id) const return res; } +std::vector<std::string> +IceTransport::getLocalCandidates(unsigned streamIdx, unsigned compId) const +{ + ASSERT_COMP_ID(compId, getComponentCount()); + + std::vector<std::string> res; + pj_ice_sess_cand cand[MAX_CANDIDATES]; + unsigned cand_cnt = MAX_CANDIDATES; + + { + std::lock_guard<std::mutex> lk {pimpl_->iceMutex_}; + if (!pimpl_->icest_) + return res; + // In the implementation, the component IDs are enumerated globally + // (per SDP: 1, 2, 3, 4, ...). This is simpler because we create + // only one pj_ice_strans instance. However, the component IDs are + // enumerated per stream in the generated SDP (1, 2, 1, 2, ...) in + // order to be compliant with the spec. + + auto globalCompId = streamIdx * 2 + compId; + if (pj_ice_strans_enum_cands(pimpl_->icest_.get(), globalCompId, &cand_cnt, cand) + != PJ_SUCCESS) { + JAMI_ERR("[ice:%p] pj_ice_strans_enum_cands() failed", pimpl_.get()); + return res; + } + } + + res.reserve(cand_cnt); + // Build ICE attributes according to RFC 6544, section 4.5. + for (unsigned i = 0; i < cand_cnt; ++i) { + char ipaddr[PJ_INET6_ADDRSTRLEN]; + std::string tcp_type; + if (cand[i].transport != PJ_CAND_UDP) { + tcp_type += " tcptype"; + switch (cand[i].transport) { + case PJ_CAND_TCP_ACTIVE: + tcp_type += " active"; + break; + case PJ_CAND_TCP_PASSIVE: + tcp_type += " passive"; + break; + case PJ_CAND_TCP_SO: + default: + tcp_type += " so"; + break; + } + } + res.emplace_back( + fmt::format("{} {} {} {} {} {} typ {}{}", + std::string_view(cand[i].foundation.ptr, cand[i].foundation.slen), + compId, + (cand[i].transport == PJ_CAND_UDP ? "UDP" : "TCP"), + cand[i].prio, + pj_sockaddr_print(&cand[i].addr, ipaddr, sizeof(ipaddr), 0), + pj_sockaddr_get_port(&cand[i].addr), + pj_ice_get_cand_type_name(cand[i].type), + tcp_type)); + } + + return res; +} bool -IceTransport::getCandidateFromSDP(const std::string& line, IceCandidate& cand) const +IceTransport::parseIceAttributeLine(unsigned streamIdx, + const std::string& line, + IceCandidate& cand) const { // Silently ignore empty lines if (line.empty()) return false; - /** Section 4.5, RFC 6544 (https://tools.ietf.org/html/rfc6544) - * candidate-attribute = "candidate" ":" foundation SP component-id SP - * "TCP" SP - * priority SP - * connection-address SP - * port SP - * cand-type - * [SP rel-addr] - * [SP rel-port] - * SP tcp-type-ext - * *(SP extension-att-name SP - * extension-att-value) - * - * tcp-type-ext = "tcptype" SP tcp-type - * tcp-type = "active" / "passive" / "so" - */ + if (streamIdx >= pimpl_->streamsCount_) { + throw std::runtime_error("Stream index " + std::to_string(streamIdx) + " is invalid!"); + } + int af, cnt; char foundation[32], transport[12], ipaddr[80], type[32], tcp_type[32]; pj_str_t tmpaddr; - int comp_id, prio, port; + unsigned comp_id, prio, port; pj_status_t status; pj_bool_t is_tcp = PJ_FALSE; + // Parse ICE attribute line according to RFC-6544 section 4.5. + // TODO/WARNING: There is no fail-safe in case of malformed attributes. cnt = sscanf(line.c_str(), - "%31s %d %11s %d %79s %d typ %31s tcptype %31s\n", + "%31s %u %11s %u %79s %u typ %31s tcptype %31s\n", foundation, &comp_id, transport, @@ -1391,7 +1436,13 @@ IceTransport::getCandidateFromSDP(const std::string& line, IceCandidate& cand) c cand.transport = PJ_CAND_UDP; } + // If the component Id is enumerated relative to media, convert + // it to absolute enumeration. + if (comp_id <= pimpl_->compCountPerStream_) { + comp_id += pimpl_->compCountPerStream_ * streamIdx; + } cand.comp_id = (pj_uint8_t) comp_id; + cand.prio = prio; if (strchr(ipaddr, ':')) @@ -1416,9 +1467,10 @@ IceTransport::getCandidateFromSDP(const std::string& line, IceCandidate& cand) c } ssize_t -IceTransport::recv(int comp_id, unsigned char* buf, size_t len, std::error_code& ec) +IceTransport::recv(unsigned compId, unsigned char* buf, size_t len, std::error_code& ec) { - auto& io = pimpl_->compIO_[comp_id]; + ASSERT_COMP_ID(compId, getComponentCount()); + auto& io = pimpl_->compIO_[compId - 1]; std::lock_guard<std::mutex> lk(io.mutex); if (io.queue.empty()) { @@ -1440,19 +1492,21 @@ IceTransport::recv(int comp_id, unsigned char* buf, size_t len, std::error_code& } ssize_t -IceTransport::recvfrom(int comp_id, char* buf, size_t len, std::error_code& ec) +IceTransport::recvfrom(unsigned compId, char* buf, size_t len, std::error_code& ec) { - return pimpl_->peerChannels_.at(comp_id).read(buf, len, ec); + ASSERT_COMP_ID(compId, getComponentCount()); + return pimpl_->peerChannels_.at(compId - 1).read(buf, len, ec); } void -IceTransport::setOnRecv(unsigned comp_id, IceRecvCb cb) +IceTransport::setOnRecv(unsigned compId, IceRecvCb cb) { - auto& io = pimpl_->compIO_[comp_id]; + ASSERT_COMP_ID(compId, getComponentCount()); + auto& io = pimpl_->compIO_[compId - 1]; std::lock_guard<std::mutex> lk(io.mutex); - io.cb = cb; + io.cb = std::move(cb); - if (cb) { + if (io.cb) { // Flush existing queue using the callback for (const auto& packet : io.queue) io.cb((uint8_t*) packet.data.data(), packet.data.size()); @@ -1467,18 +1521,20 @@ IceTransport::setOnShutdown(onShutdownCb&& cb) } ssize_t -IceTransport::send(int comp_id, const unsigned char* buf, size_t len) +IceTransport::send(unsigned compId, const unsigned char* buf, size_t len) { + ASSERT_COMP_ID(compId, getComponentCount()); + sip_utils::register_thread(); - auto remote = getRemoteAddress(comp_id); + auto remote = getRemoteAddress(compId); if (!remote) { - JAMI_ERR("[ice:%p] can't find remote address for component %d", pimpl_.get(), comp_id); + JAMI_ERR("[ice:%p] can't find remote address for component %d", pimpl_.get(), compId); errno = EINVAL; return -1; } auto status = pj_ice_strans_sendto2(pimpl_->icest_.get(), - comp_id + 1, + compId, buf, len, remote.pjPtr(), @@ -1533,15 +1589,10 @@ IceTransport::waitForNegotiation(std::chrono::milliseconds timeout) } ssize_t -IceTransport::isDataAvailable(int comp_id) +IceTransport::waitForData(unsigned compId, std::chrono::milliseconds timeout, std::error_code& ec) { - return pimpl_->peerChannels_.at(comp_id).isDataAvailable(); -} - -ssize_t -IceTransport::waitForData(int comp_id, std::chrono::milliseconds timeout, std::error_code& ec) -{ - return pimpl_->peerChannels_.at(comp_id).wait(timeout, ec); + ASSERT_COMP_ID(compId, getComponentCount()); + return pimpl_->peerChannels_.at(compId - 1).wait(timeout, ec); } std::vector<SDP> @@ -1588,8 +1639,13 @@ IceTransport::isTCPEnabled() } ICESDP -IceTransport::parse_SDP(std::string_view sdp_msg, const IceTransport& ice) +IceTransport::parseIceCandidates(std::string_view sdp_msg) { + if (pimpl_->streamsCount_ != 1) { + JAMI_ERR("Expected exactly one stream per SDP (found %u streams)", pimpl_->streamsCount_); + return {}; + } + ICESDP res; int nr = 0; for (std::string_view line; jami::getline(sdp_msg, line); nr++) { @@ -1599,7 +1655,7 @@ IceTransport::parse_SDP(std::string_view sdp_msg, const IceTransport& ice) res.rem_pwd = line; } else { IceCandidate cand; - if (ice.getCandidateFromSDP(std::string(line), cand)) { + if (parseIceAttributeLine(0, std::string(line), cand)) { JAMI_DBG("Add remote ICE candidate: %.*s", (int) line.size(), line.data()); res.rem_candidates.emplace_back(cand); } @@ -1609,7 +1665,7 @@ IceTransport::parse_SDP(std::string_view sdp_msg, const IceTransport& ice) } void -IceTransport::setDefaultRemoteAddress(int comp_id, const IpAddr& addr) +IceTransport::setDefaultRemoteAddress(unsigned comp_id, const IpAddr& addr) { pimpl_->setDefaultRemoteAddress(comp_id, addr); } @@ -1656,13 +1712,10 @@ IceTransportFactory::~IceTransportFactory() } std::shared_ptr<IceTransport> -IceTransportFactory::createTransport(const char* name, - int component_count, - bool master, - const IceTransportOptions& options) +IceTransportFactory::createTransport(const char* name, const IceTransportOptions& options) { try { - return std::make_shared<IceTransport>(name, component_count, master, options); + return std::make_shared<IceTransport>(name, options); } catch (const std::exception& e) { JAMI_ERR("%s", e.what()); return nullptr; @@ -1670,13 +1723,10 @@ IceTransportFactory::createTransport(const char* name, } std::unique_ptr<IceTransport> -IceTransportFactory::createUTransport(const char* name, - int component_count, - bool master, - const IceTransportOptions& options) +IceTransportFactory::createUTransport(const char* name, const IceTransportOptions& options) { try { - return std::make_unique<IceTransport>(name, component_count, master, options); + return std::make_unique<IceTransport>(name, options); } catch (const std::exception& e) { JAMI_ERR("%s", e.what()); return nullptr; diff --git a/src/ice_transport.h b/src/ice_transport.h index f30c478776ca87e7315e9c97d0b60d95433ceab1..d6531f985842f348c1a0fb724b6913d684dec1bd 100644 --- a/src/ice_transport.h +++ b/src/ice_transport.h @@ -95,6 +95,9 @@ struct TurnServerInfo struct IceTransportOptions { + bool master {true}; + unsigned streamsCount {1}; + unsigned compCountPerStream {1}; bool upnpEnable {false}; IceTransportCompleteCb onInitDone {}; IceTransportCompleteCb onNegoDone {}; @@ -130,10 +133,7 @@ public: /** * Constructor */ - IceTransport(const char* name, - int component_count, - bool master, - const IceTransportOptions& options = {}); + IceTransport(const char* name, const IceTransportOptions& options = {}); ~IceTransport(); /** * Get current state @@ -196,7 +196,7 @@ public: std::string getLastErrMsg() const; - IpAddr getDefaultLocalAddress() const { return getLocalAddress(0); } + IpAddr getDefaultLocalAddress() const { return getLocalAddress(1); } /** * Return ICE session attributes @@ -208,6 +208,15 @@ public: */ std::vector<std::string> getLocalCandidates(unsigned comp_id) const; + /** + * Return ICE session attributes + */ + std::vector<std::string> getLocalCandidates(unsigned streamIdx, unsigned compId) const; + + bool parseIceAttributeLine(unsigned streamIdx, + const std::string& line, + IceCandidate& cand) const; + bool getCandidateFromSDP(const std::string& line, IceCandidate& cand) const; // I/O methods @@ -215,23 +224,16 @@ public: void setOnRecv(unsigned comp_id, IceRecvCb cb); void setOnShutdown(onShutdownCb&& cb); - ssize_t recv(int comp_id, unsigned char* buf, size_t len, std::error_code& ec); - ssize_t recvfrom(int comp_id, char* buf, size_t len, std::error_code& ec); + ssize_t recv(unsigned comp_id, unsigned char* buf, size_t len, std::error_code& ec); + ssize_t recvfrom(unsigned comp_id, char* buf, size_t len, std::error_code& ec); - ssize_t send(int comp_id, const unsigned char* buf, size_t len); + ssize_t send(unsigned comp_id, const unsigned char* buf, size_t len); int waitForInitialization(std::chrono::milliseconds timeout); int waitForNegotiation(std::chrono::milliseconds timeout); - ssize_t waitForData(int comp_id, std::chrono::milliseconds timeout, std::error_code& ec); - - /** - * Return without waiting how many bytes are ready to read - * @param comp_id Ice component - * @return the number of bytes ready to read - */ - ssize_t isDataAvailable(int comp_id); + ssize_t waitForData(unsigned comp_id, std::chrono::milliseconds timeout, std::error_code& ec); unsigned getComponentCount() const; @@ -248,9 +250,9 @@ public: bool isTCPEnabled(); - static ICESDP parse_SDP(std::string_view sdp_msg, const IceTransport& ice); + ICESDP parseIceCandidates(std::string_view sdp_msg); - void setDefaultRemoteAddress(int comp_id, const IpAddr& addr); + void setDefaultRemoteAddress(unsigned comp_id, const IpAddr& addr); std::string link() const; @@ -266,13 +268,9 @@ public: ~IceTransportFactory(); std::shared_ptr<IceTransport> createTransport(const char* name, - int component_count, - bool master, const IceTransportOptions& options = {}); std::unique_ptr<IceTransport> createUTransport(const char* name, - int component_count, - bool master, const IceTransportOptions& options = {}); /** diff --git a/src/jamidht/connectionmanager.cpp b/src/jamidht/connectionmanager.cpp index cd94b3a50cad5d61acf7f5bd304944ab9eadabc5..bd1bae2354e6a5eaacdae3aa7769b2c98d13af81 100644 --- a/src/jamidht/connectionmanager.cpp +++ b/src/jamidht/connectionmanager.cpp @@ -325,7 +325,7 @@ ConnectionManager::Impl::connectDeviceStartIce(const DeviceId& deviceId, const d if (!ice) return; - auto sdp = IceTransport::parse_SDP(response.ice_msg, *ice); + auto sdp = ice->parseIceCandidates(response.ice_msg); if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) { JAMI_WARN("[Account:%s] start ICE failed", account.getAccountID().c_str()); @@ -537,8 +537,11 @@ ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certif sthis->infos_[{deviceId, vid}] = info; } std::unique_lock<std::mutex> lk {info->mutex_}; + ice_config.master = false; + ice_config.streamsCount = JamiAccount::ICE_STREAMS_COUNT; + ice_config.compCountPerStream = JamiAccount::ICE_COMP_COUNT_PER_STREAM; info->ice_ = Manager::instance().getIceTransportFactory().createUTransport( - sthis->account.getAccountID().c_str(), 1, false, ice_config); + sthis->account.getAccountID().c_str(), ice_config); if (!info->ice_) { JAMI_ERR("Cannot initialize ICE session."); @@ -751,7 +754,7 @@ ConnectionManager::Impl::onRequestStartIce(const PeerConnectionRequest& req) return; } - auto sdp = IceTransport::parse_SDP(req.ice_msg, *ice); + auto sdp = ice->parseIceCandidates(req.ice_msg); answerTo(*ice, req.id, req.from); if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) { JAMI_ERR("[Account:%s] start ICE failed - fallback to TURN", account.getAccountID().c_str()); @@ -883,8 +886,12 @@ ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req, shared->account.getAccountID().c_str(), deviceId.c_str()); std::unique_lock<std::mutex> lk {info->mutex_}; - info->ice_ = Manager::instance().getIceTransportFactory().createUTransport( - shared->account.getAccountID().c_str(), 1, true, ice_config); + ice_config.streamsCount = JamiAccount::ICE_STREAMS_COUNT; + ice_config.compCountPerStream = JamiAccount::ICE_COMP_COUNT_PER_STREAM; + ice_config.master = true; + info->ice_ = Manager::instance() + .getIceTransportFactory() + .createUTransport(shared->account.getAccountID().c_str(), ice_config); if (not info->ice_) { JAMI_ERR("Cannot initialize ICE session."); if (shared->connReadyCb_) diff --git a/src/jamidht/jamiaccount.cpp b/src/jamidht/jamiaccount.cpp index ec05fd27c279447d5a8f3bc91a8422c569145e79..1cfc331f71b0b61672dd330c5fca54eaa4203280 100644 --- a/src/jamidht/jamiaccount.cpp +++ b/src/jamidht/jamiaccount.cpp @@ -218,7 +218,7 @@ struct JamiAccount::DiscoveredPeer std::shared_ptr<Task> cleanupTask; }; -static constexpr int ICE_COMP_SIP_TRANSPORT {0}; +static constexpr int ICE_COMP_ID_SIP_TRANSPORT {1}; static constexpr const char* const RING_URI_PREFIX = "ring:"; static constexpr const char* const JAMI_URI_PREFIX = "jami:"; @@ -682,7 +682,7 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std:: }); auto remoted_address = sipConn.channel->underlyingICE()->getRemoteAddress( - ICE_COMP_SIP_TRANSPORT); + ICE_COMP_ID_SIP_TRANSPORT); try { onConnectedOutgoingCall(dev_call, toUri, remoted_address); } catch (const VoipLinkException&) { @@ -4499,7 +4499,9 @@ JamiAccount::sendSIPMessage(SipConnection& conn, // Build SIP Message // "deviceID@IP" - auto toURI = getToUri(to + "@" + channel->underlyingICE()->getRemoteAddress(0).toString(true)); + auto toURI = getToUri( + to + "@" + + channel->underlyingICE()->getRemoteAddress(ICE_COMP_ID_SIP_TRANSPORT).toString(true)); std::string from = getFromUri(); pjsip_tx_data* tdata; @@ -4653,7 +4655,7 @@ JamiAccount::cacheSIPConnection(std::shared_ptr<ChannelSocket>&& socket, pc->setTransport(sip_tr); pc->setState(Call::ConnectionState::PROGRESSING); if (auto ice = socket->underlyingICE()) { - auto remoted_address = ice->getRemoteAddress(ICE_COMP_SIP_TRANSPORT); + auto remoted_address = ice->getRemoteAddress(ICE_COMP_ID_SIP_TRANSPORT); try { onConnectedOutgoingCall(pc, peerId, remoted_address); } catch (const VoipLinkException&) { diff --git a/src/jamidht/jamiaccount.h b/src/jamidht/jamiaccount.h index 6e883df9893c3f770c78740e6089b6879199b473..1ecf86c23974eebb5cd934f60656a860c3192097 100644 --- a/src/jamidht/jamiaccount.h +++ b/src/jamidht/jamiaccount.h @@ -108,6 +108,8 @@ public: = "https://config.jami.net/proxyList"; /* constexpr */ static const std::pair<uint16_t, uint16_t> DHT_PORT_RANGE; + constexpr static int ICE_STREAMS_COUNT {1}; + constexpr static int ICE_COMP_COUNT_PER_STREAM {1}; const char* getAccountType() const override { return ACCOUNT_TYPE; } diff --git a/src/peer_connection.h b/src/peer_connection.h index d3905154122f44e7324e8d3c0674191470c1aae4..9f64190d49d35c674538db103bb5b1dff4e79907 100644 --- a/src/peer_connection.h +++ b/src/peer_connection.h @@ -106,7 +106,7 @@ private: std::shared_ptr<IceTransport> ice_ {nullptr}; std::atomic_bool iceStopped {false}; std::atomic_bool iceIsSender {false}; - uint8_t compId_ {0}; + uint8_t compId_ {1}; }; //============================================================================== diff --git a/src/sip/sdp.cpp b/src/sip/sdp.cpp index ee587126774e1b1ca9b6101b31271b0e5caf030a..21d14b85d49dbcf8f5aedbbac79a5ea498ad54c2 100644 --- a/src/sip/sdp.cpp +++ b/src/sip/sdp.cpp @@ -897,9 +897,9 @@ Sdp::addIceCandidates(unsigned media_index, const std::vector<std::string>& cand std::vector<std::string> Sdp::getIceCandidates(unsigned media_index) const { - auto session = activeRemoteSession_ ? activeRemoteSession_ : remoteSession_; + auto remoteSession = activeRemoteSession_ ? activeRemoteSession_ : remoteSession_; auto localSession = activeLocalSession_ ? activeLocalSession_ : localSession_; - if (not session) { + if (not remoteSession) { JAMI_ERR("getIceCandidates failed: no remote session"); return {}; } @@ -907,12 +907,12 @@ Sdp::getIceCandidates(unsigned media_index) const JAMI_ERR("getIceCandidates failed: no local session"); return {}; } - if (media_index >= session->media_count || media_index >= localSession->media_count) { + if (media_index >= remoteSession->media_count || media_index >= localSession->media_count) { JAMI_ERR("getIceCandidates failed: cannot access media#%u (may be deactivated)", media_index); return {}; } - auto media = session->media[media_index]; + auto media = remoteSession->media[media_index]; auto localMedia = localSession->media[media_index]; if (media->desc.port == 0 || localMedia->desc.port == 0) { JAMI_WARN("Media#%u is disabled. Media ports: local %u, remote %u", diff --git a/src/sip/sipcall.cpp b/src/sip/sipcall.cpp index 28f030fc6274a2ac8bf3abaaeaadd15b915f4b93..141d296ff88bc18575a39082773a946510d56e09 100644 --- a/src/sip/sipcall.cpp +++ b/src/sip/sipcall.cpp @@ -78,6 +78,8 @@ getVideoSettings() static constexpr std::chrono::seconds DEFAULT_ICE_INIT_TIMEOUT {35}; // seconds static constexpr std::chrono::seconds DEFAULT_ICE_NEGO_TIMEOUT {60}; // seconds static constexpr std::chrono::milliseconds MS_BETWEEN_2_KEYFRAME_REQUEST {1000}; +static constexpr int ICE_COMP_ID_RTP {1}; +static constexpr int ICE_COMP_COUNT_PER_STREAM {2}; SIPCall::SIPCall(const std::shared_ptr<SIPAccountBase>& account, const std::string& callId, @@ -127,11 +129,15 @@ SIPCall::SIPCall(const std::shared_ptr<SIPAccountBase>& account, std::vector<MediaAttribute> mediaList; if (mediaAttrList.size() > 0) { mediaList = mediaAttrList; - } else { + } else if (type_ == Call::CallType::INCOMING) { // Handle incoming call without media offer. JAMI_WARN("[call:%s] No media offered in the incoming invite. Will answer with audio-only", getCallId().c_str()); - mediaList = getSIPAccount()->createDefaultMediaList(false, getState() == CallState::HOLD); + mediaList = getSIPAccount()->createDefaultMediaList(getSIPAccount()->isVideoEnabled(), + getState() == CallState::HOLD); + } else { + JAMI_ERR("[call:%s] Media list can not be empty for outgoing calls", getCallId().c_str()); + return; } JAMI_DBG("[call:%s] Create a new SIP call with %lu medias", @@ -655,13 +661,16 @@ SIPCall::terminateSipSession(int status) auto contact = account->getContactHeader(transport_ ? transport_->get() : nullptr); sip_utils::addContactHeader(&contact, tdata); + // Add user-agent header + sip_utils::addUserAgentHeader(account->getUserAgentName(), tdata); } else { JAMI_ERR("No account detected"); + std::ostringstream msg; + msg << "[call:" << getCallId().c_str() << "] " + << "The account owning this call is invalid"; + throw std::runtime_error(msg.str()); } - // Add user-agent header - sip_utils::addUserAgentHeader(account->getUserAgentName(), tdata); - ret = pjsip_inv_send_msg(inviteSession_.get(), tdata); if (ret != PJ_SUCCESS) JAMI_ERR("[call:%s] failed to send terminate msg, SIP error (%s)", @@ -1467,20 +1476,38 @@ SIPCall::addLocalIceAttributes() JAMI_DBG("[call:%s] fill SDP with ICE transport %p", getCallId().c_str(), media_tr); sdp_->addIceAttributes(media_tr->getLocalAttributes()); - unsigned idx = 0; - unsigned compId = 1; - for (auto const& stream : rtpStreams_) { - JAMI_DBG("[call:%s] add ICE local candidates for media [%s] @ %u", - getCallId().c_str(), - stream.mediaAttribute_->toString().c_str(), - idx); - // RTP - sdp_->addIceCandidates(idx, media_tr->getLocalCandidates(compId)); - // RTCP - sdp_->addIceCandidates(idx, media_tr->getLocalCandidates(compId + 1)); - - idx++; - compId += 2; + if (account->isIceCompIdRfc5245Compliant()) { + unsigned streamIdx = 0; + for (auto const& stream : rtpStreams_) { + JAMI_DBG("[call:%s] add ICE local candidates for media [%s] @ %u", + getCallId().c_str(), + stream.mediaAttribute_->toString().c_str(), + streamIdx); + // RTP + sdp_->addIceCandidates(streamIdx, + media_tr->getLocalCandidates(streamIdx, ICE_COMP_ID_RTP)); + // RTCP + sdp_->addIceCandidates(streamIdx, + media_tr->getLocalCandidates(streamIdx, ICE_COMP_ID_RTP + 1)); + + streamIdx++; + } + } else { + unsigned idx = 0; + unsigned compId = 1; + for (auto const& stream : rtpStreams_) { + JAMI_DBG("[call:%s] add ICE local candidates for media [%s] @ %u", + getCallId().c_str(), + stream.mediaAttribute_->toString().c_str(), + idx); + // RTP + sdp_->addIceCandidates(idx, media_tr->getLocalCandidates(compId)); + // RTCP + sdp_->addIceCandidates(idx, media_tr->getLocalCandidates(compId + 1)); + + idx++; + compId += 2; + } } } @@ -1498,7 +1525,7 @@ SIPCall::getAllRemoteCandidates() for (unsigned mediaIdx = 0; mediaIdx < static_cast<unsigned>(rtpStreams_.size()); mediaIdx++) { IceCandidate cand; for (auto& line : sdp_->getIceCandidates(mediaIdx)) { - if (media_tr->getCandidateFromSDP(line, cand)) { + if (media_tr->parseIceAttributeLine(mediaIdx, line, cand)) { JAMI_DBG("[call:%s] add remote ICE candidate: %s", getCallId().c_str(), line.c_str()); @@ -1631,9 +1658,8 @@ SIPCall::startAllMedia() } auto slots = sdp_->getMediaSlots(); - unsigned ice_comp_id = 0; bool peer_holding {true}; - int slotN = -1; + int streamIdx = -1; // reset readyToRecord_ = false; @@ -1641,7 +1667,7 @@ SIPCall::startAllMedia() bool isVideoEnabled = false; for (const auto& slot : slots) { - ++slotN; + streamIdx++; const auto& local = slot.first; const auto& remote = slot.second; @@ -1653,16 +1679,18 @@ SIPCall::startAllMedia() if (local.type != remote.type) { JAMI_ERR("[call:%s] [SDP:slot#%u] Inconsistent media type between local and remote", getCallId().c_str(), - slotN); + streamIdx); continue; } if (!local.codec) { - JAMI_WARN("[call:%s] [SDP:slot#%u] Missing local codec", getCallId().c_str(), slotN); + JAMI_WARN("[call:%s] [SDP:slot#%u] Missing local codec", getCallId().c_str(), streamIdx); continue; } if (!remote.codec) { - JAMI_WARN("[call:%s] [SDP:slot#%u] Missing remote codec", getCallId().c_str(), slotN); + JAMI_WARN("[call:%s] [SDP:slot#%u] Missing remote codec", + getCallId().c_str(), + streamIdx); continue; } @@ -1670,11 +1698,11 @@ SIPCall::startAllMedia() JAMI_WARN( "[call:%s] [SDP:slot#%u] Secure mode but no crypto attributes. Ignoring the media", getCallId().c_str(), - slotN); + streamIdx); continue; } - auto const& rtpStream = rtpStreams_[slotN]; + auto const& rtpStream = rtpStreams_[streamIdx]; if (not rtpStream.mediaAttribute_) { throw std::runtime_error("Missing media attribute"); } @@ -1689,9 +1717,9 @@ SIPCall::startAllMedia() // because of the audio loop if (getState() != CallState::HOLD) { if (isIceRunning()) { - rtpStream.rtpSession_->start(newIceSocket(ice_comp_id), - newIceSocket(ice_comp_id + 1)); - ice_comp_id += 2; + // Create sockets for RTP and RTCP, and start the session. + auto compId = ICE_COMP_ID_RTP + streamIdx * ICE_COMP_COUNT_PER_STREAM; + rtpStream.rtpSession_->start(newIceSocket(compId), newIceSocket(compId + 1)); } else rtpStream.rtpSession_->start(nullptr, nullptr); } @@ -2490,13 +2518,12 @@ SIPCall::initIceMediaTransport(bool master, std::optional<IceTransportOptions> o }); }; + iceOptions.master = master; + iceOptions.streamsCount = static_cast<unsigned>(rtpStreams_.size()); // Each RTP stream requires a pair of ICE components (RTP + RTCP). - int compCount = static_cast<int>(rtpStreams_.size() * 2); + iceOptions.compCountPerStream = ICE_COMP_COUNT_PER_STREAM; auto& iceTransportFactory = Manager::instance().getIceTransportFactory(); - auto transport = iceTransportFactory.createUTransport(getCallId().c_str(), - compCount, - master, - iceOptions); + auto transport = iceTransportFactory.createUTransport(getCallId().c_str(), iceOptions); std::lock_guard<std::mutex> lk(transportMtx_); // Destroy old ice on a separate io pool if (tmpMediaTransport_) diff --git a/src/sip/siptransport.cpp b/src/sip/siptransport.cpp index e3117599fad69f5bb5b88d900743365be406d6b2..2220d5d0bce8d20ef3577c6e4a057bd9793c740f 100644 --- a/src/sip/siptransport.cpp +++ b/src/sip/siptransport.cpp @@ -61,6 +61,7 @@ constexpr const char* TRANSPORT_STATE_STR[] = {"CONNECTED", "DESTROY", "UNKNOWN STATE"}; constexpr const size_t TRANSPORT_STATE_SZ = arraySize(TRANSPORT_STATE_STR); +static constexpr int ICE_COMP_ID_SIP_TRANSP {1}; void SipTransport::deleteTransport(pjsip_transport* t) @@ -402,8 +403,8 @@ SipTransportBroker::getChanneledTransport(const std::shared_ptr<ChannelSocket>& auto ice = socket->underlyingICE(); if (!ice) return {}; - auto local = ice->getLocalAddress(0); - auto remote = ice->getRemoteAddress(0); + auto local = ice->getLocalAddress(ICE_COMP_ID_SIP_TRANSP); + auto remote = ice->getRemoteAddress(ICE_COMP_ID_SIP_TRANSP); auto type = local.isIpv6() ? PJSIP_TRANSPORT_TLS6 : PJSIP_TRANSPORT_TLS; auto sips_tr = std::make_unique<tls::ChanneledSIPTransport>(endpt_, type, diff --git a/src/transport/peer_channel.h b/src/transport/peer_channel.h index ba8ac8f31b8d951b9caf48dbe042f14b831aeeaa..7ce6e9252d07b9ff2a1d822d62853d99c22dcd64 100644 --- a/src/transport/peer_channel.h +++ b/src/transport/peer_channel.h @@ -38,12 +38,6 @@ public: o.cv_.notify_all(); } - ssize_t isDataAvailable() - { - std::lock_guard<std::mutex> lk {mutex_}; - return stream_.size(); - } - template<typename Duration> ssize_t wait(Duration timeout, std::error_code& ec) { diff --git a/test/unitTest/Makefile.am b/test/unitTest/Makefile.am index 6f6e2b9cbb805eaadb8c91662d1dc094a7febbce..1db04730a5ec80813789d334f1ee8e2e70bc0b4e 100644 --- a/test/unitTest/Makefile.am +++ b/test/unitTest/Makefile.am @@ -157,4 +157,7 @@ ut_syncHistory_SOURCES = syncHistory/syncHistory.cpp check_PROGRAMS += ut_ice ut_ice_SOURCES = ice/ice.cpp +check_PROGRAMS += ut_ice_sdp_parser +ut_ice_sdp_parser_SOURCES = ice/ice_sdp_parser.cpp + TESTS = $(check_PROGRAMS) diff --git a/test/unitTest/ice/ice.cpp b/test/unitTest/ice/ice.cpp index db21d3fb7212333bde21a429ce82a6742e961f31..d07cb21d84fc4d34cbfef4b2771d0fc47bfd7f5c 100644 --- a/test/unitTest/ice/ice.cpp +++ b/test/unitTest/ice/ice.cpp @@ -121,7 +121,7 @@ IceTest::testRawIceConnection() CPPUNIT_ASSERT(cv_resp.wait_for(lk_resp, std::chrono::seconds(10), [&] { return !response.empty(); })); - auto sdp = IceTransport::parse_SDP(response, *ice_master); + auto sdp = ice_master->parseIceCandidates(response); CPPUNIT_ASSERT( ice_master->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))); }); @@ -130,9 +130,11 @@ IceTest::testRawIceConnection() iceMasterReady = ok; cv.notify_one(); }; + ice_config.master = true; + ice_config.streamsCount = 1; + ice_config.compCountPerStream = 1; + ice_master = Manager::instance().getIceTransportFactory().createTransport("master ICE", - 1, - true, ice_config); cv_create.notify_all(); ice_config.onInitDone = [&](bool ok) { @@ -153,7 +155,7 @@ IceTest::testRawIceConnection() cv_resp.notify_one(); CPPUNIT_ASSERT( cv_init.wait_for(lk_resp, std::chrono::seconds(10), [&] { return !init.empty(); })); - auto sdp = IceTransport::parse_SDP(init, *ice_slave); + auto sdp = ice_slave->parseIceCandidates(init); CPPUNIT_ASSERT( ice_slave->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))); }); @@ -162,9 +164,11 @@ IceTest::testRawIceConnection() iceSlaveReady = ok; cv.notify_one(); }; + ice_config.master = false; + ice_config.streamsCount = 1; + ice_config.compCountPerStream = 1; + ice_slave = Manager::instance().getIceTransportFactory().createTransport("slave ICE", - 1, - false, ice_config); cv_create.notify_all(); CPPUNIT_ASSERT( @@ -216,7 +220,8 @@ IceTest::testTurnMasterIceConnection() CPPUNIT_ASSERT(cv_resp.wait_for(lk_resp, std::chrono::seconds(10), [&] { return !response.empty(); })); - auto sdp = IceTransport::parse_SDP(response, *ice_master); + + auto sdp = ice_master->parseIceCandidates(response); CPPUNIT_ASSERT( ice_master->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))); }); @@ -232,9 +237,10 @@ IceTest::testTurnMasterIceConnection() .setUsername("ring") .setPassword("ring") .setRealm("ring")); + ice_config.master = true; + ice_config.streamsCount = 1; + ice_config.compCountPerStream = 1; ice_master = Manager::instance().getIceTransportFactory().createTransport("master ICE", - 1, - true, ice_config); cv_create.notify_all(); ice_config.turnServers = {}; @@ -265,7 +271,7 @@ IceTest::testTurnMasterIceConnection() cv_resp.notify_one(); CPPUNIT_ASSERT( cv_init.wait_for(lk_resp, std::chrono::seconds(10), [&] { return !init.empty(); })); - auto sdp = IceTransport::parse_SDP(init, *ice_slave); + auto sdp = ice_slave->parseIceCandidates(init); CPPUNIT_ASSERT( ice_slave->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))); }); @@ -274,14 +280,15 @@ IceTest::testTurnMasterIceConnection() iceSlaveReady = ok; cv.notify_one(); }; + ice_config.master = false; + ice_config.streamsCount = 1; + ice_config.compCountPerStream = 1; ice_slave = Manager::instance().getIceTransportFactory().createTransport("slave ICE", - 1, - false, ice_config); cv_create.notify_all(); CPPUNIT_ASSERT( cv.wait_for(lk, std::chrono::seconds(10), [&] { return iceMasterReady && iceSlaveReady; })); - CPPUNIT_ASSERT(ice_master->getLocalAddress(0).toString(false) == turnV4_->toString(false)); + CPPUNIT_ASSERT(ice_master->getLocalAddress(1).toString(false) == turnV4_->toString(false)); } void @@ -329,7 +336,7 @@ IceTest::testTurnSlaveIceConnection() CPPUNIT_ASSERT(cv_resp.wait_for(lk_resp, std::chrono::seconds(10), [&] { return !response.empty(); })); - auto sdp = IceTransport::parse_SDP(response, *ice_master); + auto sdp = ice_master->parseIceCandidates(response); CPPUNIT_ASSERT( ice_master->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))); }); @@ -340,9 +347,10 @@ IceTest::testTurnSlaveIceConnection() }; ice_config.accountPublicAddr = IpAddr(*addr4[0].get()); ice_config.accountLocalAddr = ip_utils::getLocalAddr(AF_INET); + ice_config.master = true; + ice_config.streamsCount = 1; + ice_config.compCountPerStream = 1; ice_master = Manager::instance().getIceTransportFactory().createTransport("master ICE", - 1, - true, ice_config); cv_create.notify_all(); ice_config.onInitDone = [&](bool ok) { @@ -372,7 +380,7 @@ IceTest::testTurnSlaveIceConnection() cv_resp.notify_one(); CPPUNIT_ASSERT( cv_init.wait_for(lk_resp, std::chrono::seconds(10), [&] { return !init.empty(); })); - auto sdp = IceTransport::parse_SDP(init, *ice_slave); + auto sdp = ice_slave->parseIceCandidates(init); CPPUNIT_ASSERT( ice_slave->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))); }); @@ -386,14 +394,15 @@ IceTest::testTurnSlaveIceConnection() .setUsername("ring") .setPassword("ring") .setRealm("ring")); + ice_config.master = false; + ice_config.streamsCount = 1; + ice_config.compCountPerStream = 1; ice_slave = Manager::instance().getIceTransportFactory().createTransport("slave ICE", - 1, - false, ice_config); cv_create.notify_all(); CPPUNIT_ASSERT( cv.wait_for(lk, std::chrono::seconds(10), [&] { return iceMasterReady && iceSlaveReady; })); - CPPUNIT_ASSERT(ice_slave->getLocalAddress(0).toString(false) == turnV4_->toString(false)); + CPPUNIT_ASSERT(ice_slave->getLocalAddress(1).toString(false) == turnV4_->toString(false)); } } // namespace test diff --git a/test/unitTest/ice/ice_sdp_parser.cpp b/test/unitTest/ice/ice_sdp_parser.cpp new file mode 100644 index 0000000000000000000000000000000000000000..0b24caf4e9673e318c699f6bb4a08be0d0e5184c --- /dev/null +++ b/test/unitTest/ice/ice_sdp_parser.cpp @@ -0,0 +1,632 @@ +/* + * Copyright (C) 2021 Savoir-faire Linux Inc. + * + * Author: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#include <cppunit/TestAssert.h> +#include <cppunit/TestFixture.h> +#include <cppunit/extensions/HelperMacros.h> + +#include <condition_variable> +#include <string> + +#include "manager.h" +#include "jamidht/jamiaccount.h" +#include "../../test_runner.h" +#include "dring.h" +#include "call_const.h" +#include "account_const.h" +#include "sip/sipcall.h" +#include "media/audio/audio_rtp_session.h" +#include "media/audio/audio_receive_thread.h" + +using namespace DRing::Account; +using namespace DRing::Call; + +namespace jami { +namespace test { + +struct CallData +{ + struct Signal + { + Signal(const std::string& name, const std::string& event = {}) + : name_(std::move(name)) + , event_(std::move(event)) {}; + + std::string name_ {}; + std::string event_ {}; + }; + + std::string accountId_ {}; + std::string userName_ {}; + std::string alias_ {}; + std::string callId_ {}; + std::vector<Signal> signals_; + std::condition_variable cv_ {}; + std::mutex mtx_; + bool compliancyEnabled_ {false}; +}; + +// Used to register a MediaFrame observer to RTP session in order +// to validate the media stream. +class MediaReceiver : public Observer<std::shared_ptr<MediaFrame>> +{ +public: + MediaReceiver(MediaType type) + : mediaType_(type) + , mediaTypeStr_(type == MediaType::MEDIA_AUDIO ? "AUDIO" : "VIDEO") {}; + + virtual ~MediaReceiver() {}; + void update(Observable<std::shared_ptr<jami::MediaFrame>>* observer, + const std::shared_ptr<jami::MediaFrame>& mediaframe) override; + + bool waitForMediaFlow(); + const MediaType mediaType_ {MediaType::MEDIA_NONE}; + const std::string mediaTypeStr_ {}; + const std::chrono::seconds TIME_OUT {10}; + const unsigned REQUIRED_FRAME_COUNT {100}; + +private: + unsigned long frameCounter_ {0}; + std::condition_variable cv_ {}; + std::mutex mtx_; +}; + +void +MediaReceiver::update(Observable<std::shared_ptr<jami::MediaFrame>>*, + const std::shared_ptr<jami::MediaFrame>& frame) +{ + std::unique_lock<std::mutex> lock {mtx_}; + if (frame and frame->getFrame()) + frameCounter_++; + + if (frameCounter_ % 10 == 1) { + JAMI_INFO("[%s] Frame counter %lu", mediaTypeStr_.c_str(), frameCounter_); + } + + if (frameCounter_ >= REQUIRED_FRAME_COUNT) + cv_.notify_one(); +} + +bool +MediaReceiver::waitForMediaFlow() +{ + std::unique_lock<std::mutex> lock {mtx_}; + + return cv_.wait_for(lock, TIME_OUT, [this] { return frameCounter_ > 100; }); +} + +class IceSdpParsingTest : public CppUnit::TestFixture +{ +public: + IceSdpParsingTest() + : audioReceiver_(std::make_shared<MediaReceiver>(MediaType::MEDIA_AUDIO)) + { + // Init daemon + DRing::init(DRing::InitFlag(DRing::DRING_FLAG_DEBUG | DRing::DRING_FLAG_CONSOLE_LOG)); + if (not Manager::instance().initialized) + CPPUNIT_ASSERT(DRing::start("dring-sample.yml")); + + // We must have valid media receiver. + CPPUNIT_ASSERT(audioReceiver_); + } + ~IceSdpParsingTest() { DRing::fini(); } + + static std::string name() { return "IceSdpParsingTest"; } + void setUp(); + void tearDown(); + +private: + // Test cases. + void call_with_rfc5245_compliancy_disabled(); + void call_with_rfc5245_compliancy_enabled(); + + CPPUNIT_TEST_SUITE(IceSdpParsingTest); + CPPUNIT_TEST(call_with_rfc5245_compliancy_disabled); + CPPUNIT_TEST(call_with_rfc5245_compliancy_enabled); + CPPUNIT_TEST_SUITE_END(); + + // Event/Signal handlers + static void onCallStateChange(const std::string& callId, + const std::string& state, + CallData& callData); + static void onIncomingCallWithMedia(const std::string& accountId, + const std::string& callId, + const std::vector<DRing::MediaMap> mediaList, + CallData& callData); + static void onMediaNegotiationStatus(const std::string& callId, + const std::string& event, + CallData& callData); + + // Helpers + void audio_video_call(); + static void configureTest(CallData& bob, CallData& alice); + static std::string getUserAlias(const std::string& callId); + // Wait for a signal from the callbacks. Some signals also report the event that + // triggered the signal a like the StateChange signal. + static bool waitForSignal(CallData& callData, + const std::string& signal, + const std::string& expectedEvent = {}); + static bool attachReceiver(const std::string& callId, std::shared_ptr<MediaReceiver> receiver); + static bool detachReceiver(const std::string& callId, std::shared_ptr<MediaReceiver> receiver); + +private: + CallData aliceData_; + CallData bobData_; + std::shared_ptr<MediaReceiver> audioReceiver_; +}; + +CPPUNIT_TEST_SUITE_NAMED_REGISTRATION(IceSdpParsingTest, IceSdpParsingTest::name()); + +void +IceSdpParsingTest::setUp() +{ + std::map<std::string, std::string> details = DRing::getAccountTemplate("RING"); + details[ConfProperties::TYPE] = "RING"; + details[ConfProperties::DISPLAYNAME] = "ALICE"; + details[ConfProperties::ALIAS] = "ALICE"; + details[ConfProperties::UPNP_ENABLED] = "false"; + aliceData_.accountId_ = Manager::instance().addAccount(details); + + details = DRing::getAccountTemplate("RING"); + details[ConfProperties::TYPE] = "RING"; + details[ConfProperties::DISPLAYNAME] = "BOB"; + details[ConfProperties::ALIAS] = "BOB"; + details[ConfProperties::UPNP_ENABLED] = "false"; + bobData_.accountId_ = Manager::instance().addAccount(details); + + JAMI_INFO("Initializing accounts ..."); + auto aliceAccount = Manager::instance().getAccount<JamiAccount>(aliceData_.accountId_); + aliceAccount->enableMultiStream(true); + auto bobAccount = Manager::instance().getAccount<JamiAccount>(bobData_.accountId_); + bobAccount->enableMultiStream(true); + + std::map<std::string, std::shared_ptr<DRing::CallbackWrapperBase>> confHandlers; + std::mutex mtx; + std::unique_lock<std::mutex> lk {mtx}; + std::condition_variable cv; + std::atomic_bool accountsReady {false}; + confHandlers.insert( + DRing::exportable_callback<DRing::ConfigurationSignal::VolatileDetailsChanged>( + [&](const std::string&, const std::map<std::string, std::string>&) { + bool ready = false; + auto details = aliceAccount->getVolatileAccountDetails(); + auto daemonStatus = details[DRing::Account::ConfProperties::Registration::STATUS]; + ready = (daemonStatus == "REGISTERED"); + details = bobAccount->getVolatileAccountDetails(); + daemonStatus = details[DRing::Account::ConfProperties::Registration::STATUS]; + ready &= (daemonStatus == "REGISTERED"); + if (ready) { + accountsReady = true; + cv.notify_one(); + } + })); + DRing::registerSignalHandlers(confHandlers); + CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { return accountsReady.load(); })); + DRing::unregisterSignalHandlers(); +} + +void +IceSdpParsingTest::tearDown() +{ + JAMI_INFO("Removing created accounts ..."); + + std::map<std::string, std::shared_ptr<DRing::CallbackWrapperBase>> confHandlers; + std::mutex mtx; + std::unique_lock<std::mutex> lk {mtx}; + std::condition_variable cv; + auto currentAccSize = Manager::instance().getAccountList().size(); + std::atomic_bool accountsRemoved {false}; + confHandlers.insert( + DRing::exportable_callback<DRing::ConfigurationSignal::AccountsChanged>([&]() { + if (Manager::instance().getAccountList().size() <= currentAccSize - 2) { + accountsRemoved = true; + cv.notify_one(); + } + })); + DRing::registerSignalHandlers(confHandlers); + + Manager::instance().removeAccount(aliceData_.accountId_, true); + Manager::instance().removeAccount(bobData_.accountId_, true); + // Because cppunit is not linked with dbus, just poll if removed + CPPUNIT_ASSERT( + cv.wait_for(lk, std::chrono::seconds(30), [&] { return accountsRemoved.load(); })); + + DRing::unregisterSignalHandlers(); +} + +std::string +IceSdpParsingTest::getUserAlias(const std::string& callId) +{ + auto call = Manager::instance().getCallFromCallID(callId); + + if (not call) { + JAMI_WARN("Call [%s] does not exist!", callId.c_str()); + return {}; + } + + auto const& account = call->getAccount().lock(); + if (not account) { + return {}; + } + + return account->getAccountDetails()[ConfProperties::ALIAS]; +} + +void +IceSdpParsingTest::onIncomingCallWithMedia(const std::string& accountId, + const std::string& callId, + const std::vector<DRing::MediaMap> mediaList, + CallData& callData) +{ + CPPUNIT_ASSERT_EQUAL(callData.accountId_, accountId); + + JAMI_INFO("Signal [%s] - user [%s] - call [%s] - media count [%lu]", + DRing::CallSignal::IncomingCallWithMedia::name, + callData.alias_.c_str(), + callId.c_str(), + mediaList.size()); + + // NOTE. + // We shouldn't access shared_ptr<Call> as this event is supposed to mimic + // the client, and the client have no access to this type. But here, we only + // needed to check if the call exists. This is the most straightforward and + // reliable way to do it until we add a new API (like hasCall(id)). + if (not Manager::instance().getCallFromCallID(callId)) { + JAMI_WARN("Call [%s] does not exist!", callId.c_str()); + callData.callId_ = {}; + return; + } + + std::unique_lock<std::mutex> lock {callData.mtx_}; + callData.callId_ = callId; + callData.signals_.emplace_back(CallData::Signal(DRing::CallSignal::IncomingCallWithMedia::name)); + + callData.cv_.notify_one(); +} + +void +IceSdpParsingTest::onCallStateChange(const std::string& callId, + const std::string& state, + CallData& callData) +{ + auto call = Manager::instance().getCallFromCallID(callId); + if (not call) { + JAMI_WARN("Call [%s] does not exist!", callId.c_str()); + return; + } + + auto account = call->getAccount().lock(); + if (not account) { + JAMI_WARN("Account owning the call [%s] does not exist!", callId.c_str()); + return; + } + + JAMI_INFO("Signal [%s] - user [%s] - call [%s] - state [%s]", + DRing::CallSignal::StateChange::name, + callData.alias_.c_str(), + callId.c_str(), + state.c_str()); + + if (account->getAccountID() != callData.accountId_) + return; + + { + std::unique_lock<std::mutex> lock {callData.mtx_}; + callData.signals_.emplace_back( + CallData::Signal(DRing::CallSignal::StateChange::name, state)); + } + + if (state == "CURRENT" or state == "OVER" or state == "HUNGUP") { + callData.cv_.notify_one(); + } +} + +void +IceSdpParsingTest::onMediaNegotiationStatus(const std::string& callId, + const std::string& event, + CallData& callData) +{ + auto call = Manager::instance().getCallFromCallID(callId); + if (not call) { + JAMI_WARN("Call [%s] does not exist!", callId.c_str()); + return; + } + + auto account = call->getAccount().lock(); + if (not account) { + JAMI_WARN("Account owning the call [%s] does not exist!", callId.c_str()); + return; + } + + JAMI_INFO("Signal [%s] - user [%s] - call [%s] - state [%s]", + DRing::CallSignal::MediaNegotiationStatus::name, + account->getAccountDetails()[ConfProperties::ALIAS].c_str(), + call->getCallId().c_str(), + event.c_str()); + + if (account->getAccountID() != callData.accountId_) + return; + + { + std::unique_lock<std::mutex> lock {callData.mtx_}; + callData.signals_.emplace_back( + CallData::Signal(DRing::CallSignal::MediaNegotiationStatus::name, event)); + } + + callData.cv_.notify_one(); +} + +bool +IceSdpParsingTest::waitForSignal(CallData& callData, + const std::string& expectedSignal, + const std::string& expectedEvent) +{ + const std::chrono::seconds TIME_OUT {30}; + std::unique_lock<std::mutex> lock {callData.mtx_}; + + // Combined signal + event (if any). + std::string sigEvent(expectedSignal); + if (not expectedEvent.empty()) + sigEvent += "::" + expectedEvent; + + JAMI_INFO("[%s] is waiting for [%s] signal/event", callData.alias_.c_str(), sigEvent.c_str()); + + auto res = callData.cv_.wait_for(lock, TIME_OUT, [&] { + // Search for the expected signal in list of received signals. + for (auto it = callData.signals_.begin(); it != callData.signals_.end(); it++) { + // The predicate is true if the signal names match, and if the + // expectedEvent is not empty, the events must also match. + if (it->name_ == expectedSignal + and (expectedEvent.empty() or it->event_ == expectedEvent)) { + // Done with this signal. + callData.signals_.erase(it); + return true; + } + } + // Signal/event not found. + return false; + }); + + if (not res) { + JAMI_ERR("[%s] waiting for signal/event [%s] timed-out!", + callData.alias_.c_str(), + sigEvent.c_str()); + + JAMI_INFO("[%s] currently has the following signals:", callData.alias_.c_str()); + + for (auto const& sig : callData.signals_) { + JAMI_INFO() << "Signal [" << sig.name_ + << (sig.event_.empty() ? "" : ("::" + sig.event_)) << "]"; + } + } + + return res; +} + +bool +IceSdpParsingTest::attachReceiver(const std::string& callId, + std::shared_ptr<MediaReceiver> mediaReceiver) +{ + CPPUNIT_ASSERT(mediaReceiver); + auto call = std::dynamic_pointer_cast<SIPCall>(Manager::instance().getCallFromCallID(callId)); + if (not call) { + JAMI_ERR("Call [%s] does not exist!", callId.c_str()); + } + CPPUNIT_ASSERT(call); + + CPPUNIT_ASSERT(mediaReceiver->mediaType_ == MediaType::MEDIA_AUDIO + or mediaReceiver->mediaType_ == MediaType::MEDIA_VIDEO); + + if (mediaReceiver->mediaType_ == MediaType::MEDIA_AUDIO) { + auto audioRtp = call->getAudioRtp(); + auto receiver = audioRtp->getAudioReceive().get(); + CPPUNIT_ASSERT(receiver != nullptr); + if (receiver == nullptr) + return false; + return receiver->attach(mediaReceiver.get()); + } + + auto videoRtp = call->getVideoRtp(); + auto receiver = videoRtp->getVideoReceive().get(); + CPPUNIT_ASSERT(receiver != nullptr); + return receiver->attach(mediaReceiver.get()); +} + +bool +IceSdpParsingTest::detachReceiver(const std::string& callId, + std::shared_ptr<MediaReceiver> mediaReceiver) +{ + auto call = std::dynamic_pointer_cast<SIPCall>(Manager::instance().getCallFromCallID(callId)); + CPPUNIT_ASSERT(call); + + CPPUNIT_ASSERT(mediaReceiver); + CPPUNIT_ASSERT(mediaReceiver->mediaType_ == MediaType::MEDIA_AUDIO + or mediaReceiver->mediaType_ == MediaType::MEDIA_VIDEO); + + if (mediaReceiver->mediaType_ == MediaType::MEDIA_AUDIO) { + auto audioRtp = call->getAudioRtp(); + auto receiver = audioRtp->getAudioReceive().get(); + CPPUNIT_ASSERT(receiver != nullptr); + return receiver->detach(mediaReceiver.get()); + } + + auto videoRtp = call->getVideoRtp(); + auto receiver = videoRtp->getVideoReceive().get(); + CPPUNIT_ASSERT(receiver != nullptr); + return receiver->detach(mediaReceiver.get()); +} + +void +IceSdpParsingTest::configureTest(CallData& aliceData, CallData& bobData) +{ + { + CPPUNIT_ASSERT(not aliceData.accountId_.empty()); + auto const& account = Manager::instance().getAccount<JamiAccount>(aliceData.accountId_); + aliceData.userName_ = account->getAccountDetails()[ConfProperties::USERNAME]; + aliceData.alias_ = account->getAccountDetails()[ConfProperties::ALIAS]; + + account->enableIceCompIdRfc5245Compliance(aliceData.compliancyEnabled_); + } + + { + CPPUNIT_ASSERT(not bobData.accountId_.empty()); + auto const& account = Manager::instance().getAccount<JamiAccount>(bobData.accountId_); + bobData.userName_ = account->getAccountDetails()[ConfProperties::USERNAME]; + bobData.alias_ = account->getAccountDetails()[ConfProperties::ALIAS]; + + account->enableIceCompIdRfc5245Compliance(bobData.compliancyEnabled_); + } + + std::map<std::string, std::shared_ptr<DRing::CallbackWrapperBase>> signalHandlers; + + // Insert needed signal handlers. + signalHandlers.insert(DRing::exportable_callback<DRing::CallSignal::IncomingCallWithMedia>( + [&](const std::string& accountId, + const std::string& callId, + const std::string&, + const std::vector<DRing::MediaMap> mediaList) { + auto user = getUserAlias(callId); + if (not user.empty()) + onIncomingCallWithMedia(accountId, + callId, + mediaList, + user == aliceData.alias_ ? aliceData : bobData); + })); + + signalHandlers.insert(DRing::exportable_callback<DRing::CallSignal::StateChange>( + [&](const std::string& callId, const std::string& state, signed) { + auto user = getUserAlias(callId); + if (not user.empty()) + onCallStateChange(callId, state, user == aliceData.alias_ ? aliceData : bobData); + })); + + signalHandlers.insert(DRing::exportable_callback<DRing::CallSignal::MediaNegotiationStatus>( + [&](const std::string& callId, const std::string& event) { + auto user = getUserAlias(callId); + if (not user.empty()) + onMediaNegotiationStatus(callId, + event, + user == aliceData.alias_ ? aliceData : bobData); + })); + + DRing::registerSignalHandlers(signalHandlers); +} + +void +IceSdpParsingTest::audio_video_call() +{ + JAMI_INFO("Waiting for accounts setup ..."); + // TODO remove. This sleeps is because it take some time for the DHT to be connected + // and account announced + std::this_thread::sleep_for(std::chrono::seconds(10)); + + configureTest(aliceData_, bobData_); + + JAMI_INFO("=== Start a call and validate ==="); + + MediaAttribute audio(MediaType::MEDIA_AUDIO); + audio.label_ = "main audio"; + MediaAttribute video(MediaType::MEDIA_VIDEO); + video.label_ = "main video"; + + std::vector<MediaAttribute> offer; + offer.emplace_back(audio); + offer.emplace_back(video); + std::vector<MediaAttribute> answer; + answer.emplace_back(audio); + answer.emplace_back(video); + + auto const& aliceCall = std::dynamic_pointer_cast<SIPCall>( + (Manager::instance().getAccount<JamiAccount>(aliceData_.accountId_)) + ->newOutgoingCall(bobData_.userName_, offer)); + CPPUNIT_ASSERT(aliceCall); + aliceData_.callId_ = aliceCall->getCallId(); + + JAMI_INFO("ALICE [%s] started a call with BOB [%s] and wait for answer", + aliceData_.accountId_.c_str(), + bobData_.accountId_.c_str()); + + // Wait for incoming call signal. + CPPUNIT_ASSERT(waitForSignal(bobData_, DRing::CallSignal::IncomingCallWithMedia::name)); + // Answer the call. + { + auto const& mediaList = MediaAttribute::mediaAttributesToMediaMaps(answer); + Manager::instance().answerCallWithMedia(bobData_.callId_, mediaList); + } + + // Wait for media negotiation complete signal. + CPPUNIT_ASSERT(waitForSignal(bobData_, + DRing::CallSignal::MediaNegotiationStatus::name, + MediaNegotiationStatusEvents::NEGOTIATION_SUCCESS)); + // Wait for the StateChange signal. + CPPUNIT_ASSERT( + waitForSignal(bobData_, DRing::CallSignal::StateChange::name, StateEvent::CURRENT)); + + JAMI_INFO("BOB answered the call [%s]", bobData_.callId_.c_str()); + + // Wait for media negotiation complete signal. + CPPUNIT_ASSERT(waitForSignal(aliceData_, + DRing::CallSignal::MediaNegotiationStatus::name, + MediaNegotiationStatusEvents::NEGOTIATION_SUCCESS)); + + // Give some time to media to start. + std::this_thread::sleep_for(std::chrono::seconds(2)); + + // Register the media observer to validate media flow. + // NOTE: For now, only audio is validated. + CPPUNIT_ASSERT(attachReceiver(aliceData_.callId_, audioReceiver_)); + + JAMI_INFO("Waiting for media fot flow ..."); + CPPUNIT_ASSERT(audioReceiver_->waitForMediaFlow()); + CPPUNIT_ASSERT(detachReceiver(aliceData_.callId_, audioReceiver_)); + + // Bob hang-up. + JAMI_INFO("Hang up BOB's call and wait for ALICE to hang up"); + Manager::instance().hangupCall(bobData_.callId_); + + CPPUNIT_ASSERT_EQUAL(true, + waitForSignal(aliceData_, + DRing::CallSignal::StateChange::name, + StateEvent::HUNGUP)); + + JAMI_INFO("Call terminated on both sides"); +} + +void +IceSdpParsingTest::call_with_rfc5245_compliancy_disabled() +{ + JAMI_INFO("=== Begin test %s ===", __FUNCTION__); + + aliceData_.compliancyEnabled_ = bobData_.compliancyEnabled_ = false; + audio_video_call(); +} + +void +IceSdpParsingTest::call_with_rfc5245_compliancy_enabled() +{ + JAMI_INFO("=== Begin test %s ===", __FUNCTION__); + + aliceData_.compliancyEnabled_ = bobData_.compliancyEnabled_ = true; + audio_video_call(); +} + +} // namespace test +} // namespace jami + +RING_TEST_RUNNER(jami::test::IceSdpParsingTest::name())