From 6e323fa54f29ae76c375084d3d14939529700e68 Mon Sep 17 00:00:00 2001 From: atraczyk <andreastraczyk@gmail.com> Date: Mon, 31 Oct 2016 14:04:24 -0400 Subject: [PATCH] cherry-pick commits from: 67f74aae2b13e9a41395731868e493d0d23e507f to : 10f592d549432a4b791e2d8a68b9c77c8c0449be Change-Id: I8366f82b6da50b963b478d68a4f095714f7980e8 --- README | 18 +- bin/Makefile.am | 8 +- .../cx.ring.Ring.ConfigurationManager.xml | 112 + bin/dbus/dbusclient.cpp | 2 + bin/dbus/dbusconfigurationmanager.cpp | 18 + bin/dbus/dbusconfigurationmanager.h | 3 + configure.ac | 21 +- src/Makefile.am | 7 +- src/call.cpp | 2 +- src/client/configurationmanager.cpp | 51 +- src/client/ring_signal.cpp | 2 + src/dring/account_const.h | 9 +- src/dring/call_const.h | 1 + src/dring/configurationmanager_interface.h | 12 + src/ice_transport.cpp | 2064 ++++++++--------- src/ice_transport.h | 48 +- src/manager.cpp | 30 + src/manager.h | 2 + src/media/video/v4l2/vaapi.cpp | 249 ++ src/media/video/v4l2/vaapi.h | 85 + src/media/video/video_input.cpp | 16 +- src/ringdht/Makefile.am | 8 +- src/ringdht/namedirectory.cpp | 261 +++ src/ringdht/namedirectory.h | 61 + src/ringdht/ringaccount.cpp | 216 +- src/ringdht/ringaccount.h | 19 +- src/sip/sipaccountbase.cpp | 30 +- src/sip/sipvoiplink.cpp | 2 +- src/thread_pool.cpp | 2 +- 29 files changed, 2206 insertions(+), 1153 deletions(-) create mode 100644 src/media/video/v4l2/vaapi.cpp create mode 100644 src/media/video/v4l2/vaapi.h create mode 100644 src/ringdht/namedirectory.cpp create mode 100644 src/ringdht/namedirectory.h diff --git a/README b/README index 4c3f4383a9..bf138a7727 100644 --- a/README +++ b/README @@ -19,7 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. Introduction ------------ -Ring is a Voice-over-IP software phone. We want it to be: +GNU Ring is a Voice-over-IP software phone. We want it to be: - user friendly (fast, sleek, easy to learn interface) - professional grade (transfers, holds, optimal audio quality) - compatible with Asterisk (using SIP account) @@ -27,17 +27,17 @@ Ring is a Voice-over-IP software phone. We want it to be: - customizable As the SIP/audio daemon and the user interface are separate processes, -it is easy to provide different user interfaces. Ring comes with +it is easy to provide different user interfaces. GNU Ring comes with various graphical user interfaces and even scripts to control the daemon from the shell. -Ring is currently used by the support team of Savoir-faire Linux Inc. +GNU Ring is currently used by the support team of Savoir-faire Linux Inc. More information is available on the project homepage: http://www.ring.cx/ This source tree contains the daemon application only, DRing, that handles -the business logic of Ring. UIs are located in differents repositories. See +the business logic of GNU Ring. UIs are located in differents repositories. See the Contributing section for more information. @@ -136,7 +136,7 @@ dbus is not available the client should implement all the methods in the How to compile with the REST API -------------------------------- -Ring offers two REST API. One written in C++, and the other written in Cython. +GNU Ring offers two REST API. One written in C++, and the other written in Cython. Up to this date, only the C++ API is available. The Cython API will soon follow. To compile Ring-daemon with the C++ REST API, follow these two steps : @@ -163,7 +163,7 @@ clients (KDE and GNOME). By doing this, you will be able to call other accounts known to this server. -Contributing to Ring +Contributing to GNU Ring ------------------------ Of course we love patches. And contributions. And spring rolls. @@ -176,10 +176,10 @@ contains the client subprojects: - https://gerrit-ring.savoirfairelinux.com/#/admin/projects/ Do not hesitate to join us and post comments, suggestions, questions -and general feedback on the Ring mailing-list: -http://lists.savoirfairelinux.net/mailman/listinfo/ring +and general feedback on the GNU Ring mailing-list: +https://lists.gnu.org/mailman/listinfo/ring IRC (on #freenode): - #ring - -- The Ring Team + -- The GNU Ring Team diff --git a/bin/Makefile.am b/bin/Makefile.am index deb5c45591..bbe9d273e1 100644 --- a/bin/Makefile.am +++ b/bin/Makefile.am @@ -35,15 +35,15 @@ endif if RING_RESTCPP SUBDIRS=restcpp -sbin_PROGRAMS = dring +sbin_PROGRAMS = restdring -dring_SOURCES = main.cpp +restdring_SOURCES = main.cpp -dring_CXXFLAGS= -g \ +restdring_CXXFLAGS= -g \ -I$(top_srcdir)/src \ -I$(top_srcdir)/src/dring \ -DREST_API \ -DTOP_BUILDDIR=\"$$(cd "$(top_builddir)"; pwd)\" -dring_LDADD = restcpp/libclient_rest.la $(top_builddir)/src/libring.la +restdring_LDADD = restcpp/libclient_rest.la $(top_builddir)/src/libring.la endif diff --git a/bin/dbus/cx.ring.Ring.ConfigurationManager.xml b/bin/dbus/cx.ring.Ring.ConfigurationManager.xml index 4bc665df95..76c36ca753 100644 --- a/bin/dbus/cx.ring.Ring.ConfigurationManager.xml +++ b/bin/dbus/cx.ring.Ring.ConfigurationManager.xml @@ -220,6 +220,118 @@ </arg> </method> + <method name="lookupName" tp:name-for-bindings="lookupName"> + <tp:docstring> + Performs name lookup with RingNS protocol for the specified account (if any) or using the default nameserver. + </tp:docstring> + <arg type="s" name="accountID" direction="in"> + The account to use. If empty, use the default RingNS server. + </arg> + <arg type="s" name="nameserverUri" direction="in"> + The name server URI to use, considered only if accountID is empty. + </arg> + <arg type="s" name="name" direction="in"> + </arg> + <arg type="b" name="success" direction="out"> + <tp:docstring> + True if the operation was initialized successfully. registeredNameFound will be trigered on completion. + </tp:docstring> + </arg> + </method> + <method name="lookupAddress" tp:name-for-bindings="lookupAddress"> + <tp:docstring> + Performs address lookup with RingNS protocol for the specified account (if any) or using the default nameserver. + </tp:docstring> + <arg type="s" name="accountID" direction="in"> + The account to use. If empty, use the default RingNS server. + </arg> + <arg type="s" name="nameserverUri" direction="in"> + The name server URI to use, considered only if accountID is empty. + </arg> + <arg type="s" name="address" direction="in"> + <tp:docstring> + Address to lookup for. Must not include spaces. + </tp:docstring> + </arg> + <arg type="b" name="success" direction="out"> + <tp:docstring> + True if the operation was initialized successfully. registeredNameFound will be trigered on completion. + False in case of operation initialization error. registeredNameFound won't be called in that case. + </tp:docstring> + </arg> + </method> + <signal name="registeredNameFound" tp:name-for-bindings="registeredNameFound"> + <tp:docstring> + Notify clients when a new registered address-name mapping is known. + If status is not success (0), requested field (name or address) is left empty. + </tp:docstring> + <arg type="s" name="accountID"> + </arg> + <arg type="i" name="status"> + <tp:docstring> + Status code: 0 for success + <ul> + <li>SUCCESS = 0 everything went fine. Name/address pair was found.</li> + <li>INVALID_NAME = 1 provided name is not valid.</li> + <li>NOT_FOUND = 2 everything went fine. Name/address pair was not found.</li> + <li>ERROR = 3 An error happened</li> + </ul> + </tp:docstring> + </arg> + <arg type="s" name="address"> + </arg> + <arg type="s" name="name"> + </arg> + </signal> + + <method name="registerName" tp:name-for-bindings="registerName"> + <tp:docstring> + Performs name registration with RingNS protocol for the specified account. + </tp:docstring> + <arg type="s" name="accountID" direction="in"> + </arg> + <arg type="s" name="password" direction="in"> + <tp:docstring> + Ring account main password. + </tp:docstring> + </arg> + <arg type="s" name="name" direction="in"> + <tp:docstring> + Name to register. Must be lower-case ASCII, digits or "-" or "_". + </tp:docstring> + </arg> + <arg type="b" name="success" direction="out"> + <tp:docstring> + True if the operation was initialized successfully. nameRegistrationEnded will be trigered on completion. + False in case of operation initialization error. nameRegistrationEnded won't be called in that case. + </tp:docstring> + </arg> + </method> + <signal name="nameRegistrationEnded" tp:name-for-bindings="nameRegistrationEnded"> + <tp:docstring> + Notify clients when the registerName operation ended. + </tp:docstring> + <arg type="s" name="accountID"> + </arg> + <arg type="i" name="status"> + <tp:docstring> + Status code: 0 for success + <ul> + <li>SUCCESS = 0 everything went fine. Name is now registered.</li> + <li>WRONG_PASSWORD = 1 registration failed: wrong password.</li> + <li>INVALID_NAME = 2 registration failed: invalid name.</li> + <li>ALREADY_TAKEN = 3 registration failed: name is already taken.</li> + <li>NETWORK_ERROR = 4 registration failed: network or server error.</li> + </ul> + </tp:docstring> + </arg> + <arg type="s" name="name"> + <tp:docstring> + The name that was attempted to register. + </tp:docstring> + </arg> + </signal> + <method name="testAccountICEInitialization" tp:name-for-bindings="testAccountICEInitialization"> <tp:docstring> Test initializing an ICE transport with the current account configuration. diff --git a/bin/dbus/dbusclient.cpp b/bin/dbus/dbusclient.cpp index 7dd9e6018f..7b3f018303 100644 --- a/bin/dbus/dbusclient.cpp +++ b/bin/dbus/dbusclient.cpp @@ -177,6 +177,8 @@ DBusClient::initLibrary(int flags) exportable_callback<ConfigurationSignal::IncomingTrustRequest>(bind(&DBusConfigurationManager::incomingTrustRequest, confM, _1, _2, _3, _4 )), exportable_callback<ConfigurationSignal::ExportOnRingEnded>(bind(&DBusConfigurationManager::exportOnRingEnded, confM, _1, _2, _3 )), exportable_callback<ConfigurationSignal::KnownDevicesChanged>(bind(&DBusConfigurationManager::knownDevicesChanged, confM, _1, _2 )), + exportable_callback<ConfigurationSignal::NameRegistrationEnded>(bind(&DBusConfigurationManager::nameRegistrationEnded, confM, _1, _2, _3 )), + exportable_callback<ConfigurationSignal::RegisteredNameFound>(bind(&DBusConfigurationManager::registeredNameFound, confM, _1, _2, _3, _4 )), exportable_callback<ConfigurationSignal::CertificatePinned>(bind(&DBusConfigurationManager::certificatePinned, confM, _1 )), exportable_callback<ConfigurationSignal::CertificatePathPinned>(bind(&DBusConfigurationManager::certificatePathPinned, confM, _1, _2 )), exportable_callback<ConfigurationSignal::CertificateExpired>(bind(&DBusConfigurationManager::certificateExpired, confM, _1 )), diff --git a/bin/dbus/dbusconfigurationmanager.cpp b/bin/dbus/dbusconfigurationmanager.cpp index 62dc9f8dde..fdf6f59bfe 100644 --- a/bin/dbus/dbusconfigurationmanager.cpp +++ b/bin/dbus/dbusconfigurationmanager.cpp @@ -86,6 +86,24 @@ DBusConfigurationManager::getKnownRingDevices(const std::string& accountID) -> d return DRing::getKnownRingDevices(accountID); } +auto +DBusConfigurationManager::lookupName(const std::string& account, const std::string& nameserver, const std::string& name) -> decltype(DRing::lookupName(account, nameserver, name)) +{ + return DRing::lookupName(account, nameserver, name); +} + +auto +DBusConfigurationManager::lookupAddress(const std::string& account, const std::string& nameserver, const std::string& address) -> decltype(DRing::lookupAddress(account, nameserver, address)) +{ + return DRing::lookupAddress(account, nameserver, address); +} + +auto +DBusConfigurationManager::registerName(const std::string& account, const std::string& password, const std::string& name) -> decltype(DRing::registerName(account, password, name)) +{ + return DRing::registerName(account, password, name); +} + void DBusConfigurationManager::removeAccount(const std::string& accountID) { diff --git a/bin/dbus/dbusconfigurationmanager.h b/bin/dbus/dbusconfigurationmanager.h index c25be1eb1e..562c9ab4f9 100644 --- a/bin/dbus/dbusconfigurationmanager.h +++ b/bin/dbus/dbusconfigurationmanager.h @@ -65,6 +65,9 @@ class DBusConfigurationManager : std::string addAccount(const std::map<std::string, std::string>& details); bool exportOnRing(const std::string& accountID, const std::string& password); std::map<std::string, std::string> getKnownRingDevices(const std::string& accountID); + bool lookupName(const std::string& account, const std::string& nameserver, const std::string& name); + bool lookupAddress(const std::string& account, const std::string& nameserver, const std::string& address); + bool registerName(const std::string& account, const std::string& password, const std::string& name); void removeAccount(const std::string& accoundID); std::vector<std::string> getAccountList(); void sendRegister(const std::string& accoundID, const bool& enable); diff --git a/configure.ac b/configure.ac index fef34818ab..e83b479cf3 100644 --- a/configure.ac +++ b/configure.ac @@ -78,7 +78,8 @@ case "${host_os}" in AC_DEFINE([WIN32_LEAN_AND_MEAN],[1], [Define to limit the scope of <windows.h>.]) CPPFLAGS+="-D_WIN32_WINNT=0x0601 -DWINVER=0x0601 -D__USE_MINGW_ANSI_STDIO=1" - LDFLAGS+="-no-undefined -avoid-version -Wl,--nxcompat -Wl,--dynamicbase" + LDFLAGS+="-Wl,--nxcompat -Wl,--dynamicbase" + LIBS+=" -lws2_32" ac_default_prefix="`pwd`/_win32" DESTDIR="`pwd`/_win32/" AC_SUBST(WINDOWS_ARCH) @@ -387,20 +388,26 @@ AS_IF([test "x$with_dbus" = "xyes"], [ AM_CONDITIONAL(RING_DBUS, true)], AM_CONDITIONAL(RING_DBUS, false)); +dnl Ring name service is default-enabled +AC_ARG_ENABLE([ringns], AS_HELP_STRING([--disable-ringns], [Enable Ring Name Service])) +AM_CONDITIONAL([RINGNS], test "x$enable_ringns" != "xno", [Define if you use the Ring Name Service]) +AC_DEFINE_UNQUOTED([HAVE_RINGNS], `if test "x$enable_ringns" != "xno"; then echo 1; else echo 0; fi`, [Define if you use the Ring Name Service]) + # Rest C++ with restbed AC_ARG_WITH([restcpp], AS_HELP_STRING([--with-restcpp], [enable rest support with C++])) -AS_IF([test "x$with_restcpp" = "xyes"], [ - PKG_CHECK_MODULES(RESTBED, librestbed,, AC_MSG_WARN([Missing restbed files])) +AS_IF([test "x$enable_ringns" != "xno" || test "x$with_restcpp" = "xyes"], + AC_CHECK_LIB(restbed, exit,, AC_MSG_ERROR([Missing restbed files]))); +AS_IF([test "x$with_restcpp" = "xyes"], [ AS_AC_EXPAND(SBINDIR, $sbindir) AC_SUBST(SBINDIR) - AC_CONFIG_FILES([bin/restcpp/Makefile]) - - AM_CONDITIONAL(RING_RESTCPP, true)], - AM_CONDITIONAL(RING_RESTCPP, false)); + AM_CONDITIONAL(RING_RESTCPP, true) + ], + AM_CONDITIONAL(RING_RESTCPP, false) +); dnl Check for libav PKG_CHECK_MODULES(LIBAVCODEC, libavcodec >= 53.5.0,, AC_MSG_ERROR([Missing libavcodec development files])) diff --git a/src/Makefile.am b/src/Makefile.am index 6db024c779..1c36d1799b 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -55,7 +55,12 @@ libring_la_LDFLAGS = \ @GNUTLS_LIBS@ \ @OPENDHT_LIBS@ \ @ZLIB_LIBS@ \ - $(PCRE_LIBS) + $(PCRE_LIBS) \ + @LIBS@ + +if HAVE_WIN32 +libring_la_LDFLAGS += -no-undefined -avoid-version +endif if HAVE_OSX #FIXME necessary for -lintl diff --git a/src/call.cpp b/src/call.cpp index 0cedb49e7b..aac915369a 100644 --- a/src/call.cpp +++ b/src/call.cpp @@ -168,7 +168,7 @@ Call::setState(CallState call_state, ConnectionState cnx_state, signed code) pendingInMessages_.clear(); pendingOutMessages_.clear(); } - } else if (call_state == CallState::ACTIVE and not pendingOutMessages_.empty()) { + } else if (call_state == CallState::ACTIVE and connectionState_ == ConnectionState::CONNECTED and not pendingOutMessages_.empty()) { for (const auto& msg : pendingOutMessages_) sendTextMessage(msg.first, msg.second); pendingOutMessages_.clear(); diff --git a/src/client/configurationmanager.cpp b/src/client/configurationmanager.cpp index 157ce84d59..97132fec0a 100644 --- a/src/client/configurationmanager.cpp +++ b/src/client/configurationmanager.cpp @@ -61,6 +61,7 @@ namespace DRing { constexpr unsigned CODECS_NOT_LOADED = 0x1000; /** Codecs not found */ using ring::SIPAccount; +using ring::RingAccount; using ring::tls::TlsValidator; using ring::tls::CertificateStore; using ring::DeviceType; @@ -828,12 +829,52 @@ connectivityChanged() RING_ERR("UPnP context error: %s", e.what()); } - auto account_list = ring::Manager::instance().getAccountList(); - for (auto account_id : account_list) { - if (auto account = ring::Manager::instance().getAccount(account_id)) { - account->connectivityChanged(); - } + for (const auto &account : ring::Manager::instance().getAllAccounts()) { + account->connectivityChanged(); + } +} + +bool lookupName(const std::string& account, const std::string& nameserver, const std::string& name) +{ +#if HAVE_RINGNS + if (account.empty()) { + ring::NameDirectory::instance(nameserver).lookupName(name, [name](const std::string& result, ring::NameDirectory::Response response) { + ring::emitSignal<DRing::ConfigurationSignal::RegisteredNameFound>("", (int)response, result, name); + }); + return true; + } else if (auto acc = ring::Manager::instance().getAccount<RingAccount>(account)) { + acc->lookupName(name); + return true; + } +#endif + return false; +} + +bool lookupAddress(const std::string& account, const std::string& nameserver, const std::string& address) +{ +#if HAVE_RINGNS + if (account.empty()) { + ring::NameDirectory::instance(nameserver).lookupAddress(address, [address](const std::string& result, ring::NameDirectory::Response response) { + ring::emitSignal<DRing::ConfigurationSignal::RegisteredNameFound>("", (int)response, address, result); + }); + return true; + } else if (auto acc = ring::Manager::instance().getAccount<RingAccount>(account)) { + acc->lookupAddress(address); + return true; } +#endif + return false; +} + +bool registerName(const std::string& account, const std::string& password, const std::string& name) +{ +#if HAVE_RINGNS + if (auto acc = ring::Manager::instance().getAccount<RingAccount>(account)) { + acc->registerName(password, name); + return true; + } +#endif + return false; } } // namespace DRing diff --git a/src/client/ring_signal.cpp b/src/client/ring_signal.cpp index 86f6de1ed0..7564d05e9a 100644 --- a/src/client/ring_signal.cpp +++ b/src/client/ring_signal.cpp @@ -69,6 +69,8 @@ getSignalHandlers() exported_callback<DRing::ConfigurationSignal::IncomingTrustRequest>(), exported_callback<DRing::ConfigurationSignal::ExportOnRingEnded>(), exported_callback<DRing::ConfigurationSignal::KnownDevicesChanged>(), + exported_callback<DRing::ConfigurationSignal::NameRegistrationEnded>(), + exported_callback<DRing::ConfigurationSignal::RegisteredNameFound>(), exported_callback<DRing::ConfigurationSignal::MediaParametersChanged>(), exported_callback<DRing::ConfigurationSignal::Error>(), #ifdef __ANDROID__ diff --git a/src/dring/account_const.h b/src/dring/account_const.h index 5ac7dfd56d..9db2f649e6 100644 --- a/src/dring/account_const.h +++ b/src/dring/account_const.h @@ -76,6 +76,7 @@ enum class testAccountICEInitializationStatus : int { namespace VolatileProperties { constexpr static const char ACTIVE [] = "Account.active"; +constexpr static const char REGISTERED_NAME [] = "Account.registredName"; // Volatile parameters namespace Registration { @@ -222,12 +223,12 @@ constexpr static const char ALLOW_FROM_TRUSTED [] = "DHT.AllowFromTrusted"; } //namespace DRing::Account::DHT -namespace ETH { +namespace RingNS { -constexpr static const char KEY_FILE [] = "ETH.keyFile"; -constexpr static const char ACCOUNT [] = "ETH.account"; +constexpr static const char URI [] = "RingNS.uri"; +constexpr static const char ACCOUNT [] = "RingNS.account"; -} //namespace DRing::Account::ETH +} //namespace DRing::Account::RingNS namespace CodecInfo { diff --git a/src/dring/call_const.h b/src/dring/call_const.h index 98486238a6..9bd26b15e2 100644 --- a/src/dring/call_const.h +++ b/src/dring/call_const.h @@ -44,6 +44,7 @@ namespace Details { constexpr static char CALL_TYPE [] = "CALL_TYPE" ; constexpr static char PEER_NUMBER [] = "PEER_NUMBER" ; +constexpr static char REGISTERED_NAME [] = "REGISTERED_NAME" ; constexpr static char DISPLAY_NAME [] = "DISPLAY_NAME" ; constexpr static char CALL_STATE [] = "CALL_STATE" ; constexpr static char CONF_ID [] = "CONF_ID" ; diff --git a/src/dring/configurationmanager_interface.h b/src/dring/configurationmanager_interface.h index 6e2f24154b..51e9f9b07b 100644 --- a/src/dring/configurationmanager_interface.h +++ b/src/dring/configurationmanager_interface.h @@ -48,6 +48,10 @@ std::string addAccount(const std::map<std::string, std::string>& details); bool exportOnRing(const std::string& accountID, const std::string& password); std::map<std::string, std::string> getKnownRingDevices(const std::string& accountID); +bool lookupName(const std::string& account, const std::string& nameserver, const std::string& name); +bool lookupAddress(const std::string& account, const std::string& nameserver, const std::string& address); +bool registerName(const std::string& account, const std::string& password, const std::string& name); + void removeAccount(const std::string& accountID); void setAccountEnabled(const std::string& accountID, bool enable); std::vector<std::string> getAccountList(); @@ -216,10 +220,18 @@ struct ConfigurationSignal { constexpr static const char* name = "ExportOnRingEnded"; using cb_type = void(const std::string& /*account_id*/, int state, const std::string& pin); }; + struct NameRegistrationEnded { + constexpr static const char* name = "NameRegistrationEnded"; + using cb_type = void(const std::string& /*account_id*/, int state, const std::string& name); + }; struct KnownDevicesChanged { constexpr static const char* name = "KnownDevicesChanged"; using cb_type = void(const std::string& /*account_id*/, const std::map<std::string, std::string>& devices); }; + struct RegisteredNameFound { + constexpr static const char* name = "RegisteredNameFound"; + using cb_type = void(const std::string& /*account_id*/, int state, const std::string& /*address*/, const std::string& /*name*/); + }; struct CertificatePinned { constexpr static const char* name = "CertificatePinned"; using cb_type = void(const std::string& /*certId*/); diff --git a/src/ice_transport.cpp b/src/ice_transport.cpp index 139a89fa1d..a0128fd9ca 100644 --- a/src/ice_transport.cpp +++ b/src/ice_transport.cpp @@ -1,1032 +1,1032 @@ -/* - * Copyright (C) 2004-2016 Savoir-faire Linux Inc. - * - * Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - */ - -#include "ice_transport.h" -#include "ice_socket.h" -#include "logger.h" -#include "sip/sip_utils.h" -#include "manager.h" -#include "upnp/upnp_control.h" - -#include <pjlib.h> - -#include <utility> -#include <algorithm> -#include <sstream> -#include <chrono> -#include <thread> -#include <cerrno> - -#define TRY(ret) do { \ - if ((ret) != PJ_SUCCESS) \ - throw std::runtime_error(#ret " failed"); \ - } while (0) - -namespace ring { - -static constexpr unsigned STUN_MAX_PACKET_SIZE {8192}; - -// TODO: C++14 ? remove me and use std::min -template< class T > -static constexpr const T& min( const T& a, const T& b ) { - return (b < a) ? b : a; -} - -static void -register_thread() -{ - // We have to register the external thread so it could access the pjsip frameworks - if (!pj_thread_is_registered()) { -#if __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8) || defined(WIN32_NATIVE) - static thread_local pj_thread_desc desc; - static thread_local pj_thread_t *this_thread; -#else - static __thread pj_thread_desc desc; - static __thread pj_thread_t *this_thread; -#endif - pj_thread_register(NULL, desc, &this_thread); - RING_DBG("Registered thread %p (0x%X)", this_thread, pj_getpid()); - } -} - -/** - * Add stun/turn servers or default host as candidates - */ -static void -add_stun_server(pj_ice_strans_cfg& cfg, int af) -{ - if (cfg.stun_tp_cnt >= PJ_ICE_MAX_STUN) - throw std::runtime_error("Too many STUN servers"); - auto& stun = cfg.stun_tp[cfg.stun_tp_cnt++]; - - pj_ice_strans_stun_cfg_default(&stun); - stun.cfg.max_pkt_size = STUN_MAX_PACKET_SIZE; - stun.af = af; - - RING_DBG("[ice] added host stun server"); -} - -static void -add_stun_server(pj_pool_t& pool, pj_ice_strans_cfg& cfg, const StunServerInfo& info) -{ - if (cfg.stun_tp_cnt >= PJ_ICE_MAX_STUN) - throw std::runtime_error("Too many STUN servers"); - - IpAddr ip {info.uri}; - - // Given URI cannot be DNS resolved or not IPv4 or IPv6? - // This prevents a crash into PJSIP when ip.toString() is called. - if (ip.getFamily() == AF_UNSPEC) { - RING_WARN("[ice] STUN server '%s' not used, unresolvable address", info.uri.c_str()); - return; - } - - auto& stun = cfg.stun_tp[cfg.stun_tp_cnt++]; - pj_ice_strans_stun_cfg_default(&stun); - pj_strdup2_with_null(&pool, &stun.server, ip.toString().c_str()); - stun.af = ip.getFamily(); - stun.port = PJ_STUN_PORT; - stun.cfg.max_pkt_size = STUN_MAX_PACKET_SIZE; - - RING_DBG("[ice] added stun server '%s', port %d", pj_strbuf(&stun.server), stun.port); -} - -static void -add_turn_server(pj_pool_t& pool, pj_ice_strans_cfg& cfg, const TurnServerInfo& info) -{ - if (cfg.turn_tp_cnt >= PJ_ICE_MAX_TURN) - throw std::runtime_error("Too many TURN servers"); - - IpAddr ip {info.uri}; - - // Same comment as add_stun_server() - if (ip.getFamily() == AF_UNSPEC) { - RING_WARN("[ice] TURN server '%s' not used, unresolvable address", info.uri.c_str()); - return; - } - - auto& turn = cfg.turn_tp[cfg.turn_tp_cnt++]; - pj_ice_strans_turn_cfg_default(&turn); - pj_strdup2_with_null(&pool, &turn.server, ip.toString().c_str()); - turn.af = ip.getFamily(); - turn.port = PJ_STUN_PORT; - turn.cfg.max_pkt_size = STUN_MAX_PACKET_SIZE; - - // Authorization (only static plain password supported yet) - if (not info.password.empty()) { - turn.auth_cred.type = PJ_STUN_AUTH_CRED_STATIC; - turn.auth_cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN; - pj_cstr(&turn.auth_cred.data.static_cred.realm, info.realm.c_str()); - pj_cstr(&turn.auth_cred.data.static_cred.username, info.username.c_str()); - pj_cstr(&turn.auth_cred.data.static_cred.data, info.password.c_str()); - } - - RING_DBG("[ice] added turn server '%s', port %d", pj_strbuf(&turn.server), turn.port); -} - -//################################################################################################## - -IceTransport::Packet::Packet(void *pkt, pj_size_t size) - : data(new char[size]), datalen(size) -{ - std::copy_n(reinterpret_cast<char*>(pkt), size, data.get()); -} - -void -IceTransport::cb_on_rx_data(pj_ice_strans* ice_st, - unsigned comp_id, - void *pkt, pj_size_t size, - const pj_sockaddr_t* /*src_addr*/, - unsigned /*src_addr_len*/) -{ - if (auto tr = static_cast<IceTransport*>(pj_ice_strans_get_user_data(ice_st))) - tr->onReceiveData(comp_id, pkt, size); - else - RING_WARN("null IceTransport"); -} - -void -IceTransport::cb_on_ice_complete(pj_ice_strans* ice_st, - pj_ice_strans_op op, - pj_status_t status) -{ - if (auto tr = static_cast<IceTransport*>(pj_ice_strans_get_user_data(ice_st))) - tr->onComplete(ice_st, op, status); - else - RING_WARN("null IceTransport"); -} - - -IceTransport::IceTransport(const char* name, int component_count, bool master, - const IceTransportOptions& options) - : pool_(nullptr, pj_pool_release) - , on_initdone_cb_(options.onInitDone) - , on_negodone_cb_(options.onNegoDone) - , component_count_(component_count) - , compIO_(component_count) - , initiatorSession_(master) - , thread_() -{ - if (options.upnpEnable) - upnp_.reset(new upnp::Controller()); - - auto& iceTransportFactory = Manager::instance().getIceTransportFactory(); - config_ = iceTransportFactory.getIceCfg(); // config copy - - pool_.reset(pj_pool_create(iceTransportFactory.getPoolFactory(), - "IceTransport.pool", 512, 512, NULL)); - if (not pool_) - throw std::runtime_error("pj_pool_create() failed"); - - pj_ice_strans_cb icecb; - pj_bzero(&icecb, sizeof(icecb)); - icecb.on_rx_data = cb_on_rx_data; - icecb.on_ice_complete = cb_on_ice_complete; - - // Add STUN servers - for (auto& server : options.stunServers) - add_stun_server(*pool_, config_, server); - - // Add TURN servers - for (auto& server : options.turnServers) - add_turn_server(*pool_, config_, server); - - static constexpr auto IOQUEUE_MAX_HANDLES = min(PJ_IOQUEUE_MAX_HANDLES, 64); - TRY( pj_timer_heap_create(pool_.get(), 100, &config_.stun_cfg.timer_heap) ); - TRY( pj_ioqueue_create(pool_.get(), IOQUEUE_MAX_HANDLES, &config_.stun_cfg.ioqueue) ); - - pj_ice_strans* icest = nullptr; - pj_status_t status = pj_ice_strans_create(name, &config_, component_count, - this, &icecb, &icest); - - if (status != PJ_SUCCESS || icest == nullptr) { - throw std::runtime_error("pj_ice_strans_create() failed"); - } - - // Must be created after any potential failure - thread_ = std::thread([this]{ - register_thread(); - while (not threadTerminateFlags_) { - handleEvents(500); // limit polling to 500ms - } - }); -} - -IceTransport::~IceTransport() -{ - register_thread(); - - threadTerminateFlags_ = true; - if (thread_.joinable()) - thread_.join(); - - icest_.reset(); // must be done before ioqueue/timer destruction - - if (config_.stun_cfg.ioqueue) - pj_ioqueue_destroy(config_.stun_cfg.ioqueue); - - if (config_.stun_cfg.timer_heap) - pj_timer_heap_destroy(config_.stun_cfg.timer_heap); -} - -void -IceTransport::handleEvents(unsigned max_msec) -{ - // By tests, never seen more than two events per 500ms - static constexpr auto MAX_NET_EVENTS = 2; - - pj_time_val max_timeout = {0, 0}; - pj_time_val timeout = {0, 0}; - unsigned net_event_count = 0; - - max_timeout.msec = max_msec; - - timeout.sec = timeout.msec = 0; - pj_timer_heap_poll(config_.stun_cfg.timer_heap, &timeout); - - // timeout limitation - if (timeout.msec >= 1000) - timeout.msec = 999; - if (PJ_TIME_VAL_GT(timeout, max_timeout)) - timeout = max_timeout; - - do { - auto n_events = pj_ioqueue_poll(config_.stun_cfg.ioqueue, &timeout); - - // timeout - if (not n_events) - return; - - // error - if (n_events < 0) { - const auto err = pj_get_os_error(); - // Kept as debug as some errors are "normal" in regular context - last_errmsg_ = sip_utils::sip_strerror(err); - RING_DBG("IceIOQueue: error %d - %s", err, last_errmsg_.c_str()); - std::this_thread::sleep_for(std::chrono::milliseconds(PJ_TIME_VAL_MSEC(timeout))); - return; - } - - net_event_count += n_events; - timeout.sec = timeout.msec = 0; - } while (net_event_count < MAX_NET_EVENTS); -} - -void -IceTransport::onComplete(pj_ice_strans* ice_st, pj_ice_strans_op op, - pj_status_t status) -{ - const char *opname = - op == PJ_ICE_STRANS_OP_INIT ? "initialization" : - op == PJ_ICE_STRANS_OP_NEGOTIATION ? "negotiation" : "unknown_op"; - - const bool done = status == PJ_SUCCESS; - if (done) { - RING_DBG("ICE %s success", opname); - } - else { - last_errmsg_ = sip_utils::sip_strerror(status); - RING_ERR("ICE %s failed: %s", opname, last_errmsg_.c_str()); - } - - { - std::lock_guard<std::mutex> lk(iceMutex_); - if (!icest_.get()) - icest_.reset(ice_st); - } - - if (done and op == PJ_ICE_STRANS_OP_INIT) { - if (initiatorSession_) - setInitiatorSession(); - else - setSlaveSession(); - selectUPnPIceCandidates(); - } - - if (op == PJ_ICE_STRANS_OP_INIT and on_initdone_cb_) - on_initdone_cb_(*this, done); - else if (op == PJ_ICE_STRANS_OP_NEGOTIATION and on_negodone_cb_) - on_negodone_cb_(*this, done); - - // Unlock waitForXXX APIs - iceCV_.notify_all(); -} - -void -IceTransport::getUFragPwd() -{ - pj_str_t local_ufrag, local_pwd; - pj_ice_strans_get_ufrag_pwd(icest_.get(), &local_ufrag, &local_pwd, NULL, NULL); - local_ufrag_.assign(local_ufrag.ptr, local_ufrag.slen); - local_pwd_.assign(local_pwd.ptr, local_pwd.slen); -} - -std::string -IceTransport::getLastErrMsg() const -{ - return last_errmsg_; -} - -void -IceTransport::getDefaultCanditates() -{ - for (unsigned i=0; i < component_count_; ++i) - pj_ice_strans_get_def_cand(icest_.get(), i+1, &cand_[i]); -} - -bool -IceTransport::createIceSession(pj_ice_sess_role role) -{ - if (pj_ice_strans_init_ice(icest_.get(), role, nullptr, nullptr) != PJ_SUCCESS) { - RING_ERR("pj_ice_strans_init_ice() failed"); - return false; - } - - // Fetch some information on local configuration - getUFragPwd(); - getDefaultCanditates(); - RING_DBG("ICE [local] ufrag=%s, pwd=%s", local_ufrag_.c_str(), local_pwd_.c_str()); - return true; -} - -bool -IceTransport::setInitiatorSession() -{ - RING_DBG("ICE as master"); - initiatorSession_ = true; - if (isInitialized()) { - auto status = pj_ice_strans_change_role(icest_.get(), PJ_ICE_SESS_ROLE_CONTROLLING); - if (status != PJ_SUCCESS) { - last_errmsg_ = sip_utils::sip_strerror(status); - RING_ERR("ICE role change failed: %s", last_errmsg_.c_str()); - return false; - } - return true; - } - return createIceSession(PJ_ICE_SESS_ROLE_CONTROLLING); -} - -bool -IceTransport::setSlaveSession() -{ - RING_DBG("ICE as slave"); - initiatorSession_ = false; - if (isInitialized()) { - auto status = pj_ice_strans_change_role(icest_.get(), PJ_ICE_SESS_ROLE_CONTROLLED); - if (status != PJ_SUCCESS) { - last_errmsg_ = sip_utils::sip_strerror(status); - RING_ERR("ICE role change failed: %s", last_errmsg_.c_str()); - return false; - } - return true; - } - return createIceSession(PJ_ICE_SESS_ROLE_CONTROLLED); -} - -bool -IceTransport::isInitiator() const -{ - if (isInitialized()) - return pj_ice_strans_get_role(icest_.get()) == PJ_ICE_SESS_ROLE_CONTROLLING; - return initiatorSession_; -} - -bool -IceTransport::start(const Attribute& rem_attrs, - const std::vector<IceCandidate>& rem_candidates) -{ - if (not isInitialized()) { - RING_ERR("ICE: not initialized transport"); - return false; - } - - // pj_ice_strans_start_ice crashes if remote candidates array is empty - if (rem_candidates.empty()) { - RING_ERR("ICE start failed: no remote candidates"); - return false; - } - - pj_str_t ufrag, pwd; - RING_DBG("ICE negotiation starting (%zu remote candidates)", rem_candidates.size()); - auto status = pj_ice_strans_start_ice(icest_.get(), - pj_cstr(&ufrag, rem_attrs.ufrag.c_str()), - pj_cstr(&pwd, rem_attrs.pwd.c_str()), - rem_candidates.size(), - rem_candidates.data()); - if (status != PJ_SUCCESS) { - last_errmsg_ = sip_utils::sip_strerror(status); - RING_ERR("ICE start failed: %s", last_errmsg_.c_str()); - return false; - } - return true; -} - -std::string -IceTransport::unpackLine(std::vector<uint8_t>::const_iterator& begin, - std::vector<uint8_t>::const_iterator& end) -{ - if (std::distance(begin, end) <= 0) - return {}; - - // Search for EOL - std::vector<uint8_t>::const_iterator line_end(begin); - while (line_end != end && *line_end != NEW_LINE && *line_end) - ++line_end; - - if (std::distance(begin, line_end) <= 0) - return {}; - - std::string str(begin, line_end); - - // Consume the new line character - if (std::distance(line_end, end) > 0) - ++line_end; - - begin = line_end; - return str; -} - -bool -IceTransport::start(const std::vector<uint8_t>& rem_data) -{ - auto begin = rem_data.cbegin(); - auto end = rem_data.cend(); - auto rem_ufrag = unpackLine(begin, end); - auto rem_pwd = unpackLine(begin, end); - if (rem_pwd.empty() or rem_pwd.empty()) { - RING_ERR("ICE remote attributes parsing error"); - return false; - } - std::vector<IceCandidate> rem_candidates; - try { - while (true) { - IceCandidate candidate; - const auto line = unpackLine(begin, end); - if (line.empty()) - break; - if (getCandidateFromSDP(line, candidate)) - rem_candidates.push_back(candidate); - } - } catch (std::exception& e) { - RING_ERR("ICE remote candidates parsing error"); - return false; - } - return start({rem_ufrag, rem_pwd}, rem_candidates); -} - -bool -IceTransport::stop() -{ - if (isStarted()) { - auto status = pj_ice_strans_stop_ice(icest_.get()); - if (status != PJ_SUCCESS) { - last_errmsg_ = sip_utils::sip_strerror(status); - RING_ERR("ICE stop failed: %s", last_errmsg_.c_str()); - return false; - } - } - return true; -} - -bool -IceTransport::_isInitialized() const -{ - if (auto icest = icest_.get()) { - auto state = pj_ice_strans_get_state(icest); - return state >= PJ_ICE_STRANS_STATE_SESS_READY and state != PJ_ICE_STRANS_STATE_FAILED; - } - return false; -} - -bool -IceTransport::_isStarted() const -{ - if (auto icest = icest_.get()) { - auto state = pj_ice_strans_get_state(icest); - return state >= PJ_ICE_STRANS_STATE_NEGO and state != PJ_ICE_STRANS_STATE_FAILED; - } - return false; -} - -bool -IceTransport::_isRunning() const -{ - if (auto icest = icest_.get()) { - auto state = pj_ice_strans_get_state(icest); - return state >= PJ_ICE_STRANS_STATE_RUNNING and state != PJ_ICE_STRANS_STATE_FAILED; - } - return false; -} - -bool -IceTransport::_isFailed() const -{ - if (auto icest = icest_.get()) - return pj_ice_strans_get_state(icest) == PJ_ICE_STRANS_STATE_FAILED; - return false; -} - -IpAddr -IceTransport::getLocalAddress(unsigned comp_id) const -{ - // Return the local IP of negotiated connection pair - if (isRunning()) { - if (auto sess = pj_ice_strans_get_valid_pair(icest_.get(), comp_id+1)) { - return sess->lcand->addr; - } - RING_WARN("Non-negotiated transport: try to return default local IP"); - } - - // Return the default IP (could be not nominated and valid after negotiation) - if (isInitialized()) - return cand_[comp_id].addr; - - RING_ERR("bad call: non-initialized transport"); - return {}; -} - -IpAddr -IceTransport::getRemoteAddress(unsigned comp_id) const -{ - // Return the remote IP of negotiated connection pair - if (isRunning()) { - if (auto sess = pj_ice_strans_get_valid_pair(icest_.get(), comp_id+1)) - return sess->rcand->addr; - RING_ERR("runtime error: negotiated transport without valid pair"); - } - - RING_ERR("bad call: non-negotiated transport"); - return {}; -} - -const IceTransport::Attribute -IceTransport::getLocalAttributes() const -{ - return {local_ufrag_, local_pwd_}; -} - -std::vector<std::string> -IceTransport::getLocalCandidates(unsigned comp_id) const -{ - std::vector<std::string> res; - pj_ice_sess_cand cand[PJ_ARRAY_SIZE(cand_)]; - unsigned cand_cnt = PJ_ARRAY_SIZE(cand); - - if (pj_ice_strans_enum_cands(icest_.get(), comp_id+1, &cand_cnt, cand) != PJ_SUCCESS) { - RING_ERR("pj_ice_strans_enum_cands() failed"); - return res; - } - - for (unsigned i=0; i<cand_cnt; ++i) { - std::ostringstream val; - char ipaddr[PJ_INET6_ADDRSTRLEN]; - - val << std::string(cand[i].foundation.ptr, cand[i].foundation.slen); - val << " " << (unsigned)cand[i].comp_id << " UDP " << cand[i].prio; - val << " " << pj_sockaddr_print(&cand[i].addr, ipaddr, sizeof(ipaddr), 0); - val << " " << (unsigned)pj_sockaddr_get_port(&cand[i].addr); - val << " typ " << pj_ice_get_cand_type_name(cand[i].type); - - res.push_back(val.str()); - } - - return res; -} - -std::vector<IpAddr> -IceTransport::getLocalCandidatesAddr(unsigned comp_id) const -{ - std::vector<IpAddr> cand_addrs; - pj_ice_sess_cand cand[PJ_ARRAY_SIZE(cand_)]; - unsigned cand_cnt = PJ_ARRAY_SIZE(cand); - - if (pj_ice_strans_enum_cands(icest_.get(), comp_id, &cand_cnt, cand) != PJ_SUCCESS) { - RING_ERR("pj_ice_strans_enum_cands() failed"); - return cand_addrs; - } - - for (unsigned i=0; i<cand_cnt; ++i) - cand_addrs.push_back(cand[i].addr); - - return cand_addrs; -} - -bool -IceTransport::registerPublicIP(unsigned compId, const IpAddr& publicIP) -{ - if (not isInitialized()) - return false; - - // Register only if no NAT traversal methods exists - if (upnp_ or config_.stun.port > 0 or config_.turn.port > 0) - return false; - - // Find the local candidate corresponding to local host, - // then register a rflx candidate using given public address - // and this local address as base. It's port is used for both address - // even if on the public side it have strong probabilities to not exist. - // But as this candidate is made after initialization, it's not used during - // negotiation, only to exchanged candidates between peers. - auto localIP = ip_utils::getLocalAddr(); - auto pubIP = publicIP; - for (const auto& addr : getLocalCandidatesAddr(compId)) { - auto port = addr.getPort(); - localIP.setPort(port); - if (addr != localIP) - continue; - pubIP.setPort(port); - addReflectiveCandidate(compId, addr, pubIP); - return true; - } - return false; -} - -void -IceTransport::addReflectiveCandidate(int comp_id, const IpAddr& base, const IpAddr& addr) -{ - pj_ice_sess_cand cand; - - cand.type = PJ_ICE_CAND_TYPE_SRFLX; - cand.status = PJ_EPENDING; /* not used */ - cand.comp_id = comp_id; - cand.transport_id = 1; /* 1 = STUN */ - cand.local_pref = 65535; /* host */ - /* cand.foundation = ? */ - /* cand.prio = calculated by ice session */ - /* make base and addr the same since we're not going through a server */ - pj_sockaddr_cp(&cand.base_addr, base.pjPtr()); - pj_sockaddr_cp(&cand.addr, addr.pjPtr()); - pj_sockaddr_cp(&cand.rel_addr, &cand.base_addr); - pj_ice_calc_foundation(pool_.get(), &cand.foundation, cand.type, &cand.base_addr); - - auto ret = pj_ice_sess_add_cand(pj_ice_strans_get_ice_sess(icest_.get()), - cand.comp_id, - cand.transport_id, - cand.type, - cand.local_pref, - &cand.foundation, - &cand.addr, - &cand.base_addr, - &cand.rel_addr, - pj_sockaddr_get_len(&cand.addr), - NULL); - - if (ret != PJ_SUCCESS) { - last_errmsg_ = sip_utils::sip_strerror(ret); - RING_ERR("pj_ice_sess_add_cand failed with error %d: %s", ret, - last_errmsg_.c_str()); - RING_ERR("failed to add candidate for comp_id=%d : %s : %s", comp_id, - base.toString().c_str(), addr.toString().c_str()); - } else { - RING_DBG("succeed to add candidate for comp_id=%d : %s : %s", comp_id, - base.toString().c_str(), addr.toString().c_str()); - } -} - -void -IceTransport::selectUPnPIceCandidates() -{ - /* use upnp to open ports and add the proper candidates */ - if (upnp_) { - /* for every component, get the candidate(s) - * create a port mapping either with that port, or with an available port - * add candidate with that port and public IP - */ - if (auto publicIP = upnp_->getExternalIP()) { - /* comp_id start at 1 */ - for (unsigned comp_id = 1; comp_id <= component_count_; ++comp_id) { - RING_DBG("UPnP: Opening port(s) for ICE comp %d and adding candidate with public IP", - comp_id); - auto candidates = getLocalCandidatesAddr(comp_id); - for (IpAddr addr : candidates) { - auto localIP = upnp_->getLocalIP(); - localIP.setPort(addr.getPort()); - if (addr != localIP) - continue; - uint16_t port = addr.getPort(); - uint16_t port_used; - if (upnp_->addAnyMapping(port, upnp::PortType::UDP, true, &port_used)) { - publicIP.setPort(port_used); - addReflectiveCandidate(comp_id, addr, publicIP); - } else - RING_WARN("UPnP: Could not create a port mapping for the ICE candide"); - } - } - } else { - RING_WARN("UPnP: Could not determine public IP for ICE candidates"); - } - } -} - -std::vector<uint8_t> -IceTransport::getLocalAttributesAndCandidates() const -{ - if (not isInitialized()) - return {}; - - std::stringstream ss; - ss << local_ufrag_ << NEW_LINE; - ss << local_pwd_ << NEW_LINE; - for (unsigned i=0; i<component_count_; i++) { - const auto& candidates = getLocalCandidates(i); - for (const auto& c : candidates) - ss << c << NEW_LINE; - } - auto str(ss.str()); - return std::vector<uint8_t>(str.begin(), str.end()); -} - -void -IceTransport::onReceiveData(unsigned comp_id, void *pkt, pj_size_t size) -{ - if (!comp_id or comp_id > component_count_) { - RING_ERR("rx: invalid comp_id (%u)", comp_id); - return; - } - if (!size) - return; - auto& io = compIO_[comp_id-1]; - std::lock_guard<std::mutex> lk(io.mutex); - if (io.cb) { - io.cb((uint8_t*)pkt, size); - } else { - io.queue.emplace_back(pkt, size); - io.cv.notify_one(); - } -} - -bool -IceTransport::getCandidateFromSDP(const std::string& line, IceCandidate& cand) -{ - char foundation[33], transport[13], ipaddr[81], type[33]; - pj_str_t tmpaddr; - int af, comp_id, prio, port; - int cnt = sscanf(line.c_str(), "%32s %d %12s %d %80s %d typ %32s", - foundation, - &comp_id, - transport, - &prio, - ipaddr, - &port, - type); - - if (cnt != 7) { - RING_WARN("ICE: invalid remote candidate line"); - return false; - } - - pj_bzero(&cand, sizeof(IceCandidate)); - - if (strcmp(type, "host")==0) - cand.type = PJ_ICE_CAND_TYPE_HOST; - else if (strcmp(type, "srflx")==0) - cand.type = PJ_ICE_CAND_TYPE_SRFLX; - else if (strcmp(type, "relay")==0) - cand.type = PJ_ICE_CAND_TYPE_RELAYED; - else { - RING_WARN("ICE: invalid remote candidate type '%s'", type); - return false; - } - - cand.comp_id = (pj_uint8_t)comp_id; - cand.prio = prio; - - if (strchr(ipaddr, ':')) - af = pj_AF_INET6(); - else - af = pj_AF_INET(); - - tmpaddr = pj_str(ipaddr); - pj_sockaddr_init(af, &cand.addr, NULL, 0); - auto status = pj_sockaddr_set_str_addr(af, &cand.addr, &tmpaddr); - if (status != PJ_SUCCESS) { - RING_ERR("ICE: invalid remote IP address '%s'", ipaddr); - return false; - } - - pj_sockaddr_set_port(&cand.addr, (pj_uint16_t)port); - pj_strdup2(pool_.get(), &cand.foundation, foundation); - - return true; -} - -ssize_t -IceTransport::recv(int comp_id, unsigned char* buf, size_t len) -{ - register_thread(); - auto& io = compIO_[comp_id]; - std::lock_guard<std::mutex> lk(io.mutex); - - if (io.queue.empty()) - return 0; - - auto& packet = io.queue.front(); - const auto count = std::min(len, packet.datalen); - std::copy_n(packet.data.get(), count, buf); - io.queue.pop_front(); - - return count; -} - -void -IceTransport::setOnRecv(unsigned comp_id, IceRecvCb cb) -{ - auto& io = compIO_[comp_id]; - std::lock_guard<std::mutex> lk(io.mutex); - io.cb = cb; - - if (cb) { - // Flush existing queue using the callback - for (const auto& packet : io.queue) - io.cb((uint8_t*)packet.data.get(), packet.datalen); - io.queue.clear(); - } -} - -ssize_t -IceTransport::send(int comp_id, const unsigned char* buf, size_t len) -{ - register_thread(); - auto remote = getRemoteAddress(comp_id); - if (!remote) { - RING_ERR("Can't find remote address for component %d", comp_id); - errno = EINVAL; - return -1; - } - auto status = pj_ice_strans_sendto(icest_.get(), comp_id+1, buf, len, remote.pjPtr(), remote.getLength()); - if (status != PJ_SUCCESS) { - if (status == PJ_EBUSY) { - errno = EAGAIN; - } else { - last_errmsg_ = sip_utils::sip_strerror(status); - RING_ERR("ice send failed: %s", last_errmsg_.c_str()); - errno = EIO; - } - return -1; - } - - return len; -} - -ssize_t -IceTransport::getNextPacketSize(int comp_id) -{ - auto& io = compIO_[comp_id]; - std::lock_guard<std::mutex> lk(io.mutex); - if (io.queue.empty()) { - return 0; - } - return io.queue.front().datalen; -} - -int -IceTransport::waitForInitialization(unsigned timeout) -{ - std::unique_lock<std::mutex> lk(iceMutex_); - if (!iceCV_.wait_for(lk, std::chrono::seconds(timeout), - [this]{ return _isInitialized() or _isFailed(); })) { - RING_WARN("waitForInitialization: timeout"); - return -1; - } - return not _isFailed(); -} - -int -IceTransport::waitForNegotiation(unsigned timeout) -{ - std::unique_lock<std::mutex> lk(iceMutex_); - if (!iceCV_.wait_for(lk, std::chrono::seconds(timeout), - [this]{ return _isRunning() or _isFailed(); })) { - RING_WARN("waitForIceNegotiation: timeout"); - return -1; - } - return not _isFailed(); -} - -ssize_t -IceTransport::waitForData(int comp_id, unsigned int timeout) -{ - auto& io = compIO_[comp_id]; - std::unique_lock<std::mutex> lk(io.mutex); - if (!io.cv.wait_for(lk, std::chrono::milliseconds(timeout), - [this, &io]{ return !io.queue.empty() or !isRunning(); })) { - return 0; - } - if (!isRunning()) - return -1; // acknowledged as an error - return io.queue.front().datalen; -} - -//################################################################################################## - -IceTransportFactory::IceTransportFactory() - : cp_() - , pool_(nullptr, pj_pool_release) - , ice_cfg_() -{ - pj_caching_pool_init(&cp_, NULL, 0); - pool_.reset(pj_pool_create(&cp_.factory, "IceTransportFactory.pool", - 512, 512, NULL)); - if (not pool_) - throw std::runtime_error("pj_pool_create() failed"); - - pj_ice_strans_cfg_default(&ice_cfg_); - ice_cfg_.stun_cfg.pf = &cp_.factory; - - // v2.4.5 of PJNATH has a default of 100ms but RFC 5389 since version 14 requires - // a minimum of 500ms on fixed-line links. Our usual case is wireless links. - // This solves too long ICE exchange by DHT. - // Using 500ms with default PJ_STUN_MAX_TRANSMIT_COUNT (7) gives around 33s before timeout. - ice_cfg_.stun_cfg.rto_msec = 500; - - // Add local hosts (IPv4, IPv6) as stun candidates - add_stun_server(ice_cfg_, pj_AF_INET6()); - add_stun_server(ice_cfg_, pj_AF_INET()); - - ice_cfg_.opt.aggressive = PJ_FALSE; -} - -IceTransportFactory::~IceTransportFactory() -{ - pool_.reset(); - pj_caching_pool_destroy(&cp_); -} - -std::shared_ptr<IceTransport> -IceTransportFactory::createTransport(const char* name, int component_count, - bool master, - const IceTransportOptions& options) -{ - try { - return std::make_shared<IceTransport>(name, component_count, master, options); - } catch(const std::exception& e) { - RING_ERR("%s",e.what()); - return nullptr; - } -} - -void -IceSocket::close() -{ - ice_transport_.reset(); -} - -ssize_t -IceSocket::recv(unsigned char* buf, size_t len) -{ - if (!ice_transport_.get()) - return -1; - return ice_transport_->recv(compId_, buf, len); -} - -ssize_t -IceSocket::send(const unsigned char* buf, size_t len) -{ - if (!ice_transport_.get()) - return -1; - return ice_transport_->send(compId_, buf, len); -} - -ssize_t -IceSocket::getNextPacketSize() const -{ - if (!ice_transport_.get()) - return -1; - return ice_transport_->getNextPacketSize(compId_); -} - -ssize_t -IceSocket::waitForData(unsigned int timeout) -{ - if (!ice_transport_.get()) - return -1; - - return ice_transport_->waitForData(compId_, timeout); -} - -void -IceSocket::setOnRecv(IceRecvCb cb) -{ - if (!ice_transport_.get()) - return; - return ice_transport_->setOnRecv(compId_, cb); -} - -} // namespace ring +/* + * Copyright (C) 2004-2016 Savoir-faire Linux Inc. + * + * Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "ice_transport.h" +#include "ice_socket.h" +#include "logger.h" +#include "sip/sip_utils.h" +#include "manager.h" +#include "upnp/upnp_control.h" + +#include <pjlib.h> + +#include <utility> +#include <algorithm> +#include <sstream> +#include <chrono> +#include <thread> +#include <cerrno> + +#define TRY(ret) do { \ + if ((ret) != PJ_SUCCESS) \ + throw std::runtime_error(#ret " failed"); \ + } while (0) + +namespace ring { + +static constexpr unsigned STUN_MAX_PACKET_SIZE {8192}; + +// TODO: C++14 ? remove me and use std::min +template< class T > +static constexpr const T& min( const T& a, const T& b ) { + return (b < a) ? b : a; +} + +static void +register_thread() +{ + // We have to register the external thread so it could access the pjsip frameworks + if (!pj_thread_is_registered()) { +#if __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8) || defined(WIN32_NATIVE) + static thread_local pj_thread_desc desc; + static thread_local pj_thread_t *this_thread; +#else + static __thread pj_thread_desc desc; + static __thread pj_thread_t *this_thread; +#endif + pj_thread_register(NULL, desc, &this_thread); + RING_DBG("Registered thread %p (0x%X)", this_thread, pj_getpid()); + } +} + +/** + * Add stun/turn servers or default host as candidates + */ +static void +add_stun_server(pj_ice_strans_cfg& cfg, int af) +{ + if (cfg.stun_tp_cnt >= PJ_ICE_MAX_STUN) + throw std::runtime_error("Too many STUN servers"); + auto& stun = cfg.stun_tp[cfg.stun_tp_cnt++]; + + pj_ice_strans_stun_cfg_default(&stun); + stun.cfg.max_pkt_size = STUN_MAX_PACKET_SIZE; + stun.af = af; + + RING_DBG("[ice] added host stun server"); +} + +static void +add_stun_server(pj_pool_t& pool, pj_ice_strans_cfg& cfg, const StunServerInfo& info) +{ + if (cfg.stun_tp_cnt >= PJ_ICE_MAX_STUN) + throw std::runtime_error("Too many STUN servers"); + + IpAddr ip {info.uri}; + + // Given URI cannot be DNS resolved or not IPv4 or IPv6? + // This prevents a crash into PJSIP when ip.toString() is called. + if (ip.getFamily() == AF_UNSPEC) { + RING_WARN("[ice] STUN server '%s' not used, unresolvable address", info.uri.c_str()); + return; + } + + auto& stun = cfg.stun_tp[cfg.stun_tp_cnt++]; + pj_ice_strans_stun_cfg_default(&stun); + pj_strdup2_with_null(&pool, &stun.server, ip.toString().c_str()); + stun.af = ip.getFamily(); + stun.port = PJ_STUN_PORT; + stun.cfg.max_pkt_size = STUN_MAX_PACKET_SIZE; + + RING_DBG("[ice] added stun server '%s', port %d", pj_strbuf(&stun.server), stun.port); +} + +static void +add_turn_server(pj_pool_t& pool, pj_ice_strans_cfg& cfg, const TurnServerInfo& info) +{ + if (cfg.turn_tp_cnt >= PJ_ICE_MAX_TURN) + throw std::runtime_error("Too many TURN servers"); + + IpAddr ip {info.uri}; + + // Same comment as add_stun_server() + if (ip.getFamily() == AF_UNSPEC) { + RING_WARN("[ice] TURN server '%s' not used, unresolvable address", info.uri.c_str()); + return; + } + + auto& turn = cfg.turn_tp[cfg.turn_tp_cnt++]; + pj_ice_strans_turn_cfg_default(&turn); + pj_strdup2_with_null(&pool, &turn.server, ip.toString().c_str()); + turn.af = ip.getFamily(); + turn.port = PJ_STUN_PORT; + turn.cfg.max_pkt_size = STUN_MAX_PACKET_SIZE; + + // Authorization (only static plain password supported yet) + if (not info.password.empty()) { + turn.auth_cred.type = PJ_STUN_AUTH_CRED_STATIC; + turn.auth_cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN; + pj_cstr(&turn.auth_cred.data.static_cred.realm, info.realm.c_str()); + pj_cstr(&turn.auth_cred.data.static_cred.username, info.username.c_str()); + pj_cstr(&turn.auth_cred.data.static_cred.data, info.password.c_str()); + } + + RING_DBG("[ice] added turn server '%s', port %d", pj_strbuf(&turn.server), turn.port); +} + +//################################################################################################## + +IceTransport::Packet::Packet(void *pkt, pj_size_t size) + : data(new char[size]), datalen(size) +{ + std::copy_n(reinterpret_cast<char*>(pkt), size, data.get()); +} + +void +IceTransport::cb_on_rx_data(pj_ice_strans* ice_st, + unsigned comp_id, + void *pkt, pj_size_t size, + const pj_sockaddr_t* /*src_addr*/, + unsigned /*src_addr_len*/) +{ + if (auto tr = static_cast<IceTransport*>(pj_ice_strans_get_user_data(ice_st))) + tr->onReceiveData(comp_id, pkt, size); + else + RING_WARN("null IceTransport"); +} + +void +IceTransport::cb_on_ice_complete(pj_ice_strans* ice_st, + pj_ice_strans_op op, + pj_status_t status) +{ + if (auto tr = static_cast<IceTransport*>(pj_ice_strans_get_user_data(ice_st))) + tr->onComplete(ice_st, op, status); + else + RING_WARN("null IceTransport"); +} + + +IceTransport::IceTransport(const char* name, int component_count, bool master, + const IceTransportOptions& options) + : pool_(nullptr, pj_pool_release) + , on_initdone_cb_(options.onInitDone) + , on_negodone_cb_(options.onNegoDone) + , component_count_(component_count) + , compIO_(component_count) + , initiatorSession_(master) + , thread_() +{ + if (options.upnpEnable) + upnp_.reset(new upnp::Controller()); + + auto& iceTransportFactory = Manager::instance().getIceTransportFactory(); + config_ = iceTransportFactory.getIceCfg(); // config copy + + pool_.reset(pj_pool_create(iceTransportFactory.getPoolFactory(), + "IceTransport.pool", 512, 512, NULL)); + if (not pool_) + throw std::runtime_error("pj_pool_create() failed"); + + pj_ice_strans_cb icecb; + pj_bzero(&icecb, sizeof(icecb)); + icecb.on_rx_data = cb_on_rx_data; + icecb.on_ice_complete = cb_on_ice_complete; + + // Add STUN servers + for (auto& server : options.stunServers) + add_stun_server(*pool_, config_, server); + + // Add TURN servers + for (auto& server : options.turnServers) + add_turn_server(*pool_, config_, server); + + static constexpr auto IOQUEUE_MAX_HANDLES = min(PJ_IOQUEUE_MAX_HANDLES, 64); + TRY( pj_timer_heap_create(pool_.get(), 100, &config_.stun_cfg.timer_heap) ); + TRY( pj_ioqueue_create(pool_.get(), IOQUEUE_MAX_HANDLES, &config_.stun_cfg.ioqueue) ); + + pj_ice_strans* icest = nullptr; + pj_status_t status = pj_ice_strans_create(name, &config_, component_count, + this, &icecb, &icest); + + if (status != PJ_SUCCESS || icest == nullptr) { + throw std::runtime_error("pj_ice_strans_create() failed"); + } + + // Must be created after any potential failure + thread_ = std::thread([this]{ + register_thread(); + while (not threadTerminateFlags_) { + handleEvents(500); // limit polling to 500ms + } + }); +} + +IceTransport::~IceTransport() +{ + register_thread(); + + threadTerminateFlags_ = true; + if (thread_.joinable()) + thread_.join(); + + icest_.reset(); // must be done before ioqueue/timer destruction + + if (config_.stun_cfg.ioqueue) + pj_ioqueue_destroy(config_.stun_cfg.ioqueue); + + if (config_.stun_cfg.timer_heap) + pj_timer_heap_destroy(config_.stun_cfg.timer_heap); +} + +void +IceTransport::handleEvents(unsigned max_msec) +{ + // By tests, never seen more than two events per 500ms + static constexpr auto MAX_NET_EVENTS = 2; + + pj_time_val max_timeout = {0, 0}; + pj_time_val timeout = {0, 0}; + unsigned net_event_count = 0; + + max_timeout.msec = max_msec; + + timeout.sec = timeout.msec = 0; + pj_timer_heap_poll(config_.stun_cfg.timer_heap, &timeout); + + // timeout limitation + if (timeout.msec >= 1000) + timeout.msec = 999; + if (PJ_TIME_VAL_GT(timeout, max_timeout)) + timeout = max_timeout; + + do { + auto n_events = pj_ioqueue_poll(config_.stun_cfg.ioqueue, &timeout); + + // timeout + if (not n_events) + return; + + // error + if (n_events < 0) { + const auto err = pj_get_os_error(); + // Kept as debug as some errors are "normal" in regular context + last_errmsg_ = sip_utils::sip_strerror(err); + RING_DBG("IceIOQueue: error %d - %s", err, last_errmsg_.c_str()); + std::this_thread::sleep_for(std::chrono::milliseconds(PJ_TIME_VAL_MSEC(timeout))); + return; + } + + net_event_count += n_events; + timeout.sec = timeout.msec = 0; + } while (net_event_count < MAX_NET_EVENTS); +} + +void +IceTransport::onComplete(pj_ice_strans* ice_st, pj_ice_strans_op op, + pj_status_t status) +{ + const char *opname = + op == PJ_ICE_STRANS_OP_INIT ? "initialization" : + op == PJ_ICE_STRANS_OP_NEGOTIATION ? "negotiation" : "unknown_op"; + + const bool done = status == PJ_SUCCESS; + if (done) { + RING_DBG("ICE %s success", opname); + } + else { + last_errmsg_ = sip_utils::sip_strerror(status); + RING_ERR("ICE %s failed: %s", opname, last_errmsg_.c_str()); + } + + { + std::lock_guard<std::mutex> lk(iceMutex_); + if (!icest_.get()) + icest_.reset(ice_st); + } + + if (done and op == PJ_ICE_STRANS_OP_INIT) { + if (initiatorSession_) + setInitiatorSession(); + else + setSlaveSession(); + selectUPnPIceCandidates(); + } + + if (op == PJ_ICE_STRANS_OP_INIT and on_initdone_cb_) + on_initdone_cb_(*this, done); + else if (op == PJ_ICE_STRANS_OP_NEGOTIATION and on_negodone_cb_) + on_negodone_cb_(*this, done); + + // Unlock waitForXXX APIs + iceCV_.notify_all(); +} + +void +IceTransport::getUFragPwd() +{ + pj_str_t local_ufrag, local_pwd; + pj_ice_strans_get_ufrag_pwd(icest_.get(), &local_ufrag, &local_pwd, NULL, NULL); + local_ufrag_.assign(local_ufrag.ptr, local_ufrag.slen); + local_pwd_.assign(local_pwd.ptr, local_pwd.slen); +} + +std::string +IceTransport::getLastErrMsg() const +{ + return last_errmsg_; +} + +void +IceTransport::getDefaultCanditates() +{ + for (unsigned i=0; i < component_count_; ++i) + pj_ice_strans_get_def_cand(icest_.get(), i+1, &cand_[i]); +} + +bool +IceTransport::createIceSession(pj_ice_sess_role role) +{ + if (pj_ice_strans_init_ice(icest_.get(), role, nullptr, nullptr) != PJ_SUCCESS) { + RING_ERR("pj_ice_strans_init_ice() failed"); + return false; + } + + // Fetch some information on local configuration + getUFragPwd(); + getDefaultCanditates(); + RING_DBG("ICE [local] ufrag=%s, pwd=%s", local_ufrag_.c_str(), local_pwd_.c_str()); + return true; +} + +bool +IceTransport::setInitiatorSession() +{ + RING_DBG("ICE as master"); + initiatorSession_ = true; + if (isInitialized()) { + auto status = pj_ice_strans_change_role(icest_.get(), PJ_ICE_SESS_ROLE_CONTROLLING); + if (status != PJ_SUCCESS) { + last_errmsg_ = sip_utils::sip_strerror(status); + RING_ERR("ICE role change failed: %s", last_errmsg_.c_str()); + return false; + } + return true; + } + return createIceSession(PJ_ICE_SESS_ROLE_CONTROLLING); +} + +bool +IceTransport::setSlaveSession() +{ + RING_DBG("ICE as slave"); + initiatorSession_ = false; + if (isInitialized()) { + auto status = pj_ice_strans_change_role(icest_.get(), PJ_ICE_SESS_ROLE_CONTROLLED); + if (status != PJ_SUCCESS) { + last_errmsg_ = sip_utils::sip_strerror(status); + RING_ERR("ICE role change failed: %s", last_errmsg_.c_str()); + return false; + } + return true; + } + return createIceSession(PJ_ICE_SESS_ROLE_CONTROLLED); +} + +bool +IceTransport::isInitiator() const +{ + if (isInitialized()) + return pj_ice_strans_get_role(icest_.get()) == PJ_ICE_SESS_ROLE_CONTROLLING; + return initiatorSession_; +} + +bool +IceTransport::start(const Attribute& rem_attrs, + const std::vector<IceCandidate>& rem_candidates) +{ + if (not isInitialized()) { + RING_ERR("ICE: not initialized transport"); + return false; + } + + // pj_ice_strans_start_ice crashes if remote candidates array is empty + if (rem_candidates.empty()) { + RING_ERR("ICE start failed: no remote candidates"); + return false; + } + + pj_str_t ufrag, pwd; + RING_DBG("ICE negotiation starting (%zu remote candidates)", rem_candidates.size()); + auto status = pj_ice_strans_start_ice(icest_.get(), + pj_cstr(&ufrag, rem_attrs.ufrag.c_str()), + pj_cstr(&pwd, rem_attrs.pwd.c_str()), + rem_candidates.size(), + rem_candidates.data()); + if (status != PJ_SUCCESS) { + last_errmsg_ = sip_utils::sip_strerror(status); + RING_ERR("ICE start failed: %s", last_errmsg_.c_str()); + return false; + } + return true; +} + +std::string +IceTransport::unpackLine(std::vector<uint8_t>::const_iterator& begin, + std::vector<uint8_t>::const_iterator& end) +{ + if (std::distance(begin, end) <= 0) + return {}; + + // Search for EOL + std::vector<uint8_t>::const_iterator line_end(begin); + while (line_end != end && *line_end != NEW_LINE && *line_end) + ++line_end; + + if (std::distance(begin, line_end) <= 0) + return {}; + + std::string str(begin, line_end); + + // Consume the new line character + if (std::distance(line_end, end) > 0) + ++line_end; + + begin = line_end; + return str; +} + +bool +IceTransport::start(const std::vector<uint8_t>& rem_data) +{ + auto begin = rem_data.cbegin(); + auto end = rem_data.cend(); + auto rem_ufrag = unpackLine(begin, end); + auto rem_pwd = unpackLine(begin, end); + if (rem_pwd.empty() or rem_pwd.empty()) { + RING_ERR("ICE remote attributes parsing error"); + return false; + } + std::vector<IceCandidate> rem_candidates; + try { + while (true) { + IceCandidate candidate; + const auto line = unpackLine(begin, end); + if (line.empty()) + break; + if (getCandidateFromSDP(line, candidate)) + rem_candidates.push_back(candidate); + } + } catch (std::exception& e) { + RING_ERR("ICE remote candidates parsing error"); + return false; + } + return start({rem_ufrag, rem_pwd}, rem_candidates); +} + +bool +IceTransport::stop() +{ + if (isStarted()) { + auto status = pj_ice_strans_stop_ice(icest_.get()); + if (status != PJ_SUCCESS) { + last_errmsg_ = sip_utils::sip_strerror(status); + RING_ERR("ICE stop failed: %s", last_errmsg_.c_str()); + return false; + } + } + return true; +} + +bool +IceTransport::_isInitialized() const +{ + if (auto icest = icest_.get()) { + auto state = pj_ice_strans_get_state(icest); + return state >= PJ_ICE_STRANS_STATE_SESS_READY and state != PJ_ICE_STRANS_STATE_FAILED; + } + return false; +} + +bool +IceTransport::_isStarted() const +{ + if (auto icest = icest_.get()) { + auto state = pj_ice_strans_get_state(icest); + return state >= PJ_ICE_STRANS_STATE_NEGO and state != PJ_ICE_STRANS_STATE_FAILED; + } + return false; +} + +bool +IceTransport::_isRunning() const +{ + if (auto icest = icest_.get()) { + auto state = pj_ice_strans_get_state(icest); + return state >= PJ_ICE_STRANS_STATE_RUNNING and state != PJ_ICE_STRANS_STATE_FAILED; + } + return false; +} + +bool +IceTransport::_isFailed() const +{ + if (auto icest = icest_.get()) + return pj_ice_strans_get_state(icest) == PJ_ICE_STRANS_STATE_FAILED; + return false; +} + +IpAddr +IceTransport::getLocalAddress(unsigned comp_id) const +{ + // Return the local IP of negotiated connection pair + if (isRunning()) { + if (auto sess = pj_ice_strans_get_valid_pair(icest_.get(), comp_id+1)) { + return sess->lcand->addr; + } + RING_WARN("Non-negotiated transport: try to return default local IP"); + } + + // Return the default IP (could be not nominated and valid after negotiation) + if (isInitialized()) + return cand_[comp_id].addr; + + RING_ERR("bad call: non-initialized transport"); + return {}; +} + +IpAddr +IceTransport::getRemoteAddress(unsigned comp_id) const +{ + // Return the remote IP of negotiated connection pair + if (isRunning()) { + if (auto sess = pj_ice_strans_get_valid_pair(icest_.get(), comp_id+1)) + return sess->rcand->addr; + RING_ERR("runtime error: negotiated transport without valid pair"); + } + + RING_ERR("bad call: non-negotiated transport"); + return {}; +} + +const IceTransport::Attribute +IceTransport::getLocalAttributes() const +{ + return {local_ufrag_, local_pwd_}; +} + +std::vector<std::string> +IceTransport::getLocalCandidates(unsigned comp_id) const +{ + std::vector<std::string> res; + pj_ice_sess_cand cand[PJ_ARRAY_SIZE(cand_)]; + unsigned cand_cnt = PJ_ARRAY_SIZE(cand); + + if (pj_ice_strans_enum_cands(icest_.get(), comp_id+1, &cand_cnt, cand) != PJ_SUCCESS) { + RING_ERR("pj_ice_strans_enum_cands() failed"); + return res; + } + + for (unsigned i=0; i<cand_cnt; ++i) { + std::ostringstream val; + char ipaddr[PJ_INET6_ADDRSTRLEN]; + + val << std::string(cand[i].foundation.ptr, cand[i].foundation.slen); + val << " " << (unsigned)cand[i].comp_id << " UDP " << cand[i].prio; + val << " " << pj_sockaddr_print(&cand[i].addr, ipaddr, sizeof(ipaddr), 0); + val << " " << (unsigned)pj_sockaddr_get_port(&cand[i].addr); + val << " typ " << pj_ice_get_cand_type_name(cand[i].type); + + res.push_back(val.str()); + } + + return res; +} + +std::vector<IpAddr> +IceTransport::getLocalCandidatesAddr(unsigned comp_id) const +{ + std::vector<IpAddr> cand_addrs; + pj_ice_sess_cand cand[PJ_ARRAY_SIZE(cand_)]; + unsigned cand_cnt = PJ_ARRAY_SIZE(cand); + + if (pj_ice_strans_enum_cands(icest_.get(), comp_id, &cand_cnt, cand) != PJ_SUCCESS) { + RING_ERR("pj_ice_strans_enum_cands() failed"); + return cand_addrs; + } + + for (unsigned i=0; i<cand_cnt; ++i) + cand_addrs.push_back(cand[i].addr); + + return cand_addrs; +} + +bool +IceTransport::registerPublicIP(unsigned compId, const IpAddr& publicIP) +{ + if (not isInitialized()) + return false; + + // Register only if no NAT traversal methods exists + if (upnp_ or config_.stun.port > 0 or config_.turn.port > 0) + return false; + + // Find the local candidate corresponding to local host, + // then register a rflx candidate using given public address + // and this local address as base. It's port is used for both address + // even if on the public side it have strong probabilities to not exist. + // But as this candidate is made after initialization, it's not used during + // negotiation, only to exchanged candidates between peers. + auto localIP = ip_utils::getLocalAddr(); + auto pubIP = publicIP; + for (const auto& addr : getLocalCandidatesAddr(compId)) { + auto port = addr.getPort(); + localIP.setPort(port); + if (addr != localIP) + continue; + pubIP.setPort(port); + addReflectiveCandidate(compId, addr, pubIP); + return true; + } + return false; +} + +void +IceTransport::addReflectiveCandidate(int comp_id, const IpAddr& base, const IpAddr& addr) +{ + pj_ice_sess_cand cand; + + cand.type = PJ_ICE_CAND_TYPE_SRFLX; + cand.status = PJ_EPENDING; /* not used */ + cand.comp_id = comp_id; + cand.transport_id = 1; /* 1 = STUN */ + cand.local_pref = 65535; /* host */ + /* cand.foundation = ? */ + /* cand.prio = calculated by ice session */ + /* make base and addr the same since we're not going through a server */ + pj_sockaddr_cp(&cand.base_addr, base.pjPtr()); + pj_sockaddr_cp(&cand.addr, addr.pjPtr()); + pj_sockaddr_cp(&cand.rel_addr, &cand.base_addr); + pj_ice_calc_foundation(pool_.get(), &cand.foundation, cand.type, &cand.base_addr); + + auto ret = pj_ice_sess_add_cand(pj_ice_strans_get_ice_sess(icest_.get()), + cand.comp_id, + cand.transport_id, + cand.type, + cand.local_pref, + &cand.foundation, + &cand.addr, + &cand.base_addr, + &cand.rel_addr, + pj_sockaddr_get_len(&cand.addr), + NULL); + + if (ret != PJ_SUCCESS) { + last_errmsg_ = sip_utils::sip_strerror(ret); + RING_ERR("pj_ice_sess_add_cand failed with error %d: %s", ret, + last_errmsg_.c_str()); + RING_ERR("failed to add candidate for comp_id=%d : %s : %s", comp_id, + base.toString().c_str(), addr.toString().c_str()); + } else { + RING_DBG("succeed to add candidate for comp_id=%d : %s : %s", comp_id, + base.toString().c_str(), addr.toString().c_str()); + } +} + +void +IceTransport::selectUPnPIceCandidates() +{ + /* use upnp to open ports and add the proper candidates */ + if (upnp_) { + /* for every component, get the candidate(s) + * create a port mapping either with that port, or with an available port + * add candidate with that port and public IP + */ + if (auto publicIP = upnp_->getExternalIP()) { + /* comp_id start at 1 */ + for (unsigned comp_id = 1; comp_id <= component_count_; ++comp_id) { + RING_DBG("UPnP: Opening port(s) for ICE comp %d and adding candidate with public IP", + comp_id); + auto candidates = getLocalCandidatesAddr(comp_id); + for (IpAddr addr : candidates) { + auto localIP = upnp_->getLocalIP(); + localIP.setPort(addr.getPort()); + if (addr != localIP) + continue; + uint16_t port = addr.getPort(); + uint16_t port_used; + if (upnp_->addAnyMapping(port, upnp::PortType::UDP, true, &port_used)) { + publicIP.setPort(port_used); + addReflectiveCandidate(comp_id, addr, publicIP); + } else + RING_WARN("UPnP: Could not create a port mapping for the ICE candide"); + } + } + } else { + RING_WARN("UPnP: Could not determine public IP for ICE candidates"); + } + } +} + +std::vector<uint8_t> +IceTransport::getLocalAttributesAndCandidates() const +{ + if (not isInitialized()) + return {}; + + std::stringstream ss; + ss << local_ufrag_ << NEW_LINE; + ss << local_pwd_ << NEW_LINE; + for (unsigned i=0; i<component_count_; i++) { + const auto& candidates = getLocalCandidates(i); + for (const auto& c : candidates) + ss << c << NEW_LINE; + } + auto str(ss.str()); + return std::vector<uint8_t>(str.begin(), str.end()); +} + +void +IceTransport::onReceiveData(unsigned comp_id, void *pkt, pj_size_t size) +{ + if (!comp_id or comp_id > component_count_) { + RING_ERR("rx: invalid comp_id (%u)", comp_id); + return; + } + if (!size) + return; + auto& io = compIO_[comp_id-1]; + std::lock_guard<std::mutex> lk(io.mutex); + if (io.cb) { + io.cb((uint8_t*)pkt, size); + } else { + io.queue.emplace_back(pkt, size); + io.cv.notify_one(); + } +} + +bool +IceTransport::getCandidateFromSDP(const std::string& line, IceCandidate& cand) +{ + char foundation[33], transport[13], ipaddr[81], type[33]; + pj_str_t tmpaddr; + int af, comp_id, prio, port; + int cnt = sscanf(line.c_str(), "%32s %d %12s %d %80s %d typ %32s", + foundation, + &comp_id, + transport, + &prio, + ipaddr, + &port, + type); + + if (cnt != 7) { + RING_WARN("ICE: invalid remote candidate line"); + return false; + } + + pj_bzero(&cand, sizeof(IceCandidate)); + + if (strcmp(type, "host")==0) + cand.type = PJ_ICE_CAND_TYPE_HOST; + else if (strcmp(type, "srflx")==0) + cand.type = PJ_ICE_CAND_TYPE_SRFLX; + else if (strcmp(type, "relay")==0) + cand.type = PJ_ICE_CAND_TYPE_RELAYED; + else { + RING_WARN("ICE: invalid remote candidate type '%s'", type); + return false; + } + + cand.comp_id = (pj_uint8_t)comp_id; + cand.prio = prio; + + if (strchr(ipaddr, ':')) + af = pj_AF_INET6(); + else + af = pj_AF_INET(); + + tmpaddr = pj_str(ipaddr); + pj_sockaddr_init(af, &cand.addr, NULL, 0); + auto status = pj_sockaddr_set_str_addr(af, &cand.addr, &tmpaddr); + if (status != PJ_SUCCESS) { + RING_ERR("ICE: invalid remote IP address '%s'", ipaddr); + return false; + } + + pj_sockaddr_set_port(&cand.addr, (pj_uint16_t)port); + pj_strdup2(pool_.get(), &cand.foundation, foundation); + + return true; +} + +ssize_t +IceTransport::recv(int comp_id, unsigned char* buf, size_t len) +{ + register_thread(); + auto& io = compIO_[comp_id]; + std::lock_guard<std::mutex> lk(io.mutex); + + if (io.queue.empty()) + return 0; + + auto& packet = io.queue.front(); + const auto count = std::min(len, packet.datalen); + std::copy_n(packet.data.get(), count, buf); + io.queue.pop_front(); + + return count; +} + +void +IceTransport::setOnRecv(unsigned comp_id, IceRecvCb cb) +{ + auto& io = compIO_[comp_id]; + std::lock_guard<std::mutex> lk(io.mutex); + io.cb = cb; + + if (cb) { + // Flush existing queue using the callback + for (const auto& packet : io.queue) + io.cb((uint8_t*)packet.data.get(), packet.datalen); + io.queue.clear(); + } +} + +ssize_t +IceTransport::send(int comp_id, const unsigned char* buf, size_t len) +{ + register_thread(); + auto remote = getRemoteAddress(comp_id); + if (!remote) { + RING_ERR("Can't find remote address for component %d", comp_id); + errno = EINVAL; + return -1; + } + auto status = pj_ice_strans_sendto(icest_.get(), comp_id+1, buf, len, remote.pjPtr(), remote.getLength()); + if (status != PJ_SUCCESS) { + if (status == PJ_EBUSY) { + errno = EAGAIN; + } else { + last_errmsg_ = sip_utils::sip_strerror(status); + RING_ERR("ice send failed: %s", last_errmsg_.c_str()); + errno = EIO; + } + return -1; + } + + return len; +} + +ssize_t +IceTransport::getNextPacketSize(int comp_id) +{ + auto& io = compIO_[comp_id]; + std::lock_guard<std::mutex> lk(io.mutex); + if (io.queue.empty()) { + return 0; + } + return io.queue.front().datalen; +} + +int +IceTransport::waitForInitialization(unsigned timeout) +{ + std::unique_lock<std::mutex> lk(iceMutex_); + if (!iceCV_.wait_for(lk, std::chrono::seconds(timeout), + [this]{ return _isInitialized() or _isFailed(); })) { + RING_WARN("waitForInitialization: timeout"); + return -1; + } + return not _isFailed(); +} + +int +IceTransport::waitForNegotiation(unsigned timeout) +{ + std::unique_lock<std::mutex> lk(iceMutex_); + if (!iceCV_.wait_for(lk, std::chrono::seconds(timeout), + [this]{ return _isRunning() or _isFailed(); })) { + RING_WARN("waitForIceNegotiation: timeout"); + return -1; + } + return not _isFailed(); +} + +ssize_t +IceTransport::waitForData(int comp_id, unsigned int timeout) +{ + auto& io = compIO_[comp_id]; + std::unique_lock<std::mutex> lk(io.mutex); + if (!io.cv.wait_for(lk, std::chrono::milliseconds(timeout), + [this, &io]{ return !io.queue.empty() or !isRunning(); })) { + return 0; + } + if (!isRunning()) + return -1; // acknowledged as an error + return io.queue.front().datalen; +} + +//################################################################################################## + +IceTransportFactory::IceTransportFactory() + : cp_() + , pool_(nullptr, pj_pool_release) + , ice_cfg_() +{ + pj_caching_pool_init(&cp_, NULL, 0); + pool_.reset(pj_pool_create(&cp_.factory, "IceTransportFactory.pool", + 512, 512, NULL)); + if (not pool_) + throw std::runtime_error("pj_pool_create() failed"); + + pj_ice_strans_cfg_default(&ice_cfg_); + ice_cfg_.stun_cfg.pf = &cp_.factory; + + // v2.4.5 of PJNATH has a default of 100ms but RFC 5389 since version 14 requires + // a minimum of 500ms on fixed-line links. Our usual case is wireless links. + // This solves too long ICE exchange by DHT. + // Using 500ms with default PJ_STUN_MAX_TRANSMIT_COUNT (7) gives around 33s before timeout. + ice_cfg_.stun_cfg.rto_msec = 500; + + // Add local hosts (IPv4, IPv6) as stun candidates + add_stun_server(ice_cfg_, pj_AF_INET6()); + add_stun_server(ice_cfg_, pj_AF_INET()); + + ice_cfg_.opt.aggressive = PJ_FALSE; +} + +IceTransportFactory::~IceTransportFactory() +{ + pool_.reset(); + pj_caching_pool_destroy(&cp_); +} + +std::shared_ptr<IceTransport> +IceTransportFactory::createTransport(const char* name, int component_count, + bool master, + const IceTransportOptions& options) +{ + try { + return std::make_shared<IceTransport>(name, component_count, master, options); + } catch(const std::exception& e) { + RING_ERR("%s",e.what()); + return nullptr; + } +} + +void +IceSocket::close() +{ + ice_transport_.reset(); +} + +ssize_t +IceSocket::recv(unsigned char* buf, size_t len) +{ + if (!ice_transport_.get()) + return -1; + return ice_transport_->recv(compId_, buf, len); +} + +ssize_t +IceSocket::send(const unsigned char* buf, size_t len) +{ + if (!ice_transport_.get()) + return -1; + return ice_transport_->send(compId_, buf, len); +} + +ssize_t +IceSocket::getNextPacketSize() const +{ + if (!ice_transport_.get()) + return -1; + return ice_transport_->getNextPacketSize(compId_); +} + +ssize_t +IceSocket::waitForData(unsigned int timeout) +{ + if (!ice_transport_.get()) + return -1; + + return ice_transport_->waitForData(compId_, timeout); +} + +void +IceSocket::setOnRecv(IceRecvCb cb) +{ + if (!ice_transport_.get()) + return; + return ice_transport_->setOnRecv(compId_, cb); +} + +} // namespace ring diff --git a/src/ice_transport.h b/src/ice_transport.h index 843cb7b419..1b635c2c42 100644 --- a/src/ice_transport.h +++ b/src/ice_transport.h @@ -52,30 +52,30 @@ using IceRecvCb = std::function<ssize_t(unsigned char* buf, size_t len)>; #endif using IceCandidate = pj_ice_sess_cand; -struct StunServerInfo { - StunServerInfo& setUri(const std::string& args) { uri = args; return *this; } - - std::string uri; // server URI, mandatory -}; - -struct TurnServerInfo { - TurnServerInfo& setUri(const std::string& args) { uri = args; return *this; } - TurnServerInfo& setUsername(const std::string& args) { username = args; return *this; } - TurnServerInfo& setPassword(const std::string& args) { password = args; return *this; } - TurnServerInfo& setRealm(const std::string& args) { realm = args; return *this; } - - std::string uri; // server URI, mandatory - std::string username; // credentials username (optional, empty if not used) - std::string password; // credentials password (optional, empty if not used) - std::string realm; // credentials realm (optional, empty if not used) -}; - -struct IceTransportOptions { - bool upnpEnable {false}; - IceTransportCompleteCb onInitDone {}; - IceTransportCompleteCb onNegoDone {}; - std::vector<StunServerInfo> stunServers; - std::vector<TurnServerInfo> turnServers; +struct StunServerInfo { + StunServerInfo& setUri(const std::string& args) { uri = args; return *this; } + + std::string uri; // server URI, mandatory +}; + +struct TurnServerInfo { + TurnServerInfo& setUri(const std::string& args) { uri = args; return *this; } + TurnServerInfo& setUsername(const std::string& args) { username = args; return *this; } + TurnServerInfo& setPassword(const std::string& args) { password = args; return *this; } + TurnServerInfo& setRealm(const std::string& args) { realm = args; return *this; } + + std::string uri; // server URI, mandatory + std::string username; // credentials username (optional, empty if not used) + std::string password; // credentials password (optional, empty if not used) + std::string realm; // credentials realm (optional, empty if not used) +}; + +struct IceTransportOptions { + bool upnpEnable {false}; + IceTransportCompleteCb onInitDone {}; + IceTransportCompleteCb onNegoDone {}; + std::vector<StunServerInfo> stunServers; + std::vector<TurnServerInfo> turnServers; }; class IceTransport { diff --git a/src/manager.cpp b/src/manager.cpp index f5dd8dc0e3..eca8d723f7 100644 --- a/src/manager.cpp +++ b/src/manager.cpp @@ -125,6 +125,34 @@ restore_backup(const std::string &path) copy_over(backup_path, path); } +/** + * Set OpenDHT's log level based on the DHTLOGLEVEL environment variable. + * DHTLOGLEVEL = 0 minimum logging (=disable) + * DHTLOGLEVEL = 1 (=ERROR only) + * DHTLOGLEVEL = 2 (+=WARN) + * DHTLOGLEVEL = 3 maximum logging (+=DEBUG) + */ + +/** Environment variable used to set OpenDHT's logging level */ +static constexpr const char* DHTLOGLEVEL = "DHTLOGLEVEL"; + +static void +setDhtLogLevel() +{ + char* envvar = getenv(DHTLOGLEVEL); + int level = 0; + + if (envvar != nullptr) { + if (not (std::istringstream(envvar) >> level)) + level = 0; + + // From 0 (min) to 3 (max) + level = std::max(0, std::min(level, 3)); + RING_DBG("DHTLOGLEVEL=%u", level); + } + Manager::instance().dhtLogLevel = level; +} + /** * Set pjsip's log level based on the SIPLOGLEVEL environment variable. * SIPLOGLEVEL = 0 minimum logging @@ -288,6 +316,8 @@ Manager::init(const std::string &config_file) setGnuTlsLogLevel(); RING_DBG("GNU TLS version %s initialized", gnutls_check_version(nullptr)); + setDhtLogLevel(); + ice_tf_.reset(new IceTransportFactory()); path_ = config_file.empty() ? retrieveConfigPath() : config_file; diff --git a/src/manager.h b/src/manager.h index 54c17a736c..2734334847 100644 --- a/src/manager.h +++ b/src/manager.h @@ -1015,6 +1015,8 @@ class Manager { VideoManager& getVideoManager() const { return *videoManager_; } #endif // RING_VIDEO + std::atomic<unsigned> dhtLogLevel {0}; // default = disable + private: NON_COPYABLE(Manager); diff --git a/src/media/video/v4l2/vaapi.cpp b/src/media/video/v4l2/vaapi.cpp new file mode 100644 index 0000000000..003d1d5cab --- /dev/null +++ b/src/media/video/v4l2/vaapi.cpp @@ -0,0 +1,249 @@ +/* + * Copyright (C) 2016 Savoir-faire Linux Inc. + * + * Author: Philippe Gorley <philippe.gorley@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "libav_deps.h" // MUST BE INCLUDED FIRST + +#include "config.h" + +#if defined(RING_VIDEO) && defined(RING_ACCEL) + +#include "video/v4l2/vaapi.h" +#include "video/accel.h" + +#include "fileutils.h" + +#include <sstream> +#include <stdexcept> +#include <map> +#include <algorithm> +#include <vector> + +#include "logger.h" + +namespace ring { namespace video { + +static auto avBufferRefDeleter = [](AVBufferRef* buf){ av_buffer_unref(&buf); }; + +VaapiAccel::VaapiAccel(AccelInfo info) : HardwareAccel(info) + , deviceBufferRef_(nullptr, avBufferRefDeleter) + , framesBufferRef_(nullptr, avBufferRefDeleter) +{ +} + +VaapiAccel::~VaapiAccel() +{ +} + +int +VaapiAccel::allocateBuffer(AVCodecContext* codecCtx, AVFrame* frame, int flags) +{ + return av_hwframe_get_buffer(framesBufferRef_.get(), frame, 0); +} + +bool +VaapiAccel::extractData(AVCodecContext* codecCtx, VideoFrame& container) +{ + try { + auto input = container.pointer(); + + if (input->format != format_) { + std::stringstream buf; + buf << "Frame format mismatch: expected " << av_get_pix_fmt_name(format_); + buf << ", got " << av_get_pix_fmt_name((AVPixelFormat)input->format); + throw std::runtime_error(buf.str()); + } + + auto outContainer = new VideoFrame(); + auto output = outContainer->pointer(); + output->format = AV_PIX_FMT_YUV420P; + + if (av_hwframe_transfer_data(output, input, 0) < 0) { + throw std::runtime_error("Unable to extract data from VAAPI frame"); + } + + if (av_frame_copy_props(output, input) < 0 ) { + av_frame_unref(output); + } + + av_frame_unref(input); + av_frame_move_ref(input, output); + } catch (const std::runtime_error& e) { + fail(codecCtx, false); + RING_ERR("%s", e.what()); + return false; + } + + succeed(); + return true; +} + +bool +VaapiAccel::init(AVCodecContext* codecCtx) +{ +#ifdef HAVE_VAAPI_ACCEL_DRM + // try all possible devices, use first one that works + const std::string path = "/dev/dri/"; + for (auto& entry : ring::fileutils::readDirectory(path)) { + // a drm device is either a card or a render node, check both + const std::string prefixCard = "card"; + if (!entry.compare(0, prefixCard.size(), prefixCard.c_str())) + if (open(codecCtx, path + entry)) + return true; + + const std::string prefixNode = "renderD"; + if (!entry.compare(0, prefixNode.size(), prefixNode.c_str())) + if (open(codecCtx, path + entry)) + return true; + } + return false; +#elif HAVE_VAAPI_ACCEL_X11 + return open(codecCtx, ":0"); // this is the default x11 device +#endif +} + +bool +VaapiAccel::open(AVCodecContext* codecCtx, std::string deviceName) +{ + vaProfile_ = VAProfileNone; + vaEntryPoint_ = VAEntrypointVLD; + using ProfileMap = std::map<int, VAProfile>; + ProfileMap h264 = { + { FF_PROFILE_H264_CONSTRAINED_BASELINE, VAProfileH264ConstrainedBaseline }, + { FF_PROFILE_H264_BASELINE, VAProfileH264Baseline }, + { FF_PROFILE_H264_MAIN, VAProfileH264Main }, + { FF_PROFILE_H264_HIGH, VAProfileH264High } + }; + ProfileMap mpeg4 = { + { FF_PROFILE_MPEG4_SIMPLE, VAProfileMPEG4Simple }, + { FF_PROFILE_MPEG4_ADVANCED_SIMPLE, VAProfileMPEG4AdvancedSimple }, + { FF_PROFILE_MPEG4_MAIN, VAProfileMPEG4Main } + }; + ProfileMap h263 = { + { FF_PROFILE_UNKNOWN, VAProfileH263Baseline } + }; + + std::map<int, ProfileMap> profileMap = { + { AV_CODEC_ID_H264, h264 }, + { AV_CODEC_ID_MPEG4, mpeg4 }, + { AV_CODEC_ID_H263, h263 }, + { AV_CODEC_ID_H263P, h263 } + }; + + VAStatus status; + AVBufferRef* hardwareDeviceCtx; + if (av_hwdevice_ctx_create(&hardwareDeviceCtx, AV_HWDEVICE_TYPE_VAAPI, deviceName.c_str(), nullptr, 0) < 0) { + RING_ERR("Failed to create VAAPI device using %s", deviceName.c_str()); + av_buffer_unref(&hardwareDeviceCtx); + return false; + } + + deviceBufferRef_.reset(av_buffer_ref(hardwareDeviceCtx)); + + auto device = reinterpret_cast<AVHWDeviceContext*>(deviceBufferRef_->data); + vaConfig_ = VA_INVALID_ID; + vaContext_ = VA_INVALID_ID; + auto hardwareContext = static_cast<AVVAAPIDeviceContext*>(device->hwctx); + + int numProfiles = vaMaxNumProfiles(hardwareContext->display); + auto profiles = std::vector<VAProfile>(numProfiles); + status = vaQueryConfigProfiles(hardwareContext->display, profiles.data(), &numProfiles); + if (status != VA_STATUS_SUCCESS) { + RING_ERR("Failed to query profiles: %s", vaErrorStr(status)); + return false; + } + + VAProfile codecProfile; + auto itOuter = profileMap.find(codecCtx->codec_id); + if (itOuter != profileMap.end()) { + auto innerMap = itOuter->second; + auto itInner = innerMap.find(codecCtx->profile); + if (itInner != innerMap.end()) { + codecProfile = itInner->second; + } + } + + auto iter = std::find_if(std::begin(profiles), + std::end(profiles), + [codecProfile](const VAProfile& p){ return p == codecProfile; }); + + if (iter == std::end(profiles)) { + RING_ERR("VAAPI does not support selected codec"); + return false; + } + + vaProfile_ = *iter; + + status = vaCreateConfig(hardwareContext->display, vaProfile_, vaEntryPoint_, 0, 0, &vaConfig_); + if (status != VA_STATUS_SUCCESS) { + RING_ERR("Failed to create VAAPI configuration: %s", vaErrorStr(status)); + return false; + } + + auto hardwareConfig = static_cast<AVVAAPIHWConfig*>(av_hwdevice_hwconfig_alloc(deviceBufferRef_.get())); + hardwareConfig->config_id = vaConfig_; + + auto constraints = av_hwdevice_get_hwframe_constraints(deviceBufferRef_.get(), hardwareConfig); + if (width_ < constraints->min_width + || width_ > constraints->max_width + || height_ < constraints->min_height + || height_ > constraints->max_height) { + av_hwframe_constraints_free(&constraints); + av_freep(&hardwareConfig); + RING_ERR("Hardware does not support image size with VAAPI: %dx%d", width_, height_); + return false; + } + + int numSurfaces = 16; // based on codec instead? + if (codecCtx->active_thread_type & FF_THREAD_FRAME) + numSurfaces += codecCtx->thread_count; // need extra surface per thread + + framesBufferRef_.reset(av_hwframe_ctx_alloc(deviceBufferRef_.get())); + auto frames = reinterpret_cast<AVHWFramesContext*>(framesBufferRef_->data); + frames->format = AV_PIX_FMT_VAAPI; + frames->sw_format = AV_PIX_FMT_YUV420P; + frames->width = width_; + frames->height = height_; + frames->initial_pool_size = numSurfaces; + + if (av_hwframe_ctx_init(framesBufferRef_.get()) < 0) { + RING_ERR("Failed to initialize VAAPI frame context"); + return false; + } + + auto framesContext = static_cast<AVVAAPIFramesContext*>(frames->hwctx); + status = vaCreateContext(hardwareContext->display, vaConfig_, width_, height_, + VA_PROGRESSIVE, framesContext->surface_ids, framesContext->nb_surfaces, &vaContext_); + if (status != VA_STATUS_SUCCESS) { + RING_ERR("Failed to create VAAPI context: %s", vaErrorStr(status)); + return false; + } + + RING_DBG("VAAPI decoder initialized via device: %s", deviceName.c_str()); + + ffmpegAccelCtx_.display = hardwareContext->display; + ffmpegAccelCtx_.config_id = vaConfig_; + ffmpegAccelCtx_.context_id = vaContext_; + codecCtx->hwaccel_context = (void*)&ffmpegAccelCtx_; + return true; +} + +}} + +#endif // defined(RING_VIDEO) && defined(RING_ACCEL) diff --git a/src/media/video/v4l2/vaapi.h b/src/media/video/v4l2/vaapi.h new file mode 100644 index 0000000000..249422f1b4 --- /dev/null +++ b/src/media/video/v4l2/vaapi.h @@ -0,0 +1,85 @@ +/* + * Copyright (C) 2016 Savoir-faire Linux Inc. + * + * Author: Philippe Gorley <philippe.gorley@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#pragma once + +#include "libav_deps.h" // MUST BE INCLUDED FIRST + +#include "config.h" + +#if defined(RING_VIDEO) && defined(RING_ACCEL) + +extern "C" { +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <unistd.h> + +#include <va/va.h> +#ifdef HAVE_VAAPI_ACCEL_DRM +# include <va/va_drm.h> +#endif +#ifdef HAVE_VAAPI_ACCEL_X11 +# include <va/va_x11.h> +#endif + +#include <libavutil/avconfig.h> +#include <libavutil/buffer.h> +#include <libavutil/frame.h> +#include <libavutil/hwcontext.h> +#include <libavutil/hwcontext_vaapi.h> + +#include <libavcodec/vaapi.h> +} + +#include "video/accel.h" + +#include <memory> +#include <functional> + +namespace ring { namespace video { + +class VaapiAccel : public HardwareAccel { + public: + VaapiAccel(AccelInfo info); + ~VaapiAccel(); + + bool init(AVCodecContext* codecCtx) override; + int allocateBuffer(AVCodecContext* codecCtx, AVFrame* frame, int flags) override; + bool extractData(AVCodecContext* codecCtx, VideoFrame& container) override; + + private: + using AVBufferRefPtr = std::unique_ptr<AVBufferRef, std::function<void(AVBufferRef*)>>; + AVBufferRefPtr deviceBufferRef_; + AVBufferRefPtr framesBufferRef_; + + VAProfile vaProfile_; + VAEntrypoint vaEntryPoint_; + VAConfigID vaConfig_; + VAContextID vaContext_; + + struct vaapi_context ffmpegAccelCtx_; + + bool open(AVCodecContext* codecCtx, std::string deviceName); +}; + +}} // namespace ring::video + +#endif // defined(RING_VIDEO) && defined(RING_ACCEL) diff --git a/src/media/video/video_input.cpp b/src/media/video/video_input.cpp index c33bb016f9..1ff1b091a1 100644 --- a/src/media/video/video_input.cpp +++ b/src/media/video/video_input.cpp @@ -111,6 +111,15 @@ void VideoInput::processAndroid() std::unique_lock<std::mutex> lck(mutex_); frame_cv_.wait(lck, [this] { return waitForBufferFull(); }); + std::weak_ptr<VideoInput> wthis; + // shared_from_this throws in destructor + // assumes C++17 + try { + wthis = shared_from_this(); + } catch (...) { + return; + } + for (auto& buffer : buffers_) { if (buffer.status == BUFFER_FULL && buffer.index == publish_index_) { auto& frame = getNewFrame(); @@ -118,7 +127,12 @@ void VideoInput::processAndroid() buffer.status = BUFFER_PUBLISHED; frame.setFromMemory((uint8_t*)buffer.data, format, decOpts_.width, decOpts_.height, - std::bind(&VideoInput::releaseBufferCb, this, std::placeholders::_1)); + [wthis](uint8_t* ptr) { + if (auto sthis = wthis.lock()) + sthis->releaseBufferCb(ptr); + else + std::free(ptr); + }); publish_index_++; lck.unlock(); publishFrame(); diff --git a/src/ringdht/Makefile.am b/src/ringdht/Makefile.am index 4654df9a73..ecbc496385 100644 --- a/src/ringdht/Makefile.am +++ b/src/ringdht/Makefile.am @@ -11,9 +11,7 @@ libringacc_la_CXXFLAGS = @CXXFLAGS@ @JSONCPP_CFLAGS@ libringacc_la_LIBADD = $(DHT_LIBS) \ $(BOOST_SYSTEM_LIB) \ - $(BOOST_FILESYSTEM_LIB) \ $(BOOST_RANDOM_LIB) \ - $(BOOST_THREAD_LIB) \ ./eth/libdevcore/libdevcore.la \ ./eth/libdevcrypto/libdevcrypto.la @@ -24,3 +22,9 @@ libringacc_la_SOURCES = \ sip_transport_ice.h \ sips_transport_ice.cpp \ sips_transport_ice.h + +if RINGNS +libringacc_la_SOURCES += \ + namedirectory.cpp \ + namedirectory.h +endif diff --git a/src/ringdht/namedirectory.cpp b/src/ringdht/namedirectory.cpp new file mode 100644 index 0000000000..0f6994948d --- /dev/null +++ b/src/ringdht/namedirectory.cpp @@ -0,0 +1,261 @@ +/* + * Copyright (C) 2016 Savoir-faire Linux Inc. + * Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +#include "namedirectory.h" + +#include "logger.h" +#include "string_utils.h" +#include "thread_pool.h" + +#include <json/json.h> +#include <restbed> + +/* for visual studio */ +#include <ciso646> +#include <sstream> +#include <regex> + +namespace ring { + +constexpr const char* const QUERY_NAME {"/name/"}; +constexpr const char* const QUERY_ADDR {"/addr/"}; +const std::regex NAME_VALIDATOR {"^[a-z0-9-_]{3,32}$"}; +const std::regex URI_VALIDATOR {"^(:?[a-zA-Z]+://)?([a-zA-Z0-9\\-._~%!$&'()*+,;=:\\[\\]]+)"}; + +constexpr size_t MAX_RESPONSE_SIZE {1024 * 1024}; + +std::string hostFromUri(const std::string& uri) +{ + std::smatch pieces_match; + if (std::regex_search(uri, pieces_match, URI_VALIDATOR)) + if (pieces_match.size() == 3) + return pieces_match[2].str(); + return uri; +} + +NameDirectory::NameDirectory(const std::string& s) : serverUri_(s), serverHost_(hostFromUri(s)) +{} + +NameDirectory& NameDirectory::instance(const std::string& server) +{ + const std::string& s = server.empty() ? DEFAULT_SERVER_URI : server; + static std::map<std::string, NameDirectory> instances {}; + auto it = instances.emplace(s, NameDirectory{s}); + return it.first->second; +} + +void NameDirectory::lookupAddress(const std::string& addr, LookupCallback cb) +{ + auto cacheRes = nameCache_.find(addr); + if (cacheRes != nameCache_.end()) { + cb(cacheRes->second, Response::found); + return; + } + + restbed::Uri uri(serverUri_ + QUERY_ADDR + addr); + auto req = std::make_shared<restbed::Request>(uri); + req->set_header("Accept", "*/*"); + req->set_header("Host", serverHost_); + + RING_DBG("Address lookup for %s: %s", addr.c_str(), uri.to_string().c_str()); + + auto ret = restbed::Http::async(req, [this,cb,addr](const std::shared_ptr<restbed::Request>, + const std::shared_ptr<restbed::Response> reply) { + if (reply->get_status_code() == 200) { + size_t length = 0; + length = reply->get_header("Content-Length", length); + if (length > MAX_RESPONSE_SIZE) { + cb("", Response::error); + return; + } + restbed::Http::fetch(length, reply); + std::string body; + reply->get_body(body); + + Json::Value json; + Json::Reader reader; + if (!reader.parse(body, json)) { + RING_ERR("Address lookup for %s: can't parse server response: %s", addr.c_str(), body.c_str()); + cb("", Response::error); + return; + } + auto name = json["name"].asString(); + if (not name.empty()) { + RING_DBG("Found name for %s: %s", addr.c_str(), name.c_str()); + addrCache_.emplace(name, addr); + nameCache_.emplace(addr, name); + cb(name, Response::found); + } else { + cb("", Response::notFound); + } + } else { + cb("", Response::error); + } + }).share(); + + // avoid blocking on future destruction + ThreadPool::instance().run([ret](){ ret.get(); }); +} + +static const std::string HEX_PREFIX {"0x"}; + +void NameDirectory::lookupName(const std::string& name, LookupCallback cb) +{ + if (not validateName(name)) { + cb(name, Response::invalidName); + return; + } + + auto cacheRes = addrCache_.find(name); + if (cacheRes != addrCache_.end()) { + cb(cacheRes->second, Response::found); + return; + } + + restbed::Uri uri(serverUri_ + QUERY_NAME + name); + auto request = std::make_shared<restbed::Request>(std::move(uri)); + request->set_header("Accept", "*/*"); + request->set_header("Host", serverHost_); + + RING_DBG("Name lookup for %s: %s", name.c_str(), uri.to_string().c_str()); + + auto ret = restbed::Http::async(request, [this,cb,name](const std::shared_ptr<restbed::Request>, + const std::shared_ptr<restbed::Response> reply) { + auto code = reply->get_status_code(); + if (code != 200) + RING_DBG("Name lookup for %s: got reply code %d", name.c_str(), code); + if (code >= 200 && code < 300) { + size_t length = 0; + length = reply->get_header("Content-Length", length); + if (length > MAX_RESPONSE_SIZE) { + cb("", Response::error); + return; + } + restbed::Http::fetch(length, reply); + std::string body; + reply->get_body(body); + + Json::Value json; + Json::Reader reader; + if (!reader.parse(body, json)) { + RING_ERR("Name lookup for %s: can't parse server response: %s", name.c_str(), body.c_str()); + cb("", Response::error); + return; + } + auto addr = json["addr"].asString(); + if (!addr.compare(0, HEX_PREFIX.size(), HEX_PREFIX)) + addr = addr.substr(HEX_PREFIX.size()); + if (not addr.empty()) { + RING_DBG("Found address for %s: %s", name.c_str(), addr.c_str()); + addrCache_.emplace(name, addr); + nameCache_.emplace(addr, name); + cb(addr, Response::found); + } else { + cb("", Response::notFound); + } + } else if (code >= 400 && code < 500) { + cb("", Response::notFound); + } else { + cb("", Response::error); + } + }).share(); + + // avoid blocking on future destruction + ThreadPool::instance().run([ret](){ ret.get(); }); +} + +bool NameDirectory::validateName(const std::string& name) const +{ + return std::regex_match(name, NAME_VALIDATOR); +} + +void NameDirectory::registerName(const std::string& addr, const std::string& name, const std::string& owner, RegistrationCallback cb) +{ + if (not validateName(name)) { + cb(RegistrationResponse::invalidName); + return; + } + + auto cacheRes = addrCache_.find(name); + if (cacheRes != addrCache_.end()) { + if (cacheRes->second == addr) + cb(RegistrationResponse::success); + else + cb(RegistrationResponse::alreadyTaken); + return; + } + + auto request = std::make_shared<restbed::Request>(restbed::Uri(serverUri_ + QUERY_NAME + name)); + request->set_header("Accept", "*/*"); + request->set_header("Host", serverHost_); + request->set_header("Content-Type", "application/json"); + request->set_method("POST"); + std::string body; + { + std::stringstream ss; + ss << "{\"addr\":\"" << addr << "\",\"owner\":\"" << owner << "\"}"; + body = ss.str(); + } + request->set_body(body); + request->set_header("Content-Length", ring::to_string(body.size())); + + auto params = std::make_shared<restbed::Settings>(); + params->set_connection_timeout(std::chrono::seconds(60)); + + RING_WARN("registerName: sending request %s %s", addr.c_str(), name.c_str()); + auto ret = restbed::Http::async(request, + [this,cb,addr,name](const std::shared_ptr<restbed::Request>, + const std::shared_ptr<restbed::Response> reply) + { + auto code = reply->get_status_code(); + RING_DBG("Got reply for registration of %s -> %s: code %d", name.c_str(), addr.c_str(), code); + if (code >= 200 && code < 300) { + size_t length = 0; + length = reply->get_header("Content-Length", length); + if (length > MAX_RESPONSE_SIZE) { + cb(RegistrationResponse::error); + return; + } + restbed::Http::fetch(length, reply); + std::string body; + reply->get_body(body); + + Json::Value json; + Json::Reader reader; + if (!reader.parse(body, json)) { + cb(RegistrationResponse::error); + return; + } + auto success = json["success"].asBool(); + RING_DBG("Got reply for registration of %s -> %s: %s", name.c_str(), addr.c_str(), success ? "success" : "failure"); + if (success) { + addrCache_.emplace(name, addr); + nameCache_.emplace(addr, name); + } + cb(success ? RegistrationResponse::success : RegistrationResponse::error); + } else if (code >= 400 && code < 500) { + cb(RegistrationResponse::alreadyTaken); + } else { + cb(RegistrationResponse::error); + } + }, params).share(); + + // avoid blocking on future destruction + ThreadPool::instance().run([ret](){ ret.get(); }); +} + +} diff --git a/src/ringdht/namedirectory.h b/src/ringdht/namedirectory.h new file mode 100644 index 0000000000..e8acc6f2a4 --- /dev/null +++ b/src/ringdht/namedirectory.h @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2016 Savoir-faire Linux Inc. + * Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +#pragma once + +#include <functional> +#include <map> +#include <string> + +namespace ring { + +class NameDirectory +{ +public: + NameDirectory() {} + NameDirectory(const std::string& s); + + static NameDirectory& instance(const std::string& server); + static NameDirectory& instance() { return instance(DEFAULT_SERVER_URI); } + + enum class Response : int { found = 0, invalidName, notFound, error }; + enum class RegistrationResponse : int { success = 0, invalidName, alreadyTaken, error }; + + using LookupCallback = std::function<void(const std::string& result, Response response)>; + void lookupAddress(const std::string& addr, LookupCallback cb); + void lookupName(const std::string& name, LookupCallback cb); + + using RegistrationCallback = std::function<void(RegistrationResponse response)>; + void registerName(const std::string& addr, const std::string& name, const std::string& owner, RegistrationCallback cb); + + const std::string& getServer() const { + return serverUri_; + } + +private: + constexpr static const char* const DEFAULT_SERVER_URI = "http://5.196.89.112:3000"; + + const std::string serverUri_ {DEFAULT_SERVER_URI}; + const std::string serverHost_ {}; + std::map<std::string, std::string> nameCache_; + std::map<std::string, std::string> addrCache_; + + bool validateName(const std::string& name) const; + +}; + +} diff --git a/src/ringdht/ringaccount.cpp b/src/ringdht/ringaccount.cpp index d9ff043e14..9cf2277a2b 100644 --- a/src/ringdht/ringaccount.cpp +++ b/src/ringdht/ringaccount.cpp @@ -93,9 +93,9 @@ constexpr const char* const RingAccount::ACCOUNT_TYPE; static std::uniform_int_distribution<dht::Value::Id> udist; static const std::string -parseRingUri(const std::string& toUrl) +stripPrefix(const std::string& toUrl) { - auto dhtf = toUrl.find("ring:"); + auto dhtf = toUrl.find(RING_URI_PREFIX); if (dhtf != std::string::npos) { dhtf = dhtf+5; } else { @@ -104,16 +104,31 @@ parseRingUri(const std::string& toUrl) } while (dhtf < toUrl.length() && toUrl[dhtf] == '/') dhtf++; + return toUrl.substr(dhtf); +} - if (toUrl.length() - dhtf < 40) +static const std::string +parseRingUri(const std::string& toUrl) +{ + auto sufix = stripPrefix(toUrl); + if (sufix.length() < 40) throw std::invalid_argument("id must be a ring infohash"); - const std::string toUri = toUrl.substr(dhtf, 40); + const std::string toUri = sufix.substr(0, 40); if (std::find_if_not(toUri.cbegin(), toUri.cend(), ::isxdigit) != toUri.cend()) throw std::invalid_argument("id must be a ring infohash"); return toUri; } +static bool +isRingHash(const std::string& uri) +{ + if (uri.length() < 40) + return false; + if (std::find_if_not(uri.cbegin(), uri.cbegin()+40, ::isxdigit) != uri.cend()) + return false; + return true; +} static constexpr const char* dhtStatusStr(dht::NodeStatus status) { @@ -153,6 +168,9 @@ RingAccount::createIceTransport(const Args&... args) RingAccount::RingAccount(const std::string& accountID, bool /* presenceEnabled */) : SIPAccountBase(accountID), via_addr_(), +#if HAVE_RINGNS + nameDir_(NameDirectory::instance()), +#endif cachePath_(fileutils::get_cache_dir()+DIR_SEPARATOR_STR+getAccountID()), dataPath_(cachePath_ + DIR_SEPARATOR_STR "values"), idPath_(fileutils::get_data_dir()+DIR_SEPARATOR_STR+getAccountID()) @@ -209,17 +227,47 @@ RingAccount::newIncomingCall(const std::string& from) std::shared_ptr<SIPCall> RingAccount::newOutgoingSIPCall(const std::string& toUrl) { - const std::string toUri = parseRingUri(toUrl); - RING_DBG("Calling DHT peer %s", toUri.c_str()); - + auto sufix = stripPrefix(toUrl); + RING_DBG("Calling DHT peer %s", sufix.c_str()); auto& manager = Manager::instance(); auto call = manager.callFactory.newCall<SIPCall, RingAccount>(*this, manager.getNewCallID(), Call::CallType::OUTGOING); call->setIPToIP(true); call->setSecure(isTlsEnabled()); - call->initRecFilename(toUri); + call->initRecFilename(toUrl); + try { + const std::string toUri = parseRingUri(sufix); + startOutgoingCall(call, toUri); + } catch (...) { +#if HAVE_RINGNS + std::weak_ptr<RingAccount> wthis_ = std::static_pointer_cast<RingAccount>(shared_from_this()); + nameDir_.get().lookupName(sufix, [wthis_,call](const std::string& result, NameDirectory::Response response) mutable { + runOnMainThread([=]() mutable { + if (auto sthis = wthis_.lock()) { + try { + const std::string toUri = parseRingUri(result); + sthis->startOutgoingCall(call, toUri); + } catch (...) { + call->onFailure(ENOENT); + } + } else { + call->onFailure(); + } + }); + }); +#else + call->onFailure(ENOENT); +#endif + } + + return call; +} + +void +RingAccount::startOutgoingCall(std::shared_ptr<SIPCall>& call, const std::string toUri) +{ auto sthis = std::static_pointer_cast<RingAccount>(shared_from_this()); // TODO: for now, we automatically trust all explicitly called peers @@ -335,8 +383,6 @@ RingAccount::newOutgoingSIPCall(const std::string& toUrl) } } }); - - return call; } void @@ -487,10 +533,13 @@ void RingAccount::serialize(YAML::Emitter &out) out << YAML::Key << Conf::DHT_ALLOW_PEERS_FROM_CONTACT << YAML::Value << allowPeersFromContact_; out << YAML::Key << Conf::DHT_ALLOW_PEERS_FROM_TRUSTED << YAML::Value << allowPeersFromTrusted_; +#if HAVE_RINGNS + out << YAML::Key << DRing::Account::ConfProperties::RingNS::URI << YAML::Value << nameDir_.get().getServer(); +#endif + out << YAML::Key << DRing::Account::ConfProperties::ARCHIVE_PATH << YAML::Value << archivePath_; out << YAML::Key << Conf::RING_ACCOUNT_RECEIPT << YAML::Value << receipt_; out << YAML::Key << Conf::RING_ACCOUNT_RECEIPT_SIG << YAML::Value << YAML::Binary(receiptSignature_.data(), receiptSignature_.size()); - out << YAML::Key << DRing::Account::ConfProperties::ETH::ACCOUNT << YAML::Value << ethAccount_; // tls submap out << YAML::Key << Conf::TLS_KEY << YAML::Value << YAML::BeginMap; @@ -509,12 +558,6 @@ void RingAccount::unserialize(const YAML::Node &node) parseValue(node, Conf::DHT_ALLOW_PEERS_FROM_CONTACT, allowPeersFromContact_); parseValue(node, Conf::DHT_ALLOW_PEERS_FROM_TRUSTED, allowPeersFromTrusted_); - try { - parseValue(node, DRing::Account::ConfProperties::ETH::ACCOUNT, ethAccount_); - } catch (const std::exception& e) { - RING_WARN("can't read eth account: %s", e.what()); - } - try { parseValue(node, DRing::Account::ConfProperties::ARCHIVE_PATH, archivePath_); } catch (const std::exception& e) { @@ -533,6 +576,12 @@ void RingAccount::unserialize(const YAML::Node &node) dhtPort_ = getRandomEvenPort(DHT_PORT_RANGE); dhtPortUsed_ = dhtPort_; +#if HAVE_RINGNS + std::string ringns_server; + parseValue(node, DRing::Account::ConfProperties::RingNS::URI, ringns_server); + nameDir_ = NameDirectory::instance(ringns_server); +#endif + parseValue(node, Conf::DHT_PUBLIC_IN_CALLS, dhtPublicInCalls_); loadAccount(); @@ -656,12 +705,7 @@ RingAccount::hasSignedReceipt() ringDeviceId_ = identity_.first->getPublicKey().getId().toString(); username_ = RING_URI_PREFIX + id; announce_ = std::make_shared<dht::Value>(std::move(announce_val)); - - auto eth_addr = root["eth"].asString(); - if (eth_addr != ethAccount_) { - RING_WARN("hasSignedReceipt() eth_addr not matching"); - ethAccount_ = eth_addr; - } + ethAccount_ = root["eth"].asString(); RING_WARN("hasSignedReceipt() -> true"); return true; @@ -1130,6 +1174,12 @@ RingAccount::setAccountDetails(const std::map<std::string, std::string> &details std::transform(archive_pin.begin(), archive_pin.end(), archive_pin.begin(), ::toupper); parseString(details, DRing::Account::ConfProperties::ARCHIVE_PATH, archivePath_); +#if HAVE_RINGNS + std::string ringns_server; + parseString(details, DRing::Account::ConfProperties::RingNS::URI, ringns_server); + nameDir_ = NameDirectory::instance(ringns_server); +#endif + loadAccount(archive_password, archive_pin); } @@ -1160,7 +1210,10 @@ RingAccount::getAccountDetails() const a.emplace(Conf::CONFIG_TLS_NEGOTIATION_TIMEOUT_SEC, "-1"); //a.emplace(DRing::Account::ConfProperties::ETH::KEY_FILE, ethPath_); - a.emplace(DRing::Account::ConfProperties::ETH::ACCOUNT, ethAccount_); + a.emplace(DRing::Account::ConfProperties::RingNS::ACCOUNT, ethAccount_); +#if HAVE_RINGNS + a.emplace(DRing::Account::ConfProperties::RingNS::URI, nameDir_.get().getServer()); +#endif return a; } @@ -1170,9 +1223,50 @@ RingAccount::getVolatileAccountDetails() const { auto a = SIPAccountBase::getVolatileAccountDetails(); a.emplace(DRing::Account::VolatileProperties::InstantMessaging::OFF_CALL, TRUE_STR); +#if HAVE_RINGNS + if (not registeredName_.empty()) + a.emplace(DRing::Account::VolatileProperties::REGISTERED_NAME, registeredName_); +#endif return a; } +#if HAVE_RINGNS +void +RingAccount::lookupName(const std::string& name) +{ + auto acc = getAccountID(); + nameDir_.get().lookupName(name, [acc,name](const std::string& result, NameDirectory::Response response) { + emitSignal<DRing::ConfigurationSignal::RegisteredNameFound>(acc, (int)response, result, name); + }); +} + +void +RingAccount::lookupAddress(const std::string& addr) +{ + auto acc = getAccountID(); + nameDir_.get().lookupAddress(addr, [acc,addr](const std::string& result, NameDirectory::Response response) { + emitSignal<DRing::ConfigurationSignal::RegisteredNameFound>(acc, (int)response, addr, result); + }); +} + +void +RingAccount::registerName(const std::string& password, const std::string& name) +{ + auto acc = getAccountID(); + std::weak_ptr<RingAccount> w = std::static_pointer_cast<RingAccount>(shared_from_this()); + nameDir_.get().registerName(ringAccountId_, name, ethAccount_, [acc,name,w](NameDirectory::RegistrationResponse response){ + int res = (response == NameDirectory::RegistrationResponse::success) ? 0 : ( + (response == NameDirectory::RegistrationResponse::invalidName) ? 2 : ( + (response == NameDirectory::RegistrationResponse::alreadyTaken) ? 3 : 4)); + if (response == NameDirectory::RegistrationResponse::success) { + if (auto this_ = w.lock()) + this_->registeredName_ = name; + } + emitSignal<DRing::ConfigurationSignal::NameRegistrationEnded>(acc, res, name); + }); +} +#endif + void RingAccount::handleEvents() { @@ -1466,6 +1560,22 @@ RingAccount::doRegister_() dht_.join(); } + auto shared = std::static_pointer_cast<RingAccount>(shared_from_this()); + std::weak_ptr<RingAccount> w {shared}; + +#if HAVE_RINGNS + // Look for registered name on the blockchain + nameDir_.get().lookupAddress(ringAccountId_, [w](const std::string& result, const NameDirectory::Response& response) { + if (response == NameDirectory::Response::found) + if (auto this_ = w.lock()) { + if (this_->registeredName_ != result) { + this_->registeredName_ = result; + emitSignal<DRing::ConfigurationSignal::VolatileDetailsChanged>(this_->accountID_, this_->getVolatileAccountDetails()); + } + } + }); +#endif + dht_.setOnStatusChanged([this](dht::NodeStatus s4, dht::NodeStatus s6) { RING_WARN("Dht status : IPv4 %s; IPv6 %s", dhtStatusStr(s4), dhtStatusStr(s6)); RegistrationState state; @@ -1498,25 +1608,27 @@ RingAccount::doRegister_() return ret; }); -#if 0 // enable if dht_ logging is needed - dht_.setLoggers( - [](char const* m, va_list args){ - //vlogger(LOG_ERR, m, args); - char tmp[2048]; - vsprintf(tmp, m, args); - auto now = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count(); - ring::emitSignal<DRing::Debug::MessageSend>(std::to_string(now) + " " + std::string(tmp)); - }, - [](char const* m, va_list args){ - //vlogger(LOG_WARNING, m, args); - char tmp[2048]; - vsprintf(tmp, m, args); - auto now = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count(); - ring::emitSignal<DRing::Debug::MessageSend>(std::to_string(now) + " " + std::string(tmp)); - }, - [](char const* m, va_list args){ /*vlogger(LOG_DEBUG, m, args);*/ } - ); + auto dht_log_level = Manager::instance().dhtLogLevel.load(); + if (dht_log_level > 0) { + static auto silent = [](char const* m, va_list args) {}; +#ifndef WIN32_NATIVE + static auto log_error = [](char const* m, va_list args) { vlogger(LOG_ERR, m, args); }; + static auto log_warn = [](char const* m, va_list args) { vlogger(LOG_WARNING, m, args); }; + static auto log_debug = [](char const* m, va_list args) { vlogger(LOG_DEBUG, m, args); }; + dht_.setLoggers( + log_error, + (dht_log_level > 1) ? log_warn : silent, + (dht_log_level > 2) ? log_debug : silent); +#else + static auto log_all = [](char const* m, va_list args) { + char tmp[2048]; + vsprintf(tmp, m, args); + auto now = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count(); + ring::emitSignal<DRing::Debug::MessageSend>(std::to_string(now) + " " + std::string(tmp)); + }; + dht_.setLoggers(log_all, log_all, silent); #endif + } dht_.importValues(loadValues()); @@ -1528,8 +1640,6 @@ RingAccount::doRegister_() if (not bootstrap.empty()) dht_.bootstrap(bootstrap); - auto shared = std::static_pointer_cast<RingAccount>(shared_from_this()); - // Put device annoucement if (announce_) { auto h = dht::InfoHash(ringAccountId_); @@ -1749,8 +1859,18 @@ RingAccount::replyToIncomingIceMsg(std::shared_ptr<SIPCall> call, dht::Value val { dht::IceCandidates(peer_ice_msg.id, ice->getLocalAttributesAndCandidates()) }; val.id = vid; - // Asynchronous DHT put of our local ICE data std::weak_ptr<SIPCall> wcall = call; +#if HAVE_RINGNS + auto from_acc_id = peer_cert ? (peer_cert->issuer ? peer_cert->issuer->getId().toString() : peer_cert->getId().toString()) : peer_ice_msg.from.toString(); + nameDir_.get().lookupAddress(from_acc_id, [wcall](const std::string& result, const NameDirectory::Response& response){ + if (response == NameDirectory::Response::found) + if (auto call = wcall.lock()) + call->setPeerRegistredName(result); + }); +#endif + + // Asynchronous DHT put of our local ICE data + auto shared_this = std::static_pointer_cast<RingAccount>(shared_from_this()); dht_.putEncrypted( callKey_, peer_ice_msg.from, @@ -1793,6 +1913,12 @@ RingAccount::replyToIncomingIceMsg(std::shared_ptr<SIPCall> call, void RingAccount::doUnregister(std::function<void(bool)> released_cb) { + if (registrationState_ == RegistrationState::INITIALIZING + || registrationState_ == RegistrationState::ERROR_NEED_MIGRATION) { + if (released_cb) released_cb(false); + return; + } + RING_WARN("doUnregister"); { std::lock_guard<std::mutex> lock(callsMutex_); diff --git a/src/ringdht/ringaccount.h b/src/ringdht/ringaccount.h index 1c34cf7112..1839fdec87 100644 --- a/src/ringdht/ringaccount.h +++ b/src/ringdht/ringaccount.h @@ -42,6 +42,10 @@ #include <list> #include <future> +#if HAVE_RINGNS +#include "namedirectory.h" +#endif + /** * @file ringaccount.h * @brief Ring Account is build on top of SIPAccountBase and uses DHT to handle call connectivity. @@ -281,9 +285,15 @@ class RingAccount : public SIPAccountBase { void connectivityChanged() override; - public: // overloaded methods + // overloaded methods void flush() override; +#if HAVE_RINGNS + void lookupName(const std::string& name); + void lookupAddress(const std::string& address); + void registerName(const std::string& password, const std::string& name); +#endif + private: NON_COPYABLE(RingAccount); @@ -338,6 +348,11 @@ class RingAccount : public SIPAccountBase { MSGPACK_DEFINE_MAP(dev); }; +#if HAVE_RINGNS + std::reference_wrapper<NameDirectory> nameDir_; + std::string registeredName_; +#endif + /** * Compute archive encryption key and DHT storage location from password and PIN. */ @@ -358,6 +373,8 @@ class RingAccount : public SIPAccountBase { */ virtual void setAccountDetails(const std::map<std::string, std::string> &details) override; + void startOutgoingCall(std::shared_ptr<SIPCall>& call, const std::string toUri); + /** * Start a SIP Call * @param call The current call diff --git a/src/sip/sipaccountbase.cpp b/src/sip/sipaccountbase.cpp index 78dfe7f24e..0b39a736ae 100644 --- a/src/sip/sipaccountbase.cpp +++ b/src/sip/sipaccountbase.cpp @@ -343,21 +343,21 @@ SIPAccountBase::generateVideoPort() const } #endif -const IceTransportOptions -SIPAccountBase::getIceOptions() const noexcept -{ - auto opts = Account::getIceOptions(); // Local copy of global account ICE settings - - // Note: we don't check of servers pre-existance, let underlaying stack do the job - if (stunEnabled_) - opts.stunServers.emplace_back(StunServerInfo().setUri(stunServer_)); - if (turnEnabled_) - opts.turnServers.emplace_back(TurnServerInfo() - .setUri(turnServer_) - .setUsername(turnServerUserName_) - .setPassword(turnServerPwd_) - .setRealm(turnServerRealm_)); - return opts; +const IceTransportOptions +SIPAccountBase::getIceOptions() const noexcept +{ + auto opts = Account::getIceOptions(); // Local copy of global account ICE settings + + // Note: we don't check of servers pre-existance, let underlaying stack do the job + if (stunEnabled_) + opts.stunServers.emplace_back(StunServerInfo().setUri(stunServer_)); + if (turnEnabled_) + opts.turnServers.emplace_back(TurnServerInfo() + .setUri(turnServer_) + .setUsername(turnServerUserName_) + .setPassword(turnServerPwd_) + .setRealm(turnServerRealm_)); + return opts; } void diff --git a/src/sip/sipvoiplink.cpp b/src/sip/sipvoiplink.cpp index a90cf0a0c7..50a31c4bfe 100644 --- a/src/sip/sipvoiplink.cpp +++ b/src/sip/sipvoiplink.cpp @@ -1374,4 +1374,4 @@ SIPVoIPLink::findLocalAddressFromSTUN(pjsip_transport* transport, } #undef RETURN_IF_NULL #undef RETURN_FALSE_IF_NULL -} // namespace ring \ No newline at end of file +} // namespace ring diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp index 05c660b5b8..8b056a2ee6 100644 --- a/src/thread_pool.cpp +++ b/src/thread_pool.cpp @@ -34,7 +34,7 @@ struct ThreadPool::ThreadState }; ThreadPool::ThreadPool() - : maxThreads_(std::thread::hardware_concurrency()) + : maxThreads_(std::max<size_t>(std::thread::hardware_concurrency(), 4)) { threads_.reserve(maxThreads_); } -- GitLab