Skip to content
Snippets Groups Projects
Select Git revision
  • 66cc843a3591f01826ed231bcb7d6632eec2a9ac
  • master default protected
  • release/202005
  • release/202001
  • release/201912
  • release/201911
  • release/releaseWindowsTestOne
  • release/windowsReleaseTest
  • release/releaseTest
  • release/releaseWindowsTest
  • release/201910
  • release/qt/201910
  • release/windows-test/201910
  • release/201908
  • release/201906
  • release/201905
  • release/201904
  • release/201903
  • release/201902
  • release/201901
  • release/201812
  • 4.0.0
  • 2.2.0
  • 2.1.0
  • 2.0.1
  • 2.0.0
  • 1.4.1
  • 1.4.0
  • 1.3.0
  • 1.2.0
  • 1.1.0
31 results

connectionmanager.cpp

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    connectionmanager.cpp 46.05 KiB
    /*
     *  Copyright (C) 2019-2023 Savoir-faire Linux Inc.
     *  Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
     *
     *  This program is free software; you can redistribute it and/or modify
     *  it under the terms of the GNU General Public License as published by
     *  the Free Software Foundation; either version 3 of the License, or
     *  (at your option) any later version.
     *
     *  This program is distributed in the hope that it will be useful,
     *  but WITHOUT ANY WARRANTY; without even the implied warranty of
     *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     *  GNU General Public License for more details.
     *
     *  You should have received a copy of the GNU General Public License
     *  along with this program. If not, see <https://www.gnu.org/licenses/>.
     */
    #include "connectionmanager.h"
    #include "jamidht/jamiaccount.h"
    #include "account_const.h"
    #include "jamidht/account_manager.h"
    #include "manager.h"
    #include "peer_connection.h"
    #include "logger.h"
    
    #include <asio.hpp>
    #include <opendht/crypto.h>
    #include <opendht/thread_pool.h>
    #include <opendht/value.h>
    
    #include <algorithm>
    #include <mutex>
    #include <map>
    #include <condition_variable>
    #include <set>
    
    static constexpr std::chrono::seconds DHT_MSG_TIMEOUT {30};
    using ValueIdDist = std::uniform_int_distribution<dht::Value::Id>;
    using CallbackId = std::pair<jami::DeviceId, dht::Value::Id>;
    
    namespace jami {
    
    struct ConnectionInfo
    {
        ~ConnectionInfo()
        {
            if (socket_)
                socket_->join();
        }
    
        std::mutex mutex_ {};
        bool responseReceived_ {false};
        PeerConnectionRequest response_ {};
        std::unique_ptr<IceTransport> ice_ {nullptr};
        // Used to store currently non ready TLS Socket
        std::unique_ptr<TlsSocketEndpoint> tls_ {nullptr};
        std::shared_ptr<MultiplexedSocket> socket_ {};
        std::set<CallbackId> cbIds_ {};
    
        std::function<void(bool)> onConnected_;
        std::unique_ptr<asio::steady_timer> waitForAnswer_ {};
    };
    
    class ConnectionManager::Impl : public std::enable_shared_from_this<ConnectionManager::Impl>
    {
    public:
        explicit Impl(JamiAccount& account)
            : account {account}
        {}
        ~Impl() {}
    
        void removeUnusedConnections(const DeviceId& deviceId = {})
        {
            std::vector<std::shared_ptr<ConnectionInfo>> unused {};
    
            {
                std::lock_guard<std::mutex> lk(infosMtx_);
                for (auto it = infos_.begin(); it != infos_.end();) {
                    auto& [key, info] = *it;
                    if (info && (!deviceId || key.first == deviceId)) {
                        unused.emplace_back(std::move(info));
                        it = infos_.erase(it);
                    } else {
                        ++it;
                    }
                }
            }
            for (auto& info: unused) {
                if (info->tls_)
                    info->tls_->shutdown();
                if (info->socket_)
                    info->socket_->shutdown();
                if (info->ice_)
                    info->ice_->cancelOperations();
                if (info->waitForAnswer_)
                    info->waitForAnswer_->cancel();
            }
            if (!unused.empty())
                dht::ThreadPool::io().run([infos = std::move(unused)]() mutable { infos.clear(); });
        }
    
        void shutdown()
        {
            if (isDestroying_.exchange(true))
                return;
            {
                std::lock_guard<std::mutex> lk(connectCbsMtx_);
                // Call all pending callbacks that channel is not ready
                for (auto& [deviceId, pcbs] : pendingCbs_)
                    for (auto& pending : pcbs)
                        pending.cb(nullptr, deviceId);
                pendingCbs_.clear();
            }
            removeUnusedConnections();
        }
    
        struct PendingCb
        {
            std::string name;
            ConnectCallback cb;
            dht::Value::Id vid;
        };
    
        void connectDeviceStartIce(const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
                                   const dht::Value::Id& vid,
                                   const std::string& connType,
                                   std::function<void(bool)> onConnected);
        void onResponse(const asio::error_code& ec, const DeviceId& deviceId, const dht::Value::Id& vid);
        bool connectDeviceOnNegoDone(const DeviceId& deviceId,
                                     const std::string& name,
                                     const dht::Value::Id& vid,
                                     const std::shared_ptr<dht::crypto::Certificate>& cert);
        void connectDevice(const DeviceId& deviceId,
                           const std::string& uri,
                           ConnectCallback cb,
                           bool noNewSocket = false,
                           bool forceNewSocket = false,
                           const std::string& connType = "");
        void connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
                           const std::string& name,
                           ConnectCallback cb,
                           bool noNewSocket = false,
                           bool forceNewSocket = false,
                           const std::string& connType = "");
        /**
         * Send a ChannelRequest on the TLS socket. Triggers cb when ready
         * @param sock      socket used to send the request
         * @param name      channel's name
         * @param vid       channel's id
         * @param deviceId  to identify the linked ConnectCallback
         */
        void sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
                                const std::string& name,
                                const DeviceId& deviceId,
                                const dht::Value::Id& vid);
        /**
         * Triggered when a PeerConnectionRequest comes from the DHT
         */
        void answerTo(IceTransport& ice,
                      const dht::Value::Id& id,
                      const std::shared_ptr<dht::crypto::PublicKey>& fromPk);
        bool onRequestStartIce(const PeerConnectionRequest& req);
        bool onRequestOnNegoDone(const PeerConnectionRequest& req);
        void onDhtPeerRequest(const PeerConnectionRequest& req,
                              const std::shared_ptr<dht::crypto::Certificate>& cert);
    
        void addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info);
        void onPeerResponse(const PeerConnectionRequest& req);
        void onDhtConnected(const dht::crypto::PublicKey& devicePk);
    
        /**
         * Triggered when a new TLS socket is ready to use
         * @param ok        If succeed
         * @param deviceId  Related device
         * @param vid       vid of the connection request
         * @param name      non empty if TLS was created by connectDevice()
         */
        void onTlsNegotiationDone(bool ok,
                                  const DeviceId& deviceId,
                                  const dht::Value::Id& vid,
                                  const std::string& name = "");
    
        JamiAccount& account;
    
        std::mutex infosMtx_ {};
        // Note: Someone can ask multiple sockets, so to avoid any race condition,
        // each device can have multiple multiplexed sockets.
        std::map<CallbackId, std::shared_ptr<ConnectionInfo>> infos_ {};
    
        std::shared_ptr<ConnectionInfo> getInfo(const DeviceId& deviceId, const dht::Value::Id& id)
        {
            std::lock_guard<std::mutex> lk(infosMtx_);
            auto it = infos_.find({deviceId, id});
            if (it != infos_.end())
                return it->second;
            return {};
        }
    
        std::shared_ptr<ConnectionInfo> getConnectedInfo(const DeviceId& deviceId)
        {
            std::lock_guard<std::mutex> lk(infosMtx_);
            auto it = std::find_if(infos_.begin(), infos_.end(), [&](const auto& item) {
                auto& [key, value] = item;
                return key.first == deviceId && value && value->socket_;
            });
            if (it != infos_.end())
                return it->second;
            return {};
        }
    
        ChannelRequestCallback channelReqCb_ {};
        ConnectionReadyCallback connReadyCb_ {};
        onICERequestCallback iceReqCb_ {};
    
        /**
         * Stores callback from connectDevice
         * @note: each device needs a vector because several connectDevice can
         * be done in parallel and we only want one socket
         */
        std::mutex connectCbsMtx_ {};
        std::map<DeviceId, std::vector<PendingCb>> pendingCbs_ {};
    
        std::vector<PendingCb> extractPendingCallbacks(const DeviceId& deviceId,
                                                       const dht::Value::Id vid = 0)
        {
            std::vector<PendingCb> ret;
            std::lock_guard<std::mutex> lk(connectCbsMtx_);
            auto pendingIt = pendingCbs_.find(deviceId);
            if (pendingIt == pendingCbs_.end())
                return ret;
            auto& pendings = pendingIt->second;
            if (vid == 0) {
                ret = std::move(pendings);
            } else {
                for (auto it = pendings.begin(); it != pendings.end(); ++it) {
                    if (it->vid == vid) {
                        ret.emplace_back(std::move(*it));
                        pendings.erase(it);
                        break;
                    }
                }
            }
            if (pendings.empty())
                pendingCbs_.erase(pendingIt);
            return ret;
        }
    
        std::vector<PendingCb> getPendingCallbacks(const DeviceId& deviceId,
                                                   const dht::Value::Id vid = 0)
        {
            std::vector<PendingCb> ret;
            std::lock_guard<std::mutex> lk(connectCbsMtx_);
            auto pendingIt = pendingCbs_.find(deviceId);
            if (pendingIt == pendingCbs_.end())
                return ret;
            auto& pendings = pendingIt->second;
            if (vid == 0) {
                ret = pendings;
            } else {
                std::copy_if(pendings.begin(),
                             pendings.end(),
                             std::back_inserter(ret),
                             [&](auto pending) { return pending.vid == vid; });
            }
            return ret;
        }
    
        std::shared_ptr<ConnectionManager::Impl> shared()
        {
            return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
        }
        std::shared_ptr<ConnectionManager::Impl const> shared() const
        {
            return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
        }
        std::weak_ptr<ConnectionManager::Impl> weak()
        {
            return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
        }
        std::weak_ptr<ConnectionManager::Impl const> weak() const
        {
            return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
        }
    
        std::atomic_bool isDestroying_ {false};
    };
    
    void
    ConnectionManager::Impl::connectDeviceStartIce(
        const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
        const dht::Value::Id& vid,
        const std::string& connType,
        std::function<void(bool)> onConnected)
    {
        auto deviceId = devicePk->getLongId();
        auto info = getInfo(deviceId, vid);
        if (!info) {
            onConnected(false);
            return;
        }
    
        std::unique_lock<std::mutex> lk(info->mutex_);
        auto& ice = info->ice_;
    
        if (!ice) {
            JAMI_ERR("No ICE detected");
            onConnected(false);
            return;
        }
    
        auto iceAttributes = ice->getLocalAttributes();
        std::ostringstream icemsg;
        icemsg << iceAttributes.ufrag << "\n";
        icemsg << iceAttributes.pwd << "\n";
        for (const auto& addr : ice->getLocalCandidates(1)) {
            icemsg << addr << "\n";
            JAMI_DBG() << "Added local ICE candidate " << addr;
        }
    
        // Prepare connection request as a DHT message
        PeerConnectionRequest val;
    
        val.id = vid; /* Random id for the message unicity */
        val.ice_msg = icemsg.str();
        val.connType = connType;
    
        auto value = std::make_shared<dht::Value>(std::move(val));
        value->user_type = "peer_request";
    
        // Send connection request through DHT
        JAMI_DBG() << account << "Request connection to " << deviceId;
        account.dht()->putEncrypted(
            dht::InfoHash::get(PeerConnectionRequest::key_prefix + devicePk->getId().toString()),
            devicePk,
            value,
            [deviceId, accId = account.getAccountID()](bool ok) {
                JAMI_DEBUG("[Account {:s}] Send connection request to {:s}. Put encrypted {:s}",
                           accId,
                           deviceId.toString(),
                           (ok ? "ok" : "failed"));
            });
        // Wait for call to onResponse() operated by DHT
        if (isDestroying_) {
            onConnected(true); // This avoid to wait new negotiation when destroying
            return;
        }
    
        info->onConnected_ = std::move(onConnected);
        info->waitForAnswer_ = std::make_unique<asio::steady_timer>(*Manager::instance().ioContext(),
                                                                    std::chrono::steady_clock::now()
                                                                        + DHT_MSG_TIMEOUT);
        info->waitForAnswer_->async_wait(
            std::bind(&ConnectionManager::Impl::onResponse, this, std::placeholders::_1, deviceId, vid));
    }
    
    void
    ConnectionManager::Impl::onResponse(const asio::error_code& ec,
                                        const DeviceId& deviceId,
                                        const dht::Value::Id& vid)
    {
        if (ec == asio::error::operation_aborted)
            return;
        auto info = getInfo(deviceId, vid);
        if (!info)
            return;
    
        std::unique_lock<std::mutex> lk(info->mutex_);
        auto& ice = info->ice_;
        if (isDestroying_) {
            info->onConnected_(true); // The destructor can wake a pending wait here.
            return;
        }
        if (!info->responseReceived_) {
            JAMI_ERR("no response from DHT to E2E request.");
            info->onConnected_(false);
            return;
        }
    
        if (!info->ice_) {
            info->onConnected_(false);
            return;
        }
    
        auto sdp = ice->parseIceCandidates(info->response_.ice_msg);
    
        if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
            JAMI_WARN("[Account:%s] start ICE failed", account.getAccountID().c_str());
            info->onConnected_(false);
            return;
        }
        info->onConnected_(true);
    }
    
    bool
    ConnectionManager::Impl::connectDeviceOnNegoDone(
        const DeviceId& deviceId,
        const std::string& name,
        const dht::Value::Id& vid,
        const std::shared_ptr<dht::crypto::Certificate>& cert)
    {
        auto info = getInfo(deviceId, vid);
        if (!info)
            return false;
    
        std::unique_lock<std::mutex> lk {info->mutex_};
        if (info->waitForAnswer_) {
            // Negotiation is done and connected, go to handshake
            // and avoid any cancellation at this point.
            info->waitForAnswer_->cancel();
        }
        auto& ice = info->ice_;
        if (!ice || !ice->isRunning()) {
            JAMI_ERR("No ICE detected or not running");
            return false;
        }
    
        // Build socket
        auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
                                                                std::move(ice)),
                                                            true);
    
        // Negotiate a TLS session
        JAMI_DBG() << account
                   << "Start TLS session - Initied by connectDevice(). Launched by channel: " << name
                   << " - device:" << deviceId << " - vid: " << vid;
        info->tls_ = std::make_unique<TlsSocketEndpoint>(std::move(endpoint),
                                                         account.certStore(),
                                                         account.identity(),
                                                         account.dhParams(),
                                                         *cert);
    
        info->tls_->setOnReady(
            [w = weak(), deviceId = std::move(deviceId), vid = std::move(vid), name = std::move(name)](
                bool ok) {
                if (auto shared = w.lock())
                    shared->onTlsNegotiationDone(ok, deviceId, vid, name);
            });
        return true;
    }
    
    void
    ConnectionManager::Impl::connectDevice(const DeviceId& deviceId,
                                           const std::string& name,
                                           ConnectCallback cb,
                                           bool noNewSocket,
                                           bool forceNewSocket,
                                           const std::string& connType)
    {
        if (!account.dht()) {
            cb(nullptr, deviceId);
            return;
        }
        if (deviceId.toString() == account.currentDeviceId()) {
            cb(nullptr, deviceId);
            return;
        }
        account.findCertificate(deviceId,
                                [w = weak(),
                                 deviceId,
                                 name,
                                 cb = std::move(cb),
                                 noNewSocket,
                                 forceNewSocket,
                                 connType](const std::shared_ptr<dht::crypto::Certificate>& cert) {
                                    if (!cert) {
                                        JAMI_ERR("No valid certificate found for device %s",
                                                 deviceId.to_c_str());
                                        cb(nullptr, deviceId);
                                        return;
                                    }
                                    if (auto shared = w.lock()) {
                                        shared->connectDevice(cert,
                                                              name,
                                                              std::move(cb),
                                                              noNewSocket,
                                                              forceNewSocket,
                                                              connType);
                                    }
                                });
    }
    
    void
    ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
                                           const std::string& name,
                                           ConnectCallback cb,
                                           bool noNewSocket,
                                           bool forceNewSocket,
                                           const std::string& connType)
    {
        // Avoid dht operation in a DHT callback to avoid deadlocks
        dht::ThreadPool::computation().run([w = weak(),
                         name = std::move(name),
                         cert = std::move(cert),
                         cb = std::move(cb),
                         noNewSocket,
                         forceNewSocket,
                         connType] {
            auto devicePk = cert->getSharedPublicKey();
            auto deviceId = devicePk->getLongId();
            auto sthis = w.lock();
            if (!sthis || sthis->isDestroying_) {
                cb(nullptr, deviceId);
                return;
            }
            dht::Value::Id vid = ValueIdDist(1, JAMI_ID_MAX_VAL)(sthis->account.rand);
            auto isConnectingToDevice = false;
            {
                std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
                auto pendingsIt = sthis->pendingCbs_.find(deviceId);
                if (pendingsIt != sthis->pendingCbs_.end()) {
                    const auto& pendings = pendingsIt->second;
                    while (std::find_if(pendings.begin(), pendings.end(), [&](const auto& it){ return it.vid == vid; }) != pendings.end()) {
                        vid = ValueIdDist(1, JAMI_ID_MAX_VAL)(sthis->account.rand);
                    }
                }
                // Check if already connecting
                isConnectingToDevice = pendingsIt != sthis->pendingCbs_.end();
                // Save current request for sendChannelRequest.
                // Note: do not return here, cause we can be in a state where first
                // socket is negotiated and first channel is pending
                // so return only after we checked the info
                if (isConnectingToDevice)
                    pendingsIt->second.emplace_back(PendingCb {name, std::move(cb), vid});
                else
                    sthis->pendingCbs_[deviceId] = {{name, std::move(cb), vid}};
            }
    
            // Check if already negotiated
            CallbackId cbId(deviceId, vid);
            if (auto info = sthis->getConnectedInfo(deviceId)) {
                std::lock_guard<std::mutex> lk(info->mutex_);
                if (info->socket_) {
                    JAMI_DBG("Peer already connected to %s. Add a new channel", deviceId.to_c_str());
                    info->cbIds_.emplace(cbId);
                    sthis->sendChannelRequest(info->socket_, name, deviceId, vid);
                    return;
                }
            }
    
            if (isConnectingToDevice && !forceNewSocket) {
                JAMI_DBG("Already connecting to %s, wait for the ICE negotiation", deviceId.to_c_str());
                return;
            }
            if (noNewSocket) {
                // If no new socket is specified, we don't try to generate a new socket
                for (const auto& pending : sthis->extractPendingCallbacks(deviceId, vid))
                    pending.cb(nullptr, deviceId);
                return;
            }
    
            // Note: used when the ice negotiation fails to erase
            // all stored structures.
            auto eraseInfo = [w, cbId] {
                if (auto shared = w.lock()) {
                    // If no new socket is specified, we don't try to generate a new socket
                    for (const auto& pending : shared->extractPendingCallbacks(cbId.first, cbId.second))
                        pending.cb(nullptr, cbId.first);
                    std::lock_guard<std::mutex> lk(shared->infosMtx_);
                    shared->infos_.erase(cbId);
                }
            };
    
            // If no socket exists, we need to initiate an ICE connection.
            sthis->account.getIceOptions([w,
                                          deviceId = std::move(deviceId),
                                          devicePk = std::move(devicePk),
                                          name = std::move(name),
                                          cert = std::move(cert),
                                          vid,
                                          connType,
                                          eraseInfo](auto&& ice_config) {
                auto sthis = w.lock();
                if (!sthis) {
                    dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
                    return;
                }
                ice_config.tcpEnable = true;
                ice_config.onInitDone = [w,
                                         deviceId = std::move(deviceId),
                                         devicePk = std::move(devicePk),
                                         name = std::move(name),
                                         cert = std::move(cert),
                                         vid,
                                         connType,
                                         eraseInfo](bool ok) {
                    dht::ThreadPool::io().run([w = std::move(w),
                                               devicePk = std::move(devicePk),
                                               vid = std::move(vid),
                                               eraseInfo,
                                               connType, ok] {
                        if (!ok) {
                            JAMI_ERR("Cannot initialize ICE session.");
                        }
                        auto sthis = w.lock();
                        if (!sthis || !ok) {
                            eraseInfo();
                            return;
                        }
                        sthis->connectDeviceStartIce(devicePk, vid, connType, [=](bool ok) {
                            if (!ok) {
                                dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
                            }
                        });
                    });
                };
                ice_config.onNegoDone = [w,
                                         deviceId = std::move(deviceId),
                                         name = std::move(name),
                                         cert = std::move(cert),
                                         vid,
                                         eraseInfo](bool ok) {
                    dht::ThreadPool::io().run([w = std::move(w),
                                               deviceId = std::move(deviceId),
                                               name = std::move(name),
                                               cert = std::move(cert),
                                               vid = std::move(vid),
                                               eraseInfo = std::move(eraseInfo),
                                               ok] {
                        if (!ok)
                            JAMI_ERR("ICE negotiation failed.");
                        auto sthis = w.lock();
                        if (!sthis || !ok || !sthis->connectDeviceOnNegoDone(deviceId, name, vid, cert))
                            eraseInfo();
                    });
                };
    
                auto info = std::make_shared<ConnectionInfo>();
                {
                    std::lock_guard<std::mutex> lk(sthis->infosMtx_);
                    sthis->infos_[{deviceId, vid}] = info;
                }
                std::unique_lock<std::mutex> lk {info->mutex_};
                ice_config.master = false;
                ice_config.streamsCount = JamiAccount::ICE_STREAMS_COUNT;
                ice_config.compCountPerStream = JamiAccount::ICE_COMP_COUNT_PER_STREAM;
                info->ice_ = Manager::instance().getIceTransportFactory().createUTransport(
                    sthis->account.getAccountID());
                if (!info->ice_) {
                    JAMI_ERR("Cannot initialize ICE session.");
                    eraseInfo();
                    return;
                }
                // We need to detect any shutdown if the ice session is destroyed before going to the
                // TLS session;
                info->ice_->setOnShutdown([eraseInfo]() {
                    dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
                });
                info->ice_->initIceInstance(ice_config);
            });
        });
    }
    
    void
    ConnectionManager::Impl::sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
                                                const std::string& name,
                                                const DeviceId& deviceId,
                                                const dht::Value::Id& vid)
    {
        auto channelSock = sock->addChannel(name);
        channelSock->onShutdown([name, deviceId, vid, w = weak()] {
            auto shared = w.lock();
            if (shared)
                for (const auto& pending : shared->extractPendingCallbacks(deviceId, vid))
                    pending.cb(nullptr, deviceId);
        });
        channelSock->onReady(
            [wSock = std::weak_ptr<ChannelSocket>(channelSock), name, deviceId, vid, w = weak()]() {
                auto shared = w.lock();
                auto channelSock = wSock.lock();
                if (shared)
                    for (const auto& pending : shared->extractPendingCallbacks(deviceId, vid))
                        pending.cb(channelSock, deviceId);
            });
    
        ChannelRequest val;
        val.name = channelSock->name();
        val.state = ChannelRequestState::REQUEST;
        val.channel = channelSock->channel();
        msgpack::sbuffer buffer(256);
        msgpack::pack(buffer, val);
    
        std::error_code ec;
        int res = sock->write(CONTROL_CHANNEL,
                              reinterpret_cast<const uint8_t*>(buffer.data()),
                              buffer.size(),
                              ec);
        if (res < 0) {
            // TODO check if we should handle errors here
            JAMI_ERR("sendChannelRequest failed - error: %s", ec.message().c_str());
        }
    }
    
    void
    ConnectionManager::Impl::onPeerResponse(const PeerConnectionRequest& req)
    {
        auto device = req.owner->getLongId();
        JAMI_INFO() << account << " New response received from " << device.to_c_str();
        if (auto info = getInfo(device, req.id)) {
            std::lock_guard<std::mutex> lk {info->mutex_};
            info->responseReceived_ = true;
            info->response_ = std::move(req);
            info->waitForAnswer_->expires_at(std::chrono::steady_clock::now());
            info->waitForAnswer_->async_wait(std::bind(&ConnectionManager::Impl::onResponse,
                                                       this,
                                                       std::placeholders::_1,
                                                       device,
                                                       req.id));
        } else {
            JAMI_WARN() << account << " respond received, but cannot find request";
        }
    }
    
    void
    ConnectionManager::Impl::onDhtConnected(const dht::crypto::PublicKey& devicePk)
    {
        if (!account.dht()) {
            return;
        }
        account.dht()->listen<PeerConnectionRequest>(
            dht::InfoHash::get(PeerConnectionRequest::key_prefix + devicePk.getId().toString()),
            [w = weak()](PeerConnectionRequest&& req) {
                auto shared = w.lock();
                if (!shared)
                    return false;
                if (shared->account.isMessageTreated(to_hex_string(req.id))) {
                    // Message already treated. Just ignore
                    return true;
                }
                if (req.isAnswer) {
                    JAMI_DBG() << "Received request answer from " << req.owner->getLongId();
                } else {
                    JAMI_DBG() << "Received request from " << req.owner->getLongId();
                }
                if (req.isAnswer) {
                    shared->onPeerResponse(req);
                } else {
                    // Async certificate checking
                    shared->account.findCertificate(
                        req.from,
                        [w, req = std::move(req)](
                            const std::shared_ptr<dht::crypto::Certificate>& cert) mutable {
                            auto shared = w.lock();
                            if (!shared)
                                return;
                            dht::InfoHash peer_h;
                            if (AccountManager::foundPeerDevice(cert, peer_h)) {
    #if TARGET_OS_IOS
                                if ((req.connType == "videoCall" || req.connType == "audioCall")
                                    && jami::Manager::instance().isIOSExtension) {
                                    bool hasVideo = req.connType == "videoCall";
                                    emitSignal<libjami::ConversationSignal::CallConnectionRequest>(
                                        shared->account.getAccountID(), peer_h.toString(), hasVideo);
                                    return;
                                }
    #endif
                                shared->onDhtPeerRequest(req, cert);
                            } else {
                                JAMI_WARN()
                                    << shared->account << "Rejected untrusted connection request from "
                                    << req.owner->getLongId();
                            }
                        });
                }
    
                return true;
            },
            dht::Value::UserTypeFilter("peer_request"));
    }
    
    void
    ConnectionManager::Impl::onTlsNegotiationDone(bool ok,
                                                  const DeviceId& deviceId,
                                                  const dht::Value::Id& vid,
                                                  const std::string& name)
    {
        if (isDestroying_)
            return;
        // Note: only handle pendingCallbacks here for TLS initied by connectDevice()
        // Note: if not initied by connectDevice() the channel name will be empty (because no channel
        // asked yet)
        auto isDhtRequest = name.empty();
        if (!ok) {
            if (isDhtRequest) {
                JAMI_ERR() << "TLS connection failure for peer " << deviceId
                           << " - Initied by DHT request. Vid: " << vid;
                if (connReadyCb_)
                    connReadyCb_(deviceId, "", nullptr);
            } else {
                JAMI_ERR() << "TLS connection failure for peer " << deviceId
                           << " - Initied by connectDevice(). Initied by channel: " << name
                           << " - vid: " << vid;
                for (const auto& pending : extractPendingCallbacks(deviceId))
                    pending.cb(nullptr, deviceId);
            }
        } else {
            // The socket is ready, store it
            if (isDhtRequest) {
                JAMI_DBG() << "Connection to " << deviceId << " is ready"
                           << " - Initied by DHT request. Vid: " << vid;
            } else {
                JAMI_DBG() << "Connection to " << deviceId << " is ready"
                           << " - Initied by connectDevice(). Initied by channel: " << name
                           << " - vid: " << vid;
            }
    
            auto info = getInfo(deviceId, vid);
            addNewMultiplexedSocket({deviceId, vid}, info);
            // Finally, open the channel and launch pending callbacks
            if (info->socket_) {
                // Note: do not remove pending there it's done in sendChannelRequest
                for (const auto& pending : getPendingCallbacks(deviceId)) {
                    JAMI_DBG("Send request on TLS socket for channel %s to %s",
                             pending.name.c_str(),
                             deviceId.to_c_str());
                    sendChannelRequest(info->socket_, pending.name, deviceId, pending.vid);
                }
            }
        }
    }
    
    void
    ConnectionManager::Impl::answerTo(IceTransport& ice,
                                      const dht::Value::Id& id,
                                      const std::shared_ptr<dht::crypto::PublicKey>& from)
    {
        // NOTE: This is a shortest version of a real SDP message to save some bits
        auto iceAttributes = ice.getLocalAttributes();
        std::ostringstream icemsg;
        icemsg << iceAttributes.ufrag << "\n";
        icemsg << iceAttributes.pwd << "\n";
        for (const auto& addr : ice.getLocalCandidates(1)) {
            icemsg << addr << "\n";
        }
    
        // Send PeerConnection response
        PeerConnectionRequest val;
        val.id = id;
        val.ice_msg = icemsg.str();
        val.isAnswer = true;
        auto value = std::make_shared<dht::Value>(std::move(val));
        value->user_type = "peer_request";
    
        JAMI_DBG() << account << "[CNX] connection accepted, DHT reply to " << from->getLongId();
        account.dht()->putEncrypted(
            dht::InfoHash::get(PeerConnectionRequest::key_prefix + from->getId().toString()),
            from,
            value,
            [from, accId = account.getAccountID()](bool ok) {
                JAMI_DEBUG("[Account {:s}] Answer to connection request from {:s}. Put encrypted {:s}",
                           accId,
                           from->getLongId().toString(),
                           (ok ? "ok" : "failed"));
            });
    }
    
    bool
    ConnectionManager::Impl::onRequestStartIce(const PeerConnectionRequest& req)
    {
        auto deviceId = req.owner->getLongId();
        auto info = getInfo(deviceId, req.id);
        if (!info)
            return false;
    
        std::unique_lock<std::mutex> lk {info->mutex_};
        auto& ice = info->ice_;
        if (!ice) {
            JAMI_ERR("No ICE detected");
            if (connReadyCb_)
                connReadyCb_(deviceId, "", nullptr);
            return false;
        }
    
        auto sdp = ice->parseIceCandidates(req.ice_msg);
        answerTo(*ice, req.id, req.owner);
        if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
            JAMI_ERR("[Account:%s] start ICE failed - fallback to TURN", account.getAccountID().c_str());
            ice = nullptr;
            if (connReadyCb_)
                connReadyCb_(deviceId, "", nullptr);
            return false;
        }
        return true;
    }
    
    bool
    ConnectionManager::Impl::onRequestOnNegoDone(const PeerConnectionRequest& req)
    {
        auto deviceId = req.owner->getLongId();
        auto info = getInfo(deviceId, req.id);
        if (!info)
            return false;
    
        std::unique_lock<std::mutex> lk {info->mutex_};
        auto& ice = info->ice_;
        if (!ice) {
            JAMI_ERR("No ICE detected");
            return false;
        }
    
        // Build socket
        auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
                                                                std::move(ice)),
                                                            false);
    
        // init TLS session
        auto ph = req.from;
        JAMI_DBG() << account << "Start TLS session - Initied by DHT request. Device:" << req.from
                   << " - vid: " << req.id;
        info->tls_ = std::make_unique<TlsSocketEndpoint>(
            std::move(endpoint),
            account.certStore(),
            account.identity(),
            account.dhParams(),
            [ph, w = weak()](const dht::crypto::Certificate& cert) {
                auto shared = w.lock();
                if (!shared)
                    return false;
                auto crt = shared->account.certStore().getCertificate(cert.getLongId().toString());
                if (!crt)
                    return false;
                return crt->getPacked() == cert.getPacked();
            });
    
        info->tls_->setOnReady(
            [w = weak(), deviceId = std::move(deviceId), vid = std::move(req.id)](bool ok) {
                if (auto shared = w.lock())
                    shared->onTlsNegotiationDone(ok, deviceId, vid);
            });
        return true;
    }
    
    void
    ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
                                              const std::shared_ptr<dht::crypto::Certificate>& /*cert*/)
    {
        auto deviceId = req.owner->getLongId();
        JAMI_INFO() << account << "New connection requested by " << deviceId;
        if (!iceReqCb_ || !iceReqCb_(deviceId)) {
            JAMI_INFO("[Account:%s] refuse connection from %s",
                      account.getAccountID().c_str(),
                      deviceId.toString().c_str());
            return;
        }
    
        // Because the connection is accepted, create an ICE socket.
        account.getIceOptions([w = weak(), req, deviceId](auto&& ice_config) {
            auto shared = w.lock();
            if (!shared)
                return;
            // Note: used when the ice negotiation fails to erase
            // all stored structures.
            auto eraseInfo = [w, id = req.id, deviceId] {
                if (auto shared = w.lock()) {
                    // If no new socket is specified, we don't try to generate a new socket
                    for (const auto& pending : shared->extractPendingCallbacks(deviceId, id))
                        pending.cb(nullptr, deviceId);
                    if (shared->connReadyCb_)
                        shared->connReadyCb_(deviceId, "", nullptr);
                    std::lock_guard<std::mutex> lk(shared->infosMtx_);
                    shared->infos_.erase({deviceId, id});
                }
            };
    
            ice_config.tcpEnable = true;
            ice_config.onInitDone = [w, req, eraseInfo](bool ok) {
                auto shared = w.lock();
                if (!shared)
                    return;
                if (!ok) {
                    JAMI_ERR("Cannot initialize ICE session.");
                    dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
                    return;
                }
    
                dht::ThreadPool::io().run(
                    [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
                        auto shared = w.lock();
                        if (!shared)
                            return;
                        if (!shared->onRequestStartIce(req))
                            eraseInfo();
                    });
            };
    
            ice_config.onNegoDone = [w, req, eraseInfo](bool ok) {
                auto shared = w.lock();
                if (!shared)
                    return;
                if (!ok) {
                    JAMI_ERR("ICE negotiation failed");
                    dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
                    return;
                }
    
                dht::ThreadPool::io().run(
                    [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
                        if (auto shared = w.lock())
                            if (!shared->onRequestOnNegoDone(req))
                                eraseInfo();
                    });
            };
    
            // Negotiate a new ICE socket
            auto info = std::make_shared<ConnectionInfo>();
            {
                std::lock_guard<std::mutex> lk(shared->infosMtx_);
                shared->infos_[{deviceId, req.id}] = info;
            }
            JAMI_INFO("[Account:%s] accepting connection from %s",
                      shared->account.getAccountID().c_str(),
                      deviceId.toString().c_str());
            std::unique_lock<std::mutex> lk {info->mutex_};
            ice_config.streamsCount = JamiAccount::ICE_STREAMS_COUNT;
            ice_config.compCountPerStream = JamiAccount::ICE_COMP_COUNT_PER_STREAM;
            ice_config.master = true;
            info->ice_ = Manager::instance().getIceTransportFactory().createUTransport(
                shared->account.getAccountID());
            if (not info->ice_) {
                JAMI_ERR("Cannot initialize ICE session.");
                eraseInfo();
                return;
            }
            // We need to detect any shutdown if the ice session is destroyed before going to the TLS session;
            info->ice_->setOnShutdown([eraseInfo]() {
                dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
            });
            info->ice_->initIceInstance(ice_config);
        });
    }
    
    void
    ConnectionManager::Impl::addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info)
    {
        info->socket_ = std::make_shared<MultiplexedSocket>(id.first, std::move(info->tls_));
        info->socket_->setOnReady(
            [w = weak()](const DeviceId& deviceId, const std::shared_ptr<ChannelSocket>& socket) {
                if (auto sthis = w.lock())
                    if (sthis->connReadyCb_)
                        sthis->connReadyCb_(deviceId, socket->name(), socket);
            });
        info->socket_->setOnRequest([w = weak()](const std::shared_ptr<dht::crypto::Certificate>& peer,
                                                 const uint16_t&,
                                                 const std::string& name) {
            if (auto sthis = w.lock())
                if (sthis->channelReqCb_)
                    return sthis->channelReqCb_(peer, name);
            return false;
        });
        info->socket_->onShutdown([w = weak(), deviceId=id.first, vid=id.second]() {
            // Cancel current outgoing connections
            dht::ThreadPool::io().run([w, deviceId, vid] {
                auto sthis = w.lock();
                if (!sthis)
                    return;
    
                std::set<CallbackId> ids;
                if (auto info = sthis->getInfo(deviceId, vid)) {
                    std::lock_guard<std::mutex> lk(info->mutex_);
                    if (info->socket_) {
                        ids = std::move(info->cbIds_);
                        info->socket_->shutdown();
                    }
                    if (info->ice_)
                        info->ice_->cancelOperations();
                }
                for (const auto& cbId : ids)
                    for (const auto& pending : sthis->extractPendingCallbacks(cbId.first, cbId.second))
                        pending.cb(nullptr, deviceId);
    
                std::lock_guard<std::mutex> lk(sthis->infosMtx_);
                sthis->infos_.erase({deviceId, vid});
            });
        });
    }
    
    ConnectionManager::ConnectionManager(JamiAccount& account)
        : pimpl_ {std::make_shared<Impl>(account)}
    {}
    
    ConnectionManager::~ConnectionManager()
    {
        if (pimpl_)
            pimpl_->shutdown();
    }
    
    void
    ConnectionManager::connectDevice(const DeviceId& deviceId,
                                     const std::string& name,
                                     ConnectCallback cb,
                                     bool noNewSocket,
                                     bool forceNewSocket,
                                     const std::string& connType)
    {
        pimpl_->connectDevice(deviceId, name, std::move(cb), noNewSocket, forceNewSocket, connType);
    }
    
    void
    ConnectionManager::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
                                     const std::string& name,
                                     ConnectCallback cb,
                                     bool noNewSocket,
                                     bool forceNewSocket,
                                     const std::string& connType)
    {
        pimpl_->connectDevice(cert, name, std::move(cb), noNewSocket, forceNewSocket, connType);
    }
    
    bool
    ConnectionManager::isConnecting(const DeviceId& deviceId, const std::string& name) const
    {
        auto pending = pimpl_->getPendingCallbacks(deviceId);
        return std::find_if(pending.begin(), pending.end(), [&](auto p) { return p.name == name; })
               != pending.end();
    }
    
    void
    ConnectionManager::closeConnectionsWith(const std::string& peerUri)
    {
        std::vector<std::shared_ptr<ConnectionInfo>> connInfos;
        std::set<DeviceId> peersDevices;
        {
            std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
            for (auto iter = pimpl_->infos_.begin(); iter != pimpl_->infos_.end();) {
                auto const& [key, value] = *iter;
                auto deviceId = key.first;
                auto cert = pimpl_->account.certStore().getCertificate(deviceId.toString());
                if (cert && cert->issuer && peerUri == cert->issuer->getId().toString()) {
                    connInfos.emplace_back(value);
                    peersDevices.emplace(deviceId);
                    iter = pimpl_->infos_.erase(iter);
                } else {
                    iter++;
                }
            }
        }
        // Stop connections to all peers devices
        for (const auto& deviceId : peersDevices) {
            for (const auto& pending : pimpl_->extractPendingCallbacks(deviceId))
                pending.cb(nullptr, deviceId);
            // This will close the TLS Session
            pimpl_->removeUnusedConnections(deviceId);
        }
        for (auto& info : connInfos) {
            if (info->ice_)
                info->ice_->cancelOperations();
            if (info->socket_)
                info->socket_->shutdown();
            if (info->waitForAnswer_)
                info->waitForAnswer_->cancel();
            if (info->ice_) {
                std::unique_lock<std::mutex> lk {info->mutex_};
                dht::ThreadPool::io().run(
                    [ice = std::shared_ptr<IceTransport>(std::move(info->ice_))] {});
            }
        }
    }
    
    void
    ConnectionManager::onDhtConnected(const dht::crypto::PublicKey& devicePk)
    {
        pimpl_->onDhtConnected(devicePk);
    }
    
    void
    ConnectionManager::onICERequest(onICERequestCallback&& cb)
    {
        pimpl_->iceReqCb_ = std::move(cb);
    }
    
    void
    ConnectionManager::onChannelRequest(ChannelRequestCallback&& cb)
    {
        pimpl_->channelReqCb_ = std::move(cb);
    }
    
    void
    ConnectionManager::onConnectionReady(ConnectionReadyCallback&& cb)
    {
        pimpl_->connReadyCb_ = std::move(cb);
    }
    
    std::size_t
    ConnectionManager::activeSockets() const
    {
        std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
        return pimpl_->infos_.size();
    }
    
    void
    ConnectionManager::monitor() const
    {
        std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
        JAMI_DBG("ConnectionManager for account %s (%s), current status:",
                 pimpl_->account.getAccountID().c_str(),
                 pimpl_->account.getUserUri().c_str());
        for (const auto& [_, ci] : pimpl_->infos_) {
            if (ci->socket_)
                ci->socket_->monitor();
        }
        JAMI_DBG("ConnectionManager for account %s (%s), end status.",
                 pimpl_->account.getAccountID().c_str(),
                 pimpl_->account.getUserUri().c_str());
    }
    
    void
    ConnectionManager::connectivityChanged()
    {
        std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
        for (const auto& [_, ci] : pimpl_->infos_) {
            if (ci->socket_)
                ci->socket_->sendBeacon();
        }
    }
    
    } // namespace jami