From 81221ada9fd3c55679f4228b7924fee476ec54c9 Mon Sep 17 00:00:00 2001 From: Amna <amna.snene@savoirfairelinux.com> Date: Thu, 14 Sep 2023 17:33:26 -0400 Subject: [PATCH] connectionManager: new API Change-Id: I96eb3ed8d0d4f82adb28c8d47a5751bc3f603560 --- include/connectionmanager.h | 13 ++++--- include/ice_options.h | 2 +- include/ice_transport.h | 1 + src/connectionmanager.cpp | 68 ++++++++++++++++++++++++++++++++++--- src/ice_transport.cpp | 6 ++-- tests/connectionManager.cpp | 6 ++-- 6 files changed, 79 insertions(+), 17 deletions(-) diff --git a/include/connectionmanager.h b/include/connectionmanager.h index d714f79..e11c27a 100644 --- a/include/connectionmanager.h +++ b/include/connectionmanager.h @@ -94,6 +94,8 @@ public: struct Config; ConnectionManager(std::shared_ptr<Config> config_); + ConnectionManager(dht::crypto::Identity id); + ~ConnectionManager(); /** @@ -292,21 +294,18 @@ struct ConnectionManager::Config std::shared_ptr<TurnCache> turnCache; std::filesystem::path cachePath {}; - std::shared_ptr<asio::io_context> ioContext; std::shared_ptr<dht::DhtRunner> dht; - dht::crypto::Identity id; + dht::crypto::Identity id {}; - tls::CertificateStore* certStore; - - dhtnet::IceTransportFactory* factory; + std::shared_ptr<tls::CertificateStore> certStore {nullptr}; + std::shared_ptr<dhtnet::IceTransportFactory> factory {nullptr}; /** * UPnP IGD controller and the mutex to access it */ - bool upnpEnabled; + bool upnpEnabled {true}; std::shared_ptr<dhtnet::upnp::Controller> upnpCtrl; - std::shared_ptr<dht::log::Logger> logger; /** diff --git a/include/ice_options.h b/include/ice_options.h index b587512..33ac76e 100644 --- a/include/ice_options.h +++ b/include/ice_options.h @@ -69,7 +69,7 @@ struct TurnServerInfo struct IceTransportOptions { - IceTransportFactory* factory {nullptr}; + std::shared_ptr<IceTransportFactory> factory {}; bool master {true}; unsigned streamsCount {1}; unsigned compCountPerStream {1}; diff --git a/include/ice_transport.h b/include/ice_transport.h index 8c69e93..c2d4756 100644 --- a/include/ice_transport.h +++ b/include/ice_transport.h @@ -50,6 +50,7 @@ class Controller; } class IceTransport; +class IceTransportFactory; using IceRecvCb = std::function<ssize_t(unsigned char* buf, size_t len)>; using IceCandidate = pj_ice_sess_cand; diff --git a/src/connectionmanager.cpp b/src/connectionmanager.cpp index 238f4b4..339c8d5 100644 --- a/src/connectionmanager.cpp +++ b/src/connectionmanager.cpp @@ -58,6 +58,34 @@ CallbackId parseCallbackId(std::string_view ci) return CallbackId(deviceId, vid); } + +std::shared_ptr<ConnectionManager::Config> +createConfig(std::shared_ptr<ConnectionManager::Config> config_) +{ + if (!config_->certStore){ + config_->certStore = std::make_shared<dhtnet::tls::CertificateStore>("client", config_->logger); + } + if (!config_->dht) { + dht::DhtRunner::Config dhtConfig; + dhtConfig.dht_config.id = config_->id; + dhtConfig.threaded = true; + dht::DhtRunner::Context dhtContext; + dhtContext.certificateStore = [c = config_->certStore](const dht::InfoHash& pk_id) { + std::vector<std::shared_ptr<dht::crypto::Certificate>> ret; + if (auto cert = c->getCertificate(pk_id.toString())) + ret.emplace_back(std::move(cert)); + return ret; + }; + config_->dht = std::make_shared<dht::DhtRunner>(); + config_->dht->run(dhtConfig, std::move(dhtContext)); + config_->dht->bootstrap("bootstrap.jami.net"); + } + if (!config_->factory){ + config_->factory = std::make_shared<IceTransportFactory>(config_->logger); + } + return config_; +} + struct ConnectionInfo { ~ConnectionInfo() @@ -95,12 +123,29 @@ class ConnectionManager::Impl : public std::enable_shared_from_this<ConnectionMa { public: explicit Impl(std::shared_ptr<ConnectionManager::Config> config_) - : config_ {std::move(config_)} + : config_ {std::move(createConfig(config_))} , rand {dht::crypto::getSeededRandomEngine<std::mt19937_64>()} { - loadTreatedMessages(); + if(!config_->ioContext) { + config_->ioContext = std::make_shared<asio::io_context>(); + ioContextRunner_ = std::make_unique<std::thread>([context = config_->ioContext, l=config_->logger]() { + try { + auto work = asio::make_work_guard(*context); + context->run(); + } catch (const std::exception& ex) { + if (l) l->error("Exception: {}", ex.what()); + } + }); + } + } + ~Impl() { + if (ioContextRunner_) { + if (config_->logger) config_->logger->debug("ConnectionManager: stopping io_context thread"); + config_->ioContext->stop(); + ioContextRunner_->join(); + ioContextRunner_.reset(); + } } - ~Impl() {} std::shared_ptr<dht::DhtRunner> dht() { return config_->dht; } const dht::crypto::Identity& identity() const { return config_->id; } @@ -130,7 +175,9 @@ public: info->waitForAnswer_->cancel(); } if (!unused.empty()) - dht::ThreadPool::io().run([infos = std::move(unused)]() mutable { infos.clear(); }); + dht::ThreadPool::io().run([infos = std::move(unused)]() mutable { + infos.clear(); + }); } void shutdown() @@ -292,6 +339,7 @@ public: const std::string& name = ""); std::shared_ptr<ConnectionManager::Config> config_; + std::unique_ptr<std::thread> ioContextRunner_; mutable std::mt19937_64 rand; @@ -334,6 +382,7 @@ public: */ std::mutex connectCbsMtx_ {}; + struct PendingCb { std::string name; @@ -1591,10 +1640,21 @@ ConnectionManager::Impl::findCertificate(const dht::InfoHash& h, return true; } +std::shared_ptr<ConnectionManager::Config> +buildDefaultConfig(dht::crypto::Identity id){ + auto conf = std::make_shared<ConnectionManager::Config>(); + conf->id = std::move(id); + return conf; +} + ConnectionManager::ConnectionManager(std::shared_ptr<ConnectionManager::Config> config_) : pimpl_ {std::make_shared<Impl>(config_)} {} +ConnectionManager::ConnectionManager(dht::crypto::Identity id) + : ConnectionManager {buildDefaultConfig(id)} +{} + ConnectionManager::~ConnectionManager() { if (pimpl_) diff --git a/src/ice_transport.cpp b/src/ice_transport.cpp index b0d7a2e..a735e75 100644 --- a/src/ice_transport.cpp +++ b/src/ice_transport.cpp @@ -151,6 +151,7 @@ public: int checkEventQueue(int maxEventToPoll); std::shared_ptr<dht::log::Logger> logger_ {}; + std::shared_ptr<dhtnet::IceTransportFactory> factory {}; std::condition_variable_any iceCV_ {}; @@ -381,6 +382,7 @@ IceTransport::Impl::~Impl() void IceTransport::Impl::initIceInstance(const IceTransportOptions& options) { + factory = options.factory; isTcp_ = options.tcpEnable; upnpEnabled_ = options.upnpEnable; on_initdone_cb_ = options.onInitDone; @@ -406,7 +408,7 @@ IceTransport::Impl::initIceInstance(const IceTransportOptions& options) if (upnpEnabled_) upnp_ = std::make_shared<upnp::Controller>(options.upnpContext); - config_ = options.factory->getIceCfg(); // config copy + config_ = factory->getIceCfg(); // config copy if (isTcp_) { config_.protocol = PJ_ICE_TP_TCP; config_.stun.conn_type = PJ_STUN_TP_TCP; @@ -418,7 +420,7 @@ IceTransport::Impl::initIceInstance(const IceTransportOptions& options) } pool_.reset( - pj_pool_create(options.factory->getPoolFactory(), "IceTransport.pool", 512, 512, NULL)); + pj_pool_create(factory->getPoolFactory(), "IceTransport.pool", 512, 512, NULL)); if (not pool_) throw std::runtime_error("pj_pool_create() failed"); diff --git a/tests/connectionManager.cpp b/tests/connectionManager.cpp index 98f93fa..4cd62da 100644 --- a/tests/connectionManager.cpp +++ b/tests/connectionManager.cpp @@ -66,7 +66,7 @@ public: std::shared_ptr<std::thread> ioContextRunner; // std::thread ioContextRunner; std::shared_ptr<Logger> logger; - std::unique_ptr<IceTransportFactory> factory; + std::shared_ptr<IceTransportFactory> factory; private: std::unique_ptr<ConnectionHandler> setupHandler(const std::string& name); @@ -155,9 +155,9 @@ ConnectionManagerTest::setupHandler(const std::string& name) config->dht = h->dht; config->id = h->id; config->ioContext = h->ioContext; - config->factory = factory.get(); + config->factory = factory; config->logger = logger; - config->certStore = h->certStore.get(); + config->certStore = h->certStore; std::filesystem::path currentPath = std::filesystem::current_path(); std::filesystem::path tempDirPath = currentPath / "temp"; -- GitLab