Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
connectionmanager.cpp 72.26 KiB
/*
 *  Copyright (C) 2004-2023 Savoir-faire Linux Inc.
 *
 *  This program is free software: you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation, either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program. If not, see <https://www.gnu.org/licenses/>.
 */
#include "connectionmanager.h"
#include "peer_connection.h"
#include "upnp/upnp_control.h"
#include "certstore.h"
#include "fileutils.h"
#include "sip_utils.h"
#include "string_utils.h"

#include <opendht/crypto.h>
#include <opendht/thread_pool.h>
#include <opendht/value.h>
#include <asio.hpp>

#include <algorithm>
#include <mutex>
#include <map>
#include <condition_variable>
#include <set>
#include <charconv>
#include <fstream>

namespace dhtnet {
static constexpr std::chrono::seconds DHT_MSG_TIMEOUT {30};
static constexpr uint64_t ID_MAX_VAL = 9007199254740992;

using ValueIdDist = std::uniform_int_distribution<dht::Value::Id>;
using CallbackId = std::pair<dhtnet::DeviceId, dht::Value::Id>;
std::string
callbackIdToString(const dhtnet::DeviceId& did, const dht::Value::Id& vid)
{
    return fmt::format("{} {}", did.to_view(), vid);
}

CallbackId parseCallbackId(std::string_view ci)
{
    auto sep = ci.find(' ');
    std::string_view deviceIdString = ci.substr(0, sep);
    std::string_view vidString = ci.substr(sep + 1);

    dhtnet::DeviceId deviceId(deviceIdString);
    dht::Value::Id vid = std::stoul(std::string(vidString), nullptr, 10);

    return CallbackId(deviceId, vid);
}

std::shared_ptr<ConnectionManager::Config>
createConfig(std::shared_ptr<ConnectionManager::Config> config_)
{
    if (!config_->certStore){
        config_->certStore = std::make_shared<dhtnet::tls::CertificateStore>("client", config_->logger);
    }
    if (!config_->dht) {
        dht::DhtRunner::Config dhtConfig;
        dhtConfig.dht_config.id = config_->id;
        dhtConfig.threaded = true;
        dht::DhtRunner::Context dhtContext;
        dhtContext.certificateStore = [c = config_->certStore](const dht::InfoHash& pk_id) {
            std::vector<std::shared_ptr<dht::crypto::Certificate>> ret;
            if (auto cert = c->getCertificate(pk_id.toString()))
                ret.emplace_back(std::move(cert));
            return ret;
        };
        config_->dht = std::make_shared<dht::DhtRunner>();
        config_->dht->run(dhtConfig, std::move(dhtContext));
        config_->dht->bootstrap("bootstrap.jami.net");
    }
    if (!config_->factory){
        config_->factory = std::make_shared<IceTransportFactory>(config_->logger);
    }
    return config_;
}

struct ConnectionInfo
{
    ~ConnectionInfo()
    {
        if (socket_)
            socket_->join();
    }

    std::mutex mutex_ {};
    bool responseReceived_ {false};
    PeerConnectionRequest response_ {};
    std::unique_ptr<IceTransport> ice_ {nullptr};
    // Used to store currently non ready TLS Socket
    std::unique_ptr<TlsSocketEndpoint> tls_ {nullptr};
    std::shared_ptr<MultiplexedSocket> socket_ {};
    std::set<CallbackId> cbIds_ {};

    std::function<void(bool)> onConnected_;
    std::unique_ptr<asio::steady_timer> waitForAnswer_ {};
};

/**
 * returns whether or not UPnP is enabled and active_
 * ie: if it is able to make port mappings
 */
bool
ConnectionManager::Config::getUPnPActive() const
{
    if (upnpCtrl)
        return upnpCtrl->isReady();
    return false;
}

class ConnectionManager::Impl : public std::enable_shared_from_this<ConnectionManager::Impl>
{
public:
    explicit Impl(std::shared_ptr<ConnectionManager::Config> config_)
        : config_ {std::move(createConfig(config_))}
        , rand {dht::crypto::getSeededRandomEngine<std::mt19937_64>()}
    {
        loadTreatedMessages();
        if(!config_->ioContext) {
            config_->ioContext = std::make_shared<asio::io_context>();
            ioContextRunner_ = std::make_unique<std::thread>([context = config_->ioContext, l=config_->logger]() {
                try {
                    auto work = asio::make_work_guard(*context);
                    context->run();
                } catch (const std::exception& ex) {
                    if (l) l->error("Exception: {}", ex.what());
                }
            });
        }
    }
    ~Impl() {
        if (ioContextRunner_) {
            if (config_->logger) config_->logger->debug("ConnectionManager: stopping io_context thread");
            config_->ioContext->stop();
            ioContextRunner_->join();
            ioContextRunner_.reset();
        }
    }

    std::shared_ptr<dht::DhtRunner> dht() { return config_->dht; }
    const dht::crypto::Identity& identity() const { return config_->id; }

    void removeUnusedConnections(const DeviceId& deviceId = {})
    {
        std::vector<std::shared_ptr<ConnectionInfo>> unused {};

        {
            std::lock_guard<std::mutex> lk(infosMtx_);
            for (auto it = infos_.begin(); it != infos_.end();) {
                auto& [key, info] = *it;
                if (info && (!deviceId || key.first == deviceId)) {
                    unused.emplace_back(std::move(info));
                    it = infos_.erase(it);
                } else {
                    ++it;
                }
            }
        }
        for (auto& info: unused) {
            if (info->tls_)
                info->tls_->shutdown();
            if (info->socket_)
                info->socket_->shutdown();
            if (info->waitForAnswer_)
                info->waitForAnswer_->cancel();
        }
        if (!unused.empty())
            dht::ThreadPool::io().run([infos = std::move(unused)]() mutable {
                infos.clear();
            });
    }

    void shutdown()
    {
        if (isDestroying_.exchange(true))
            return;
        decltype(pendingOperations_) po;
        {
            std::lock_guard<std::mutex> lk(connectCbsMtx_);
            po = std::move(pendingOperations_);
        }
        for (auto& [deviceId, pcbs] : po) {
            for (auto& [id, pending] : pcbs.connecting)
                pending.cb(nullptr, deviceId);
            for (auto& [id, pending] : pcbs.waiting)
                pending.cb(nullptr, deviceId);
        }

        removeUnusedConnections();
    }

    void connectDeviceStartIce(const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
                               const dht::Value::Id& vid,
                               const std::string& connType,
                               std::function<void(bool)> onConnected);
    void onResponse(const asio::error_code& ec, const DeviceId& deviceId, const dht::Value::Id& vid);
    bool connectDeviceOnNegoDone(const DeviceId& deviceId,
                                 const std::string& name,
                                 const dht::Value::Id& vid,
                                 const std::shared_ptr<dht::crypto::Certificate>& cert);
    void connectDevice(const DeviceId& deviceId,
                       const std::string& uri,
                       ConnectCallback cb,
                       bool noNewSocket = false,
                       bool forceNewSocket = false,
                       const std::string& connType = "");
    void connectDevice(const dht::InfoHash& deviceId,
                       const std::string& uri,
                       ConnectCallbackLegacy cb,
                       bool noNewSocket = false,
                       bool forceNewSocket = false,
                       const std::string& connType = "");

    void connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
                       const std::string& name,
                       ConnectCallback cb,
                       bool noNewSocket = false,
                       bool forceNewSocket = false,
                       const std::string& connType = "");
    /**
     * Send a ChannelRequest on the TLS socket. Triggers cb when ready
     * @param sock      socket used to send the request
     * @param name      channel's name
     * @param vid       channel's id
     * @param deviceId  to identify the linked ConnectCallback
     */
    void sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
                            const std::string& name,
                            const DeviceId& deviceId,
                            const dht::Value::Id& vid);
    /**
     * Triggered when a PeerConnectionRequest comes from the DHT
     */
    void answerTo(IceTransport& ice,
                  const dht::Value::Id& id,
                  const std::shared_ptr<dht::crypto::PublicKey>& fromPk);
    bool onRequestStartIce(const PeerConnectionRequest& req);
    bool onRequestOnNegoDone(const PeerConnectionRequest& req);
    void onDhtPeerRequest(const PeerConnectionRequest& req,
                          const std::shared_ptr<dht::crypto::Certificate>& cert);

    void addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info);
    void onPeerResponse(const PeerConnectionRequest& req);
    void onDhtConnected(const dht::crypto::PublicKey& devicePk);

    const std::shared_future<tls::DhParams> dhParams() const;
    tls::CertificateStore& certStore() const { return *config_->certStore; }

    mutable std::mutex messageMutex_ {};
    std::set<std::string, std::less<>> treatedMessages_ {};

    void loadTreatedMessages();
    void saveTreatedMessages() const;

    /// \return true if the given DHT message identifier has been treated
    /// \note if message has not been treated yet this method st/ore this id and returns true at
    /// further calls
    bool isMessageTreated(std::string_view id);

    const std::shared_ptr<dht::log::Logger>& logger() const { return config_->logger; }

    /**
     * Published IPv4/IPv6 addresses, used only if defined by the user in account
     * configuration
     *
     */
    IpAddr publishedIp_[2] {};

    /**
     * interface name on which this account is bound
     */
    std::string interface_ {"default"};

    /**
     * Get the local interface name on which this account is bound.
     */
    const std::string& getLocalInterface() const { return interface_; }

    /**
     * Get the published IP address, fallbacks to NAT if family is unspecified
     * Prefers the usage of IPv4 if possible.
     */
    IpAddr getPublishedIpAddress(uint16_t family = PF_UNSPEC) const;

    /**
     * Set published IP address according to given family
     */
    void setPublishedAddress(const IpAddr& ip_addr);

    /**
     * Store the local/public addresses used to register
     */
    void storeActiveIpAddress(std::function<void()>&& cb = {});

    /**
     * Create and return ICE options.
     */
    void getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept;
    IceTransportOptions getIceOptions() const noexcept;

    /**
     * Inform that a potential peer device have been found.
     * Returns true only if the device certificate is a valid device certificate.
     * In that case (true is returned) the account_id parameter is set to the peer account ID.
     */
    static bool foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
                                dht::InfoHash& account_id, const std::shared_ptr<Logger>& logger);

    bool findCertificate(const dht::PkId& id,
                         std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb);
    bool findCertificate(const dht::InfoHash& h, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb);

    /**
     * returns whether or not UPnP is enabled and active
     * ie: if it is able to make port mappings
     */
    bool getUPnPActive() const;

    /**
     * Triggered when a new TLS socket is ready to use
     * @param ok        If succeed
     * @param deviceId  Related device
     * @param vid       vid of the connection request
     * @param name      non empty if TLS was created by connectDevice()
     */
    void onTlsNegotiationDone(bool ok,
                              const DeviceId& deviceId,
                              const dht::Value::Id& vid,
                              const std::string& name = "");

    std::shared_ptr<ConnectionManager::Config> config_;
    std::unique_ptr<std::thread> ioContextRunner_;

    mutable std::mt19937_64 rand;

    iOSConnectedCallback iOSConnectedCb_ {};

    std::mutex infosMtx_ {};
    // Note: Someone can ask multiple sockets, so to avoid any race condition,
    // each device can have multiple multiplexed sockets.
    std::map<CallbackId, std::shared_ptr<ConnectionInfo>> infos_ {};

    std::shared_ptr<ConnectionInfo> getInfo(const DeviceId& deviceId, const dht::Value::Id& id)
    {
        std::lock_guard<std::mutex> lk(infosMtx_);
        auto it = infos_.find({deviceId, id});
        if (it != infos_.end())
            return it->second;
        return {};
    }

    std::shared_ptr<ConnectionInfo> getConnectedInfo(const DeviceId& deviceId)
    {
        std::lock_guard<std::mutex> lk(infosMtx_);
        auto it = std::find_if(infos_.begin(), infos_.end(), [&](const auto& item) {
            auto& [key, value] = item;
            return key.first == deviceId && value && value->socket_;
        });
        if (it != infos_.end())
            return it->second;
        return {};
    }

    ChannelRequestCallback channelReqCb_ {};
    ConnectionReadyCallback connReadyCb_ {};
    onICERequestCallback iceReqCb_ {};

    /**
     * Stores callback from connectDevice
     * @note: each device needs a vector because several connectDevice can
     * be done in parallel and we only want one socket
     */
    std::mutex connectCbsMtx_ {};


    struct PendingCb
    {
        std::string name;
        ConnectCallback cb;
    };
    struct PendingOperations {
        std::map<dht::Value::Id, PendingCb> connecting;
        std::map<dht::Value::Id, PendingCb> waiting;
    };

    std::map<DeviceId, PendingOperations> pendingOperations_ {};

    void executePendingOperations(const DeviceId& deviceId, const dht::Value::Id& vid, const std::shared_ptr<ChannelSocket>& sock, bool accepted = true)
    {
        std::vector<PendingCb> ret;
        std::unique_lock<std::mutex> lk(connectCbsMtx_);
        auto it = pendingOperations_.find(deviceId);
        if (it == pendingOperations_.end())
            return;
        auto& pendingOperations = it->second;
        if (vid == 0) {
            // Extract all pending callbacks
            for (auto& [vid, cb] : pendingOperations.connecting)
                ret.emplace_back(std::move(cb));
            pendingOperations.connecting.clear();
            for (auto& [vid, cb] : pendingOperations.waiting)
                ret.emplace_back(std::move(cb));
            pendingOperations.waiting.clear();
        } else if (auto n = pendingOperations.waiting.extract(vid)) {
            // If it's a waiting operation, just move it
            ret.emplace_back(std::move(n.mapped()));
        } else if (auto n = pendingOperations.connecting.extract(vid)) {
            ret.emplace_back(std::move(n.mapped()));
            // If sock is nullptr, execute if it's the last connecting operation
            // If accepted is false, it means that underlying socket is ok, but channel is declined
            if (!sock && pendingOperations.connecting.empty() && accepted) {
                for (auto& [vid, cb] : pendingOperations.waiting)
                    ret.emplace_back(std::move(cb));
                pendingOperations.waiting.clear();
                for (auto& [vid, cb] : pendingOperations.connecting)
                    ret.emplace_back(std::move(cb));
                pendingOperations.connecting.clear();
            }
        }
        if (pendingOperations.waiting.empty() && pendingOperations.connecting.empty())
            pendingOperations_.erase(it);
        lk.unlock();
        for (auto& cb : ret)
            cb.cb(sock, deviceId);
    }

    std::map<dht::Value::Id, std::string> getPendingIds(const DeviceId& deviceId, const dht::Value::Id vid = 0)
    {
        std::map<dht::Value::Id, std::string> ret;
        std::lock_guard<std::mutex> lk(connectCbsMtx_);
        auto it = pendingOperations_.find(deviceId);
        if (it == pendingOperations_.end())
            return ret;
        auto& pendingOp = it->second;
        for (const auto& [id, pc]: pendingOp.connecting) {
            if (vid == 0 || id == vid)
                ret[id] = pc.name;
        }
        for (const auto& [id, pc]: pendingOp.waiting) {
            if (vid == 0 || id == vid)
                ret[id] = pc.name;
        }
        return ret;
    }

    std::shared_ptr<ConnectionManager::Impl> shared()
    {
        return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
    }
    std::shared_ptr<ConnectionManager::Impl const> shared() const
    {
        return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
    }
    std::weak_ptr<ConnectionManager::Impl> weak()
    {
        return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
    }
    std::weak_ptr<ConnectionManager::Impl const> weak() const
    {
        return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
    }

    std::atomic_bool isDestroying_ {false};
};

void
ConnectionManager::Impl::connectDeviceStartIce(
    const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
    const dht::Value::Id& vid,
    const std::string& connType,
    std::function<void(bool)> onConnected)
{
    auto deviceId = devicePk->getLongId();
    auto info = getInfo(deviceId, vid);
    if (!info) {
        onConnected(false);
        return;
    }
    std::unique_lock<std::mutex> lk(info->mutex_);
    auto& ice = info->ice_;

    if (!ice) {
        if (config_->logger)
            config_->logger->error("[device {}] No ICE detected", deviceId);
        onConnected(false);
        return;
    }

    auto iceAttributes = ice->getLocalAttributes();
    std::ostringstream icemsg;
    icemsg << iceAttributes.ufrag << "\n";
    icemsg << iceAttributes.pwd << "\n";
    for (const auto& addr : ice->getLocalCandidates(1)) {
        icemsg << addr << "\n";
        if (config_->logger)
            config_->logger->debug("[device {}] Added local ICE candidate {}", deviceId, addr);
    }

    // Prepare connection request as a DHT message
    PeerConnectionRequest val;

    val.id = vid; /* Random id for the message unicity */
    val.ice_msg = icemsg.str();
    val.connType = connType;

    auto value = std::make_shared<dht::Value>(std::move(val));
    value->user_type = "peer_request";

    // Send connection request through DHT
    if (config_->logger)
        config_->logger->debug("[device {}] Sending connection request", deviceId);
    dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
                                           + devicePk->getId().toString()),
                        devicePk,
                        value,
                        [l=config_->logger,deviceId](bool ok) {
                            if (l)
                                l->debug("[device {}] Sent connection request. Put encrypted {:s}",
                                       deviceId,
                                       (ok ? "ok" : "failed"));
                        });
    // Wait for call to onResponse() operated by DHT
    if (isDestroying_) {
        onConnected(true); // This avoid to wait new negotiation when destroying
        return;
    }

    info->onConnected_ = std::move(onConnected);
    info->waitForAnswer_ = std::make_unique<asio::steady_timer>(*config_->ioContext,
                                                                std::chrono::steady_clock::now()
                                                                    + DHT_MSG_TIMEOUT);
    info->waitForAnswer_->async_wait(
        std::bind(&ConnectionManager::Impl::onResponse, this, std::placeholders::_1, deviceId, vid));
}

void
ConnectionManager::Impl::onResponse(const asio::error_code& ec,
                                    const DeviceId& deviceId,
                                    const dht::Value::Id& vid)
{
    if (ec == asio::error::operation_aborted)
        return;
    auto info = getInfo(deviceId, vid);
    if (!info)
        return;

    std::unique_lock<std::mutex> lk(info->mutex_);
    auto& ice = info->ice_;
    if (isDestroying_) {
        info->onConnected_(true); // The destructor can wake a pending wait here.
        return;
    }
    if (!info->responseReceived_) {
        if (config_->logger)
            config_->logger->error("[device {}] no response from DHT to ICE request.", deviceId);
        info->onConnected_(false);
        return;
    }

    if (!info->ice_) {
        info->onConnected_(false);
        return;
    }

    auto sdp = ice->parseIceCandidates(info->response_.ice_msg);

    if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
        if (config_->logger)
            config_->logger->warn("[device {}] start ICE failed", deviceId);
        info->onConnected_(false);
        return;
    }
    info->onConnected_(true);
}

bool
ConnectionManager::Impl::connectDeviceOnNegoDone(
    const DeviceId& deviceId,
    const std::string& name,
    const dht::Value::Id& vid,
    const std::shared_ptr<dht::crypto::Certificate>& cert)
{
    auto info = getInfo(deviceId, vid);
    if (!info)
        return false;

    std::unique_lock<std::mutex> lk {info->mutex_};
    if (info->waitForAnswer_) {
        // Negotiation is done and connected, go to handshake
        // and avoid any cancellation at this point.
        info->waitForAnswer_->cancel();
    }
    auto& ice = info->ice_;
    if (!ice || !ice->isRunning()) {
        if (config_->logger)
            config_->logger->error("[device {}] No ICE detected or not running", deviceId);
        return false;
    }

    // Build socket
    auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
                                                            std::move(ice)),
                                                        true);

    // Negotiate a TLS session
    if (config_->logger)
        config_->logger->debug("[device {}] Start TLS session - Initied by connectDevice(). Launched by channel: {} - vid: {}", deviceId, name, vid);
    info->tls_ = std::make_unique<TlsSocketEndpoint>(std::move(endpoint),
                                                     certStore(),
                                                     config_->ioContext,
                                                     identity(),
                                                     dhParams(),
                                                     *cert);

    info->tls_->setOnReady(
        [w = weak(), deviceId = std::move(deviceId), vid = std::move(vid), name = std::move(name)](
            bool ok) {
            if (auto shared = w.lock())
                shared->onTlsNegotiationDone(ok, deviceId, vid, name);
        });
    return true;
}

void
ConnectionManager::Impl::connectDevice(const DeviceId& deviceId,
                                       const std::string& name,
                                       ConnectCallback cb,
                                       bool noNewSocket,
                                       bool forceNewSocket,
                                       const std::string& connType)
{
    if (!dht()) {
        cb(nullptr, deviceId);
        return;
    }
    if (deviceId.toString() == identity().second->getLongId().toString()) {
        cb(nullptr, deviceId);
        return;
    }
    findCertificate(deviceId,
                    [w = weak(),
                     deviceId,
                     name,
                     cb = std::move(cb),
                     noNewSocket,
                     forceNewSocket,
                     connType](const std::shared_ptr<dht::crypto::Certificate>& cert) {
                        if (!cert) {
                            if (auto shared = w.lock())
                                if (shared->config_->logger)
                                    shared->config_->logger->error(
                                        "No valid certificate found for device {}",
                                        deviceId);
                            cb(nullptr, deviceId);
                            return;
                        }
                        if (auto shared = w.lock()) {
                            shared->connectDevice(cert,
                                                  name,
                                                  std::move(cb),
                                                  noNewSocket,
                                                  forceNewSocket,
                                                  connType);
                        } else
                            cb(nullptr, deviceId);
                    });
}

void
ConnectionManager::Impl::connectDevice(const dht::InfoHash& deviceId,
                                       const std::string& name,
                                       ConnectCallbackLegacy cb,
                                       bool noNewSocket,
                                       bool forceNewSocket,
                                       const std::string& connType)
{
    if (!dht()) {
        cb(nullptr, deviceId);
        return;
    }
    if (deviceId.toString() == identity().second->getLongId().toString()) {
        cb(nullptr, deviceId);
        return;
    }
    findCertificate(deviceId,
                    [w = weak(),
                     deviceId,
                     name,
                     cb = std::move(cb),
                     noNewSocket,
                     forceNewSocket,
                     connType](const std::shared_ptr<dht::crypto::Certificate>& cert) {
                        if (!cert) {
                            if (auto shared = w.lock())
                                if (shared->config_->logger)
                                    shared->config_->logger->error(
                                        "No valid certificate found for device {}",
                                        deviceId);
                            cb(nullptr, deviceId);
                            return;
                        }
                        if (auto shared = w.lock()) {
                            shared->connectDevice(cert,
                                                  name,
                                                  [cb, deviceId](const std::shared_ptr<ChannelSocket>& sock, const DeviceId& /*did*/){
                                                     cb(sock, deviceId);
                                                  },
                                                  noNewSocket,
                                                  forceNewSocket,
                                                  connType);
                        } else
                            cb(nullptr, deviceId);
                    });
}

void
ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
                                       const std::string& name,
                                       ConnectCallback cb,
                                       bool noNewSocket,
                                       bool forceNewSocket,
                                       const std::string& connType)
{
    // Avoid dht operation in a DHT callback to avoid deadlocks
    dht::ThreadPool::computation().run([w = weak(),
                     name = std::move(name),
                     cert = std::move(cert),
                     cb = std::move(cb),
                     noNewSocket,
                     forceNewSocket,
                     connType] {
        auto devicePk = cert->getSharedPublicKey();
        auto deviceId = devicePk->getLongId();
        auto sthis = w.lock();
        if (!sthis || sthis->isDestroying_) {
            cb(nullptr, deviceId);
            return;
        }
        dht::Value::Id vid;
        auto isConnectingToDevice = false;
        {
            std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
            vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
            auto pendingsIt = sthis->pendingOperations_.find(deviceId);
            if (pendingsIt != sthis->pendingOperations_.end()) {
                const auto& pendings = pendingsIt->second;
                while (pendings.connecting.find(vid) != pendings.connecting.end()
                    || pendings.waiting.find(vid) != pendings.waiting.end()) {
                    vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
                }
            }
            // Check if already connecting
            isConnectingToDevice = pendingsIt != sthis->pendingOperations_.end();
            // Save current request for sendChannelRequest.
            // Note: do not return here, cause we can be in a state where first
            // socket is negotiated and first channel is pending
            // so return only after we checked the info
            if (isConnectingToDevice && !forceNewSocket)
                pendingsIt->second.waiting[vid] = PendingCb {name, std::move(cb)};
            else
                sthis->pendingOperations_[deviceId].connecting[vid] = PendingCb {name, std::move(cb)};
        }

        // Check if already negotiated
        CallbackId cbId(deviceId, vid);
        if (auto info = sthis->getConnectedInfo(deviceId)) {
            std::lock_guard<std::mutex> lk(info->mutex_);
            if (info->socket_) {
                if (sthis->config_->logger)
                    sthis->config_->logger->debug("[device {}] Peer already connected. Add a new channel", deviceId);
                info->cbIds_.emplace(cbId);
                sthis->sendChannelRequest(info->socket_, name, deviceId, vid);
                return;
            }
        }

        if (isConnectingToDevice && !forceNewSocket) {
            if (sthis->config_->logger)
                sthis->config_->logger->debug("[device {}] Already connecting, wait for ICE negotiation", deviceId);
            return;
        }
        if (noNewSocket) {
            // If no new socket is specified, we don't try to generate a new socket
            sthis->executePendingOperations(deviceId, vid, nullptr);
            return;
        }

        // Note: used when the ice negotiation fails to erase
        // all stored structures.
        auto eraseInfo = [w, cbId] {
            if (auto shared = w.lock()) {
                // If no new socket is specified, we don't try to generate a new socket
                shared->executePendingOperations(cbId.first, cbId.second, nullptr);
                std::lock_guard<std::mutex> lk(shared->infosMtx_);
                shared->infos_.erase(cbId);
            }
        };

        // If no socket exists, we need to initiate an ICE connection.
        sthis->getIceOptions([w,
                              deviceId = std::move(deviceId),
                              devicePk = std::move(devicePk),
                              name = std::move(name),
                              cert = std::move(cert),
                              vid,
                              connType,
                              eraseInfo](auto&& ice_config) {
            auto sthis = w.lock();
            if (!sthis) {
                dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
                return;
            }
            ice_config.tcpEnable = true;
            ice_config.onInitDone = [w,
                                     devicePk = std::move(devicePk),
                                     name = std::move(name),
                                     cert = std::move(cert),
                                     vid,
                                     connType,
                                     eraseInfo](bool ok) {
                dht::ThreadPool::io().run([w = std::move(w),
                                           devicePk = std::move(devicePk),
                                           vid = std::move(vid),
                                           eraseInfo,
                                           connType, ok] {
                    auto sthis = w.lock();
                    if (!ok && sthis && sthis->config_->logger)
                        sthis->config_->logger->error("[device {}] Cannot initialize ICE session.", devicePk->getLongId());
                    if (!sthis || !ok) {
                        eraseInfo();
                        return;
                    }
                    sthis->connectDeviceStartIce(devicePk, vid, connType, [=](bool ok) {
                        if (!ok) {
                            dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
                        }
                    });
                });
            };
            ice_config.onNegoDone = [w,
                    deviceId,
                    name,
                     cert = std::move(cert),
                     vid,
                     eraseInfo](bool ok) {
                dht::ThreadPool::io().run([w = std::move(w),
                                           deviceId = std::move(deviceId),
                                           name = std::move(name),
                                           cert = std::move(cert),
                                           vid = std::move(vid),
                                           eraseInfo = std::move(eraseInfo),
                                           ok] {
                    auto sthis = w.lock();
                    if (!ok && sthis && sthis->config_->logger)
                        sthis->config_->logger->error("[device {}] ICE negotiation failed.", deviceId);
                    if (!sthis || !ok || !sthis->connectDeviceOnNegoDone(deviceId, name, vid, cert))
                        eraseInfo();
                });
            };

            auto info = std::make_shared<ConnectionInfo>();
            {
                std::lock_guard<std::mutex> lk(sthis->infosMtx_);
                sthis->infos_[{deviceId, vid}] = info;
            }
            std::unique_lock<std::mutex> lk {info->mutex_};
            ice_config.master = false;
            ice_config.streamsCount = 1;
            ice_config.compCountPerStream = 1;
            info->ice_ = sthis->config_->factory->createUTransport("");
            if (!info->ice_) {
                if (sthis->config_->logger)
                    sthis->config_->logger->error("[device {}] Cannot initialize ICE session.", deviceId);
                eraseInfo();
                return;
            }
            // We need to detect any shutdown if the ice session is destroyed before going to the
            // TLS session;
            info->ice_->setOnShutdown([eraseInfo]() {
                dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
            });
            try {
                info->ice_->initIceInstance(ice_config);
            } catch (const std::exception& e) {
                if (sthis->config_->logger)
                    sthis->config_->logger->error("{}", e.what());
                dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
            }
        });
    });
}

void
ConnectionManager::Impl::sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
                                            const std::string& name,
                                            const DeviceId& deviceId,
                                            const dht::Value::Id& vid)
{
    auto channelSock = sock->addChannel(name);
    channelSock->onShutdown([name, deviceId, vid, w = weak()] {
        auto shared = w.lock();
        if (auto shared = w.lock())
            shared->executePendingOperations(deviceId, vid, nullptr);
    });
    channelSock->onReady(
        [wSock = std::weak_ptr<ChannelSocket>(channelSock), name, deviceId, vid, w = weak()](bool accepted) {
            auto shared = w.lock();
            auto channelSock = wSock.lock();
            if (shared)
                shared->executePendingOperations(deviceId, vid, accepted ? channelSock : nullptr, accepted);
        });

    ChannelRequest val;
    val.name = channelSock->name();
    val.state = ChannelRequestState::REQUEST;
    val.channel = channelSock->channel();
    msgpack::sbuffer buffer(256);
    msgpack::pack(buffer, val);

    std::error_code ec;
    int res = sock->write(CONTROL_CHANNEL,
                          reinterpret_cast<const uint8_t*>(buffer.data()),
                          buffer.size(),
                          ec);
    if (res < 0) {
        // TODO check if we should handle errors here
        if (config_->logger)
            config_->logger->error("[device {}] sendChannelRequest failed - error: {}", deviceId, ec.message());
    }
}

void
ConnectionManager::Impl::onPeerResponse(const PeerConnectionRequest& req)
{
    auto device = req.owner->getLongId();
    if (auto info = getInfo(device, req.id)) {
        if (config_->logger)
            config_->logger->debug("[device {}] New response received", device);
        std::lock_guard<std::mutex> lk {info->mutex_};
        info->responseReceived_ = true;
        info->response_ = std::move(req);
        info->waitForAnswer_->expires_at(std::chrono::steady_clock::now());
        info->waitForAnswer_->async_wait(std::bind(&ConnectionManager::Impl::onResponse,
                                                   this,
                                                   std::placeholders::_1,
                                                   device,
                                                   req.id));
    } else {
        if (config_->logger)
            config_->logger->warn("[device {}] Respond received, but cannot find request", device);
    }
}

void
ConnectionManager::Impl::onDhtConnected(const dht::crypto::PublicKey& devicePk)
{
    if (!dht())
        return;
    dht()->listen<PeerConnectionRequest>(
        dht::InfoHash::get(PeerConnectionRequest::key_prefix + devicePk.getId().toString()),
        [w = weak()](PeerConnectionRequest&& req) {
            auto shared = w.lock();
            if (!shared)
                return false;
            if (shared->isMessageTreated(to_hex_string(req.id))) {
                // Message already treated. Just ignore
                return true;
            }
            if (req.isAnswer) {
                if (shared->config_->logger)
                    shared->config_->logger->debug("[device {}] Received request answer", req.owner->getLongId());
            } else {
                if (shared->config_->logger)
                    shared->config_->logger->debug("[device {}] Received request", req.owner->getLongId());
            }
            if (req.isAnswer) {
                shared->onPeerResponse(req);
            } else {
                // Async certificate checking
                shared->findCertificate(
                    req.from,
                    [w, req = std::move(req)](
                        const std::shared_ptr<dht::crypto::Certificate>& cert) mutable {
                        auto shared = w.lock();
                        if (!shared)
                            return;
                        dht::InfoHash peer_h;
                        if (foundPeerDevice(cert, peer_h, shared->config_->logger)) {
#if TARGET_OS_IOS
                            if (shared->iOSConnectedCb_(req.connType, peer_h))
                                return;
#endif
                            shared->onDhtPeerRequest(req, cert);
                        } else {
                            if (shared->config_->logger)
                                shared->config_->logger->warn(
                                    "[device {}] Received request from untrusted peer",
                                    req.owner->getLongId());
                        }
                    });
            }

            return true;
        },
        dht::Value::UserTypeFilter("peer_request"));
}

void
ConnectionManager::Impl::onTlsNegotiationDone(bool ok,
                                              const DeviceId& deviceId,
                                              const dht::Value::Id& vid,
                                              const std::string& name)
{
    if (isDestroying_)
        return;
    // Note: only handle pendingCallbacks here for TLS initied by connectDevice()
    // Note: if not initied by connectDevice() the channel name will be empty (because no channel
    // asked yet)
    auto isDhtRequest = name.empty();
    if (!ok) {
        if (isDhtRequest) {
            if (config_->logger)
                config_->logger->error("[device {}] TLS connection failure - Initied by DHT request. channel: {} - vid: {}",
                                       deviceId,
                                       name,
                                       vid);
            if (connReadyCb_)
                connReadyCb_(deviceId, "", nullptr);
        } else {
            if (config_->logger)
                config_->logger->error("[device {}] TLS connection failure - Initied by connectDevice. channel: {} - vid: {}",
                                       deviceId,
                                       name,
                                       vid);
            executePendingOperations(deviceId, vid, nullptr);
        }
    } else {
        // The socket is ready, store it
        if (isDhtRequest) {
            if (config_->logger)
                config_->logger->debug("[device {}] Connection is ready - Initied by DHT request. Vid: {}",
                                       deviceId,
                                       vid);
        } else {
            if (config_->logger)
                config_->logger->debug("[device {}] Connection is ready - Initied by connectDevice(). channel: {} - vid: {}",
                                       deviceId,
                                       name,
                                       vid);
        }

        auto info = getInfo(deviceId, vid);
        addNewMultiplexedSocket({deviceId, vid}, info);
        // Finally, open the channel and launch pending callbacks
        if (info->socket_) {
            // Note: do not remove pending there it's done in sendChannelRequest
            for (const auto& [id, name] : getPendingIds(deviceId)) {
                if (config_->logger)
                    config_->logger->debug("[device {}] Send request on TLS socket for channel {}",
                         deviceId, name);
                sendChannelRequest(info->socket_, name, deviceId, id);
            }
        }
    }
}

void
ConnectionManager::Impl::answerTo(IceTransport& ice,
                                  const dht::Value::Id& id,
                                  const std::shared_ptr<dht::crypto::PublicKey>& from)
{
    // NOTE: This is a shortest version of a real SDP message to save some bits
    auto iceAttributes = ice.getLocalAttributes();
    std::ostringstream icemsg;
    icemsg << iceAttributes.ufrag << "\n";
    icemsg << iceAttributes.pwd << "\n";
    for (const auto& addr : ice.getLocalCandidates(1)) {
        icemsg << addr << "\n";
    }

    // Send PeerConnection response
    PeerConnectionRequest val;
    val.id = id;
    val.ice_msg = icemsg.str();
    val.isAnswer = true;
    auto value = std::make_shared<dht::Value>(std::move(val));
    value->user_type = "peer_request";

    if (config_->logger)
        config_->logger->debug("[device {}] Connection accepted, DHT reply", from->getLongId());
    dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
                                           + from->getId().toString()),
                        from,
                        value,
                        [from,l=config_->logger](bool ok) {
                            if (l)
                                l->debug("[device {}] Answer to connection request: put encrypted {:s}",
                                         from->getLongId(),
                                         (ok ? "ok" : "failed"));
                        });
}

bool
ConnectionManager::Impl::onRequestStartIce(const PeerConnectionRequest& req)
{
    auto deviceId = req.owner->getLongId();
    auto info = getInfo(deviceId, req.id);
    if (!info)
        return false;

    std::unique_lock<std::mutex> lk {info->mutex_};
    auto& ice = info->ice_;
    if (!ice) {
        if (config_->logger)
            config_->logger->error("[device {}] No ICE detected", deviceId);
        if (connReadyCb_)
            connReadyCb_(deviceId, "", nullptr);
        return false;
    }

    auto sdp = ice->parseIceCandidates(req.ice_msg);
    answerTo(*ice, req.id, req.owner);
    if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
        if (config_->logger)
            config_->logger->error("[device {}] Start ICE failed", deviceId);
        ice = nullptr;
        if (connReadyCb_)
            connReadyCb_(deviceId, "", nullptr);
        return false;
    }
    return true;
}

bool
ConnectionManager::Impl::onRequestOnNegoDone(const PeerConnectionRequest& req)
{
    auto deviceId = req.owner->getLongId();
    auto info = getInfo(deviceId, req.id);
    if (!info)
        return false;

    std::unique_lock<std::mutex> lk {info->mutex_};
    auto& ice = info->ice_;
    if (!ice) {
        if (config_->logger)
            config_->logger->error("[device {}] No ICE detected", deviceId);
        return false;
    }

    // Build socket
    auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
                                                            std::move(ice)),
                                                        false);

    // init TLS session
    auto ph = req.from;
    if (config_->logger)
        config_->logger->debug("[device {}] Start TLS session - Initied by DHT request. vid: {}",
                               deviceId,
                               req.id);
    info->tls_ = std::make_unique<TlsSocketEndpoint>(
        std::move(endpoint),
        certStore(),
        config_->ioContext,
        identity(),
        dhParams(),
        [ph, deviceId, w=weak(), l=config_->logger](const dht::crypto::Certificate& cert) {
            auto shared = w.lock();
            if (!shared)
                return false;
            if (cert.getPublicKey().getId() != ph
             || deviceId != cert.getPublicKey().getLongId()) {
                if (l) l->warn("[device {}] TLS certificate with ID {} doesn't match the DHT request.",
                                        deviceId,
                                        cert.getPublicKey().getLongId());
                return false;
            }
            auto crt = shared->certStore().getCertificate(cert.getLongId().toString());
            if (!crt)
                return false;
            return crt->getPacked() == cert.getPacked();
        });

    info->tls_->setOnReady(
        [w = weak(), deviceId = std::move(deviceId), vid = std::move(req.id)](bool ok) {
            if (auto shared = w.lock())
                shared->onTlsNegotiationDone(ok, deviceId, vid);
        });
    return true;
}

