From 55be1e14f2b5d97f2270ce19728e84b5c1428a9c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=A9bastien=20Blin?=
 <sebastien.blin@savoirfairelinux.com>
Date: Tue, 24 Apr 2018 15:25:58 -0400
Subject: [PATCH] datatransfer: multi devices support when sending files

With the current implementation, when a user send a file to a contact
who got several devices connected, a request will be sent to all of
these devices and the first which answer will get the transfer. So,
a user can't know on which device they will get the file.

With this patch, in the same configuration the peer will receives
the file on all of its devices and add the possibility to accept
or refuse the file. To avoid to see multiple file transfers because
a peer has several devices, I introduced a OptimisticMetaOutgoingInfo
which represents the best current state of the transfer (for example,
if one subtransfer is ongoing and one failed (cause refused) it will
show the outgoing one).

This is an ongoing work, in the near future we must:
+ Give the ability to clients to see the real status of each
subtransfer (with the ability to cancel/retry transfer by devices)
+ Add a timer to close awaiting transfers to avoid to let a unused
connection for too long.

Change-Id: I84eb243bff2bfdc087a83dd7eced45c361f27d16
Reviewed-by: Philippe Gorley <philippe.gorley@savoirfairelinux.com>
---
 src/data_transfer.cpp       | 248 ++++++++++++++++++++++++++++++++----
 src/peer_connection.cpp     |   6 +
 src/peer_connection.h       |   2 +
 src/ringdht/ringaccount.cpp |   1 -
 4 files changed, 232 insertions(+), 25 deletions(-)

diff --git a/src/data_transfer.cpp b/src/data_transfer.cpp
index cd4bcca4f1..83899952e1 100644
--- a/src/data_transfer.cpp
+++ b/src/data_transfer.cpp
@@ -72,7 +72,7 @@ public:
         return started_.compare_exchange_strong(expected, true);
     }
 
-    bool hasBeenStarted() const {
+    virtual bool hasBeenStarted() const {
         return wasStarted_;
     }
 
@@ -86,12 +86,21 @@ public:
         progress = info_.bytesProgress;
     }
 
+    void setBytesProgress(int64_t progress) const {
+        std::lock_guard<std::mutex> lk {infoMutex_};
+        info_.bytesProgress = progress;
+    }
+
     void info(DRing::DataTransferInfo& info) const {
         std::lock_guard<std::mutex> lk {infoMutex_};
         info = info_;
     }
 
-    void emit(DRing::DataTransferEventCode code) const;
+    DRing::DataTransferInfo info() const {
+        return info_;
+    }
+
+    virtual void emit(DRing::DataTransferEventCode code) const;
 
     const DRing::DataTransferId id;
 
@@ -114,18 +123,141 @@ DataTransfer::emit(DRing::DataTransferEventCode code) const
 
 //==============================================================================
 
-class OutgoingFileTransfer final : public DataTransfer
+/**
+ * This class is used as a sort of buffer between the OutgoingFileTransfer
+ * used by clients to represent a transfer between the user and a contact
+ * and SubOutgoingFileTransfer representing the transfer between the user and
+ * each peer devices. It gives the optimistic view of a transfer (show a failure)
+ * only if all related transfer has failed. If one transfer succeed, ignore failures.
+ */
+class OptimisticMetaOutgoingInfo
 {
 public:
-    OutgoingFileTransfer(DRing::DataTransferId tid, const DRing::DataTransferInfo& info);
+    OptimisticMetaOutgoingInfo(const DataTransfer* parent, const DRing::DataTransferInfo& info);
+    /**
+     * Update the DataTransferInfo of the parent if necessary (if the event is more *interesting* for the user)
+     * @param info the last modified linked info (for example if a subtransfer is accepted, it will gives as a parameter its info)
+     */
+    void updateInfo(const DRing::DataTransferInfo& info) const;
+    /**
+     * Add a subtransfer as a linked transfer
+     * @param linked
+     */
+    void addLinkedTransfer(DataTransfer* linked) const;
+    /**
+     * Return the optimistic representation of the transfer
+     */
+    const DRing::DataTransferInfo& info() const;
+
+private:
+    const DataTransfer* parent_;
+    mutable std::mutex infoMutex_;
+    mutable DRing::DataTransferInfo info_;
+    mutable std::vector<DataTransfer*> linkedTransfers_;
+};
+
+OptimisticMetaOutgoingInfo::OptimisticMetaOutgoingInfo(const DataTransfer* parent, const DRing::DataTransferInfo& info)
+: parent_(parent), info_(info)
+{}
+
+void
+OptimisticMetaOutgoingInfo::updateInfo(const DRing::DataTransferInfo& info) const
+{
+    bool emitCodeChanged = false;
+    bool checkOngoing = false;
+    DRing::DataTransferEventCode lastEvent { DRing::DataTransferEventCode::invalid };
+    {
+        std::lock_guard<std::mutex> lk {infoMutex_};
+        if (info_.lastEvent > DRing::DataTransferEventCode::unjoinable_peer) {
+            info_.lastEvent = DRing::DataTransferEventCode::invalid;
+        }
+        if (info.lastEvent >= DRing::DataTransferEventCode::created
+            && info.lastEvent <= DRing::DataTransferEventCode::finished
+            && info.lastEvent > info_.lastEvent) {
+            // Show the more advanced info
+            info_.lastEvent = info.lastEvent;
+            emitCodeChanged = true;
+        }
+
+        if (info.lastEvent >= DRing::DataTransferEventCode::closed_by_host
+            && info.lastEvent <= DRing::DataTransferEventCode::unjoinable_peer
+            && info_.lastEvent < DRing::DataTransferEventCode::finished) {
+            // if not finished show error if all failed
+            // if the transfer was ongoing and canceled, we should go to the best status
+            bool isAllFailed = true;
+            checkOngoing = info_.lastEvent == DRing::DataTransferEventCode::ongoing;
+            DRing::DataTransferEventCode bestEvent { DRing::DataTransferEventCode::invalid };
+            for (const auto* transfer : linkedTransfers_) {
+                const auto& i = transfer->info();
+                if (i.lastEvent >= DRing::DataTransferEventCode::created
+                    && i.lastEvent <= DRing::DataTransferEventCode::finished) {
+                        isAllFailed = false;
+                        if (checkOngoing)
+                            bestEvent = bestEvent > i.lastEvent ? bestEvent : i.lastEvent;
+                        else
+                            break;
+                    }
+            }
+            if (isAllFailed) {
+                info_.lastEvent = info.lastEvent;
+                emitCodeChanged = true;
+            } else if (checkOngoing && bestEvent != DRing::DataTransferEventCode::invalid) {
+                info_.lastEvent = bestEvent;
+                emitCodeChanged = true;
+            }
+        }
+
+        int64_t bytesProgress {0};
+        for (const auto* transfer : linkedTransfers_) {
+            const auto& i = transfer->info();
+            if (i.bytesProgress > bytesProgress) {
+                bytesProgress = i.bytesProgress;
+            }
+        }
+        if (bytesProgress > info_.bytesProgress) {
+            info_.bytesProgress = bytesProgress;
+            parent_->setBytesProgress(info_.bytesProgress);
+        }
+        if (checkOngoing && info_.lastEvent != DRing::DataTransferEventCode::invalid) {
+            parent_->setBytesProgress(0);
+        }
+    }
+
+    if (emitCodeChanged) {
+        parent_->emit(info_.lastEvent);
+    }
+}
+
+void
+OptimisticMetaOutgoingInfo::addLinkedTransfer(DataTransfer* linked) const
+{
+    std::lock_guard<std::mutex> lk {infoMutex_};
+    linkedTransfers_.emplace_back(linked);
+}
+
+const DRing::DataTransferInfo&
+OptimisticMetaOutgoingInfo::info() const
+{
+    return info_;
+}
+
+/**
+ * Represent a outgoing file transfer between a user and a device
+ */
+class SubOutgoingFileTransfer final : public DataTransfer
+{
+public:
+    SubOutgoingFileTransfer(DRing::DataTransferId tid, const std::string& peerUri, std::shared_ptr<OptimisticMetaOutgoingInfo> metaInfo);
 
     void close() noexcept override;
     bool read(std::vector<uint8_t>&) const override;
     bool write(const std::vector<uint8_t>& buffer) override;
+    void emit(DRing::DataTransferEventCode code) const override;
 
 private:
-    OutgoingFileTransfer() = delete;
+    SubOutgoingFileTransfer() = delete;
 
+    mutable std::shared_ptr<OptimisticMetaOutgoingInfo> metaInfo_;
     mutable std::ifstream input_;
     std::size_t tx_ {0};
     mutable bool headerSent_ {false};
@@ -133,39 +265,35 @@ private:
     const std::string peerUri_;
 };
 
-OutgoingFileTransfer::OutgoingFileTransfer(DRing::DataTransferId tid,
-                                           const DRing::DataTransferInfo& info)
-    : DataTransfer(tid)
+SubOutgoingFileTransfer::SubOutgoingFileTransfer(DRing::DataTransferId tid,
+                                           const std::string& peerUri,
+                                           std::shared_ptr<OptimisticMetaOutgoingInfo> metaInfo)
+    : DataTransfer(tid), peerUri_ {peerUri}, metaInfo_(metaInfo)
 {
-    input_.open(info.path, std::ios::binary);
+
+    info_ = metaInfo_->info();
+    input_.open(info_.path, std::ios::binary);
     if (!input_)
         throw std::runtime_error("input file open failed");
-
-    info_ = info;
-    info_.flags &= ~((uint32_t)1 << int(DRing::DataTransferFlags::direction)); // outgoing
-
-    // File size?
-    input_.seekg(0, std::ios_base::end);
-    info_.totalSize = input_.tellg();
-    input_.seekg(0, std::ios_base::beg);
+    metaInfo_->addLinkedTransfer(this);
 }
 
 void
-OutgoingFileTransfer::close() noexcept
+SubOutgoingFileTransfer::close() noexcept
 {
     DataTransfer::close();
     input_.close();
 
     // We don't need the connection anymore. Can close it.
     auto account = Manager::instance().getAccount<RingAccount>(info_.accountId);
-    account->closePeerConnection(info_.peer, id);
+    account->closePeerConnection(peerUri_, id);
 
     if (info_.lastEvent < DRing::DataTransferEventCode::finished)
         emit(DRing::DataTransferEventCode::closed_by_host);
 }
 
 bool
