diff --git a/src/data_transfer.cpp b/src/data_transfer.cpp index c9809035d0dfed0a707ba1ca5b92de9ede313ab8..44b42dc60e6b0651496226429f8a1dd2eaec2e41 100644 --- a/src/data_transfer.cpp +++ b/src/data_transfer.cpp @@ -68,6 +68,10 @@ public: return started_.compare_exchange_strong(expected, true); } + void close() noexcept override { + started_ = false; + } + virtual std::streamsize bytesProgress() const { std::lock_guard<std::mutex> lk {infoMutex_}; return info_.bytesProgress; @@ -152,6 +156,7 @@ FileTransfer::start() void FileTransfer::close() noexcept { + DataTransfer::close(); input_.close(); if (info_.lastEvent < DRing::DataTransferEventCode::finished) emit(DRing::DataTransferEventCode::closed_by_host); @@ -202,8 +207,7 @@ FileTransfer::read(std::vector<uint8_t>& buf) const class IncomingFileTransfer final : public DataTransfer { public: - IncomingFileTransfer(DRing::DataTransferId, const DRing::DataTransferInfo&, - std::atomic<std::size_t>&); + IncomingFileTransfer(DRing::DataTransferId, const DRing::DataTransferInfo&); bool start() override; @@ -215,18 +219,18 @@ public: void accept(const std::string&, std::size_t offset) override; + bool write(const uint8_t* buffer, std::size_t length) override; + private: IncomingFileTransfer() = delete; - std::atomic<std::size_t>& progressStorage_; + std::ofstream fout_; std::promise<void> filenamePromise_; }; IncomingFileTransfer::IncomingFileTransfer(DRing::DataTransferId tid, - const DRing::DataTransferInfo& info, - std::atomic<std::size_t>& progress_storage) + const DRing::DataTransferInfo& info) : DataTransfer(tid) - , progressStorage_ {progress_storage} { RING_WARN() << "[FTP] incoming transfert of " << info.totalSize << " byte(s): " << info.displayName; @@ -240,7 +244,6 @@ std::streamsize IncomingFileTransfer::bytesProgress() const { std::lock_guard<std::mutex> lk {infoMutex_}; - info_.bytesProgress = progressStorage_.load(); return info_.bytesProgress; } @@ -249,33 +252,48 @@ IncomingFileTransfer::requestFilename() { emit(DRing::DataTransferEventCode::wait_host_acceptance); -#if 0 +#if 1 // Now wait for DataTransferFacade::acceptFileTransfer() call filenamePromise_.get_future().wait(); - return info_.path; #else // For DEBUG only char filename[] = "/tmp/ring_XXXXXX"; if (::mkstemp(filename) < 0) throw std::system_error(errno, std::generic_category()); - return filename; + info_.path = filename; #endif + return info_.path; } bool IncomingFileTransfer::start() { - if (DataTransfer::start()) { - emit(DRing::DataTransferEventCode::ongoing); - return true; + if (!DataTransfer::start()) + return false; + + fout_.open(&info_.path[0], std::ios::binary); + if (!fout_) { + RING_ERR() << "[FTP] Can't open file " << info_.path; + return false; } - return false; + + emit(DRing::DataTransferEventCode::ongoing); + return true; } void IncomingFileTransfer::close() noexcept { - filenamePromise_.set_value(); + DataTransfer::close(); + + try { + filenamePromise_.set_value(); + fout_.close(); + RING_DBG() << "[FTP] file closed with size " << info_.bytesProgress; + } catch (...) {} + + + emit(DRing::DataTransferEventCode::finished); } void @@ -289,6 +307,19 @@ IncomingFileTransfer::accept(const std::string& filename, std::size_t offset) start(); } +bool +IncomingFileTransfer::write(const uint8_t* buffer, std::size_t length) +{ + if (!length) + return true; + fout_.write(reinterpret_cast<const char*>(buffer), length); + if (!fout_) + return false; + std::lock_guard<std::mutex> lk {infoMutex_}; + info_.bytesProgress += length; + return true; +} + //============================================================================== class DataTransferFacade::Impl @@ -297,15 +328,11 @@ public: mutable std::mutex mapMutex_; std::unordered_map<DRing::DataTransferId, std::shared_ptr<DataTransfer>> map_; - std::shared_ptr<DataTransfer> createFileTransfer(const DRing::DataTransferInfo& info); - std::shared_ptr<IncomingFileTransfer> createIncomingFileTransfer(const DRing::DataTransferInfo& info, - std::atomic<std::size_t>& progress_storage); - - std::shared_ptr<DataTransfer> getTransfer(const DRing::DataTransferId& id); - - void cancel(DataTransfer& transfer); - - void onConnectionRequestReply(const DRing::DataTransferId& id, PeerConnection* connection); + std::shared_ptr<DataTransfer> createFileTransfer(const DRing::DataTransferInfo&); + std::shared_ptr<IncomingFileTransfer> createIncomingFileTransfer(const DRing::DataTransferInfo&); + std::shared_ptr<DataTransfer> getTransfer(const DRing::DataTransferId&); + void cancel(DataTransfer&); + void onConnectionRequestReply(const DRing::DataTransferId&, PeerConnection*); }; void DataTransferFacade::Impl::cancel(DataTransfer& transfer) @@ -335,11 +362,10 @@ DataTransferFacade::Impl::createFileTransfer(const DRing::DataTransferInfo& info } std::shared_ptr<IncomingFileTransfer> -DataTransferFacade::Impl::createIncomingFileTransfer(const DRing::DataTransferInfo& info, - std::atomic<std::size_t>& progress_storage) +DataTransferFacade::Impl::createIncomingFileTransfer(const DRing::DataTransferInfo& info) { auto tid = generateUID(); - auto transfer = std::make_shared<IncomingFileTransfer>(tid, info, progress_storage); + auto transfer = std::make_shared<IncomingFileTransfer>(tid, info); std::lock_guard<std::mutex> lk {mapMutex_}; map_.emplace(tid, transfer); return transfer; @@ -450,13 +476,12 @@ DataTransferFacade::info(const DRing::DataTransferId& id) const throw std::invalid_argument("not existing DataTransferId"); } -std::string +std::shared_ptr<Stream> DataTransferFacade::onIncomingFileRequest(const std::string& account_id, const std::string& peer_uri, const std::string& display_name, std::size_t total_size, - std::size_t offset, - std::atomic<std::size_t>& progress_storage) + std::size_t offset) { DRing::DataTransferInfo info; info.accountId = account_id; @@ -466,11 +491,12 @@ DataTransferFacade::onIncomingFileRequest(const std::string& account_id, info.bytesProgress = offset; // remaining fields are overwritten - auto transfer = pimpl_->createIncomingFileTransfer(info, progress_storage); + auto transfer = pimpl_->createIncomingFileTransfer(info); auto filename = transfer->requestFilename(); if (!filename.empty()) - transfer->start(); // TODO: bad place, call only if file can be open - return filename; + if (transfer->start()) + return std::static_pointer_cast<Stream>(transfer); + return {}; } } // namespace ring diff --git a/src/data_transfer.h b/src/data_transfer.h index 0ccd9e5325327279effde3d204231b53d319c039..2b42ccd351084d4137127122eb4dc00c2d510017 100644 --- a/src/data_transfer.h +++ b/src/data_transfer.h @@ -27,6 +27,8 @@ namespace ring { +class Stream; + /// Front-end to data transfer service class DataTransferFacade { @@ -70,11 +72,11 @@ public: /// Create an IncomingFileTransfer object. /// \return a filename to open where incoming data will be written or an empty string /// in case of refusal. - std::string onIncomingFileRequest(const std::string& account_id, - const std::string& peer_uri, - const std::string& display_name, std::size_t total_size, - std::size_t offset, - std::atomic<std::size_t>& progress_storage); + std::shared_ptr<Stream> onIncomingFileRequest(const std::string& account_id, + const std::string& peer_uri, + const std::string& display_name, + std::size_t total_size, + std::size_t offset); private: class Impl; diff --git a/src/ftp_server.cpp b/src/ftp_server.cpp index e44284f1dae24998c7b72e674666649c8a3010d8..6388e3164faa9d14b27b312f6e86e2483e72103f 100644 --- a/src/ftp_server.cpp +++ b/src/ftp_server.cpp @@ -51,7 +51,7 @@ FtpServer::getId() const void FtpServer::close() noexcept { - out_.close(); + closeCurrentFile(); RING_WARN() << "[FTP] server closed"; } @@ -59,27 +59,21 @@ bool FtpServer::startNewFile() { // Request filename from client (WARNING: synchrone call!) - auto filename = Manager::instance().dataTransfers->onIncomingFileRequest(accountId_, - peerUri_, - displayName_, - fileSize_, - 0 /* TODO: offset */, - rx_); - if (filename.empty()) - return false; - - out_.open(&filename[0], std::ios::binary); - if (!out_) - throw std::system_error(errno, std::generic_category()); - RING_WARN() << "[FTP] Receiving file " << filename; + out_ = Manager::instance().dataTransfers->onIncomingFileRequest(accountId_, + peerUri_, + displayName_, + fileSize_, + 0 /* TODO: offset */); return true; } void FtpServer::closeCurrentFile() { - out_.close(); - RING_WARN() << "[FTP] File received, " << rx_ << " byte(s)"; + if (out_) { + out_->close(); + out_.reset(); + } } bool @@ -96,11 +90,15 @@ FtpServer::write(const std::vector<uint8_t>& buffer) state_ = FtpState::READ_DATA; while (headerStream_) { headerStream_.read(&line_[0], line_.size()); - out_.write(&line_[0], headerStream_.gcount()); - rx_ += headerStream_.gcount(); + auto count = headerStream_.gcount(); + if (!count) + continue; + out_->write(reinterpret_cast<const uint8_t*>(&line_[0]), count); + rx_ += count; if (rx_ >= fileSize_) { closeCurrentFile(); state_ = FtpState::PARSE_HEADERS; + break; } } headerStream_.clear(); @@ -109,7 +107,7 @@ FtpServer::write(const std::vector<uint8_t>& buffer) break; case FtpState::READ_DATA: - out_.write(reinterpret_cast<const char*>(&buffer[0]), buffer.size()); + out_->write(&buffer[0], buffer.size()); rx_ += buffer.size(); if (rx_ >= fileSize_) { closeCurrentFile(); diff --git a/src/ftp_server.h b/src/ftp_server.h index 8ba8142d4fa0aa1641e6605c6e0aad60e4a88b4c..8fbcb21df3ae08a42b70ef5fd41e05ff8fa8654a 100644 --- a/src/ftp_server.h +++ b/src/ftp_server.h @@ -24,9 +24,8 @@ #include <vector> #include <array> -#include <fstream> #include <sstream> -#include <atomic> +#include <memory> namespace ring { @@ -53,9 +52,9 @@ private: const std::string accountId_; const std::string peerUri_; - std::ofstream out_; + std::shared_ptr<Stream> out_; std::size_t fileSize_ {0}; - std::atomic<std::size_t> rx_ {0}; + std::size_t rx_ {0}; std::stringstream headerStream_; std::string displayName_; std::array<char, 1000> line_; diff --git a/src/peer_connection.h b/src/peer_connection.h index 2049613652a91e67ef2b48e4b817d90e39505ab7..e7c437dbeeed6408cb6e38c71400395046340a62 100644 --- a/src/peer_connection.h +++ b/src/peer_connection.h @@ -61,6 +61,11 @@ public: (void)buffer; return false; }; + virtual bool write(const uint8_t* buffer, std::size_t length) { + (void)buffer; + (void)length; + return false; + }; }; //==============================================================================