Skip to content
Snippets Groups Projects
Commit fe5e13c3 authored by Guillaume Roguez's avatar Guillaume Roguez
Browse files

fix datatransfer

- implement "cancel" file transfer method
- fixing various bugs found during testing

Change-Id: Iea23cb3e2cdf8b4649afdf7436ec0701f9b67bdf
parent 1e5a3092
No related branches found
No related tags found
No related merge requests found
......@@ -198,9 +198,19 @@ OutgoingFileTransfer::read(std::vector<uint8_t>& buf) const
bool
OutgoingFileTransfer::write(const std::vector<uint8_t>& buffer)
{
if (not peerReady_ and not buffer.empty() and headerSent_) {
if (buffer.empty())
return true;
if (not peerReady_ and headerSent_) {
// detect GO or NGO msg
if (buffer.size() == 3 and buffer[0] == 'G' and buffer[1] == 'O' and buffer[2] == '\n') {
peerReady_ = true;
emit(DRing::DataTransferEventCode::ongoing);
} else {
// consider any other response as a cancel msg
RING_WARN() << "FTP#" << getId() << ": refused by peer";
emit(DRing::DataTransferEventCode::closed_by_peer);
return false;
}
}
return true;
}
......
......@@ -65,8 +65,13 @@ FtpServer::startNewFile()
info.displayName = displayName_;
info.totalSize = fileSize_;
info.bytesProgress = 0;
rx_ = 0;
out_ = Manager::instance().dataTransfers->onIncomingFileRequest(info); // we block here until answer from client
return true;
if (!out_) {
RING_DBG() << "[FTP] transfer aborted by client";
closed_ = true; // send NOK msg at next read()
}
return bool(out_);
}
void
......@@ -75,16 +80,27 @@ FtpServer::closeCurrentFile()
if (out_) {
out_->close();
out_.reset();
closed_ = true;
}
}
bool
FtpServer::read(std::vector<uint8_t>& buffer) const
{
if (!out_)
return false;
if (!out_) {
if (closed_) {
closed_ = false;
buffer.resize(4);
buffer[0] = 'N'; buffer[1] = 'G'; buffer[2] = 'O'; buffer[3] = '\n';
RING_DBG() << "[FTP] sending NGO (cancel) order";
} else {
buffer.resize(0);
}
return true;
}
buffer.resize(3);
buffer[0] = 'G'; buffer[1] = 'O'; buffer[2] = '\n';
RING_DBG() << "[FTP] sending GO order";
return true;
}
......@@ -102,16 +118,18 @@ FtpServer::write(const std::vector<uint8_t>& buffer)
state_ = FtpState::READ_DATA;
while (headerStream_) {
headerStream_.read(&line_[0], line_.size());
auto count = headerStream_.gcount();
std::size_t count = headerStream_.gcount();
if (!count)
continue;
break;
auto size_needed = fileSize_ - rx_;
count = std::min(count, size_needed);
if (out_)
out_->write(reinterpret_cast<const uint8_t*>(&line_[0]), count);
rx_ += count;
if (rx_ >= fileSize_) {
if (rx_ == fileSize_) {
closeCurrentFile();
state_ = FtpState::PARSE_HEADERS;
break;
return true;
}
}
headerStream_.clear();
......@@ -120,13 +138,20 @@ FtpServer::write(const std::vector<uint8_t>& buffer)
break;
case FtpState::READ_DATA:
{
if (out_)
out_->write(&buffer[0], buffer.size());
rx_ += buffer.size();
if (rx_ >= fileSize_) {
auto size_needed = fileSize_ - rx_;
auto read_size = std::min(buffer.size(), size_needed);
rx_ += read_size;
if (rx_ == fileSize_) {
closeCurrentFile();
// data may remains into the buffer: copy into the header stream for next header parsing
if (read_size < buffer.size())
headerStream_ << std::string(std::begin(buffer) + read_size, std::end(buffer));
state_ = FtpState::PARSE_HEADERS;
}
}
break;
default: break;
......
......@@ -59,6 +59,7 @@ private:
std::stringstream headerStream_;
std::string displayName_;
std::array<char, 1000> line_;
mutable bool closed_ {false};
FtpState state_ {FtpState::PARSE_HEADERS};
};
......
......@@ -274,7 +274,10 @@ TcpSocketEndpoint::read(ValueType* buf, std::size_t len, std::error_code& ec)
{
// NOTE: recv buf args is a void* on POSIX compliant system, but it's a char* on mingw
auto res = ::recv(sock_, reinterpret_cast<char*>(buf), len, 0);
if (res < 0)
ec.assign(errno, std::generic_category());
else
ec.clear();
return (res >= 0) ? res : 0;
}
......@@ -283,7 +286,10 @@ TcpSocketEndpoint::write(const ValueType* buf, std::size_t len, std::error_code&
{
// NOTE: recv buf args is a void* on POSIX compliant system, but it's a char* on mingw
auto res = ::send(sock_, reinterpret_cast<const char*>(buf), len, 0);
if (res < 0)
ec.assign(errno, std::generic_category());
else
ec.clear();
return (res >= 0) ? res : 0;
}
......@@ -572,13 +578,16 @@ PeerConnection::PeerConnectionImpl::eventLoop()
bool sleep = true;
// sending loop
handle_stream_list(inputs_, [&] (auto& stream) {
buf.resize(IO_BUFFER_SIZE);
if (stream->read(buf)) {
if (not buf.empty()) {
endpoint_->write(buf, ec);
if (ec)
throw std::system_error(ec);
sleep &= buf.size() == 0;
sleep = false;
}
} else {
// EOF on outgoing stream => finished
return false;
......@@ -596,13 +605,18 @@ PeerConnection::PeerConnectionImpl::eventLoop()
return true;
});
// receiving loop
handle_stream_list(outputs_, [&] (auto& stream) {
buf.resize(IO_BUFFER_SIZE);
if (stream->read(buf)) {
auto eof = stream->read(buf);
// if eof we let a chance to send a reply before leaving
if (not buf.empty()) {
endpoint_->write(buf, ec);
if (ec)
throw std::system_error(ec);
}
if (not eof)
return false;
if (endpoint_->waitForData(0, ec) > 0) {
buf.resize(IO_BUFFER_SIZE);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment