Commit 8809a496 authored by Guillaume Roguez's avatar Guillaume Roguez Committed by Adrien Béraud

datatransfer: implement full-duplex

Add support for "wait_peer_acceptance" state.
Now a sender blocks until it receives acceptance
from receiver.

Change-Id: I3252a9c20c1ef2f2e1da573c570b99d816cbb451
Reviewed-by: default avatarOlivier Soldano <olivier.soldano@savoirfairelinux.com>
parent 9b03e03e
......@@ -105,27 +105,27 @@ DataTransfer::emit(DRing::DataTransferEventCode code) const
//==============================================================================
class FileTransfer final : public DataTransfer
class OutgoingFileTransfer final : public DataTransfer
{
public:
FileTransfer(DRing::DataTransferId tid, const DRing::DataTransferInfo& info);
bool start() override;
OutgoingFileTransfer(DRing::DataTransferId tid, const DRing::DataTransferInfo& info);
void close() noexcept override;
bool read(std::vector<uint8_t>&) const override;
bool write(const std::vector<uint8_t>& buffer) override;
private:
FileTransfer() = delete;
OutgoingFileTransfer() = delete;
mutable std::ifstream input_;
mutable std::size_t tx_ {0};
std::size_t tx_ {0};
mutable bool headerSent_ {false};
bool peerReady_ {false};
const std::string peerUri_;
};
FileTransfer::FileTransfer(DRing::DataTransferId tid, const DRing::DataTransferInfo& info)
OutgoingFileTransfer::OutgoingFileTransfer(DRing::DataTransferId tid,
const DRing::DataTransferInfo& info)
: DataTransfer(tid)
{
input_.open(info.path, std::ios::binary);
......@@ -141,18 +141,8 @@ FileTransfer::FileTransfer(DRing::DataTransferId tid, const DRing::DataTransferI
input_.seekg(0, std::ios_base::beg);
}
bool
FileTransfer::start()
{
if (DataTransfer::start()) {
emit(DRing::DataTransferEventCode::ongoing);
return true;
}
return false;
}
void
FileTransfer::close() noexcept
OutgoingFileTransfer::close() noexcept
{
DataTransfer::close();
input_.close();
......@@ -161,8 +151,9 @@ FileTransfer::close() noexcept
}
bool
FileTransfer::read(std::vector<uint8_t>& buf) const
OutgoingFileTransfer::read(std::vector<uint8_t>& buf) const
{
// Need to send headers?
if (!headerSent_) {
std::stringstream ss;
ss << "Content-Length: " << info_.totalSize << '\n'
......@@ -175,28 +166,42 @@ FileTransfer::read(std::vector<uint8_t>& buf) const
std::copy(std::begin(header), std::end(header), std::begin(buf));
headerSent_ = true;
emit(DRing::DataTransferEventCode::wait_peer_acceptance);
return true;
}
input_.read(reinterpret_cast<char*>(&buf[0]), buf.size());
auto n = input_.gcount();
buf.resize(n);
{
std::lock_guard<std::mutex> lk {infoMutex_};
info_.bytesProgress += n;
// Wait for peer ready reply?
if (!peerReady_) {
buf.resize(0);
return true;
}
if (n)
// Sending file data...
input_.read(reinterpret_cast<char*>(&buf[0]), buf.size());
buf.resize(input_.gcount());
if (buf.size()) {
std::lock_guard<std::mutex> lk {infoMutex_};
info_.bytesProgress += buf.size();
return true;
}
// File end reached?
if (input_.eof()) {
RING_DBG() << "FTP#" << getId() << ": sent " << info_.bytesProgress << " bytes";
emit(DRing::DataTransferEventCode::finished);
return false;
} else {
throw std::runtime_error("FileTransfer IO read failed"); // TODO: better exception?
}
throw std::runtime_error("FileTransfer IO read failed"); // TODO: better exception?
}
bool
OutgoingFileTransfer::write(const std::vector<uint8_t>& buffer)
{
if (not peerReady_ and not buffer.empty() and headerSent_) {
peerReady_ = true;
emit(DRing::DataTransferEventCode::ongoing);
}
return true;
}
......@@ -318,7 +323,7 @@ public:
mutable std::mutex mapMutex_;
std::unordered_map<DRing::DataTransferId, std::shared_ptr<DataTransfer>> map_;
std::shared_ptr<DataTransfer> createFileTransfer(const DRing::DataTransferInfo& info,
std::shared_ptr<DataTransfer> createOutgoingFileTransfer(const DRing::DataTransferInfo& info,
DRing::DataTransferId& tid);
std::shared_ptr<IncomingFileTransfer> createIncomingFileTransfer(const DRing::DataTransferInfo&);
std::shared_ptr<DataTransfer> getTransfer(const DRing::DataTransferId&);
......@@ -326,7 +331,8 @@ public:
void onConnectionRequestReply(const DRing::DataTransferId&, PeerConnection*);
};
void DataTransferFacade::Impl::cancel(DataTransfer& transfer)
void
DataTransferFacade::Impl::cancel(DataTransfer& transfer)
{
transfer.close();
map_.erase(transfer.getId());
......@@ -343,11 +349,11 @@ DataTransferFacade::Impl::getTransfer(const DRing::DataTransferId& id)
}
std::shared_ptr<DataTransfer>
DataTransferFacade::Impl::createFileTransfer(const DRing::DataTransferInfo& info,
DRing::DataTransferId& tid)
DataTransferFacade::Impl::createOutgoingFileTransfer(const DRing::DataTransferInfo& info,
DRing::DataTransferId& tid)
{
tid = generateUID();
auto transfer = std::make_shared<FileTransfer>(tid, info);
auto transfer = std::make_shared<OutgoingFileTransfer>(tid, info);
{
std::lock_guard<std::mutex> lk {mapMutex_};
map_.emplace(tid, transfer);
......@@ -420,7 +426,7 @@ DataTransferFacade::sendFile(const DRing::DataTransferInfo& info,
}
try {
pimpl_->createFileTransfer(info, tid);
pimpl_->createOutgoingFileTransfer(info, tid);
} catch (const std::exception& ex) {
RING_ERR() << "[XFER] exception during createFileTransfer(): " << ex.what();
return DRing::DataTransferError::io;
......
......@@ -65,7 +65,7 @@ FtpServer::startNewFile()
info.displayName = displayName_;
info.totalSize = fileSize_;
info.bytesProgress = 0;
out_ = Manager::instance().dataTransfers->onIncomingFileRequest(info);
out_ = Manager::instance().dataTransfers->onIncomingFileRequest(info); // we block here until answer from client
return true;
}
......@@ -78,6 +78,16 @@ FtpServer::closeCurrentFile()
}
}
bool
FtpServer::read(std::vector<uint8_t>& buffer) const
{
if (!out_)
return false;
buffer.resize(3);
buffer[0] = 'G'; buffer[1] = 'O'; buffer[2] = '\n';
return true;
}
bool
FtpServer::write(const std::vector<uint8_t>& buffer)
{
......
......@@ -34,6 +34,7 @@ class FtpServer final : public Stream
public:
FtpServer(const std::string& account_id, const std::string& peer_uri);
bool read(std::vector<uint8_t>& buffer) const override;
bool write(const std::vector<uint8_t>& buffer) override;
DRing::DataTransferId getId() const override;
void close() noexcept override;
......
......@@ -567,27 +567,58 @@ PeerConnection::PeerConnectionImpl::eventLoop()
}
// Then handles IO streams
std::vector<uint8_t> buf(IO_BUFFER_SIZE);
std::vector<uint8_t> buf;
std::error_code ec;
handle_stream_list(inputs_, [&](auto& stream) {
if (!stream->read(buf))
return false;
auto size = endpoint_->write(buf, ec);
if (!ec)
return true;
if (!size)
bool sleep = true;
handle_stream_list(inputs_, [&] (auto& stream) {
buf.resize(IO_BUFFER_SIZE);
if (stream->read(buf)) {
endpoint_->write(buf, ec);
if (ec)
throw std::system_error(ec);
sleep &= buf.size() == 0;
} else {
// EOF on outgoing stream => finished
return false;
throw std::system_error(ec);
}
if (endpoint_->waitForData(0, ec) > 0) {
buf.resize(IO_BUFFER_SIZE);
endpoint_->read(buf, ec);
if (ec)
throw std::system_error(ec);
return stream->write(buf);
} else if (ec)
throw std::system_error(ec);
return true;
});
handle_stream_list(outputs_, [&](auto& stream) {
if (endpoint_->waitForData(10, ec) > 0) {
auto size = endpoint_->read(buf, ec);
if (!ec)
return size > 0 and stream->write(buf);
} else if (!ec)
return true; // continue on msg handling
throw std::system_error(ec);
handle_stream_list(outputs_, [&] (auto& stream) {
buf.resize(IO_BUFFER_SIZE);
if (stream->read(buf)) {
endpoint_->write(buf, ec);
if (ec)
throw std::system_error(ec);
}
if (endpoint_->waitForData(0, ec) > 0) {
buf.resize(IO_BUFFER_SIZE);
endpoint_->read(buf, ec);
if (ec)
throw std::system_error(ec);
sleep = false;
return stream->write(buf);
} else if (ec)
throw std::system_error(ec);
return true;
});
if (sleep)
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment