diff --git a/src/data_transfer.cpp b/src/data_transfer.cpp index f6701fab3dd6643260ac9995c128b896469704e5..6e31ae26a184c50761c2b1ad8a01c52f6959a0ea 100644 --- a/src/data_transfer.cpp +++ b/src/data_transfer.cpp @@ -27,6 +27,7 @@ #include "string_utils.h" #include "map_utils.h" #include "client/ring_signal.h" +#include "jamidht/p2p.h" #include <thread> #include <stdexcept> @@ -41,6 +42,7 @@ #include <cstdlib> // mkstemp #include <opendht/rng.h> +#include <opendht/thread_pool.h> namespace jami { @@ -52,6 +54,8 @@ generateUID() return dist(rd); } +constexpr const uint32_t MAX_BUFFER_SIZE {65534}; /* Channeled max packet size */ + //============================================================================== class DataTransfer : public Stream @@ -105,6 +109,8 @@ public: const DRing::DataTransferId id; + virtual void cancel() {} + protected: mutable std::mutex infoMutex_; mutable DRing::DataTransferInfo info_; @@ -256,9 +262,64 @@ public: bool write(const std::vector<uint8_t>& buffer) override; void emit(DRing::DataTransferEventCode code) const override; + void cancel() override { + if (auto account = Manager::instance().getAccount<JamiAccount>(info_.accountId)) + account->closePeerConnection(peerUri_, id); + } + + void setOnRecv(std::function<void(std::vector<uint8_t>&&)>&& cb) override { + bool send = false; + { + std::lock_guard<std::mutex> lock(onRecvCbMtx_); + if (cb) send = true; + onRecvCb_ = std::move(cb); + } + if (send) { + std::vector<uint8_t> buf; + sendHeader(buf); // Pass headers to the new callback + } + } + private: SubOutgoingFileTransfer() = delete; + void sendHeader(std::vector<uint8_t>& buf) const { + std::stringstream ss; + ss << "Content-Length: " << info_.totalSize << '\n' + << "Display-Name: " << info_.displayName << '\n' + << "Offset: 0\n" + << '\n'; + auto header = ss.str(); + buf.resize(header.size()); + std::copy(std::begin(header), std::end(header), std::begin(buf)); + + headerSent_ = true; + emit(DRing::DataTransferEventCode::wait_peer_acceptance); + if (onRecvCb_) + onRecvCb_(std::move(buf)); + } + + void sendFile() const { + dht::ThreadPool::computation().run([this]() { + while (!input_.eof() && onRecvCb_) { + std::vector<uint8_t> buf; + buf.resize(MAX_BUFFER_SIZE); + + input_.read(reinterpret_cast<char*>(&buf[0]), buf.size()); + buf.resize(input_.gcount()); + if (buf.size()) { + std::lock_guard<std::mutex> lk {infoMutex_}; + info_.bytesProgress += buf.size(); + metaInfo_->updateInfo(info_); + } + if (onRecvCb_) + onRecvCb_(std::move(buf)); + } + JAMI_DBG() << "FTP#" << getId() << ": sent " << info_.bytesProgress << " bytes"; + emit(DRing::DataTransferEventCode::finished); + }); + } + mutable std::shared_ptr<OptimisticMetaOutgoingInfo> metaInfo_; mutable std::ifstream input_; std::size_t tx_ {0}; @@ -267,6 +328,8 @@ private: const std::string peerUri_; mutable std::unique_ptr<std::thread> timeoutThread_; mutable std::atomic_bool stopTimeout_ {false}; + std::mutex onRecvCbMtx_; + std::function<void(std::vector<uint8_t>&&)> onRecvCb_ {}; }; SubOutgoingFileTransfer::SubOutgoingFileTransfer(DRing::DataTransferId tid, @@ -301,10 +364,6 @@ SubOutgoingFileTransfer::closeAndEmit(DRing::DataTransferEventCode code) const n started_ = false; // NOTE: replace DataTransfer::close(); which is non const input_.close(); - // We don't need the connection anymore. Can close it. - auto account = Manager::instance().getAccount<JamiAccount>(info_.accountId); - account->closePeerConnection(peerUri_, id); - if (info_.lastEvent < DRing::DataTransferEventCode::finished) emit(code); } @@ -314,18 +373,7 @@ SubOutgoingFileTransfer::read(std::vector<uint8_t>& buf) const { // Need to send headers? if (!headerSent_) { - std::stringstream ss; - ss << "Content-Length: " << info_.totalSize << '\n' - << "Display-Name: " << info_.displayName << '\n' - << "Offset: 0\n" - << '\n'; - - auto header = ss.str(); - buf.resize(header.size()); - std::copy(std::begin(header), std::end(header), std::begin(buf)); - - headerSent_ = true; - emit(DRing::DataTransferEventCode::wait_peer_acceptance); + sendHeader(buf); return true; } @@ -365,6 +413,8 @@ SubOutgoingFileTransfer::write(const std::vector<uint8_t>& buffer) if (buffer.size() == 3 and buffer[0] == 'G' and buffer[1] == 'O' and buffer[2] == '\n') { peerReady_ = true; emit(DRing::DataTransferEventCode::ongoing); + if (onRecvCb_) + sendFile(); } else { // consider any other response as a cancel msg JAMI_WARN() << "FTP#" << getId() << ": refused by peer"; @@ -408,6 +458,7 @@ class OutgoingFileTransfer final : public DataTransfer { public: OutgoingFileTransfer(DRing::DataTransferId tid, const DRing::DataTransferInfo& info); + ~OutgoingFileTransfer() {} std::shared_ptr<DataTransfer> startNewOutgoing(const std::string& peer_uri) { auto newTransfer = std::make_shared<SubOutgoingFileTransfer>(id, peer_uri, this->metaInfo_); @@ -416,6 +467,14 @@ public: return newTransfer; } + bool cancel(bool channeled) { + if (channeled) + cancelChanneled_ = true; + else + cancelIce_ = true; + return cancelChanneled_ && cancelIce_; + } + bool hasBeenStarted() const override { // Started if one subtransfer is started @@ -427,9 +486,17 @@ public: void close() noexcept override; + void cancel() { + for (const auto& subtransfer: subtransfer_) + subtransfer->cancel(); + } + private: OutgoingFileTransfer() = delete; + std::atomic_bool cancelChanneled_ {false}; + std::atomic_bool cancelIce_ {false}; + mutable std::shared_ptr<OptimisticMetaOutgoingInfo> metaInfo_; mutable std::ifstream input_; mutable std::vector<std::shared_ptr<SubOutgoingFileTransfer>> subtransfer_; @@ -456,8 +523,9 @@ OutgoingFileTransfer::OutgoingFileTransfer(DRing::DataTransferId tid, const DRin void OutgoingFileTransfer::close() noexcept { - for (const auto& subtransfer : subtransfer_) - subtransfer->close(); + if (cancelChanneled_ && cancelIce_) + for (const auto& subtransfer : subtransfer_) + subtransfer->close(); } //============================================================================== @@ -465,7 +533,7 @@ OutgoingFileTransfer::close() noexcept class IncomingFileTransfer final : public DataTransfer { public: - IncomingFileTransfer(DRing::DataTransferId, const DRing::DataTransferInfo&); + IncomingFileTransfer(DRing::DataTransferId, const DRing::DataTransferInfo&, DRing::DataTransferId); bool start() override; @@ -477,16 +545,24 @@ public: bool write(const uint8_t* buffer, std::size_t length) override; + void cancel() override { + auto account = Manager::instance().getAccount<JamiAccount>(info_.accountId); + if (account) account->closePeerConnection(info_.peer, internalId_); + } + private: IncomingFileTransfer() = delete; + DRing::DataTransferId internalId_; + std::ofstream fout_; std::promise<void> filenamePromise_; }; IncomingFileTransfer::IncomingFileTransfer(DRing::DataTransferId tid, - const DRing::DataTransferInfo& info) - : DataTransfer(tid) + const DRing::DataTransferInfo& info, + DRing::DataTransferId internalId) + : DataTransfer(tid), internalId_(internalId) { JAMI_WARN() << "[FTP] incoming transfert of " << info.totalSize << " byte(s): " << info.displayName; @@ -531,6 +607,11 @@ IncomingFileTransfer::start() void IncomingFileTransfer::close() noexcept { + { + std::lock_guard<std::mutex> lk {infoMutex_}; + if (info_.lastEvent >= DRing::DataTransferEventCode::finished) + return; + } DataTransfer::close(); try { @@ -539,10 +620,6 @@ IncomingFileTransfer::close() noexcept fout_.close(); - // We don't need the connection anymore. Can close it. - auto account = Manager::instance().getAccount<JamiAccount>(info_.accountId); - account->closePeerConnection(info_.peer, id); - JAMI_DBG() << "[FTP] file closed, rx " << info_.bytesProgress << " on " << info_.totalSize; if (info_.bytesProgress >= info_.totalSize) @@ -588,7 +665,7 @@ public: std::shared_ptr<DataTransfer> createOutgoingFileTransfer(const DRing::DataTransferInfo& info, DRing::DataTransferId& tid); - std::shared_ptr<IncomingFileTransfer> createIncomingFileTransfer(const DRing::DataTransferInfo&); + std::shared_ptr<IncomingFileTransfer> createIncomingFileTransfer(const DRing::DataTransferInfo&, const DRing::DataTransferId&); std::shared_ptr<DataTransfer> getTransfer(const DRing::DataTransferId&); void cancel(DataTransfer&); void onConnectionRequestReply(const DRing::DataTransferId&, PeerConnection*); @@ -625,10 +702,10 @@ DataTransferFacade::Impl::createOutgoingFileTransfer(const DRing::DataTransferIn } std::shared_ptr<IncomingFileTransfer> -DataTransferFacade::Impl::createIncomingFileTransfer(const DRing::DataTransferInfo& info) +DataTransferFacade::Impl::createIncomingFileTransfer(const DRing::DataTransferInfo& info, const DRing::DataTransferId& internal_id) { auto tid = generateUID(); - auto transfer = std::make_shared<IncomingFileTransfer>(tid, info); + auto transfer = std::make_shared<IncomingFileTransfer>(tid, info, internal_id); { std::lock_guard<std::mutex> lk {mapMutex_}; map_.emplace(tid, transfer); @@ -648,7 +725,8 @@ DataTransferFacade::Impl::onConnectionRequestReply(const DRing::DataTransferId& connection->getPeerUri() ) ); - } else if (not transfer->hasBeenStarted()) { + } else if (std::dynamic_pointer_cast<OutgoingFileTransfer>(transfer)->cancel(false) + and not transfer->hasBeenStarted()) { transfer->emit(DRing::DataTransferEventCode::unjoinable_peer); cancel(*transfer); } @@ -705,6 +783,21 @@ DataTransferFacade::sendFile(const DRing::DataTransferInfo& info, info.peer, tid, [this, tid] (PeerConnection* connection) { pimpl_->onConnectionRequestReply(tid, connection); + }, + [this, tid](const std::shared_ptr<ChanneledOutgoingTransfer>& out) { + if (auto transfer = pimpl_->getTransfer(tid)) + if (out) + out->linkTransfer( + std::dynamic_pointer_cast<OutgoingFileTransfer>(transfer)->startNewOutgoing(out->peer()) + ); + }, + [this, tid]() { + if (auto transfer = pimpl_->getTransfer(tid)) + if (std::dynamic_pointer_cast<OutgoingFileTransfer>(transfer)->cancel(true) + and not transfer->hasBeenStarted()) { + transfer->emit(DRing::DataTransferEventCode::unjoinable_peer); + pimpl_->cancel(*transfer); + } }); return DRing::DataTransferError::success; } catch (const std::exception& ex) { @@ -734,6 +827,7 @@ DRing::DataTransferError DataTransferFacade::cancel(const DRing::DataTransferId& id) noexcept { if (auto transfer = pimpl_->getTransfer(id)) { + transfer->cancel(); pimpl_->cancel(*transfer); return DRing::DataTransferError::success; } @@ -744,7 +838,12 @@ void DataTransferFacade::close(const DRing::DataTransferId &id) noexcept { std::lock_guard<std::mutex> lk {pimpl_->mapMutex_}; - pimpl_->map_.erase(id); + const auto& iter = pimpl_->map_.find(static_cast<uint64_t>(id)); + if (iter != std::end(pimpl_->map_)) { + // NOTE: don't erase from map. The client can retrieve + // related info() to know if the file is finished. + iter->second->close(); + } } DRing::DataTransferError @@ -779,16 +878,24 @@ DataTransferFacade::info(const DRing::DataTransferId& id, return DRing::DataTransferError::unknown; } +DRing::DataTransferId +DataTransferFacade::createIncomingTransfer(const DRing::DataTransferInfo &info, const DRing::DataTransferId& internal_id) +{ + auto transfer = pimpl_->createIncomingFileTransfer(info, internal_id); + if (!transfer) + return {}; + return transfer->getId(); +} + IncomingFileInfo -DataTransferFacade::onIncomingFileRequest(const DRing::DataTransferInfo &info) { - auto transfer = pimpl_->createIncomingFileTransfer(info); - if (!transfer) - return {}; - auto filename = transfer->requestFilename(); - if (!filename.empty()) - if (transfer->start()) - return {transfer->getId(), std::static_pointer_cast<Stream>(transfer)}; - return {transfer->getId(), nullptr}; +DataTransferFacade::onIncomingFileRequest(const DRing::DataTransferId& id) +{ + if (auto transfer = std::static_pointer_cast<IncomingFileTransfer>(pimpl_->getTransfer(id))) { + auto filename = transfer->requestFilename(); + if (!filename.empty() && transfer->start()) + return {id, std::static_pointer_cast<Stream>(transfer)}; + } + return {id, nullptr}; } } // namespace jami diff --git a/src/data_transfer.h b/src/data_transfer.h index d4da30b84dec5608fdaa6a692a718e47e0eb4ca1..f6510c41ba9930274c3bb38f0f1871bce5d1fdcf 100644 --- a/src/data_transfer.h +++ b/src/data_transfer.h @@ -67,9 +67,12 @@ public: DRing::DataTransferError bytesProgress(const DRing::DataTransferId& id, int64_t& total, int64_t& progress) const noexcept; + /// Used by p2p.cpp + DRing::DataTransferId createIncomingTransfer(const DRing::DataTransferInfo &info, const DRing::DataTransferId& internal_id); + /// Create an IncomingFileTransfer object. /// \return a shared pointer on created Stream object, or nullptr in case of error - IncomingFileInfo onIncomingFileRequest(const DRing::DataTransferInfo &info); + IncomingFileInfo onIncomingFileRequest(const DRing::DataTransferId& id); private: class Impl; diff --git a/src/ftp_server.cpp b/src/ftp_server.cpp index 4c09f43c45499334517fc3d758d631c778c81196..7779d4365cda52223825956a2a10c511d4744332 100644 --- a/src/ftp_server.cpp +++ b/src/ftp_server.cpp @@ -35,10 +35,12 @@ namespace jami { //============================================================================== FtpServer::FtpServer(const std::string& account_id, - const std::string& peer_uri) + const std::string& peer_uri, + const DRing::DataTransferId& outId) : Stream() , accountId_ {account_id} , peerUri_ {peer_uri} + , outId_ {outId} {} DRing::DataTransferId @@ -46,6 +48,8 @@ FtpServer::getId() const { // Because FtpServer is just the protocol on the top of a stream so the id // of the stream is the id of out_. + if (isTreatingRequest_) + return transferId_; return out_.id; } @@ -67,13 +71,28 @@ FtpServer::startNewFile() info.totalSize = fileSize_; info.bytesProgress = 0; rx_ = 0; - out_ = Manager::instance().dataTransfers->onIncomingFileRequest(info); // we block here until answer from client + transferId_ = Manager::instance().dataTransfers->createIncomingTransfer(info, outId_); // return immediately + isTreatingRequest_ = true; + out_ = Manager::instance().dataTransfers->onIncomingFileRequest(transferId_); // we block here until answer from client + isTreatingRequest_ = false; if (!out_.stream) { JAMI_DBG() << "[FTP] transfer aborted by client"; closed_ = true; // send NOK msg at next read() } else { go_ = true; } + + if (onRecvCb_) { + std::vector<uint8_t> buffer; + if (go_) { + buffer.resize(3); + buffer[0] = 'G'; buffer[1] = 'O'; buffer[2] = '\n'; + } else { + buffer.resize(4); + buffer[0] = 'N'; buffer[1] = 'G'; buffer[2] = 'O'; buffer[3] = '\n'; + } + onRecvCb_(std::move(buffer)); + } return bool(out_.stream); } diff --git a/src/ftp_server.h b/src/ftp_server.h index 6f620e68e9acafd4d6068c5e8cee7311eb19404b..0285fa62fb1195e316bf203f025c83224c8839b0 100644 --- a/src/ftp_server.h +++ b/src/ftp_server.h @@ -30,16 +30,22 @@ namespace jami { +using RecvCb = std::function<void(std::vector<uint8_t>&& buf)>; + class FtpServer final : public Stream { public: - FtpServer(const std::string& account_id, const std::string& peer_uri); + FtpServer(const std::string& account_id, const std::string& peer_uri, const DRing::DataTransferId& outId = 0); bool read(std::vector<uint8_t>& buffer) const override; bool write(const std::vector<uint8_t>& buffer) override; DRing::DataTransferId getId() const override; void close() noexcept override; + void setOnRecv(RecvCb&& cb) { + onRecvCb_ = cb; + } + private: bool parseStream(const std::vector<uint8_t>&); bool parseLine(const std::string&); @@ -54,7 +60,10 @@ private: const std::string accountId_; const std::string peerUri_; + std::atomic_bool isTreatingRequest_ {false}; + DRing::DataTransferId transferId_ {0}; IncomingFileInfo out_ {0, nullptr}; + DRing::DataTransferId outId_ {0}; std::size_t fileSize_ {0}; std::size_t rx_ {0}; std::stringstream headerStream_; @@ -63,6 +72,8 @@ private: mutable bool closed_ {false}; mutable bool go_ {false}; FtpState state_ {FtpState::PARSE_HEADERS}; + + RecvCb onRecvCb_ {}; }; } // namespace jami diff --git a/src/jamidht/Makefile.am b/src/jamidht/Makefile.am index c991556d13059cf01a4bc2f3b8c1a1280b830a57..080c15e69e67298e020b4c65dbca7732c904e426 100644 --- a/src/jamidht/Makefile.am +++ b/src/jamidht/Makefile.am @@ -19,6 +19,8 @@ libringacc_la_SOURCES = \ connectionmanager.cpp \ channeled_transport.h \ channeled_transport.cpp \ + channeled_transfers.h \ + channeled_transfers.cpp \ multiplexed_socket.h \ multiplexed_socket.cpp \ sips_transport_ice.cpp \ diff --git a/src/jamidht/channeled_transfers.cpp b/src/jamidht/channeled_transfers.cpp new file mode 100644 index 0000000000000000000000000000000000000000..c6f64439d7e2fad24854422376e4232d9cabb55a --- /dev/null +++ b/src/jamidht/channeled_transfers.cpp @@ -0,0 +1,106 @@ +/* + * Copyright (C) 2020 Savoir-faire Linux Inc. + * + * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "channeled_transfers.h" + +#include "ftp_server.h" +#include "multiplexed_socket.h" + +#include <opendht/thread_pool.h> + +#include "jamiaccount.h" + +namespace jami { + +ChanneledOutgoingTransfer::ChanneledOutgoingTransfer(const std::shared_ptr<ChannelSocket>& channel) +: channel_(channel) +{} + +ChanneledOutgoingTransfer::~ChanneledOutgoingTransfer() +{ + channel_->setOnRecv({}); + file_->setOnRecv({}); + channel_->shutdown(); +} + +std::string +ChanneledOutgoingTransfer::peer() const +{ + return channel_ ? "" : channel_->deviceId(); +} + +void +ChanneledOutgoingTransfer::linkTransfer(const std::shared_ptr<Stream>& file) +{ + if (!file) return; + file_ = file; + channel_->setOnRecv([this](const uint8_t* buf, size_t len) { + dht::ThreadPool::io().run([ + rx=std::vector<uint8_t>(buf, buf+len), + file=std::weak_ptr<Stream>(file_)] { + if (auto f = file.lock()) + f->write(rx); + }); + return len; + }); + file_->setOnRecv([channel = std::weak_ptr<ChannelSocket>(channel_)](std::vector<uint8_t>&& data) { + if (auto c = channel.lock()) { + std::error_code ec; + c->write(data.data(), data.size(), ec); + } + }); +} + +ChanneledIncomingTransfer::ChanneledIncomingTransfer(const std::shared_ptr<ChannelSocket>& channel, const std::shared_ptr<FtpServer>& ftp) +: ftp_ (ftp) +, channel_(channel) +{ + channel_->setOnRecv([this](const uint8_t* buf, size_t len) { + dht::ThreadPool::io().run([ + rx=std::vector<uint8_t>(buf, buf+len), + ftp=std::weak_ptr<FtpServer>(ftp_)] { + if (auto f = ftp.lock()) + f->write(rx); + }); + return len; + }); + ftp_->setOnRecv([channel = std::weak_ptr<ChannelSocket>(channel_)](std::vector<uint8_t>&& data) { + if (auto c = channel.lock()) { + std::error_code ec; + c->write(data.data(), data.size(), ec); + } + }); +} + +ChanneledIncomingTransfer::~ChanneledIncomingTransfer() +{ + channel_->setOnRecv({}); + channel_->shutdown(); +} + +DRing::DataTransferId +ChanneledIncomingTransfer::id() const +{ + if (ftp_) + return ftp_->getId(); + return 0; +} + +} \ No newline at end of file diff --git a/src/jamidht/channeled_transfers.h b/src/jamidht/channeled_transfers.h new file mode 100644 index 0000000000000000000000000000000000000000..9e8224515b9e2b7e08c4d4e5bb3bb1cafcb5db1c --- /dev/null +++ b/src/jamidht/channeled_transfers.h @@ -0,0 +1,55 @@ +/* + * Copyright (C) 2020 Savoir-faire Linux Inc. + * + * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#pragma once + +#include <string> +#include <memory> + +#include "dring/datatransfer_interface.h" + +namespace jami { + +class ChannelSocket; +class Stream; +class FtpServer; + +class ChanneledOutgoingTransfer { +public: + ChanneledOutgoingTransfer(const std::shared_ptr<ChannelSocket>& channel); + ~ChanneledOutgoingTransfer(); + void linkTransfer(const std::shared_ptr<Stream>& file); + std::string peer() const; +private: + std::shared_ptr<ChannelSocket> channel_ {}; + std::shared_ptr<Stream> file_; +}; + +class ChanneledIncomingTransfer { +public: + ChanneledIncomingTransfer(const std::shared_ptr<ChannelSocket>& channel, const std::shared_ptr<FtpServer>& ftp); + ~ChanneledIncomingTransfer(); + DRing::DataTransferId id() const; +private: + std::shared_ptr<FtpServer> ftp_; + std::shared_ptr<ChannelSocket> channel_; +}; + +} diff --git a/src/jamidht/jamiaccount.cpp b/src/jamidht/jamiaccount.cpp index a0babbdd420af204d1aa244629b919ee8f0de004..29220294cbb499bf5e2c33989ac2cf765c2934f5 100644 --- a/src/jamidht/jamiaccount.cpp +++ b/src/jamidht/jamiaccount.cpp @@ -2001,18 +2001,39 @@ JamiAccount::doRegister_() auto result = fut.get(); return result; }); - connectionManager_->onChannelRequest([](const std::string& /* deviceId */, const std::string& name) { + connectionManager_->onChannelRequest([this](const std::string& /* deviceId */, const std::string& name) { if (name == "sip") { return true; + } else if (name.substr(0, 7) == "file://") { + auto tid_str = name.substr(7); + uint64_t tid; + std::istringstream iss(tid_str); + iss >> tid; + if (dhtPeerConnector_->onIncomingChannelRequest(tid)) { + incomingFileTransfers_.emplace(tid_str); + return true; + } } return false; }); connectionManager_->onConnectionReady([this](const std::string& deviceId, const std::string& name, std::shared_ptr<ChannelSocket> channel) { - if (channel && name == "sip") { + if (channel) { auto cert = tls::CertificateStore::instance().getCertificate(deviceId); if (!cert || !cert->issuer) return; auto peerId = cert->issuer->getId().toString(); - if (channel) cacheSIPConnection(std::move(channel), peerId, deviceId); + if (name == "sip") { + cacheSIPConnection(std::move(channel), peerId, deviceId); + } else if (name.substr(0, 7) == "file://") { + auto tid_str = name.substr(7); + auto it = incomingFileTransfers_.find(tid_str); + // Note, outgoing file transfers are ignored. + if (it == incomingFileTransfers_.end()) return; + incomingFileTransfers_.erase(it); + uint64_t tid; + std::istringstream iss(tid_str); + iss >> tid; + dhtPeerConnector_->onIncomingConnection(peerId, tid, std::move(channel)); + } } }); @@ -3038,9 +3059,11 @@ JamiAccount::publicAddresses() void JamiAccount::requestPeerConnection(const std::string& peer_id, const DRing::DataTransferId& tid, - const std::function<void(PeerConnection*)>& connect_cb) + const std::function<void(PeerConnection*)>& connect_cb, + const std::function<void(const std::shared_ptr<ChanneledOutgoingTransfer>&)>& channeledConnectedCb, + const std::function<void()>& onChanneledCancelled) { - dhtPeerConnector_->requestConnection(peer_id, tid, connect_cb); + dhtPeerConnector_->requestConnection(peer_id, tid, connect_cb, channeledConnectedCb, onChanneledCancelled); } void diff --git a/src/jamidht/jamiaccount.h b/src/jamidht/jamiaccount.h index 8a314b9cc5e84c7bc9e56cdfd783435167d0fab4..e188c256329e97783c5935400ab2f11a64918a1a 100644 --- a/src/jamidht/jamiaccount.h +++ b/src/jamidht/jamiaccount.h @@ -78,6 +78,7 @@ class AccountManager; struct AccountInfo; class ChannelSocket; class SipTransport; +class ChanneledOutgoingTransfer; /** * @brief Ring Account is build on top of SIPAccountBase and uses DHT to handle call connectivity. @@ -353,7 +354,9 @@ public: /// /// \param[in] tid linked outgoing data transfer /// void requestPeerConnection(const std::string& peer, const DRing::DataTransferId& tid, - const std::function<void(PeerConnection*)>& connect_cb); + const std::function<void(PeerConnection*)>& connect_cb, + const std::function<void(const std::shared_ptr<ChanneledOutgoingTransfer>&)>& channeledConnectedCb, + const std::function<void()>& onChanneledCancelled); /// /// Close a E2E connection between a given peer and a given transfer id. @@ -716,6 +719,9 @@ private: * @param deviceId Device linked to that transport */ void cacheSIPConnection(std::shared_ptr<ChannelSocket>&& socket, const std::string& peerId, const std::string& deviceId); + + // File transfers + std::set<std::string> incomingFileTransfers_ {}; }; static inline std::ostream& operator<< (std::ostream& os, const JamiAccount& acc) diff --git a/src/jamidht/p2p.cpp b/src/jamidht/p2p.cpp index 46811dc7280625f962393a4cc8d432f7014a05dd..53c7d9819d5f61eaed3a2a3adc6409a72d2d31ef 100644 --- a/src/jamidht/p2p.cpp +++ b/src/jamidht/p2p.cpp @@ -30,6 +30,8 @@ #include "peer_connection.h" #include "turn_transport.h" #include "account_manager.h" +#include "multiplexed_socket.h" +#include "connectionmanager.h" #include <opendht/default_types.h> #include <opendht/rng.h> @@ -101,22 +103,20 @@ public: dht::Value::Id id = dht::Value::INVALID_ID; uint32_t protocol {protocol_version}; ///< Protocol identification. First bit reserved to indicate a request (0) or a response (1) std::vector<std::string> addresses; ///< Request: public addresses for TURN permission. Response: TURN relay addresses (only 1 in current implementation) - MSGPACK_DEFINE_MAP(id, protocol, addresses) + uint64_t tid {0}; + MSGPACK_DEFINE_MAP(id, protocol, addresses, tid) PeerConnectionMsg() = default; - PeerConnectionMsg(dht::Value::Id id, uint32_t aprotocol, const std::string& arelay) - : id {id}, protocol {aprotocol}, addresses {{arelay}} {} - PeerConnectionMsg(dht::Value::Id id, uint32_t aprotocol, const std::vector<std::string>& asrelay) - : id {id}, protocol {aprotocol}, addresses {asrelay} {} - + PeerConnectionMsg(dht::Value::Id id, uint32_t aprotocol, const std::string& arelay, uint64_t transfer_id) + : id {id}, protocol {aprotocol}, addresses {{arelay}}, tid {transfer_id} {} + PeerConnectionMsg(dht::Value::Id id, uint32_t aprotocol, const std::vector<std::string>& asrelay, uint64_t transfer_id) + : id {id}, protocol {aprotocol}, addresses {asrelay}, tid {transfer_id} {} bool isRequest() const noexcept { return (protocol & 1) == 0; } - PeerConnectionMsg respond(const IpAddr& relay) const { - return {id, protocol|1, relay.toString(true, true)}; + return {id, protocol|1, relay.toString(true, true), tid}; } - PeerConnectionMsg respond(const std::vector<std::string>& addresses) const { - return {id, protocol|1, addresses}; + return {id, protocol|1, addresses, tid}; } }; @@ -172,6 +172,7 @@ public: std::mutex clientsMutex_; void cancel(const std::string& peer_id, const DRing::DataTransferId& tid); + void cancelChanneled(const DRing::DataTransferId& tid); void onRequestMsg(PeerConnectionMsg&&); void onTrustedRequestMsg(PeerConnectionMsg&&, const std::shared_ptr<dht::crypto::Certificate>&, @@ -203,6 +204,15 @@ public: std::weak_ptr<DhtPeerConnector::Impl const> weak() const { return std::static_pointer_cast<DhtPeerConnector::Impl const>(shared_from_this()); } + + + // For Channeled transports + std::mutex channeledIncomingMtx_; + std::map<DRing::DataTransferId, std::unique_ptr<ChanneledIncomingTransfer>> channeledIncoming_; + std::mutex channeledOutgoingMtx_; + std::map<DRing::DataTransferId, std::shared_ptr<ChanneledOutgoingTransfer>> channeledOutgoing_; + std::mutex incomingTransfersMtx_; + std::set<DRing::DataTransferId> incomingTransfers_; }; //============================================================================== @@ -222,11 +232,14 @@ public: const std::shared_ptr<dht::crypto::Certificate>& peer_cert, const std::vector<std::string>& public_addresses, const ListenerFunction& connect_cb) - : parent_ {parent} - , tid_ {tid} + : tid_ {tid} + , parent_ {parent} , peer_ {peer_h} , publicAddresses_ {public_addresses} , peerCertificate_ {peer_cert} { + auto shared = parent_.account.lock(); + if (!shared) return; + waitId_ = ValueIdDist()(shared->rand); addListener(connect_cb); processTask_ = std::async( std::launch::async, @@ -243,7 +256,6 @@ public: for (auto& cb: listeners_) cb(nullptr); connection_.reset(); - } bool hasAlreadyAResponse() { @@ -274,6 +286,7 @@ public: responseCV_.notify_all(); } + const DRing::DataTransferId tid_; private: void process() { // Add ice msg into the addresses @@ -304,10 +317,10 @@ private: // Prepare connection request as a DHT message PeerConnectionMsg request; - request.id = ValueIdDist()(acc->rand); /* Random id for the message unicity */ - waitId_ = request.id; + request.id = waitId_; /* Random id for the message unicity */ request.addresses = {icemsg.str()}; request.addresses.insert(request.addresses.end(), publicAddresses_.begin(), publicAddresses_.end()); + request.tid = tid_; // Send connection request through DHT JAMI_DBG() << acc << "[CNX] request connection to " << peer_; @@ -419,7 +432,6 @@ private: } Impl& parent_; - const DRing::DataTransferId tid_; const dht::InfoHash peer_; std::vector<std::string> publicAddresses_; @@ -493,10 +505,19 @@ DhtPeerConnector::Impl::answerToRequest(PeerConnectionMsg&& request, auto acc = account.lock(); if (!acc) return; + if (request.tid != 0) { + std::lock_guard<std::mutex> lk(incomingTransfersMtx_); + if (incomingTransfers_.find(request.tid) != incomingTransfers_.end()) { + JAMI_INFO("Incoming request for id(%lu) is already treated via channeled socket", request.tid); + return; + } + incomingTransfers_.emplace(request.tid); + } + // Save peer certificate for later TLS session (MUST BE DONE BEFORE TURN PEER AUTHORIZATION) certMap_.emplace(cert->getId(), std::make_pair(cert, peer_h)); - auto sendRelayV4 = false, sendRelayV6 = false, sendIce = false, hasPubIp = false; + auto sendIce = false, hasPubIp = false; struct IceReady { std::mutex mtx {}; @@ -722,6 +743,29 @@ DhtPeerConnector::Impl::cancel(const std::string& peer_id, const DRing::DataTran }); } +void +DhtPeerConnector::Impl::cancelChanneled(const DRing::DataTransferId& tid) { + dht::ThreadPool::io().run([w=weak(), tid] { + auto shared = w.lock(); + if (!shared) return; + // Cancel outgoing files + DRing::DataTransferId finalId = tid; + { + std::lock_guard<std::mutex> lk(shared->channeledIncomingMtx_); + auto it = shared->channeledIncoming_.find(tid); + if (it != shared->channeledIncoming_.end()) { + finalId = it->second->id(); + } + shared->channeledIncoming_.erase(tid); + } + { + std::lock_guard<std::mutex> lk(shared->channeledOutgoingMtx_); + shared->channeledOutgoing_.erase(tid); + } + Manager::instance().dataTransfers->close(finalId); + }); +} + //============================================================================== DhtPeerConnector::DhtPeerConnector(JamiAccount& account) @@ -761,7 +805,9 @@ DhtPeerConnector::onDhtConnected(const std::string& device_id) void DhtPeerConnector::requestConnection(const std::string& peer_id, const DRing::DataTransferId& tid, - const std::function<void(PeerConnection*)>& connect_cb) + const std::function<void(PeerConnection*)>& connect_cb, + const std::function<void(const std::shared_ptr<ChanneledOutgoingTransfer>&)>& channeledConnectedCb, + const std::function<void()>& onChanneledCancelled) { const auto peer_h = dht::InfoHash(peer_id); @@ -776,16 +822,9 @@ DhtPeerConnector::requestConnection(const std::string& peer_id, if (!acc) return; auto addresses = acc->publicAddresses(); - // Add local addresses - // XXX: is it really needed? use-case? a local TURN server? - //addresses.emplace_back(ip_utils::getLocalAddr(AF_INET)); - //addresses.emplace_back(ip_utils::getLocalAddr(AF_INET6)); - - // TODO: bypass DHT devices lookup if connection already exist - acc->forEachDevice( peer_h, - [this, addresses, connect_cb, tid](const dht::InfoHash& dev_h) { + [this, addresses, connect_cb, tid, channeledConnectedCb, onChanneledCancelled](const dht::InfoHash& dev_h) { auto acc = pimpl_->account.lock(); if (!acc) return; if (dev_h == acc->dht()->getId()) { @@ -793,6 +832,41 @@ DhtPeerConnector::requestConnection(const std::string& peer_id, return; } + acc->connectionManager().connectDevice(dev_h.toString(), "file://" + std::to_string(tid), + [this, tid, channeledConnectedCb, onChanneledCancelled, connect_cb](const std::shared_ptr<ChannelSocket>& channel) { + auto shared = pimpl_->account.lock(); + if (!channel) { + onChanneledCancelled(); + return; + } + if (!shared) return; + JAMI_INFO("New file channel for outgoing transfer with id(%lu)", tid); + + auto outgoingFile = std::make_shared<ChanneledOutgoingTransfer>(channel); + { + std::lock_guard<std::mutex> lk(pimpl_->channeledOutgoingMtx_); + pimpl_->channeledOutgoing_.emplace(tid, outgoingFile); + } + + channel->onShutdown([this, tid, onChanneledCancelled]() { + JAMI_INFO("Channel down for outgoing transfer with id(%lu)", tid); + onChanneledCancelled(); + dht::ThreadPool::io().run([w=pimpl_->weak(), tid] { + auto shared = w.lock(); + if (!shared) return; + // Cancel outgoing files + { + std::lock_guard<std::mutex> lk(shared->channeledOutgoingMtx_); + shared->channeledOutgoing_.erase(tid); + } + Manager::instance().dataTransfers->close(tid); + }); + }); + // Cancel via DHT because we will use the channeled path + connect_cb(nullptr); + channeledConnectedCb(outgoingFile); + }); + acc->findCertificate( dev_h, [this, dev_h, addresses, connect_cb, tid] (const std::shared_ptr<dht::crypto::Certificate>& cert) { @@ -800,17 +874,66 @@ DhtPeerConnector::requestConnection(const std::string& peer_id, }); }, - [this, peer_h, connect_cb, accId = acc->getAccountID()](bool found) { + [this, peer_h, connect_cb, onChanneledCancelled, accId = acc->getAccountID()](bool found) { if (!found) { JAMI_WARN() << accId << "[CNX] aborted, no devices for " << peer_h; connect_cb(nullptr); + onChanneledCancelled(); } }); } void -DhtPeerConnector::closeConnection(const std::string& peer_id, const DRing::DataTransferId& tid) { +DhtPeerConnector::closeConnection(const std::string& peer_id, const DRing::DataTransferId& tid) +{ pimpl_->cancel(peer_id, tid); + pimpl_->cancelChanneled(tid); +} + +bool +DhtPeerConnector::onIncomingChannelRequest(const DRing::DataTransferId& tid) +{ + std::lock_guard<std::mutex> lk(pimpl_->incomingTransfersMtx_); + if (pimpl_->incomingTransfers_.find(tid) != pimpl_->incomingTransfers_.end()) { + JAMI_INFO("Incoming transfer request with id(%lu) is already treated via DHT", tid); + return false; + } + pimpl_->incomingTransfers_.emplace(tid); + JAMI_INFO("Incoming transfer request with id(%lu)", tid); + return true; +} + + +void +DhtPeerConnector::onIncomingConnection(const std::string& peer_id, const DRing::DataTransferId& tid, const std::shared_ptr<ChannelSocket>& channel) +{ + if (!channel) return; + auto acc = pimpl_->account.lock(); + if (!acc) return; + auto incomingFile = std::make_unique<ChanneledIncomingTransfer>(channel, std::make_shared<FtpServer>(acc->getAccountID(), peer_id, tid)); + { + std::lock_guard<std::mutex> lk(pimpl_->channeledIncomingMtx_); + pimpl_->channeledIncoming_.emplace(tid, std::move(incomingFile)); + } + channel->onShutdown([this, tid]() { + JAMI_INFO("Channel down for incoming transfer with id(%lu)", tid); + dht::ThreadPool::io().run([w=pimpl_->weak(), tid] { + auto shared = w.lock(); + if (!shared) return; + // Cancel incoming files + DRing::DataTransferId internalId = 0; + { + std::lock_guard<std::mutex> lk(shared->channeledIncomingMtx_); + auto it = shared->channeledIncoming_.find(tid); + if (it != shared->channeledIncoming_.end()) + internalId = it->second->id(); + shared->channeledIncoming_.erase(tid); + } + if (internalId != 0) { + Manager::instance().dataTransfers->close(internalId); + } + }); + }); } } // namespace jami diff --git a/src/jamidht/p2p.h b/src/jamidht/p2p.h index c6773713edc62061509ad119cdfe7d2e2eafb8f8..e8445234c79a51fb9fd45118f666c9a59df6c979 100644 --- a/src/jamidht/p2p.h +++ b/src/jamidht/p2p.h @@ -22,6 +22,7 @@ #pragma once #include "dring/datatransfer_interface.h" +#include "channeled_transfers.h" #include <string> #include <memory> @@ -38,9 +39,13 @@ public: ~DhtPeerConnector(); void onDhtConnected(const std::string& device_id); - void requestConnection(const std::string& peer_id, const DRing::DataTransferId& tid, const std::function<void(PeerConnection*)>& connect_cb); + void requestConnection(const std::string& peer_id, const DRing::DataTransferId& tid, + const std::function<void(PeerConnection*)>& connect_cb, + const std::function<void(const std::shared_ptr<ChanneledOutgoingTransfer>&)>& channeledConnectedCb, + const std::function<void()>& onChanneledCancelled); void closeConnection(const std::string& peer_id, const DRing::DataTransferId& tid); - + bool onIncomingChannelRequest(const DRing::DataTransferId& tid); + void onIncomingConnection(const std::string& peer_id, const DRing::DataTransferId& tid, const std::shared_ptr<ChannelSocket>& channel); private: DhtPeerConnector() = delete; diff --git a/src/meson.build b/src/meson.build index 1160c01019ba23e0ea602551814fabaf81f2ab55..8bc9d743ed91dc3773dcc0714147dc2f06c5db81 100644 --- a/src/meson.build +++ b/src/meson.build @@ -16,6 +16,7 @@ libjami_sources = files( 'hooks/urlhook.cpp', 'im/instant_messaging.cpp', 'im/message_engine.cpp', + 'jamidht/channeled_transfers.cpp', 'jamidht/eth/libdevcore/Common.cpp', 'jamidht/eth/libdevcore/CommonData.cpp', 'jamidht/eth/libdevcore/FixedHash.cpp', diff --git a/src/peer_connection.h b/src/peer_connection.h index 29cc1833956edb3546e087d3ef13d9cb2ad60386..40b47b5bce1d25177beaf5d7ed4249a566f9b40a 100644 --- a/src/peer_connection.h +++ b/src/peer_connection.h @@ -73,6 +73,9 @@ public: (void)length; return false; }; + virtual void setOnRecv(std::function<void(std::vector<uint8_t>&&)>&&) { + // Not implemented + } }; //==============================================================================