/*
 *  Copyright (C) 2004-2023 Savoir-faire Linux Inc.
 *
 *  This program is free software: you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation, either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program. If not, see <https://www.gnu.org/licenses/>.
 */
#include "connectionmanager.h"
#include "peer_connection.h"
#include "upnp/upnp_control.h"
#include "certstore.h"
#include "fileutils.h"
#include "sip_utils.h"
#include "string_utils.h"

#include <opendht/crypto.h>
#include <opendht/thread_pool.h>
#include <opendht/value.h>
#include <asio.hpp>

#include <algorithm>
#include <mutex>
#include <map>
#include <condition_variable>
#include <set>
#include <charconv>
#include <fstream>
#include <stdexcept>
#include <system_error>

namespace dhtnet {
static constexpr std::chrono::seconds DHT_MSG_TIMEOUT {30};
static constexpr uint64_t ID_MAX_VAL = 9007199254740992;

using ValueIdDist = std::uniform_int_distribution<dht::Value::Id>;

/**
 * Build the textual identifier of a connection: "<device id> <value id>".
 * This is the inverse of parseCallbackId().
 */
std::string
callbackIdToString(const dhtnet::DeviceId& did, const dht::Value::Id& vid)
{
    return fmt::format("{} {}", did.to_view(), vid);
}

/**
 * Parse an identifier produced by callbackIdToString().
 * @param ci  "<device id> <decimal value id>"
 * @return the device id and the value id
 * @throws std::invalid_argument if the separator is missing or the value id
 *         is not a plain decimal number
 * @throws std::out_of_range if the value id does not fit in dht::Value::Id
 */
std::pair<dhtnet::DeviceId, dht::Value::Id>
parseCallbackId(std::string_view ci)
{
    const auto sep = ci.find(' ');
    // Without this check, npos + 1 wraps to 0 and both halves would silently
    // be the whole input.
    if (sep == std::string_view::npos)
        throw std::invalid_argument("parseCallbackId: missing separator");
    std::string_view deviceIdString = ci.substr(0, sep);
    std::string_view vidString = ci.substr(sep + 1);

    dhtnet::DeviceId deviceId(deviceIdString);

    // Parse directly into the id type: std::stoul yields an unsigned long,
    // which is narrower than the id range (see ID_MAX_VAL) on platforms where
    // unsigned long is 32 bits wide.
    dht::Value::Id vid {};
    const char* const first = vidString.data();
    const char* const last = first + vidString.size();
    const auto [end, ec] = std::from_chars(first, last, vid, 10);
    if (ec == std::errc::result_out_of_range)
        throw std::out_of_range("parseCallbackId: value id out of range");
    if (ec != std::errc() || end != last)
        throw std::invalid_argument("parseCallbackId: invalid value id");
    return {deviceId, vid};
}

/**
 * Fill in the parts of the configuration that the caller left empty:
 * a certificate store, a running DHT node (bootstrapped on
 * "bootstrap.jami.net") and an ICE transport factory.
 * The passed configuration object is modified in place and returned.
 */
std::shared_ptr<ConnectionManager::Config>
createConfig(std::shared_ptr<ConnectionManager::Config> config_)
{
    if (!config_->certStore){
        config_->certStore = std::make_shared<dhtnet::tls::CertificateStore>("client", config_->logger);
    }
    if (!config_->dht) {
        dht::DhtRunner::Config dhtConfig;
        dhtConfig.dht_config.id = config_->id;
        dhtConfig.threaded = true;
        dht::DhtRunner::Context dhtContext;
        // Let the DHT resolve certificates through our certificate store.
        dhtContext.certificateStore = [c = config_->certStore](const dht::InfoHash& pk_id) {
            std::vector<std::shared_ptr<dht::crypto::Certificate>> ret;
            if (auto cert = c->getCertificate(pk_id.toString()))
                ret.emplace_back(std::move(cert));
            return ret;
        };
        config_->dht = std::make_shared<dht::DhtRunner>();
        config_->dht->run(dhtConfig, std::move(dhtContext));
        config_->dht->bootstrap("bootstrap.jami.net");
    }
    if (!config_->factory){
        config_->factory = std::make_shared<IceTransportFactory>(config_->logger);
    }
    return config_;
}

/**
 * State of one connection (attempt) with a peer device.
 * At most one of ice_ / tls_ / socket_ describes the current stage:
 * ICE transport, TLS endpoint being negotiated, then multiplexed socket.
 * Members are protected by mutex_.
 */
struct ConnectionInfo
{
    ~ConnectionInfo()
    {
        if (socket_)
            socket_->join();
    }

    std::mutex mutex_ {};
    bool responseReceived_ {false};
    PeerConnectionRequest response_ {};
    std::unique_ptr<IceTransport> ice_ {nullptr};
    // Used to store currently non ready TLS Socket
    std::unique_ptr<TlsSocketEndpoint> tls_ {nullptr};
    std::shared_ptr<MultiplexedSocket> socket_ {};
    std::set<dht::Value::Id> cbIds_ {};

    std::function<void(bool)> onConnected_;
    std::unique_ptr<asio::steady_timer> waitForAnswer_ {};

    /**
     * Shut down the TLS endpoint and the socket, cancel the answer timer and
     * hand the ICE transport over to the io thread pool, where it is released.
     */
    void shutdown() {
        std::lock_guard<std::mutex> lk(mutex_);
        if (tls_)
            tls_->shutdown();
        if (socket_)
            socket_->shutdown();
        if (waitForAnswer_)
            waitForAnswer_->cancel();
        if (ice_) {
            dht::ThreadPool::io().run(
                [ice = std::shared_ptr<IceTransport>(std::move(ice_))] {});
        }
    }

    /**
     * Describe this connection as a string map with the keys "id", "device",
     * "peer" (when a certificate is known), "status" and "remoteAddress"
     * (when a transport exists).
     * Does not lock mutex_ itself.
     */
    std::map<std::string, std::string>
    getInfo(const DeviceId& deviceId, dht::Value::Id valueId, tls::CertificateStore& certStore) const
    {
        std::map<std::string, std::string> connectionInfo;
        connectionInfo["id"] = callbackIdToString(deviceId, valueId);
        connectionInfo["device"] = deviceId.toString();
        auto cert = tls_ ? tls_->peerCertificate() : (socket_ ? socket_->peerCertificate() : nullptr);
        // Fall back on the certificate store when no transport provides one.
        if (not cert)
            cert = certStore.getCertificate(deviceId.toString());
        if (cert) {
            connectionInfo["peer"] = cert->issuer->getId().toString();
        }
        if (socket_) {
            connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Connected));
            connectionInfo["remoteAddress"] = socket_->getRemoteAddress();
        } else if (tls_) {
            connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::TLS));
            connectionInfo["remoteAddress"] = tls_->getRemoteAddress();
        } else if(ice_) {
            connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::ICE));
            connectionInfo["remoteAddress"] = ice_->getRemoteAddress(ICE_COMP_ID_SIP_TRANSPORT);
        }
        return connectionInfo;
    }
};

/** A channel request waiting for completion: channel name and user callback. */
struct PendingCb
{
    std::string name;
    ConnectCallback cb;
    // Set once a channel request has been issued for this entry
    // (see DeviceInfo::requestPendingOps()).
    bool requested {false};
};

/**
 * All connections and pending channel requests for one peer device.
 * Unless stated otherwise, methods do not lock mtx_ themselves.
 */
struct DeviceInfo {
    const DeviceId deviceId;
    mutable std::mutex mtx_ {};
    std::map<dht::Value::Id, std::shared_ptr<ConnectionInfo>> info;
    std::map<dht::Value::Id, PendingCb> connecting;
    std::map<dht::Value::Id, PendingCb> waiting;
    DeviceInfo(DeviceId id) : deviceId {id} {}

    inline bool isConnecting() const {
        return !connecting.empty() || !waiting.empty();
    }

    inline bool empty() const {
        return info.empty() && connecting.empty() && waiting.empty();
    }

    /** Draw a random id in [1, ID_MAX_VAL] not used by any of the three maps. */
    dht::Value::Id newId(std::mt19937_64& rand) const {
        ValueIdDist dist(1, ID_MAX_VAL);
        dht::Value::Id id;
        do {
            id = dist(rand);
        } while (info.find(id) != info.end()
                || connecting.find(id) != connecting.end()
                || waiting.find(id) != waiting.end());
        return id;
    }

    /** @return the first connection that owns a multiplexed socket, if any. */
    std::shared_ptr<ConnectionInfo> getConnectedInfo() const {
        for (auto& [id, ci] : info) {
            if (ci->socket_)
                return ci;
        }
        return {};
    }

    /**
     * Remove pending operations from the maps and return them.
     * @param vid       0 to extract everything, else the id to extract
     * @param sock      resulting channel (nullptr on failure)
     * @param accepted  false when the underlying socket is fine but the
     *                  channel was declined
     */
    std::vector<PendingCb> extractPendingOperations(dht::Value::Id vid, const std::shared_ptr<ChannelSocket>& sock, bool accepted = true)
    {
        std::vector<PendingCb> ret;
        if (vid == 0) {
            // Extract all pending callbacks
            ret.reserve(connecting.size() + waiting.size());
            for (auto& [id, pending] : connecting)
                ret.emplace_back(std::move(pending));
            connecting.clear();
            for (auto& [id, pending] : waiting)
                ret.emplace_back(std::move(pending));
            waiting.clear();
        } else if (auto n = waiting.extract(vid)) {
            // If it's a waiting operation, just move it
            ret.emplace_back(std::move(n.mapped()));
        } else if (auto n = connecting.extract(vid)) {
            ret.emplace_back(std::move(n.mapped()));
            // If sock is nullptr, execute if it's the last connecting operation
            // If accepted is false, it means that underlying socket is ok, but channel is declined
            if (!sock && connecting.empty() && accepted) {
                for (auto& [id, pending] : waiting)
                    ret.emplace_back(std::move(pending));
                waiting.clear();
                for (auto& [id, pending] : connecting)
                    ret.emplace_back(std::move(pending));
                connecting.clear();
            }
        }
        return ret;
    }

    /** Move every ConnectionInfo out of `info` and return them. */
    std::vector<std::shared_ptr<ConnectionInfo>> extractUnusedConnections() {
        std::vector<std::shared_ptr<ConnectionInfo>> unused {};
        for (auto& [id, ci] : info)
            unused.emplace_back(std::move(ci));
        info.clear();
        return unused;
    }

    /**
     * Extract the pending operations for vid, release `lock`, then invoke
     * their callbacks with `sock`.
     */
    void executePendingOperations(std::unique_lock<std::mutex>& lock, dht::Value::Id vid, const std::shared_ptr<ChannelSocket>& sock, bool accepted = true) {
        auto ops = extractPendingOperations(vid, sock, accepted);
        lock.unlock();
        for (auto& cb : ops)
            cb.cb(sock, deviceId);
    }

    /** Same as above, but acquires mtx_ itself. */
    void executePendingOperations(dht::Value::Id vid, const std::shared_ptr<ChannelSocket>& sock, bool accepted = true) {
        std::unique_lock<std::mutex> lock(mtx_);
        executePendingOperations(lock, vid, sock, accepted);
    }

    /** @return true if a pending operation exists for the channel `name`. */
    bool isConnecting(const std::string& name) const {
        for (const auto& [id, pc]: connecting)
            if (pc.name == name)
                return true;
        for (const auto& [id, pc]: waiting)
            if (pc.name == name)
                return true;
        return false;
    }

    /**
     * Mark every not-yet-requested pending operation as requested.
     * @return id -> channel name for the entries that were just marked
     */
    std::map<dht::Value::Id, std::string> requestPendingOps() {
        std::map<dht::Value::Id, std::string> ret;
        for (auto& [id, pc]: connecting) {
            if (!pc.requested) {
                ret[id] = pc.name;
                pc.requested = true;
            }
        }
        for (auto& [id, pc]: waiting) {
            if (!pc.requested) {
                ret[id] = pc.name;
                pc.requested = true;
            }
        }
        return ret;
    }

    /**
     * List established connections and pending operations as string maps.
     * Locks mtx_, then each ConnectionInfo::mutex_ in turn.
     */
    std::vector<std::map<std::string, std::string>>
    getConnectionList(tls::CertificateStore& certStore) const {
        std::lock_guard<std::mutex> lk(mtx_);
        std::vector<std::map<std::string, std::string>> ret;
        ret.reserve(info.size() + connecting.size() + waiting.size());
        for (auto& [id, ci] : info) {
            std::lock_guard<std::mutex> lk(ci->mutex_);
            ret.emplace_back(ci->getInfo(deviceId, id, certStore));
        }
        auto cert = certStore.getCertificate(deviceId.toString());
        for (const auto& [vid, ci] : connecting) {
            ret.emplace_back(std::map<std::string, std::string> {
                {"id", callbackIdToString(deviceId, vid)},
                {"status", std::to_string(static_cast<int>(ConnectionStatus::Connecting))},
                {"device", deviceId.toString()},
                {"peer", cert ? cert->issuer->getId().toString() : ""}
            });
        }
        for (const auto& [vid, ci] : waiting) {
            ret.emplace_back(std::map<std::string, std::string> {
                {"id", callbackIdToString(deviceId, vid)},
                {"status", std::to_string(static_cast<int>(ConnectionStatus::Waiting))},
                {"device", deviceId.toString()},
                {"peer", cert ? cert->issuer->getId().toString() : ""}
            });
        }
        return ret;
    }
};

/** Mutex-protected map from device id to its DeviceInfo. */
class DeviceInfoSet {
public:
    /** @return the DeviceInfo for deviceId, or an empty pointer if unknown. */
    std::shared_ptr<DeviceInfo> getDeviceInfo(const DeviceId& deviceId) {
        std::lock_guard<std::mutex> lk(mtx_);
        auto it = infos_.find(deviceId);
        if (it != infos_.end())
            return it->second;
        return {};
    }

    /** @return a snapshot of every known DeviceInfo. */
    std::vector<std::shared_ptr<DeviceInfo>> getDeviceInfos() {
        std::vector<std::shared_ptr<DeviceInfo>> deviceInfos;
        std::lock_guard<std::mutex> lk(mtx_);
        deviceInfos.reserve(infos_.size());
        for (auto& [deviceId, info] : infos_)
            deviceInfos.emplace_back(info);
        return deviceInfos;
    }

    /** @return the DeviceInfo for deviceId, creating it when missing. */
    std::shared_ptr<DeviceInfo> createDeviceInfo(const DeviceId& deviceId) {
        std::lock_guard<std::mutex> lk(mtx_);
        auto& info = infos_[deviceId];
        if (!info)
            info = std::make_shared<DeviceInfo>(deviceId);
        return info;
    }

    /** @return true if an entry was removed. */
    bool removeDeviceInfo(const DeviceId& deviceId) {
        std::lock_guard<std::mutex> lk(mtx_);
        return infos_.erase(deviceId) != 0;
    }

    /** @return the ConnectionInfo `id` of device `deviceId`, or empty. */
    std::shared_ptr<ConnectionInfo> getInfo(const DeviceId& deviceId, const dht::Value::Id& id) {
        if (auto info = getDeviceInfo(deviceId)) {
            std::lock_guard<std::mutex> lk(info->mtx_);
            auto it = info->info.find(id);
            if (it != info->info.end())
                return it->second;
        }
        return {};
    }

    /** @return every ConnectionInfo, across all devices, that owns a socket. */
    std::vector<std::shared_ptr<ConnectionInfo>> getConnectedInfos() {
        auto deviceInfos = getDeviceInfos();
        std::vector<std::shared_ptr<ConnectionInfo>> ret;
        ret.reserve(deviceInfos.size());
        for (auto& info : deviceInfos) {
            std::lock_guard<std::mutex> lk(info->mtx_);
            for (auto& [id, ci] : info->info) {
                if (ci->socket_)
                    ret.emplace_back(ci);
            }
        }
        return ret;
    }

    /** Empty the set and return the DeviceInfos it contained. */
    std::vector<std::shared_ptr<DeviceInfo>> shutdown() {
        std::vector<std::shared_ptr<DeviceInfo>> ret;
        std::lock_guard<std::mutex> lk(mtx_);
        ret.reserve(infos_.size());
        for (auto& [deviceId, info] : infos_) {
            ret.emplace_back(std::move(info));
        }
        infos_.clear();
        return ret;
    }

private:
    std::mutex mtx_ {};
    std::map<DeviceId, std::shared_ptr<DeviceInfo>> infos_ {};
};

/**
 * returns whether or not UPnP is enabled and active_
 * ie: if it is able to make port mappings
 */
bool
ConnectionManager::Config::getUPnPActive() const
{
    if (upnpCtrl)
        return upnpCtrl->isReady();
    return false;
}

class ConnectionManager::Impl : public std::enable_shared_from_this<ConnectionManager::Impl>
{
public:
    /**
     * Complete the configuration (see createConfig()), load the treated
     * messages and, when the configuration has no io_context, create one and
     * run it on a dedicated thread.
     * Note: the parameter shadows the member config_; both point to the same
     * Config object.
     */
    explicit Impl(std::shared_ptr<ConnectionManager::Config> config_)
        : config_ {std::move(createConfig(config_))}
        , rand_ {config_->rng ? *config_->rng : dht::crypto::getSeededRandomEngine<std::mt19937_64>()}
    {
        loadTreatedMessages();
        if(!config_->ioContext) {
            config_->ioContext = std::make_shared<asio::io_context>();
            ioContextRunner_ = std::make_unique<std::thread>([context = config_->ioContext, l=config_->logger]() {
                try {
                    // The work guard keeps run() from returning while idle.
                    auto work = asio::make_work_guard(*context);
                    context->run();
                } catch (const std::exception& ex) {
                    if (l) l->error("Exception: {}", ex.what());
                }
            });
        }
    }

    /** Stop and join the io_context thread if this object created it. */
    ~Impl() {
        if (ioContextRunner_) {
            if (config_->logger) config_->logger->debug("ConnectionManager: stopping io_context thread");
            config_->ioContext->stop();
            ioContextRunner_->join();
            ioContextRunner_.reset();
        }
    }

    std::shared_ptr<dht::DhtRunner> dht() { return config_->dht; }
    const dht::crypto::Identity& identity() const { return config_->id; }

