Commit 9c6fc26e authored by Guillaume Roguez's avatar Guillaume Roguez

datatransfer: fix incoming transfer

FTP/IncomingFileTransfer classes were not designed correctly
to talk between them.
This patch moves the file handling into IncomingFileTransfer class
and FtpServer only handles FTP protocol.

Change-Id: Ia12777dcb064b69b04bd5b6b3defc3ba3e84b2c3
Reviewed-by: default avatarOlivier Soldano <olivier.soldano@savoirfairelinux.com>
parent efd20a7e
......@@ -68,6 +68,10 @@ public:
return started_.compare_exchange_strong(expected, true);
}
void close() noexcept override {
started_ = false;
}
virtual std::streamsize bytesProgress() const {
std::lock_guard<std::mutex> lk {infoMutex_};
return info_.bytesProgress;
......@@ -152,6 +156,7 @@ FileTransfer::start()
void
FileTransfer::close() noexcept
{
DataTransfer::close();
input_.close();
if (info_.lastEvent < DRing::DataTransferEventCode::finished)
emit(DRing::DataTransferEventCode::closed_by_host);
......@@ -202,8 +207,7 @@ FileTransfer::read(std::vector<uint8_t>& buf) const
class IncomingFileTransfer final : public DataTransfer
{
public:
IncomingFileTransfer(DRing::DataTransferId, const DRing::DataTransferInfo&,
std::atomic<std::size_t>&);
IncomingFileTransfer(DRing::DataTransferId, const DRing::DataTransferInfo&);
bool start() override;
......@@ -215,18 +219,18 @@ public:
void accept(const std::string&, std::size_t offset) override;
bool write(const uint8_t* buffer, std::size_t length) override;
private:
IncomingFileTransfer() = delete;
std::atomic<std::size_t>& progressStorage_;
std::ofstream fout_;
std::promise<void> filenamePromise_;
};
IncomingFileTransfer::IncomingFileTransfer(DRing::DataTransferId tid,
const DRing::DataTransferInfo& info,
std::atomic<std::size_t>& progress_storage)
const DRing::DataTransferInfo& info)
: DataTransfer(tid)
, progressStorage_ {progress_storage}
{
RING_WARN() << "[FTP] incoming transfert of " << info.totalSize << " byte(s): " << info.displayName;
......@@ -240,7 +244,6 @@ std::streamsize
IncomingFileTransfer::bytesProgress() const
{
std::lock_guard<std::mutex> lk {infoMutex_};
info_.bytesProgress = progressStorage_.load();
return info_.bytesProgress;
}
......@@ -249,33 +252,48 @@ IncomingFileTransfer::requestFilename()
{
emit(DRing::DataTransferEventCode::wait_host_acceptance);
#if 0
#if 1
// Now wait for DataTransferFacade::acceptFileTransfer() call
filenamePromise_.get_future().wait();
return info_.path;
#else
// For DEBUG only
char filename[] = "/tmp/ring_XXXXXX";
if (::mkstemp(filename) < 0)
throw std::system_error(errno, std::generic_category());
return filename;
info_.path = filename;
#endif
return info_.path;
}
bool
IncomingFileTransfer::start()
{
if (DataTransfer::start()) {
emit(DRing::DataTransferEventCode::ongoing);
return true;
if (!DataTransfer::start())
return false;
fout_.open(&info_.path[0], std::ios::binary);
if (!fout_) {
RING_ERR() << "[FTP] Can't open file " << info_.path;
return false;
}
return false;
emit(DRing::DataTransferEventCode::ongoing);
return true;
}
void
IncomingFileTransfer::close() noexcept
{
filenamePromise_.set_value();
DataTransfer::close();
try {
filenamePromise_.set_value();
fout_.close();
RING_DBG() << "[FTP] file closed with size " << info_.bytesProgress;
} catch (...) {}
emit(DRing::DataTransferEventCode::finished);
}
void
......@@ -289,6 +307,19 @@ IncomingFileTransfer::accept(const std::string& filename, std::size_t offset)
start();
}
bool
IncomingFileTransfer::write(const uint8_t* buffer, std::size_t length)
{
if (!length)
return true;
fout_.write(reinterpret_cast<const char*>(buffer), length);
if (!fout_)
return false;
std::lock_guard<std::mutex> lk {infoMutex_};
info_.bytesProgress += length;
return true;
}
//==============================================================================
class DataTransferFacade::Impl
......@@ -297,15 +328,11 @@ public:
mutable std::mutex mapMutex_;
std::unordered_map<DRing::DataTransferId, std::shared_ptr<DataTransfer>> map_;
std::shared_ptr<DataTransfer> createFileTransfer(const DRing::DataTransferInfo& info);
std::shared_ptr<IncomingFileTransfer> createIncomingFileTransfer(const DRing::DataTransferInfo& info,
std::atomic<std::size_t>& progress_storage);
std::shared_ptr<DataTransfer> getTransfer(const DRing::DataTransferId& id);
void cancel(DataTransfer& transfer);
void onConnectionRequestReply(const DRing::DataTransferId& id, PeerConnection* connection);
std::shared_ptr<DataTransfer> createFileTransfer(const DRing::DataTransferInfo&);
std::shared_ptr<IncomingFileTransfer> createIncomingFileTransfer(const DRing::DataTransferInfo&);
std::shared_ptr<DataTransfer> getTransfer(const DRing::DataTransferId&);
void cancel(DataTransfer&);
void onConnectionRequestReply(const DRing::DataTransferId&, PeerConnection*);
};
void DataTransferFacade::Impl::cancel(DataTransfer& transfer)
......@@ -335,11 +362,10 @@ DataTransferFacade::Impl::createFileTransfer(const DRing::DataTransferInfo& info
}
std::shared_ptr<IncomingFileTransfer>
DataTransferFacade::Impl::createIncomingFileTransfer(const DRing::DataTransferInfo& info,
std::atomic<std::size_t>& progress_storage)
DataTransferFacade::Impl::createIncomingFileTransfer(const DRing::DataTransferInfo& info)
{
auto tid = generateUID();
auto transfer = std::make_shared<IncomingFileTransfer>(tid, info, progress_storage);
auto transfer = std::make_shared<IncomingFileTransfer>(tid, info);
std::lock_guard<std::mutex> lk {mapMutex_};
map_.emplace(tid, transfer);
return transfer;
......@@ -450,13 +476,12 @@ DataTransferFacade::info(const DRing::DataTransferId& id) const
throw std::invalid_argument("not existing DataTransferId");
}
std::string
std::shared_ptr<Stream>
DataTransferFacade::onIncomingFileRequest(const std::string& account_id,
const std::string& peer_uri,
const std::string& display_name,
std::size_t total_size,
std::size_t offset,
std::atomic<std::size_t>& progress_storage)
std::size_t offset)
{
DRing::DataTransferInfo info;
info.accountId = account_id;
......@@ -466,11 +491,12 @@ DataTransferFacade::onIncomingFileRequest(const std::string& account_id,
info.bytesProgress = offset;
// remaining fields are overwritten
auto transfer = pimpl_->createIncomingFileTransfer(info, progress_storage);
auto transfer = pimpl_->createIncomingFileTransfer(info);
auto filename = transfer->requestFilename();
if (!filename.empty())
transfer->start(); // TODO: bad place, call only if file can be open
return filename;
if (transfer->start())
return std::static_pointer_cast<Stream>(transfer);
return {};
}
} // namespace ring
......@@ -27,6 +27,8 @@
namespace ring {
class Stream;
/// Front-end to data transfer service
class DataTransferFacade
{
......@@ -70,11 +72,11 @@ public:
/// Create an IncomingFileTransfer object.
/// \return a filename to open where incoming data will be written or an empty string
/// in case of refusal.
std::string onIncomingFileRequest(const std::string& account_id,
const std::string& peer_uri,
const std::string& display_name, std::size_t total_size,
std::size_t offset,
std::atomic<std::size_t>& progress_storage);
std::shared_ptr<Stream> onIncomingFileRequest(const std::string& account_id,
const std::string& peer_uri,
const std::string& display_name,
std::size_t total_size,
std::size_t offset);
private:
class Impl;
......
......@@ -51,7 +51,7 @@ FtpServer::getId() const
void
FtpServer::close() noexcept
{
out_.close();
closeCurrentFile();
RING_WARN() << "[FTP] server closed";
}
......@@ -59,27 +59,21 @@ bool
FtpServer::startNewFile()
{
// Request filename from client (WARNING: synchrone call!)
auto filename = Manager::instance().dataTransfers->onIncomingFileRequest(accountId_,
peerUri_,
displayName_,
fileSize_,
0 /* TODO: offset */,
rx_);
if (filename.empty())
return false;
out_.open(&filename[0], std::ios::binary);
if (!out_)
throw std::system_error(errno, std::generic_category());
RING_WARN() << "[FTP] Receiving file " << filename;
out_ = Manager::instance().dataTransfers->onIncomingFileRequest(accountId_,
peerUri_,
displayName_,
fileSize_,
0 /* TODO: offset */);
return true;
}
void
FtpServer::closeCurrentFile()
{
out_.close();
RING_WARN() << "[FTP] File received, " << rx_ << " byte(s)";
if (out_) {
out_->close();
out_.reset();
}
}
bool
......@@ -96,11 +90,15 @@ FtpServer::write(const std::vector<uint8_t>& buffer)
state_ = FtpState::READ_DATA;
while (headerStream_) {
headerStream_.read(&line_[0], line_.size());
out_.write(&line_[0], headerStream_.gcount());
rx_ += headerStream_.gcount();
auto count = headerStream_.gcount();
if (!count)
continue;
out_->write(reinterpret_cast<const uint8_t*>(&line_[0]), count);
rx_ += count;
if (rx_ >= fileSize_) {
closeCurrentFile();
state_ = FtpState::PARSE_HEADERS;
break;
}
}
headerStream_.clear();
......@@ -109,7 +107,7 @@ FtpServer::write(const std::vector<uint8_t>& buffer)
break;
case FtpState::READ_DATA:
out_.write(reinterpret_cast<const char*>(&buffer[0]), buffer.size());
out_->write(&buffer[0], buffer.size());
rx_ += buffer.size();
if (rx_ >= fileSize_) {
closeCurrentFile();
......
......@@ -24,9 +24,8 @@
#include <vector>
#include <array>
#include <fstream>
#include <sstream>
#include <atomic>
#include <memory>
namespace ring {
......@@ -53,9 +52,9 @@ private:
const std::string accountId_;
const std::string peerUri_;
std::ofstream out_;
std::shared_ptr<Stream> out_;
std::size_t fileSize_ {0};
std::atomic<std::size_t> rx_ {0};
std::size_t rx_ {0};
std::stringstream headerStream_;
std::string displayName_;
std::array<char, 1000> line_;
......
......@@ -61,6 +61,11 @@ public:
(void)buffer;
return false;
};
virtual bool write(const uint8_t* buffer, std::size_t length) {
(void)buffer;
(void)length;
return false;
};
};
//==============================================================================
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment