diff --git a/daemon/src/Makefile.am b/daemon/src/Makefile.am index 7a641800bde8fbf1db4ec36859168ffd2b3a7e17..ff5a087321471c013b3fbff9da6835a6039f4308 100644 --- a/daemon/src/Makefile.am +++ b/daemon/src/Makefile.am @@ -101,6 +101,8 @@ libsflphone_la_SOURCES = conference.cpp \ ip_utils.h \ ip_utils.cpp \ utf8_utils.cpp \ + ice_transport.cpp \ + ice_transport.h \ plugin_manager.cpp \ plugin_loader_dl.cpp \ ring_plugin.h \ diff --git a/daemon/src/audio/audiortp/avformat_rtp_session.cpp b/daemon/src/audio/audiortp/avformat_rtp_session.cpp index 78c11db8ff4b740f3cd622c7ab2a6d245c8c3159..f0c55b985ab71be456b893473f925756746ff3a9 100644 --- a/daemon/src/audio/audiortp/avformat_rtp_session.cpp +++ b/daemon/src/audio/audiortp/avformat_rtp_session.cpp @@ -473,6 +473,23 @@ AVFormatRtpSession::start(int localPort) startReceiver(); } +void +AVFormatRtpSession::start(std::unique_ptr<IceSocket> rtp_sock, + std::unique_ptr<IceSocket> rtcp_sock) +{ + std::lock_guard<std::recursive_mutex> lock(mutex_); + + if (not sending_ and not receiving_) { + stop(); + return; + } + + socketPair_.reset(new SocketPair(std::move(rtp_sock), std::move(rtcp_sock))); + + startSender(); + startReceiver(); +} + void AVFormatRtpSession::stop() { diff --git a/daemon/src/audio/audiortp/avformat_rtp_session.h b/daemon/src/audio/audiortp/avformat_rtp_session.h index de1e1c9ffb5af6c01b88a1f43a73a7ae84b4e79d..b94c00e6625e6db8f8c950315fba1eb6e921d3aa 100644 --- a/daemon/src/audio/audiortp/avformat_rtp_session.h +++ b/daemon/src/audio/audiortp/avformat_rtp_session.h @@ -54,6 +54,7 @@ class RingBuffer; class Resampler; class AudioSender; class AudioReceiveThread; +class IceSocket; class AVFormatRtpSession { public: @@ -62,6 +63,8 @@ class AVFormatRtpSession { ~AVFormatRtpSession(); void start(int localPort); + void start(std::unique_ptr<IceSocket> rtp_sock, + std::unique_ptr<IceSocket> rtcp_sock); void stop(); void updateDestination(const std::string& destination, unsigned int port); void updateSDP(const Sdp &sdp); diff --git a/daemon/src/call.cpp b/daemon/src/call.cpp index 3b3aacf4d6ec0bf91bc06df4a292157458c21eea..9b361cdb108d7ad3e93ff9c2fa066a6396fe7b9b 100644 --- a/daemon/src/call.cpp +++ b/daemon/src/call.cpp @@ -60,6 +60,7 @@ void Call::removeCall() { Manager::instance().callFactory.removeCall(*this); + iceTransport_.reset(); } const std::string& @@ -300,3 +301,82 @@ Call::getNullDetails() details["ACCOUNTID"] = ""; return details; } + +void +Call::initIceTransport(bool master, unsigned channel_num) +{ + auto& iceTransportFactory = Manager::instance().getIceTransportFactory(); + const auto& on_initdone = [this, master](sfl::IceTransport& iceTransport, bool done) { + if (done) { + if (master) + iceTransport.setInitiatorSession(); + else + iceTransport.setSlaveSession(); + } + + { + std::unique_lock<std::mutex> lk(callMutex_); + iceTransportInitDone_ = done; + } + + iceCV_.notify_one(); + }; + const auto& on_negodone = [this, master](sfl::IceTransport& /*iceTransport*/, bool done) { + { + std::unique_lock<std::mutex> lk(callMutex_); + iceTransportNegoDone_ = done; + } + + iceCV_.notify_one(); + }; + + iceTransport_ = iceTransportFactory.createTransport(getCallId().c_str(), channel_num, + on_initdone, + on_negodone); +} + +int +Call::waitForIceInitialization(unsigned timeout) +{ + std::unique_lock<std::mutex> lk(callMutex_); + if (!iceCV_.wait_for(lk, std::chrono::seconds(timeout), + [this]{ return iceTransportInitDone_; })) { + SFL_WARN("waitForIceInitialization: timeout"); + return -1; + } + SFL_DBG("waitForIceInitialization: %u", iceTransportInitDone_); + return iceTransportInitDone_; +} + +int +Call::waitForIceNegotiation(unsigned timeout) +{ + std::unique_lock<std::mutex> lk(callMutex_); + if (!iceCV_.wait_for(lk, std::chrono::seconds(timeout), + [this]{ return iceTransportNegoDone_; })) { + SFL_WARN("waitForIceNegotiation: timeout"); + return -1; + } + SFL_DBG("waitForIceNegotiation: %u", iceTransportNegoDone_); + return iceTransportNegoDone_; +} + +bool +Call::isIceUsed() const +{ + std::unique_lock<std::mutex> lk(callMutex_); + return iceTransportInitDone_; +} + +bool +Call::isIceRunning() const +{ + std::unique_lock<std::mutex> lk(callMutex_); + return iceTransportNegoDone_; +} + +sfl::IceSocket* +Call::newIceSocket(unsigned compId) const +{ + return new sfl::IceSocket(iceTransport_, compId); +} diff --git a/daemon/src/call.h b/daemon/src/call.h index 48d4ed0b3ea09653db57a90b96bcd25b96d53380..d5f0105bec748f1f9211b5b7a991870ffa01effb 100644 --- a/daemon/src/call.h +++ b/daemon/src/call.h @@ -38,12 +38,14 @@ #include "audio/recordable.h" #include "ip_utils.h" +#include "ice_transport.h" #include <mutex> #include <map> #include <sstream> #include <memory> #include <vector> +#include <condition_variable> class VoIPLink; class Account; @@ -55,7 +57,7 @@ template <class T> using CallMap = std::map<std::string, std::shared_ptr<T> >; * @brief A call is the base class for protocol-based calls */ -class Call : public sfl::Recordable { +class Call : public sfl::Recordable, public std::enable_shared_from_this<Call> { public: static const char * const DEFAULT_ID; @@ -300,6 +302,19 @@ class Call : public sfl::Recordable { void removeCall(); + void initIceTransport(bool master, unsigned channel_num=4); + + int waitForIceInitialization(unsigned timeout); + + int waitForIceNegotiation(unsigned timeout); + + bool isIceUsed() const; + bool isIceRunning() const; + sfl::IceSocket* newIceSocket(unsigned compId) const; + std::shared_ptr<sfl::IceTransport> getIceTransport() const { + return iceTransport_; + } + protected: /** * Constructor of a call @@ -308,6 +323,8 @@ class Call : public sfl::Recordable { */ Call(Account& account, const std::string& id, Call::CallType type); + std::shared_ptr<sfl::IceTransport> iceTransport_ {}; + private: bool validTransition(CallState newState); @@ -355,6 +372,11 @@ class Call : public sfl::Recordable { time_t timestamp_start_ {0}; time_t timestamp_stop_ {0}; + + /** ICE support */ + std::condition_variable iceCV_ {}; + bool iceTransportInitDone_ {false}; + bool iceTransportNegoDone_ {false}; }; #endif // __CALL_H__ diff --git a/daemon/src/ice_socket.h b/daemon/src/ice_socket.h new file mode 100644 index 0000000000000000000000000000000000000000..0d29d9c45a81487b6409f9023c4d573d314389e3 --- /dev/null +++ b/daemon/src/ice_socket.h @@ -0,0 +1,57 @@ +/* + * Copyright (C) 2004-2014 Savoir-Faire Linux Inc. + * Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Additional permission under GNU GPL version 3 section 7: + * + * If you modify this program, or any covered work, by linking or + * combining it with the OpenSSL project's OpenSSL library (or a + * modified version of that library), containing parts covered by the + * terms of the OpenSSL or SSLeay licenses, Savoir-Faire Linux Inc. + * grants you additional permission to convey the resulting work. + * Corresponding Source for a non-source form of such a combination + * shall include the source code for the parts of OpenSSL used as well + * as that of the covered work. + */ +#ifndef ICE_SOCKET_H +#define ICE_SOCKET_H + +#include <memory> + +namespace sfl { + +class IceTransport; + +class IceSocket +{ + private: + std::shared_ptr<IceTransport> ice_transport_ {}; + int compId_ = -1; + + public: + IceSocket(std::shared_ptr<IceTransport> iceTransport, int compId) + : ice_transport_(iceTransport), compId_(compId) {} + + void close(); + ssize_t recv(unsigned char* buf, size_t len); + ssize_t send(const unsigned char* buf, size_t len); + ssize_t getNextPacketSize() const; + ssize_t waitForData(unsigned int timeout); +}; + +}; + +#endif /* ICE_SOCKET_H */ diff --git a/daemon/src/ice_transport.cpp b/daemon/src/ice_transport.cpp new file mode 100644 index 0000000000000000000000000000000000000000..f075dff61bafbb07a4d5d6576c6149b1df3d02e5 --- /dev/null +++ b/daemon/src/ice_transport.cpp @@ -0,0 +1,695 @@ +/* + * Copyright (C) 2004-2014 Savoir-Faire Linux Inc. + * Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Additional permission under GNU GPL version 3 section 7: + * + * If you modify this program, or any covered work, by linking or + * combining it with the OpenSSL project's OpenSSL library (or a + * modified version of that library), containing parts covered by the + * terms of the OpenSSL or SSLeay licenses, Savoir-Faire Linux Inc. + * grants you additional permission to convey the resulting work. + * Corresponding Source for a non-source form of such a combination + * shall include the source code for the parts of OpenSSL used as well + * as that of the covered work. + */ + +#include "ice_transport.h" +#include "ice_socket.h" +#include "logger.h" +#include "sip/sip_utils.h" +#include "manager.h" + +#include <pjlib.h> +#include <utility> +#include <algorithm> +#include <sstream> + +#define TRY(ret) do { \ + if ((ret) != PJ_SUCCESS) \ + throw std::runtime_error(#ret " failed"); \ + } while (0) + +namespace sfl { + +// GLOBALS (blame PJSIP) + +static pj_caching_pool g_cp_; +static pj_pool_t* g_pool_ = nullptr; + +static void +register_thread() +{ + // We have to register the external thread so it could access the pjsip frameworks + if (!pj_thread_is_registered()) { +#if __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8) + static thread_local pj_thread_desc desc; + static thread_local pj_thread_t *this_thread; +#else + static __thread pj_thread_desc desc; + static __thread pj_thread_t *this_thread; +#endif + SFL_DBG("Registering thread"); + pj_thread_register(NULL, desc, &this_thread); + } +} + +IceTransport::Packet::Packet(void *pkt, pj_size_t size) + : data(new char[size]), datalen(size) +{ + std::copy_n(reinterpret_cast<char*>(pkt), size, data.get()); +} + + +void +IceTransport::cb_on_rx_data(pj_ice_strans* ice_st, + unsigned comp_id, + void *pkt, pj_size_t size, + const pj_sockaddr_t* /*src_addr*/, + unsigned /*src_addr_len*/) +{ + if (auto tr = static_cast<IceTransport*>(pj_ice_strans_get_user_data(ice_st))) + tr->onReceiveData(comp_id, pkt, size); + else + SFL_WARN("null IceTransport"); +} + +void +IceTransport::cb_on_ice_complete(pj_ice_strans* ice_st, + pj_ice_strans_op op, + pj_status_t status) +{ + if (auto tr = static_cast<IceTransport*>(pj_ice_strans_get_user_data(ice_st))) + tr->onComplete(ice_st, op, status); + else + SFL_WARN("null IceTransport"); +} + +IceTransport::IceTransport(const char* name, int component_count, + IceTransportCompleteCb on_initdone_cb, + IceTransportCompleteCb on_negodone_cb) + : on_initdone_cb_(on_initdone_cb) + , on_negodone_cb_(on_negodone_cb) + , component_count_(component_count) + , compIO_(component_count) +{ + auto& iceTransportFactory = Manager::instance().getIceTransportFactory(); + pj_ice_strans_cb icecb; + + pj_bzero(&icecb, sizeof(icecb)); + icecb.on_rx_data = cb_on_rx_data; + icecb.on_ice_complete = cb_on_ice_complete; + + pj_ice_strans *icest = nullptr; + pj_status_t status = pj_ice_strans_create(name, + iceTransportFactory.getIceCfg(), + component_count, this, &icecb, + &icest); + if (status != PJ_SUCCESS || icest == nullptr) + throw std::runtime_error("pj_ice_strans_create() failed"); +} + +void +IceTransport::onComplete(pj_ice_strans* ice_st, pj_ice_strans_op op, + pj_status_t status) +{ + const char *opname = + op == PJ_ICE_STRANS_OP_INIT? "initialization" : + op == PJ_ICE_STRANS_OP_NEGOTIATION ? "negotiation" : "unknown_op"; + + if (!icest_.get()) + icest_.reset(ice_st); + + const bool done = status == PJ_SUCCESS; + + if (!done) { + char errmsg[PJ_ERR_MSG_SIZE]; + pj_strerror(status, errmsg, sizeof(errmsg)); + SFL_ERR("ICE %s failed: %s", opname, errmsg); + } + SFL_DBG("ICE %s with %s", opname, done?"success":"error"); + if (op == PJ_ICE_STRANS_OP_INIT and on_initdone_cb_) { + on_initdone_cb_(*this, done); + } else if (op == PJ_ICE_STRANS_OP_NEGOTIATION and on_negodone_cb_) { + on_negodone_cb_(*this, done); + } +} + +void +IceTransport::getUFragPwd() +{ + pj_str_t local_ufrag, local_pwd; + pj_ice_strans_get_ufrag_pwd(icest_.get(), &local_ufrag, &local_pwd, NULL, NULL); + local_ufrag_.assign(local_ufrag.ptr, local_ufrag.slen); + local_pwd_.assign(local_pwd.ptr, local_pwd.slen); +} + +void +IceTransport::getDefaultCanditates() +{ + for (unsigned i=0; i < component_count_; ++i) + pj_ice_strans_get_def_cand(icest_.get(), i+1, &cand_[i]); +} + +bool +IceTransport::createIceSession(pj_ice_sess_role role) +{ + if (pj_ice_strans_init_ice(icest_.get(), role, NULL, NULL) != PJ_SUCCESS) { + SFL_ERR("pj_ice_strans_init_ice() failed"); + return false; + } + + // Fetch some information on local configuration + getUFragPwd(); + getDefaultCanditates(); + SFL_DBG("ICE [local] ufrag=%s, pwd=%s", local_ufrag_.c_str(), local_pwd_.c_str()); + return true; +} + +bool +IceTransport::setInitiatorSession() +{ + SFL_DBG("ICE as master"); + if (isInitialized()) { + auto status = pj_ice_strans_change_role(icest_.get(), PJ_ICE_SESS_ROLE_CONTROLLING); + if (status != PJ_SUCCESS) { + SFL_ERR("ICE role change failed"); + sip_utils::sip_strerror(status); + return false; + } + return true; + } + return createIceSession(PJ_ICE_SESS_ROLE_CONTROLLING); +} + +bool +IceTransport::setSlaveSession() +{ + SFL_DBG("ICE as slave"); + if (isInitialized()) { + auto status = pj_ice_strans_change_role(icest_.get(), PJ_ICE_SESS_ROLE_CONTROLLED); + if (status != PJ_SUCCESS) { + SFL_ERR("ICE role change failed"); + sip_utils::sip_strerror(status); + return false; + } + return true; + } + createIceSession(PJ_ICE_SESS_ROLE_CONTROLLED); +} + +bool +IceTransport::start(const Attribute& rem_attrs, + const std::vector<IceCandidate>& rem_candidates) +{ + pj_str_t ufrag, pwd; + SFL_DBG("ICE negotiation starting (%u remote candidates)", rem_candidates.size()); + auto status = pj_ice_strans_start_ice(icest_.get(), + pj_cstr(&ufrag, rem_attrs.ufrag.c_str()), + pj_cstr(&pwd, rem_attrs.pwd.c_str()), + rem_candidates.size(), + rem_candidates.data()); + if (status != PJ_SUCCESS) { + SFL_ERR("ICE start failed"); + sip_utils::sip_strerror(status); + return false; + } + return true; +} + +std::string +IceTransport::unpackLine(std::vector<uint8_t>::const_iterator& begin, + std::vector<uint8_t>::const_iterator& end) +{ + if (std::distance(begin, end) <= 0) + return {}; + + // Search for EOL + std::vector<uint8_t>::const_iterator line_end(begin); + while (line_end != end && *line_end != NEW_LINE && *line_end) + ++line_end; + + if (std::distance(begin, line_end) <= 0) + return {}; + + std::string str(begin, line_end); + + // Consume the new line character + if (std::distance(line_end, end) > 0) + ++line_end; + + begin = line_end; + return str; +} + +bool +IceTransport::start(const std::vector<uint8_t>& rem_data) +{ + auto begin = rem_data.cbegin(); + auto end = rem_data.cend(); + auto rem_ufrag = unpackLine(begin, end); + auto rem_pwd = unpackLine(begin, end); + if (rem_pwd.empty() or rem_pwd.empty()) { + SFL_ERR("ICE remote attributes parsing error"); + return false; + } + std::vector<IceCandidate> rem_candidates; + try { + while (true) { + IceCandidate candidate; + const auto line = unpackLine(begin, end); + if (line.empty()) + break; + if (getCandidateFromSDP(line, candidate)) + rem_candidates.push_back(candidate); + } + } catch (std::exception& e) { + SFL_ERR("ICE remote candidates parsing error"); + return false; + } + return start({rem_ufrag, rem_pwd}, rem_candidates); +} + +bool +IceTransport::stop() +{ + if (not pj_ice_strans_has_sess(icest_.get())) { + SFL_ERR("Session not created yet"); + return false; + } + + auto status = pj_ice_strans_stop_ice(icest_.get()); + if (status != PJ_SUCCESS) { + SFL_ERR("ICE start failed"); + sip_utils::sip_strerror(status); + return false; + } + + return true; +} + +bool +IceTransport::isInitialized() const +{ + return pj_ice_strans_has_sess(icest_.get()); +} + +bool +IceTransport::isStarted() const +{ + return pj_ice_strans_sess_is_running(icest_.get()); +} + +bool +IceTransport::isCompleted() const +{ + return pj_ice_strans_sess_is_complete(icest_.get()); +} + +IpAddr +IceTransport::getLocalAddress(unsigned comp_id) const +{ + if (isInitialized()) + return cand_[comp_id].addr; + return {}; +} + +IpAddr +IceTransport::getRemoteAddress(unsigned comp_id) const +{ + if (auto sess = pj_ice_strans_get_valid_pair(icest_.get(), comp_id+1)) + return sess->rcand->addr; + return {}; +} + +const IceTransport::Attribute +IceTransport::getLocalAttributes() const +{ + return {local_ufrag_, local_pwd_}; +} + +std::vector<std::string> +IceTransport::getLocalCandidates(unsigned comp_id) const +{ + std::vector<std::string> res; + pj_ice_sess_cand cand[PJ_ARRAY_SIZE(cand_)]; + unsigned cand_cnt = PJ_ARRAY_SIZE(cand); + + if (pj_ice_strans_enum_cands(icest_.get(), comp_id+1, &cand_cnt, cand) != PJ_SUCCESS) { + SFL_ERR("pj_ice_strans_enum_cands() failed"); + return res; + } + + for (unsigned i=0; i<cand_cnt; ++i) { + std::ostringstream val; + char ipaddr[PJ_INET6_ADDRSTRLEN]; + + val << std::string(cand[i].foundation.ptr, cand[i].foundation.slen); + val << " " << (unsigned)cand[i].comp_id << " UDP " << cand[i].prio; + val << " " << pj_sockaddr_print(&cand[i].addr, ipaddr, sizeof(ipaddr), 0); + val << " " << (unsigned)pj_sockaddr_get_port(&cand[i].addr); + val << " typ " << pj_ice_get_cand_type_name(cand[i].type); + + res.push_back(val.str()); + } + + return res; +} + +std::vector<uint8_t> +IceTransport::getLocalAttributesAndCandidates() const +{ + std::stringstream ss; + ss << local_ufrag_ << NEW_LINE; + ss << local_pwd_ << NEW_LINE; + for (unsigned i=0; i<component_count_; i++) { + const auto& candidates = getLocalCandidates(i); + for (const auto& c : candidates) + ss << c << NEW_LINE; + } + auto str(ss.str()); + std::vector<uint8_t> ret(str.begin(), str.end()); + return ret; +} + +void +IceTransport::onReceiveData(unsigned comp_id, void *pkt, pj_size_t size) +{ + if (!comp_id or comp_id > component_count_) { + SFL_ERR("rx: invalid comp_id (%u)", comp_id); + return; + } + if (!size) + return; + auto& io = compIO_[comp_id-1]; + std::unique_lock<std::mutex> lk(io.mutex); + if (io.cb) { + io.cb((uint8_t*)pkt, size); + } else { + io.queue.emplace_back(pkt, size); + io.cv.notify_one(); + } +} + +bool +IceTransport::getCandidateFromSDP(const std::string& line, IceCandidate& cand) +{ + char foundation[33], transport[13], ipaddr[81], type[33]; + pj_str_t tmpaddr; + int af, comp_id, prio, port; + int cnt = sscanf(line.c_str(), "%32s %d %12s %d %80s %d typ %32s", + foundation, + &comp_id, + transport, + &prio, + ipaddr, + &port, + type); + + if (cnt != 7) { + SFL_WARN("ICE: invalid remote candidate line"); + return false; + } + + pj_bzero(&cand, sizeof(IceCandidate)); + + if (strcmp(type, "host")==0) + cand.type = PJ_ICE_CAND_TYPE_HOST; + else if (strcmp(type, "srflx")==0) + cand.type = PJ_ICE_CAND_TYPE_SRFLX; + else if (strcmp(type, "relay")==0) + cand.type = PJ_ICE_CAND_TYPE_RELAYED; + else { + SFL_WARN("ICE: invalid remote candidate type '%s'", type); + return false; + } + + cand.comp_id = (pj_uint8_t)comp_id; + cand.prio = prio; + + if (strchr(ipaddr, ':')) + af = pj_AF_INET6(); + else + af = pj_AF_INET(); + + tmpaddr = pj_str(ipaddr); + pj_sockaddr_init(af, &cand.addr, NULL, 0); + auto status = pj_sockaddr_set_str_addr(af, &cand.addr, &tmpaddr); + if (status != PJ_SUCCESS) { + SFL_ERR("ICE: invalid remote IP address '%s'", ipaddr); + return false; + } + + pj_sockaddr_set_port(&cand.addr, (pj_uint16_t)port); + pj_strdup2(g_pool_, &cand.foundation, foundation); + return true; +} + +ssize_t +IceTransport::recv(int comp_id, unsigned char* buf, size_t len) +{ + register_thread(); + auto& io = compIO_[comp_id]; + std::unique_lock<std::mutex> lk(io.mutex); + + if (io.queue.empty()) + return 0; + + auto& packet = io.queue.front(); + const auto count = std::min(len, packet.datalen); + std::copy_n(packet.data.get(), count, buf); + io.queue.pop_front(); + + return count; +} + +void +IceTransport::setOnRecv(unsigned comp_id, IceRecvCb cb) +{ + auto& io = compIO_[comp_id]; + std::unique_lock<std::mutex> lk(io.mutex); + io.cb = cb; + + if (cb) { + // Flush existing queue using the callback + for (const auto& packet : io.queue) + io.cb((uint8_t*)packet.data.get(), packet.datalen); + io.queue.clear(); + } +} + +ssize_t +IceTransport::send(int comp_id, const unsigned char* buf, size_t len) +{ + register_thread(); + auto remote = getRemoteAddress(comp_id); + if (!remote) { + SFL_ERR("Can't find remote address for component %d", comp_id); + return -1; + } + auto status = pj_ice_strans_sendto(icest_.get(), comp_id+1, buf, len, remote.pjPtr(), remote.getLength()); + if (status != PJ_SUCCESS) { + sip_utils::sip_strerror(status); + SFL_ERR("send failed"); + return -1; + } + return len; +} + +ssize_t +IceTransport::getNextPacketSize(int comp_id) +{ + auto& io = compIO_[comp_id]; + std::unique_lock<std::mutex> lk(io.mutex); + if (io.queue.empty()) { + return 0; + } + return io.queue.front().datalen; +} + +ssize_t +IceTransport::waitForData(int comp_id, unsigned int timeout) +{ + auto& io = compIO_[comp_id]; + std::unique_lock<std::mutex> lk(io.mutex); + if (!io.cv.wait_for(lk, std::chrono::milliseconds(timeout), + [&io]{ return !io.queue.empty(); })) { + return 0; + } + return io.queue.front().datalen; +} + +IceTransportFactory::IceTransportFactory() : + ice_cfg_(), thread_(nullptr, pj_thread_destroy) +{ + pj_caching_pool_init(&g_cp_, NULL, 0); + g_pool_ = pj_pool_create(&g_cp_.factory, "icetransportpool", + 512, 512, NULL); + if (not g_pool_) + throw std::runtime_error("pj_pool_create() failed"); + + pj_ice_strans_cfg_default(&ice_cfg_); + ice_cfg_.stun_cfg.pf = &g_cp_.factory; + + TRY( pj_timer_heap_create(g_pool_, 100, &ice_cfg_.stun_cfg.timer_heap) ); + TRY( pj_ioqueue_create(g_pool_, 16, &ice_cfg_.stun_cfg.ioqueue) ); + + pj_thread_t* thread = nullptr; + const auto& thread_work = [](void* udata) { + register_thread(); + return static_cast<IceTransportFactory*>(udata)->processThread(); + }; + TRY( pj_thread_create(g_pool_, "icetransportpool", + thread_work, this, 0, 0, &thread) ); + thread_.reset(thread); + + ice_cfg_.af = pj_AF_INET(); + + //ice_cfg_.stun.max_host_cands = icedemo.opt.max_host; + ice_cfg_.opt.aggressive = PJ_FALSE; + + // TODO: STUN server candidate + + // TODO: TURN server candidate +} + +IceTransportFactory::~IceTransportFactory() +{ + pj_thread_sleep(500); + + thread_quit_flag_ = PJ_TRUE; + if (thread_) { + pj_thread_join(thread_.get()); + thread_.reset(); + } + + if (ice_cfg_.stun_cfg.ioqueue) + pj_ioqueue_destroy(ice_cfg_.stun_cfg.ioqueue); + + if (ice_cfg_.stun_cfg.timer_heap) + pj_timer_heap_destroy(ice_cfg_.stun_cfg.timer_heap); + + pj_pool_release(g_pool_); + pj_caching_pool_destroy(&g_cp_); +} + +int +IceTransportFactory::processThread() +{ + while (!thread_quit_flag_) { + handleEvents(500, NULL); + } + + return 0; +} + +int +IceTransportFactory::handleEvents(unsigned max_msec, unsigned *p_count) +{ + enum { MAX_NET_EVENTS = 1 }; + pj_time_val max_timeout = {0, 0}; + pj_time_val timeout = {0, 0}; + unsigned count = 0, net_event_count = 0; + int c; + + max_timeout.msec = max_msec; + + timeout.sec = timeout.msec = 0; + c = pj_timer_heap_poll(ice_cfg_.stun_cfg.timer_heap, &timeout); + if (c > 0) + count += c; + + pj_assert(timeout.sec >= 0 && timeout.msec >= 0); + if (timeout.msec >= 1000) timeout.msec = 999; + + if (PJ_TIME_VAL_GT(timeout, max_timeout)) + timeout = max_timeout; + + do { + c = pj_ioqueue_poll(ice_cfg_.stun_cfg.ioqueue, &timeout); + if (c < 0) { + pj_status_t err = pj_get_netos_error(); + pj_thread_sleep(PJ_TIME_VAL_MSEC(timeout)); + if (p_count) + *p_count = count; + return err; + } else if (c == 0) { + break; + } else { + net_event_count += c; + timeout.sec = timeout.msec = 0; + } + } while (c > 0 && net_event_count < MAX_NET_EVENTS); + + count += net_event_count; + if (p_count) + *p_count = count; + + return PJ_SUCCESS; +} + + +std::shared_ptr<IceTransport> +IceTransportFactory::createTransport(const char* name, + int component_count, + IceTransportCompleteCb&& on_initdone_cb, + IceTransportCompleteCb&& on_negodone_cb) +{ + return std::make_shared<IceTransport>(name, component_count, + std::forward<IceTransportCompleteCb>(on_initdone_cb), + std::forward<IceTransportCompleteCb>(on_negodone_cb)); +} + +void +IceSocket::close() +{ + ice_transport_.reset(); +} + +ssize_t +IceSocket::recv(unsigned char* buf, size_t len) +{ + if (!ice_transport_.get()) + return -1; + return ice_transport_->recv(compId_, buf, len); +} + +ssize_t +IceSocket::send(const unsigned char* buf, size_t len) +{ + if (!ice_transport_.get()) + return -1; + return ice_transport_->send(compId_, buf, len); +} + +ssize_t +IceSocket::getNextPacketSize() const +{ + if (!ice_transport_.get()) + return -1; + return ice_transport_->getNextPacketSize(compId_); +} + +ssize_t +IceSocket::waitForData(unsigned int timeout) +{ + if (!ice_transport_.get()) + return -1; + + return ice_transport_->waitForData(compId_, timeout); +} + +} diff --git a/daemon/src/ice_transport.h b/daemon/src/ice_transport.h new file mode 100644 index 0000000000000000000000000000000000000000..d782f2cc62747bba4a7399a882935792132ae14c --- /dev/null +++ b/daemon/src/ice_transport.h @@ -0,0 +1,226 @@ +/* + * Copyright (C) 2004-2014 Savoir-Faire Linux Inc. + * Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Additional permission under GNU GPL version 3 section 7: + * + * If you modify this program, or any covered work, by linking or + * combining it with the OpenSSL project's OpenSSL library (or a + * modified version of that library), containing parts covered by the + * terms of the OpenSSL or SSLeay licenses, Savoir-Faire Linux Inc. + * grants you additional permission to convey the resulting work. + * Corresponding Source for a non-source form of such a combination + * shall include the source code for the parts of OpenSSL used as well + * as that of the covered work. + */ + +#ifndef ICE_TRANSPORT_H +#define ICE_TRANSPORT_H + +#include "ice_socket.h" +#include "ip_utils.h" + +#include <pjnath.h> +#include <pjlib.h> +#include <pjlib-util.h> + +#include <map> +#include <functional> +#include <memory> +#include <atomic> +#include <vector> +#include <queue> +#include <mutex> +#include <condition_variable> + +namespace sfl { + +class IceTransport; + +using IceTransportCompleteCb = std::function<void(IceTransport&, bool)>; +using IceRecvCb = std::function<ssize_t(unsigned char* buf, size_t len)>; +using IceCandidate = pj_ice_sess_cand; + +class IceTransport { + public: + using Attribute = struct { + std::string ufrag; + std::string pwd; + }; + + /** + * Constructor + */ + IceTransport(const char* name, int component_count, + IceTransportCompleteCb on_initdone_cb, + IceTransportCompleteCb on_negodone_cb); + + /** + * Set/change transport role as initiator. + * Should be called before start method. + */ + bool setInitiatorSession(); + + /** + * Set/change transport role as slave. + * Should be called before start method. + */ + bool setSlaveSession(); + + /** + * Start tranport negociation between local candidates and given remote + * to find the right candidate pair. + * This function doesn't block, the callback on_negodone_cb will be called + * with the negotiation result when operation is really done. + * Return false if negotiation cannot be started else true. + */ + bool start(const Attribute& rem_attrs, + const std::vector<IceCandidate>& rem_candidates); + bool start(const std::vector<uint8_t>& attrs_candidates); + + /** + * Stop a started or completed transport. + */ + bool stop(); + + bool isInitialized() const; + + bool isStarted() const; + + bool isCompleted() const; + + IpAddr getLocalAddress(unsigned comp_id) const; + + IpAddr getRemoteAddress(unsigned comp_id) const; + + IpAddr getDefaultLocalAddress() const { + return getLocalAddress(0); + } + + /** + * Return ICE session attributes + */ + const Attribute getLocalAttributes() const; + + /** + * Return ICE session attributes + */ + std::vector<std::string> getLocalCandidates(unsigned comp_id) const; + + /** + * Returns serialized ICE attributes and candidates. + */ + std::vector<uint8_t> getLocalAttributesAndCandidates() const; + + bool getCandidateFromSDP(const std::string& line, IceCandidate& cand); + + // I/O methods + + void setOnRecv(unsigned comp_id, IceRecvCb cb); + + ssize_t recv(int comp_id, unsigned char* buf, size_t len); + + ssize_t send(int comp_id, const unsigned char* buf, size_t len); + + ssize_t getNextPacketSize(int comp_id); + + ssize_t waitForData(int comp_id, unsigned int timeout); + + private: + static constexpr int MAX_CANDIDATES {32}; + + // New line character used for (de)serialisation + static constexpr char NEW_LINE = '\n'; + + static void cb_on_rx_data(pj_ice_strans *ice_st, + unsigned comp_id, + void *pkt, pj_size_t size, + const pj_sockaddr_t *src_addr, + unsigned src_addr_len); + + static void cb_on_ice_complete(pj_ice_strans *ice_st, + pj_ice_strans_op op, + pj_status_t status); + + static std::string unpackLine(std::vector<uint8_t>::const_iterator& begin, + std::vector<uint8_t>::const_iterator& end); + + struct IceSTransDeleter { + void operator ()(pj_ice_strans* ptr) { + pj_ice_strans_stop_ice(ptr); + pj_ice_strans_destroy(ptr); + } + }; + + void onComplete(pj_ice_strans* ice_st, pj_ice_strans_op op, + pj_status_t status); + + void onReceiveData(unsigned comp_id, void *pkt, pj_size_t size); + + bool createIceSession(pj_ice_sess_role role); + + void getUFragPwd(); + + void getDefaultCanditates(); + + IceTransportCompleteCb on_initdone_cb_; + IceTransportCompleteCb on_negodone_cb_; + std::unique_ptr<pj_ice_strans, IceSTransDeleter> icest_; + unsigned component_count_; + pj_ice_sess_cand cand_[MAX_CANDIDATES] {}; + std::string local_ufrag_; + std::string local_pwd_; + pj_sockaddr remoteAddr_; + + struct Packet { + Packet(void *pkt, pj_size_t size); + std::unique_ptr<char> data; + size_t datalen; + }; + struct ComponentIO { + std::mutex mutex; + std::condition_variable cv; + std::deque<Packet> queue; + IceRecvCb cb; + }; + std::vector<ComponentIO> compIO_; +}; + +class IceTransportFactory { + public: + IceTransportFactory(); + ~IceTransportFactory(); + + std::shared_ptr<IceTransport> createTransport(const char* name, + int component_count, + IceTransportCompleteCb&& on_initdone_cb, + IceTransportCompleteCb&& on_negodone_cb); + + int processThread(); + + const pj_ice_strans_cfg* getIceCfg() const { return &ice_cfg_; } + + private: + int handleEvents(unsigned max_msec, unsigned *p_count); + + pj_ice_strans_cfg ice_cfg_; + std::unique_ptr<pj_thread_t, decltype(*pj_thread_destroy)> thread_; + pj_bool_t thread_quit_flag_ {PJ_FALSE}; +}; + +}; + +#endif /* ICE_TRANSPORT_H */ diff --git a/daemon/src/managerimpl.cpp b/daemon/src/managerimpl.cpp index 3697822dc3ad739920456c6ba101c810483e7ca7..6ae01b3f1b3bf67610db46255d9712ea1319453e 100644 --- a/daemon/src/managerimpl.cpp +++ b/daemon/src/managerimpl.cpp @@ -79,6 +79,7 @@ #endif #include "conference.h" +#include "ice_transport.h" #include <cerrno> #include <algorithm> @@ -95,6 +96,8 @@ using namespace sfl; std::atomic_bool ManagerImpl::initialized = {false}; +using namespace sfl; + static void copy_over(const std::string &srcPath, const std::string &destPath) { @@ -137,7 +140,7 @@ ManagerImpl::ManagerImpl() : waitingCalls_(), waitingCallsMutex_(), path_() , ringbufferpool_(new sfl::RingBufferPool) , callFactory(), conferenceMap_(), history_(), - finished_(false), accountFactory_() + finished_(false), accountFactory_(), ice_tf_() { // initialize random generator // mt19937_64 should be seeded with 2 x 32 bits @@ -176,6 +179,20 @@ ManagerImpl::init(const std::string &config_file) // FIXME: this is no good initialized = true; +#define TRY(ret) do { \ + if (ret != PJ_SUCCESS) \ + throw std::runtime_error(#ret " failed"); \ + } while (0) + + // Our PJSIP dependency (SIP and ICE) + TRY(pj_init()); + TRY(pjlib_util_init()); + TRY(pjnath_init()); + +#undef TRY + + ice_tf_.reset(new IceTransportFactory()); + path_ = config_file.empty() ? retrieveConfigPath() : config_file; SFL_DBG("Configuration file path: %s", path_.c_str()); @@ -265,6 +282,9 @@ ManagerImpl::finish() audiodriver_.reset(); } + + ice_tf_.reset(); + pj_shutdown(); } catch (const VoipLinkException &err) { SFL_ERR("%s", err.what()); } diff --git a/daemon/src/managerimpl.h b/daemon/src/managerimpl.h index f18e9b11999a87d8f6ce9582763affc8428d716b..d665bb31f229bd0df58b3d2b9c1819699296e6ce 100644 --- a/daemon/src/managerimpl.h +++ b/daemon/src/managerimpl.h @@ -74,10 +74,12 @@ namespace sfl { class RingBufferPool; class DTMF; class TelephoneTone; + class IceTransportFactory; } class PluginManager; + /** To send multiple string */ typedef std::list<std::string> TokenList; @@ -972,6 +974,8 @@ class ManagerImpl { */ void unregisterEventHandler(uintptr_t handlerId); + sfl::IceTransportFactory& getIceTransportFactory() { return *ice_tf_; } + private: NON_COPYABLE(ManagerImpl); @@ -1009,6 +1013,9 @@ class ManagerImpl { void loadAccount(const YAML::Node &item, int &errorCount, const std::string &accountOrder); + + /* ICE support */ + std::unique_ptr<sfl::IceTransportFactory> ice_tf_; }; #endif // MANAGER_IMPL_H_ diff --git a/daemon/src/sip/sdp.cpp b/daemon/src/sip/sdp.cpp index 509bff3dfe197d7427635f3bd634e527c8e6ad18..b25b3af157a8ba5751b264b5b35c169cc4fae451 100644 --- a/daemon/src/sip/sdp.cpp +++ b/daemon/src/sip/sdp.cpp @@ -45,6 +45,7 @@ #endif #include <algorithm> +#include <cassert> using std::string; using std::map; @@ -841,7 +842,6 @@ void Sdp::addSdesAttribute(const vector<std::string>& crypto) } } - void Sdp::addZrtpAttribute(pjmedia_sdp_media* media, std::string hash) { /* Format: ":version value" */ @@ -853,6 +853,75 @@ void Sdp::addZrtpAttribute(pjmedia_sdp_media* media, std::string hash) throw SdpException("Could not add zrtp attribute to media"); } +void +Sdp::addIceCandidates(unsigned media_index, const std::vector<std::string>& cands) +{ + assert(media_index < localSession_->media_count); + auto media = localSession_->media[media_index]; + + for (const auto &item : cands) { + pj_str_t val = { (char*) item.c_str(), static_cast<pj_ssize_t>(item.size()) }; + pjmedia_sdp_attr *attr = pjmedia_sdp_attr_create(memPool_, "candidate", &val); + + if (pjmedia_sdp_media_add_attr(media, attr) != PJ_SUCCESS) + throw SdpException("Could not add ICE candidates attribute to media"); + } +} + +std::vector<std::string> +Sdp::getIceCandidates(unsigned media_index) const +{ + auto session = remoteSession_ ? remoteSession_ : activeRemoteSession_; + assert(session); + assert(media_index < session->media_count); + auto media = session->media[media_index]; + std::vector<std::string> candidates; + + for (unsigned i=0; i < media->attr_count; i++) { + pjmedia_sdp_attr *attribute = media->attr[i]; + if (pj_stricmp2(&attribute->name, "candidate") == 0) + candidates.push_back(std::string(attribute->value.ptr, attribute->value.slen)); + } + + return candidates; +} + +void +Sdp::addIceAttributes(const sfl::IceTransport::Attribute&& ice_attrs) +{ + pj_str_t value; + pjmedia_sdp_attr *attr; + + value = { (char*)ice_attrs.ufrag.c_str(), static_cast<pj_ssize_t>(ice_attrs.ufrag.size()) }; + attr = pjmedia_sdp_attr_create(memPool_, "ice-ufrag", &value); + + if (pjmedia_sdp_attr_add(&localSession_->attr_count, localSession_->attr, attr) != PJ_SUCCESS) + throw SdpException("Could not add ICE.ufrag attribute to local SDP"); + + value = { (char*)ice_attrs.pwd.c_str(), static_cast<pj_ssize_t>(ice_attrs.pwd.size()) }; + attr = pjmedia_sdp_attr_create(memPool_, "ice-pwd", &value); + + if (pjmedia_sdp_attr_add(&localSession_->attr_count, localSession_->attr, attr) != PJ_SUCCESS) + throw SdpException("Could not add ICE.pwd attribute to local SDP"); +} + +sfl::IceTransport::Attribute +Sdp::getIceAttributes() const +{ + sfl::IceTransport::Attribute ice_attrs; + auto session = remoteSession_ ? remoteSession_ : activeRemoteSession_; + assert(session); + + for (unsigned i=0; i < session->attr_count; i++) { + pjmedia_sdp_attr *attribute = session->attr[i]; + if (pj_stricmp2(&attribute->name, "ice-ufrag") == 0) + ice_attrs.ufrag.assign(attribute->value.ptr, attribute->value.slen); + else if (pj_stricmp2(&attribute->name, "ice-pwd") == 0) + ice_attrs.pwd.assign(attribute->value.ptr, attribute->value.slen); + } + return ice_attrs; +} + // Returns index of desired media attribute, or -1 if not found */ static int getIndexOfAttribute(const pjmedia_sdp_session * const session, const char * const type) diff --git a/daemon/src/sip/sdp.h b/daemon/src/sip/sdp.h index eae0a82031f4d919cc374216d6dbd28f81711ee3..5948b15763728e8af3d4e857e73f5808c8a09c96 100644 --- a/daemon/src/sip/sdp.h +++ b/daemon/src/sip/sdp.h @@ -34,6 +34,7 @@ #include "noncopyable.h" #include "ip_utils.h" +#include "ice_transport.h" #include <pjmedia/sdp.h> #include <pjmedia/sdp_neg.h> @@ -89,6 +90,10 @@ class Sdp { return localSession_; } + const pjmedia_sdp_session *getActiveLocalSdpSession() const { + return activeLocalSession_; + } + /** * Read accessor. Get the remote passive sdp session information before negotiation * @@ -98,6 +103,10 @@ class Sdp { return remoteSession_; } + const pjmedia_sdp_session *getActiveRemoteSdpSession() const { + return activeRemoteSession_; + } + /** * Set the negotiated sdp offer from the sip payload. * @@ -270,7 +279,16 @@ class Sdp { bool getOutgoingVideoSettings(std::map<std::string, std::string> &settings) const; bool getOutgoingAudioSettings(std::map<std::string, std::string> &settings) const; + void addIceAttributes(const sfl::IceTransport::Attribute&& ice_attrs); + sfl::IceTransport::Attribute getIceAttributes() const; + + void addIceCandidates(unsigned media_index, + const std::vector<std::string>& cands); + + std::vector<std::string> getIceCandidates(unsigned media_index) const; + private: + NON_COPYABLE(Sdp); friend class SDPTest; diff --git a/daemon/src/sip/sipaccount.cpp b/daemon/src/sip/sipaccount.cpp index d5e1a86e87301aeb7c50994810ebcfe506915b5a..0ccc54d006182921b32289d09d0cc7443285f172 100644 --- a/daemon/src/sip/sipaccount.cpp +++ b/daemon/src/sip/sipaccount.cpp @@ -214,6 +214,8 @@ SIPAccount::newOutgoingCall(const std::string& id, const std::string& toUrl) SFL_DBG("UserAgent: New registered account call to %s", toUrl.c_str()); } + call->initIceTransport(true); + call->setIPToIP(isIP2IP()); call->setPeerNumber(toUri); call->initRecFilename(to); @@ -275,6 +277,9 @@ SIPAccount::newOutgoingCall(const std::string& id, const std::string& toUrl) bool SIPAccount::SIPStartCall(std::shared_ptr<SIPCall>& call) { + // Add Ice headers to local SDP + call->setupLocalSDPFromIce(); + std::string toUri(call->getPeerNumber()); // expecting a fully well formed sip uri pj_str_t pjTo = pj_str((char*) toUri.c_str()); diff --git a/daemon/src/sip/sipcall.cpp b/daemon/src/sip/sipcall.cpp index fc03d138b78c22db86cdb8453d000d1696ecc551..35174f8f1e19722c7e8be00ea6c281695d577c4d 100644 --- a/daemon/src/sip/sipcall.cpp +++ b/daemon/src/sip/sipcall.cpp @@ -42,20 +42,26 @@ #include "sdp.h" #include "manager.h" #include "array_size.h" - +#include "audio/audiolayer.h" #if USE_CCRTP #include "audio/audiortp/audio_rtp_factory.h" // for AudioRtpFactoryException #else #include "audio/audiortp/avformat_rtp_session.h" #endif +#include "client/callmanager.h" #if HAVE_INSTANT_MESSAGING #include "im/instant_messaging.h" #endif #ifdef SFL_VIDEO +#include "video/video_rtp_session.h" #include "client/videomanager.h" +#include <chrono> + +using namespace sfl; + static sfl_video::VideoSettings getSettings() { @@ -66,6 +72,19 @@ getSettings() static const int INITIAL_SIZE = 16384; static const int INCREMENT_SIZE = INITIAL_SIZE; +static constexpr int DEFAULT_ICE_INIT_TIMEOUT {10}; // seconds +static constexpr int DEFAULT_ICE_NEGO_TIMEOUT {60}; // seconds + +// SDP media Ids +static constexpr int SDP_AUDIO_MEDIA_ID {0}; +static constexpr int SDP_VIDEO_MEDIA_ID {1}; + +// ICE components Id used on SIP +static constexpr int ICE_AUDIO_RTP_COMPID {0}; +static constexpr int ICE_AUDIO_RTCP_COMPID {1}; +static constexpr int ICE_VIDEO_RTP_COMPID {2}; +static constexpr int ICE_VIDEO_RTCP_COMPID {3}; + /** A map to retreive SFLphone internal call id * Given a SIP call ID (usefull for transaction sucha as transfer)*/ @@ -141,12 +160,12 @@ SIPCall::stopRtpIfCurrent() { if (Manager::instance().isCurrentCall(*this)) { #if USE_CCRTP - getAudioRtp().stop(); + audiortp_.stop(); #else avformatrtp_->stop(); #endif #ifdef SFL_VIDEO - getVideoRtp().stop(); + videortp_.stop(); #endif } } @@ -314,6 +333,10 @@ void SIPCall::answer() throw std::runtime_error("Could not send invite request answer (200 OK)"); } + if (iceTransport_->isStarted()) + waitForIceNegotiation(DEFAULT_ICE_NEGO_TIMEOUT); + startAllMedia(); + setConnectionState(CONNECTED); setState(ACTIVE); } @@ -756,8 +779,200 @@ void SIPCall::onAnswered() { if (getConnectionState() != Call::CONNECTED) { + if (iceTransport_->isStarted()) + waitForIceNegotiation(DEFAULT_ICE_NEGO_TIMEOUT); + startAllMedia(); setConnectionState(Call::CONNECTED); setState(Call::ACTIVE); Manager::instance().peerAnsweredCall(getCallId()); } } + +void +SIPCall::setupLocalSDPFromIce() +{ + if (waitForIceInitialization(DEFAULT_ICE_INIT_TIMEOUT) <= 0) { + SFL_ERR("ICE init failed, ICE will not be used for medias"); + return; + } + + sdp_->addIceAttributes(iceTransport_->getLocalAttributes()); + + // Add video and audio channels + sdp_->addIceCandidates(SDP_AUDIO_MEDIA_ID, iceTransport_->getLocalCandidates(ICE_AUDIO_RTP_COMPID)); + sdp_->addIceCandidates(SDP_AUDIO_MEDIA_ID, iceTransport_->getLocalCandidates(ICE_AUDIO_RTCP_COMPID)); +#ifdef SFL_VIDEO + sdp_->addIceCandidates(SDP_VIDEO_MEDIA_ID, iceTransport_->getLocalCandidates(ICE_VIDEO_RTP_COMPID)); + sdp_->addIceCandidates(SDP_VIDEO_MEDIA_ID, iceTransport_->getLocalCandidates(ICE_VIDEO_RTCP_COMPID)); +#endif +} + +std::vector<sfl::IceCandidate> +SIPCall::getAllRemoteCandidates() +{ + std::vector<sfl::IceCandidate> rem_candidates; + + auto addSDPCandidates = [this](unsigned sdpMediaId, + std::vector<sfl::IceCandidate>& out) { + IceCandidate cand; + for (auto& line : sdp_->getIceCandidates(sdpMediaId)) { + if (iceTransport_->getCandidateFromSDP(line, cand)) + out.emplace_back(cand); + } + }; + + addSDPCandidates(SDP_AUDIO_MEDIA_ID, rem_candidates); +#ifdef SFL_VIDEO + addSDPCandidates(SDP_VIDEO_MEDIA_ID, rem_candidates); +#endif + + return rem_candidates; +} + +bool +SIPCall::startIce() +{ + if (iceTransport_->isStarted() || iceTransport_->isCompleted()) + return true; + auto rem_ice_attrs = sdp_->getIceAttributes(); + if (rem_ice_attrs.ufrag.empty() or rem_ice_attrs.pwd.empty()) { + SFL_ERR("ICE empty attributes"); + return false; + } + return iceTransport_->start(rem_ice_attrs, getAllRemoteCandidates()); +} + +void +SIPCall::startAllMedia() +{ + auto& remoteIP = sdp_->getRemoteIP(); +#if USE_CCRTP + try { + audiortp_.updateDestinationIpAddress(); + } catch (const AudioRtpFactoryException &e) { + SFL_ERR("%s", e.what()); + } + + audiortp_.setDtmfPayloadType(sdp_->getTelephoneEventType()); +#else + avformatrtp_->updateSDP(*sdp_); + avformatrtp_->updateDestination(remoteIP, sdp_->getRemoteAudioPort()); + if (isIceRunning()) { + std::unique_ptr<sfl::IceSocket> sockRTP(newIceSocket(0)); + std::unique_ptr<sfl::IceSocket> sockRTCP(newIceSocket(1)); + avformatrtp_->start(std::move(sockRTP), std::move(sockRTCP)); + } else { + const auto localAudioPort = sdp_->getLocalAudioPort(); + avformatrtp_->start(localAudioPort ? localAudioPort : sdp_->getRemoteAudioPort()); + } +#endif + +#ifdef SFL_VIDEO + auto remoteVideoPort = sdp_->getRemoteVideoPort(); + videortp_.updateSDP(*sdp_); + videortp_.updateDestination(remoteIP, remoteVideoPort); + if (isIceRunning()) { + std::unique_ptr<sfl::IceSocket> sockRTP(newIceSocket(2)); + std::unique_ptr<sfl::IceSocket> sockRTCP(newIceSocket(3)); + try { + videortp_.start(std::move(sockRTP), std::move(sockRTCP)); + } catch (const std::runtime_error &e) { + SFL_ERR("videortp_.start() with ICE failed, %s", e.what()); + } + } else { + const auto localVideoPort = sdp_->getLocalVideoPort(); + try { + videortp_.start(localVideoPort ? localVideoPort : remoteVideoPort); + } catch (const std::runtime_error &e) { + SFL_ERR("videortp_.start() failed, %s", e.what()); + } + } +#endif + + // Get the crypto attribute containing srtp's cryptographic context (keys, cipher) + CryptoOffer crypto_offer; + getSDP().getRemoteSdpCryptoFromOffer(sdp_->getActiveRemoteSdpSession(), crypto_offer); + +#if USE_CCRTP && HAVE_SDES + bool nego_success = false; + + if (!crypto_offer.empty()) { + std::vector<sfl::CryptoSuiteDefinition> localCapabilities; + + for (size_t i = 0; i < SFL_ARRAYSIZE(sfl::CryptoSuites); ++i) + localCapabilities.push_back(sfl::CryptoSuites[i]); + + sfl::SdesNegotiator sdesnego(localCapabilities, crypto_offer); + auto callMgr = Manager::instance().getClient()->getCallManager(); + + if (sdesnego.negotiate()) { + nego_success = true; + + try { + audiortp_.setRemoteCryptoInfo(sdesnego); + callMgr->secureSdesOn(getCallId()); + } catch (const AudioRtpFactoryException &e) { + SFL_ERR("%s", e.what()); + callMgr->secureSdesOff(getCallId()); + } + } else { + SFL_ERR("SDES negotiation failure"); + callMgr->secureSdesOff(getCallId()); + } + } else { + SFL_DBG("No crypto offer available"); + } + + // We did not find any crypto context for this media, RTP fallback + if (!nego_success && audiortp_.isSdesEnabled()) { + SFL_ERR("Negotiation failed but SRTP is enabled, fallback on RTP"); + audiortp_.stop(); + audiortp_.setSrtpEnabled(false); + + const auto& account = getSIPAccount(); + if (account.getSrtpFallback()) { + audiortp_.initSession(); + + if (account.isStunEnabled()) + updateSDPFromSTUN(); + } + } +#endif // USE_CCRTP && HAVE_SDES + + std::vector<sfl::AudioCodec*> sessionMedia(sdp_->getSessionAudioMedia()); + + if (sessionMedia.empty()) { + SFL_WARN("Session media is empty"); + return; + } + + try { + Manager::instance().startAudioDriverStream(); + + std::vector<AudioCodec*> audioCodecs; + + for (const auto & i : sessionMedia) { + if (!i) + continue; + + const int pl = i->getPayloadType(); + + sfl::AudioCodec *ac = Manager::instance().audioCodecFactory.instantiateCodec(pl); + + if (!ac) { + SFL_ERR("Could not instantiate codec %d", pl); + } else { + audioCodecs.push_back(ac); + } + } + +#if USE_CCRTP + if (not audioCodecs.empty()) + getAudioRtp().updateSessionMedia(audioCodecs); +#endif + } catch (const SdpException &e) { + SFL_ERR("%s", e.what()); + } catch (const std::exception &rtpException) { + SFL_ERR("%s", rtpException.what()); + } +} diff --git a/daemon/src/sip/sipcall.h b/daemon/src/sip/sipcall.h index c20bdf6f60843edda9a1e3f9e7ff677233bb9d54..5b24b7c4376a31d6dc62854f6e4e6bc66a2c2f14 100644 --- a/daemon/src/sip/sipcall.h +++ b/daemon/src/sip/sipcall.h @@ -194,6 +194,12 @@ class SIPCall : public Call */ void onClosed(); + void setupLocalSDPFromIce(); + + bool startIce(); + + void startAllMedia(); + private: NON_COPYABLE(SIPCall); @@ -212,6 +218,8 @@ class SIPCall : public Call int SIPSessionReinvite(); + std::vector<sfl::IceCandidate> getAllRemoteCandidates(); + #if USE_CCRTP /** * Audio Rtp Session factory diff --git a/daemon/src/sip/sipvoiplink.cpp b/daemon/src/sip/sipvoiplink.cpp index 2be08be206dfbf7af2355350f35fc559d151c48a..4bf5c4e232d74bb9f023a450d8eed13a296c0863 100644 --- a/daemon/src/sip/sipvoiplink.cpp +++ b/daemon/src/sip/sipvoiplink.cpp @@ -379,6 +379,8 @@ transaction_request_cb(pjsip_rx_data *rdata) #endif call->getSDP().receiveOffer(r_sdp, account->getActiveAudioCodecs(), account->getActiveVideoCodecs()); + call->initIceTransport(false); + call->setupLocalSDPFromIce(); sfl::AudioCodec* ac = Manager::instance().audioCodecFactory.instantiateCodec(PAYLOAD_CODEC_ULAW); @@ -835,9 +837,10 @@ invite_session_state_changed_cb(pjsip_inv_session *inv, pjsip_event *ev) if (!inv) return; - auto call = static_cast<SIPCall*>(inv->mod_data[mod_ua_.id]); - if (!call) + auto call_ptr = static_cast<SIPCall*>(inv->mod_data[mod_ua_.id]); + if (!call_ptr) return; + auto call = std::static_pointer_cast<SIPCall>(call_ptr->shared_from_this()); if (ev and inv->state != PJSIP_INV_STATE_CONFIRMED) { // Update UI with the current status code and description @@ -897,9 +900,10 @@ sdp_request_offer_cb(pjsip_inv_session *inv, const pjmedia_sdp_session *offer) if (!inv) return; - auto call = static_cast<SIPCall*>(inv->mod_data[mod_ua_.id]); - if (!call) + auto call_ptr = static_cast<SIPCall*>(inv->mod_data[mod_ua_.id]); + if (!call_ptr) return; + auto call = std::static_pointer_cast<SIPCall>(call_ptr->shared_from_this()); const auto& account = call->getSIPAccount(); auto& localSDP = call->getSDP(); @@ -916,9 +920,10 @@ sdp_create_offer_cb(pjsip_inv_session *inv, pjmedia_sdp_session **p_offer) if (!inv or !p_offer) return; - auto call = static_cast<SIPCall*>(inv->mod_data[mod_ua_.id]); - if (!call) + auto call_ptr = static_cast<SIPCall*>(inv->mod_data[mod_ua_.id]); + if (!call_ptr) return; + auto call = std::static_pointer_cast<SIPCall>(call_ptr->shared_from_this()); const auto& account = call->getSIPAccount(); @@ -996,11 +1001,12 @@ sdp_media_update_cb(pjsip_inv_session *inv, pj_status_t status) if (!inv) return; - auto call = static_cast<SIPCall*>(inv->mod_data[mod_ua_.id]); - if (!call) { + auto call_ptr = static_cast<SIPCall*>(inv->mod_data[mod_ua_.id]); + if (!call_ptr) { SFL_DBG("Call declined by peer, SDP negotiation stopped"); return; } + auto call = std::static_pointer_cast<SIPCall>(call_ptr->shared_from_this()); if (status != PJ_SUCCESS) { const int reason = inv->state != PJSIP_INV_STATE_NULL and @@ -1032,116 +1038,9 @@ sdp_media_update_cb(pjsip_inv_session *inv, pj_status_t status) // Update connection information sdp.setMediaTransportInfoFromRemoteSdp(); -#if USE_CCRTP - auto& audioRTP = call->getAudioRtp(); - try { - audioRTP.updateDestinationIpAddress(); - } catch (const AudioRtpFactoryException &e) { - SFL_ERR("%s", e.what()); - } - audioRTP.setDtmfPayloadType(sdp.getTelephoneEventType()); -#else - call->getAVFormatRTP().updateSDP(sdp); - call->getAVFormatRTP().updateDestination(sdp.getRemoteIP(), sdp.getRemoteAudioPort()); - auto localAudioPort = sdp.getLocalAudioPort(); - if (!localAudioPort) - localAudioPort = sdp.getRemoteAudioPort(); - call->getAVFormatRTP().start(localAudioPort); -#endif - -#ifdef SFL_VIDEO - auto& videoRTP = call->getVideoRtp(); - videoRTP.updateSDP(sdp); - videoRTP.updateDestination(sdp.getRemoteIP(), sdp.getRemoteVideoPort()); - const auto localVideoPort = sdp.getLocalVideoPort(); - videoRTP.start(localVideoPort ? localVideoPort : sdp.getRemoteVideoPort()); -#endif - - // Get the crypto attribute containing srtp's cryptographic context (keys, cipher) - CryptoOffer crypto_offer; - sdp.getRemoteSdpCryptoFromOffer(remoteSDP, crypto_offer); - -#if USE_CCRTP && HAVE_SDES - bool nego_success = false; - - if (!crypto_offer.empty()) { - std::vector<sfl::CryptoSuiteDefinition> localCapabilities; - - for (size_t i = 0; i < SFL_ARRAYSIZE(sfl::CryptoSuites); ++i) - localCapabilities.push_back(sfl::CryptoSuites[i]); - - sfl::SdesNegotiator sdesnego(localCapabilities, crypto_offer); - - if (sdesnego.negotiate()) { - nego_success = true; - - try { - audioRTP.setRemoteCryptoInfo(sdesnego); - Manager::instance().getClient()->getCallManager()->secureSdesOn(call->getCallId()); - } catch (const AudioRtpFactoryException &e) { - SFL_ERR("%s", e.what()); - Manager::instance().getClient()->getCallManager()->secureSdesOff(call->getCallId()); - } - } else { - SFL_ERR("SDES negotiation failure"); - Manager::instance().getClient()->getCallManager()->secureSdesOff(call->getCallId()); - } - } else { - SFL_DBG("No crypto offer available"); - } - - // We did not find any crypto context for this media, RTP fallback - if (!nego_success && audioRTP.isSdesEnabled()) { - SFL_ERR("Negotiation failed but SRTP is enabled, fallback on RTP"); - audioRTP.stop(); - audioRTP.setSrtpEnabled(false); - - const auto& account = call->getSIPAccount(); - if (account.getSrtpFallback()) { - audioRTP.initSession(); - - if (account.isStunEnabled()) - call->updateSDPFromSTUN(); - } - } -#endif // HAVE_SDES - - std::vector<sfl::AudioCodec*> sessionMedia(sdp.getSessionAudioMedia()); - - if (sessionMedia.empty()) { - SFL_WARN("Session media is empty"); - return; - } - - try { - Manager::instance().startAudioDriverStream(); - - std::vector<AudioCodec*> audioCodecs; - - for (const auto & i : sessionMedia) { - if (!i) - continue; - - const int pl = i->getPayloadType(); - - sfl::AudioCodec *ac = Manager::instance().audioCodecFactory.instantiateCodec(pl); - - if (!ac) { - SFL_ERR("Could not instantiate codec %d", pl); - } else { - audioCodecs.push_back(ac); - } - } - -#if USE_CCRTP - if (not audioCodecs.empty()) - call->getAudioRtp().updateSessionMedia(audioCodecs); -#endif - } catch (const SdpException &e) { - SFL_ERR("%s", e.what()); - } catch (const std::exception &rtpException) { - SFL_ERR("%s", rtpException.what()); - } + // Handle possible ICE transport + if (!call->startIce()) + SFL_WARN("ICE not started"); } static void @@ -1221,7 +1120,10 @@ transaction_state_changed_cb(pjsip_inv_session * inv, pjsip_transaction *tsx, return; } - auto call = static_cast<SIPCall*>(inv->mod_data[mod_ua_.id]); + auto call_ptr = static_cast<SIPCall*>(inv->mod_data[mod_ua_.id]); + if (!call_ptr) + return; + auto call = std::static_pointer_cast<SIPCall>(call_ptr->shared_from_this()); if (event->body.rx_msg.rdata) { pjsip_rx_data *r_data = event->body.rx_msg.rdata; @@ -1316,9 +1218,10 @@ transaction_state_changed_cb(pjsip_inv_session * inv, pjsip_transaction *tsx, static void onCallTransfered(pjsip_inv_session *inv, pjsip_rx_data *rdata) { - auto currentCall = static_cast<SIPCall*>(inv->mod_data[mod_ua_.id]); - if (!currentCall) + auto call_ptr = static_cast<SIPCall*>(inv->mod_data[mod_ua_.id]); + if (!call_ptr) return; + auto currentCall = std::static_pointer_cast<SIPCall>(call_ptr->shared_from_this()); static const pj_str_t str_refer_to = CONST_PJ_STR("Refer-To"); pjsip_generic_string_hdr *refer_to = static_cast<pjsip_generic_string_hdr*> diff --git a/daemon/src/video/socket_pair.cpp b/daemon/src/video/socket_pair.cpp index 2371630bb766641f7d259b131612b3e95d41d3b7..38f9fa15a3e2b77940233679a8489805d879a03b 100644 --- a/daemon/src/video/socket_pair.cpp +++ b/daemon/src/video/socket_pair.cpp @@ -32,6 +32,7 @@ #include "libav_deps.h" #include "socket_pair.h" +#include "ice_socket.h" #include "libav_utils.h" #include "logger.h" @@ -136,23 +137,34 @@ namespace sfl_video { static const int RTP_BUFFER_SIZE = 1472; -SocketPair::SocketPair(const char *uri, int localPort) : - rtcpWriteMutex_(), - rtpHandle_(0), - rtcpHandle_(0), - rtpDestAddr_(), - rtpDestAddrLen_(), - rtcpDestAddr_(), - rtcpDestAddrLen_(), - interrupted_(false) +SocketPair::SocketPair(const char *uri, int localPort) + : rtp_sock_() + , rtcp_sock_() + , rtcpWriteMutex_() + , rtpDestAddr_() + , rtpDestAddrLen_() + , rtcpDestAddr_() + , rtcpDestAddrLen_() { openSockets(uri, localPort); } +SocketPair::SocketPair(std::unique_ptr<sfl::IceSocket> rtp_sock, + std::unique_ptr<sfl::IceSocket> rtcp_sock) + : rtp_sock_(std::move(rtp_sock)) + , rtcp_sock_(std::move(rtcp_sock)) + , rtcpWriteMutex_() + , rtpDestAddr_() + , rtpDestAddrLen_() + , rtcpDestAddr_() + , rtcpDestAddrLen_() +{} + SocketPair::~SocketPair() { interrupted_ = true; - closeSockets(); + if (rtpHandle_ >= 0) + closeSockets(); } void SocketPair::interrupt() @@ -193,6 +205,9 @@ void SocketPair::openSockets(const char *uri, int local_rtp_port) closeSockets(); throw std::runtime_error("Socket creation failed"); } + + SFL_WARN("SocketPair: local{%d,%d}, remote{%d,%d}", + local_rtp_port, local_rtcp_port, rtp_port, rtcp_port); } VideoIOHandle* SocketPair::createIOContext() @@ -202,67 +217,131 @@ VideoIOHandle* SocketPair::createIOContext() reinterpret_cast<void*>(this)); } +int +SocketPair::waitForData() +{ + if (rtpHandle_ >= 0) { + // work with system socket + struct pollfd p[2] = { {rtpHandle_, POLLIN, 0}, + {rtcpHandle_, POLLIN, 0} }; + return poll(p, 2, NET_POLL_TIMEOUT); + } + + // work with IceSocket + auto result = rtp_sock_->waitForData(NET_POLL_TIMEOUT); + if (result < 0) { + errno = EIO; + return -1; + } + + return result; +} + +int +SocketPair::readRtpData(void *buf, int buf_size) +{ + if (rtpHandle_ >= 0) { + // work with system socket + struct sockaddr_storage from; + socklen_t from_len = sizeof(from); + auto result = recvfrom(rtpHandle_, buf, buf_size, 0, + (struct sockaddr *)&from, &from_len); + return result; + } + + // work with IceSocket + auto result = rtp_sock_->recv(static_cast<unsigned char*>(buf), buf_size); + if (result < 0) { + errno = EIO; + return -1; + } + if (result == 0) { + errno = EAGAIN; + return -1; + } + return result; +} + +int +SocketPair::readRtcpData(void *buf, int buf_size) +{ + if (rtcpHandle_ >= 0) { + // work with system socket + struct sockaddr_storage from; + socklen_t from_len = sizeof(from); + return recvfrom(rtcpHandle_, buf, buf_size, 0, + (struct sockaddr *)&from, &from_len); + } + + // work with IceSocket + auto result = rtcp_sock_->recv(static_cast<unsigned char*>(buf), buf_size); + if (result < 0) { + errno = EIO; + return -1; + } + if (result == 0) { + errno = EAGAIN; + return -1; + } + return result; +} + +int +SocketPair::writeRtpData(void *buf, int buf_size) +{ + if (rtpHandle_ >= 0) { + auto ret = ff_network_wait_fd(rtpHandle_); + if (ret < 0) + return ret; + return sendto(rtpHandle_, buf, buf_size, 0, + (sockaddr*) &rtpDestAddr_, rtpDestAddrLen_); + } + + // work with IceSocket + return rtp_sock_->send(static_cast<unsigned char*>(buf), buf_size); +} + +int +SocketPair::writeRtcpData(void *buf, int buf_size) +{ + std::lock_guard<std::mutex> lock(rtcpWriteMutex_); + + if (rtcpHandle_ >= 0) { + auto ret = ff_network_wait_fd(rtcpHandle_); + if (ret < 0) + return ret; + return sendto(rtcpHandle_, buf, buf_size, 0, + (sockaddr*) &rtcpDestAddr_, rtcpDestAddrLen_); + } + + // work with IceSocket + return rtcp_sock_->send(static_cast<unsigned char*>(buf), buf_size); +} + int SocketPair::readCallback(void *opaque, uint8_t *buf, int buf_size) { SocketPair *context = static_cast<SocketPair*>(opaque); - struct sockaddr_storage from; - socklen_t from_len; - int len, n; - struct pollfd p[2] = { {context->rtpHandle_, POLLIN, 0}, - {context->rtcpHandle_, POLLIN, 0}}; +retry: + if (context->interrupted_) { + SFL_ERR("interrupted"); + return -EINTR; + } - for(;;) { - if (context->interrupted_) { - SFL_ERR("interrupted"); - return -EINTR; - } + if (context->waitForData() < 0) { + if (errno == EINTR) + goto retry; + return -EIO; + } - /* build fdset to listen to RTP and RTCP packets */ - // FIXME:WORKAROUND: reduce to RTP handle until RTCP is fixed - n = poll(p, 1, NET_POLL_TIMEOUT); - if (n > 0) { -// FIXME:WORKAROUND: prevent excessive packet loss -#if 0 - /* first try RTCP */ - if (p[1].revents & POLLIN) { - from_len = sizeof(from); - - { - len = recvfrom(context->rtcpHandle_, buf, buf_size, 0, - (struct sockaddr *)&from, &from_len); - } - - if (len < 0) { - if (errno == EAGAIN or errno == EINTR) - continue; - return -EIO; - } - break; - } -#endif - /* then RTP */ - if (p[0].revents & POLLIN) { - from_len = sizeof(from); - - { - len = recvfrom(context->rtpHandle_, buf, buf_size, 0, - (struct sockaddr *)&from, &from_len); - } - - if (len < 0) { - if (errno == EAGAIN or errno == EINTR) - continue; - return -EIO; - } - break; - } - } else if (n < 0) { - if (errno == EINTR) - continue; - return -EIO; - } + /* RTP */ + int len = context->readRtpData(buf, buf_size); + if (len < 0) { + if (errno == EAGAIN or errno == EINTR) + goto retry; + return -EIO; } + return len; } @@ -280,33 +359,26 @@ enum RTCPType { int SocketPair::writeCallback(void *opaque, uint8_t *buf, int buf_size) { SocketPair *context = static_cast<SocketPair*>(opaque); - int ret; retry: if (RTP_PT_IS_RTCP(buf[1])) { + return buf_size; /* RTCP payload type */ - std::lock_guard<std::mutex> lock(context->rtcpWriteMutex_); - ret = ff_network_wait_fd(context->rtcpHandle_); + ret = context->writeRtcpData(buf, buf_size); if (ret < 0) { if (ret == -EAGAIN) goto retry; return ret; } - - ret = sendto(context->rtcpHandle_, buf, buf_size, 0, - (sockaddr*) &context->rtcpDestAddr_, context->rtcpDestAddrLen_); } else { /* RTP payload type */ - ret = ff_network_wait_fd(context->rtpHandle_); + ret = context->writeRtpData(buf, buf_size); if (ret < 0) { if (ret == -EAGAIN) goto retry; return ret; } - - ret = sendto(context->rtpHandle_, buf, buf_size, 0, - (sockaddr*) &context->rtpDestAddr_, context->rtpDestAddrLen_); } return ret < 0 ? errno : ret; diff --git a/daemon/src/video/socket_pair.h b/daemon/src/video/socket_pair.h index 1e45498354b192ee81ebcc3b3de95b5fd7e482a2..6132635cc0b04f5f25f34aa6c53223a91176c307 100644 --- a/daemon/src/video/socket_pair.h +++ b/daemon/src/video/socket_pair.h @@ -38,32 +38,48 @@ #include <mutex> #include <stdint.h> +namespace sfl { +class IceSocket; +}; + namespace sfl_video { class SocketPair { public: SocketPair(const char *uri, int localPort); + SocketPair(std::unique_ptr<sfl::IceSocket> rtp_sock, + std::unique_ptr<sfl::IceSocket> rtcp_sock); ~SocketPair(); void interrupt(); VideoIOHandle* createIOContext(); void openSockets(const char *uri, int localPort); void closeSockets(); - static int readCallback(void *opaque, uint8_t *buf, int buf_size); - static int writeCallback(void *opaque, uint8_t *buf, int buf_size); private: NON_COPYABLE(SocketPair); + static int readCallback(void *opaque, uint8_t *buf, int buf_size); + static int writeCallback(void *opaque, uint8_t *buf, int buf_size); + + int waitForData(); + int readRtpData(void *buf, int buf_size); + int readRtcpData(void *buf, int buf_size); + int writeRtpData(void *buf, int buf_size); + int writeRtcpData(void *buf, int buf_size); + + std::unique_ptr<sfl::IceSocket> rtp_sock_; + std::unique_ptr<sfl::IceSocket> rtcp_sock_; + std::mutex rtcpWriteMutex_; - int rtpHandle_; - int rtcpHandle_; + int rtpHandle_ {-1}; + int rtcpHandle_ {-1}; sockaddr_storage rtpDestAddr_; socklen_t rtpDestAddrLen_; sockaddr_storage rtcpDestAddr_; socklen_t rtcpDestAddrLen_; - bool interrupted_; + bool interrupted_ {false}; }; } diff --git a/daemon/src/video/video_rtp_session.cpp b/daemon/src/video/video_rtp_session.cpp index 93dc73d1149dfdaedfcd8c9f2baae63757f27391..6b9cf1111719ab398ddfb8e950d3f6aeaab03ad9 100644 --- a/daemon/src/video/video_rtp_session.cpp +++ b/daemon/src/video/video_rtp_session.cpp @@ -34,6 +34,7 @@ #include "video_sender.h" #include "video_receive_thread.h" #include "video_mixer.h" +#include "ice_socket.h" #include "socket_pair.h" #include "sip/sdp.h" #include "sip/sipvoiplink.h" @@ -197,6 +198,34 @@ void VideoRtpSession::start(int localPort) } } +void VideoRtpSession::start(std::unique_ptr<sfl::IceSocket> rtp_sock, + std::unique_ptr<sfl::IceSocket> rtcp_sock) +{ + std::lock_guard<std::recursive_mutex> lock(mutex_); + + if (not sending_ and not receiving_) { + stop(); + return; + } + + socketPair_.reset(new SocketPair(std::move(rtp_sock), std::move(rtcp_sock))); + + startSender(); + startReceiver(); + + // Setup video pipeline + if (conference_) + setupConferenceVideoPipeline(conference_); + else if (sender_) { + auto videoCtrl = Manager::instance().getVideoManager(); + videoLocal_ = videoCtrl->getVideoCamera(); + if (videoLocal_ and videoLocal_->attach(sender_.get())) + videoCtrl->switchToCamera(); + } else { + videoLocal_.reset(); + } +} + void VideoRtpSession::stop() { std::lock_guard<std::recursive_mutex> lock(mutex_); diff --git a/daemon/src/video/video_rtp_session.h b/daemon/src/video/video_rtp_session.h index b0a9ab4be3188d3ac8115ca8c887d9745c6d3230..4a4d9fe570d93cf861acf1fdd50c3afd1849e137 100644 --- a/daemon/src/video/video_rtp_session.h +++ b/daemon/src/video/video_rtp_session.h @@ -47,6 +47,10 @@ class Sdp; class Conference; +namespace sfl { +class IceSocket; +}; + namespace sfl_video { class VideoRtpSession { @@ -56,6 +60,8 @@ public: ~VideoRtpSession(); void start(int localPort); + void start(std::unique_ptr<sfl::IceSocket> rtp_sock, + std::unique_ptr<sfl::IceSocket> rtcp_sock); void stop(); void updateDestination(const std::string &destination, unsigned int port);