    /**
     * Shut down every connection and fail every pending operation
     * (callbacks are invoked with a null socket). Runs only once.
     */
    void shutdown()
    {
        if (isDestroying_.exchange(true))
            return;
        std::vector<std::shared_ptr<ConnectionInfo>> unused;
        std::vector<std::pair<DeviceId, std::vector<PendingCb>>> pending;
        for (auto& dinfo: infos_.shutdown()) {
            std::lock_guard<std::mutex> lk(dinfo->mtx_);
            auto p = dinfo->extractPendingOperations(0, nullptr, false);
            if (!p.empty())
                pending.emplace_back(dinfo->deviceId, std::move(p));
            auto uc = dinfo->extractUnusedConnections();
            unused.insert(unused.end(), std::make_move_iterator(uc.begin()), std::make_move_iterator(uc.end()));
        }
        // Callbacks and shutdowns run with no DeviceInfo lock held.
        for (auto& info: unused)
            info->shutdown();
        for (auto& op: pending)
            for (auto& cb: op.second)
                cb.cb(nullptr, op.first);
        // Release the ConnectionInfo objects on the io thread pool.
        if (!unused.empty())
            dht::ThreadPool::io().run([infos = std::move(unused)]() mutable {
                infos.clear();
            });
    }

    void connectDeviceStartIce(const std::shared_ptr<ConnectionInfo>& info,
                               const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
                               const dht::Value::Id& vid,
                               const std::string& connType,
                               std::function<void(bool)> onConnected);
    void onResponse(const asio::error_code& ec, const std::weak_ptr<ConnectionInfo>& info, const DeviceId& deviceId, const dht::Value::Id& vid);
    bool connectDeviceOnNegoDone(const std::weak_ptr<DeviceInfo>& dinfo,
                                 const std::shared_ptr<ConnectionInfo>& info,
                                 const DeviceId& deviceId,
                                 const std::string& name,
                                 const dht::Value::Id& vid,
                                 const std::shared_ptr<dht::crypto::Certificate>& cert);
    void connectDevice(const DeviceId& deviceId,
                       const std::string& uri,
                       ConnectCallback cb,
                       bool noNewSocket = false,
                       bool forceNewSocket = false,
                       const std::string& connType = "");
    void connectDevice(const dht::InfoHash& deviceId,
                       const std::string& uri,
                       ConnectCallbackLegacy cb,
                       bool noNewSocket = false,
                       bool forceNewSocket = false,
                       const std::string& connType = "");

    void connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
                       const std::string& name,
                       ConnectCallback cb,
                       bool noNewSocket = false,
                       bool forceNewSocket = false,
                       const std::string& connType = "");
    /**
     * Send a ChannelRequest on the TLS socket. Triggers cb when ready
     * @param sock      socket used to send the request
     * @param name      channel's name
     * @param vid       channel's id
     * @param deviceId  to identify the linked ConnectCallback
     */
    void sendChannelRequest(const std::weak_ptr<DeviceInfo>& dinfo,
                            const std::weak_ptr<ConnectionInfo>& cinfo,
                            const std::shared_ptr<MultiplexedSocket>& sock,
                            const std::string& name,
                            const dht::Value::Id& vid);
    /**
     * Triggered when a PeerConnectionRequest comes from the DHT
     */
    void answerTo(IceTransport& ice,
                  const dht::Value::Id& id,
                  const std::shared_ptr<dht::crypto::PublicKey>& fromPk);
    bool onRequestStartIce(const std::shared_ptr<ConnectionInfo>& info, const PeerConnectionRequest& req);
    bool onRequestOnNegoDone(const std::weak_ptr<DeviceInfo>& dinfo, const std::shared_ptr<ConnectionInfo>& info, const PeerConnectionRequest& req);
    void onDhtPeerRequest(const PeerConnectionRequest& req,
                          const std::shared_ptr<dht::crypto::Certificate>& cert);
    /**
     * Triggered when a new TLS socket is ready to use
     * @param ok        If succeed
     * @param deviceId  Related device
     * @param vid       vid of the connection request
     * @param name      non empty if TLS was created by connectDevice()
     */
    void onTlsNegotiationDone(const std::shared_ptr<DeviceInfo>& dinfo,
                              const std::shared_ptr<ConnectionInfo>& info,
                              bool ok,
                              const DeviceId& deviceId,
                              const dht::Value::Id& vid,
                              const std::string& name = "");

    void addNewMultiplexedSocket(const std::weak_ptr<DeviceInfo>& dinfo, const DeviceId& deviceId, const dht::Value::Id& vid, const std::shared_ptr<ConnectionInfo>& info);
    void onPeerResponse(PeerConnectionRequest&& req);
    void onDhtConnected(const dht::crypto::PublicKey& devicePk);

    const std::shared_future<tls::DhParams> dhParams() const;
    tls::CertificateStore& certStore() const { return *config_->certStore; }

    mutable std::mutex messageMutex_ {};
    std::set<std::string, std::less<>> treatedMessages_ {};

    void loadTreatedMessages();
    void saveTreatedMessages() const;

    /// \return true if the given DHT message identifier has been treated
    /// \note if message has not been treated yet this method stores this id and returns true at
    /// further calls
    bool isMessageTreated(std::string_view id);

    const std::shared_ptr<dht::log::Logger>& logger() const { return config_->logger; }

    /**
     * Published IPv4/IPv6 addresses, used only if defined by the user in account
     * configuration
     *
     */
    IpAddr publishedIp_[2] {};

    /**
     * interface name on which this account is bound
     */
    std::string interface_ {"default"};

    /**
     * Get the local interface name on which this account is bound.
     */
    const std::string& getLocalInterface() const { return interface_; }

    /**
     * Get the published IP address, fallbacks to NAT if family is unspecified
     * Prefers the usage of IPv4 if possible.
     */
    IpAddr getPublishedIpAddress(uint16_t family = PF_UNSPEC) const;

    /**
     * Set published IP address according to given family
     */
    void setPublishedAddress(const IpAddr& ip_addr);

    /**
     * Store the local/public addresses used to register
     */
    void storeActiveIpAddress(std::function<void()>&& cb = {});

    /**
     * Create and return ICE options.
     */
    void getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept;
    IceTransportOptions getIceOptions() const noexcept;

    /**
     * Inform that a potential peer device have been found.
     * Returns true only if the device certificate is a valid device certificate.
     * In that case (true is returned) the account_id parameter is set to the peer account ID.
     */
    static bool foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
                                dht::InfoHash& account_id, const std::shared_ptr<Logger>& logger);

    bool findCertificate(const dht::PkId& id,
                         std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb);
    bool findCertificate(const dht::InfoHash& h, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb);

    /**
     * returns whether or not UPnP is enabled and active
     * ie: if it is able to make port mappings
     */
    bool getUPnPActive() const;

    std::shared_ptr<ConnectionManager::Config> config_;
    std::unique_ptr<std::thread> ioContextRunner_;

    mutable std::mutex randMtx_;
    mutable std::mt19937_64 rand_;

    iOSConnectedCallback iOSConnectedCb_ {};

    DeviceInfoSet infos_ {};

    ChannelRequestCallback channelReqCb_ {};
    ConnectionReadyCallback connReadyCb_ {};
    onICERequestCallback iceReqCb_ {};
    std::atomic_bool isDestroying_ {false};
};

/**
 * Send our ICE attributes and local candidates to the peer as an encrypted
 * "peer_request" DHT value, then arm a DHT_MSG_TIMEOUT timer whose handler
 * is onResponse().
 * @param onConnected  called with false on failure; stored in
 *                     info->onConnected_ for onResponse() otherwise
 */
void
ConnectionManager::Impl::connectDeviceStartIce(
    const std::shared_ptr<ConnectionInfo>& info,
    const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
    const dht::Value::Id& vid,
    const std::string& connType,
    std::function<void(bool)> onConnected)
{
    auto deviceId = devicePk->getLongId();
    if (!info) {
        onConnected(false);
        return;
    }

    std::unique_lock<std::mutex> lk(info->mutex_);
    auto& ice = info->ice_;

    if (!ice) {
        if (config_->logger)
            config_->logger->error("[device {}] No ICE detected", deviceId);
        onConnected(false);
        return;
    }

    // Message layout: ufrag, pwd, then one local candidate per line.
    auto iceAttributes = ice->getLocalAttributes();
    std::ostringstream icemsg;
    icemsg << iceAttributes.ufrag << "\n";
    icemsg << iceAttributes.pwd << "\n";
    for (const auto& addr : ice->getLocalCandidates(1)) {
        icemsg << addr << "\n";
        if (config_->logger)
            config_->logger->debug("[device {}] Added local ICE candidate {}", deviceId, addr);
    }

    // Prepare connection request as a DHT message
    PeerConnectionRequest val;

    val.id = vid; /* Random id for the message unicity */
    val.ice_msg = icemsg.str();
    val.connType = connType;

    auto value = std::make_shared<dht::Value>(std::move(val));
    value->user_type = "peer_request";

    // Send connection request through DHT
    if (config_->logger)
        config_->logger->debug("[device {}] Sending connection request", deviceId);
    dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
                                           + devicePk->getId().toString()),
                        devicePk,
                        value,
                        [l=config_->logger,deviceId](bool ok) {
                            if (l)
                                l->debug("[device {}] Sent connection request. Put encrypted {:s}",
                                       deviceId,
                                       (ok ? "ok" : "failed"));
                        });
    // Wait for call to onResponse() operated by DHT
    if (isDestroying_) {
        onConnected(true); // This avoid to wait new negotiation when destroying
        return;
    }

    info->onConnected_ = std::move(onConnected);
    info->waitForAnswer_ = std::make_unique<asio::steady_timer>(*config_->ioContext,
                                                                std::chrono::steady_clock::now()
                                                                    + DHT_MSG_TIMEOUT);
    info->waitForAnswer_->async_wait(
        std::bind(&ConnectionManager::Impl::onResponse, this, std::placeholders::_1, info, deviceId, vid));
}

