From 6b0904ba03bddf176c05fbfc894a7c28abe801e6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=A9bastien=20Blin?=
 <sebastien.blin@savoirfairelinux.com>
Date: Wed, 3 Jun 2020 12:04:04 -0400
Subject: [PATCH] cleanup: remove eventLoop from PeerConnection

Change-Id: If6e2bbc65a4e0029d55675717cb853e0a3dcf3c4
---
 src/jamidht/p2p.cpp | 394 ++++++++++++++++++--------------------------
 src/jamidht/p2p.h   |   2 +-
 2 files changed, 165 insertions(+), 231 deletions(-)

diff --git a/src/jamidht/p2p.cpp b/src/jamidht/p2p.cpp
index 124875182b..e65748f3fb 100644
--- a/src/jamidht/p2p.cpp
+++ b/src/jamidht/p2p.cpp
@@ -33,6 +33,7 @@
 
 #include <opendht/default_types.h>
 #include <opendht/rng.h>
+#include <opendht/thread_pool.h>
 
 #include <memory>
 #include <map>
@@ -119,105 +120,36 @@ public:
     }
 };
 
-//==============================================================================
-
-enum class CtrlMsgType
-{
-    STOP,
-    CANCEL,
-    TURN_PEER_CONNECT,
-    TURN_PEER_DISCONNECT,
-    DHT_REQUEST,
-    DHT_RESPONSE,
-    ADD_DEVICE,
-};
-
-struct CtrlMsgBase
-{
-    CtrlMsgBase() = delete;
-    explicit CtrlMsgBase(CtrlMsgType id) : id_ {id} {}
-    virtual ~CtrlMsgBase() = default;
-    CtrlMsgType type() const noexcept { return id_; }
-private:
-    const CtrlMsgType id_;
-};
-
-template <class... Args>
-using DataTypeSet = std::tuple<Args...>;
-
-template <CtrlMsgType id, typename DataTypeSet=void>
-struct CtrlMsg : CtrlMsgBase
-{
-    template <class... Args>
-    explicit CtrlMsg(const Args&... args) : CtrlMsgBase(id), data {args...} {}
-
-    DataTypeSet data;
-};
-
-template <CtrlMsgType id, class... Args>
-auto makeMsg(const Args&... args)
-{
-    return std::make_unique<CtrlMsg<id, DataTypeSet<Args...>>>(args...);
-}
-
-template <std::size_t N, CtrlMsgType id, typename R, typename T>
-auto msgData(const T& msg)
-{
-    using MsgType = typename std::tuple_element<std::size_t(id), R>::type;
-    auto& x = static_cast<const MsgType&>(msg);
-    return std::get<N>(x.data);
-}
-
-//==============================================================================
-
-using DhtInfoHashMsgData = DataTypeSet<dht::InfoHash, DRing::DataTransferId>;
-using TurnConnectMsgData = DataTypeSet<IpAddr>;
-using PeerCnxMsgData = DataTypeSet<PeerConnectionMsg>;
-using AddDeviceMsgData = DataTypeSet<dht::InfoHash,
-                                     DRing::DataTransferId,
-                                     std::shared_ptr<dht::crypto::Certificate>,
-                                     std::vector<std::string>,
-                                     std::function<void(PeerConnection*)>>;
-
-using AllCtrlMsg = DataTypeSet<CtrlMsg<CtrlMsgType::STOP>,
-                               CtrlMsg<CtrlMsgType::CANCEL, DhtInfoHashMsgData>,
-                               CtrlMsg<CtrlMsgType::TURN_PEER_CONNECT, TurnConnectMsgData>,
-                               CtrlMsg<CtrlMsgType::TURN_PEER_DISCONNECT, TurnConnectMsgData>,
-                               CtrlMsg<CtrlMsgType::DHT_REQUEST, PeerCnxMsgData>,
-                               CtrlMsg<CtrlMsgType::DHT_RESPONSE, PeerCnxMsgData>,
-                               CtrlMsg<CtrlMsgType::ADD_DEVICE, AddDeviceMsgData>>;
-
-template <CtrlMsgType id, std::size_t N=0, typename T>
-auto ctrlMsgData(const T& msg)
-{
-    return msgData<N, id, AllCtrlMsg>(msg);
-}
-
 } // namespace <anonymous>
 
 //==============================================================================
 
-class DhtPeerConnector::Impl {
+class DhtPeerConnector::Impl : public std::enable_shared_from_this<DhtPeerConnector::Impl> {
 public:
     class ClientConnector;
 
-    explicit Impl(JamiAccount& account)
+    explicit Impl(const std::weak_ptr<JamiAccount>& account)
         : account {account}
-        , loopFut_ {std::async(std::launch::async, [this]{ eventLoop(); })} {}
+        {}
 
     ~Impl() {
-      for (auto &thread : answer_threads_)
-        thread.join();
-      servers_.clear();
-      clients_.clear();
-      waitForReadyEndpoints_.clear();
-      turnAuthv4_.reset();
-      turnAuthv6_.reset();
-      ctrl << makeMsg<CtrlMsgType::STOP>();
+        for (auto &thread : answer_threads_)
+            thread.join();
+        {
+            std::lock_guard<std::mutex> lk(serversMutex_);
+            servers_.clear();
+        }
+        {
+            std::lock_guard<std::mutex> lk(clientsMutex_);
+            clients_.clear();
+        }
+        std::lock_guard<std::mutex> lk(waitForReadyMtx_);
+        waitForReadyEndpoints_.clear();
+        turnAuthv4_.reset();
+        turnAuthv6_.reset();
     }
 
-    JamiAccount& account;
-    Channel<std::unique_ptr<CtrlMsgBase>> ctrl;
+    std::weak_ptr<JamiAccount> account;
 
     bool hasPublicIp(const ICESDP& sdp) {
         for (const auto& cand: sdp.rem_candidates)
@@ -226,8 +158,8 @@ public:
         return false;
     }
 
-private:
     std::map<std::pair<dht::InfoHash, IpAddr>, std::unique_ptr<TlsSocketEndpoint>> waitForReadyEndpoints_;
+    std::mutex waitForReadyMtx_ {};
     std::unique_ptr<TurnTransport> turnAuthv4_;
     std::unique_ptr<TurnTransport> turnAuthv6_;
 
@@ -236,15 +168,16 @@ private:
     std::map<dht::InfoHash, std::pair<std::shared_ptr<dht::crypto::Certificate>, dht::InfoHash>> certMap_;
     std::map<IpAddr, dht::InfoHash> connectedPeers_;
 
-protected:
     std::map<std::pair<dht::InfoHash, IpAddr>, std::unique_ptr<PeerConnection>> servers_;
+    std::mutex serversMutex_;
     std::map<IpAddr, std::unique_ptr<TlsTurnEndpoint>> tls_turn_ep_;
 
     std::map<std::pair<dht::InfoHash, DRing::DataTransferId>, std::unique_ptr<ClientConnector>> clients_;
     std::mutex clientsMutex_;
     std::mutex turnMutex_;
 
-private:
+    void cancel(const std::string& peer_id, const DRing::DataTransferId& tid);
+
     void onTurnPeerConnection(const IpAddr&);
     void onTurnPeerDisconnection(const IpAddr&);
     void onRequestMsg(PeerConnectionMsg&&);
@@ -259,12 +192,24 @@ private:
                      const std::vector<std::string>&,
                      const std::function<void(PeerConnection*)>&);
     bool turnConnect();
-    void eventLoop();
     bool validatePeerCertificate(const dht::crypto::Certificate&, dht::InfoHash&);
 
     std::future<void> loopFut_; // keep it last member
 
     std::vector<std::thread> answer_threads_;
+
+    std::shared_ptr<DhtPeerConnector::Impl> shared() {
+        return std::static_pointer_cast<DhtPeerConnector::Impl>(shared_from_this());
+    }
+    std::shared_ptr<DhtPeerConnector::Impl const> shared() const {
+        return std::static_pointer_cast<DhtPeerConnector::Impl const>(shared_from_this());
+    }
+    std::weak_ptr<DhtPeerConnector::Impl> weak() {
+        return std::static_pointer_cast<DhtPeerConnector::Impl>(shared_from_this());
+    }
+    std::weak_ptr<DhtPeerConnector::Impl const> weak() const {
+        return std::static_pointer_cast<DhtPeerConnector::Impl const>(shared_from_this());
+    }
 };
 
 //==============================================================================