void
ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
                                          const std::shared_ptr<dht::crypto::Certificate>& /*cert*/)
{
    auto deviceId = req.owner->getLongId();
    if (config_->logger)
        config_->logger->debug("[device {}] New connection request", deviceId);
    if (!iceReqCb_ || !iceReqCb_(deviceId)) {
        if (config_->logger)
            config_->logger->debug("[device {}] Refusing connection", deviceId);
        return;
    }

    // Because the connection is accepted, create an ICE socket.
    getIceOptions([w = weak(), req, deviceId](auto&& ice_config) {
        auto shared = w.lock();
        if (!shared)
            return;
        // Note: used when the ice negotiation fails to erase
        // all stored structures.
        auto eraseInfo = [w, id = req.id, deviceId] {
            if (auto shared = w.lock()) {
                // If no new socket is specified, we don't try to generate a new socket
                shared->executePendingOperations(deviceId, id, nullptr);
                if (shared->connReadyCb_)
                    shared->connReadyCb_(deviceId, "", nullptr);
                std::lock_guard<std::mutex> lk(shared->infosMtx_);
                shared->infos_.erase({deviceId, id});
            }
        };

        ice_config.tcpEnable = true;
        ice_config.onInitDone = [w, req, eraseInfo](bool ok) {
            auto shared = w.lock();
            if (!shared)
                return;
            if (!ok) {
                if (shared->config_->logger)
                    shared->config_->logger->error("[device {}] Cannot initialize ICE session.", req.owner->getLongId());
                dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
                return;
            }

            dht::ThreadPool::io().run(
                [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
                    auto shared = w.lock();
                    if (!shared)
                        return;
                    if (!shared->onRequestStartIce(req))
                        eraseInfo();
                });
        };

        ice_config.onNegoDone = [w, req, eraseInfo](bool ok) {
            auto shared = w.lock();
            if (!shared)
                return;
            if (!ok) {
                if (shared->config_->logger)
                    shared->config_->logger->error("[device {}] ICE negotiation failed.", req.owner->getLongId());
                dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
                return;
            }

            dht::ThreadPool::io().run(
                [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
                    if (auto shared = w.lock())
                        if (!shared->onRequestOnNegoDone(req))
                            eraseInfo();
                });
        };

        // Negotiate a new ICE socket
        auto info = std::make_shared<ConnectionInfo>();
        {
            std::lock_guard<std::mutex> lk(shared->infosMtx_);
            shared->infos_[{deviceId, req.id}] = info;
        }
        if (shared->config_->logger)
            shared->config_->logger->debug("[device {}] Accepting connection", deviceId);
        std::unique_lock<std::mutex> lk {info->mutex_};
        ice_config.streamsCount = 1;
        ice_config.compCountPerStream = 1; // TCP
        ice_config.master = true;
        info->ice_ = shared->config_->factory->createUTransport("");
        if (not info->ice_) {
            if (shared->config_->logger)
                shared->config_->logger->error("[device {}] Cannot initialize ICE session", deviceId);
            eraseInfo();
            return;
        }
        // We need to detect any shutdown if the ice session is destroyed before going to the TLS session;
        info->ice_->setOnShutdown([eraseInfo]() {
            dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
        });
        try {
            info->ice_->initIceInstance(ice_config);
        } catch (const std::exception& e) {
            if (shared->config_->logger)
                shared->config_->logger->error("{}", e.what());
            dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
        }
    });
}

void
ConnectionManager::Impl::addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info)
{
    info->socket_ = std::make_shared<MultiplexedSocket>(config_->ioContext, id.first, std::move(info->tls_), config_->logger);
    info->socket_->setOnReady(
        [w = weak()](const DeviceId& deviceId, const std::shared_ptr<ChannelSocket>& socket) {
            if (auto sthis = w.lock())
                if (sthis->connReadyCb_)
                    sthis->connReadyCb_(deviceId, socket->name(), socket);
        });
    info->socket_->setOnRequest([w = weak()](const std::shared_ptr<dht::crypto::Certificate>& peer,
                                             const uint16_t&,
                                             const std::string& name) {
        if (auto sthis = w.lock())
            if (sthis->channelReqCb_)
                return sthis->channelReqCb_(peer, name);
        return false;
    });
    info->socket_->onShutdown([w = weak(), deviceId=id.first, vid=id.second]() {
        // Cancel current outgoing connections
        dht::ThreadPool::io().run([w, deviceId, vid] {
            auto sthis = w.lock();
            if (!sthis)
                return;

            std::set<CallbackId> ids;
            if (auto info = sthis->getInfo(deviceId, vid)) {
                std::lock_guard<std::mutex> lk(info->mutex_);
                if (info->socket_) {
                    ids = std::move(info->cbIds_);
                    info->socket_->shutdown();
                }
            }
            for (const auto& cbId : ids)
                sthis->executePendingOperations(cbId.first, cbId.second, nullptr);

            std::lock_guard<std::mutex> lk(sthis->infosMtx_);
            sthis->infos_.erase({deviceId, vid});
        });
    });
}

const std::shared_future<tls::DhParams>
ConnectionManager::Impl::dhParams() const
{
    return dht::ThreadPool::computation().get<tls::DhParams>(
        std::bind(tls::DhParams::loadDhParams, config_->cachePath / "dhParams"));
}

template<typename ID = dht::Value::Id>
std::set<ID, std::less<>>
loadIdList(const std::filesystem::path& path)
{
    std::set<ID, std::less<>> ids;
    std::ifstream file(path);
    if (!file.is_open()) {
        //JAMI_DBG("Could not load %s", path.c_str());
        return ids;
    }
    std::string line;
    while (std::getline(file, line)) {
        if constexpr (std::is_same<ID, std::string>::value) {
            ids.emplace(std::move(line));
        } else if constexpr (std::is_integral<ID>::value) {
            ID vid;
            if (auto [p, ec] = std::from_chars(line.data(), line.data() + line.size(), vid, 16);
                ec == std::errc()) {
                ids.emplace(vid);
            }
        }
    }
    return ids;
}

template<typename List = std::set<dht::Value::Id>>
void
saveIdList(const std::filesystem::path& path, const List& ids)
{
    std::ofstream file(path, std::ios::trunc | std::ios::binary);
    if (!file.is_open()) {
        //JAMI_ERR("Could not save to %s", path.c_str());
        return;
    }
    for (auto& c : ids)
        file << std::hex << c << "\n";
}

void
ConnectionManager::Impl::loadTreatedMessages()
{
    std::lock_guard<std::mutex> lock(messageMutex_);
    auto path = config_->cachePath / "treatedMessages";
    treatedMessages_ = loadIdList<std::string>(path.string());
    if (treatedMessages_.empty()) {
        auto messages = loadIdList(path.string());
        for (const auto& m : messages)
            treatedMessages_.emplace(to_hex_string(m));
    }
}

void
ConnectionManager::Impl::saveTreatedMessages() const
{
    dht::ThreadPool::io().run([w = weak()]() {
        if (auto sthis = w.lock()) {
            auto& this_ = *sthis;
            std::lock_guard<std::mutex> lock(this_.messageMutex_);
            fileutils::check_dir(this_.config_->cachePath.c_str());
            saveIdList<decltype(this_.treatedMessages_)>(this_.config_->cachePath / "treatedMessages",
                                                         this_.treatedMessages_);
        }
    });
}

bool
ConnectionManager::Impl::isMessageTreated(std::string_view id)
{
    std::lock_guard<std::mutex> lock(messageMutex_);
    auto res = treatedMessages_.emplace(id);
    if (res.second) {
        saveTreatedMessages();
        return false;
    }
    return true;
}