/**
 * Handler of the answer timer: runs when the timer expires, which
 * onPeerResponse() forces as soon as an answer arrives.
 * Starts ICE with the received remote attributes and reports the outcome
 * through info->onConnected_. A cancelled wait is ignored.
 */
void
ConnectionManager::Impl::onResponse(const asio::error_code& ec,
                                    const std::weak_ptr<ConnectionInfo>& winfo,
                                    const DeviceId& deviceId,
                                    const dht::Value::Id& vid)
{
    if (ec == asio::error::operation_aborted)
        return;
    auto info = winfo.lock();
    if (!info)
        return;

    std::unique_lock<std::mutex> lk(info->mutex_);
    auto& ice = info->ice_;
    if (isDestroying_) {
        info->onConnected_(true); // The destructor can wake a pending wait here.
        return;
    }
    if (!info->responseReceived_) {
        if (config_->logger)
            config_->logger->error("[device {}] no response from DHT to ICE request.", deviceId);
        info->onConnected_(false);
        return;
    }

    if (!info->ice_) {
        info->onConnected_(false);
        return;
    }

    auto sdp = ice->parseIceCandidates(info->response_.ice_msg);

    if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
        if (config_->logger)
            config_->logger->warn("[device {}] start ICE failed", deviceId);
        info->onConnected_(false);
        return;
    }
    info->onConnected_(true);
}

/**
 * Called once ICE negotiation succeeded for a connection started by
 * connectDevice(): moves the ICE transport into a TLS endpoint and starts
 * the TLS session; onTlsNegotiationDone() is invoked when it is ready.
 * @return false if there is no info or no running ICE transport
 */
bool
ConnectionManager::Impl::connectDeviceOnNegoDone(
    const std::weak_ptr<DeviceInfo>& dinfo,
    const std::shared_ptr<ConnectionInfo>& info,
    const DeviceId& deviceId,
    const std::string& name,
    const dht::Value::Id& vid,
    const std::shared_ptr<dht::crypto::Certificate>& cert)
{
    if (!info)
        return false;

    std::unique_lock<std::mutex> lk {info->mutex_};
    if (info->waitForAnswer_) {
        // Negotiation is done and connected, go to handshake
        // and avoid any cancellation at this point.
        info->waitForAnswer_->cancel();
    }
    auto& ice = info->ice_;
    if (!ice || !ice->isRunning()) {
        if (config_->logger)
            config_->logger->error("[device {}] No ICE detected or not running", deviceId);
        return false;
    }

    // Build socket
    auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
                                                            std::move(ice)),
                                                        true);

    // Negotiate a TLS session
    if (config_->logger)
        config_->logger->debug("[device {}] Start TLS session - Initied by connectDevice(). Launched by channel: {} - vid: {}", deviceId, name, vid);
    info->tls_ = std::make_unique<TlsSocketEndpoint>(std::move(endpoint),
                                                     certStore(),
                                                     config_->ioContext,
                                                     identity(),
                                                     dhParams(),
                                                     *cert);

    info->tls_->setOnReady(
        [w = weak_from_this(), dinfo, winfo=std::weak_ptr(info), deviceId = std::move(deviceId), vid = std::move(vid), name = std::move(name)](
            bool ok) {
            if (auto shared = w.lock())
                if (auto info = winfo.lock()) {
                    shared->onTlsNegotiationDone(dinfo.lock(), info, ok, deviceId, vid, name);
                    // Make another reference to info to avoid destruction (could lead to a deadlock/crash).
                    dht::ThreadPool::io().run([info = std::move(info)] {});
                }
        });
    return true;
}

/**
 * Connect to a device identified by its DeviceId: look up its certificate,
 * then continue with the certificate overload. `cb` is called with a null
 * socket when there is no DHT, when the target is this device, or when no
 * certificate is found.
 */
void
ConnectionManager::Impl::connectDevice(const DeviceId& deviceId,
                                       const std::string& name,
                                       ConnectCallback cb,
                                       bool noNewSocket,
                                       bool forceNewSocket,
                                       const std::string& connType)
{
    if (!dht()) {
        cb(nullptr, deviceId);
        return;
    }
    // Refuse to connect to ourselves.
    if (deviceId.toString() == identity().second->getLongId().toString()) {
        cb(nullptr, deviceId);
        return;
    }
    findCertificate(deviceId,
                    [w = weak_from_this(),
                     deviceId,
                     name,
                     cb = std::move(cb),
                     noNewSocket,
                     forceNewSocket,
                     connType](const std::shared_ptr<dht::crypto::Certificate>& cert) {
                        if (!cert) {
                            if (auto shared = w.lock())
                                if (shared->config_->logger)
                                    shared->config_->logger->error(
                                        "No valid certificate found for device {}",
                                        deviceId);
                            cb(nullptr, deviceId);
                            return;
                        }
                        if (auto shared = w.lock()) {
                            shared->connectDevice(cert,
                                                  name,
                                                  std::move(cb),
                                                  noNewSocket,
                                                  forceNewSocket,
                                                  connType);
                        } else
                            cb(nullptr, deviceId);
                    });
}

/**
 * Legacy variant of the overload above: the device is identified by an
 * InfoHash, and the callback receives that InfoHash back.
 */
void
ConnectionManager::Impl::connectDevice(const dht::InfoHash& deviceId,
                                       const std::string& name,
                                       ConnectCallbackLegacy cb,
                                       bool noNewSocket,
                                       bool forceNewSocket,
                                       const std::string& connType)
{
    if (!dht()) {
        cb(nullptr, deviceId);
        return;
    }
    if (deviceId.toString() == identity().second->getLongId().toString()) {
        cb(nullptr, deviceId);
        return;
    }
    findCertificate(deviceId,
                    [w = weak_from_this(),
                     deviceId,
                     name,
                     cb = std::move(cb),
                     noNewSocket,
                     forceNewSocket,
                     connType](const std::shared_ptr<dht::crypto::Certificate>& cert) {
                        if (!cert) {
                            if (auto shared = w.lock())
                                if (shared->config_->logger)
                                    shared->config_->logger->error(
                                        "No valid certificate found for device {}",
                                        deviceId);
                            cb(nullptr, deviceId);
                            return;
                        }
                        if (auto shared = w.lock()) {
                            // Adapt the legacy callback to the DeviceId-based one.
                            shared->connectDevice(cert,
                                                  name,
                                                  [cb, deviceId](const std::shared_ptr<ChannelSocket>& sock, const DeviceId& /*did*/){
                                                     cb(sock, deviceId);
                                                  },
                                                  noNewSocket,
                                                  forceNewSocket,
                                                  connType);
                        } else
                            cb(nullptr, deviceId);
                    });
}

/**
 * Open the channel `name` with the device owning `cert`.
 * The work is posted to the computation thread pool. There, the request is
 * registered as pending, then either:
 *  - sent on an already connected socket,
 *  - left waiting for a negotiation in progress (unless forceNewSocket),
 *  - failed immediately when noNewSocket is set,
 *  - or a new ICE connection is initiated.
 */