@@ -326,7 +271,7 @@ public:
     }
 
     void cancel() {
-        parent_.ctrl << makeMsg<CtrlMsgType::CANCEL>(peer_, tid_);
+        parent_.cancel(peer_.toString(), tid_);
     }
 
     void onDhtResponse(PeerConnectionMsg&& response) {
@@ -342,9 +287,11 @@ private:
         // TODO remove publicAddresses in the future and only use the iceMsg
         // For now it's here for compability with old version
         auto &iceTransportFactory = Manager::instance().getIceTransportFactory();
-        auto ice_config = parent_.account.getIceOptions();
+        auto acc = parent_.account.lock();
+        if (!acc) return;
+        auto ice_config = acc->getIceOptions();
         ice_config.tcpEnable = true;
-        auto ice = iceTransportFactory.createTransport(parent_.account.getAccountID().c_str(), 1, false, ice_config);
+        auto ice = iceTransportFactory.createTransport(acc->getAccountID().c_str(), 1, false, ice_config);
 
         if (ice->waitForInitialization(ICE_INIT_TIMEOUT) <= 0) {
             JAMI_ERR("Cannot initialize ICE session.");
@@ -352,7 +299,7 @@ private:
             return;
         }
 
-        parent_.account.registerDhtAddress(*ice);
+        acc->registerDhtAddress(*ice);
 
         auto iceAttributes = ice->getLocalAttributes();
         std::stringstream icemsg;
@@ -364,14 +311,14 @@ private:
 
         // Prepare connection request as a DHT message
         PeerConnectionMsg request;
-        request.id = ValueIdDist()(parent_.account.rand); /* Random id for the message unicity */
+        request.id = ValueIdDist()(acc->rand); /* Random id for the message unicity */
         waitId_ = request.id;
         request.addresses = {icemsg.str()};
         request.addresses.insert(request.addresses.end(), publicAddresses_.begin(), publicAddresses_.end());
 
         // Send connection request through DHT
-        JAMI_DBG() << parent_.account << "[CNX] request connection to " << peer_;
-        parent_.account.dht()->putEncrypted(
+        JAMI_DBG() << acc << "[CNX] request connection to " << peer_;
+        acc->dht()->putEncrypted(
             dht::InfoHash::get(PeerConnectionMsg::key_prefix + peer_.toString()), peer_, request,
             [](bool ok) {
                 if (ok) JAMI_DBG("[CNX] successfully put CNX request on DHT");
@@ -408,7 +355,7 @@ private:
                 if (not ice->start({sdp.rem_ufrag, sdp.rem_pwd},
                                             sdp.rem_candidates)) {
                   JAMI_WARN("[Account:%s] start ICE failed - fallback to TURN",
-                            parent_.account.getAccountID().c_str());
+                            acc->getAccountID().c_str());
                   break;
                 }
 
@@ -416,16 +363,16 @@ private:
                 if (ice->isRunning()) {
                     peer_ep = std::make_unique<IceSocketEndpoint>(ice, true);
                     JAMI_DBG("[Account:%s] ICE negotiation succeed. Starting file transfer",
-                             parent_.account.getAccountID().c_str());
+                             acc->getAccountID().c_str());
                     if (hasPubIp) ice->setInitiatorSession();
                     break;
                 } else {
-                    JAMI_ERR("[Account:%s] ICE negotation failed", parent_.account.getAccountID().c_str());
+                    JAMI_ERR("[Account:%s] ICE negotation failed", acc->getAccountID().c_str());
                 }
             } else {
                 try {
                     // Connect to TURN peer using a raw socket
-                    JAMI_DBG() << parent_.account << "[CNX] connecting to TURN relay "
+                    JAMI_DBG() << acc << "[CNX] connecting to TURN relay "
                             << relay_addr.toString(true, true);
                     peer_ep = std::make_unique<TcpSocketEndpoint>(relay_addr);
                     try {
@@ -442,7 +389,7 @@ private:
                     }
                     break;
                 } catch (std::system_error&) {
-                    JAMI_DBG() << parent_.account << "[CNX] Failed to connect to TURN relay "
+                    JAMI_DBG() << acc << "[CNX] Failed to connect to TURN relay "
                             << relay_addr.toString(true, true);
                 }
             }
@@ -454,9 +401,9 @@ private:
         }
 
         // Negotiate a TLS session
