From 30b2ce2cf7e1a0580d2ab574f05842b08e18055b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=A9bastien=20Blin?=
 <sebastien.blin@savoirfairelinux.com>
Date: Mon, 9 Nov 2020 12:33:22 -0500
Subject: [PATCH] fileTransfer: correctly link transfer to devices

Fix multi-devices file transfer

Change-Id: Id3061bb72253a45d50ef5a5c0474776ac39b17e7
GitLab: #323
---
 src/data_transfer.cpp | 25 +++++++++++++++-----
 src/jamidht/p2p.cpp   | 54 ++++++++++++++++++++++++++++---------------
 2 files changed, 54 insertions(+), 25 deletions(-)

diff --git a/src/data_transfer.cpp b/src/data_transfer.cpp
index 0d0a9d7c30..6422644456 100644
--- a/src/data_transfer.cpp
+++ b/src/data_transfer.cpp
@@ -283,6 +283,9 @@ public:
     bool read(std::vector<uint8_t>&) const override;
     bool write(std::string_view) override;
     void emit(DRing::DataTransferEventCode code) const override;
+    const std::string& peer() { return peerUri_; }
+
+    bool isFinished() const { return info_.lastEvent >= DRing::DataTransferEventCode::finished; }
 
     void cancel() override
     {
@@ -345,6 +348,7 @@ private:
             JAMI_DBG() << "FTP#" << getId() << ": sent " << info_.bytesProgress << " bytes";
             if (internalCompletionCb_)
                 internalCompletionCb_(info_.path);
+
             if (info_.bytesProgress != info_.totalSize)
                 emit(DRing::DataTransferEventCode::closed_by_peer);
             else
@@ -516,10 +520,16 @@ public:
 
     void close() noexcept override;
 
-    void cancel() override
+    bool cancelWithPeer(const std::string& peer)
     {
-        for (const auto& subtransfer : subtransfer_)
-            subtransfer->cancel();
+        auto allFinished = true;
+        for (const auto& subtransfer : subtransfer_) {
+            if (subtransfer->peer() == peer)
+                subtransfer->cancel();
+            else if (!subtransfer->isFinished())
+                allFinished = false;
+        }
+        return allFinished;
     }
 
 private:
@@ -823,12 +833,15 @@ DataTransferFacade::sendFile(const DRing::DataTransferInfo& info,
                         out->linkTransfer(std::dynamic_pointer_cast<OutgoingFileTransfer>(transfer)
                                               ->startNewOutgoing(out->peer()));
             },
-            [this, tid](const std::string& device) {
-                if (auto transfer = pimpl_->getTransfer(tid))
-                    if (not transfer->hasBeenStarted()) {
+            [this, tid](const std::string& peer) {
+                if (auto transfer = std::dynamic_pointer_cast<OutgoingFileTransfer>(
+                        pimpl_->getTransfer(tid))) {
+                    auto allFinished = transfer->cancelWithPeer(peer);
+                    if (allFinished and not transfer->hasBeenStarted()) {
                         transfer->emit(DRing::DataTransferEventCode::unjoinable_peer);
                         pimpl_->cancel(*transfer);
                     }
+                }
             });
         return DRing::DataTransferError::success;
     } catch (const std::exception& ex) {
diff --git a/src/jamidht/p2p.cpp b/src/jamidht/p2p.cpp
index 01e57c5198..a29f6b26f5 100644
--- a/src/jamidht/p2p.cpp
+++ b/src/jamidht/p2p.cpp
@@ -96,8 +96,10 @@ public:
     std::map<IpAddr, dht::InfoHash> connectedPeers_;
 
     bool validatePeerCertificate(const dht::crypto::Certificate&, dht::InfoHash&);
-    void closeConnection(const DRing::DataTransferId& tid);
-    void stateChanged(const DRing::DataTransferId& tid, const DRing::DataTransferEventCode& code);
+    void closeConnection(const DRing::DataTransferId& tid, const std::string& peer = "");
+    void stateChanged(const DRing::DataTransferId& tid,
+                      const DRing::DataTransferEventCode& code,
+                      const std::string& peer);
 
     std::shared_ptr<DhtPeerConnector::Impl> shared()
     {
@@ -120,6 +122,7 @@ public:
     std::mutex channeledIncomingMtx_;
     std::map<DRing::DataTransferId, std::unique_ptr<ChanneledIncomingTransfer>> channeledIncoming_;
     std::mutex channeledOutgoingMtx_;
+    // TODO change <<id, peer>, Channeled>
     std::map<DRing::DataTransferId, std::vector<std::shared_ptr<ChanneledOutgoingTransfer>>>
         channeledOutgoing_;
     std::mutex incomingTransfersMtx_;
@@ -144,29 +147,43 @@ DhtPeerConnector::Impl::validatePeerCertificate(const dht::crypto::Certificate&
 
 void
 DhtPeerConnector::Impl::stateChanged(const DRing::DataTransferId& tid,
-                                     const DRing::DataTransferEventCode& code)
+                                     const DRing::DataTransferEventCode& code,
+                                     const std::string& peer)
 {
     if (code == DRing::DataTransferEventCode::finished
         or code == DRing::DataTransferEventCode::closed_by_peer
         or code == DRing::DataTransferEventCode::timeout_expired)
-        closeConnection(tid);
+        closeConnection(tid, peer);
 }
 
 void
-DhtPeerConnector::Impl::closeConnection(const DRing::DataTransferId& tid)
+DhtPeerConnector::Impl::closeConnection(const DRing::DataTransferId& tid, const std::string& peer)
 {
-    dht::ThreadPool::io().run([w = weak(), tid] {
+    dht::ThreadPool::io().run([w = weak(), tid, peer] {
         auto shared = w.lock();
         if (!shared)
             return;
-        // Cancel outgoing files
         {
             std::lock_guard<std::mutex> lk(shared->channeledIncomingMtx_);
-            auto it = shared->channeledIncoming_.erase(tid);
+            shared->channeledIncoming_.erase(tid);
         }
+        // Cancel outgoing files
+        std::vector<std::shared_ptr<ChanneledOutgoingTransfer>> files;
         {
             std::lock_guard<std::mutex> lk(shared->channeledOutgoingMtx_);
-            shared->channeledOutgoing_.erase(tid);
+            auto it = shared->channeledOutgoing_.find(tid);
+            if (it != shared->channeledOutgoing_.end()) {
+                for (auto chanIt = it->second.begin(); chanIt != it->second.end();) {
+                    if ((*chanIt)->peer() == peer) {
+                        files.emplace_back(std::move(*chanIt));
+                        chanIt = it->second.erase(chanIt);
+                    } else {
+                        ++chanIt;
+                    }
+                }
+                if (it->second.empty())
+                    shared->channeledOutgoing_.erase(it);
+            }
         }
     });
 }
@@ -196,13 +213,12 @@ DhtPeerConnector::requestConnection(
 
     auto channelReadyCb = [this,
                            tid,
-                           peer_id,
                            channeledConnectedCb,
                            onChanneledCancelled](const std::shared_ptr<ChannelSocket>& channel,
-                                                 const DeviceId&) {
+                                                 const DeviceId& deviceId) {
         auto shared = pimpl_->account.lock();
         if (!channel) {
-            onChanneledCancelled("");
+            onChanneledCancelled(deviceId.toString());
             return;
         }
         if (!shared)
@@ -211,8 +227,9 @@ DhtPeerConnector::requestConnection(
 
         auto outgoingFile = std::make_shared<ChanneledOutgoingTransfer>(
             channel,
-            [this](const DRing::DataTransferId& id, const DRing::DataTransferEventCode& code) {
-                pimpl_->stateChanged(id, code);
+            [this, deviceId](const DRing::DataTransferId& id,
+                             const DRing::DataTransferEventCode& code) {
+                pimpl_->stateChanged(id, code, deviceId.toString());
             });
         if (!outgoingFile)
             return;
@@ -223,7 +240,7 @@ DhtPeerConnector::requestConnection(
 
         channel->onShutdown([this, tid, onChanneledCancelled, peer = outgoingFile->peer()]() {
             JAMI_INFO("Channel down for outgoing transfer with id(%lu)", tid);
-            onChanneledCancelled("");
+            onChanneledCancelled(peer);
             dht::ThreadPool::io().run([w = pimpl_->weak(), tid, peer] {
                 auto shared = w.lock();
                 if (!shared)
@@ -246,7 +263,6 @@ DhtPeerConnector::requestConnection(
                             shared->channeledOutgoing_.erase(outgoingTransfers);
                     }
                 }
-                Manager::instance().dataTransfers->close(tid);
             });
         });
         channeledConnectedCb(outgoingFile);
@@ -288,7 +304,7 @@ DhtPeerConnector::requestConnection(
         [peer_h, onChanneledCancelled, accId = acc->getAccountID()](bool found) {
             if (!found) {
                 JAMI_WARN() << accId << "[CNX] aborted, no devices for " << peer_h;
-                onChanneledCancelled("");
+                onChanneledCancelled(peer_h.toString());
             }
         });
 }
@@ -326,8 +342,8 @@ DhtPeerConnector::onIncomingConnection(const std::string& peer_id,
     auto incomingFile = std::make_unique<ChanneledIncomingTransfer>(
         channel,
         std::make_shared<FtpServer>(acc->getAccountID(), peer_id, tid, std::move(cb)),
-        [this](const DRing::DataTransferId& id, const DRing::DataTransferEventCode& code) {
-            pimpl_->stateChanged(id, code);
+        [this, peer_id](const DRing::DataTransferId& id, const DRing::DataTransferEventCode& code) {
+            pimpl_->stateChanged(id, code, peer_id);
         });
     {
         std::lock_guard<std::mutex> lk(pimpl_->channeledIncomingMtx_);
-- 
GitLab