From 755347dd6be6f020377fe5942587c82439aafa9c Mon Sep 17 00:00:00 2001 From: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com> Date: Mon, 20 Nov 2017 14:49:06 -0500 Subject: [PATCH] data transfer: first implementation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit First implementation of Reliable Data Transfer feature in Ring. This implementation is a draft, comes with a Python script tools/dringctrl/sendfile.py to play with and doesn't implement all the API as described into "datatransfer: API proposal" commit. This version uses TLS over TCP-TURN sockets to encrypt data. Transfers require a TURN server with TCP peer connections as described by the RFC-6062. Testing: Currently only sendFile API is implemented and data are saved into a temporary file saved in "/tmp/ring_XXXXXX", where XXXXXX are replace by mkstemp() command. Change-Id: I5b8f48432edd58df5046e368a99f58ea44046dcd Reviewed-by: Olivier Soldano <olivier.soldano@savoirfairelinux.com> --- .../cx.ring.Ring.ConfigurationManager.xml | 40 +- bin/dbus/dbusclient.cpp | 8 + bin/dbus/dbusconfigurationmanager.cpp | 22 + bin/dbus/dbusconfigurationmanager.h | 5 + configure.ac | 2 +- doc/doxygen/core-doc.cfg.in | 2 +- src/Makefile.am | 18 +- src/account.cpp | 2 +- src/account.h | 2 +- src/client/Makefile.am | 1 + src/client/datatransfer.cpp | 80 +++ src/client/ring_signal.cpp | 3 + src/client/ring_signal.h | 1 + src/data_transfer.cpp | 441 +++++++++++++ src/data_transfer.h | 77 +++ src/dring/datatransfer_interface.h | 36 +- src/ftp_server.cpp | 164 +++++ src/ftp_server.h | 62 ++ src/im/message_engine.cpp | 2 +- src/manager.cpp | 3 + src/manager.h | 3 + src/peer_connection.cpp | 558 ++++++++++++++++ src/peer_connection.h | 191 ++++++ src/ringdht/Makefile.am | 4 +- src/ringdht/p2p.cpp | 604 ++++++++++++++++++ src/ringdht/p2p.h | 47 ++ src/ringdht/ringaccount.cpp | 40 +- src/ringdht/ringaccount.h | 33 + src/sip/sipaccount.cpp | 6 +- src/sip/sipaccountbase.cpp | 4 +- tools/dringctrl/controler.py | 12 +- tools/dringctrl/sendfile.py | 59 ++ 32 files changed, 2492 insertions(+), 40 deletions(-) create mode 100644 src/client/datatransfer.cpp create mode 100644 src/data_transfer.cpp create mode 100644 src/data_transfer.h create mode 100644 src/ftp_server.cpp create mode 100644 src/ftp_server.h create mode 100644 src/peer_connection.cpp create mode 100644 src/peer_connection.h create mode 100644 src/ringdht/p2p.cpp create mode 100644 src/ringdht/p2p.h create mode 100644 tools/dringctrl/sendfile.py diff --git a/bin/dbus/cx.ring.Ring.ConfigurationManager.xml b/bin/dbus/cx.ring.Ring.ConfigurationManager.xml index 7d8068a01a..3ac12f7b6d 100644 --- a/bin/dbus/cx.ring.Ring.ConfigurationManager.xml +++ b/bin/dbus/cx.ring.Ring.ConfigurationManager.xml @@ -1395,6 +1395,29 @@ </arg> </method> + <method name="sendFile" tp:name-for-bindings="sendFile"> + <tp:added version="4.2.0"/> + <arg type="t" name="DataTransferId" direction="out"> + </arg> + <arg type="s" name="accountID" direction="in"> + </arg> + <arg type="s" name="peer_uri" direction="in"> + <tp:docstring> + RingID of request's recipient. + </tp:docstring> + </arg> + <arg type="s" name="file_path" direction="in"></arg> + <arg type="s" name="display_name" direction="in"></arg> + </method> + + <method name="dataTransferInfo" tp:name-for-bindings="dataTransferInfo"> + <tp:added version="4.2.0"/> + <arg type="(buttss)" name="DataTransferInfo" direction="out"> + </arg> + <arg type="t" name="DataTransferId" direction="in"> + </arg> + </method> + <signal name="mediaParametersChanged" tp:name-for-bindings="mediaParametersChanged"> <tp:added version="2.3.0"/> <tp:docstring> @@ -1436,6 +1459,21 @@ </arg> </signal> - + <signal name="dataTransferEvent" tp:name-for-bindings="dataTransferEvent"> + <tp:added version="4.2.0"/> + <tp:docstring> + Notify clients when a data transfer state change. + </tp:docstring> + <arg type="t" name="id"> + <tp:docstring> + Data transfer unique id. + </tp:docstring> + </arg> + <arg type="i" name="code"> + <tp:docstring> + A DRing::DataTransferEventCode code + </tp:docstring> + </arg> + </signal> </interface> </node> diff --git a/bin/dbus/dbusclient.cpp b/bin/dbus/dbusclient.cpp index b9dbf27f72..15211e1e6b 100644 --- a/bin/dbus/dbusclient.cpp +++ b/bin/dbus/dbusclient.cpp @@ -42,6 +42,8 @@ #include "dbuspresencemanager.h" #include "presencemanager_interface.h" +#include "datatransfer_interface.h" + #ifdef RING_VIDEO #include "dbusvideomanager.h" #include "videomanager_interface.h" @@ -128,6 +130,7 @@ DBusClient::initLibrary(int flags) using DRing::ConfigurationSignal; using DRing::PresenceSignal; using DRing::AudioSignal; + using DRing::DataTransferSignal; using SharedCallback = std::shared_ptr<DRing::CallbackWrapperBase>; @@ -203,6 +206,10 @@ DBusClient::initLibrary(int flags) exportable_callback<AudioSignal::DeviceEvent>(bind(&DBusConfigurationManager::audioDeviceEvent, confM)), }; + const std::map<std::string, SharedCallback> dataXferEvHandlers = { + exportable_callback<DataTransferSignal::DataTransferEvent>(bind(&DBusConfigurationManager::dataTransferEvent, confM, _1, _2)), + }; + #ifdef RING_VIDEO // Video event handlers const std::map<std::string, SharedCallback> videoEvHandlers = { @@ -219,6 +226,7 @@ DBusClient::initLibrary(int flags) registerConfHandlers(configEvHandlers); registerPresHandlers(presEvHandlers); registerPresHandlers(audioEvHandlers); + registerDataXferHandlers(dataXferEvHandlers); #ifdef RING_VIDEO registerVideoHandlers(videoEvHandlers); #endif diff --git a/bin/dbus/dbusconfigurationmanager.cpp b/bin/dbus/dbusconfigurationmanager.cpp index 2b4c0f4039..2af4275bf5 100644 --- a/bin/dbus/dbusconfigurationmanager.cpp +++ b/bin/dbus/dbusconfigurationmanager.cpp @@ -24,6 +24,7 @@ #include "dbusconfigurationmanager.h" #include "configurationmanager_interface.h" +#include "datatransfer_interface.h" #include "media/audio/audiolayer.h" @@ -612,3 +613,24 @@ DBusConfigurationManager::connectivityChanged() { DRing::connectivityChanged(); } + +auto +DBusConfigurationManager::sendFile(const std::string& account_id, const std::string& peer_uri, + const std::string& file_path, const std::string& display_name) -> decltype(DRing::sendFile(account_id, peer_uri, file_path, display_name)) +{ + return DRing::sendFile(account_id, peer_uri, file_path, display_name); +} + +DBus::Struct<bool, uint32_t, uint64_t, uint64_t, std::string, std::string> +DBusConfigurationManager::dataTransferInfo(const DRing::DataTransferId& id) +{ + DBus::Struct<bool, uint32_t, uint64_t, uint64_t, std::string, std::string> out; + auto info = DRing::dataTransferInfo(id); + out._1 = info.isOutgoing; + out._2 = uint32_t(info.lastEvent); + out._3 = info.totalSize; + out._4 = info.bytesProgress; + out._5 = info.displayName; + out._6 = info.path; + return out; +} diff --git a/bin/dbus/dbusconfigurationmanager.h b/bin/dbus/dbusconfigurationmanager.h index 6c92831c20..b7786b8f29 100644 --- a/bin/dbus/dbusconfigurationmanager.h +++ b/bin/dbus/dbusconfigurationmanager.h @@ -31,6 +31,8 @@ #include "dbus_cpp.h" +#include "dring/datatransfer_interface.h" + #if __GNUC__ >= 5 || (__GNUC__ >=4 && __GNUC_MINOR__ >= 6) /* This warning option only exists for gcc 4.6.0 and greater. */ #pragma GCC diagnostic ignored "-Wunused-but-set-variable" @@ -151,6 +153,9 @@ class DBusConfigurationManager : int exportAccounts(const std::vector<std::string>& accountIDs, const std::string& filepath, const std::string& password); int importAccounts(const std::string& archivePath, const std::string& password); void connectivityChanged(); + DRing::DataTransferId sendFile(const std::string& account_id, const std::string& peer_uri, + const std::string& file_path, const std::string& display_name); + DBus::Struct<bool, uint32_t, uint64_t, uint64_t, std::string, std::string> dataTransferInfo(const DRing::DataTransferId& id); }; #endif // __RING_DBUSCONFIGURATIONMANAGER_H__ diff --git a/configure.ac b/configure.ac index ec8c8024f6..dd752fc6b0 100644 --- a/configure.ac +++ b/configure.ac @@ -2,7 +2,7 @@ dnl Ring - configure.ac for automake 1.9 and autoconf 2.59 dnl Process this file with autoconf to produce a configure script. AC_PREREQ([2.65]) -AC_INIT([Ring Daemon],[4.1.0],[ring@gnu.org],[ring]) +AC_INIT([Ring Daemon],[4.2.0],[ring@gnu.org],[ring]) AC_COPYRIGHT([[Copyright (c) Savoir-faire Linux 2004-2017]]) AC_REVISION([$Revision$]) diff --git a/doc/doxygen/core-doc.cfg.in b/doc/doxygen/core-doc.cfg.in index 31660777df..50a752648f 100644 --- a/doc/doxygen/core-doc.cfg.in +++ b/doc/doxygen/core-doc.cfg.in @@ -31,7 +31,7 @@ PROJECT_NAME = "Ring Daemon" # This could be handy for archiving the generated documentation or # if some version control system is used. -PROJECT_NUMBER = 4.1.0 +PROJECT_NUMBER = 4.2.0 # Using the PROJECT_BRIEF tag one can provide an optional one line description # for a project that appears at the top of each page and should give viewer diff --git a/src/Makefile.am b/src/Makefile.am index 2bdea570f8..f6a91771c9 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -139,12 +139,18 @@ libring_la_SOURCES = \ rational.h \ smartools.cpp \ smartools.h \ - base64.h \ - base64.cpp \ - turn_transport.h \ - turn_transport.cpp \ - channel.h \ - generic_io.h + base64.h \ + base64.cpp \ + turn_transport.h \ + turn_transport.cpp \ + channel.h \ + peer_connection.cpp \ + peer_connection.h \ + data_transfer.cpp \ + data_transfer.h \ + ftp_server.cpp \ + ftp_server.h \ + generic_io.h if HAVE_WIN32 libring_la_SOURCES += \ diff --git a/src/account.cpp b/src/account.cpp index 8ecbc4af18..0cbda061f4 100644 --- a/src/account.cpp +++ b/src/account.cpp @@ -104,7 +104,7 @@ Account::Account(const std::string &accountID) { random_device rdev; std::seed_seq seed {rdev(), rdev()}; - rand_.seed(seed); + rand.seed(seed); // Initialize the codec order, used when creating a new account loadDefaultCodecs(); diff --git a/src/account.h b/src/account.h index 4b100db831..31422570f3 100644 --- a/src/account.h +++ b/src/account.h @@ -296,7 +296,7 @@ class Account : public Serializable, public std::enable_shared_from_this<Account * Random generator engine * Logical account state shall never rely on the state of the random generator. */ - mutable std::mt19937_64 rand_; + mutable std::mt19937_64 rand; /** * Inform the account that the network status has changed. diff --git a/src/client/Makefile.am b/src/client/Makefile.am index fd4e3a32f0..142b843f98 100644 --- a/src/client/Makefile.am +++ b/src/client/Makefile.am @@ -16,6 +16,7 @@ libclient_la_SOURCES = \ ring_signal.cpp \ callmanager.cpp \ configurationmanager.cpp \ + datatransfer.cpp \ $(PRESENCE_SRC) \ $(VIDEO_SRC) diff --git a/src/client/datatransfer.cpp b/src/client/datatransfer.cpp new file mode 100644 index 0000000000..063e8f1fbf --- /dev/null +++ b/src/client/datatransfer.cpp @@ -0,0 +1,80 @@ +/* + * Copyright (C) 2017 Savoir-faire Linux Inc. + * + * Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "datatransfer_interface.h" + +#include "manager.h" +#include "data_transfer.h" +#include "client/ring_signal.h" + +namespace DRing { + +void +registerDataXferHandlers(const std::map<std::string, std::shared_ptr<CallbackWrapperBase>>& handlers) +{ + auto& handlers_ = ring::getSignalHandlers(); + for (const auto& item : handlers) { + auto iter = handlers_.find(item.first); + if (iter == handlers_.end()) { + RING_ERR("Signal %s not supported", item.first.c_str()); + continue; + } + + iter->second = std::move(item.second); + } +} + +DataTransferId +sendFile(const std::string& account_id, + const std::string& peer_uri, + const std::string& file_path, + const std::string& display_name) +{ + return ring::Manager::instance().dataTransfers->sendFile( + account_id, peer_uri, file_path, display_name.empty() ? file_path : display_name); +} + +void +acceptFileTransfer(const DataTransferId& id, + const std::string& file_path, + std::size_t offset) +{ + ring::Manager::instance().dataTransfers->acceptAsFile(id, file_path, offset); +} + +void +cancelDataTransfer(const DataTransferId& id) +{ + ring::Manager::instance().dataTransfers->cancel(id); +} + +std::streamsize +dataTransferBytesSent(const DataTransferId& id) +{ + return ring::Manager::instance().dataTransfers->bytesSent(id); +} + +DataTransferInfo +dataTransferInfo(const DataTransferId& id) +{ + return ring::Manager::instance().dataTransfers->info(id); +} + +} // namespace DRing diff --git a/src/client/ring_signal.cpp b/src/client/ring_signal.cpp index f3234a9daa..d0de8ff2a8 100644 --- a/src/client/ring_signal.cpp +++ b/src/client/ring_signal.cpp @@ -93,6 +93,9 @@ getSignalHandlers() /* Audio */ exported_callback<DRing::AudioSignal::DeviceEvent>(), + /* DataTransfer */ + exported_callback<DRing::DataTransferSignal::DataTransferEvent>(), + #ifdef RING_VIDEO /* Video */ exported_callback<DRing::VideoSignal::DeviceEvent>(), diff --git a/src/client/ring_signal.h b/src/client/ring_signal.h index 13c7cc16be..d87679eb1c 100644 --- a/src/client/ring_signal.h +++ b/src/client/ring_signal.h @@ -27,6 +27,7 @@ #include "callmanager_interface.h" #include "configurationmanager_interface.h" #include "presencemanager_interface.h" +#include "datatransfer_interface.h" #ifdef RING_VIDEO #include "videomanager_interface.h" diff --git a/src/data_transfer.cpp b/src/data_transfer.cpp new file mode 100644 index 0000000000..e7937242c4 --- /dev/null +++ b/src/data_transfer.cpp @@ -0,0 +1,441 @@ +/* + * Copyright (C) 2017 Savoir-faire Linux Inc. + * + * Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "data_transfer.h" + +#include "manager.h" +#include "ringdht/ringaccount.h" +#include "peer_connection.h" +#include "fileutils.h" +#include "string_utils.h" +#include "client/ring_signal.h" + +#include <stdexcept> +#include <fstream> +#include <sstream> +#include <ios> +#include <iostream> +#include <unordered_map> +#include <mutex> +#include <future> +#include <atomic> +#include <cstdlib> // mkstemp + +namespace ring { + +static DRing::DataTransferId +generateUID() +{ + static DRing::DataTransferId lastId = 0; + return lastId++; +} + +//============================================================================== + +class DataTransfer : public Stream +{ +public: + DataTransfer(DRing::DataTransferId id) : Stream(), id {id} {} + + virtual ~DataTransfer() = default; + + DRing::DataTransferId getId() const override { + return id; + } + + virtual void accept(const std::string&, std::size_t) {}; + + virtual bool start() { + bool expected = false; + return started_.compare_exchange_strong(expected, true); + } + + virtual std::streamsize bytesSent() const { + std::lock_guard<std::mutex> lk {infoMutex_}; + return info_.bytesProgress; + } + + DRing::DataTransferInfo info() const { + std::lock_guard<std::mutex> lk {infoMutex_}; + return info_; + } + + void emit(DRing::DataTransferEventCode code) const; + + const DRing::DataTransferId id; + +protected: + mutable std::mutex infoMutex_; + mutable DRing::DataTransferInfo info_; + std::atomic_bool started_ {false}; +}; + +void +DataTransfer::emit(DRing::DataTransferEventCode code) const +{ + { + std::lock_guard<std::mutex> lk {infoMutex_}; + info_.lastEvent = code; + } + emitSignal<DRing::DataTransferSignal::DataTransferEvent>(id, uint32_t(code)); +} + +//============================================================================== + +class FileTransfer final : public DataTransfer +{ +public: + FileTransfer(DRing::DataTransferId id, const std::string&, const std::string&); + + bool start() override; + + void close() noexcept override; + + bool read(std::vector<uint8_t>&) const override; + +private: + FileTransfer() = delete; + + mutable std::ifstream input_; + mutable std::size_t tx_ {0}; + mutable bool headerSent_ {false}; + const std::string peerUri_; +}; + +FileTransfer::FileTransfer(DRing::DataTransferId id, + const std::string& file_path, + const std::string& display_name) + : DataTransfer(id) +{ + input_.open(file_path, std::ios::binary); + if (!input_) + throw std::runtime_error("input file open failed"); + + info_.isOutgoing = true; + info_.displayName = display_name; + info_.path = file_path; + + // File size? + input_.seekg(0, std::ios_base::end); + info_.totalSize = input_.tellg(); + input_.seekg(0, std::ios_base::beg); + + emit(DRing::DataTransferEventCode::created); +} + +bool +FileTransfer::start() +{ + if (DataTransfer::start()) { + emit(DRing::DataTransferEventCode::ongoing); + return true; + } + return false; +} + +void +FileTransfer::close() noexcept +{ + input_.close(); + if (info_.lastEvent < DRing::DataTransferEventCode::finished) + emit(DRing::DataTransferEventCode::closed_by_host); +} + +bool +FileTransfer::read(std::vector<uint8_t>& buf) const +{ + if (!headerSent_) { + std::stringstream ss; + ss << "Content-Length: " << info_.totalSize << '\n' + << "Display-Name: " << info_.displayName << '\n' + << "Offset: 0\n" + << '\n'; + + auto header = ss.str(); + buf.resize(header.size()); + std::copy(std::begin(header), std::end(header), std::begin(buf)); + + headerSent_ = true; + return true; + } + + input_.read(reinterpret_cast<char*>(&buf[0]), buf.size()); + auto n = input_.gcount(); + buf.resize(n); + { + std::lock_guard<std::mutex> lk {infoMutex_}; + info_.bytesProgress += n; + } + + if (n) + return true; + + if (input_.eof()) { + RING_DBG() << "FTP#" << getId() << ": sent " << info_.bytesProgress << " bytes"; + emit(DRing::DataTransferEventCode::finished); + return false; + } else { + throw std::runtime_error("FileTransfer IO read failed"); // TODO: better exception? + } + + return true; +} + +//============================================================================== + +class IncomingFileTransfer final : public DataTransfer +{ +public: + IncomingFileTransfer(DRing::DataTransferId id, const std::string&, std::size_t); + + bool start() override; + + void close() noexcept override; + + std::string requestFilename(); + + void accept(const std::string&, std::size_t offset) override; + +private: + IncomingFileTransfer() = delete; + + std::promise<void> filenamePromise_; +}; + +IncomingFileTransfer::IncomingFileTransfer(DRing::DataTransferId id, + const std::string& display_name, + std::size_t offset) + : DataTransfer(id) +{ + RING_WARN() << "[FTP] incoming transfert: " << display_name; + (void)offset; + + info_.isOutgoing = false; + info_.displayName = display_name; + // TODO: use offset? + + emit(DRing::DataTransferEventCode::created); +} + +std::string +IncomingFileTransfer::requestFilename() +{ + emit(DRing::DataTransferEventCode::wait_host_acceptance); + // Now wait for DataTransferFacade::acceptFileTransfer() call + +#if 0 + filenamePromise_.get_future().wait(); + return info_.path; +#else + // DEBUG + char filename[] = "/tmp/ring_XXXXXX"; + if (::mkstemp(filename) < 0) + throw std::system_error(errno, std::generic_category()); + return filename; +#endif +} + +bool +IncomingFileTransfer::start() +{ + if (DataTransfer::start()) { + emit(DRing::DataTransferEventCode::ongoing); + return true; + } + return false; +} + +void +IncomingFileTransfer::close() noexcept +{ + filenamePromise_.set_value(); +} + +void +IncomingFileTransfer::accept(const std::string& filename, std::size_t offset) +{ + // TODO: offset? + (void)offset; + + info_.path = filename; + filenamePromise_.set_value(); + start(); +} + +//============================================================================== + +class DataTransferFacade::Impl +{ +public: + mutable std::mutex mapMutex_; + std::unordered_map<DRing::DataTransferId, std::shared_ptr<DataTransfer>> map_; + + std::shared_ptr<DataTransfer> createFileTransfer(const std::string& file_path, + const std::string& display_name); + std::shared_ptr<IncomingFileTransfer> createIncomingFileTransfer(const std::string& display_name, + std::size_t offset); + + std::shared_ptr<DataTransfer> getTransfer(const DRing::DataTransferId& id); + + void cancel(DataTransfer& transfer); + + void onConnectionRequestReply(const DRing::DataTransferId& id, PeerConnection* connection); +}; + +void DataTransferFacade::Impl::cancel(DataTransfer& transfer) +{ + transfer.close(); + map_.erase(transfer.getId()); +} + +std::shared_ptr<DataTransfer> +DataTransferFacade::Impl::getTransfer(const DRing::DataTransferId& id) +{ + std::lock_guard<std::mutex> lk {mapMutex_}; + const auto& iter = map_.find(id); + if (iter == std::end(map_)) + return {}; + return iter->second; +} + +std::shared_ptr<DataTransfer> +DataTransferFacade::Impl::createFileTransfer(const std::string& file_path, + const std::string& display_name) +{ + std::lock_guard<std::mutex> lk {mapMutex_}; + auto id = generateUID(); + auto transfer = std::make_shared<FileTransfer>(id, file_path, display_name); + map_.emplace(id, transfer); + return transfer; +} + +std::shared_ptr<IncomingFileTransfer> +DataTransferFacade::Impl::createIncomingFileTransfer(const std::string& display_name, + std::size_t offset) +{ + std::lock_guard<std::mutex> lk {mapMutex_}; + auto id = generateUID(); + auto transfer = std::make_shared<IncomingFileTransfer>(id, display_name, offset); + map_.emplace(id, transfer); + return transfer; +} + +void +DataTransferFacade::Impl::onConnectionRequestReply(const DRing::DataTransferId& id, + PeerConnection* connection) +{ + if (auto transfer = getTransfer(id)) { + if (connection) { + if (transfer->start()) { + connection->attachInputStream(transfer); + } + } else { + transfer->emit(DRing::DataTransferEventCode::unjoinable_peer); + cancel(*transfer); + } + } +} + +//============================================================================== + +DataTransferFacade::DataTransferFacade() : pimpl_ {std::make_unique<Impl>()} +{ + RING_WARN("facade created, pimpl @%p", pimpl_.get()); +} + +DataTransferFacade::~DataTransferFacade() +{ + RING_WARN("facade destroy, pimpl @%p", pimpl_.get()); +}; + +DRing::DataTransferId +DataTransferFacade::sendFile(const std::string& account_id, const std::string& peer_uri, + const std::string& file_path, const std::string& display_name) +{ + auto account = Manager::instance().getAccount<RingAccount>(account_id); + if (!account) + throw std::invalid_argument("unknown account id"); + + if (!fileutils::isFile(file_path)) + throw std::invalid_argument("invalid input file"); + + auto transfer = pimpl_->createFileTransfer(file_path, display_name); + auto id = transfer->getId(); + // IMPLEMENTATION NOTE: requestPeerConnection() may call the given callback a multiple time. + // This happen when multiple agents handle communications of the given peer for the given account. + // Example: Ring account supports multi-devices, each can answer to the request. + account->requestPeerConnection( + peer_uri, + [this, id] (PeerConnection* connection) { + pimpl_->onConnectionRequestReply(id, connection); + }); + + return id; +} + +void +DataTransferFacade::acceptAsFile(const DRing::DataTransferId& id, + const std::string& file_path, + std::size_t offset) +{ + std::lock_guard<std::mutex> lk {pimpl_->mapMutex_}; + const auto& iter = pimpl_->map_.find(id); + if (iter == std::end(pimpl_->map_)) + throw std::invalid_argument("not existing DataTransferId"); + + iter->second->accept(file_path, offset); +} + +void +DataTransferFacade::cancel(const DRing::DataTransferId& id) +{ + if (auto transfer = pimpl_->getTransfer(id)) + pimpl_->cancel(*transfer); + else + throw std::invalid_argument("not existing DataTransferId"); +} + +std::streamsize +DataTransferFacade::bytesSent(const DRing::DataTransferId& id) const +{ + if (auto transfer = pimpl_->getTransfer(id)) + return transfer->bytesSent(); + throw std::invalid_argument("not existing DataTransferId"); +} + +DRing::DataTransferInfo +DataTransferFacade::info(const DRing::DataTransferId& id) const +{ + if (auto transfer = pimpl_->getTransfer(id)) + return transfer->info(); + throw std::invalid_argument("not existing DataTransferId"); +} + +std::string +DataTransferFacade::onIncomingFileRequest(const std::string& display_name, std::size_t offset) +{ + auto transfer = pimpl_->createIncomingFileTransfer(display_name, offset); + auto filename = transfer->requestFilename(); + if (!filename.empty()) + transfer->start(); // TODO: bad place, call only if file can be open + return filename; +} + +} // namespace ring diff --git a/src/data_transfer.h b/src/data_transfer.h new file mode 100644 index 0000000000..5ddc03c687 --- /dev/null +++ b/src/data_transfer.h @@ -0,0 +1,77 @@ +/* + * Copyright (C) 2017 Savoir-faire Linux Inc. + * + * Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#pragma once + +#include "dring/datatransfer_interface.h" + +#include <memory> +#include <string> + +namespace ring { + +/// Front-end to data transfer service +class DataTransferFacade +{ +public: + DataTransferFacade(); + ~DataTransferFacade(); + + /// Send a file to a peer. + /// Open a file and send its contents over a reliable connection + /// to given peer using the protocol from given account. + /// This method fails immediately if the file cannot be open in binary read mode, + /// if the account doesn't exist or if it doesn't support data transfer. + /// Remaining actions are operated asynchronously, so events are given by signals. + /// \return a unique data transfer identifier. + /// \except std::invalid_argument account doesn't exist or don't support data transfer. + /// \except std::ios_base::failure in case of open file errors. + DRing::DataTransferId sendFile(const std::string& account_id, + const std::string& peer_uri, + const std::string& file_path, + const std::string& display_name); + + /// Accept an incoming transfer and send data into given file. + void acceptAsFile(const DRing::DataTransferId& id, + const std::string& file_path, + std::size_t offset); + + /// Abort a transfer. + /// The transfer id is abort and removed. The id is not longer valid after the call. + void cancel(const DRing::DataTransferId& id); + + /// \return a copy of all information about a data transfer + DRing::DataTransferInfo info(const DRing::DataTransferId& id) const; + + /// \return number of bytes sent by a data transfer + /// \note this method is fatest than info() + std::streamsize bytesSent(const DRing::DataTransferId& id) const; + + /// Create an IncomingFileTransfer object. + /// \return a filename to open where incoming data will be written or an empty string + /// in case of refusal. + std::string onIncomingFileRequest(const std::string& display_name, std::size_t offset); + +private: + class Impl; + std::unique_ptr<Impl> pimpl_; +}; + +} // namespace ring diff --git a/src/dring/datatransfer_interface.h b/src/dring/datatransfer_interface.h index 2fcdafb132..e9aac344f9 100644 --- a/src/dring/datatransfer_interface.h +++ b/src/dring/datatransfer_interface.h @@ -33,25 +33,27 @@ namespace DRing { using DataTransferId = uint64_t; -enum class DataTransferEventCode : uint32_t { - CREATED, - UNSUPPORTED, - WAIT_PEER_ACCEPTANCE, - WAIT_HOST_ACCEPTANCE, - ONGOING, - FINISHED, - CLOSED_BY_HOST, - CLOSED_BY_PEER, - INVALID_PATHNAME, - UNJOINABLE_PEER, +enum class DataTransferEventCode : uint32_t +{ + created, + unsupported, + wait_peer_acceptance, + wait_host_acceptance, + ongoing, + finished, + closed_by_host, + closed_by_peer, + invalid_pathname, + unjoinable_peer, }; -struct DataTransferInfo { +struct DataTransferInfo +{ bool isOutgoing; ///< Outgoing or Incoming? - DataTransferEventCode lastEvent {DataTransferEventCode::CREATED}; ///< Latest event code sent to the user - std::string displayName; ///< Human oriented transfer name + DataTransferEventCode lastEvent { DataTransferEventCode::created }; ///< Latest event code sent to the user std::size_t totalSize {0} ; ///< Total number of bytes to sent/receive, 0 if not known std::streamsize bytesProgress {0}; ///< Number of bytes sent/received + std::string displayName; ///< Human oriented transfer name std::string path; ///< associated local file path if supported (empty, if not) }; @@ -125,8 +127,10 @@ std::streamsize dataTransferBytesSent(const DataTransferId& id); void registerDataXferHandlers(const std::map<std::string, std::shared_ptr<CallbackWrapperBase>>&); // Signals -struct DataTransferSignal { - struct DataTransferEvent { +struct DataTransferSignal +{ + struct DataTransferEvent + { constexpr static const char* name = "DataTransferEvent"; using cb_type = void(const DataTransferId& transferId, int eventCode); }; diff --git a/src/ftp_server.cpp b/src/ftp_server.cpp new file mode 100644 index 0000000000..7f7fd3da07 --- /dev/null +++ b/src/ftp_server.cpp @@ -0,0 +1,164 @@ +/* + * Copyright (C) 2017 Savoir-faire Linux Inc. + * + * Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "ftp_server.h" + +#include "logger.h" +#include "string_utils.h" +#include "data_transfer.h" +#include "manager.h" + +#include <algorithm> +#include <array> +#include <stdexcept> +#include <iterator> +#include <cstdlib> // strtoull + +namespace ring { + +//============================================================================== + +FtpServer::FtpServer() + : Stream() +{} + +DRing::DataTransferId +FtpServer::getId() const +{ + return 0; +} + +void +FtpServer::close() noexcept +{ + out_.close(); + RING_WARN() << "[FTP] server closed"; +} + +bool +FtpServer::startNewFile() +{ + // Request filename from client (WARNING: synchrone call!) + auto filename = Manager::instance().dataTransfers->onIncomingFileRequest(displayName_, 0 /* TODO: offset */); + if (filename.empty()) + return false; + + out_.open(&filename[0], std::ios::binary); + if (!out_) + throw std::system_error(errno, std::generic_category()); + RING_WARN() << "[FTP] Receiving file " << filename; + return true; +} + +void +FtpServer::closeCurrentFile() +{ + out_.close(); + RING_WARN() << "[FTP] File received, " << rx_ << " byte(s)"; + rx_ = fileSize_ = 0; +} + +bool +FtpServer::write(const std::vector<uint8_t>& buffer) +{ + switch (state_) { + case FtpState::PARSE_HEADERS: + if (parseStream(buffer)) { + if (!startNewFile()) { + headerStream_.clear(); + headerStream_.str({}); // reset + return true; + } + state_ = FtpState::READ_DATA; + while (headerStream_) { + headerStream_.read(&line_[0], line_.size()); + out_.write(&line_[0], headerStream_.gcount()); + rx_ += headerStream_.gcount(); + if (rx_ >= fileSize_) { + closeCurrentFile(); + state_ = FtpState::PARSE_HEADERS; + } + } + headerStream_.clear(); + headerStream_.str({}); // reset + } + break; + + case FtpState::READ_DATA: + out_.write(reinterpret_cast<const char*>(&buffer[0]), buffer.size()); + rx_ += buffer.size(); + if (rx_ >= fileSize_) { + closeCurrentFile(); + state_ = FtpState::PARSE_HEADERS; + } + break; + + default: break; + } + + return true; // server always alive +} + +bool +FtpServer::parseStream(const std::vector<uint8_t>& buffer) +{ + headerStream_ << std::string(std::begin(buffer), std::end(buffer)); + + // Simple line stream parser + while (headerStream_.getline(&line_[0], line_.size())) { + if (parseLine(std::string(&line_[0], headerStream_.gcount()-1))) + return true; // headers EOF, data may remain in headerStream_ + } + + if (headerStream_.fail()) + throw std::runtime_error("[FTP] header parsing error"); + + headerStream_.clear(); + return false; // need more data +} + +bool +FtpServer::parseLine(const std::string& line) +{ + if (line.empty()) + return true; + + // Valid line found, parse it as "key: value" and store until end of headers detection + const auto& sep_pos = line.find(':'); + if (sep_pos == std::string::npos) + throw std::runtime_error("[FTP] stream protocol error: bad format"); + + handleHeader(trim(line.substr(0, sep_pos)), trim(line.substr(sep_pos+1))); + return false; +} + +void +FtpServer::handleHeader(const std::string& key, const std::string& value) +{ + RING_DBG() << "[FTP] header: '" << key << "' = '"<< value << "'"; + + if (key == "Content-Length") { + fileSize_ = std::strtoull(&value[0], nullptr, 10); + } else if (key == "Display-Name") { + displayName_ = value; + } +} + +} // namespace ring diff --git a/src/ftp_server.h b/src/ftp_server.h new file mode 100644 index 0000000000..d2d9c1dcad --- /dev/null +++ b/src/ftp_server.h @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2017 Savoir-faire Linux Inc. + * + * Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#pragma once + +#include "peer_connection.h" + +#include <vector> +#include <array> +#include <fstream> +#include <sstream> + +namespace ring { + +class FtpServer final : public Stream +{ +public: + FtpServer(); + + bool write(const std::vector<uint8_t>& buffer) override; + DRing::DataTransferId getId() const override; + void close() noexcept override; + +private: + bool parseStream(const std::vector<uint8_t>&); + bool parseLine(const std::string&); + void handleHeader(const std::string&, const std::string&); + bool startNewFile(); + void closeCurrentFile(); + + enum class FtpState { + PARSE_HEADERS, + READ_DATA, + }; + + std::ofstream out_; + std::size_t fileSize_ {0}; + std::size_t rx_ {0}; + std::stringstream headerStream_; + std::string displayName_; + std::array<char, 1000> line_; + FtpState state_ {FtpState::PARSE_HEADERS}; +}; + +} // namespace ring diff --git a/src/im/message_engine.cpp b/src/im/message_engine.cpp index b72efcac6f..6c33d62407 100644 --- a/src/im/message_engine.cpp +++ b/src/im/message_engine.cpp @@ -46,7 +46,7 @@ MessageEngine::sendMessage(const std::string& to, const std::map<std::string, st { std::lock_guard<std::mutex> lock(messagesMutex_); do { - token = udist(account_.rand_); + token = udist(account_.rand); } while (messages_.find(token) != messages_.end()); auto m = messages_.emplace(token, Message{}); m.first->second.to = to; diff --git a/src/manager.cpp b/src/manager.cpp index 33a131f985..78062d1acf 100644 --- a/src/manager.cpp +++ b/src/manager.cpp @@ -79,6 +79,8 @@ using random_device = dht::crypto::random_device; #include "video/sinkclient.h" #include "audio/tonecontrol.h" +#include "data_transfer.h" + #include <cerrno> #include <ctime> #include <cstdlib> @@ -662,6 +664,7 @@ Manager::Manager() #endif , callFactory() , accountFactory() + , dataTransfers(std::make_unique<DataTransferFacade>()) , pimpl_ (new ManagerPimpl(*this)) {} diff --git a/src/manager.h b/src/manager.h index 2aa19f6245..818c58495f 100644 --- a/src/manager.h +++ b/src/manager.h @@ -52,6 +52,7 @@ class VideoManager; class Conference; class AudioLoop; class IceTransportFactory; +class DataTransferFacade; /** Manager (controller) of Ring daemon */ class Manager { @@ -890,6 +891,8 @@ class Manager { std::atomic<unsigned> dhtLogLevel {0}; // default = disable AccountFactory accountFactory; + std::unique_ptr<DataTransferFacade> dataTransfers; + private: Manager(); ~Manager(); diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp new file mode 100644 index 0000000000..2996603573 --- /dev/null +++ b/src/peer_connection.cpp @@ -0,0 +1,558 @@ +/* + * Copyright (C) 2017 Savoir-faire Linux Inc. + * + * Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "peer_connection.h" + +#include "data_transfer.h" +#include "account.h" +#include "string_utils.h" +#include "channel.h" +#include "turn_transport.h" +#include "security/tls_session.h" + +#include <algorithm> +#include <future> +#include <map> +#include <atomic> +#include <stdexcept> +#include <istream> +#include <ostream> +#include <unistd.h> +#include <cstdio> + +#ifdef _WIN32 +#include <winsock2.h> +#include <ws2tcpip.h> +#else +#include <sys/select.h> +#endif + +#include <sys/time.h> + +namespace ring { + +using lock = std::lock_guard<std::mutex>; + +static constexpr std::size_t IO_BUFFER_SIZE {3000}; ///< Size of char buffer used by IO operations + +//============================================================================== + +class TlsTurnEndpoint::Impl +{ +public: + static constexpr auto TLS_TIMEOUT = std::chrono::seconds(20); + + Impl(ConnectedTurnTransport& tr, + dht::crypto::TrustList& trust_list) : turn {tr}, trustList {trust_list} {} + + ~Impl(); + + // TLS callbacks + int verifyCertificate(gnutls_session_t); + void onTlsStateChange(tls::TlsSessionState); + void onTlsRxData(std::vector<uint8_t>&&); + void onTlsCertificatesUpdate(const gnutls_datum_t*, const gnutls_datum_t*, unsigned int); + + std::unique_ptr<tls::TlsSession> tls; + ConnectedTurnTransport& turn; + dht::crypto::TrustList& trustList; + dht::crypto::Certificate peerCertificate; + std::promise<bool> connected; +}; + +TlsTurnEndpoint::Impl::~Impl() +{ + if (peerCertificate) + trustList.remove(peerCertificate); +} + +int +TlsTurnEndpoint::Impl::verifyCertificate(gnutls_session_t session) +{ + // Support only x509 format + if (gnutls_certificate_type_get(session) != GNUTLS_CRT_X509) { + return GNUTLS_E_CERTIFICATE_ERROR; + } + + // Store verification status + unsigned int status = 0; + auto ret = gnutls_certificate_verify_peers2(session, &status); + if (ret < 0 or (status & GNUTLS_CERT_SIGNATURE_FAILURE) != 0) { + return GNUTLS_E_CERTIFICATE_ERROR; + } + + unsigned int cert_list_size = 0; + auto cert_list = gnutls_certificate_get_peers(session, &cert_list_size); + if (cert_list == nullptr) { + return GNUTLS_E_CERTIFICATE_ERROR; + } + + // Check if peer certificate is inside our list of permited certificate + std::vector<std::pair<uint8_t*, uint8_t*>> crt_data; + crt_data.reserve(cert_list_size); + for (unsigned i=0; i<cert_list_size; i++) + crt_data.emplace_back(cert_list[i].data, cert_list[i].data + cert_list[i].size); + auto crt = dht::crypto::Certificate {crt_data}; + auto verify_result = trustList.verify(crt); + if (!verify_result) { + RING_ERR() << "[TLS-TURN] Peer certificate verification failed: " << verify_result; + return verify_result.result; + } + + // Store valid peer certificate for trust list removal during dtor + peerCertificate = std::move(crt); + + // notify GnuTLS to continue handshake normally + return GNUTLS_E_SUCCESS; +} + +void +TlsTurnEndpoint::Impl::onTlsStateChange(tls::TlsSessionState state) +{ + if (state == tls::TlsSessionState::ESTABLISHED) { + connected.set_value(true); + } +} + +void +TlsTurnEndpoint::Impl::onTlsRxData(UNUSED std::vector<uint8_t>&& buf) +{ + RING_ERR() << "[TLS-TURN] rx " << buf.size() << " (but not implemented)"; +} + +void +TlsTurnEndpoint::Impl::onTlsCertificatesUpdate(UNUSED const gnutls_datum_t* local_raw, + UNUSED const gnutls_datum_t* remote_raw, + UNUSED unsigned int remote_count) +{} + +TlsTurnEndpoint::TlsTurnEndpoint(ConnectedTurnTransport& turn_ep, + const Identity& local_identity, + const std::shared_future<tls::DhParams>& dh_params, + dht::crypto::TrustList& trust_list) + : pimpl_ { std::make_unique<Impl>(turn_ep, trust_list) } +{ + // Add TLS over TURN + tls::TlsSession::TlsSessionCallbacks tls_cbs = { + /*.onStateChange = */[this](tls::TlsSessionState state){ pimpl_->onTlsStateChange(state); }, + /*.onRxData = */[this](std::vector<uint8_t>&& buf){ pimpl_->onTlsRxData(std::move(buf)); }, + /*.onCertificatesUpdate = */[this](const gnutls_datum_t* l, const gnutls_datum_t* r, + unsigned int n){ pimpl_->onTlsCertificatesUpdate(l, r, n); }, + /*.verifyCertificate = */[this](gnutls_session_t session){ return pimpl_->verifyCertificate(session); } + }; + tls::TlsParams tls_param = { + /*.ca_list = */ "", + /*.peer_ca = */ nullptr, + /*.cert = */ local_identity.second, + /*.cert_key = */ local_identity.first, + /*.dh_params = */ dh_params, + /*.timeout = */ Impl::TLS_TIMEOUT, + /*.cert_check = */ nullptr, + }; + pimpl_->tls = std::make_unique<tls::TlsSession>(turn_ep, tls_param, tls_cbs); +} + +TlsTurnEndpoint::~TlsTurnEndpoint() = default; + +bool +TlsTurnEndpoint::isInitiator() const +{ + return pimpl_->tls->isInitiator(); +} + +void +TlsTurnEndpoint::connect() +{ + pimpl_->tls->connect(); +} + +int +TlsTurnEndpoint::maxPayload() const +{ + return pimpl_->tls->maxPayload(); +} + +std::size_t +TlsTurnEndpoint::read(ValueType* buf, std::size_t len, std::error_code& ec) +{ + return pimpl_->tls->read(buf, len, ec); +} + +std::size_t +TlsTurnEndpoint::write(const ValueType* buf, std::size_t len, std::error_code& ec) +{ + return pimpl_->tls->write(buf, len, ec); +} + +//============================================================================== + +TcpSocketEndpoint::TcpSocketEndpoint(const IpAddr& addr) + : addr_ {addr} + , sock_ {::socket(addr.getFamily(), SOCK_STREAM, 0)} +{ + if (sock_ < 0) + std::system_error(errno, std::generic_category()); + auto bound = ip_utils::getAnyHostAddr(addr.getFamily()); + if (::bind(sock_, bound, bound.getLength()) < 0) + std::system_error(errno, std::generic_category()); +} + +TcpSocketEndpoint::~TcpSocketEndpoint() +{ + ::close(sock_); +} + +void +TcpSocketEndpoint::connect() +{ + // Blocking method + if (::connect(sock_, addr_, addr_.getLength()) < 0) + throw std::system_error(errno, std::generic_category()); +} + +bool +TcpSocketEndpoint::waitForData(unsigned ms_timeout) const +{ + struct timeval tv; + tv.tv_sec = ms_timeout / 1000; + tv.tv_usec = (ms_timeout % 1000) * 1000; + + fd_set read_fds; + FD_ZERO(&read_fds); + FD_SET(sock_, &read_fds); + + while (::select(sock_ + 1, &read_fds, nullptr, nullptr, &tv) >= 0) { + if (FD_ISSET(sock_, &read_fds)) + return true; + } + + return false; +} + +std::size_t +TcpSocketEndpoint::read(ValueType* buf, std::size_t len, std::error_code& ec) +{ + // NOTE: recv buf args is a void* on POSIX compliant system, but it's a char* on mingw + auto res = ::recv(sock_, reinterpret_cast<char*>(buf), len, 0); + ec.assign(errno, std::generic_category()); + return (res >= 0) ? res : 0; +} + +std::size_t +TcpSocketEndpoint::write(const ValueType* buf, std::size_t len, std::error_code& ec) +{ + // NOTE: recv buf args is a void* on POSIX compliant system, but it's a char* on mingw + auto res = ::send(sock_, reinterpret_cast<const char*>(buf), len, 0); + ec.assign(errno, std::generic_category()); + return (res >= 0) ? res : 0; +} + +//============================================================================== + +class TlsSocketEndpoint::Impl +{ +public: + static constexpr auto TLS_TIMEOUT = std::chrono::seconds(20); + + Impl(TcpSocketEndpoint& ep, const dht::crypto::Certificate& peer_cert) + : tr {ep}, peerCertificate {peer_cert} {} + + // TLS callbacks + int verifyCertificate(gnutls_session_t); + void onTlsStateChange(tls::TlsSessionState); + void onTlsRxData(std::vector<uint8_t>&&); + void onTlsCertificatesUpdate(const gnutls_datum_t*, const gnutls_datum_t*, unsigned int); + + std::unique_ptr<tls::TlsSession> tls; + TcpSocketEndpoint& tr; + const dht::crypto::Certificate& peerCertificate; +}; + +int +TlsSocketEndpoint::Impl::verifyCertificate(gnutls_session_t session) +{ + // Support only x509 format + if (gnutls_certificate_type_get(session) != GNUTLS_CRT_X509) { + return GNUTLS_E_CERTIFICATE_ERROR; + } + + // Store verification status + unsigned int status = 0; + auto ret = gnutls_certificate_verify_peers2(session, &status); + if (ret < 0 or (status & GNUTLS_CERT_SIGNATURE_FAILURE) != 0) { + return GNUTLS_E_CERTIFICATE_ERROR; + } + + unsigned int cert_list_size = 0; + auto cert_list = gnutls_certificate_get_peers(session, &cert_list_size); + if (cert_list == nullptr) { + return GNUTLS_E_CERTIFICATE_ERROR; + } + + // Check if peer certificate is equal to the expected one + std::vector<std::pair<uint8_t*, uint8_t*>> crt_data; + crt_data.reserve(cert_list_size); + for (unsigned i=0; i<cert_list_size; i++) + crt_data.emplace_back(cert_list[i].data, cert_list[i].data + cert_list[i].size); + auto crt = dht::crypto::Certificate {crt_data}; + if (crt.getId() != peerCertificate.getId()) { + RING_ERR() << "[TLS-SOCKET] Unexpected peer certificate"; + return GNUTLS_E_CERTIFICATE_ERROR; + } + + // notify GnuTLS to continue handshake normally + return GNUTLS_E_SUCCESS; +} + +void +TlsSocketEndpoint::Impl::onTlsStateChange(UNUSED tls::TlsSessionState state) +{} + +void +TlsSocketEndpoint::Impl::onTlsRxData(UNUSED std::vector<uint8_t>&& buf) +{} + +void +TlsSocketEndpoint::Impl::onTlsCertificatesUpdate(UNUSED const gnutls_datum_t* local_raw, + UNUSED const gnutls_datum_t* remote_raw, + UNUSED unsigned int remote_count) +{} + +TlsSocketEndpoint::TlsSocketEndpoint(TcpSocketEndpoint& tr, + const Identity& local_identity, + const std::shared_future<tls::DhParams>& dh_params, + const dht::crypto::Certificate& peer_cert) + : pimpl_ { std::make_unique<Impl>(tr, peer_cert) } +{ + // Add TLS over TURN + tls::TlsSession::TlsSessionCallbacks tls_cbs = { + /*.onStateChange = */[this](tls::TlsSessionState state){ pimpl_->onTlsStateChange(state); }, + /*.onRxData = */[this](std::vector<uint8_t>&& buf){ pimpl_->onTlsRxData(std::move(buf)); }, + /*.onCertificatesUpdate = */[this](const gnutls_datum_t* l, const gnutls_datum_t* r, + unsigned int n){ pimpl_->onTlsCertificatesUpdate(l, r, n); }, + /*.verifyCertificate = */[this](gnutls_session_t session){ return pimpl_->verifyCertificate(session); } + }; + tls::TlsParams tls_param = { + /*.ca_list = */ "", + /*.peer_ca = */ nullptr, + /*.cert = */ local_identity.second, + /*.cert_key = */ local_identity.first, + /*.dh_params = */ dh_params, + /*.timeout = */ Impl::TLS_TIMEOUT, + /*.cert_check = */ nullptr, + }; + pimpl_->tls = std::make_unique<tls::TlsSession>(tr, tls_param, tls_cbs); +} + +TlsSocketEndpoint::~TlsSocketEndpoint() = default; + +std::size_t +TlsSocketEndpoint::read(ValueType* buf, std::size_t len, std::error_code& ec) +{ + return pimpl_->tls->read(buf, len, ec); +} + +std::size_t +TlsSocketEndpoint::write(const ValueType* buf, std::size_t len, std::error_code& ec) +{ + return pimpl_->tls->write(buf, len, ec); +} + +void +TlsSocketEndpoint::connect() +{ + pimpl_->tls->connect(); +} + +//============================================================================== + +// following namespace prevents an ODR violation with definitions in p2p.cpp +namespace +{ + +enum class CtrlMsgType +{ + STOP, + ATTACH_INPUT, + ATTACH_OUTPUT, +}; + +struct CtrlMsg +{ + virtual CtrlMsgType type() const = 0; + virtual ~CtrlMsg() = default; +}; + +struct StopCtrlMsg final : CtrlMsg +{ + explicit StopCtrlMsg() {} + CtrlMsgType type() const override { return CtrlMsgType::STOP; } +}; + +struct AttachInputCtrlMsg final : CtrlMsg +{ + explicit AttachInputCtrlMsg(const std::shared_ptr<Stream>& stream) + : stream {stream} {} + CtrlMsgType type() const override { return CtrlMsgType::ATTACH_INPUT; } + const std::shared_ptr<Stream> stream; +}; + +struct AttachOutputCtrlMsg final : CtrlMsg +{ + explicit AttachOutputCtrlMsg(const std::shared_ptr<Stream>& stream) + : stream {stream} {} + CtrlMsgType type() const override { return CtrlMsgType::ATTACH_OUTPUT; } + const std::shared_ptr<Stream> stream; +}; + +} // namespace <anonymous> + +//============================================================================== + +class PeerConnection::PeerConnectionImpl +{ +public: + PeerConnectionImpl(Account& account, const std::string& peer_uri, + std::unique_ptr<SocketType> endpoint) + : account {account} + , peer_uri {peer_uri} + , endpoint_ {std::move(endpoint)} + , eventLoopFut_ {std::async(std::launch::async, [this]{ eventLoop();})} {} + + ~PeerConnectionImpl() { + ctrlChannel << std::make_unique<StopCtrlMsg>(); + } + + const Account& account; + const std::string peer_uri; + Channel<std::unique_ptr<CtrlMsg>> ctrlChannel; + +private: + std::unique_ptr<SocketType> endpoint_; + std::map<DRing::DataTransferId, std::shared_ptr<Stream>> inputs_; + std::map<DRing::DataTransferId, std::shared_ptr<Stream>> outputs_; + std::future<void> eventLoopFut_; + + void eventLoop(); + + template <typename L, typename C> + void handle_stream_list(L& stream_list, const C& callable) { + if (stream_list.empty()) + return; + const auto& item = std::begin(stream_list); + auto& stream = item->second; + try { + if (callable(stream)) + return; + RING_DBG() << "EOF on stream #" << stream->getId(); + } catch (const std::system_error& e) { + RING_WARN() << "Stream #" << stream->getId() + << " IO failed with code = " << e.code(); + } catch (const std::exception& e) { + RING_ERR() << "Unexpected exception during IO with stream #" + << stream->getId() + << ": " << e.what(); + } + stream->close(); + stream_list.erase(item); + } +}; + +void +PeerConnection::PeerConnectionImpl::eventLoop() +{ + RING_DBG() << "[CNX] Peer connection to " << peer_uri << " ready"; + while (true) { + // Process ctrl orders first + while (true) { + std::unique_ptr<CtrlMsg> msg; + if (outputs_.empty() and inputs_.empty()) { + ctrlChannel >> msg; + } else if (!ctrlChannel.empty()) { + msg = ctrlChannel.receive(); + } else + break; + + switch (msg->type()) { + case CtrlMsgType::ATTACH_INPUT: + { + auto& input_msg = static_cast<AttachInputCtrlMsg&>(*msg); + auto id = input_msg.stream->getId(); + inputs_.emplace(id, std::move(input_msg.stream)); + } + break; + + case CtrlMsgType::ATTACH_OUTPUT: + { + auto& output_msg = static_cast<AttachOutputCtrlMsg&>(*msg); + auto id = output_msg.stream->getId(); + outputs_.emplace(id, std::move(output_msg.stream)); + } + break; + + case CtrlMsgType::STOP: + endpoint_.reset(); + inputs_.clear(); + outputs_.clear(); + return; + + default: RING_ERR("BUG: got unhandled control msg!"); break; + } + } + + // Then handles IO streams + std::vector<uint8_t> buf(IO_BUFFER_SIZE); + std::error_code ec; + handle_stream_list(inputs_, [&](auto& stream){ + if (!stream->read(buf)) + return false; + endpoint_->write(buf, ec); + return true; + }); + handle_stream_list(outputs_, [&](auto& stream){ + endpoint_->read(buf, ec); + return buf.size() != 0 and stream->write(buf); + }); + } +} + +//============================================================================== + +PeerConnection::PeerConnection(Account& account, const std::string& peer_uri, + std::unique_ptr<GenericSocket<uint8_t>> endpoint) + : pimpl_(std::make_unique<PeerConnectionImpl>(account, peer_uri, std::move(endpoint))) +{} + +PeerConnection::~PeerConnection() +{} + +void +PeerConnection::attachInputStream(const std::shared_ptr<Stream>& stream) +{ + pimpl_->ctrlChannel << std::make_unique<AttachInputCtrlMsg>(stream); +} + +void +PeerConnection::attachOutputStream(const std::shared_ptr<Stream>& stream) +{ + pimpl_->ctrlChannel << std::make_unique<AttachOutputCtrlMsg>(stream); +} + +} // namespace ring diff --git a/src/peer_connection.h b/src/peer_connection.h new file mode 100644 index 0000000000..289ff0bbc7 --- /dev/null +++ b/src/peer_connection.h @@ -0,0 +1,191 @@ +/* + * Copyright (C) 2017 Savoir-faire Linux Inc. + * + * Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#pragma once + +#include "dring/datatransfer_interface.h" +#include "ip_utils.h" +#include "generic_io.h" +#include "security/diffie-hellman.h" +#include "opendht/crypto.h" + +#include <string> +#include <map> +#include <vector> +#include <memory> +#include <functional> +#include <future> +#include <utility> + +namespace dht { namespace crypto { +struct PrivateKey; +struct Certificate; +}} + +namespace ring { + +class Account; +class TurnTransport; +class ConnectedTurnTransport; + +//============================================================================== + +class Stream +{ +public: + virtual ~Stream() { close(); } + virtual void close() noexcept { } + virtual DRing::DataTransferId getId() const = 0; + virtual bool read(std::vector<uint8_t>& buffer) const { + (void)buffer; + return false; + } + virtual bool write(const std::vector<uint8_t>& buffer) { + (void)buffer; + return false; + }; +}; + +//============================================================================== + +/// Implement a server TLS session IO over a client TURN connection +class TlsTurnEndpoint : public GenericSocket<uint8_t> +{ +public: + using SocketType = GenericSocket<uint8_t>; + using Identity = std::pair<std::shared_ptr<dht::crypto::PrivateKey>, + std::shared_ptr<dht::crypto::Certificate>>; + + TlsTurnEndpoint(ConnectedTurnTransport& turn, + const Identity& local_identity, + const std::shared_future<tls::DhParams>& dh_params, + dht::crypto::TrustList& trust_list); + ~TlsTurnEndpoint(); + + bool isReliable() const override { return true; } + bool isInitiator() const override; + int maxPayload() const override; + std::size_t read(ValueType* buf, std::size_t len, std::error_code& ec) override; + std::size_t write(const ValueType* buf, std::size_t len, std::error_code& ec) override; + + void setOnRecv(RecvCb&&) override { + throw std::logic_error("TlsTurnEndpoint::setOnRecv not implemented"); + } + bool waitForData(unsigned) const override { + throw std::logic_error("TlsTurnEndpoint::waitForData not implemented"); + } + + void connect(); + +private: + class Impl; + std::unique_ptr<Impl> pimpl_; +}; + +//============================================================================== + +/// Implement system socket IO +class TcpSocketEndpoint : public GenericSocket<uint8_t> +{ +public: + using SocketType = GenericSocket<uint8_t>; + explicit TcpSocketEndpoint(const IpAddr& addr); + ~TcpSocketEndpoint(); + + bool isReliable() const override { return true; } + bool isInitiator() const override { return true; } + int maxPayload() const override { return 1280; } + bool waitForData(unsigned ms_timeout) const override; + std::size_t read(ValueType* buf, std::size_t len, std::error_code& ec) override; + std::size_t write(const ValueType* buf, std::size_t len, std::error_code& ec) override; + + void setOnRecv(RecvCb&&) override { + throw std::logic_error("TcpSocketEndpoint::setOnRecv not implemented"); + } + + void connect(); + +private: + const IpAddr addr_; + int sock_ {-1}; +}; + +//============================================================================== + +/// Implement a TLS session IO over a system socket +class TlsSocketEndpoint : public GenericSocket<uint8_t> +{ +public: + using SocketType = GenericSocket<uint8_t>; + using Identity = std::pair<std::shared_ptr<dht::crypto::PrivateKey>, + std::shared_ptr<dht::crypto::Certificate>>; + + TlsSocketEndpoint(TcpSocketEndpoint& parent, + const Identity& local_identity, + const std::shared_future<tls::DhParams>& dh_params, + const dht::crypto::Certificate& peer_cert); + ~TlsSocketEndpoint(); + + bool isReliable() const override { return true; } + bool isInitiator() const override { return true; } + int maxPayload() const override { return 1280; } + std::size_t read(ValueType* buf, std::size_t len, std::error_code& ec) override; + std::size_t write(const ValueType* buf, std::size_t len, std::error_code& ec) override; + + void setOnRecv(RecvCb&&) override { + throw std::logic_error("TlsSocketEndpoint::setOnRecv not implemented"); + } + bool waitForData(unsigned) const override { + throw std::logic_error("TlsSocketEndpoint::waitForData not implemented"); + } + + void connect(); + +private: + class Impl; + std::unique_ptr<Impl> pimpl_; +}; + +//============================================================================== + +class PeerConnection +{ +public: + using SocketType = GenericSocket<uint8_t>; + + PeerConnection(Account& account, const std::string& peer_uri, + std::unique_ptr<SocketType> endpoint); + + ~PeerConnection(); + + void attachOutputStream(const std::shared_ptr<Stream>& stream); + + void attachInputStream(const std::shared_ptr<Stream>& stream); + + void refuseStream(const DRing::DataTransferId& id); + + void abortStream(const DRing::DataTransferId& id); + +private: + class PeerConnectionImpl; + std::unique_ptr<PeerConnectionImpl> pimpl_; +}; + +} // namespace ring diff --git a/src/ringdht/Makefile.am b/src/ringdht/Makefile.am index 6ac4aaacd7..f05233ab37 100644 --- a/src/ringdht/Makefile.am +++ b/src/ringdht/Makefile.am @@ -17,7 +17,9 @@ libringacc_la_SOURCES = \ sips_transport_ice.cpp \ sips_transport_ice.h \ accountarchive.cpp \ - accountarchive.h + accountarchive.h \ + p2p.cpp \ + p2p.h if RINGNS libringacc_la_SOURCES += \ diff --git a/src/ringdht/p2p.cpp b/src/ringdht/p2p.cpp new file mode 100644 index 0000000000..bd652699a7 --- /dev/null +++ b/src/ringdht/p2p.cpp @@ -0,0 +1,604 @@ +/* + * Copyright (C) 2017 Savoir-faire Linux Inc. + * + * Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "p2p.h" + +#include "ringaccount.h" +#include "peer_connection.h" +#include "turn_transport.h" +#include "ftp_server.h" +#include "channel.h" +#include "security/tls_session.h" + +#include <opendht/default_types.h> +#include <opendht/rng.h> + +#include <memory> +#include <map> +#include <vector> +#include <chrono> +#include <array> +#include <future> +#include <algorithm> +#include <type_traits> + +namespace ring { + +static constexpr auto DHT_MSG_TIMEOUT = std::chrono::seconds(20); +static constexpr auto NET_CONNECTION_TIMEOUT = std::chrono::seconds(10); + +using Clock = std::chrono::system_clock; +using ValueIdDist = std::uniform_int_distribution<dht::Value::Id>; + +//============================================================================== + +// This namespace prevents a nasty ODR violation with definitions in peer_connection.cpp +inline namespace +{ + +template <typename CT> +class Timeout +{ +public: + using clock = CT; + using duration = typename CT::duration; + using time_point = typename CT::time_point; + + explicit Timeout(const duration& delay) : delay {delay} {} + + void start() { + start_ = clock::now(); + } + + explicit operator bool() const { + return (clock::now() - start_) >= delay; + } + + const duration delay {duration::zero()}; + +private: + time_point start_ {}; +}; + +//============================================================================== + +/** + * DHT message to convey a end2end connection request to a peer + */ +class PeerConnectionMsg : public dht::EncryptedValue<PeerConnectionMsg> +{ +public: + static constexpr const dht::ValueType& TYPE = dht::ValueType::USER_DATA; + static constexpr uint32_t protocol_version = 0x01000002; ///< Supported protocol + static constexpr const char* key_prefix = "peer:"; ///< base to compute the DHT listen key + + dht::Value::Id id = dht::Value::INVALID_ID; + uint32_t protocol {protocol_version}; ///< Protocol identification. First bit reserved to indicate a request (0) or a response (1) + std::vector<std::string> addresses; ///< Request: public addresses for TURN permission. Response: TURN relay addresses (only 1 in current implementation) + MSGPACK_DEFINE_MAP(id, protocol, addresses) + + PeerConnectionMsg() = default; + PeerConnectionMsg(dht::Value::Id id, uint32_t aprotocol, const std::string& arelay) + : id {id}, protocol {aprotocol}, addresses {{arelay}} {} + + bool isRequest() const noexcept { return (protocol & 1) == 0; } + + PeerConnectionMsg respond(const IpAddr& relay) const { + return {id, protocol|1, relay.toString(true, true)}; + } +}; + +//============================================================================== + +enum class CtrlMsgType +{ + STOP, + CANCEL, + TURN_PEER_CONNECT, + TURN_PEER_DISCONNECT, + DHT_REQUEST, + DHT_RESPONSE, + ADD_DEVICE, +}; + +struct CtrlMsgBase +{ + CtrlMsgBase() = delete; + explicit CtrlMsgBase(CtrlMsgType id) : id_ {id} {} + CtrlMsgType type() const noexcept { return id_; } +private: + const CtrlMsgType id_; +}; + +template <class... Args> +using DataTypeSet = std::tuple<Args...>; + +template <CtrlMsgType id, typename DataTypeSet=void> +struct CtrlMsg : CtrlMsgBase +{ + template <class... Args> + explicit CtrlMsg(const Args&... args) : CtrlMsgBase(id), data {args...} {} + + DataTypeSet data; +}; + +template <CtrlMsgType id, class... Args> +auto makeMsg(const Args&... args) +{ + return std::make_unique<CtrlMsg<id, DataTypeSet<Args...>>>(args...); +} + +template <std::size_t N, CtrlMsgType id, typename R, typename T> +auto msgData(const T& msg) +{ + using MsgType = typename std::tuple_element<std::size_t(id), R>::type; + auto& x = static_cast<const MsgType&>(msg); + return std::get<N>(x.data); +} + +//============================================================================== + +using DhtInfoHashMsgData = DataTypeSet<dht::InfoHash>; +using TurnConnectMsgData = DataTypeSet<IpAddr>; +using PeerCnxMsgData = DataTypeSet<PeerConnectionMsg>; +using AddDeviceMsgData = DataTypeSet<dht::InfoHash, + std::shared_ptr<dht::crypto::Certificate>, + std::vector<std::string>, + std::function<void(PeerConnection*)>>; + +using AllCtrlMsg = DataTypeSet<CtrlMsg<CtrlMsgType::STOP>, + CtrlMsg<CtrlMsgType::CANCEL, DhtInfoHashMsgData>, + CtrlMsg<CtrlMsgType::TURN_PEER_CONNECT, TurnConnectMsgData>, + CtrlMsg<CtrlMsgType::TURN_PEER_DISCONNECT, TurnConnectMsgData>, + CtrlMsg<CtrlMsgType::DHT_REQUEST, PeerCnxMsgData>, + CtrlMsg<CtrlMsgType::DHT_RESPONSE, PeerCnxMsgData>, + CtrlMsg<CtrlMsgType::ADD_DEVICE, AddDeviceMsgData>>; + +template <CtrlMsgType id, std::size_t N=0, typename T> +auto ctrlMsgData(const T& msg) +{ + return msgData<N, id, AllCtrlMsg>(msg); +} + +} // namespace <anonymous> + +//============================================================================== + +class DhtPeerConnector::Impl +{ +public: + class ClientConnector; + + explicit Impl(RingAccount& account) + : account {account} + , loopFut_ {std::async(std::launch::async, [this]{ eventLoop(); })} {} + + ~Impl() { ctrl << makeMsg<CtrlMsgType::STOP>(); } + + RingAccount& account; + Channel<std::unique_ptr<CtrlMsgBase>> ctrl; + +private: + std::unique_ptr<ConnectedTurnTransport> turn_ep_; + std::unique_ptr<TurnTransport> turn_; + dht::crypto::TrustList trustedPeers_; + +protected: + std::map<IpAddr, std::unique_ptr<PeerConnection>> servers_; + std::map<dht::InfoHash, std::unique_ptr<ClientConnector>> clients_; + +private: + void onTurnPeerConnection(const IpAddr&); + void onTurnPeerDisconnection(const IpAddr&); + void onRequestMsg(PeerConnectionMsg&&); + void onTrustedRequestMsg(PeerConnectionMsg&&, + const std::shared_ptr<dht::crypto::Certificate>&); + void onResponseMsg(PeerConnectionMsg&&); + void onAddDevice(const dht::InfoHash&, + const std::shared_ptr<dht::crypto::Certificate>&, + const std::vector<std::string>&, + const std::function<void(PeerConnection*)>&); + void turnConnect(); + void eventLoop(); + + std::future<void> loopFut_; // keep it last member +}; + +//============================================================================== + +/// This class is responsible of connection to a specific peer. +/// The connected peer acting as server (responsible of the TURN session). +/// When the TURN session is created and your IP is permited, we'll connect it +/// using a system socket. Later the TLS session is negotiated on this socket. +class DhtPeerConnector::Impl::ClientConnector +{ +public: + using ListenerFunction = std::function<void(PeerConnection*)>; + + ClientConnector(Impl& parent, + const dht::InfoHash& peer_h, + const std::shared_ptr<dht::crypto::Certificate>& peer_cert, + const std::vector<std::string>& public_addresses, + const ListenerFunction& connect_cb) + : parent_ {parent} + , peer_ {peer_h} + , publicAddresses_ {public_addresses} + , peerCertificate_ {peer_cert} { + addListener(connect_cb); + processTask_ = std::async( + std::launch::async, + [this] { + try { process(); } + catch (const std::exception& e) { + RING_ERR() << "[CNX] exception during client processing: " << e.what(); + cancel(); + } + }); + } + + ~ClientConnector() { + for (auto& cb: listeners_) + cb(nullptr); + connection_.reset(); + }; + + void addListener(const ListenerFunction& cb) { + if (!connected_) { + std::lock_guard<std::mutex> lk {listenersMutex_}; + listeners_.push_back(cb); + } else { + cb(connection_.get()); + } + } + + void cancel() { + parent_.ctrl << makeMsg<CtrlMsgType::CANCEL>(peer_); + } + + void onDhtResponse(PeerConnectionMsg&& response) { + response_ = std::move(response); + responseReceived_ = true; + } + +private: + void process() { + // Prepare connection request as a DHT message + PeerConnectionMsg request; + request.addresses = std::move(publicAddresses_); + request.id = ValueIdDist()(parent_.account.rand); /* Random id for the message unicity */ + + // Send connection request through DHT + RING_DBG() << parent_.account << "[CNX] request connection to " << peer_; + parent_.account.dht().putEncrypted( + dht::InfoHash::get(PeerConnectionMsg::key_prefix + peer_.toString()), peer_, request); + + // Wait for call to onResponse() operated by DHT + Timeout<Clock> dhtMsgTimeout {DHT_MSG_TIMEOUT}; + dhtMsgTimeout.start(); + while (!responseReceived_) { + if (dhtMsgTimeout) + throw std::runtime_error("no response from DHT to E2E request"); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + // Check response validity + IpAddr relay_addr; + if (response_.from != peer_ or + response_.id != request.id or + response_.addresses.empty() or + !(relay_addr = response_.addresses[0])) { + throw std::runtime_error("invalid connection reply"); + } + + // Connect to TURN peer using a raw socket + RING_DBG() << parent_.account << "[CNX] connecting to TURN relay " + << relay_addr.toString(true, true); + auto peer_ep = std::make_unique<TcpSocketEndpoint>(relay_addr); + peer_ep->connect(); // IMPROVE_ME: socket timeout? + + // Negotiate a TLS session + RING_DBG() << parent_.account << "[CNX] start TLS session"; + auto tls_ep = std::make_unique<TlsSocketEndpoint>(*peer_ep, + parent_.account.identity(), + parent_.account.dhParams(), + *peerCertificate_); + tls_ep->connect(); + + // Connected! + connection_ = std::make_unique<PeerConnection>(parent_.account, peer_.toString(), + std::move(tls_ep)); + peer_ep_ = std::move(peer_ep); + + connected_ = true; + for (auto& cb: listeners_) { + cb(connection_.get()); + } + } + + Impl& parent_; + const dht::InfoHash peer_; + + std::vector<std::string> publicAddresses_; + std::atomic_bool responseReceived_ {false}; + PeerConnectionMsg response_; + std::shared_ptr<dht::crypto::Certificate> peerCertificate_; + std::unique_ptr<TcpSocketEndpoint> peer_ep_; + std::unique_ptr<PeerConnection> connection_; + + std::atomic_bool connected_ {false}; + std::mutex listenersMutex_; + std::vector<ListenerFunction> listeners_; + + std::future<void> processTask_; +}; + +//============================================================================== + +/// Synchronous TCP connect to a TURN server +/// \note TCP peer connection mode is enabled for reliable data transfer. +void +DhtPeerConnector::Impl::turnConnect() +{ + if (turn_) + return; + + auto turn_param = TurnTransportParams {}; + turn_param.server = IpAddr {"turn.ring.cx"}; + turn_param.realm = "ring"; + turn_param.username = "ring"; + turn_param.password = "ring"; + turn_param.isPeerConnection = true; // Request for TCP peer connections, not UDP + turn_param.onPeerConnection = [this](uint32_t conn_id, const IpAddr& peer_addr, bool connected) { + (void)conn_id; + if (connected) + ctrl << makeMsg<CtrlMsgType::TURN_PEER_CONNECT>(peer_addr); + else + ctrl << makeMsg<CtrlMsgType::TURN_PEER_DISCONNECT>(peer_addr); + }; + turn_ = std::make_unique<TurnTransport>(turn_param); + + // Wait until TURN server READY state (or timeout) + Timeout<Clock> timeout {NET_CONNECTION_TIMEOUT}; + timeout.start(); + while (!turn_->isReady()) { + if (timeout) + throw std::runtime_error("no response from TURN"); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } +} + +/// Negotiate a TLS session over a TURN socket this method does [yoda]. +/// At this stage both endpoints has a dedicated TCP connection on each other. +void +DhtPeerConnector::Impl::onTurnPeerConnection(const IpAddr& peer_addr) +{ + RING_DBG() << account << "[CNX] TURN connection attempt from " + << peer_addr.toString(true, true); + + auto turn_ep = std::make_unique<ConnectedTurnTransport>(*turn_, peer_addr); + RING_DBG() << account << "[CNX] start TLS session over TURN socket"; + auto tls_ep = std::make_unique<TlsTurnEndpoint>(*turn_ep, account.identity(), + account.dhParams(), trustedPeers_); + tls_ep->connect(); // block until TLS negotiated (throw in case of error) + auto connection = std::make_unique<PeerConnection>(account, peer_addr.toString(), std::move(tls_ep)); + connection->attachOutputStream(std::make_shared<FtpServer>()); + servers_.emplace(peer_addr, std::move(connection)); + + // note: operating this way let endpoint to be deleted safely in case of exceptions + turn_ep_ = std::move(turn_ep); +} + +void +DhtPeerConnector::Impl::onTurnPeerDisconnection(const IpAddr& peer_addr) +{ + const auto& iter = servers_.find(peer_addr); + if (iter == std::end(servers_)) + return; + RING_WARN() << account << "[CNX] disconnection from peer " << peer_addr.toString(true, true); + servers_.erase(iter); +} + +void +DhtPeerConnector::Impl::onRequestMsg(PeerConnectionMsg&& request) +{ + RING_DBG() << account << "[CNX] rx DHT request from " << request.from; + + // Asynch certificate checking -> trig onTrustedRequestMsg when trusted certificate is found + account.findCertificate( + request.from, + [this, request=std::move(request)] (const std::shared_ptr<dht::crypto::Certificate>& cert) mutable { + /* TODO: certificate trusted? */ + onTrustedRequestMsg(std::move(request), cert); + }); +} + +void +DhtPeerConnector::Impl::onTrustedRequestMsg(PeerConnectionMsg&& request, + const std::shared_ptr<dht::crypto::Certificate>& cert) +{ + turnConnect(); // start a TURN client connection on first pass, next ones just add new peer cnx handlers + + // Save peer certificate for later TLS session (MUST BE DONE BEFORE TURN PEER AUTHORIZATION) + trustedPeers_.add(*cert); + + for (auto& ip: request.addresses) { + try { + turn_->permitPeer(ip); + RING_DBG() << account << "[CNX] authorized peer connection from " << ip; + } catch (const std::exception& e) { + RING_WARN() << account << "[CNX] ignored peer connection '" << ip << "', " << e.what(); + } + } + + RING_DBG() << account << "[CNX] connection accepted, DHT reply to " << request.from; + account.dht().putEncrypted( + dht::InfoHash::get(PeerConnectionMsg::key_prefix + request.from.toString()), + request.from, request.respond(turn_->peerRelayAddr())); + + // Now wait for a TURN connection from peer (see onTurnPeerConnection) +} + +void +DhtPeerConnector::Impl::onResponseMsg(PeerConnectionMsg&& response) +{ + RING_DBG() << account << "[CNX] rx DHT reply from " << response.from; + const auto& iter = clients_.find(response.from); + if (iter == std::end(clients_)) + return; // no corresponding request + iter->second->onDhtResponse(std::move(response)); +} + +void +DhtPeerConnector::Impl::onAddDevice(const dht::InfoHash& dev_h, + const std::shared_ptr<dht::crypto::Certificate>& peer_cert, + const std::vector<std::string>& public_addresses, + const std::function<void(PeerConnection*)>& connect_cb) +{ + const auto& iter = clients_.find(dev_h); + if (iter == std::end(clients_)) { + clients_.emplace( + dev_h, + std::make_unique<Impl::ClientConnector>(*this, dev_h, peer_cert, public_addresses, connect_cb)); + } else { + iter->second->addListener(connect_cb); + } +} + +void +DhtPeerConnector::Impl::eventLoop() +{ + // Loop until STOP msg + while (true) { + decltype(ctrl)::value_type msg; + ctrl >> msg; + switch (msg->type()) { + case CtrlMsgType::STOP: + turn_.reset(); + return; + + case CtrlMsgType::TURN_PEER_CONNECT: + onTurnPeerConnection(ctrlMsgData<CtrlMsgType::TURN_PEER_CONNECT>(*msg)); + break; + + case CtrlMsgType::TURN_PEER_DISCONNECT: + onTurnPeerDisconnection(ctrlMsgData<CtrlMsgType::TURN_PEER_DISCONNECT>(*msg)); + break; + + case CtrlMsgType::CANCEL: + clients_.erase(ctrlMsgData<CtrlMsgType::CANCEL>(*msg)); + break; + + case CtrlMsgType::DHT_REQUEST: + onRequestMsg(ctrlMsgData<CtrlMsgType::DHT_REQUEST>(*msg)); + break; + + case CtrlMsgType::DHT_RESPONSE: + onResponseMsg(ctrlMsgData<CtrlMsgType::DHT_RESPONSE>(*msg)); + break; + + case CtrlMsgType::ADD_DEVICE: + onAddDevice(ctrlMsgData<CtrlMsgType::ADD_DEVICE, 0>(*msg), + ctrlMsgData<CtrlMsgType::ADD_DEVICE, 1>(*msg), + ctrlMsgData<CtrlMsgType::ADD_DEVICE, 2>(*msg), + ctrlMsgData<CtrlMsgType::ADD_DEVICE, 3>(*msg)); + break; + + default: RING_ERR("BUG: got unhandled control msg!"); break; + } + } +} + +//============================================================================== + +DhtPeerConnector::DhtPeerConnector(RingAccount& account) + : pimpl_ {new Impl {account}} +{} + +DhtPeerConnector::~DhtPeerConnector() = default; + +/// Called by a RingAccount when it's DHT is connected +/// Install a DHT LISTEN operation on given device to receive data connection requests and replies +/// The DHT key is Hash(PeerConnectionMsg::key_prefix + device_id), where '+' is the string concatenation. +void +DhtPeerConnector::onDhtConnected(const std::string& device_id) +{ + pimpl_->account.dht().listen<PeerConnectionMsg>( + dht::InfoHash::get(PeerConnectionMsg::key_prefix + device_id), + [this](PeerConnectionMsg&& msg) { + if (msg.from == pimpl_->account.dht().getId()) + return true; + if (!pimpl_->account.isMessageTreated(msg.id)) { + if (msg.isRequest()) { + // TODO: filter-out request from non trusted peer + pimpl_->ctrl << makeMsg<CtrlMsgType::DHT_REQUEST>(std::move(msg)); + } else + pimpl_->ctrl << makeMsg<CtrlMsgType::DHT_RESPONSE>(std::move(msg)); + } + return true; + }); +} + +void +DhtPeerConnector::requestConnection(const std::string& peer_id, + const std::function<void(PeerConnection*)>& connect_cb) +{ + const auto peer_h = dht::InfoHash(peer_id); + + // Notes for reader: + // 1) dht.getPublicAddress() suffers of a non-usability into forEachDevice() callbacks. + // If you call it in forEachDevice callbacks, it'll never not return... + // Seems that getPublicAddress() and forEachDevice() need to process into the same thread + // (here the one where dht_ loop runs). + // 2) anyway its good to keep this processing here in case of multiple device + // as the result is the same for each device. + auto addresses = pimpl_->account.publicAddresses(); + + // Add local addresses + // XXX: is it really needed? use-case? a local TURN server? + //addresses.emplace_back(ip_utils::getLocalAddr(AF_INET)); + //addresses.emplace_back(ip_utils::getLocalAddr(AF_INET6)); + + // TODO: bypass DHT devices lookup if connection already exist + + pimpl_->account.forEachDevice( + peer_h, + [this, addresses, connect_cb](const std::shared_ptr<RingAccount>& account, + const dht::InfoHash& dev_h) { + if (dev_h == account->dht().getId()) { + RING_ERR() << account << "[CNX] no connection to yourself, bad boy!"; + return; + } + + account->findCertificate( + dev_h, + [this, dev_h, addresses, connect_cb] (const std::shared_ptr<dht::crypto::Certificate>& cert) { + pimpl_->ctrl << makeMsg<CtrlMsgType::ADD_DEVICE>(dev_h, cert, addresses, connect_cb); + }); + }, + + [this, peer_h, connect_cb](bool found) { + if (!found) { + RING_WARN() << pimpl_->account << "[CNX] aborted, no devices for " << peer_h; + connect_cb(nullptr); + } + }); +} + +} // namespace ring diff --git a/src/ringdht/p2p.h b/src/ringdht/p2p.h new file mode 100644 index 0000000000..493b6e6dbf --- /dev/null +++ b/src/ringdht/p2p.h @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2017 Savoir-faire Linux Inc. + * + * Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#pragma once + +#include <string> +#include <memory> +#include <functional> + +namespace ring { + +class RingAccount; +class PeerConnection; + +class DhtPeerConnector { +public: + DhtPeerConnector(RingAccount& account); + ~DhtPeerConnector(); + + void onDhtConnected(const std::string& device_id); + void requestConnection(const std::string& peer_id, const std::function<void(PeerConnection*)>& connect_cb); + +private: + DhtPeerConnector() = delete; + + class Impl; + std::unique_ptr<Impl> pimpl_; +}; + +} diff --git a/src/ringdht/ringaccount.cpp b/src/ringdht/ringaccount.cpp index 0ac7b8d17d..b58d434762 100644 --- a/src/ringdht/ringaccount.cpp +++ b/src/ringdht/ringaccount.cpp @@ -42,6 +42,8 @@ #include "sips_transport_ice.h" #include "ice_transport.h" +#include "p2p.h" + #include "client/ring_signal.h" #include "dring/call_const.h" #include "dring/account_const.h" @@ -176,7 +178,6 @@ struct RingAccount::KnownDevice : certificate(cert), name(n), last_sync(sync) {} }; - /** * Device announcement stored on DHT. */ @@ -277,6 +278,7 @@ RingAccount::RingAccount(const std::string& accountID, bool /* presenceEnabled * , idPath_(fileutils::get_data_dir()+DIR_SEPARATOR_STR+getAccountID()) , cachePath_(fileutils::get_cache_dir()+DIR_SEPARATOR_STR+getAccountID()) , dataPath_(cachePath_ + DIR_SEPARATOR_STR "values") + , dhtPeerConnector_ {new DhtPeerConnector {*this}} { // Force the SFL turn server if none provided yet turnServer_ = DEFAULT_TURN_SERVER; @@ -435,7 +437,7 @@ RingAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std:: sthis->registerDhtAddress(*ice); // Next step: sent the ICE data to peer through DHT - const dht::Value::Id callvid = ValueIdDist()(sthis->rand_); + const dht::Value::Id callvid = ValueIdDist()(sthis->rand); const auto callkey = dht::InfoHash::get("callto:" + dev.toString()); dht::Value val { dht::IceCandidates(callvid, ice->packIceMsg()) }; @@ -2203,6 +2205,8 @@ RingAccount::doRegister_() return true; } ); + + dhtPeerConnector_->onDhtConnected(ringDeviceId_); } catch (const std::exception& e) { RING_ERR("Error registering DHT account: %s", e.what()); @@ -2210,7 +2214,6 @@ RingAccount::doRegister_() } } - void RingAccount::onTrustRequest(const dht::InfoHash& peer_account, const dht::InfoHash& peer_device, time_t received, bool confirm, std::vector<uint8_t>&& payload) { @@ -2589,6 +2592,17 @@ RingAccount::saveTreatedMessages() const saveIdList(cachePath_+DIR_SEPARATOR_STR "treatedMessages", treatedMessages_); } +bool +RingAccount::isMessageTreated(unsigned int id) +{ + auto res = treatedMessages_.insert(id); + if (res.second) { + saveTreatedMessages(); + return false; + } + return true; +} + void RingAccount::loadKnownDevices() { @@ -3310,4 +3324,24 @@ RingAccount::registerDhtAddress(IceTransport& ice) } } +std::vector<std::string> +RingAccount::publicAddresses() +{ + std::vector<std::string> addresses; + for (auto& addr : dht_.getPublicAddress(AF_INET)) { + addresses.emplace_back(addr.toString()); + } + for (auto& addr : dht_.getPublicAddress(AF_INET6)) { + addresses.emplace_back(addr.toString()); + } + return addresses; +} + +void +RingAccount::requestPeerConnection(const std::string& peer_id, + std::function<void(PeerConnection*)> connect_cb) +{ + dhtPeerConnector_->requestConnection(peer_id, connect_cb); +} + } // namespace ring diff --git a/src/ringdht/ringaccount.h b/src/ringdht/ringaccount.h index 0bf67d84a8..951a9be3f8 100644 --- a/src/ringdht/ringaccount.h +++ b/src/ringdht/ringaccount.h @@ -3,6 +3,7 @@ * * Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com> * Author: Simon Désaulniers <simon.desaulniers@gmail.com> + * Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -72,8 +73,13 @@ namespace ring { class IceTransport; struct Contact; struct AccountArchive; +class DhtPeerConnector; +class PeerConnection; class RingAccount : public SIPAccountBase { + private: + struct PeerConnectionMsg; + public: constexpr static const char* const ACCOUNT_TYPE = "RING"; constexpr static const in_port_t DHT_DEFAULT_PORT = 4222; @@ -310,8 +316,27 @@ class RingAccount : public SIPAccountBase { void registerName(const std::string& password, const std::string& name); #endif + /// + /// Send a E2E connection request to a given peer + /// + /// /// \param[in] peer_id RingID on request's recipiant + /// + void requestPeerConnection(const std::string& peer, + std::function<void(PeerConnection*)> connect_cb); + + std::vector<std::string> publicAddresses(); + + + /// \return true if the given DHT message identifier has been treated + /// \note if message has not been treated yet this method store this id and returns true at further calls + bool isMessageTreated(unsigned int id) ; + dht::DhtRunner& dht() { return dht_; } + const dht::crypto::Identity& identity() const { return identity_; } + + const std::shared_future<tls::DhParams> dhParams() const { return dhParams_; } + void forEachDevice(const dht::InfoHash& to, std::function<void(const std::shared_ptr<RingAccount>&, const dht::InfoHash&)> op, @@ -591,6 +616,14 @@ class RingAccount : public SIPAccountBase { std::shared_ptr<IceTransport> createIceTransport(const Args&... args); void registerDhtAddress(IceTransport&); + + std::unique_ptr<DhtPeerConnector> dhtPeerConnector_; }; +static inline std::ostream& operator<< (std::ostream& os, const RingAccount& acc) +{ + os << "[Account " << acc.getAccountID() << "] "; + return os; +} + } // namespace ring diff --git a/src/sip/sipaccount.cpp b/src/sip/sipaccount.cpp index fe8b3df783..0ff21b5c6c 100644 --- a/src/sip/sipaccount.cpp +++ b/src/sip/sipaccount.cpp @@ -893,7 +893,7 @@ void SIPAccount::startKeepAliveTimer() pj_time_val keepAliveDelay_; keepAliveTimer_.cb = &SIPAccount::keepAliveRegistrationCb; keepAliveTimer_.user_data = this; - keepAliveTimer_.id = timerIdDist_(rand_); + keepAliveTimer_.id = timerIdDist_(rand); // expiration may be undetermined during the first registration request if (registrationExpire_ == 0) { @@ -2003,10 +2003,10 @@ SIPAccount::scheduleReregistration() /* Randomize interval by +/- 10 secs */ if (delay.sec >= 10) { - delay.msec = delay10ZeroDist_(rand_); + delay.msec = delay10ZeroDist_(rand); } else { delay.sec = 0; - delay.msec = delay10PosDist_(rand_); + delay.msec = delay10PosDist_(rand); } pj_time_val_normalize(&delay); diff --git a/src/sip/sipaccountbase.cpp b/src/sip/sipaccountbase.cpp index 823558b14a..056df4802f 100644 --- a/src/sip/sipaccountbase.cpp +++ b/src/sip/sipaccountbase.cpp @@ -317,7 +317,7 @@ SIPAccountBase::getRandomEvenPort(const std::pair<uint16_t, uint16_t>& range) co std::uniform_int_distribution<uint16_t> dist(range.first/2, range.second/2); uint16_t result; do { - result = 2 * dist(rand_); + result = 2 * dist(rand); } while (getPortsReservation()[result / 2]); return result; } @@ -329,7 +329,7 @@ SIPAccountBase::acquireRandomEvenPort(const std::pair<uint16_t, uint16_t>& range uint16_t result; do { - result = 2 * dist(rand_); + result = 2 * dist(rand); } while (getPortsReservation()[result / 2]); getPortsReservation()[result / 2] = true; diff --git a/tools/dringctrl/controler.py b/tools/dringctrl/controler.py index c8b60c21f8..f3055d0c9a 100644 --- a/tools/dringctrl/controler.py +++ b/tools/dringctrl/controler.py @@ -78,9 +78,8 @@ class DRingCtrl(Thread): self.register() def __del__(self): - if self.registered: - self.unregister() - self.loop.quit() + self.unregister() + #self.loop.quit() # causes exception def stopThread(self): self.isStop = True @@ -136,6 +135,7 @@ class DRingCtrl(Thread): proxy_callmgr.connect_to_signal('callStateChanged', self.onCallStateChanged) proxy_callmgr.connect_to_signal('conferenceCreated', self.onConferenceCreated) proxy_confmgr.connect_to_signal('accountsChanged', self.onAccountsChanged) + proxy_confmgr.connect_to_signal('dataTransferEvent', self.onDataTransferEvent) except dbus.DBusException as e: raise DRingCtrlDBusError("Unable to connect to dring DBus signals") @@ -273,6 +273,9 @@ class DRingCtrl(Thread): self.currentConfId = confId self.onConferenceCreated_cb() + def onDataTransferEvent(self, transferId, code): + pass + # # Account management # @@ -633,6 +636,9 @@ class DRingCtrl(Thread): print(" %s: %s" % (k, details[k])) print() + def sendFile(self, *args, **kwds): + return self.configurationmanager.sendFile(*args, **kwds) + def run(self): """Processing method for this thread""" diff --git a/tools/dringctrl/sendfile.py b/tools/dringctrl/sendfile.py new file mode 100644 index 0000000000..653841cb18 --- /dev/null +++ b/tools/dringctrl/sendfile.py @@ -0,0 +1,59 @@ +#! /usr/bin/env python3 +# +# Copyright (C) 2017 Savoir-faire Linux Inc. +# Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +from controler import DRingCtrl + +import argparse +import sys +import signal +import os.path + +class Controler(DRingCtrl): + def onDataTransferEvent(self, transferId, code): + if code == 6: + print("transfer %u has been cancelled by host" % transferId) + self.stopThread() + else: + print("TID %x, code %u" % (transferId, code)) + +parser = argparse.ArgumentParser() +parser.add_argument('--account', help='Account to use', metavar='<account>', type=str) +parser.add_argument('--peer', help='Peer identification related to used account', metavar='<peer>', type=str, required=True) +parser.add_argument('--filename', help='Pathname on file to send', metavar='<filename>', type=str, required=True) +parser.add_argument('--displayname', help='Name displayed to peer', metavar='<displayname>', type=str) + +args = parser.parse_args() +ctrl = Controler(sys.argv[0], False) + +if not args.account: + for account in ctrl.getAllEnabledAccounts(): + details = ctrl.getAccountDetails(account) + if details['Account.type'] == 'RING': + args.account = account + break + if not args.account: + raise ValueError("no valid account") +if not args.displayname: + args.displayname = os.path.basename(args.filename) + +tid = ctrl.sendFile(args.account, args.peer, os.path.abspath(args.filename), args.displayname) + +signal.signal(signal.SIGINT, ctrl.interruptHandler) + +ctrl.run() -- GitLab