void
ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
                                       const std::string& name,
                                       ConnectCallback cb,
                                       bool noNewSocket,
                                       bool forceNewSocket,
                                       const std::string& connType)
{
    // Avoid dht operation in a DHT callback to avoid deadlocks
    dht::ThreadPool::computation().run([w = weak_from_this(),
                                        name = std::move(name),
                                        cert = std::move(cert),
                                        cb = std::move(cb),
                                        noNewSocket,
                                        forceNewSocket,
                                        connType] {
        auto devicePk = cert->getSharedPublicKey();
        auto deviceId = devicePk->getLongId();
        auto sthis = w.lock();
        if (!sthis || sthis->isDestroying_) {
            cb(nullptr, deviceId);
            return;
        }
        auto di = sthis->infos_.createDeviceInfo(deviceId);
        std::unique_lock<std::mutex> lk(di->mtx_);

        dht::Value::Id vid;
        {
            std::lock_guard<std::mutex> lkr(sthis->randMtx_);
            vid = di->newId(sthis->rand_);
        }

        // Check if already connecting
        auto isConnectingToDevice = di->isConnecting();
        // Note: we can be in a state where first
        // socket is negotiated and first channel is pending
        // so return only after we checked the info
        auto& diw = (isConnectingToDevice && !forceNewSocket)
                        ? di->waiting[vid]
                        : di->connecting[vid];
        diw = PendingCb {name, std::move(cb)};

        // Check if already negotiated
        if (auto info = di->getConnectedInfo()) {
            std::unique_lock<std::mutex> lkc(info->mutex_);
            if (auto sock = info->socket_) {
                info->cbIds_.emplace(vid);
                diw.requested = true;
                lkc.unlock();
                lk.unlock();
                if (sthis->config_->logger)
                    sthis->config_->logger->debug("[device {}] Peer already connected. Add a new channel", deviceId);
                sthis->sendChannelRequest(di, info, sock, name, vid);
                return;
            }
        }

        if (isConnectingToDevice && !forceNewSocket) {
            if (sthis->config_->logger)
                sthis->config_->logger->debug("[device {}] Already connecting, wait for ICE negotiation", deviceId);
            return;
        }
        if (noNewSocket) {
            // If no new socket is specified, we don't try to generate a new socket
            di->executePendingOperations(lk, vid, nullptr);
            return;
        }

        // Note: used when the ice negotiation fails to erase
        // all stored structures.
        auto eraseInfo = [w, diw=std::weak_ptr(di), vid] {
            if (auto di = diw.lock()) {
                std::unique_lock<std::mutex> lk(di->mtx_);
                di->info.erase(vid);
                auto ops = di->extractPendingOperations(vid, nullptr);
                if (di->empty()) {
                    if (auto shared = w.lock())
                        shared->infos_.removeDeviceInfo(di->deviceId);
                }
                lk.unlock();
                for (const auto& op: ops)
                    op.cb(nullptr, di->deviceId);
            }
        };

        // If no socket exists, we need to initiate an ICE connection.
        sthis->getIceOptions([w,
                              deviceId = std::move(deviceId),
                              devicePk = std::move(devicePk),
                              diw=std::weak_ptr(di),
                              name = std::move(name),
                              cert = std::move(cert),
                              vid,
                              connType,
                              eraseInfo](auto&& ice_config) {
            auto sthis = w.lock();
            if (!sthis) {
                dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
                return;
            }
            auto info = std::make_shared<ConnectionInfo>();
            auto winfo = std::weak_ptr(info);
            ice_config.tcpEnable = true;
            // ICE initialized: send the connection request through the DHT.
            ice_config.onInitDone = [w,
                                     devicePk = std::move(devicePk),
                                     name = std::move(name),
                                     cert = std::move(cert),
                                     diw,
                                     winfo = std::weak_ptr(info),
                                     vid,
                                     connType,
                                     eraseInfo](bool ok) {
                dht::ThreadPool::io().run([w = std::move(w),
                                           devicePk = std::move(devicePk),
                                           vid,
                                           winfo,
                                           eraseInfo,
                                           connType, ok] {
                    auto sthis = w.lock();
                    if (!ok && sthis && sthis->config_->logger)
                        sthis->config_->logger->error("[device {}] Cannot initialize ICE session.", devicePk->getLongId());
                    if (!sthis || !ok) {
                        eraseInfo();
                        return;
                    }
                    sthis->connectDeviceStartIce(winfo.lock(), devicePk, vid, connType, [=](bool ok) {
                        if (!ok) {
                            dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
                        }
                    });
                });
            };
            // ICE negotiated: go on with the TLS session.
            ice_config.onNegoDone = [w,
                                     deviceId,
                                     name,
                                     cert = std::move(cert),
                                     diw,
                                     winfo = std::weak_ptr(info),
                                     vid,
                                     eraseInfo](bool ok) {
                dht::ThreadPool::io().run([w = std::move(w),
                                           deviceId = std::move(deviceId),
                                           name = std::move(name),
                                           cert = std::move(cert),
                                           diw = std::move(diw),
                                           winfo = std::move(winfo),
                                           vid = std::move(vid),
                                           eraseInfo = std::move(eraseInfo),
                                           ok] {
                    auto sthis = w.lock();
                    if (!ok && sthis && sthis->config_->logger)
                        sthis->config_->logger->error("[device {}] ICE negotiation failed.", deviceId);
                    if (!sthis || !ok || !sthis->connectDeviceOnNegoDone(diw, winfo.lock(), deviceId, name, vid, cert))
                        eraseInfo();
                });
            };

            if (auto di = diw.lock()) {
                std::lock_guard<std::mutex> lk(di->mtx_);
                di->info[vid] = info;
            }
            std::unique_lock<std::mutex> lk {info->mutex_};
            ice_config.master = false;
            ice_config.streamsCount = 1;
            ice_config.compCountPerStream = 1;
            info->ice_ = sthis->config_->factory->createUTransport("");
            if (!info->ice_) {
                if (sthis->config_->logger)
                    sthis->config_->logger->error("[device {}] Cannot initialize ICE session.", deviceId);
                // Like the other failure paths, run eraseInfo from the io
                // pool: it locks DeviceInfo::mtx_ and calls user callbacks,
                // which must not happen while info->mutex_ is held here
                // (getConnectionList() locks in the opposite order).
                dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
                return;
            }
            // We need to detect any shutdown if the ice session is destroyed before going to the
            // TLS session;
            info->ice_->setOnShutdown([eraseInfo]() {
                dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
            });
            try {
                info->ice_->initIceInstance(ice_config);
            } catch (const std::exception& e) {
                if (sthis->config_->logger)
                    sthis->config_->logger->error("{}", e.what());
                dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
            }
        });
    });
}

/**
 * Create the channel `name` on `sock` and send the matching ChannelRequest
 * on the control channel. The pending operations registered under `vid` are
 * executed when the channel becomes ready, is shut down, or cannot be
 * created.
 */
void
ConnectionManager::Impl::sendChannelRequest(const std::weak_ptr<DeviceInfo>& dinfow,
                                            const std::weak_ptr<ConnectionInfo>& cinfow,
                                            const std::shared_ptr<MultiplexedSocket>& sock,
                                            const std::string& name,
                                            const dht::Value::Id& vid)
{
    auto channelSock = sock->addChannel(name);
    if (!channelSock) {
        if (config_->logger)
            config_->logger->error("sendChannelRequest failed - cannot create channel");
        if (auto info = dinfow.lock())
            info->executePendingOperations(vid, nullptr);
        return;
    }
    channelSock->onShutdown([dinfow, name, vid] {
        if (auto info = dinfow.lock())
            info->executePendingOperations(vid, nullptr);
    });
    channelSock->onReady(
        [dinfow, cinfow, wSock = std::weak_ptr(channelSock), name, vid](bool accepted) {
            if (auto dinfo = dinfow.lock()) {
                dinfo->executePendingOperations(vid, accepted ? wSock.lock() : nullptr, accepted);
                // The request is settled: forget its id on the connection.
                if (auto cinfo = cinfow.lock()) {
                    std::lock_guard<std::mutex> lk(cinfo->mutex_);
                    cinfo->cbIds_.erase(vid);
                }
            }
        });

    ChannelRequest val;
    val.name = channelSock->name();
    val.state = ChannelRequestState::REQUEST;
    val.channel = channelSock->channel();
    msgpack::sbuffer buffer(256);
    msgpack::pack(buffer, val);

    std::error_code ec;
    int res = sock->write(CONTROL_CHANNEL,
                          reinterpret_cast<const uint8_t*>(buffer.data()),
                          buffer.size(),
                          ec);
    if (res < 0) {
        // TODO check if we should handle errors here
        if (config_->logger)
            config_->logger->error("sendChannelRequest failed - error: {}", ec.message());
    }
}

/**
 * Handle the answer of a peer to one of our connection requests: store it
 * in the matching ConnectionInfo and make the answer timer expire now, so
 * that onResponse() runs.
 */
void
ConnectionManager::Impl::onPeerResponse(PeerConnectionRequest&& req)
{
    auto device = req.owner->getLongId();
    // Keep the id: `req` is moved into the ConnectionInfo below.
    const auto vid = req.id;
    if (auto info = infos_.getInfo(device, vid)) {
        if (config_->logger)
            config_->logger->debug("[device {}] New response received", device);
        std::lock_guard<std::mutex> lk {info->mutex_};
        // The timer only exists once connectDeviceStartIce() has sent the
        // request; without it there is nothing to wake up.
        if (!info->waitForAnswer_) {
            if (config_->logger)
                config_->logger->warn("[device {}] Response received, but no request is awaiting an answer", device);
            return;
        }
        info->responseReceived_ = true;
        info->response_ = std::move(req);
        info->waitForAnswer_->expires_at(std::chrono::steady_clock::now());
        info->waitForAnswer_->async_wait(std::bind(&ConnectionManager::Impl::onResponse,
                                                   this,
                                                   std::placeholders::_1,
                                                   std::weak_ptr(info),
                                                   device,
                                                   vid));
    } else {
        if (config_->logger)
            config_->logger->warn("[device {}] Respond received, but cannot find request", device);
    }
}

/**
 * Start listening on the DHT for "peer_request" values addressed to
 * `devicePk`. Answers go to onPeerResponse(); requests are passed to
 * onDhtPeerRequest() once the sender certificate has been validated.
 * Already treated messages are ignored.
 */
void
ConnectionManager::Impl::onDhtConnected(const dht::crypto::PublicKey& devicePk)
{
    if (!dht())
        return;
    dht()->listen<PeerConnectionRequest>(
        dht::InfoHash::get(PeerConnectionRequest::key_prefix + devicePk.getId().toString()),
        [w = weak_from_this()](PeerConnectionRequest&& req) {
            auto shared = w.lock();
            if (!shared)
                return false;
            if (shared->isMessageTreated(to_hex_string(req.id))) {
                // Message already treated. Just ignore
                return true;
            }
            if (req.isAnswer) {
                if (shared->config_->logger)
                    shared->config_->logger->debug("[device {}] Received request answer", req.owner->getLongId());
            } else {
                if (shared->config_->logger)
                    shared->config_->logger->debug("[device {}] Received request", req.owner->getLongId());
            }
            if (req.isAnswer) {
                shared->onPeerResponse(std::move(req));
            } else {
                // Async certificate checking
                shared->findCertificate(
                    req.from,
                    [w, req = std::move(req)](
                        const std::shared_ptr<dht::crypto::Certificate>& cert) mutable {
                        auto shared = w.lock();
                        if (!shared)
                            return;
                        dht::InfoHash peer_h;
                        if (foundPeerDevice(cert, peer_h, shared->config_->logger)) {
#if TARGET_OS_IOS
                            if (shared->iOSConnectedCb_(req.connType, peer_h))
                                return;
#endif
                            shared->onDhtPeerRequest(req, cert);
                        } else {
                            if (shared->config_->logger)
                                shared->config_->logger->warn(
                                    "[device {}] Received request from untrusted peer",
                                    req.owner->getLongId());
                        }
                    });
            }

            return true;
        },
        dht::Value::UserTypeFilter("peer_request"));
}

