diff --git a/src/data_transfer.cpp b/src/data_transfer.cpp index cd4bcca4f14f30fc2a41204be1c768dd7beada10..83899952e175b5e09659cc5a5554e19c43901691 100644 --- a/src/data_transfer.cpp +++ b/src/data_transfer.cpp @@ -72,7 +72,7 @@ public: return started_.compare_exchange_strong(expected, true); } - bool hasBeenStarted() const { + virtual bool hasBeenStarted() const { return wasStarted_; } @@ -86,12 +86,21 @@ public: progress = info_.bytesProgress; } + void setBytesProgress(int64_t progress) const { + std::lock_guard<std::mutex> lk {infoMutex_}; + info_.bytesProgress = progress; + } + void info(DRing::DataTransferInfo& info) const { std::lock_guard<std::mutex> lk {infoMutex_}; info = info_; } - void emit(DRing::DataTransferEventCode code) const; + DRing::DataTransferInfo info() const { + return info_; + } + + virtual void emit(DRing::DataTransferEventCode code) const; const DRing::DataTransferId id; @@ -114,18 +123,141 @@ DataTransfer::emit(DRing::DataTransferEventCode code) const //============================================================================== -class OutgoingFileTransfer final : public DataTransfer +/** + * This class is used as a sort of buffer between the OutgoingFileTransfer + * used by clients to represent a transfer between the user and a contact + * and SubOutgoingFileTransfer representing the transfer between the user and + * each peer devices. It gives the optimistic view of a transfer (show a failure) + * only if all related transfer has failed. If one transfer succeed, ignore failures. + */ +class OptimisticMetaOutgoingInfo { public: - OutgoingFileTransfer(DRing::DataTransferId tid, const DRing::DataTransferInfo& info); + OptimisticMetaOutgoingInfo(const DataTransfer* parent, const DRing::DataTransferInfo& info); + /** + * Update the DataTransferInfo of the parent if necessary (if the event is more *interesting* for the user) + * @param info the last modified linked info (for example if a subtransfer is accepted, it will gives as a parameter its info) + */ + void updateInfo(const DRing::DataTransferInfo& info) const; + /** + * Add a subtransfer as a linked transfer + * @param linked + */ + void addLinkedTransfer(DataTransfer* linked) const; + /** + * Return the optimistic representation of the transfer + */ + const DRing::DataTransferInfo& info() const; + +private: + const DataTransfer* parent_; + mutable std::mutex infoMutex_; + mutable DRing::DataTransferInfo info_; + mutable std::vector<DataTransfer*> linkedTransfers_; +}; + +OptimisticMetaOutgoingInfo::OptimisticMetaOutgoingInfo(const DataTransfer* parent, const DRing::DataTransferInfo& info) +: parent_(parent), info_(info) +{} + +void +OptimisticMetaOutgoingInfo::updateInfo(const DRing::DataTransferInfo& info) const +{ + bool emitCodeChanged = false; + bool checkOngoing = false; + DRing::DataTransferEventCode lastEvent { DRing::DataTransferEventCode::invalid }; + { + std::lock_guard<std::mutex> lk {infoMutex_}; + if (info_.lastEvent > DRing::DataTransferEventCode::unjoinable_peer) { + info_.lastEvent = DRing::DataTransferEventCode::invalid; + } + if (info.lastEvent >= DRing::DataTransferEventCode::created + && info.lastEvent <= DRing::DataTransferEventCode::finished + && info.lastEvent > info_.lastEvent) { + // Show the more advanced info + info_.lastEvent = info.lastEvent; + emitCodeChanged = true; + } + + if (info.lastEvent >= DRing::DataTransferEventCode::closed_by_host + && info.lastEvent <= DRing::DataTransferEventCode::unjoinable_peer + && info_.lastEvent < DRing::DataTransferEventCode::finished) { + // if not finished show error if all failed + // if the transfer was ongoing and canceled, we should go to the best status + bool isAllFailed = true; + checkOngoing = info_.lastEvent == DRing::DataTransferEventCode::ongoing; + DRing::DataTransferEventCode bestEvent { DRing::DataTransferEventCode::invalid }; + for (const auto* transfer : linkedTransfers_) { + const auto& i = transfer->info(); + if (i.lastEvent >= DRing::DataTransferEventCode::created + && i.lastEvent <= DRing::DataTransferEventCode::finished) { + isAllFailed = false; + if (checkOngoing) + bestEvent = bestEvent > i.lastEvent ? bestEvent : i.lastEvent; + else + break; + } + } + if (isAllFailed) { + info_.lastEvent = info.lastEvent; + emitCodeChanged = true; + } else if (checkOngoing && bestEvent != DRing::DataTransferEventCode::invalid) { + info_.lastEvent = bestEvent; + emitCodeChanged = true; + } + } + + int64_t bytesProgress {0}; + for (const auto* transfer : linkedTransfers_) { + const auto& i = transfer->info(); + if (i.bytesProgress > bytesProgress) { + bytesProgress = i.bytesProgress; + } + } + if (bytesProgress > info_.bytesProgress) { + info_.bytesProgress = bytesProgress; + parent_->setBytesProgress(info_.bytesProgress); + } + if (checkOngoing && info_.lastEvent != DRing::DataTransferEventCode::invalid) { + parent_->setBytesProgress(0); + } + } + + if (emitCodeChanged) { + parent_->emit(info_.lastEvent); + } +} + +void +OptimisticMetaOutgoingInfo::addLinkedTransfer(DataTransfer* linked) const +{ + std::lock_guard<std::mutex> lk {infoMutex_}; + linkedTransfers_.emplace_back(linked); +} + +const DRing::DataTransferInfo& +OptimisticMetaOutgoingInfo::info() const +{ + return info_; +} + +/** + * Represent a outgoing file transfer between a user and a device + */ +class SubOutgoingFileTransfer final : public DataTransfer +{ +public: + SubOutgoingFileTransfer(DRing::DataTransferId tid, const std::string& peerUri, std::shared_ptr<OptimisticMetaOutgoingInfo> metaInfo); void close() noexcept override; bool read(std::vector<uint8_t>&) const override; bool write(const std::vector<uint8_t>& buffer) override; + void emit(DRing::DataTransferEventCode code) const override; private: - OutgoingFileTransfer() = delete; + SubOutgoingFileTransfer() = delete; + mutable std::shared_ptr<OptimisticMetaOutgoingInfo> metaInfo_; mutable std::ifstream input_; std::size_t tx_ {0}; mutable bool headerSent_ {false}; @@ -133,39 +265,35 @@ private: const std::string peerUri_; }; -OutgoingFileTransfer::OutgoingFileTransfer(DRing::DataTransferId tid, - const DRing::DataTransferInfo& info) - : DataTransfer(tid) +SubOutgoingFileTransfer::SubOutgoingFileTransfer(DRing::DataTransferId tid, + const std::string& peerUri, + std::shared_ptr<OptimisticMetaOutgoingInfo> metaInfo) + : DataTransfer(tid), peerUri_ {peerUri}, metaInfo_(metaInfo) { - input_.open(info.path, std::ios::binary); + + info_ = metaInfo_->info(); + input_.open(info_.path, std::ios::binary); if (!input_) throw std::runtime_error("input file open failed"); - - info_ = info; - info_.flags &= ~((uint32_t)1 << int(DRing::DataTransferFlags::direction)); // outgoing - - // File size? - input_.seekg(0, std::ios_base::end); - info_.totalSize = input_.tellg(); - input_.seekg(0, std::ios_base::beg); + metaInfo_->addLinkedTransfer(this); } void -OutgoingFileTransfer::close() noexcept +SubOutgoingFileTransfer::close() noexcept { DataTransfer::close(); input_.close(); // We don't need the connection anymore. Can close it. auto account = Manager::instance().getAccount<RingAccount>(info_.accountId); - account->closePeerConnection(info_.peer, id); + account->closePeerConnection(peerUri_, id); if (info_.lastEvent < DRing::DataTransferEventCode::finished) emit(DRing::DataTransferEventCode::closed_by_host); } bool -OutgoingFileTransfer::read(std::vector<uint8_t>& buf) const +SubOutgoingFileTransfer::read(std::vector<uint8_t>& buf) const { // Need to send headers? if (!headerSent_) { @@ -196,6 +324,7 @@ OutgoingFileTransfer::read(std::vector<uint8_t>& buf) const if (buf.size()) { std::lock_guard<std::mutex> lk {infoMutex_}; info_.bytesProgress += buf.size(); + metaInfo_->updateInfo(info_); return true; } @@ -210,7 +339,7 @@ OutgoingFileTransfer::read(std::vector<uint8_t>& buf) const } bool -OutgoingFileTransfer::write(const std::vector<uint8_t>& buffer) +SubOutgoingFileTransfer::write(const std::vector<uint8_t>& buffer) { if (buffer.empty()) return true; @@ -229,6 +358,75 @@ OutgoingFileTransfer::write(const std::vector<uint8_t>& buffer) return true; } +void +SubOutgoingFileTransfer::emit(DRing::DataTransferEventCode code) const +{ + { + std::lock_guard<std::mutex> lk {infoMutex_}; + info_.lastEvent = code; + } + metaInfo_->updateInfo(info_); +} + +/** + * Represent a file transfer between a user and a peer (all of its device) + */ +class OutgoingFileTransfer final : public DataTransfer +{ +public: + OutgoingFileTransfer(DRing::DataTransferId tid, const DRing::DataTransferInfo& info); + + std::shared_ptr<DataTransfer> startNewOutgoing(const std::string& peer_uri) { + auto newTransfer = std::make_shared<SubOutgoingFileTransfer>(id, peer_uri, this->metaInfo_); + subtransfer_.emplace_back(newTransfer); + newTransfer->start(); + return newTransfer; + } + + bool hasBeenStarted() const override + { + // Started if one subtransfer is started + for (const auto& subtransfer: subtransfer_) + if (subtransfer->hasBeenStarted()) + return true; + return false; + } + + void close() noexcept override; + +private: + OutgoingFileTransfer() = delete; + + mutable std::shared_ptr<OptimisticMetaOutgoingInfo> metaInfo_; + mutable std::ifstream input_; + mutable std::vector<std::shared_ptr<SubOutgoingFileTransfer>> subtransfer_; +}; + +OutgoingFileTransfer::OutgoingFileTransfer(DRing::DataTransferId tid, const DRing::DataTransferInfo& info) +: DataTransfer(tid) +{ + input_.open(info.path, std::ios::binary); + if (!input_) + throw std::runtime_error("input file open failed"); + + info_ = info; + info_.flags &= ~((uint32_t)1 << int(DRing::DataTransferFlags::direction)); // outgoing + + // File size? + input_.seekg(0, std::ios_base::end); + info_.totalSize = input_.tellg(); + input_.close(); + + metaInfo_ = std::make_shared<OptimisticMetaOutgoingInfo>(this, this->info_); +} + +void +OutgoingFileTransfer::close() noexcept +{ + for (const auto& subtransfer : subtransfer_) + subtransfer->close(); +} + //============================================================================== class IncomingFileTransfer final : public DataTransfer @@ -413,9 +611,11 @@ DataTransferFacade::Impl::onConnectionRequestReply(const DRing::DataTransferId& { if (auto transfer = getTransfer(id)) { if (connection) { - if (transfer->start()) { - connection->attachInputStream(transfer); - } + connection->attachInputStream( + std::dynamic_pointer_cast<OutgoingFileTransfer>(transfer)->startNewOutgoing( + connection->getPeerUri() + ) + ); } else if (not transfer->hasBeenStarted()) { transfer->emit(DRing::DataTransferEventCode::unjoinable_peer); cancel(*transfer); diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index 87396fc028c0624c1e26d0db18e482b434b0298e..85f83d140ef3e676ec73d336a6bfb1d4e3315b5c 100644 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -688,4 +688,10 @@ PeerConnection::hasStreamWithId(const DRing::DataTransferId& id) return pimpl_->hasStreamWithId(id); } +std::string +PeerConnection::getPeerUri() const +{ + return pimpl_->peer_uri; +} + } // namespace ring diff --git a/src/peer_connection.h b/src/peer_connection.h index d1d8f1d92ce5e8e40e682f745d3a33d68e0307f0..973e224ff86f4adeda41b7dfcdf4f69057a55025 100644 --- a/src/peer_connection.h +++ b/src/peer_connection.h @@ -191,6 +191,8 @@ public: */ bool hasStreamWithId(const DRing::DataTransferId& id); + std::string getPeerUri() const; + private: class PeerConnectionImpl; std::unique_ptr<PeerConnectionImpl> pimpl_; diff --git a/src/ringdht/ringaccount.cpp b/src/ringdht/ringaccount.cpp index 35281d3c424908d915851550878850206cc02113..cc44b0343c5d3df8cd8f60fecbece4a65bc2b2b8 100644 --- a/src/ringdht/ringaccount.cpp +++ b/src/ringdht/ringaccount.cpp @@ -3430,7 +3430,6 @@ RingAccount::closePeerConnection(const std::string& peer, const DRing::DataTrans dhtPeerConnector_->closeConnection(peer, tid); } - void RingAccount::enableProxyClient(bool enable) {