-OutgoingFileTransfer::read(std::vector<uint8_t>& buf) const
+SubOutgoingFileTransfer::read(std::vector<uint8_t>& buf) const
 {
     // Need to send headers?
     if (!headerSent_) {
@@ -196,6 +324,7 @@ OutgoingFileTransfer::read(std::vector<uint8_t>& buf) const
     if (buf.size()) {
         std::lock_guard<std::mutex> lk {infoMutex_};
         info_.bytesProgress += buf.size();
+        metaInfo_->updateInfo(info_);
         return true;
     }
 
@@ -210,7 +339,7 @@ OutgoingFileTransfer::read(std::vector<uint8_t>& buf) const
 }
 
 bool
-OutgoingFileTransfer::write(const std::vector<uint8_t>& buffer)
+SubOutgoingFileTransfer::write(const std::vector<uint8_t>& buffer)
 {
     if (buffer.empty())
         return true;
@@ -229,6 +358,75 @@ OutgoingFileTransfer::write(const std::vector<uint8_t>& buffer)
     return true;
 }
 
+void
+SubOutgoingFileTransfer::emit(DRing::DataTransferEventCode code) const
+{
+    {
+        std::lock_guard<std::mutex> lk {infoMutex_};
+        info_.lastEvent = code;
+    }
+    metaInfo_->updateInfo(info_);
+}
+
+/**
+ * Represent a file transfer between a user and a peer (all of its device)
+ */
+class OutgoingFileTransfer final : public DataTransfer
+{
+public:
+    OutgoingFileTransfer(DRing::DataTransferId tid, const DRing::DataTransferInfo& info);
+
+    std::shared_ptr<DataTransfer> startNewOutgoing(const std::string& peer_uri) {
+        auto newTransfer = std::make_shared<SubOutgoingFileTransfer>(id, peer_uri, this->metaInfo_);
+        subtransfer_.emplace_back(newTransfer);
+        newTransfer->start();
+        return newTransfer;
+    }
+
+    bool hasBeenStarted() const override
+    {
+        // Started if one subtransfer is started
+        for (const auto& subtransfer: subtransfer_)
+            if (subtransfer->hasBeenStarted())
+                return true;
+        return false;
+    }
+
+    void close() noexcept override;
+
+private:
+    OutgoingFileTransfer() = delete;
+
+    mutable std::shared_ptr<OptimisticMetaOutgoingInfo> metaInfo_;
+    mutable std::ifstream input_;
+    mutable std::vector<std::shared_ptr<SubOutgoingFileTransfer>> subtransfer_;
+};
+
+OutgoingFileTransfer::OutgoingFileTransfer(DRing::DataTransferId tid, const DRing::DataTransferInfo& info)
+: DataTransfer(tid)
+{
+    input_.open(info.path, std::ios::binary);
+    if (!input_)
+        throw std::runtime_error("input file open failed");
+
+    info_ = info;
+    info_.flags &= ~((uint32_t)1 << int(DRing::DataTransferFlags::direction)); // outgoing
+
+    // File size?
+    input_.seekg(0, std::ios_base::end);
+    info_.totalSize = input_.tellg();
+    input_.close();
+
+    metaInfo_ = std::make_shared<OptimisticMetaOutgoingInfo>(this, this->info_);
+}
+
+void
+OutgoingFileTransfer::close() noexcept
+{
+    for (const auto& subtransfer : subtransfer_)
+        subtransfer->close();
+}
+
 //==============================================================================
 
 class IncomingFileTransfer final : public DataTransfer
@@ -413,9 +611,11 @@ DataTransferFacade::Impl::onConnectionRequestReply(const DRing::DataTransferId&
 {
     if (auto transfer = getTransfer(id)) {
         if (connection) {
-            if (transfer->start()) {
-                connection->attachInputStream(transfer);
-            }
+            connection->attachInputStream(
+                std::dynamic_pointer_cast<OutgoingFileTransfer>(transfer)->startNewOutgoing(
+                    connection->getPeerUri()
+                )
+            );
         } else if (not transfer->hasBeenStarted()) {
             transfer->emit(DRing::DataTransferEventCode::unjoinable_peer);
             cancel(*transfer);
diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp
index 87396fc028..85f83d140e 100644
--- a/src/peer_connection.cpp
+++ b/src/peer_connection.cpp
@@ -688,4 +688,10 @@ PeerConnection::hasStreamWithId(const DRing::DataTransferId& id)
     return pimpl_->hasStreamWithId(id);
 }
 
+std::string
+PeerConnection::getPeerUri() const
+{
+    return pimpl_->peer_uri;
+}
+
 } // namespace ring
diff --git a/src/peer_connection.h b/src/peer_connection.h
index d1d8f1d92c..973e224ff8 100644
--- a/src/peer_connection.h
+++ b/src/peer_connection.h
@@ -191,6 +191,8 @@ public:
      */
     bool hasStreamWithId(const DRing::DataTransferId& id);
 
+    std::string getPeerUri() const;
+
 private:
     class PeerConnectionImpl;
     std::unique_ptr<PeerConnectionImpl> pimpl_;
diff --git a/src/ringdht/ringaccount.cpp b/src/ringdht/ringaccount.cpp
index 35281d3c42..cc44b0343c 100644
--- a/src/ringdht/ringaccount.cpp
+++ b/src/ringdht/ringaccount.cpp
@@ -3430,7 +3430,6 @@ RingAccount::closePeerConnection(const std::string& peer, const DRing::DataTrans
     dhtPeerConnector_->closeConnection(peer, tid);
 }
 
-
 void
 RingAccount::enableProxyClient(bool enable)
 {
-- 
GitLab