/**
 * Called when the TLS negotiation of a connection finished.
 * On failure, notifies the pending operations (or connReadyCb_ for a
 * connection initiated by a DHT request) and drops the connection.
 * On success, creates the multiplexed socket and sends the channel requests
 * that were waiting for it.
 * @param dinfo  may be null if the DeviceInfo no longer exists
 */
void
ConnectionManager::Impl::onTlsNegotiationDone(const std::shared_ptr<DeviceInfo>& dinfo,
                                              const std::shared_ptr<ConnectionInfo>& info,
                                              bool ok,
                                              const DeviceId& deviceId,
                                              const dht::Value::Id& vid,
                                              const std::string& name)
{
    if (isDestroying_)
        return;
    // Note: only handle pendingCallbacks here for TLS initied by connectDevice()
    // Note: if not initied by connectDevice() the channel name will be empty (because no channel
    // asked yet)
    auto isDhtRequest = name.empty();
    if (!ok) {
        if (isDhtRequest) {
            if (config_->logger)
                config_->logger->error("[device {}] TLS connection failure - Initied by DHT request. channel: {} - vid: {}",
                                       deviceId,
                                       name,
                                       vid);
            if (connReadyCb_)
                connReadyCb_(deviceId, "", nullptr);
        } else {
            if (config_->logger)
                config_->logger->error("[device {}] TLS connection failure - Initied by connectDevice. channel: {} - vid: {}",
                                       deviceId,
                                       name,
                                       vid);
            if (dinfo)
                dinfo->executePendingOperations(vid, nullptr);
        }

        // dinfo comes from a weak_ptr: nothing to clean when it is gone.
        if (dinfo) {
            std::unique_lock<std::mutex> lk(dinfo->mtx_);
            dinfo->info.erase(vid);
            if (dinfo->empty()) {
                infos_.removeDeviceInfo(dinfo->deviceId);
            }
        }
    } else {
        if (!dinfo) {
            if (config_->logger)
                config_->logger->warn("[device {}] Connection is ready, but device info no longer exists. Vid: {}",
                                      deviceId,
                                      vid);
            return;
        }
        // The socket is ready, store it
        if (isDhtRequest) {
            if (config_->logger)
                config_->logger->debug("[device {}] Connection is ready - Initied by DHT request. Vid: {}",
                                       deviceId,
                                       vid);
        } else {
            if (config_->logger)
                config_->logger->debug("[device {}] Connection is ready - Initied by connectDevice(). channel: {} - vid: {}",
                                       deviceId,
                                       name,
                                       vid);
        }

        // Note: do not remove pending there it's done in sendChannelRequest
        std::unique_lock<std::mutex> lk2 {dinfo->mtx_};
        auto pendingIds = dinfo->requestPendingOps();
        lk2.unlock();
        std::unique_lock<std::mutex> lk {info->mutex_};
        addNewMultiplexedSocket(dinfo, deviceId, vid, info);
        // Finally, open the channel and launch pending callbacks
        lk.unlock();
        for (const auto& [id, name]: pendingIds) {
            if (config_->logger)
                config_->logger->debug("[device {}] Send request on TLS socket for channel {}",
                    deviceId, name);
            sendChannelRequest(dinfo, info, info->socket_, name, id);
        }
    }
}

void
ConnectionManager::Impl::answerTo(IceTransport& ice,
                                  const dht::Value::Id& id,
                                  const std::shared_ptr<dht::crypto::PublicKey>& from)
{
    // NOTE: This is a shortest version of a real SDP message to save some bits
    auto iceAttributes = ice.getLocalAttributes();
    std::ostringstream icemsg;
    icemsg << iceAttributes.ufrag << "\n";
    icemsg << iceAttributes.pwd << "\n";
    for (const auto& addr : ice.getLocalCandidates(1)) {
        icemsg << addr << "\n";
    }

    // Send PeerConnection response
    PeerConnectionRequest val;
    val.id = id;
    val.ice_msg = icemsg.str();
    val.isAnswer = true;
    auto value = std::make_shared<dht::Value>(std::move(val));
    value->user_type = "peer_request";

    if (config_->logger)
        config_->logger->debug("[device {}] Connection accepted, DHT reply", from->getLongId());
dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix + from->getId().toString()), from, value, [from,l=config_->logger](bool ok) { if (l) l->debug("[device {}] Answer to connection request: put encrypted {:s}", from->getLongId(), (ok ? "ok" : "failed")); }); } bool ConnectionManager::Impl::onRequestStartIce(const std::shared_ptr<ConnectionInfo>& info, const PeerConnectionRequest& req) { if (!info) return false; auto deviceId = req.owner->getLongId(); std::unique_lock<std::mutex> lk {info->mutex_}; auto& ice = info->ice_; if (!ice) { if (config_->logger) config_->logger->error("[device {}] No ICE detected", deviceId); if (connReadyCb_) connReadyCb_(deviceId, "", nullptr); return false; } auto sdp = ice->parseIceCandidates(req.ice_msg); answerTo(*ice, req.id, req.owner); if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) { if (config_->logger) config_->logger->error("[device {}] Start ICE failed", deviceId); ice = nullptr; if (connReadyCb_) connReadyCb_(deviceId, "", nullptr); return false; } return true; } bool ConnectionManager::Impl::onRequestOnNegoDone(const std::weak_ptr<DeviceInfo>& dinfo, const std::shared_ptr<ConnectionInfo>& info, const PeerConnectionRequest& req) { if (!info) return false; auto deviceId = req.owner->getLongId(); std::unique_lock<std::mutex> lk {info->mutex_}; auto& ice = info->ice_; if (!ice) { if (config_->logger) config_->logger->error("[device {}] No ICE detected", deviceId); return false; } // Build socket auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>( std::move(ice)), false); // init TLS session auto ph = req.from; if (config_->logger) config_->logger->debug("[device {}] Start TLS session - Initied by DHT request. 
vid: {}", deviceId, req.id); info->tls_ = std::make_unique<TlsSocketEndpoint>( std::move(endpoint), certStore(), config_->ioContext, identity(), dhParams(), [ph, deviceId, w=weak_from_this(), l=config_->logger](const dht::crypto::Certificate& cert) { auto shared = w.lock(); if (!shared) return false; if (cert.getPublicKey().getId() != ph || deviceId != cert.getPublicKey().getLongId()) { if (l) l->warn("[device {}] TLS certificate with ID {} doesn't match the DHT request.", deviceId, cert.getPublicKey().getLongId()); return false; } auto crt = shared->certStore().getCertificate(cert.getLongId().toString()); if (!crt) return false; return crt->getPacked() == cert.getPacked(); }); info->tls_->setOnReady( [w = weak_from_this(), dinfo, winfo=std::weak_ptr(info), deviceId = std::move(deviceId), vid = std::move(req.id)](bool ok) { if (auto shared = w.lock()) if (auto info = winfo.lock()) { shared->onTlsNegotiationDone(dinfo.lock(), winfo.lock(), ok, deviceId, vid); // Make another reference to info to avoid destruction (could lead to a deadlock/crash). dht::ThreadPool::io().run([info = std::move(info)] {}); } }); return true; } void ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req, const std::shared_ptr<dht::crypto::Certificate>& /*cert*/) { auto deviceId = req.owner->getLongId(); if (config_->logger) config_->logger->debug("[device {}] New connection request", deviceId); if (!iceReqCb_ || !iceReqCb_(deviceId)) { if (config_->logger) config_->logger->debug("[device {}] Refusing connection", deviceId); return; } // Because the connection is accepted, create an ICE socket. getIceOptions([w = weak_from_this(), req, deviceId](auto&& ice_config) { auto shared = w.lock(); if (!shared) return; auto di = shared->infos_.createDeviceInfo(deviceId); auto info = std::make_shared<ConnectionInfo>(); auto wdi = std::weak_ptr(di); auto winfo = std::weak_ptr(info); // Note: used when the ice negotiation fails to erase // all stored structures. 
auto eraseInfo = [w, wdi, id = req.id] { auto shared = w.lock(); if (auto di = wdi.lock()) { std::unique_lock<std::mutex> lk(di->mtx_); di->info.erase(id); auto ops = di->extractPendingOperations(id, nullptr); if (di->empty()) { if (shared) shared->infos_.removeDeviceInfo(di->deviceId); } lk.unlock(); for (const auto& op: ops) op.cb(nullptr, di->deviceId); if (shared && shared->connReadyCb_) shared->connReadyCb_(di->deviceId, "", nullptr); } }; ice_config.master = true; ice_config.streamsCount = 1; ice_config.compCountPerStream = 1; // TCP ice_config.tcpEnable = true; ice_config.onInitDone = [w, winfo, req, eraseInfo](bool ok) { auto shared = w.lock(); if (!shared) return; if (!ok) { if (shared->config_->logger) shared->config_->logger->error("[device {}] Cannot initialize ICE session.", req.owner->getLongId()); dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); return; } dht::ThreadPool::io().run( [w = std::move(w), winfo = std::move(winfo), req = std::move(req), eraseInfo = std::move(eraseInfo)] { if (auto shared = w.lock()) { if (!shared->onRequestStartIce(winfo.lock(), req)) eraseInfo(); } }); }; ice_config.onNegoDone = [w, wdi, winfo, req, eraseInfo](bool ok) { auto shared = w.lock(); if (!shared) return; if (!ok) { if (shared->config_->logger) shared->config_->logger->error("[device {}] ICE negotiation failed.", req.owner->getLongId()); dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); return; } dht::ThreadPool::io().run( [w = std::move(w), wdi = std::move(wdi), winfo = std::move(winfo), req = std::move(req), eraseInfo = std::move(eraseInfo)] { if (auto shared = w.lock()) if (!shared->onRequestOnNegoDone(wdi.lock(), winfo.lock(), req)) eraseInfo(); }); }; // Negotiate a new ICE socket { std::lock_guard<std::mutex> lk(di->mtx_); di->info[req.id] = info; } if (shared->config_->logger) shared->config_->logger->debug("[device {}] Accepting connection", deviceId); std::unique_lock<std::mutex> lk 
{info->mutex_}; info->ice_ = shared->config_->factory->createUTransport(""); if (not info->ice_) { if (shared->config_->logger) shared->config_->logger->error("[device {}] Cannot initialize ICE session", deviceId); eraseInfo(); return; } // We need to detect any shutdown if the ice session is destroyed before going to the TLS session; info->ice_->setOnShutdown([eraseInfo]() { dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); }); try { info->ice_->initIceInstance(ice_config); } catch (const std::exception& e) { if (shared->config_->logger) shared->config_->logger->error("{}", e.what()); dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); } }); } void ConnectionManager::Impl::addNewMultiplexedSocket(const std::weak_ptr<DeviceInfo>& dinfo, const DeviceId& deviceId, const dht::Value::Id& vid, const std::shared_ptr<ConnectionInfo>& info) { info->socket_ = std::make_shared<MultiplexedSocket>(config_->ioContext, deviceId, std::move(info->tls_), config_->logger); info->socket_->setOnReady( [w = weak_from_this()](const DeviceId& deviceId, const std::shared_ptr<ChannelSocket>& socket) { if (auto sthis = w.lock()) if (sthis->connReadyCb_) sthis->connReadyCb_(deviceId, socket->name(), socket); }); info->socket_->setOnRequest([w = weak_from_this()](const std::shared_ptr<dht::crypto::Certificate>& peer, const uint16_t&, const std::string& name) { if (auto sthis = w.lock()) if (sthis->channelReqCb_) return sthis->channelReqCb_(peer, name); return false; }); info->socket_->onShutdown([dinfo, wi=std::weak_ptr(info), vid]() { // Cancel current outgoing connections dht::ThreadPool::io().run([dinfo, wi, vid] { std::set<dht::Value::Id> ids; if (auto info = wi.lock()) { std::lock_guard<std::mutex> lk(info->mutex_); if (info->socket_) { ids = std::move(info->cbIds_); info->socket_->shutdown(); } } if (auto deviceInfo = dinfo.lock()) { std::shared_ptr<ConnectionInfo> info; std::vector<PendingCb> ops; std::unique_lock<std::mutex> 
lk(deviceInfo->mtx_); auto it = deviceInfo->info.find(vid); if (it != deviceInfo->info.end()) { info = std::move(it->second); deviceInfo->info.erase(it); } for (const auto& cbId : ids) { auto po = deviceInfo->extractPendingOperations(cbId, nullptr); ops.insert(ops.end(), po.begin(), po.end()); } lk.unlock(); for (auto& op : ops) op.cb(nullptr, deviceInfo->deviceId); } }); }); } const std::shared_future<tls::DhParams> ConnectionManager::Impl::dhParams() const { return dht::ThreadPool::computation().get<tls::DhParams>( std::bind(tls::DhParams::loadDhParams, config_->cachePath / "dhParams")); } template<typename ID = dht::Value::Id> std::set<ID, std::less<>> loadIdList(const std::filesystem::path& path) { std::set<ID, std::less<>> ids; std::ifstream file(path); if (!file.is_open()) { //JAMI_DBG("Could not load %s", path.c_str()); return ids; } std::string line; while (std::getline(file, line)) { if constexpr (std::is_same<ID, std::string>::value) { ids.emplace(std::move(line)); } else if constexpr (std::is_integral<ID>::value) { ID vid; if (auto [p, ec] = std::from_chars(line.data(), line.data() + line.size(), vid, 16); ec == std::errc()) { ids.emplace(vid); } } } return ids; } template<typename List = std::set<dht::Value::Id>> void saveIdList(const std::filesystem::path& path, const List& ids) { std::ofstream file(path, std::ios::trunc | std::ios::binary); if (!file.is_open()) { //JAMI_ERR("Could not save to %s", path.c_str()); return; } for (auto& c : ids) file << std::hex << c << "\n"; } void ConnectionManager::Impl::loadTreatedMessages() { std::lock_guard<std::mutex> lock(messageMutex_); auto path = config_->cachePath / "treatedMessages"; treatedMessages_ = loadIdList<std::string>(path.string()); if (treatedMessages_.empty()) { auto messages = loadIdList(path.string()); for (const auto& m : messages) treatedMessages_.emplace(to_hex_string(m)); } } void ConnectionManager::Impl::saveTreatedMessages() const { dht::ThreadPool::io().run([w = weak_from_this()]() { if 
(auto sthis = w.lock()) { auto& this_ = *sthis; std::lock_guard<std::mutex> lock(this_.messageMutex_); fileutils::check_dir(this_.config_->cachePath.c_str()); saveIdList<decltype(this_.treatedMessages_)>(this_.config_->cachePath / "treatedMessages", this_.treatedMessages_); } }); } bool ConnectionManager::Impl::isMessageTreated(std::string_view id) { std::lock_guard<std::mutex> lock(messageMutex_); auto res = treatedMessages_.emplace(id); if (res.second) { saveTreatedMessages(); return false; } return true; } /** * returns whether or not UPnP is enabled and active_ * ie: if it is able to make port mappings */ bool ConnectionManager::Impl::getUPnPActive() const { return config_->getUPnPActive(); } IpAddr ConnectionManager::Impl::getPublishedIpAddress(uint16_t family) const { if (family == AF_INET) return publishedIp_[0]; if (family == AF_INET6) return publishedIp_[1]; assert(family == AF_UNSPEC); // If family is not set, prefere IPv4 if available. It's more // likely to succeed behind NAT. 
if (publishedIp_[0]) return publishedIp_[0]; if (publishedIp_[1]) return publishedIp_[1]; return {}; } void ConnectionManager::Impl::setPublishedAddress(const IpAddr& ip_addr) { if (ip_addr.getFamily() == AF_INET) { publishedIp_[0] = ip_addr; } else { publishedIp_[1] = ip_addr; } } void ConnectionManager::Impl::storeActiveIpAddress(std::function<void()>&& cb) { dht()->getPublicAddress([w=weak_from_this(), cb = std::move(cb)](std::vector<dht::SockAddr>&& results) { auto shared = w.lock(); if (!shared) return; bool hasIpv4 {false}, hasIpv6 {false}; for (auto& result : results) { auto family = result.getFamily(); if (family == AF_INET) { if (not hasIpv4) { hasIpv4 = true; if (shared->config_->logger) shared->config_->logger->debug("Store DHT public IPv4 address: {}", result); //JAMI_DBG("Store DHT public IPv4 address : %s", result.toString().c_str()); shared->setPublishedAddress(*result.get()); if (shared->config_->upnpCtrl) { shared->config_->upnpCtrl->setPublicAddress(*result.get()); } } } else if (family == AF_INET6) { if (not hasIpv6) { hasIpv6 = true; if (shared->config_->logger) shared->config_->logger->debug("Store DHT public IPv6 address: {}", result); shared->setPublishedAddress(*result.get()); } } if (hasIpv4 and hasIpv6) break; } if (cb) cb(); }); } void ConnectionManager::Impl::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept { storeActiveIpAddress([this, cb = std::move(cb)] { IceTransportOptions opts = ConnectionManager::Impl::getIceOptions(); auto publishedAddr = getPublishedIpAddress(); if (publishedAddr) { auto interfaceAddr = ip_utils::getInterfaceAddr(getLocalInterface(), publishedAddr.getFamily()); if (interfaceAddr) { opts.accountLocalAddr = interfaceAddr; opts.accountPublicAddr = publishedAddr; } } if (cb) cb(std::move(opts)); }); } IceTransportOptions ConnectionManager::Impl::getIceOptions() const noexcept { IceTransportOptions opts; opts.factory = config_->factory; opts.upnpEnable = getUPnPActive(); opts.upnpContext = 
config_->upnpCtrl ? config_->upnpCtrl->upnpContext() : nullptr; if (config_->stunEnabled) opts.stunServers.emplace_back(StunServerInfo().setUri(config_->stunServer)); if (config_->turnEnabled) { if (config_->turnCache) { auto turnAddr = config_->turnCache->getResolvedTurn(); if (turnAddr != std::nullopt) { opts.turnServers.emplace_back(TurnServerInfo() .setUri(turnAddr->toString()) .setUsername(config_->turnServerUserName) .setPassword(config_->turnServerPwd) .setRealm(config_->turnServerRealm)); } } else { opts.turnServers.emplace_back(TurnServerInfo() .setUri(config_->turnServer) .setUsername(config_->turnServerUserName) .setPassword(config_->turnServerPwd) .setRealm(config_->turnServerRealm)); } // NOTE: first test with ipv6 turn was not concluant and resulted in multiple // co issues. So this needs some debug. for now just disable // if (cacheTurnV6 && *cacheTurnV6) { // opts.turnServers.emplace_back(TurnServerInfo() // .setUri(cacheTurnV6->toString(true)) // .setUsername(turnServerUserName_) // .setPassword(turnServerPwd_) // .setRealm(turnServerRealm_)); //} } return opts; } bool ConnectionManager::Impl::foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt, dht::InfoHash& account_id, const std::shared_ptr<Logger>& logger) { if (not crt) return false; auto top_issuer = crt; while (top_issuer->issuer) top_issuer = top_issuer->issuer; // Device certificate can't be self-signed if (top_issuer == crt) { if (logger) logger->warn("Found invalid (self-signed) peer device: {}", crt->getLongId()); return false; } // Check peer certificate chain // Trust store with top issuer as the only CA dht::crypto::TrustList peer_trust; peer_trust.add(*top_issuer); if (not peer_trust.verify(*crt)) { if (logger) logger->warn("Found invalid peer device: {}", crt->getLongId()); return false; } // Check cached OCSP response if (crt->ocspResponse and crt->ocspResponse->getCertificateStatus() != GNUTLS_OCSP_CERT_GOOD) { if (logger) logger->error("Certificate {} is 
disabled by cached OCSP response", crt->getLongId()); return false; } account_id = crt->issuer->getId(); if (logger) logger->warn("Found peer device: {} account:{} CA:{}", crt->getLongId(), account_id, top_issuer->getId()); return true; } bool ConnectionManager::Impl::findCertificate( const dht::PkId& id, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb) { if (auto cert = certStore().getCertificate(id.toString())) { if (cb) cb(cert); } else if (cb) cb(nullptr); return true; } bool ConnectionManager::Impl::findCertificate(const dht::InfoHash& h, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb) { if (auto cert = certStore().getCertificate(h.toString())) { if (cb) cb(cert); } else { dht()->findCertificate(h, [cb = std::move(cb), this]( const std::shared_ptr<dht::crypto::Certificate>& crt) { if (crt) certStore().pinCertificate(crt); if (cb) cb(crt); }); } return true; } std::shared_ptr<ConnectionManager::Config> buildDefaultConfig(dht::crypto::Identity id){ auto conf = std::make_shared<ConnectionManager::Config>(); conf->id = std::move(id); return conf; } ConnectionManager::ConnectionManager(std::shared_ptr<ConnectionManager::Config> config_) : pimpl_ {std::make_shared<Impl>(config_)} {} ConnectionManager::ConnectionManager(dht::crypto::Identity id) : ConnectionManager {buildDefaultConfig(id)} {} ConnectionManager::~ConnectionManager() { if (pimpl_) pimpl_->shutdown(); } void ConnectionManager::connectDevice(const DeviceId& deviceId, const std::string& name, ConnectCallback cb, bool noNewSocket, bool forceNewSocket, const std::string& connType) { pimpl_->connectDevice(deviceId, name, std::move(cb), noNewSocket, forceNewSocket, connType); } void ConnectionManager::connectDevice(const dht::InfoHash& deviceId, const std::string& name, ConnectCallbackLegacy cb, bool noNewSocket, bool forceNewSocket, const std::string& connType) { pimpl_->connectDevice(deviceId, name, std::move(cb), noNewSocket, forceNewSocket, connType); 
} void ConnectionManager::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert, const std::string& name, ConnectCallback cb, bool noNewSocket, bool forceNewSocket, const std::string& connType) { pimpl_->connectDevice(cert, name, std::move(cb), noNewSocket, forceNewSocket, connType); } bool ConnectionManager::isConnecting(const DeviceId& deviceId, const std::string& name) const { if (auto dinfo = pimpl_->infos_.getDeviceInfo(deviceId)) { std::unique_lock<std::mutex> lk {dinfo->mtx_}; return dinfo->isConnecting(name); } return false; } bool ConnectionManager::isConnected(const DeviceId& deviceId) const { if (auto dinfo = pimpl_->infos_.getDeviceInfo(deviceId)) { std::unique_lock<std::mutex> lk {dinfo->mtx_}; return dinfo->getConnectedInfo() != nullptr; } return false; } void ConnectionManager::closeConnectionsWith(const std::string& peerUri) { std::vector<std::shared_ptr<DeviceInfo>> dInfos; for (const auto& dinfo: pimpl_->infos_.getDeviceInfos()) { std::unique_lock<std::mutex> lk(dinfo->mtx_); bool isPeer = false; for (auto const& [id, cinfo]: dinfo->info) { std::lock_guard<std::mutex> lkv {cinfo->mutex_}; auto tls = cinfo->tls_ ? cinfo->tls_.get() : (cinfo->socket_ ? cinfo->socket_->endpoint() : nullptr); auto cert = tls ? 
tls->peerCertificate() : nullptr; if (not cert) cert = pimpl_->certStore().getCertificate(dinfo->deviceId.toString()); if (cert && cert->issuer && peerUri == cert->issuer->getId().toString()) { isPeer = true; break; } } lk.unlock(); if (isPeer) { dInfos.emplace_back(std::move(dinfo)); } } // Stop connections to all peers devices for (const auto& dinfo : dInfos) { std::unique_lock<std::mutex> lk {dinfo->mtx_}; auto unused = dinfo->extractUnusedConnections(); auto pending = dinfo->extractPendingOperations(0, nullptr); pimpl_->infos_.removeDeviceInfo(dinfo->deviceId); lk.unlock(); for (auto& op : unused) op->shutdown(); for (auto& op : pending) op.cb(nullptr, dinfo->deviceId); } } void ConnectionManager::onDhtConnected(const dht::crypto::PublicKey& devicePk) { pimpl_->onDhtConnected(devicePk); } void ConnectionManager::onICERequest(onICERequestCallback&& cb) { pimpl_->iceReqCb_ = std::move(cb); } void ConnectionManager::onChannelRequest(ChannelRequestCallback&& cb) { pimpl_->channelReqCb_ = std::move(cb); } void ConnectionManager::onConnectionReady(ConnectionReadyCallback&& cb) { pimpl_->connReadyCb_ = std::move(cb); } void ConnectionManager::oniOSConnected(iOSConnectedCallback&& cb) { pimpl_->iOSConnectedCb_ = std::move(cb); } std::size_t ConnectionManager::activeSockets() const { return pimpl_->infos_.getConnectedInfos().size(); } void ConnectionManager::monitor() const { auto logger = pimpl_->config_->logger; if (!logger) return; logger->debug("ConnectionManager current status:"); for (const auto& ci : pimpl_->infos_.getConnectedInfos()) { std::lock_guard<std::mutex> lk(ci->mutex_); if (ci->socket_) ci->socket_->monitor(); } logger->debug("ConnectionManager end status."); } void ConnectionManager::connectivityChanged() { for (const auto& ci : pimpl_->infos_.getConnectedInfos()) { std::lock_guard<std::mutex> lk(ci->mutex_); if (ci->socket_) dht::ThreadPool::io().run([s = ci->socket_] { s->sendBeacon(); }); } } void 
ConnectionManager::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept { return pimpl_->getIceOptions(std::move(cb)); } IceTransportOptions ConnectionManager::getIceOptions() const noexcept { return pimpl_->getIceOptions(); } IpAddr ConnectionManager::getPublishedIpAddress(uint16_t family) const { return pimpl_->getPublishedIpAddress(family); } void ConnectionManager::setPublishedAddress(const IpAddr& ip_addr) { return pimpl_->setPublishedAddress(ip_addr); } void ConnectionManager::storeActiveIpAddress(std::function<void()>&& cb) { return pimpl_->storeActiveIpAddress(std::move(cb)); } std::shared_ptr<ConnectionManager::Config> ConnectionManager::getConfig() { return pimpl_->config_; } std::vector<std::map<std::string, std::string>> ConnectionManager::getConnectionList(const DeviceId& device) const { std::vector<std::map<std::string, std::string>> connectionsList; if (device) { if (auto deviceInfo = pimpl_->infos_.getDeviceInfo(device)) { connectionsList = deviceInfo->getConnectionList(pimpl_->certStore()); } } else { for (const auto& deviceInfo : pimpl_->infos_.getDeviceInfos()) { auto cl = deviceInfo->getConnectionList(pimpl_->certStore()); connectionsList.insert(connectionsList.end(), std::make_move_iterator(cl.begin()), std::make_move_iterator(cl.end())); } } return connectionsList; } std::vector<std::map<std::string, std::string>> ConnectionManager::getChannelList(const std::string& connectionId) const { auto [deviceId, valueId] = parseCallbackId(connectionId); if (auto info = pimpl_->infos_.getInfo(deviceId, valueId)) { std::lock_guard<std::mutex> lk(info->mutex_); if (info->socket_) return info->socket_->getChannelList(); } return {}; } } // namespace dhtnet