-        JAMI_DBG() << parent_.account << "[CNX] start TLS session";
+        JAMI_DBG() << acc << "[CNX] start TLS session";
         tls_ep_ = std::make_unique<TlsSocketEndpoint>(
-            std::move(peer_ep), parent_.account.identity(), parent_.account.dhParams(),
+            std::move(peer_ep), acc->identity(), acc->dhParams(),
             *peerCertificate_, ice->isRunning());
         tls_ep_->setOnStateChange([this, ice=std::move(ice)] (tls::TlsSessionState state) {
             if (state == tls::TlsSessionState::SHUTDOWN) {
@@ -513,13 +460,15 @@ DhtPeerConnector::Impl::turnConnect()
         && turnAuthv6_ && turnAuthv6_->isReady())
         return true;
 
-    auto details = account.getAccountDetails();
+    auto acc = account.lock();
+    if (!acc) return false;
+    auto details = acc->getAccountDetails();
     auto server = details[Conf::CONFIG_TURN_SERVER];
     auto realm = details[Conf::CONFIG_TURN_SERVER_REALM];
     auto username = details[Conf::CONFIG_TURN_SERVER_UNAME];
     auto password = details[Conf::CONFIG_TURN_SERVER_PWD];
 
-    auto turnCache = account.turnCache();
+    auto turnCache = acc->turnCache();
 
     auto turn_param_v4 = TurnTransportParams {};
     if (turnCache[0]) {
@@ -534,9 +483,9 @@ DhtPeerConnector::Impl::turnConnect()
     turn_param_v4.onPeerConnection = [this](uint32_t conn_id, const IpAddr& peer_addr, bool connected) {
         (void)conn_id;
         if (connected)
-            ctrl << makeMsg<CtrlMsgType::TURN_PEER_CONNECT>(peer_addr);
+            onTurnPeerConnection(peer_addr);
         else
-            ctrl << makeMsg<CtrlMsgType::TURN_PEER_DISCONNECT>(peer_addr);
+            onTurnPeerDisconnection(peer_addr);
     };
 
     // If a previous turn server exists, but is not ready, we should try to reconnect
@@ -600,7 +549,9 @@ DhtPeerConnector::Impl::validatePeerCertificate(const dht::crypto::Certificate&
 void
 DhtPeerConnector::Impl::onTurnPeerConnection(const IpAddr& peer_addr)
 {
-    JAMI_DBG() << account << "[CNX] TURN connection attempt from "
+    auto acc = account.lock();
+    if (!acc) return;
+    JAMI_DBG() << acc << "[CNX] TURN connection attempt from "
                << peer_addr.toString(true, true);
     auto turn_ep = std::unique_ptr<ConnectedTurnTransport>(nullptr);
 
@@ -616,17 +567,17 @@ DhtPeerConnector::Impl::onTurnPeerConnection(const IpAddr& peer_addr)
         }
     }
 
-    JAMI_DBG() << account << "[CNX] start TLS session over TURN socket";
+    JAMI_DBG() << acc << "[CNX] start TLS session over TURN socket";
     auto peer_h = std::make_shared<dht::InfoHash>();
     tls_turn_ep_[peer_addr] =
     std::make_unique<TlsTurnEndpoint>(std::move(turn_ep),
-                                      account.identity(),
-                                      account.dhParams(),
+                                      acc->identity(),
+                                      acc->dhParams(),
                                       [peer_h, this] (const dht::crypto::Certificate& cert) {
         return validatePeerCertificate(cert, *peer_h);
     });
 
-    tls_turn_ep_[peer_addr]->setOnStateChange([this, peer_addr, peer_h] (tls::TlsSessionState state)
+    tls_turn_ep_[peer_addr]->setOnStateChange([this, peer_addr, peer_h, accountId=acc->getAccountID()] (tls::TlsSessionState state)
     {
         auto it = tls_turn_ep_.find(peer_addr);
         if (it == tls_turn_ep_.end()) return false;
@@ -636,10 +587,11 @@ DhtPeerConnector::Impl::onTurnPeerConnection(const IpAddr& peer_addr)
             return false;
         } else if (state == tls::TlsSessionState::ESTABLISHED) {
             if (peer_h) {
-                JAMI_DBG() << account << "[CNX] Accepted TLS-TURN connection from " << *peer_h;
+                JAMI_DBG() << "[CNX] Accepted TLS-TURN connection from " << *peer_h;
                 connectedPeers_.emplace(peer_addr, it->second->peerCertificate().getId());
                 auto connection = std::make_unique<PeerConnection>([] {}, peer_addr.toString(), std::move(it->second));
-                connection->attachOutputStream(std::make_shared<FtpServer>(account.getAccountID(), peer_h->toString()));
+                connection->attachOutputStream(std::make_shared<FtpServer>(accountId, peer_h->toString()));
+                std::lock_guard<std::mutex> lk(serversMutex_);
                 servers_.emplace(std::make_pair(*peer_h, peer_addr), std::move(connection));
             }
             tls_turn_ep_.erase(it);
@@ -652,29 +604,37 @@ DhtPeerConnector::Impl::onTurnPeerConnection(const IpAddr& peer_addr)
 void
 DhtPeerConnector::Impl::onTurnPeerDisconnection(const IpAddr& peer_addr)
 {
+    auto acc = account.lock();
+    if (!acc) return;
+    std::unique_lock<std::mutex> lk(serversMutex_);
     auto it = std::find_if(servers_.begin(), servers_.end(),
                 [&peer_addr](const auto& element) {
                     return element.first.second == peer_addr;});
     if (it == servers_.end()) return;
-    JAMI_WARN() << account << "[CNX] disconnection from peer " << peer_addr.toString(true, true);
+    JAMI_WARN() << acc << "[CNX] disconnection from peer " << peer_addr.toString(true, true);
     servers_.erase(it);
+    lk.unlock();
     connectedPeers_.erase(peer_addr);
 }
 
 void
 DhtPeerConnector::Impl::onRequestMsg(PeerConnectionMsg&& request)
 {
-    JAMI_DBG() << account << "[CNX] rx DHT request from " << request.from;
+    auto acc = account.lock();
+    if (!acc) return;
+    JAMI_DBG() << acc << "[CNX] rx DHT request from " << request.from;
 
     // Asynch certificate checking -> trig onTrustedRequestMsg when trusted certificate is found
-    account.findCertificate(
+    acc->findCertificate(
         request.from,
         [this, request=std::move(request)] (const std::shared_ptr<dht::crypto::Certificate>& cert) mutable {
+            auto acc = account.lock();
+            if (!acc) return;
             dht::InfoHash peer_h;
             if (AccountManager::foundPeerDevice(cert, peer_h))
                 onTrustedRequestMsg(std::move(request), cert, peer_h);
             else
-                JAMI_WARN() << account << "[CNX] rejected untrusted connection request from "
+                JAMI_WARN() << acc << "[CNX] rejected untrusted connection request from "
                             << request.from;
     });
 }
@@ -692,6 +652,8 @@ DhtPeerConnector::Impl::answerToRequest(PeerConnectionMsg&& request,
                                             const std::shared_ptr<dht::crypto::Certificate>& cert,
                                             const dht::InfoHash& peer_h)
 {
+    auto acc = account.lock();
+    if (!acc) return;
     // start a TURN client connection on first pass, next ones just add new peer cnx handlers
     bool sendTurn = turnConnect();
 
@@ -718,7 +680,7 @@ DhtPeerConnector::Impl::answerToRequest(PeerConnectionMsg&& request,
                         sendRelayV4 = true;
                         turnAuthv4_->permitPeer(ip);
                     }
-                    JAMI_DBG() << account << "[CNX] authorized peer connection from " << ip;
+                    JAMI_DBG() << acc << "[CNX] authorized peer connection from " << ip;
                     continue;
                 } else if (addr.isIpv6()) {
                     if (!sendTurn) continue;
@@ -727,15 +689,15 @@ DhtPeerConnector::Impl::answerToRequest(PeerConnectionMsg&& request,
                         sendRelayV6 = true;
                         turnAuthv6_->permitPeer(ip);
                     }
-                    JAMI_DBG() << account << "[CNX] authorized peer connection from " << ip;
+                    JAMI_DBG() << acc << "[CNX] authorized peer connection from " << ip;
                     continue;
                 }
             }
 
             // P2P File transfer. We received an ice SDP message:
-            JAMI_DBG() << account << "[CNX] receiving ICE session request";
+            JAMI_DBG() << acc << "[CNX] receiving ICE session request";
             auto &iceTransportFactory = Manager::instance().getIceTransportFactory();
-            auto ice_config = account.getIceOptions();
+            auto ice_config = acc->getIceOptions();
             ice_config.tcpEnable = true;
             ice_config.onRecvReady = [iceReady]() {
                 auto& ir = *iceReady;
@@ -743,14 +705,14 @@ DhtPeerConnector::Impl::answerToRequest(PeerConnectionMsg&& request,
                 ir.ready = true;
                 ir.cv.notify_one();
             };
-            ice = iceTransportFactory.createTransport(account.getAccountID().c_str(), 1, true, ice_config);
+            ice = iceTransportFactory.createTransport(acc->getAccountID().c_str(), 1, true, ice_config);
 
             if (ice->waitForInitialization(ICE_INIT_TIMEOUT) <= 0) {
                 JAMI_ERR("Cannot initialize ICE session.");
                 continue;
             }
 
-            account.registerDhtAddress(*ice);
+            acc->registerDhtAddress(*ice);
 
             auto sdp = IceTransport::parse_SDP(ip, *ice);
             // NOTE: hasPubIp is used for compability (because ICE is waiting for a certain state in old versions)
@@ -758,7 +720,7 @@ DhtPeerConnector::Impl::answerToRequest(PeerConnectionMsg&& request,
             hasPubIp = hasPublicIp(sdp);
             if (not ice->start({sdp.rem_ufrag, sdp.rem_pwd}, sdp.rem_candidates)) {
                 JAMI_WARN("[Account:%s] start ICE failed - fallback to TURN",
-                            account.getAccountID().c_str());
+                            acc->getAccountID().c_str());
                 continue;
             }
 
@@ -766,15 +728,15 @@ DhtPeerConnector::Impl::answerToRequest(PeerConnectionMsg&& request,
                 ice->waitForNegotiation(ICE_NEGOTIATION_TIMEOUT);
                 if (ice->isRunning()) {
                     sendIce = true;
-                    JAMI_DBG("[Account:%s] ICE negotiation succeed. Answering with local SDP", account.getAccountID().c_str());
+                    JAMI_DBG("[Account:%s] ICE negotiation succeed. Answering with local SDP", acc->getAccountID().c_str());
                 } else {
-                    JAMI_WARN("[Account:%s] ICE negotation failed", account.getAccountID().c_str());
+                    JAMI_WARN("[Account:%s] ICE negotation failed", acc->getAccountID().c_str());
                     ice->cancelOperations();
                 }
             } else
                 sendIce = true; // Ice started with success, we can use it.
         } catch (const std::exception& e) {
-            JAMI_WARN() << account << "[CNX] ignored peer connection '" << ip << "', " << e.what();
+            JAMI_WARN() << acc << "[CNX] ignored peer connection '" << ip << "', " << e.what();
         }
     }
 
@@ -808,12 +770,12 @@ DhtPeerConnector::Impl::answerToRequest(PeerConnectionMsg&& request,
     }
 
     if (addresses.empty()) {
-        JAMI_DBG() << account << "[CNX] connection aborted, no family address found";
+        JAMI_DBG() << acc << "[CNX] connection aborted, no family address found";
         return;
     }
 
-    JAMI_DBG() << account << "[CNX] connection accepted, DHT reply to " << request.from;
-    account.dht()->putEncrypted(
+    JAMI_DBG() << acc << "[CNX] connection accepted, DHT reply to " << request.from;
+    acc->dht()->putEncrypted(
         dht::InfoHash::get(PeerConnectionMsg::key_prefix + request.from.toString()),
         request.from, request.respond(addresses));
 
@@ -821,9 +783,9 @@ DhtPeerConnector::Impl::answerToRequest(PeerConnectionMsg&& request,
         if (hasPubIp) {
             ice->waitForNegotiation(ICE_NEGOTIATION_TIMEOUT);
             if (ice->isRunning()) {
-                JAMI_DBG("[Account:%s] ICE negotiation succeed. Answering with local SDP", account.getAccountID().c_str());
+                JAMI_DBG("[Account:%s] ICE negotiation succeed. Answering with local SDP", acc->getAccountID().c_str());
             } else {
-                JAMI_WARN("[Account:%s] ICE negotation failed - Fallbacking to TURN", account.getAccountID().c_str());
+                JAMI_WARN("[Account:%s] ICE negotation failed - Fallbacking to TURN", acc->getAccountID().c_str());
                 return; // wait for onTurnPeerConnection
             }
         }
@@ -839,13 +801,14 @@ DhtPeerConnector::Impl::answerToRequest(PeerConnectionMsg&& request,
 
         std::unique_ptr<AbstractSocketEndpoint> peer_ep =
             std::make_unique<IceSocketEndpoint>(ice, false);
-        JAMI_DBG() << account << "[CNX] start TLS session";
+        JAMI_DBG() << acc << "[CNX] start TLS session";
         if (hasPubIp) ice->setSlaveSession();
 
         auto idx = std::make_pair(peer_h, ice->getRemoteAddress(0));
+        std::lock_guard<std::mutex> lk(waitForReadyMtx_);
         auto it = waitForReadyEndpoints_.emplace(
             idx,
-            std::make_unique<TlsSocketEndpoint>(std::move(peer_ep), account.identity(), account.dhParams(),
+            std::make_unique<TlsSocketEndpoint>(std::move(peer_ep), acc->identity(), acc->dhParams(),
                 [peer_h, this](const dht::crypto::Certificate &cert) {
                     dht::InfoHash peer_h_found;
                     return validatePeerCertificate(cert, peer_h_found)
@@ -854,10 +817,10 @@ DhtPeerConnector::Impl::answerToRequest(PeerConnectionMsg&& request,
             )
         );
 
-        it.first->second->setOnStateChange([this, idx=std::move(idx)] (tls::TlsSessionState state) {
-            if (waitForReadyEndpoints_.find(idx) == waitForReadyEndpoints_.end()) {
+        it.first->second->setOnStateChange([this, accountId=acc->getAccountID(), idx=std::move(idx)] (tls::TlsSessionState state) {
+            std::lock_guard<std::mutex> lk(waitForReadyMtx_);
+            if (waitForReadyEndpoints_.find(idx) == waitForReadyEndpoints_.end())
                 return false;
-            }
             if (state == tls::TlsSessionState::SHUTDOWN) {
                 JAMI_WARN() << "TLS connection failure";
                 waitForReadyEndpoints_.erase(idx);
@@ -868,8 +831,11 @@ DhtPeerConnector::Impl::answerToRequest(PeerConnectionMsg&& request,
                 auto connection = std::make_unique<PeerConnection>(
                     [] {}, peer_h,
                     std::move(waitForReadyEndpoints_[idx]));
-                connection->attachOutputStream(std::make_shared<FtpServer>(account.getAccountID(), peer_h));
-                servers_.emplace(idx, std::move(connection));
+                connection->attachOutputStream(std::make_shared<FtpServer>(accountId, peer_h));
+                {
+                    std::lock_guard<std::mutex> lk(serversMutex_);
+                    servers_.emplace(idx, std::move(connection));
+                }
                 waitForReadyEndpoints_.erase(idx);
                 return false;
             }
@@ -882,7 +848,9 @@ DhtPeerConnector::Impl::answerToRequest(PeerConnectionMsg&& request,
 void
 DhtPeerConnector::Impl::onResponseMsg(PeerConnectionMsg&& response)
 {
-    JAMI_DBG() << account << "[CNX] rx DHT reply from " << response.from;
+    auto acc = account.lock();
+    if (!acc) return;
+    JAMI_DBG() << acc << "[CNX] rx DHT reply from " << response.from;
     std::lock_guard<std::mutex> lock(clientsMutex_);
     for (auto& client: clients_) {
         // NOTE We can receives multiple files from one peer. So fill unanswered clients with linked id.
@@ -915,80 +883,41 @@ DhtPeerConnector::Impl::onAddDevice(const dht::InfoHash& dev_h,
 }
 
 void
-DhtPeerConnector::Impl::eventLoop()
-{
-    // Loop until STOP msg
-    while (true) {
-        std::unique_ptr<CtrlMsgBase> msg;
-        ctrl >> msg;
-        switch (msg->type()) {
-            case CtrlMsgType::STOP:
-              return;
-
-            case CtrlMsgType::TURN_PEER_CONNECT:
-              onTurnPeerConnection(
-                  ctrlMsgData<CtrlMsgType::TURN_PEER_CONNECT>(*msg));
-              break;
-
-            case CtrlMsgType::TURN_PEER_DISCONNECT:
-              onTurnPeerDisconnection(
-                  ctrlMsgData<CtrlMsgType::TURN_PEER_DISCONNECT>(*msg));
-              break;
-
-            case CtrlMsgType::CANCEL:
-                {
-                    auto dev_h = ctrlMsgData<CtrlMsgType::CANCEL, 0>(*msg);
-                    auto id = ctrlMsgData<CtrlMsgType::CANCEL, 1>(*msg);
-                    // Cancel outgoing files
-                    {
-                        std::lock_guard<std::mutex> lock(clientsMutex_);
-                        clients_.erase(std::make_pair(dev_h, id));
-                    }
-                    // Cancel incoming files
-                    auto it = std::find_if(
-                        servers_.begin(), servers_.end(),
-                        [&dev_h, &id](const auto &element) {
-                          return (element.first.first == dev_h &&
-                                  element.second &&
-                                  element.second->hasStreamWithId(id));
-                        });
-                    if (it == servers_.end())  {
-                      Manager::instance().dataTransfers->close(id);
-                      break;
-                    }
-                    auto peer = it->first.second; // tmp copy to prevent use-after-free below
-                    servers_.erase(it);
-                    // Remove the file transfer if p2p
-                    connectedPeers_.erase(peer);
-                    Manager::instance().dataTransfers->close(id);
-                }
-                break;
-
-            case CtrlMsgType::DHT_REQUEST:
-              onRequestMsg(ctrlMsgData<CtrlMsgType::DHT_REQUEST>(*msg));
-              break;
-
-            case CtrlMsgType::DHT_RESPONSE:
-              onResponseMsg(ctrlMsgData<CtrlMsgType::DHT_RESPONSE>(*msg));
-              break;
-
-            case CtrlMsgType::ADD_DEVICE:
-              onAddDevice(ctrlMsgData<CtrlMsgType::ADD_DEVICE, 0>(*msg),
-                          ctrlMsgData<CtrlMsgType::ADD_DEVICE, 1>(*msg),
-                          ctrlMsgData<CtrlMsgType::ADD_DEVICE, 2>(*msg),
-                          ctrlMsgData<CtrlMsgType::ADD_DEVICE, 3>(*msg),
-                          ctrlMsgData<CtrlMsgType::ADD_DEVICE, 4>(*msg));
-              break;
-
-            default: JAMI_ERR("BUG: got unhandled control msg!"); break;
+DhtPeerConnector::Impl::cancel(const std::string& peer_id, const DRing::DataTransferId& tid) {
+    dht::ThreadPool::io().run([w=weak(), dev_h = dht::InfoHash(peer_id), tid] {
+        auto shared = w.lock();
+        if (!shared) return;
+        // Cancel outgoing files
+        {
+            std::lock_guard<std::mutex> lock(shared->clientsMutex_);
+            shared->clients_.erase(std::make_pair(dev_h, tid));
         }
-    }
+        // Cancel incoming files
+        std::unique_lock<std::mutex> lk(shared->serversMutex_);
+        auto it = std::find_if(
+            shared->servers_.begin(), shared->servers_.end(),
+            [&dev_h, &tid](const auto &element) {
+                return (element.first.first == dev_h &&
+                        element.second &&
+                        element.second->hasStreamWithId(tid));
+            });
+        if (it == shared->servers_.end())  {
+            Manager::instance().dataTransfers->close(tid);
+            return;
+        }
+        auto peer = it->first.second; // tmp copy to prevent use-after-free below
+        shared->servers_.erase(it);
+        lk.unlock();
+        // Remove the file transfer if p2p
+        shared->connectedPeers_.erase(peer);
+        Manager::instance().dataTransfers->close(tid);
+    });
 }
 
 //==============================================================================
 
 DhtPeerConnector::DhtPeerConnector(JamiAccount& account)
-    : pimpl_ {new Impl {account}}
+    : pimpl_ {std::make_shared<Impl>(account.weak())}
 {}
 
 DhtPeerConnector::~DhtPeerConnector() = default;
@@ -999,17 +928,20 @@ DhtPeerConnector::~DhtPeerConnector() = default;
 void
 DhtPeerConnector::onDhtConnected(const std::string& device_id)
 {
-    pimpl_->account.dht()->listen<PeerConnectionMsg>(
+    auto acc = pimpl_->account.lock();
+    if (!acc) return;
+    acc->dht()->listen<PeerConnectionMsg>(
         dht::InfoHash::get(PeerConnectionMsg::key_prefix + device_id),
         [this](PeerConnectionMsg&& msg) {
-            if (msg.from == pimpl_->account.dht()->getId())
+            auto acc = pimpl_->account.lock();
+            if (!acc) return false;
+            if (msg.from == acc->dht()->getId())
                 return true;
-            if (!pimpl_->account.isMessageTreated(to_hex_string(msg.id))) {
-                if (msg.isRequest()) {
-                    // TODO: filter-out request from non trusted peer
-                    pimpl_->ctrl << makeMsg<CtrlMsgType::DHT_REQUEST>(std::move(msg));
-                } else
-                    pimpl_->ctrl << makeMsg<CtrlMsgType::DHT_RESPONSE>(std::move(msg));
+            if (!acc->isMessageTreated(to_hex_string(msg.id))) {
+                if (msg.isRequest())
+                    pimpl_->onRequestMsg(std::move(msg));
+                else
+                    pimpl_->onResponseMsg(std::move(msg));
             }
             return true;
         }, [](const dht::Value& v) {
@@ -1032,7 +964,9 @@ DhtPeerConnector::requestConnection(const std::string& peer_id,
     //    (here the one where dht_ loop runs).
     // 2) anyway its good to keep this processing here in case of multiple device
     //    as the result is the same for each device.
-    auto addresses = pimpl_->account.publicAddresses();
+    auto acc = pimpl_->account.lock();
+    if (!acc) return;
+    auto addresses = acc->publicAddresses();
 
     // Add local addresses
     // XXX: is it really needed? use-case? a local TURN server?
@@ -1041,24 +975,26 @@ DhtPeerConnector::requestConnection(const std::string& peer_id,
 
     // TODO: bypass DHT devices lookup if connection already exist
 
-    pimpl_->account.forEachDevice(
+    acc->forEachDevice(
         peer_h,
         [this, addresses, connect_cb, tid](const dht::InfoHash& dev_h) {
-            if (dev_h == pimpl_->account.dht()->getId()) {
-                JAMI_ERR() << pimpl_->account.getAccountID() << "[CNX] no connection to yourself, bad person!";
+            auto acc = pimpl_->account.lock();
+            if (!acc) return;
+            if (dev_h == acc->dht()->getId()) {
+                JAMI_ERR() << acc->getAccountID() << "[CNX] no connection to yourself, bad person!";
                 return;
             }
 
-            pimpl_->account.findCertificate(
+            acc->findCertificate(
                 dev_h,
                 [this, dev_h, addresses, connect_cb, tid] (const std::shared_ptr<dht::crypto::Certificate>& cert) {
-                    pimpl_->ctrl << makeMsg<CtrlMsgType::ADD_DEVICE>(dev_h, tid, cert, addresses, connect_cb);
+                    pimpl_->onAddDevice(dev_h, tid, cert, addresses, connect_cb);
                 });
         },
 
-        [this, peer_h, connect_cb](bool found) {
+        [this, peer_h, connect_cb, accId = acc->getAccountID()](bool found) {
             if (!found) {
-                JAMI_WARN() << pimpl_->account.getAccountID() << "[CNX] aborted, no devices for " << peer_h;
+                JAMI_WARN() << accId << "[CNX] aborted, no devices for " << peer_h;
                 connect_cb(nullptr);
             }
         });
@@ -1066,9 +1002,7 @@ DhtPeerConnector::requestConnection(const std::string& peer_id,
 
 void
 DhtPeerConnector::closeConnection(const std::string& peer_id, const DRing::DataTransferId& tid) {
-    const auto peer_h = dht::InfoHash(peer_id);
-    // The connection will be close and removed in the main loop
-    pimpl_->ctrl << makeMsg<CtrlMsgType::CANCEL>(peer_h, tid);
+    pimpl_->cancel(peer_id, tid);
 }
 
 } // namespace jami
diff --git a/src/jamidht/p2p.h b/src/jamidht/p2p.h
index 04867fb409..c6773713ed 100644
--- a/src/jamidht/p2p.h
+++ b/src/jamidht/p2p.h
@@ -45,7 +45,7 @@ private:
     DhtPeerConnector() = delete;
 
     class Impl;
-    std::unique_ptr<Impl> pimpl_;
+    std::shared_ptr<Impl> pimpl_;
 };
 
 }
-- 
GitLab