diff --git a/CMakeLists.txt b/CMakeLists.txt index 9641f8958a04ece45825841e320a7885332493f4..3ddea4ece13be233c99128db3ca71edba4bb69a8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -39,6 +39,8 @@ list (APPEND dhtnet_SOURCES src/security/tls_session.cpp src/security/certstore.cpp src/security/threadloop.cpp + src/turn/turn_cache.cpp + src/turn/turn_transport.cpp src/upnp/upnp_context.cpp src/upnp/upnp_control.cpp src/upnp/protocol/mapping.cpp diff --git a/include/turn_cache.h b/include/turn_cache.h new file mode 100644 index 0000000000000000000000000000000000000000..c8d368435bcc7fbf572b9ad688315e9523260b5a --- /dev/null +++ b/include/turn_cache.h @@ -0,0 +1,111 @@ +/* + * Copyright (C) 2004-2023 Savoir-faire Linux Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + */ +#pragma once + +#include "ip_utils.h" +#include "turn_params.h" + +#include <asio.hpp> + +#include <atomic> +#include <chrono> +#include <functional> +#include <memory> +#include <mutex> +#include <optional> +#include <string> + +namespace dht { +namespace log { +class Logger; +} +} + +namespace dhtnet { + +using Logger = dht::log::Logger; + +class TurnTransport; + +class TurnCache : public std::enable_shared_from_this<TurnCache> +{ +public: + TurnCache(const std::string& accountId, + const std::string& cachePath, + const std::shared_ptr<asio::io_context>& io_context, + const std::shared_ptr<Logger>& logger, + const TurnTransportParams& params, + bool enabled); + ~TurnCache(); + + std::optional<IpAddr> getResolvedTurn(uint16_t family = AF_INET) const; + /** + * Pass a new configuration for the cache + * @param param The new configuration + */ + void reconfigure(const TurnTransportParams& params, bool enabled); + /** + * Refresh cache from current configuration + */ + void refresh(const asio::error_code& ec = {}); + +private: + std::string accountId_; + std::string cachePath_; + TurnTransportParams params_; + std::atomic_bool enabled_ {false}; + /** + * Avoid to refresh the cache multiple times + */ + std::atomic_bool isRefreshing_ {false}; + /** + * This will cache the turn server resolution each time we launch + * Jami, or for each connectivityChange() + */ + void testTurn(IpAddr server); + std::unique_ptr<TurnTransport> testTurnV4_; + std::unique_ptr<TurnTransport> testTurnV6_; + + // Used to detect if a turn server is down. + void refreshTurnDelay(bool scheduleNext); + std::chrono::seconds turnRefreshDelay_ {std::chrono::seconds(10)}; + + // Store resoved turn addresses + mutable std::mutex cachedTurnMutex_ {}; + std::unique_ptr<IpAddr> cacheTurnV4_ {}; + std::unique_ptr<IpAddr> cacheTurnV6_ {}; + + void onConnected(const asio::error_code& ec, bool ok, IpAddr server); + + // io + std::shared_ptr<asio::io_context> io_context; + std::unique_ptr<asio::steady_timer> refreshTimer_; + std::unique_ptr<asio::steady_timer> onConnectedTimer_; + + std::mutex shutdownMtx_; + + std::shared_ptr<Logger> logger_; + + // Asio :( + // https://stackoverflow.com/questions/35507956/is-it-safe-to-destroy-boostasio-timer-from-its-handler-or-handler-dtor + std::weak_ptr<TurnCache> weak() + { + return std::static_pointer_cast<TurnCache>(shared_from_this()); + } +}; + +} // namespace jami diff --git a/include/turn_params.h b/include/turn_params.h new file mode 100644 index 0000000000000000000000000000000000000000..1708ad9f6059df8bf9ed0977ae4573fa1fb7a940 --- /dev/null +++ b/include/turn_params.h @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2004-2023 Savoir-faire Linux Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + */ +#pragma once + +#include "ip_utils.h" +#include <string> + +namespace dhtnet { + +struct TurnTransportParams +{ + dhtnet::IpAddr server; + std::string domain; // Used by cache_turn + // Plain Credentials + std::string realm; + std::string username; + std::string password; +}; + +} diff --git a/src/turn/turn_cache.cpp b/src/turn/turn_cache.cpp new file mode 100644 index 0000000000000000000000000000000000000000..12dfe8ba4ac55172c41ba9e9849f07be6507b4c4 --- /dev/null +++ b/src/turn/turn_cache.cpp @@ -0,0 +1,239 @@ +/* + * Copyright (C) 2004-2023 Savoir-faire Linux Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + */ +#include "turn_cache.h" +#include "fileutils.h" +#include "turn_transport.h" + +#include <opendht/thread_pool.h> +#include <opendht/logger.h> +#include <fstream> + +namespace dhtnet { + +TurnCache::TurnCache(const std::string& accountId, + const std::string& cachePath, + const std::shared_ptr<asio::io_context>& io_ctx, + const std::shared_ptr<Logger>& logger, + const TurnTransportParams& params, + bool enabled) + : accountId_(accountId) + , cachePath_(cachePath) + , io_context(io_ctx) + , logger_(logger) +{ + refreshTimer_ = std::make_unique<asio::steady_timer>(*io_context, + std::chrono::steady_clock::now()); + onConnectedTimer_ = std::make_unique<asio::steady_timer>(*io_context, + std::chrono::steady_clock::now()); +} + +TurnCache::~TurnCache() { + { + std::lock_guard<std::mutex> lock(shutdownMtx_); + if (refreshTimer_) { + refreshTimer_->cancel(); + refreshTimer_.reset(); + } + if (onConnectedTimer_) { + onConnectedTimer_->cancel(); + onConnectedTimer_.reset(); + } + } + { + std::lock_guard<std::mutex> lock(cachedTurnMutex_); + testTurnV4_.reset(); + testTurnV6_.reset(); + cacheTurnV4_.reset(); + cacheTurnV6_.reset(); + } +} + +std::optional<IpAddr> +TurnCache::getResolvedTurn(uint16_t family) const +{ + if (family == AF_INET && cacheTurnV4_) { + return *cacheTurnV4_; + } else if (family == AF_INET6 && cacheTurnV6_) { + return *cacheTurnV6_; + } + return std::nullopt; +} + +void +TurnCache::reconfigure(const TurnTransportParams& params, bool enabled) +{ + params_ = params; + enabled_ = enabled; + { + std::lock_guard<std::mutex> lk(cachedTurnMutex_); + // Force re-resolution + isRefreshing_ = false; + cacheTurnV4_.reset(); + cacheTurnV6_.reset(); + testTurnV4_.reset(); + testTurnV6_.reset(); + } + std::lock_guard<std::mutex> lock(shutdownMtx_); + if (refreshTimer_) { + refreshTimer_->expires_at(std::chrono::steady_clock::now()); + refreshTimer_->async_wait(std::bind(&TurnCache::refresh, shared_from_this(), std::placeholders::_1)); + } +} + +void +TurnCache::refresh(const asio::error_code& ec) +{ + if (ec == asio::error::operation_aborted) + return; + // The resolution of the TURN server can take quite some time (if timeout). + // So, run this in its own io thread to avoid to block the main thread. + // Avoid multiple refresh + if (isRefreshing_.exchange(true)) + return; + if (!enabled_) { + // In this case, we do not use any TURN server + std::lock_guard<std::mutex> lk(cachedTurnMutex_); + cacheTurnV4_.reset(); + cacheTurnV6_.reset(); + isRefreshing_ = false; + return; + } + + if(logger_) logger_->debug("[Account {}] Refresh cache for TURN server resolution", accountId_); + // Retrieve old cached value if available. + // This means that we directly get the correct value when launching the application on the + // same network + // No need to resolve, it's already a valid address + auto server = params_.domain; + if (IpAddr::isValid(server, AF_INET)) { + testTurn(IpAddr(server, AF_INET)); + return; + } else if (IpAddr::isValid(server, AF_INET6)) { + testTurn(IpAddr(server, AF_INET6)); + return; + } + // Else cache resolution result + fileutils::recursive_mkdir(cachePath_ + DIR_SEPARATOR_STR + "domains", 0700); + auto pathV4 = cachePath_ + DIR_SEPARATOR_STR + "domains" + DIR_SEPARATOR_STR + "v4." + server; + IpAddr testV4, testV6; + if (auto turnV4File = std::ifstream(pathV4)) { + std::string content((std::istreambuf_iterator<char>(turnV4File)), + std::istreambuf_iterator<char>()); + testV4 = IpAddr(content, AF_INET); + } + auto pathV6 = cachePath_ + DIR_SEPARATOR_STR + "domains" + DIR_SEPARATOR_STR + "v6." + server; + if (auto turnV6File = std::ifstream(pathV6)) { + std::string content((std::istreambuf_iterator<char>(turnV6File)), + std::istreambuf_iterator<char>()); + testV6 = IpAddr(content, AF_INET6); + } + // Resolve just in case. The user can have a different connectivity + auto turnV4 = IpAddr {server, AF_INET}; + { + if (turnV4) { + // Cache value to avoid a delay when starting up Jami + std::ofstream turnV4File(pathV4); + turnV4File << turnV4.toString(); + } else + fileutils::remove(pathV4, true); + // Update TURN + testV4 = IpAddr(std::move(turnV4)); + } + auto turnV6 = IpAddr {server, AF_INET6}; + { + if (turnV6) { + // Cache value to avoid a delay when starting up Jami + std::ofstream turnV6File(pathV6); + turnV6File << turnV6.toString(); + } else + fileutils::remove(pathV6, true); + // Update TURN + testV6 = IpAddr(std::move(turnV6)); + } + if (testV4) + testTurn(testV4); + if (testV6) + testTurn(testV6); + + refreshTurnDelay(!testV4 && !testV6); +} + +void +TurnCache::testTurn(IpAddr server) +{ + TurnTransportParams params = params_; + params.server = server; + std::lock_guard<std::mutex> lk(cachedTurnMutex_); + auto& turn = server.isIpv4() ? testTurnV4_ : testTurnV6_; + turn.reset(); // Stop previous TURN + try { + turn = std::make_unique<TurnTransport>( + params, [this, server](bool ok) { + // Stop server in an async job, because this callback can be called + // immediately and cachedTurnMutex_ must not be locked. + std::lock_guard<std::mutex> lock(shutdownMtx_); + if (onConnectedTimer_) { + onConnectedTimer_->expires_at(std::chrono::steady_clock::now()); + onConnectedTimer_->async_wait(std::bind(&TurnCache::onConnected, shared_from_this(), std::placeholders::_1, ok, server)); + } + }); + } catch (const std::exception& e) { + if(logger_) logger_->error("TurnTransport creation error: {}", e.what()); + } +} + +void +TurnCache::onConnected(const asio::error_code& ec, bool ok, IpAddr server) +{ + if (ec == asio::error::operation_aborted) + return; + + std::lock_guard<std::mutex> lk(cachedTurnMutex_); + auto& cacheTurn = server.isIpv4() ? cacheTurnV4_ : cacheTurnV6_; + if (!ok) { + if(logger_) logger_->error("Connection to {:s} failed - reset", server.toString()); + cacheTurn.reset(); + } else { + if(logger_) logger_->debug("Connection to {:s} ready", server.toString()); + cacheTurn = std::make_unique<IpAddr>(server); + } + refreshTurnDelay(!cacheTurnV6_ && !cacheTurnV4_); + if (auto& turn = server.isIpv4() ? testTurnV4_ : testTurnV6_) + turn->shutdown(); +} + + +void +TurnCache::refreshTurnDelay(bool scheduleNext) +{ + isRefreshing_ = false; + if (scheduleNext) { + std::lock_guard<std::mutex> lock(shutdownMtx_); + if(logger_) logger_->warn("[Account {:s}] Cache for TURN resolution failed.", accountId_); + if (refreshTimer_) { + refreshTimer_->expires_at(std::chrono::steady_clock::now() + turnRefreshDelay_); + refreshTimer_->async_wait(std::bind(&TurnCache::refresh, shared_from_this(), std::placeholders::_1)); + } + if (turnRefreshDelay_ < std::chrono::minutes(30)) + turnRefreshDelay_ *= 2; + } else { + if(logger_) logger_->debug("[Account {:s}] Cache refreshed for TURN resolution", accountId_); + turnRefreshDelay_ = std::chrono::seconds(10); + } +} + +} // namespace jami diff --git a/src/turn/turn_transport.cpp b/src/turn/turn_transport.cpp new file mode 100644 index 0000000000000000000000000000000000000000..bfca535f324d2488064d0221f6a3a62cde50677d --- /dev/null +++ b/src/turn/turn_transport.cpp @@ -0,0 +1,216 @@ +/* + * Copyright (C) 2004-2023 Savoir-faire Linux Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + */ +#include "turn_transport.h" +#include "../sip_utils.h" + +#include <atomic> +#include <thread> + +#include <pjnath.h> +#include <pjlib-util.h> +#include <pjlib.h> + +#define TRY(ret) \ + do { \ + if ((ret) != PJ_SUCCESS) \ + throw std::runtime_error(#ret " failed"); \ + } while (0) + +namespace dhtnet { + +class TurnLock +{ + pj_grp_lock_t* lk_; + +public: + TurnLock(pj_turn_sock* sock) + : lk_(pj_turn_sock_get_grp_lock(sock)) + { + lock(); + } + + ~TurnLock() { unlock(); } + + void lock() { pj_grp_lock_add_ref(lk_); } + + void unlock() { pj_grp_lock_dec_ref(lk_); } +}; + +class TurnTransport::Impl +{ +public: + Impl(std::function<void(bool)>&& cb, const std::shared_ptr<Logger>& logger) + : cb_(std::move(cb)), logger_(logger) {} + ~Impl(); + + /** + * Detect new TURN state + */ + void onTurnState(pj_turn_state_t old_state, pj_turn_state_t new_state); + + /** + * Pool events from pjsip + */ + void ioJob(); + + void start() + { + ioWorker = std::thread([this] { ioJob(); }); + } + + void shutdown() { + std::lock_guard<std::mutex> lock(shutdownMtx_); + if (relay) { + pj_turn_sock_destroy(relay); + relay = nullptr; + } + turnLock.reset(); + if (ioWorker.joinable()) + ioWorker.join(); + } + + TurnTransportParams settings; + + pj_caching_pool poolCache {}; + pj_pool_t* pool {nullptr}; + pj_stun_config stunConfig {}; + pj_turn_sock* relay {nullptr}; + std::unique_ptr<TurnLock> turnLock; + pj_str_t relayAddr {}; + IpAddr peerRelayAddr; // address where peers should connect to + IpAddr mappedAddr; + std::function<void(bool)> cb_; + + std::thread ioWorker; + std::atomic_bool stopped_ {false}; + std::atomic_bool cbCalled_ {false}; + std::mutex shutdownMtx_; + std::shared_ptr<Logger> logger_; +}; + +TurnTransport::Impl::~Impl() +{ + shutdown(); + pj_caching_pool_destroy(&poolCache); +} + +void +TurnTransport::Impl::onTurnState(pj_turn_state_t old_state, pj_turn_state_t new_state) +{ + if (new_state == PJ_TURN_STATE_DESTROYING) { + stopped_ = true; + return; + } + + if (new_state == PJ_TURN_STATE_READY) { + pj_turn_session_info info; + pj_turn_sock_get_info(relay, &info); + peerRelayAddr = IpAddr {info.relay_addr}; + mappedAddr = IpAddr {info.mapped_addr}; + if(logger_) logger_->debug("TURN server ready, peer relay address: {:s}", + peerRelayAddr.toString(true, true).c_str()); + cbCalled_ = true; + cb_(true); + } else if (old_state <= PJ_TURN_STATE_READY and new_state > PJ_TURN_STATE_READY and not cbCalled_) { + if(logger_) logger_->debug("TURN server disconnected ({:s})", pj_turn_state_name(new_state)); + cb_(false); + } +} + +void +TurnTransport::Impl::ioJob() +{ + const pj_time_val delay = {0, 10}; + while (!stopped_) { + pj_ioqueue_poll(stunConfig.ioqueue, &delay); + pj_timer_heap_poll(stunConfig.timer_heap, nullptr); + } +} + +TurnTransport::TurnTransport(const TurnTransportParams& params, std::function<void(bool)>&& cb, const std::shared_ptr<Logger>& logger) + : pimpl_ {new Impl(std::move(cb), logger)} +{ + auto server = params.server; + if (!server.getPort()) + server.setPort(PJ_STUN_PORT); + if (server.isUnspecified()) + throw std::invalid_argument("invalid turn server address"); + pimpl_->settings = params; + // PJSIP memory pool + pj_caching_pool_init(&pimpl_->poolCache, &pj_pool_factory_default_policy, 0); + pimpl_->pool = pj_pool_create(&pimpl_->poolCache.factory, "TurnTransport", 512, 512, nullptr); + if (not pimpl_->pool) + throw std::runtime_error("pj_pool_create() failed"); + // STUN config + pj_stun_config_init(&pimpl_->stunConfig, &pimpl_->poolCache.factory, 0, nullptr, nullptr); + // create global timer heap + TRY(pj_timer_heap_create(pimpl_->pool, 1000, &pimpl_->stunConfig.timer_heap)); + // create global ioqueue + TRY(pj_ioqueue_create(pimpl_->pool, 16, &pimpl_->stunConfig.ioqueue)); + // TURN callbacks + pj_turn_sock_cb relay_cb; + pj_bzero(&relay_cb, sizeof(relay_cb)); + relay_cb.on_state = + [](pj_turn_sock* relay, pj_turn_state_t old_state, pj_turn_state_t new_state) { + auto pimpl = static_cast<Impl*>(pj_turn_sock_get_user_data(relay)); + pimpl->onTurnState(old_state, new_state); + }; + // TURN socket config + pj_turn_sock_cfg turn_sock_cfg; + pj_turn_sock_cfg_default(&turn_sock_cfg); + turn_sock_cfg.max_pkt_size = 4096; + // TURN socket creation + TRY(pj_turn_sock_create(&pimpl_->stunConfig, + server.getFamily(), + PJ_TURN_TP_TCP, + &relay_cb, + &turn_sock_cfg, + &*this->pimpl_, + &pimpl_->relay)); + // TURN allocation setup + pj_turn_alloc_param turn_alloc_param; + pj_turn_alloc_param_default(&turn_alloc_param); + turn_alloc_param.peer_conn_type = PJ_TURN_TP_TCP; + pj_stun_auth_cred cred; + pj_bzero(&cred, sizeof(cred)); + cred.type = PJ_STUN_AUTH_CRED_STATIC; + pj_cstr(&cred.data.static_cred.realm, pimpl_->settings.realm.c_str()); + pj_cstr(&cred.data.static_cred.username, pimpl_->settings.username.c_str()); + cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN; + pj_cstr(&cred.data.static_cred.data, pimpl_->settings.password.c_str()); + pimpl_->relayAddr = pj_strdup3(pimpl_->pool, server.toString().c_str()); + // TURN connection/allocation + if (logger) logger->debug("Connecting to TURN {:s}", server.toString(true, true)); + TRY(pj_turn_sock_alloc(pimpl_->relay, + &pimpl_->relayAddr, + server.getPort(), + nullptr, + &cred, + &turn_alloc_param)); + pimpl_->turnLock = std::make_unique<TurnLock>(pimpl_->relay); + pimpl_->start(); +} + +TurnTransport::~TurnTransport() {} + +void +TurnTransport::shutdown() +{ + pimpl_->shutdown(); +} + +} // namespace jami diff --git a/src/turn/turn_transport.h b/src/turn/turn_transport.h new file mode 100644 index 0000000000000000000000000000000000000000..da9404e33578ae19c54912c2e40bf86663455548 --- /dev/null +++ b/src/turn/turn_transport.h @@ -0,0 +1,55 @@ +/* + * Copyright (C) 2004-2023 Savoir-faire Linux Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + */ +#pragma once + +#include "ip_utils.h" +#include "turn_params.h" + +#include <opendht/logger.h> + +#include <functional> +#include <memory> +#include <string> + +namespace dht { +namespace log { +class Logger; +} +} + +namespace dhtnet { + +using Logger = dht::log::Logger; + +/** + * This class is used to test connection to TURN servers + * No other logic is implemented. + */ +class TurnTransport +{ +public: + TurnTransport(const TurnTransportParams& param, std::function<void(bool)>&& cb, const std::shared_ptr<Logger>& logger = {}); + ~TurnTransport(); + void shutdown(); + +private: + TurnTransport() = delete; + class Impl; + std::unique_ptr<Impl> pimpl_; +}; + +} // namespace jami