/**
 * returns whether or not UPnP is enabled and active_
 * ie: if it is able to make port mappings
 */
bool
ConnectionManager::Impl::getUPnPActive() const
{
    return config_->getUPnPActive();
}

IpAddr
ConnectionManager::Impl::getPublishedIpAddress(uint16_t family) const
{
    if (family == AF_INET)
        return publishedIp_[0];
    if (family == AF_INET6)
        return publishedIp_[1];

    assert(family == AF_UNSPEC);

    // If family is not set, prefere IPv4 if available. It's more
    // likely to succeed behind NAT.
    if (publishedIp_[0])
        return publishedIp_[0];
    if (publishedIp_[1])
        return publishedIp_[1];
    return {};
}

void
ConnectionManager::Impl::setPublishedAddress(const IpAddr& ip_addr)
{
    if (ip_addr.getFamily() == AF_INET) {
        publishedIp_[0] = ip_addr;
    } else {
        publishedIp_[1] = ip_addr;
    }
}

void
ConnectionManager::Impl::storeActiveIpAddress(std::function<void()>&& cb)
{
    dht()->getPublicAddress([w=weak(), cb = std::move(cb)](std::vector<dht::SockAddr>&& results) {
        auto shared = w.lock();
        if (!shared)
            return;
        bool hasIpv4 {false}, hasIpv6 {false};
        for (auto& result : results) {
            auto family = result.getFamily();
            if (family == AF_INET) {
                if (not hasIpv4) {
                    hasIpv4 = true;
                    if (shared->config_->logger)
                        shared->config_->logger->debug("Store DHT public IPv4 address: {}", result);
                    //JAMI_DBG("Store DHT public IPv4 address : %s", result.toString().c_str());
                    shared->setPublishedAddress(*result.get());
                    if (shared->config_->upnpCtrl) {
                        shared->config_->upnpCtrl->setPublicAddress(*result.get());
                    }
                }
            } else if (family == AF_INET6) {
                if (not hasIpv6) {
                    hasIpv6 = true;
                    if (shared->config_->logger)
                        shared->config_->logger->debug("Store DHT public IPv6 address: {}", result);
                    shared->setPublishedAddress(*result.get());
                }
            }
            if (hasIpv4 and hasIpv6)
                break;
        }
        if (cb)
            cb();
    });
}

void
ConnectionManager::Impl::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
{
    storeActiveIpAddress([this, cb = std::move(cb)] {
        IceTransportOptions opts = ConnectionManager::Impl::getIceOptions();
        auto publishedAddr = getPublishedIpAddress();

        if (publishedAddr) {
            auto interfaceAddr = ip_utils::getInterfaceAddr(getLocalInterface(),
                                                            publishedAddr.getFamily());
            if (interfaceAddr) {
                opts.accountLocalAddr = interfaceAddr;
                opts.accountPublicAddr = publishedAddr;
            }
        }
        if (cb)
            cb(std::move(opts));
    });
}

IceTransportOptions
ConnectionManager::Impl::getIceOptions() const noexcept
{
    IceTransportOptions opts;
    opts.factory = config_->factory;
    opts.upnpEnable = getUPnPActive();
    opts.upnpContext = config_->upnpCtrl ? config_->upnpCtrl->upnpContext() : nullptr;

    if (config_->stunEnabled)
        opts.stunServers.emplace_back(StunServerInfo().setUri(config_->stunServer));
    if (config_->turnEnabled) {
        if (config_->turnCache) {
            auto turnAddr = config_->turnCache->getResolvedTurn();
            if (turnAddr != std::nullopt) {
                opts.turnServers.emplace_back(TurnServerInfo()
                                                .setUri(turnAddr->toString())
                                                .setUsername(config_->turnServerUserName)
                                                .setPassword(config_->turnServerPwd)
                                                .setRealm(config_->turnServerRealm));
            }
        } else {
            opts.turnServers.emplace_back(TurnServerInfo()
                                                .setUri(config_->turnServer)
                                                .setUsername(config_->turnServerUserName)
                                                .setPassword(config_->turnServerPwd)
                                                .setRealm(config_->turnServerRealm));
        }
        // NOTE: first test with ipv6 turn was not concluant and resulted in multiple
        // co issues. So this needs some debug. for now just disable
        // if (cacheTurnV6 && *cacheTurnV6) {
        //    opts.turnServers.emplace_back(TurnServerInfo()
        //                                      .setUri(cacheTurnV6->toString(true))
        //                                      .setUsername(turnServerUserName_)
        //                                      .setPassword(turnServerPwd_)
        //                                      .setRealm(turnServerRealm_));
        //}
    }
    return opts;
}

bool
ConnectionManager::Impl::foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
                                         dht::InfoHash& account_id,
                                         const std::shared_ptr<Logger>& logger)
{
    if (not crt)
        return false;

    auto top_issuer = crt;
    while (top_issuer->issuer)
        top_issuer = top_issuer->issuer;

    // Device certificate can't be self-signed
    if (top_issuer == crt) {
        if (logger)
            logger->warn("Found invalid (self-signed) peer device: {}", crt->getLongId());
        return false;
    }

    // Check peer certificate chain
    // Trust store with top issuer as the only CA
    dht::crypto::TrustList peer_trust;
    peer_trust.add(*top_issuer);
    if (not peer_trust.verify(*crt)) {
        if (logger)
            logger->warn("Found invalid peer device: {}", crt->getLongId());
        return false;
    }

    // Check cached OCSP response
    if (crt->ocspResponse and crt->ocspResponse->getCertificateStatus() != GNUTLS_OCSP_CERT_GOOD) {
        if (logger)
            logger->error("Certificate {} is disabled by cached OCSP response", crt->getLongId());
        return false;
    }

    account_id = crt->issuer->getId();
    if (logger)
        logger->warn("Found peer device: {} account:{} CA:{}",
              crt->getLongId(),
              account_id,
              top_issuer->getId());
    return true;
}

bool
ConnectionManager::Impl::findCertificate(
    const dht::PkId& id, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb)
{
    if (auto cert = certStore().getCertificate(id.toString())) {
        if (cb)
            cb(cert);
    } else if (cb)
        cb(nullptr);
    return true;
}

bool
ConnectionManager::Impl::findCertificate(const dht::InfoHash& h,
    std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb)
{
    if (auto cert = certStore().getCertificate(h.toString())) {
        if (cb)
            cb(cert);
    } else {
        dht()->findCertificate(h,
                              [cb = std::move(cb), this](
                                  const std::shared_ptr<dht::crypto::Certificate>& crt) {
                                  if (crt)
                                      certStore().pinCertificate(crt);
                                  if (cb)
                                      cb(crt);
                              });
    }
    return true;
}

std::shared_ptr<ConnectionManager::Config>
buildDefaultConfig(dht::crypto::Identity id){
    auto conf = std::make_shared<ConnectionManager::Config>();
    conf->id = std::move(id);
    return conf;
}

ConnectionManager::ConnectionManager(std::shared_ptr<ConnectionManager::Config> config_)
    : pimpl_ {std::make_shared<Impl>(config_)}
{}

ConnectionManager::ConnectionManager(dht::crypto::Identity id)
    : ConnectionManager {buildDefaultConfig(id)}
{}

ConnectionManager::~ConnectionManager()
{
    if (pimpl_)
        pimpl_->shutdown();
}

void
ConnectionManager::connectDevice(const DeviceId& deviceId,
                                 const std::string& name,
                                 ConnectCallback cb,
                                 bool noNewSocket,
                                 bool forceNewSocket,
                                 const std::string& connType)
{
    pimpl_->connectDevice(deviceId, name, std::move(cb), noNewSocket, forceNewSocket, connType);
}

void
ConnectionManager::connectDevice(const dht::InfoHash& deviceId,
                                 const std::string& name,
                                 ConnectCallbackLegacy cb,
                                 bool noNewSocket,
                                 bool forceNewSocket,
                                 const std::string& connType)
{
    pimpl_->connectDevice(deviceId, name, std::move(cb), noNewSocket, forceNewSocket, connType);
}


void
ConnectionManager::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
                                 const std::string& name,
                                 ConnectCallback cb,
                                 bool noNewSocket,
                                 bool forceNewSocket,
                                 const std::string& connType)
{
    pimpl_->connectDevice(cert, name, std::move(cb), noNewSocket, forceNewSocket, connType);
}

bool
ConnectionManager::isConnecting(const DeviceId& deviceId, const std::string& name) const
{
    auto pending = pimpl_->getPendingIds(deviceId);
    return std::find_if(pending.begin(), pending.end(), [&](auto p) { return p.second == name; })
           != pending.end();
}

void
ConnectionManager::closeConnectionsWith(const std::string& peerUri)
{
    std::vector<std::shared_ptr<ConnectionInfo>> connInfos;
    std::set<DeviceId> peersDevices;
    {
        std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
        for (auto iter = pimpl_->infos_.begin(); iter != pimpl_->infos_.end();) {
            auto const& [key, value] = *iter;
            std::unique_lock<std::mutex> lkv {value->mutex_};
            auto deviceId = key.first;
            auto tls = value->tls_ ? value->tls_.get() : (value->socket_ ? value->socket_->endpoint() : nullptr);
            auto cert = tls ? tls->peerCertificate() : nullptr;
            if (not cert)
                cert = pimpl_->certStore().getCertificate(deviceId.toString());
            if (cert && cert->issuer && peerUri == cert->issuer->getId().toString()) {
                connInfos.emplace_back(value);
                peersDevices.emplace(deviceId);
                lkv.unlock();
                iter = pimpl_->infos_.erase(iter);
            } else {
                iter++;
            }
        }
    }
    // Stop connections to all peers devices
    for (const auto& deviceId : peersDevices) {
        pimpl_->executePendingOperations(deviceId, 0, nullptr);
        // This will close the TLS Session
        pimpl_->removeUnusedConnections(deviceId);
    }
    for (auto& info : connInfos) {
        if (info->socket_)
            info->socket_->shutdown();
        if (info->waitForAnswer_)
            info->waitForAnswer_->cancel();
        if (info->ice_) {
            std::unique_lock<std::mutex> lk {info->mutex_};
            dht::ThreadPool::io().run(
                [ice = std::shared_ptr<IceTransport>(std::move(info->ice_))] {});
        }
    }
}

void
ConnectionManager::onDhtConnected(const dht::crypto::PublicKey& devicePk)
{
    pimpl_->onDhtConnected(devicePk);
}

void
ConnectionManager::onICERequest(onICERequestCallback&& cb)
{
    pimpl_->iceReqCb_ = std::move(cb);
}

void
ConnectionManager::onChannelRequest(ChannelRequestCallback&& cb)
{
    pimpl_->channelReqCb_ = std::move(cb);
}

void
ConnectionManager::onConnectionReady(ConnectionReadyCallback&& cb)
{
    pimpl_->connReadyCb_ = std::move(cb);
}

void
ConnectionManager::oniOSConnected(iOSConnectedCallback&& cb)
{
    pimpl_->iOSConnectedCb_ = std::move(cb);
}

std::size_t
ConnectionManager::activeSockets() const
{
    std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
    return pimpl_->infos_.size();
}

void
ConnectionManager::monitor() const
{
    std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
    auto logger = pimpl_->config_->logger;
    if (!logger)
        return;
    logger->debug("ConnectionManager current status:");
    for (const auto& [_, ci] : pimpl_->infos_) {
        if (ci->socket_)
            ci->socket_->monitor();
    }
    logger->debug("ConnectionManager end status.");
}

void
ConnectionManager::connectivityChanged()
{
    std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
    for (const auto& [_, ci] : pimpl_->infos_) {
        if (ci->socket_)
            ci->socket_->sendBeacon();
    }
}

void
ConnectionManager::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
{
    return pimpl_->getIceOptions(std::move(cb));
}

IceTransportOptions
ConnectionManager::getIceOptions() const noexcept
{
    return pimpl_->getIceOptions();
}

IpAddr
ConnectionManager::getPublishedIpAddress(uint16_t family) const
{
    return pimpl_->getPublishedIpAddress(family);
}

void
ConnectionManager::setPublishedAddress(const IpAddr& ip_addr)
{
    return pimpl_->setPublishedAddress(ip_addr);
}

void
ConnectionManager::storeActiveIpAddress(std::function<void()>&& cb)
{
    return pimpl_->storeActiveIpAddress(std::move(cb));
}

std::shared_ptr<ConnectionManager::Config>
ConnectionManager::getConfig()
{
    return pimpl_->config_;
}

std::vector<std::map<std::string, std::string>>
ConnectionManager::getConnectionList(const DeviceId& device) const
{
    std::vector<std::map<std::string, std::string>> connectionsList;
    std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);

    for (const auto& [key, ci] : pimpl_->infos_) {
        if (device && key.first != device)
            continue;
        std::map<std::string, std::string> connectionInfo;
        connectionInfo["id"] = callbackIdToString(key.first, key.second);
        connectionInfo["device"] = key.first.toString();
        if (ci->tls_) {
            if (auto cert = ci->tls_->peerCertificate()) {
                connectionInfo["peer"] = cert->issuer->getId().toString();
            }
        }
        if (ci->socket_) {
            connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Connected));
        } else if (ci->tls_) {
            connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::TLS));
        } else if(ci->ice_)
        {
            connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::ICE));
        }
        if (ci->tls_) {
            std::string remoteAddress = ci->tls_->getRemoteAddress();
            std::string remoteAddressIp = remoteAddress.substr(0, remoteAddress.find(':'));
            std::string remoteAddressPort = remoteAddress.substr(remoteAddress.find(':') + 1);
            connectionInfo["remoteAdress"] = remoteAddressIp;
            connectionInfo["remotePort"] = remoteAddressPort;
        }
        connectionsList.emplace_back(std::move(connectionInfo));
    }

    if (device) {
        auto it = pimpl_->pendingOperations_.find(device);
        if (it != pimpl_->pendingOperations_.end()) {
            const auto& po = it->second;
            for (const auto& [vid, ci] : po.connecting) {
                std::map<std::string, std::string> connectionInfo;
                connectionInfo["id"] = callbackIdToString(device, vid);
                connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Connecting));
                connectionsList.emplace_back(std::move(connectionInfo));
            }

            for (const auto& [vid, ci] : po.waiting) {
                std::map<std::string, std::string> connectionInfo;
                connectionInfo["id"] = callbackIdToString(device, vid);
                connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Waiting));
                connectionsList.emplace_back(std::move(connectionInfo));
            }
        }
    }
    else {
        for (const auto& [key, po] : pimpl_->pendingOperations_) {
            for (const auto& [vid, ci] : po.connecting) {
                std::map<std::string, std::string> connectionInfo;
                connectionInfo["id"] = callbackIdToString(device, vid);
                connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Connecting));
                connectionsList.emplace_back(std::move(connectionInfo));
            }

            for (const auto& [vid, ci] : po.waiting) {
               std::map<std::string, std::string> connectionInfo;
                connectionInfo["id"] = callbackIdToString(device, vid);
                connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Waiting));
                connectionsList.emplace_back(std::move(connectionInfo));
            }
        }
    }
    return connectionsList;
}

std::vector<std::map<std::string, std::string>>
ConnectionManager::getChannelList(const std::string& connectionId) const
{
    std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
    CallbackId cbid = parseCallbackId(connectionId);
    if (pimpl_->infos_.count(cbid) > 0) {
        return pimpl_->infos_[cbid]->socket_->getChannelList();
    } else {
        return {};
    }
}

} // namespace dhtnet