From 705f819871af9df35fb1515640afe4d57932add4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=A9bastien=20Blin?=
 <sebastien.blin@savoirfairelinux.com>
Date: Mon, 19 Oct 2020 14:06:20 -0400
Subject: [PATCH] channeled_transfers: remove io pool

Also makes ftp server non blocking

Change-Id: I26a3804a04c61274c064662d678b65747ff8ddc9
---
 src/data_transfer.cpp               |  56 +++++++------
 src/data_transfer.h                 |   6 +-
 src/ftp_server.cpp                  | 118 +++++++++++++++-------------
 src/ftp_server.h                    |  22 +++++-
 src/jamidht/channeled_transfers.cpp |  12 +--
 src/jamidht/p2p.cpp                 |   2 +-
 6 files changed, 125 insertions(+), 91 deletions(-)

diff --git a/src/data_transfer.cpp b/src/data_transfer.cpp
index bacb166550..936867f18b 100644
--- a/src/data_transfer.cpp
+++ b/src/data_transfer.cpp
@@ -587,7 +587,7 @@ public:
 
     void close() noexcept override;
 
-    std::string requestFilename();
+    void requestFilename(const std::function<void(const std::string&)>& cb);
 
     void accept(const std::string&, std::size_t offset) override;
 
@@ -606,7 +606,8 @@ private:
     DRing::DataTransferId internalId_;
 
     std::ofstream fout_;
-    std::promise<void> filenamePromise_;
+    std::mutex cbMtx_ {};
+    std::function<void(const std::string&)> onFilenameCb_ {};
 };
 
 IncomingFileTransfer::IncomingFileTransfer(DRing::DataTransferId tid,
@@ -623,9 +624,13 @@ IncomingFileTransfer::IncomingFileTransfer(DRing::DataTransferId tid,
     info_.flags |= (uint32_t) 1 << int(DRing::DataTransferFlags::direction); // incoming
 }
 
-std::string
-IncomingFileTransfer::requestFilename()
+void
+IncomingFileTransfer::requestFilename(const std::function<void(const std::string&)>& cb)
 {
+    {
+        std::lock_guard<std::mutex> lk(cbMtx_);
+        onFilenameCb_ = cb;
+    }
     emit(DRing::DataTransferEventCode::wait_host_acceptance);
 
 #if 1
@@ -635,9 +640,7 @@ IncomingFileTransfer::requestFilename()
         if (not fileutils::isFile(filename))
             throw std::system_error(errno, std::generic_category());
         info_.path = filename;
-    } else {
-        // Now wait for DataTransferFacade::acceptFileTransfer() call
-        filenamePromise_.get_future().wait();
+        cb(filename);
     }
 #else
     // For DEBUG only
@@ -646,7 +649,6 @@ IncomingFileTransfer::requestFilename()
         throw std::system_error(errno, std::generic_category());
     info_.path = filename;
 #endif
-    return info_.path;
 }
 
 bool
@@ -675,10 +677,13 @@ IncomingFileTransfer::close() noexcept
     }
     DataTransfer::close();
 
-    try {
-        filenamePromise_.set_value();
-    } catch (...) {
+    decltype(onFilenameCb_) cb;
+    {
+        std::lock_guard<std::mutex> lk(cbMtx_);
+        cb = std::move(onFilenameCb_);
     }
+    if (cb)
+        cb("");
 
     fout_.close();
 
@@ -698,11 +703,13 @@ IncomingFileTransfer::accept(const std::string& filename, std::size_t offset)
     (void) offset;
 
     info_.path = filename;
-    try {
-        filenamePromise_.set_value();
-    } catch (const std::future_error& e) {
-        JAMI_WARN() << "transfer already accepted";
+    decltype(onFilenameCb_) cb;
+    {
+        std::lock_guard<std::mutex> lk(cbMtx_);
+        cb = std::move(onFilenameCb_);
     }
+    if (cb)
+        cb(filename);
 }
 
 bool
@@ -912,8 +919,8 @@ DataTransferFacade::bytesProgress(const DRing::DataTransferId& id,
 }
 
 DRing::DataTransferError
-DataTransferFacade::info(const DRing::DataTransferId& id, DRing::DataTransferInfo& info) const
-    noexcept
+DataTransferFacade::info(const DRing::DataTransferId& id,
+                         DRing::DataTransferInfo& info) const noexcept
 {
     try {
         if (auto transfer = pimpl_->getTransfer(id)) {
@@ -938,15 +945,18 @@ DataTransferFacade::createIncomingTransfer(const DRing::DataTransferInfo& info,
     return transfer->getId();
 }
 
-IncomingFileInfo
-DataTransferFacade::onIncomingFileRequest(const DRing::DataTransferId& id)
+void
+DataTransferFacade::onIncomingFileRequest(const DRing::DataTransferId& id,
+                                          const std::function<void(const IncomingFileInfo&)>& cb)
 {
     if (auto transfer = std::static_pointer_cast<IncomingFileTransfer>(pimpl_->getTransfer(id))) {
-        auto filename = transfer->requestFilename();
-        if (!filename.empty() && transfer->start())
-            return {id, std::static_pointer_cast<Stream>(transfer)};
+        transfer->requestFilename([transfer, id, cb = std::move(cb)](const std::string& filename) {
+            if (!filename.empty() && transfer->start())
+                cb({id, std::static_pointer_cast<Stream>(transfer)});
+            else
+                cb({id, nullptr});
+        });
     }
-    return {id, nullptr};
 }
 
 } // namespace jami
diff --git a/src/data_transfer.h b/src/data_transfer.h
index 0e4f760bab..7b3e572ce0 100644
--- a/src/data_transfer.h
+++ b/src/data_transfer.h
@@ -80,8 +80,10 @@ public:
                                                  const InternalCompletionCb& cb);
 
     /// Create an IncomingFileTransfer object.
-    /// \return a shared pointer on created Stream object, or nullptr in case of error
-    IncomingFileInfo onIncomingFileRequest(const DRing::DataTransferId& id);
+    /// @param id of the transfer
+    /// @param a shared pointer on created Stream object, or nullptr in case of error
+    void onIncomingFileRequest(const DRing::DataTransferId& id,
+                               const std::function<void(const IncomingFileInfo&)>& cb);
 
 private:
     class Impl;
diff --git a/src/ftp_server.cpp b/src/ftp_server.cpp
index 84f2f37af4..f34443372d 100644
--- a/src/ftp_server.cpp
+++ b/src/ftp_server.cpp
@@ -29,6 +29,7 @@
 #include <stdexcept>
 #include <iterator>
 #include <cstdlib> // strtoull
+#include <opendht/thread_pool.h>
 
 namespace jami {
 
@@ -62,7 +63,7 @@ FtpServer::close() noexcept
     JAMI_WARN() << "[FTP] server closed";
 }
 
-bool
+void
 FtpServer::startNewFile()
 {
     // Request filename from client (WARNING: synchrone call!)
@@ -78,35 +79,63 @@ FtpServer::startNewFile()
                                                              outId_,
                                                              cb_); // return immediately
     isTreatingRequest_ = true;
-    out_ = Manager::instance().dataTransfers->onIncomingFileRequest(
-        transferId_); // we block here until answer from client
-    isTreatingRequest_ = false;
-    if (!out_.stream) {
-        JAMI_DBG() << "[FTP] transfer aborted by client";
-        closed_ = true; // send NOK msg at next read()
-    } else {
-        if (tmpOnStateChangedCb_)
-            out_.stream->setOnStateChangedCb(std::move(tmpOnStateChangedCb_));
-        go_ = true;
-    }
+    Manager::instance().dataTransfers->onIncomingFileRequest(
+        transferId_, [w = weak()](const IncomingFileInfo& fileInfo) {
+            auto shared = w.lock();
+            if (!shared)
+                return;
+            shared->out_ = fileInfo;
+            shared->isTreatingRequest_ = false;
+            if (!shared->out_.stream) {
+                JAMI_DBG() << "[FTP] transfer aborted by client";
+                shared->closed_ = true; // send NOK msg at next read()
+            } else {
+                if (shared->tmpOnStateChangedCb_)
+                    shared->out_.stream->setOnStateChangedCb(
+                        std::move(shared->tmpOnStateChangedCb_));
+                shared->go_ = true;
+            }
 
-    if (onRecvCb_) {
-        std::vector<uint8_t> buffer;
-        if (go_) {
-            buffer.resize(3);
-            buffer[0] = 'G';
-            buffer[1] = 'O';
-            buffer[2] = '\n';
-        } else {
-            buffer.resize(4);
-            buffer[0] = 'N';
-            buffer[1] = 'G';
-            buffer[2] = 'O';
-            buffer[3] = '\n';
-        }
-        onRecvCb_(std::move(buffer));
-    }
-    return bool(out_.stream);
+            if (shared->onRecvCb_) {
+                std::vector<uint8_t> buffer;
+                if (shared->go_) {
+                    buffer.resize(3);
+                    buffer[0] = 'G';
+                    buffer[1] = 'O';
+                    buffer[2] = '\n';
+                } else {
+                    buffer.resize(4);
+                    buffer[0] = 'N';
+                    buffer[1] = 'G';
+                    buffer[2] = 'O';
+                    buffer[3] = '\n';
+                }
+                shared->onRecvCb_(std::move(buffer));
+            }
+
+            if (shared->out_.stream) {
+                shared->state_ = FtpState::READ_DATA;
+                while (shared->headerStream_) {
+                    shared->headerStream_.read(&shared->line_[0], shared->line_.size());
+                    std::size_t count = shared->headerStream_.gcount();
+                    if (!count)
+                        break;
+                    auto size_needed = shared->fileSize_ - shared->rx_;
+                    count = std::min(count, size_needed);
+                    shared->out_.stream->write(reinterpret_cast<const uint8_t*>(
+                                                &shared->line_[0]),
+                                            count);
+                    shared->rx_ += count;
+                    if (shared->rx_ == shared->fileSize_) {
+                        shared->closeCurrentFile();
+                        shared->state_ = FtpState::PARSE_HEADERS;
+                        return;
+                    }
+                }
+            }
+            shared->headerStream_.clear();
+            shared->headerStream_.str({}); // reset
+        });
 }
 
 void
@@ -152,32 +181,15 @@ bool
 FtpServer::write(const std::vector<uint8_t>& buffer)
 {
     switch (state_) {
+    case FtpState::WAIT_ACCEPTANCE:
+        // Receiving data while waiting, this is incorrect, because we didn't accept yet
+        closeCurrentFile();
+        state_ = FtpState::PARSE_HEADERS;
+        break;
     case FtpState::PARSE_HEADERS:
         if (parseStream(buffer)) {
-            if (!startNewFile()) {
-                headerStream_.clear();
-                headerStream_.str({}); // reset
-                return true;
-            }
-            state_ = FtpState::READ_DATA;
-            while (headerStream_) {
-                headerStream_.read(&line_[0], line_.size());
-                std::size_t count = headerStream_.gcount();
-                if (!count)
-                    break;
-                auto size_needed = fileSize_ - rx_;
-                count = std::min(count, size_needed);
-                if (out_.stream)
-                    out_.stream->write(reinterpret_cast<const uint8_t*>(&line_[0]), count);
-                rx_ += count;
-                if (rx_ == fileSize_) {
-                    closeCurrentFile();
-                    state_ = FtpState::PARSE_HEADERS;
-                    return true;
-                }
-            }
-            headerStream_.clear();
-            headerStream_.str({}); // reset
+            state_ = FtpState::WAIT_ACCEPTANCE;
+            startNewFile();
         }
         break;
 
diff --git a/src/ftp_server.h b/src/ftp_server.h
index 137238a4cd..b4c5546246 100644
--- a/src/ftp_server.h
+++ b/src/ftp_server.h
@@ -32,7 +32,7 @@ namespace jami {
 
 using RecvCb = std::function<void(std::vector<uint8_t>&& buf)>;
 
-class FtpServer final : public Stream
+class FtpServer final : public Stream, public std::enable_shared_from_this<FtpServer>
 {
 public:
     FtpServer(const std::string& account_id,
@@ -61,11 +61,12 @@ private:
     bool parseStream(const std::vector<uint8_t>&);
     bool parseLine(const std::string&);
     void handleHeader(const std::string&, const std::string&);
-    bool startNewFile();
+    void startNewFile();
     void closeCurrentFile();
 
     enum class FtpState {
         PARSE_HEADERS,
+        WAIT_ACCEPTANCE,
         READ_DATA,
     };
 
@@ -88,6 +89,23 @@ private:
     RecvCb onRecvCb_ {};
     InternalCompletionCb cb_ {};
     OnStateChangedCb tmpOnStateChangedCb_ {};
+
+    std::shared_ptr<FtpServer> shared()
+    {
+        return std::static_pointer_cast<FtpServer>(shared_from_this());
+    }
+    std::shared_ptr<FtpServer const> shared() const
+    {
+        return std::static_pointer_cast<FtpServer const>(shared_from_this());
+    }
+    std::weak_ptr<FtpServer> weak()
+    {
+        return std::static_pointer_cast<FtpServer>(shared_from_this());
+    }
+    std::weak_ptr<FtpServer const> weak() const
+    {
+        return std::static_pointer_cast<FtpServer const>(shared_from_this());
+    }
 };
 
 } // namespace jami
diff --git a/src/jamidht/channeled_transfers.cpp b/src/jamidht/channeled_transfers.cpp
index ba18b5c5a9..0cf081b539 100644
--- a/src/jamidht/channeled_transfers.cpp
+++ b/src/jamidht/channeled_transfers.cpp
@@ -56,11 +56,7 @@ ChanneledOutgoingTransfer::linkTransfer(const std::shared_ptr<Stream>& file)
         return;
     file_ = file;
     channel_->setOnRecv([this](const uint8_t* buf, size_t len) {
-        dht::ThreadPool::io().run(
-            [rx = std::vector<uint8_t>(buf, buf + len), file = std::weak_ptr<Stream>(file_)] {
-                if (auto f = file.lock())
-                    f->write(rx);
-            });
+        file_->write(std::vector<uint8_t>(buf, buf + len));
         return len;
     });
     file_->setOnRecv(
@@ -80,11 +76,7 @@ ChanneledIncomingTransfer::ChanneledIncomingTransfer(const std::shared_ptr<Chann
     , channel_(channel)
 {
     channel_->setOnRecv([this](const uint8_t* buf, size_t len) {
-        dht::ThreadPool::io().run(
-            [rx = std::vector<uint8_t>(buf, buf + len), ftp = std::weak_ptr<FtpServer>(ftp_)] {
-                if (auto f = ftp.lock())
-                    f->write(rx);
-            });
+        ftp_->write(std::vector<uint8_t>(buf, buf + len));
         return len;
     });
     ftp_->setOnRecv([channel = std::weak_ptr<ChannelSocket>(channel_)](std::vector<uint8_t>&& data) {
diff --git a/src/jamidht/p2p.cpp b/src/jamidht/p2p.cpp
index e0d5312807..a85722f5dd 100644
--- a/src/jamidht/p2p.cpp
+++ b/src/jamidht/p2p.cpp
@@ -282,7 +282,7 @@ DhtPeerConnector::requestConnection(
                                                    channelReadyCb);
         },
 
-        [this, peer_h, onChanneledCancelled, accId = acc->getAccountID()](bool found) {
+        [peer_h, onChanneledCancelled, accId = acc->getAccountID()](bool found) {
             if (!found) {
                 JAMI_WARN() << accId << "[CNX] aborted, no devices for " << peer_h;
                 onChanneledCancelled();
-- 
GitLab