diff --git a/.gitignore b/.gitignore index c505fa036287939ae2c27e433c0c82108b57a574..0dd0e5823c7fab80811113bfd2492c6a5b9f684e 100644 --- a/.gitignore +++ b/.gitignore @@ -75,6 +75,10 @@ tags TAGS.LST ID +# Gcov coverage files +*.gcno +*.gcov + # IDE stuffs nbproject *.kdev* diff --git a/bin/dbus/Makefile.am b/bin/dbus/Makefile.am index fda17a225baafb22a418c816119e322d0b71c669..915b5ee771c470618dcb5d87372e8c5c7bac511a 100644 --- a/bin/dbus/Makefile.am +++ b/bin/dbus/Makefile.am @@ -5,7 +5,8 @@ noinst_LTLIBRARIES = libclient_dbus.la BUILT_SOURCES= \ dbuscallmanager.adaptor.h \ dbusconfigurationmanager.adaptor.h \ - dbusinstance.adaptor.h + dbusinstance.adaptor.h \ + dbusdatatransfer.adaptor.h BUILT_SOURCES+=dbuspresencemanager.adaptor.h dbuspresencemanager.adaptor.h: cx.ring.Ring.PresenceManager.xml Makefile.am @@ -17,18 +18,19 @@ dbusvideomanager.adaptor.h: cx.ring.Ring.VideoManager.xml Makefile.am dbusxx-xml2cpp $< --adaptor=$@ endif -# Rule to generate the binding headers +# Rules to generate binding headers dbuscallmanager.adaptor.h: cx.ring.Ring.CallManager.xml Makefile.am dbusxx-xml2cpp $< --adaptor=$@ -# Rule to generate the binding headers dbusconfigurationmanager.adaptor.h: cx.ring.Ring.ConfigurationManager.xml Makefile.am dbusxx-xml2cpp $< --adaptor=$@ -# Rule to generate the binding headers dbusinstance.adaptor.h: cx.ring.Ring.Instance.xml Makefile.am dbusxx-xml2cpp $< --adaptor=$@ +dbusdatatransfer.adaptor.h: cx.ring.Ring.DataTransfer.xml Makefile.am + dbusxx-xml2cpp $< --adaptor=$@ + libclient_dbus_la_SOURCES = \ dbuscallmanager.cpp \ dbuscallmanager.h \ @@ -41,6 +43,8 @@ libclient_dbus_la_SOURCES = \ dbus_cpp.h \ dbuspresencemanager.cpp \ dbuspresencemanager.h \ + dbusdatatransfer.cpp \ + dbusdatatransfer.h \ $(BUILT_SOURCES) if RING_VIDEO @@ -64,7 +68,8 @@ interfacedir = $(datadir)/dbus-1/interfaces interface_DATA = cx.ring.Ring.CallManager.xml \ cx.ring.Ring.ConfigurationManager.xml \ cx.ring.Ring.Instance.xml \ - cx.ring.Ring.PresenceManager.xml + cx.ring.Ring.PresenceManager.xml \ + cx.ring.Ring.DataTransfer.xml if RING_VIDEO interface_DATA += cx.ring.Ring.VideoManager.xml diff --git a/bin/dbus/cx.ring.Ring.DataTransfer.xml b/bin/dbus/cx.ring.Ring.DataTransfer.xml new file mode 100644 index 0000000000000000000000000000000000000000..2369db11b784a3263a100c9b1406537d48888d29 --- /dev/null +++ b/bin/dbus/cx.ring.Ring.DataTransfer.xml @@ -0,0 +1,176 @@ +<?xml version="1.0" ?> +<node name="/datatransfer-introspec" xmlns:tp="http://telepathy.freedesktop.org/wiki/DbusSpec#extensions-v0"> + <interface name="cx.ring.Ring.DataTransfer"> + + <tp:docstring xmlns="http://www.w3.org/1999/xhtml"> + Used to handle the data transfer API. + </tp:docstring> + + <method name="sendFile" tp:name-for-bindings="sendFile"> + <tp:added version="2.3.0"/> + <arg type="s" name="accountId" direction="in"> + <tp:docstring> + An account ID. + </tp:docstring> + </arg> + <arg type="s" name="peerUri" direction="in"> + <tp:docstring> + Destination URI. + </tp:docstring> + </arg> + <arg type="s" name="pathname" direction="in"> + <tp:docstring> + Fully qualified pathname on file to send. + </tp:docstring> + </arg> + <arg type="s" name="name" direction="in"> + <tp:docstring> + A name given to the peer. + </tp:docstring> + </arg> + <arg type="t" name="dataTransferId" direction="out"> + <tp:docstring> + Returns a DataTransferId. + </tp:docstring> + </arg> + </method> + + <signal name="dataConnectionStatus" tp:name-for-bindings="dataConnectionStatus"> + <tp:added version="2.3.0"/> + <tp:docstring> + Notify clients that a data connection has changed + </tp:docstring> + <arg type="t" name="connectionID"> + <tp:docstring> + A data connection id. + </tp:docstring> + </arg> + <arg type="i" name="code"> + <tp:docstring> + A DRing::DataTransferCode value. + </tp:docstring> + </arg> + </signal> + + <signal name="dataTransferStatus" tp:name-for-bindings="dataTransferStatus"> + <tp:added version="2.3.0"/> + <tp:docstring> + Notify clients that a data connection has changed + </tp:docstring> + <arg type="t" name="transferID"> + <tp:docstring> + A data transfer id. + </tp:docstring> + </arg> + <arg type="i" name="code"> + <tp:docstring> + A DRing::DataTransferCode value. + </tp:docstring> + </arg> + </signal> + + <method name="connectToPeer" tp:name-for-bindings="connectToPeer"> + <tp:added version="2.3.0"/> + <arg type="s" name="accountId" direction="in"> + <tp:docstring> + An account ID. + </tp:docstring> + </arg> + <arg type="s" name="peerUri" direction="in"> + <tp:docstring> + Destination URI. + </tp:docstring> + </arg> + <arg type="t" name="dataConnectionId" direction="out"> + <tp:docstring> + Returns a DataConnectionId. + </tp:docstring> + </arg> + </method> + + <method name="getDataConnectionInfo" tp:name-for-bindings="getDataConnectionInfo"> + <tp:added version="2.3.0"/> + <arg type="t" name="dataConnectionId" direction="in"> + <tp:docstring> + A DataConnectionId. + </tp:docstring> + </arg> + <annotation name="org.qtproject.QtDBus.QtTypeName.Out0" value="MapStringString"/> + <arg type="a{ss}" name="dataConnectionInfo" direction="out" tp:type="String_String_Map"> + <tp:docstring> + The available keys / parameters are: + <ul> + <li>Account: account used for transfer</li> + <li>Peer: remote identification</li> + <li>Code: latest status code (set by DataConnectionStatus signal change)</li> + </ul> + </tp:docstring> + </arg> + </method> + + <method name="closeDataConnection" tp:name-for-bindings="closeDataConnection"> + <tp:added version="2.3.0"/> + <arg type="t" name="dataConnectionId" direction="in"> + <tp:docstring> + A DataConnectionId. + </tp:docstring> + </arg> + <arg type="b" name="result" direction="out"/> + </method> + + <method name="cancelDataTransfer" tp:name-for-bindings="cancelDataTransfer"> + <tp:added version="2.3.0"/> + <arg type="t" name="dataTransferId" direction="in"> + <tp:docstring> + A DataTransferId. + </tp:docstring> + </arg> + <arg type="b" name="result" direction="out"/> + </method> + + <method name="getDataTransferSentBytes" tp:name-for-bindings="getDataTransferSentBytes"> + <tp:added version="2.3.0"/> + <arg type="t" name="dataTransferId" direction="in"> + <tp:docstring> + A DataTransferId. + </tp:docstring> + </arg> + <arg type="x" name="size" direction="out"/> + </method> + + <method name="getDataTransferInfo" tp:name-for-bindings="getDataTransferInfo"> + <tp:added version="2.3.0"/> + <arg type="t" name="dataTransferId" direction="in"> + <tp:docstring> + A DataTransferId. + </tp:docstring> + </arg> + <annotation name="org.qtproject.QtDBus.QtTypeName.Out0" value="MapStringString"/> + <arg type="a{ss}" name="dataTransferInfo" direction="out" tp:type="String_String_Map"> + <tp:docstring> + The available keys / parameters are: + <ul> + <li>ConnectionID: used connection id</li> + <li>Name: public name sent to peer, (e.g.: a filename for file transfer)</li> + <li>Size: total size to transfer, -1 if unknown</li> + <li>Code: latest status code (set by DataTransferStatus signal change)</li> + </ul> + </tp:docstring> + </arg> + </method> + + <method name="acceptFileTransfer" tp:name-for-bindings="acceptFileTransfer"> + <tp:added version="2.3.0"/> + <arg type="t" name="dataTransferId" direction="in"> + <tp:docstring> + A DataTransferId. + </tp:docstring> + </arg> + <arg type="s" name="pathname" direction="in"> + <tp:docstring> + File pathname (where to save the file). + </tp:docstring> + </arg> + </method> + </interface> +</node> diff --git a/bin/dbus/dbusclient.cpp b/bin/dbus/dbusclient.cpp index f049f455dbbf7a5010a5bc4d934b7250e189e68f..e1bfa261d076cb77e2723c6c6c7dbda8f2ae3d65 100644 --- a/bin/dbus/dbusclient.cpp +++ b/bin/dbus/dbusclient.cpp @@ -42,6 +42,9 @@ #include "dbuspresencemanager.h" #include "presencemanager_interface.h" +#include "dbusdatatransfer.h" +#include "datatransfer_interface.h" + #ifdef RING_VIDEO #include "dbusvideomanager.h" #include "videomanager_interface.h" @@ -88,6 +91,8 @@ DBusClient::DBusClient(int flags, bool persistent) onNoMoreClientFunc = [this] {this->exit();}; instanceManager_.reset(new DBusInstance {sessionConnection, onNoMoreClientFunc}); + dataXfer_.reset(new DBusDataTransfer {sessionConnection}); + #ifdef RING_VIDEO videoManager_.reset(new DBusVideoManager {sessionConnection}); @@ -114,6 +119,7 @@ DBusClient::~DBusClient() presenceManager_.reset(); configurationManager_.reset(); callManager_.reset(); + dataXfer_.reset(); timeout_.reset(); } @@ -128,12 +134,14 @@ DBusClient::initLibrary(int flags) using DRing::ConfigurationSignal; using DRing::PresenceSignal; using DRing::AudioSignal; + using DRing::DataTransferSignal; using SharedCallback = std::shared_ptr<DRing::CallbackWrapperBase>; auto callM = callManager_.get(); auto confM = configurationManager_.get(); auto presM = presenceManager_.get(); + auto dxferM = dataXfer_.get(); #ifdef RING_VIDEO using DRing::VideoSignal; @@ -198,6 +206,12 @@ DBusClient::initLibrary(int flags) exportable_callback<AudioSignal::DeviceEvent>(bind(&DBusConfigurationManager::audioDeviceEvent, confM)), }; + // Data transfer handlers + const std::map<std::string, SharedCallback> dataXferEvHandlers = { + exportable_callback<DataTransferSignal::DataConnectionStatus>(bind(&DBusDataTransfer::dataConnectionStatus, dxferM, _1, _2)), + exportable_callback<DataTransferSignal::DataTransferStatus>(bind(&DBusDataTransfer::dataTransferStatus, dxferM, _1, _2)), + }; + #ifdef RING_VIDEO // Video event handlers const std::map<std::string, SharedCallback> videoEvHandlers = { @@ -214,6 +228,7 @@ DBusClient::initLibrary(int flags) registerConfHandlers(configEvHandlers); registerPresHandlers(presEvHandlers); registerPresHandlers(audioEvHandlers); + registerDataXferHandlers(dataXferEvHandlers); #ifdef RING_VIDEO registerVideoHandlers(videoEvHandlers); #endif diff --git a/bin/dbus/dbusclient.h b/bin/dbus/dbusclient.h index da2bd602a3abdd54e6f28d5ee362fb2a3adefe78..f8608b87217df046054da0aa5746f220ae41b183 100644 --- a/bin/dbus/dbusclient.h +++ b/bin/dbus/dbusclient.h @@ -33,6 +33,7 @@ class DBusCallManager; class DBusNetworkManager; class DBusInstance; class DBusPresenceManager; +class DBusDataTransfer; #ifdef RING_VIDEO class DBusVideoManager; @@ -62,6 +63,7 @@ class DBusClient { std::unique_ptr<DBusConfigurationManager> configurationManager_; std::unique_ptr<DBusPresenceManager> presenceManager_; std::unique_ptr<DBusInstance> instanceManager_; + std::unique_ptr<DBusDataTransfer> dataXfer_; #ifdef RING_VIDEO std::unique_ptr<DBusVideoManager> videoManager_; diff --git a/bin/dbus/dbusdatatransfer.cpp b/bin/dbus/dbusdatatransfer.cpp new file mode 100644 index 0000000000000000000000000000000000000000..6666f6c3235dd49369558d5877a9ef108baa155b --- /dev/null +++ b/bin/dbus/dbusdatatransfer.cpp @@ -0,0 +1,103 @@ +/* + * Copyright (C) 2016 Savoir-faire Linux Inc. + * + * Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "dbusdatatransfer.h" +#include "datatransfer_interface.h" +#include "string_utils.h" + +DBusDataTransfer::DBusDataTransfer(DBus::Connection& connection) + : DBus::ObjectAdaptor(connection, "/cx/ring/Ring/DataTransfer") +{} + +auto +DBusDataTransfer::connectToPeer(const std::string& accountId, const std::string& peerUri) -> decltype(DRing::connectToPeer(accountId, peerUri)) +{ + return DRing::connectToPeer(accountId, peerUri); +} + +std::map<std::string, std::string> +DBusDataTransfer::getDataConnectionInfo(const uint64_t& dataConnectionId) +{ + DRing::DataConnectionInfo info; + if (DRing::dataConnectionInfo(dataConnectionId, info)) { + // serialize the struct + return { + {DRing::DataTransfer::ACCOUNT, info.account}, + {DRing::DataTransfer::PEER, info.peer}, + {DRing::DataTransfer::CODE, ring::to_string(info.code)}, + }; + } else { + return {}; + } +} + +auto +DBusDataTransfer::closeDataConnection(const uint64_t& dataConnectionId) -> decltype(DRing::closeDataConnection(dataConnectionId)) +{ + return DRing::closeDataConnection(dataConnectionId); +} + +auto +DBusDataTransfer::cancelDataTransfer(const uint64_t& dataTransferId) -> decltype(DRing::cancelDataTransfer(dataTransferId)) +{ + return DRing::cancelDataTransfer(dataTransferId); +} + +int64_t +DBusDataTransfer::getDataTransferSentBytes(const uint64_t& dataTransferId) +{ + return DRing::dataTransferSentBytes(dataTransferId); +} + +std::map<std::string, std::string> +DBusDataTransfer::getDataTransferInfo(const uint64_t& dataTransferId) +{ + DRing::DataTransferInfo info; + if (DRing::dataTransferInfo(dataTransferId, info)) { + // serialize the struct + return { + {DRing::DataTransfer::CONNECTION_ID, ring::to_string(info.connectionId)}, + {DRing::DataTransfer::NAME, info.name}, + {DRing::DataTransfer::SIZE, ring::to_string(info.size)}, + {DRing::DataTransfer::CODE, ring::to_string(info.code)}, + }; + } else { + return {}; + } +} + +auto +DBusDataTransfer::sendFile(const std::string& accountID, + const std::string& peerUri, + const std::string& pathname, + const std::string& name) -> decltype(DRing::sendFile(accountID, peerUri, pathname, name)) +{ + return DRing::sendFile(accountID, peerUri, pathname, name); +} + +void +DBusDataTransfer::acceptFileTransfer(const uint64_t& dataTransferId, const std::string& pathname) +{ + DRing::acceptFileTransfer(dataTransferId, pathname); +} diff --git a/bin/dbus/dbusdatatransfer.h b/bin/dbus/dbusdatatransfer.h new file mode 100644 index 0000000000000000000000000000000000000000..6dbbd8f4ab93e365d8d1e75737e55b6dc2b10600 --- /dev/null +++ b/bin/dbus/dbusdatatransfer.h @@ -0,0 +1,69 @@ +/* + * Copyright (C) 2016 Savoir-faire Linux Inc. + * + * Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef __RING_DBUSDATATRANSFER_H__ +#define __RING_DBUSDATATRANSFER_H__ + +#include <string> +#include <map> + +#include "dbus_cpp.h" + +#if __GNUC__ >= 5 || (__GNUC__ >=4 && __GNUC_MINOR__ >= 6) +/* This warning option only exists for gcc 4.6.0 and greater. */ +#pragma GCC diagnostic ignored "-Wunused-but-set-variable" +#endif + +#pragma GCC diagnostic ignored "-Wignored-qualifiers" +#pragma GCC diagnostic ignored "-Wunused-parameter" +#include "dbusdatatransfer.adaptor.h" +#pragma GCC diagnostic warning "-Wignored-qualifiers" +#pragma GCC diagnostic warning "-Wunused-parameter" + +#if __GNUC__ >= 5 || (__GNUC__ >=4 && __GNUC_MINOR__ >= 6) +/* This warning option only exists for gcc 4.6.0 and greater. */ +#pragma GCC diagnostic warning "-Wunused-but-set-variable" +#endif + +class DBusDataTransfer : + public cx::ring::Ring::DataTransfer_adaptor, + public DBus::IntrospectableAdaptor, + public DBus::ObjectAdaptor +{ +public: + DBusDataTransfer(DBus::Connection& connection); + + // Methods + // These wrap the datatransfer_interface.h API in order to (de)serialize the data structures + uint64_t connectToPeer(const std::string& accountId, const std::string& peerUri); + std::map<std::string, std::string> getDataConnectionInfo(const uint64_t& dataConnectionId); + bool closeDataConnection(const uint64_t& dataConnectionId); + + bool cancelDataTransfer(const uint64_t& dataTransferId); + int64_t getDataTransferSentBytes(const uint64_t& dataTransferId); + std::map<std::string, std::string> getDataTransferInfo(const uint64_t& dataTransferId); + + uint64_t sendFile(const std::string& accountID, const std::string& peerUri, + const std::string& pathname, const std::string& name); + void acceptFileTransfer(const uint64_t& dataTransferId, const std::string& pathname); + +}; + +#endif // __RING_DBUSDATATRANSFER_H__ diff --git a/src/Makefile.am b/src/Makefile.am index c26886ee551377344d812ffa29768a204c958493..4bd27107e31c5136638403fc3d15ddbf52f20aa4 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -100,6 +100,8 @@ libring_la_SOURCES = \ ice_transport.h \ plugin_manager.cpp \ plugin_loader_dl.cpp \ + data_transfer.cpp \ + data_transfer.h \ ring_plugin.h \ plugin_loader.h \ plugin_manager.h \ @@ -139,6 +141,7 @@ nobase_include_HEADERS= dring/dring.h \ dring/callmanager_interface.h \ dring/configurationmanager_interface.h \ dring/presencemanager_interface.h \ + dring/datatransfer_interface.h \ dring/account_const.h \ dring/call_const.h \ dring/media_const.h diff --git a/src/account.h b/src/account.h index 9453511be157438455e9b6794f5fcd622bef2ce5..da2730f7da796c0c758522a06f053fb6b708707c 100644 --- a/src/account.h +++ b/src/account.h @@ -34,6 +34,7 @@ #include "ip_utils.h" #include "media_codec.h" #include "logger.h" +#include "datatransfer_interface.h" #include "compiler_intrinsics.h" // UNUSED #include <functional> @@ -87,6 +88,8 @@ class Account : public Serializable, public std::enable_shared_from_this<Account */ virtual ~Account(); + std::weak_ptr<Account> weak_from_this() { return shared_from_this(); } + /** * Free all ressources related to this account. * ***Current calls using this account are HANG-UP*** @@ -301,6 +304,12 @@ class Account : public Serializable, public std::enable_shared_from_this<Account */ virtual void connectivityChanged() {}; + virtual DRing::DataTransferId sendFile(const std::string& /*peer_uri*/, + const std::string& /*pathname*/, + const std::string& /*name*/) { + return {}; // feature not supported by default + } + private: NON_COPYABLE(Account); diff --git a/src/account_factory.cpp b/src/account_factory.cpp index 51233721f9120183d421930a7fcab590a6de321e..738c006fbadc0e06197a5f88b2edfa50db9714ef 100644 --- a/src/account_factory.cpp +++ b/src/account_factory.cpp @@ -115,6 +115,19 @@ template <> void AccountFactory::clear() { std::lock_guard<std::recursive_mutex> lk(mutex_); + for (auto& map_item : accountMaps_) { + for (auto& item : map_item.second) { + auto id {item.second->getAccountID()}; + auto name {item.second->getAccountType()}; + RING_DBG("Account[%s]: %ld", id.c_str(), item.second.use_count()); + if (!item.second.unique()) { + std::ostringstream msg; + msg << "couldn't clear account " << name << "-" << id + << ", ownership remains"; + throw std::runtime_error(msg.str()); + } + } + } accountMaps_.clear(); } diff --git a/src/client/Makefile.am b/src/client/Makefile.am index cb26ade38f45011af62f5f278d1c7aa052e2bbf4..48906baa367044c241449b13e8f5cc21b037a145 100644 --- a/src/client/Makefile.am +++ b/src/client/Makefile.am @@ -16,6 +16,7 @@ libclient_la_SOURCES = \ ring_signal.cpp \ callmanager.cpp \ configurationmanager.cpp \ + datatransfer.cpp \ $(PRESENCE_SRC) \ $(VIDEO_SRC) diff --git a/src/client/datatransfer.cpp b/src/client/datatransfer.cpp new file mode 100644 index 0000000000000000000000000000000000000000..acf41d8641fe8ee20fe61afff91b74d6a6b17443 --- /dev/null +++ b/src/client/datatransfer.cpp @@ -0,0 +1,103 @@ +/* + * Copyright (C) 2016 Savoir-faire Linux Inc. + * + * Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "datatransfer_interface.h" +#include "client/ring_signal.h" +#include "manager.h" +#include "data_transfer.h" + +#include "logger.h" + +namespace DRing { + +void +registerDataXferHandlers(const std::map<std::string, + std::shared_ptr<CallbackWrapperBase>>& handlers) +{ + auto& handlers_ = ring::getSignalHandlers(); + for (auto& item : handlers) { + auto iter = handlers_.find(item.first); + if (iter == handlers_.end()) { + RING_ERR("Signal %s not supported", item.first.c_str()); + continue; + } + + iter->second = std::move(item.second); + } +} + +DataConnectionId +connectToPeer(const std::string&, const std::string&) +{ + return {}; +} + +bool +dataConnectionInfo(const DataConnectionId&, const DataConnectionInfo&) +{ + return false; +} + +bool +closeDataConnection(const DataConnectionId&) +{ + return false; +} + +DataTransferId +sendFile(const std::string& accountID, + const std::string& peerUri, + const std::string& pathname, + const std::string& name) +{ + return ring::Manager::instance().sendFile(accountID, peerUri, pathname, name); +} + +bool +dataTransferInfo(const DataTransferId&, DataTransferInfo&) +{ + //if (auto dt = ring::DataTransfer::getDataTransfer(tid)) { + // dt->getInfo(info); + // return true; + //} + return false; +} + +std::streamsize +dataTransferSentBytes(const DataTransferId&) +{ + //if (auto dt = ring::DataTransfer::getDataTransfer(tid)) + // return dt->getCount(); + return 0; +} + +bool +cancelDataTransfer(const DataTransferId&) +{ + return false; +} + +void +acceptFileTransfer(const DataTransferId&, const std::string&) +{ + //ring::acceptFileTransfer(id, pathname); +} + +} // namespace DRing diff --git a/src/client/ring_signal.cpp b/src/client/ring_signal.cpp index d8f91e9dce638abdc473b42e63301b88a98347e4..32b84e15e784d01c3e5ebbbcddcf10465383433e 100644 --- a/src/client/ring_signal.cpp +++ b/src/client/ring_signal.cpp @@ -80,6 +80,10 @@ getSignalHandlers() exported_callback<DRing::PresenceSignal::NewBuddyNotification>(), exported_callback<DRing::PresenceSignal::SubscriptionStateChanged>(), + /* Data transfer */ + exported_callback<DRing::DataTransferSignal::DataConnectionStatus>(), + exported_callback<DRing::DataTransferSignal::DataTransferStatus>(), + /* Audio */ exported_callback<DRing::AudioSignal::DeviceEvent>(), diff --git a/src/client/ring_signal.h b/src/client/ring_signal.h index 152f823b3ec0ed7fb1dc547d1bffbf7fcd98d9ab..7ed823c780d9e5554d003c5dd9d180c15c1317fb 100644 --- a/src/client/ring_signal.h +++ b/src/client/ring_signal.h @@ -27,6 +27,7 @@ #include "callmanager_interface.h" #include "configurationmanager_interface.h" #include "presencemanager_interface.h" +#include "datatransfer_interface.h" #ifdef RING_VIDEO #include "videomanager_interface.h" diff --git a/src/data_transfer.cpp b/src/data_transfer.cpp new file mode 100644 index 0000000000000000000000000000000000000000..8992a38cdeb8f50b77c6c0287580fe4627a9ca3c --- /dev/null +++ b/src/data_transfer.cpp @@ -0,0 +1,2175 @@ +/* + * Copyright (C) 2016 Savoir-faire Linux Inc. + * + * Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "data_transfer.h" +#include "manager.h" +#include "client/ring_signal.h" +#include "string_utils.h" +#include "security/tls_session.h" +#include "dring/datatransfer_interface.h" + +#include <random> +#include <algorithm> +#include <exception> +#include <queue> +#include <deque> +#include <bitset> +#include <cmath> +#include <cerrno> +#include <iomanip> + +namespace ring { namespace ReliableSocket { + +//================================================================================================== +// Constants + +static constexpr unsigned MAX_FRAME_HEADER_BYTES = 1+1+4+8+2; // Maximal size in bytes of a frame header +static constexpr unsigned MINIMAL_INIT_PAYLOAD_SIZE = 3 * 4; // Minimal number of data inside an INIT flagged packet +static constexpr unsigned TX_BUFFER_SIZE = 10; // Maximal number of packet waiting for transmission +static constexpr unsigned MAX_ACK_CT = 15; // Channel packet format uses 4 bits to encode AckCt +static constexpr unsigned MIN_UNACKED_PACKET = 2; // Minimal number of delayed ACK +static constexpr unsigned MAX_UNACKED_PACKET = 4; // Maximal number of ACK packet before sending an ACK +static constexpr auto ACK_DELAY = std::chrono::milliseconds(10); // ACK after this delay if unacked ACK exist +static constexpr auto HEARTBEAT_DELAY = std::chrono::seconds(60); // Heatbeat packets pacing +static constexpr auto MAX_RTT = std::chrono::seconds(10); // Limit RTT computation to this value +static constexpr auto MAX_TX_DELAY = std::chrono::milliseconds(100); // Maximum transmission pacing +static constexpr auto STATISTIC_DELAY = std::chrono::seconds(2); // Statistics logging pacing +static constexpr auto BASE_RTT = std::chrono::milliseconds(500); // RTT computation start + +//================================================================================================== +// Various helpers + +template<typename... Args> +inline void +emitDataXferStatus(Args... args) +{ + emitSignal<DRing::DataTransferSignal::DataTransferStatus>(args...); +} + +template <class... Args> +static auto cast_to_s(const Args&... args) + -> decltype(std::chrono::duration_cast<std::chrono::seconds>(args...)) +{ + return std::chrono::duration_cast<std::chrono::seconds>(args...); +} + +template <class... Args> +static auto cast_to_ms(const Args&... args) + -> decltype(std::chrono::duration_cast<std::chrono::milliseconds>(args...)) +{ + return std::chrono::duration_cast<std::chrono::milliseconds>(args...); +} + +template <class... Args> +static auto cast_to_us(const Args&... args) + -> decltype(std::chrono::duration_cast<std::chrono::microseconds>(args...)) +{ + return std::chrono::duration_cast<std::chrono::microseconds>(args...); +} + +template <class T> +inline void bswap32(T* mem, std::size_t len) +{ + auto mem32 = reinterpret_cast<uint32_t*>(mem); + // Notes: benchmarks show using a lambda gives better performances than a direct call to htonl + std::transform(mem32, mem32 + len/4, mem32, [](uint32_t x){ return htonl(x); }); +} + +template <typename Queue, typename Clock> +struct TimedJob +{ + using TimePoint = typename Clock::time_point; + using Callable = std::function<void(TimedJob<Queue, Clock>& self)>; + TimedJob(Queue* owner, const TimedJob::TimePoint& when, const TimedJob::Callable& func) + : owner(owner), callTime(when), callback(func) {} + + TimedJob(Queue* owner, const TimedJob::TimePoint& when, TimedJob::Callable&& func) + : owner(owner), callTime(when), callback(std::move(func)) {} + + template <typename T> + void reschedule(const T& delay) { owner->addTimedJob(delay, std::move(*this)); } + void reschedule(const TimePoint& when) { owner->addTimedJob(when, std::move(*this)); } + Queue* owner {nullptr}; + TimePoint callTime {}; + Callable callback {}; +}; + +template <typename Queue, typename Clock> +struct SortTimedJob +{ + constexpr bool operator()(const TimedJob<Queue, Clock>& lhs, + const TimedJob<Queue, Clock>& rhs) const { + return lhs.callTime > rhs.callTime; + } +}; + +template <class T> +constexpr const T& +clamp(const T& low, const T& x, const T& high) +{ + return std::max(low, std::min(x, high)); +} + +template <class T> +unsigned long +usdelay(const T& start) +{ + return cast_to_us(T::clock::now() - start).count(); +} + +//================================================================================================== + +namespace Packet +{ +// Wire format: all words are given in network order (bigendian). +// +// The protocol has been developped using some concepts from following existing works: +// UDT: relations between concepts +// SST: monotonic TSR, ACK per packet +// QUIC: variable length fields, protocol version, multi-frame per packet +// +// This protocol doesn't add any cryptocraphic layer, it is designed to be overlayed +// on top of a secure transport like DTLS, dedicated to this purpose. +// The goal is to keep fields simple, easy to parse (related fields are 32-bits word based). +// +// Channel packet format: +// +// 0 1 2 3 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Flags | TSN = Transmittion Sequence Number | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | AckCt | ASN = Acknowledgment Sequence Number | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | | +// ~ Channel payload ~ +// | | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// +// Flags: +// bit 0: if set, this is an initialization packet (see initialization payload) +// bit 1: channel payload contains frames (after any others special payloads, like init packet) +// bit 2-3: reserved and must be set to 0 +// +// TSN: +// Transmittion Sequence Number +// Monotonicaly increased for each packet transmitted. +// This is the first 28-bits of the 64-bits peer TSN. +// +// AckCt: +// Number of contiguous acknowledged sequence number immediately before the following ASN. +// +// ASN: +// Acknowledgment Sequence Number +// Gives the highest sequence number received so far. +// +// +// +// Initialization packet payload format (client, flags 0 is set): +// +// 0 1 2 3 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Channel Id | +// | | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Protocol Version Tag | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// +// Channel Id: +// The unique identifiant of the communication channel +// +// Protocol Version Tag: +// An opaque value representing the client's protocol requested +// +// +// +// Initialization packet payload format (server, flags 0 is set): +// +// 0 1 2 3 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Channel Id | +// | | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Protocol Version Tag #1 | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Protocol Version Tag #2 | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | | +// ~ ... ~ +// | | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Protocol Version Tag #n | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// +// Channel Id: +// The unique identifiant of the communication channel +// +// Protocol Version Tag #n: +// An opaque value representing the server's protocol supported +// +// +// Heartbeat packets: +// +// 0 1 2 3 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// |0 0 0 0| TSN = Transmittion Sequence Number | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | AckCt | ASN = Acknowledgment Sequence Number | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// +// These packets use the common channel header without any payloads and all flags bits set to 0. +// This implies they can be sent ONLY if protocol version is negotiated. +// TSN, AckCt and ASN are processed normally. +// +// +// Generic frame format (channel flags 1 is set): +// +// 0 1 2 3 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Flags | type-dependent data follow ... | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// +// Flags: +// bit 0: set if its a STREAM frame type, see STREAM frame for remaining bits, not set for +// control frame +// +// +// Stream frame format: +// +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// |1|x d o o o s s| ExtraFlags | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | SID (8, 16, 24 or 32 bits) variable length ~ | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Offset (0, 16, 24, 32, 40, 48, 56 or 64 bits) variable length ~ | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | DataLength (0 or 16 bits) variable length ~ | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Data, variable length (not word aligned) ~ | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// +// Flags: +// bit x: set if ExtraFlags presents (8 bits) +// bits ooo: offset field size +// bit d: set if data lenght field present (16 bits) +// bits ss: SID (StreamId) field size +// +// ExtraFlags: +// An optional 8-bits extra flags, only present if flag x is set. +// These bits are defined as follow: +// +// 0 1 2 3 4 5 6 7 +// +-+-+-+-+-+-+-+-+ +// |F P M r r r r r| +// +-+-+-+-+-+-+-+-+ +// +// bit F: end of stream, set on last bytes for this stream +// bit P: set if data must be pushed to application as soon as possible +// bit M: this block ends a message +// bits r: reserved, must be set to 0 +// +// SID: +// Stream Identifier, 0 is reserved and not used in packets +// +// Offset: +// Byte offset for data inside this frame +// +// DataLength: +// if d bit is set, represent the number of data bytes +// +// + +enum ChannelFlags { + // We count bit for LSB to MSB (!= of upper schematics) + INIT = 3, + FRAME = 2, + RSV1 = 1, + RSV2 = 0, +}; + +enum FrameFlags { + FRAMEF_STREAM = 0x80, + FRAMEF_EXTRA = 0x40, + FRAMEF_DATA = 0x20, +}; + +enum FrameExtraFlags { + FRAMEXF_FIN = 0x80, + FRAMEXF_PUSH = 0x40, + FRAMEXF_EOM = 0x20, +}; + +template <typename U=uint8_t, typename T> +inline U* +getChannelPayload(std::vector<T>& vec) +{ + return reinterpret_cast<U*>(reinterpret_cast<uint8_t*>(vec.data()) + PACKET_HEADER_BYTES); +} + +template <typename U=uint8_t, typename T> +inline const U* +getChannelPayload(const std::vector<T>& vec) +{ + return reinterpret_cast<const U*>(reinterpret_cast<const uint8_t*>(vec.data()) + PACKET_HEADER_BYTES); +} + +template <typename U=uint8_t, typename T> +inline unsigned +getChannelPayloadSize(const std::vector<T>& vec) +{ + return (vec.size() * sizeof(T) - PACKET_HEADER_BYTES) / sizeof(U); +} + +template <typename T> +inline std::bitset<4> +getChannelFlags(const std::vector<T>& vec) +{ + return reinterpret_cast<const uint8_t*>(vec.data())[0] >> 4; +} + +template <typename T> +inline void +setChannelFlags(std::vector<T>& vec, std::bitset<4> flags) +{ + static const uint32_t mask = htonl(0xF0000000); + auto p = &reinterpret_cast<uint32_t*>(vec.data())[0]; + *p = (*p & ~mask) | htonl(flags.to_ulong() << 28); +} + +template <typename T> +inline void +setChannelFlag(std::vector<T>& vec, Packet::ChannelFlags bit) +{ + const uint32_t mask = htonl((1u << (28 + bit)) & 0xF0000000); + auto p = &reinterpret_cast<uint32_t*>(vec.data())[0]; + *p = (*p & ~mask) | mask; +} + +template <typename T> +inline uint32_t +getTsn(const std::vector<T>& vec) +{ + return htonl(reinterpret_cast<const uint32_t*>(vec.data())[0]) & 0x0FFFFFFF; +} + +template <typename T> +inline void +setTsn(std::vector<T>& vec, uint32_t tsn) +{ + static const uint32_t mask = htonl(0x0FFFFFFF); + auto p = &reinterpret_cast<uint32_t*>(vec.data())[0]; + *p = (*p & ~mask) | (htonl(tsn) & mask); +} + +template <typename T> +inline void +getAsnAckCt(const std::vector<T>& vec, uint32_t& asn, uint8_t& ackct) +{ + auto v = htonl(reinterpret_cast<const uint32_t*>(vec.data())[1]); + asn = v & 0x0FFFFFFF; + ackct = v >> 28; +} + +template <typename T> +inline void +setAsnAckCnt(std::vector<T>& vec, uint32_t asn, uint32_t ackcnt) +{ + reinterpret_cast<uint32_t*>(vec.data())[1] = htonl((asn & 0x0fffffff) + ((ackcnt & 0xf) << 28)); +} + +template <typename T> +inline uint8_t* +initChannel(std::vector<T>& vec, std::size_t payload_size=0, const void* payload=nullptr) +{ + vec.resize(PACKET_HEADER_BYTES + payload_size); + auto pl_begin = reinterpret_cast<uint8_t*>(vec.data()) + PACKET_HEADER_BYTES; + if (payload) + std::copy_n(reinterpret_cast<const uint8_t*>(payload), payload_size, pl_begin); + return pl_begin; +} + +template <typename T> +inline void +initProto(std::vector<T>& vec, uint64_t id, uint32_t proto) +{ + std::array<uint32_t, 3> data; + data[0] = htonl(id >> 32); + data[1] = htonl(id & 0xFFFFFFFF); + data[2] = htonl(proto); + Packet::initChannel(vec, data.size() * 4, data.data()); + Packet::setChannelFlag(vec, ChannelFlags::INIT); +} + +template <typename T> +inline void +initHeartbeat(std::vector<T>& vec) +{ + Packet::initChannel(vec); +} + +template <typename T> +inline uint8_t* +firstFramePtr(std::vector<T>& vec) +{ + return reinterpret_cast<uint8_t*>(vec.data()) + PACKET_HEADER_BYTES; +} + +// Unpack a frame segment by reading StreamId, Offset and DataLength fields. +// Return a pointer on first frame's payload byte, or first byte of next frame if no payload. +inline uint8_t* +unpackStreamFrame(uint8_t* frame, uint8_t& extra_flags, uint32_t& sid, uint64_t& offset, + uint16_t& data_length) +{ + const bool x = frame[0] & FRAMEF_EXTRA; + const bool d = frame[0] & FRAMEF_DATA; + const auto o = (frame[0] >> 2) & 0x7; + const auto s = frame[0] & 0x3; + + if (x) + extra_flags = *(++frame); + else + extra_flags = 0; + + // read Stream Id + sid = *(++frame); + if (s > 0) + sid = (sid << 8) + *(++frame); + if (s > 1) + sid = (sid << 8) + *(++frame); + if (s > 2) + sid = (sid << 8) + *(++frame); + + // read offset (if exists) + if (o > 0) { + offset = *(++frame); + offset = (offset << 8) + *(++frame); + if (o > 1) + offset = (offset << 8) + *(++frame); + if (o > 2) + offset = (offset << 8) + *(++frame); + if (o > 3) + offset = (offset << 8) + *(++frame); + if (o > 4) + offset = (offset << 8) + *(++frame); + if (o > 5) + offset = (offset << 8) + *(++frame); + if (o > 6) + offset = (offset << 8) + *(++frame); + } else + offset = 0; + + // bit 'd' set => DataLength field + if (d) { + data_length = *(++frame); + data_length = (data_length << 8) + *(++frame); + return ++frame; // first byte of payload + } + + data_length = 0; + return ++frame; // first byte of next frame +} + +unsigned +getStreamFrameHeaderSize(uint8_t& sid_size, uint8_t& off_size, uint8_t extra_flags, uint32_t sid, + uint64_t offset=0, uint16_t data_length=0) +{ + sid_size = (std::log2(sid) / 8) + 1; + unsigned frame_size = 1 + sid_size; + if (extra_flags) + ++frame_size; + if (data_length) + frame_size += 2; + if (offset) { + off_size = clamp(2, static_cast<int>(std::ceil(std::log2(offset) / 8.)), 8); + frame_size += off_size; + --off_size; + } else + off_size = 0; + return frame_size; +} + +inline uint8_t* +packStreamFrame(std::vector<uint8_t>& vec, uint8_t extra_flags, uint32_t sid, + uint64_t offset=0, uint16_t data_length=0, const uint8_t* data=nullptr) +{ + // Compute total frame size + uint8_t sid_size, off_size; + unsigned frame_size = getStreamFrameHeaderSize(sid_size, off_size, extra_flags, sid, + offset, data_length); + frame_size += data_length; + + // Init a stream frame packet + uint8_t* frame_ptr = Packet::initChannel(vec, frame_size); + Packet::setChannelFlag(vec, ChannelFlags::FRAME); + + // Fill frame bytes + frame_ptr[0] = FRAMEF_STREAM | (off_size << 2) | (sid_size - 1); + + if (data_length) + frame_ptr[0] |= FRAMEF_DATA; + + if (extra_flags) { + *(frame_ptr++) |= FRAMEF_EXTRA; + *(frame_ptr++) = extra_flags; + } else + ++frame_ptr; + + for (int i = sid_size-1; i >= 0; --i) + *(frame_ptr++) = (sid >> (i*8)) & 0xff; + + if (offset) { + for (int i = off_size; i >= 0; --i) + *(frame_ptr++) = (offset >> (i*8)) & 0xff; + } + + if (data_length) { + *(frame_ptr++) = data_length >> 8; + *(frame_ptr++) = data_length & 0xff; + } + + if (data) + return std::copy_n(data, data_length, frame_ptr); + + return frame_ptr; +} + +template <typename T> +inline void +dump(const std::vector<T>& vec) +{ + auto p = reinterpret_cast<const uint32_t*>(vec.data()); + for (std::size_t i=0; i < PACKET_HEADER_WORDS; ++i, ++p) + RING_DBG("[%zu] %08x", i, ntohl(*p)); + auto pk_size = vec.size() * sizeof(T); + if (pk_size > PACKET_HEADER_BYTES) + RING_DBG("Payload of %zu byte(s)", pk_size - PACKET_HEADER_BYTES); +} + +} // namespace ring::ReliableSocket::Packet + +//================================================================================================== + +struct TxPacket { + using Clock = std::chrono::high_resolution_clock; + std::vector<uint8_t> bytes {}; + DataConnection::SeqNum full_seq {0}; + DataStream* stream {nullptr}; + bool ack {false}; // true if receiver should acknowledge this packet + Clock::time_point time_tx; + Clock::time_point time_ack; +}; + +//================================================================================================== + +class TxFrameBuffer +{ +public: + TxFrameBuffer(unsigned size) : maxPacket_(size) {} + + bool push(std::shared_ptr<TxPacket> pkt); + std::shared_ptr<TxPacket> pop(); + void flush(); + std::size_t available(); + bool empty() const; + +private: + unsigned maxPacket_; + std::queue<std::shared_ptr<TxPacket>> outQ_ {}; // packets to transmit +}; + +bool +TxFrameBuffer::push(std::shared_ptr<TxPacket> pkt) +{ + if (outQ_.size() == maxPacket_) + return false; // full + outQ_.emplace(pkt); + return true; +} + +std::shared_ptr<TxPacket> +TxFrameBuffer::pop() +{ + if (outQ_.empty()) { + //RING_WARN("be"); + return {}; + } + auto pkt_ptr = outQ_.front(); + outQ_.pop(); + return pkt_ptr; +} + +void +TxFrameBuffer::flush() +{ + while (!outQ_.empty()) + outQ_.pop(); +} + +std::size_t +TxFrameBuffer::available() +{ + return maxPacket_ - outQ_.size(); +} + +bool +TxFrameBuffer::empty() const +{ + return outQ_.empty(); +} + +//================================================================================================== + +template <class Clock> +class TxScheduler +{ +public: + using Duration = typename Clock::duration; + using TimePoint = typename Clock::time_point; + + TxScheduler(DataConnection& parent); + ~TxScheduler(); + + void update(const TimePoint& ts, bool reshedule = false); + void update(bool reshedule = false); // update now (force popPacket_ call as soon as possible) + +private: + void threadJob(); + + DataConnection& dc_; + std::mutex jobMutex_ {}; + std::condition_variable jobCv_ {}; + bool scheduled_ {false}; // true when we need to call popPacket_ at popPacketTime_ time + TimePoint popPacketTime_ {}; + std::atomic_bool running_ {true}; + std::thread thread_ {}; +}; + +template <class Clock> +TxScheduler<Clock>::TxScheduler(DataConnection& parent) + : dc_(parent) + , popPacketTime_(Clock::now()) + , thread_(&TxScheduler::threadJob, this) +{} + +template <class Clock> +TxScheduler<Clock>::~TxScheduler() +{ + running_ = false; + jobCv_.notify_one(); + if (thread_.joinable()) + thread_.join(); +} + +template <class Clock> +void +TxScheduler<Clock>::update(const typename Clock::time_point& ts, bool reshedule) +{ + std::lock_guard<std::mutex> lk {jobMutex_}; + + if (scheduled_ and !reshedule) { + if (ts < popPacketTime_) + return; + } + + popPacketTime_ = ts; + scheduled_ = true; + jobCv_.notify_one(); +} + +template <class Clock> +void +TxScheduler<Clock>::update(bool reshedule) +{ + update(Clock::now(), reshedule); +} + +template <class Clock> +void +TxScheduler<Clock>::threadJob() +{ + std::unique_lock<std::mutex> lk {jobMutex_}; + + while (running_) { + // wait for scheduling + if (!scheduled_) + jobCv_.wait(lk, [this]{ return !running_ or scheduled_; }); + + // wait until schedule time is over or re-scheduling + do { + auto when = popPacketTime_; + if (!jobCv_.wait_until(lk, when, + [this, when] { return !running_ or popPacketTime_ < when; })) + break; + } while (running_); + + if (!running_) + return; + + TimePoint nextTime; + auto pkt = dc_.nextTxPacket(nextTime); + + if (pkt) { + lk.unlock(); + dc_.transmit(pkt); + lk.lock(); + popPacketTime_ = nextTime; + scheduled_ = true; + } else { + scheduled_ = false; + } + } +} + +//================================================================================================== + +#if 0 +class RxBuffer +{ +public: + RxBuffer(unsigned size) : size_(size) {} + + bool push(std::vector<uint8_t>&& pkt); + bool pop(std::vector<uint8_t>& pkt); + +private: + std::mutex pktMutex_ {}; + std::list<std::vector<uint8_t>> packets_ {}; + unsigned size_; + SeqNum::seqnum_t lastPopSeq_ {SeqNum::INVALID}; +}; + +bool +RxBuffer::push(std::vector<uint8_t>&& pkt) +{ + std::lock_guard<std::mutex> lk {pktMutex_}; + + // limit number of stored packets + if (!packets_.empty()) { + auto seq = Packet::getDataSeq(pkt); + if (SeqNum::length(Packet::getDataSeq(packets_.back()), seq) > size_) { + RING_ERR("rxbuf: full, wait for 0x%08x", SeqNum::increase(lastPopSeq_)); + return false; + } + } + + packets_.emplace_front(std::move(pkt)); + return true; +} + +bool +RxBuffer::pop(std::vector<uint8_t>& pkt) +{ + std::lock_guard<std::mutex> lk {pktMutex_}; + if (packets_.empty()) + return false; + + // first packet? + if (lastPopSeq_ == SeqNum::INVALID) { + pkt = std::move(packets_.back()); + lastPopSeq_ = Packet::getDataSeq(pkt); + packets_.pop_back(); + return true; + } + + // give packet in order + auto seq = Packet::getDataSeq(packets_.back()); + if (seq != SeqNum::increase(lastPopSeq_)) + return false; + + pkt = std::move(packets_.back()); + packets_.pop_back(); + lastPopSeq_ = seq; + return true; +} +#endif + +//================================================================================================== + +class RxQueue +{ +public: + using Clock = std::chrono::high_resolution_clock; + using TimePoint = Clock::time_point; + using Timer = TimedJob<RxQueue, Clock>; + + RxQueue(DataConnection& dc); + ~RxQueue(); + + bool push(std::vector<uint8_t>&& pkt); + void attachStream(std::shared_ptr<DataStream> stream); + void detachStream(std::shared_ptr<DataStream> stream); + +#if 0 + template <typename T> + void addTimedJob(const T& delay, const Timer::Callable& callback); + + template <typename T> + void addTimedJob(const T& delay, Timer::Callable&& callback); + + template <typename T> + void addTimedJob(const T& delay, Timer&& timer); + + void addTimedJob(const TimePoint& when, Timer&& timer); +#endif + +private: + static constexpr unsigned RX_QUEUE_SIZE {8192}; // unit = packet + void threadJob(); + + DataConnection& dc_; + + std::unordered_map<StreamId, std::shared_ptr<DataStream>> streamMap_ {}; + + std::mutex jobMutex_ {}; + std::condition_variable jobCv_ {}; + std::priority_queue<Timer, + std::vector<Timer>, + SortTimedJob<RxQueue, Clock>> timerQueue_; + std::deque<std::vector<uint8_t>> pktQueue_ {}; + + std::atomic_bool running_ {true}; + std::thread thread_ {}; +}; + +RxQueue::RxQueue(DataConnection& dc) + : dc_(dc) +{ + thread_ = std::thread(&RxQueue::threadJob, this); +} + +RxQueue::~RxQueue() +{ + running_ = false; + jobCv_.notify_one(); + if (thread_.joinable()) + thread_.join(); +} + +void +RxQueue::threadJob() +{ + while (running_) { + std::unique_lock<std::mutex> lk {jobMutex_}; + + jobCv_.wait(lk, [this]{ return !running_ or !pktQueue_.empty(); }); + +#if 0 + Clock::time_point when; + if (timerQueue_.empty()) + when = Clock::now() + std::chrono::seconds(60); + else + when = timerQueue_.top().callTime; + + if (!jobCv_.wait_until(lk, when, [this, when] { + return !running_ + or !rxQueue_.empty() + or (!timerQueue_.empty() and timerQueue_.top().callTime != when); + }) and !timerQueue_.empty()) { + // handle all exhausted timers, rescheduled timer are re-instered after + // the end of the loop to be handled at next thread loop + decltype(timerQueue_) tmp_timer_queue; + + std::swap(timerQueue_, tmp_timer_queue); + lk.unlock(); + auto now = Clock::now(); + while (!tmp_timer_queue.empty()) { + if (!running_) + return; + auto timer = std::move(tmp_timer_queue.top()); + when = timer.callTime; + if (when > now) + break; + timer.callback(timer); + tmp_timer_queue.pop(); + } + lk.lock(); + std::swap(timerQueue_, tmp_timer_queue); + + // Finaly insert new timers scheduled during the execution loop + while (!tmp_timer_queue.empty()) { + timerQueue_.emplace(std::move(tmp_timer_queue.top())); + tmp_timer_queue.pop(); + } + } +#endif + + // loop on received packets + decltype(pktQueue_) queue; + std::swap(pktQueue_, queue); + + // permit reception of new packets + lk.unlock(); + + for (auto& pkt : queue) { + if (!running_) + return; + + // loop on frame segments + // unpack each frame data and pass payload or event to the corresponding DataStream + auto frame = Packet::firstFramePtr(pkt); + const auto end_frame = pkt.data() + pkt.size(); + while (frame < end_frame) { + uint8_t extra_flags; + uint16_t data_length; + uint32_t sid; + uint64_t offset; + frame = Packet::unpackStreamFrame(frame, extra_flags, sid, offset, data_length); + const auto it = streamMap_.find(sid); + if (it != streamMap_.cend()) { + if (data_length) + it->second->onRxPayload(frame, data_length, offset); + if (extra_flags & 0x80) + it->second->onCloseStream(); + } + if (data_length) + frame += data_length; + break; + } + } + } +} + +void +RxQueue::attachStream(std::shared_ptr<DataStream> stream) +{ + std::lock_guard<std::mutex> lk {jobMutex_}; + streamMap_.emplace(stream->getId(), stream); +} + +void +RxQueue::detachStream(std::shared_ptr<DataStream> stream) +{ + std::lock_guard<std::mutex> lk {jobMutex_}; + streamMap_.erase(stream->getId()); +} + +bool +RxQueue::push(std::vector<uint8_t>&& pkt) +{ + std::lock_guard<std::mutex> lk {jobMutex_}; + if (pktQueue_.size() >= RX_QUEUE_SIZE) + return false; + + pktQueue_.emplace_back(std::move(pkt)); + jobCv_.notify_one(); + return true; +} + +#if 0 +template <typename T> +void +RxQueue::addTimedJob(const T& delay, const Timer::Callable& func) +{ + auto when = Clock::now() + delay; // do it out of locked area to not be delayed + std::lock_guard<std::mutex> lk {jobMutex_}; + timerQueue_.emplace(this, when, func); + jobCv_.notify_one(); +} + +template <typename T> +void +RxQueue::addTimedJob(const T& delay, Timer::Callable&& func) +{ + auto when = Clock::now() + delay; // do it out of locked area to not be delayed + std::lock_guard<std::mutex> lk {jobMutex_}; + timerQueue_.emplace(this, when, std::move(func)); + jobCv_.notify_one(); +} + +template <typename T> +void +RxQueue::addTimedJob(const T& delay, Timer&& timer) +{ + auto when = Clock::now() + delay; // do it out of locked area to not be delayed + std::lock_guard<std::mutex> lk {jobMutex_}; + timer.callTime = when; + timerQueue_.emplace(std::move(timer)); + jobCv_.notify_one(); +} + +void +RxQueue::addTimedJob(const TimePoint& when, Timer&& timer) +{ + std::lock_guard<std::mutex> lk {jobMutex_}; + timer.callTime = when; + timerQueue_.emplace(std::move(timer)); + jobCv_.notify_one(); +} +#endif + +//================================================================================================== + +void +CongestionControl::reset() +{ + cumRtt_ = BASE_RTT; + cumRttVar_ = std::chrono::microseconds(0); + cumPps_ = 0; + txBytes_ = rxBytes_ = rxLossPkt_ = rxBadPkt_ = rxOldPkt_ = rxDupPkt_ = 0; +} + +//================================================================================================== + +class DataConnectionFactory { +public: + std::shared_ptr<DataConnection> make(const std::string& account_id, + const std::string& peer_id, + bool is_client); + + std::shared_ptr<DataConnection> find(const DRing::DataConnectionId& cid) const; + +private: + mutable std::mutex mtx_ {}; + mutable std::map<DRing::DataConnectionId, std::weak_ptr<DataConnection>> map_; +}; + +std::shared_ptr<DataConnection> +DataConnectionFactory::make(const std::string& account_id, + const std::string& peer_id, + bool is_client) +{ + std::lock_guard<std::mutex> lk {mtx_}; + auto cnx = std::make_shared<DataConnection>(account_id, peer_id, is_client); + map_.emplace(cnx->getId(), cnx); + return cnx; +} + +std::shared_ptr<DataConnection> +DataConnectionFactory::find(const DRing::DataConnectionId& cid) const +{ + std::lock_guard<std::mutex> lk {mtx_}; + auto pair = map_.find(cid); + if (pair == map_.cend()) + return {}; + if (auto dc = pair->second.lock()) + return dc; + map_.erase(pair); + return {}; +} + +// Meyer Singleton +static DataConnectionFactory& +getDataConnectionFactory() +{ + static DataConnectionFactory ft; + return ft; +} + +//================================================================================================== + +std::shared_ptr<DataConnection> +makeDataConnection(const std::string& account_id, + const std::string& peer_id, + bool is_client) +{ + auto& ft = getDataConnectionFactory(); + return ft.make(account_id, peer_id, is_client); +} + +DataConnection::DataConnection(const std::string& account_id, const std::string& peer_id, + bool is_client) + : id_(genUUID()) + , isClient_(is_client) +{ + info_.account = account_id; + info_.peer = peer_id; + info_.isClient = is_client; + info_.code = DRing::DataConnectionCode::CODE_UNKNOWN; +} + +DataConnection::~DataConnection() +{ + disconnect(); +} + +DRing::DataConnectionId +DataConnection::genUUID() +{ + static DRing::DataConnectionId cid {0}; + return ++cid; +} + +void +DataConnection::getInfo(DRing::DataConnectionInfo& info) const +{ + std::lock_guard<std::mutex> lk {infoMutex_}; + info = info_; +} + +void +DataConnection::stopConnection() +{ + std::lock_guard<std::mutex> lk {txMtx_}; + txSeq_ = 0; + if (tls_) + tls_->shutdown(); +} + +void +DataConnection::connect(tls::TlsSession* tls) +{ + std::unique_lock<std::mutex> lk {txMtx_}; + + tls_ = tls; + maxPayload_ = tls_->getMaxPayload(); + RING_DBG("[FTP:%lx] maxPayload=%u", id_, maxPayload_); + + cc_.reset(); + + txQueue_.reset(new TxScheduler<Clock>(*this)); + txBuffer_.reset(new TxFrameBuffer(TX_BUFFER_SIZE)); + rxQueue_.reset(new RxQueue(*this)); + + lk.unlock(); + +#if 0 + if (isClient_) { + testLoop_ = std::unique_ptr<ThreadLoop>(new ThreadLoop( + []{ return true; }, + [this]{ return testAppSendData(); }, + []{})); + testLoop_->start(); + } else { + testLoop_ = std::unique_ptr<ThreadLoop>(new ThreadLoop( + []{ return true; }, + [this]{ return testAppRecvData(); }, + []{})); + testLoop_->start(); + } +#endif + + auto shared = sharedPtr(); + Manager::instance().scheduleTask([shared]{ shared->logStatistics(); }, STATISTIC_DELAY); + + { + std::lock_guard<std::mutex> lk {infoMutex_}; + if (info_.code < 200) + info_.code = DRing::DataConnectionCode::CODE_OK; + } +} + +void +DataConnection::disconnect() +{ + { + std::lock_guard<std::mutex> lk {infoMutex_}; + if (info_.code < 200) + info_.code = DRing::DataConnectionCode::CODE_DISCONNECTED; + } + + // stop asap test application (may own blocked streams) + if (testLoop_) + testLoop_->stop(); + + { + std::lock_guard<std::mutex> lk {txMtx_}; + txBuffer_.reset(); + rxQueue_.reset(); + } + + // release stream's references in ack waiting packet + { + std::lock_guard<std::mutex> lk {ackWaitMapMutex_}; + ackWaitMap_.clear(); + } + + ioCv_.notify_all(); + if (testLoop_) + testLoop_->join(); + + // stop Tx/Rx layer + { + std::lock_guard<std::mutex> lk {txMtx_}; + txQueue_.reset(); + } + + tls_ = nullptr; +} + +bool +DataConnection::disconnected() const +{ + std::lock_guard<std::mutex> lk {txMtx_}; + return !txBuffer_ or !rxQueue_; +} + +IpAddr +DataConnection::getRemoteAddress() const +{ + std::unique_lock<std::mutex> lk {txMtx_}; + if (tls_) + return tls_->getRemoteAddress(); + return {}; +} + +CongestionControl +DataConnection::getCC() const +{ + std::lock_guard<std::mutex> lk {txMtx_}; + return cc_; +} + +std::chrono::microseconds +DataConnection::getCumRtt() const +{ + std::lock_guard<std::mutex> lk {txMtx_}; + return cc_.cumRtt_; +} + +inline void +DataConnection::updateTxAckState(SeqNum seq, unsigned count) +{ + std::lock_guard<std::mutex> lk {ackMtx_}; + txAckSeq_ = seq; + txAckCt_ = count; +} + +inline void +DataConnection::getTxAckState(SeqNum& seq, unsigned& count) +{ + std::lock_guard<std::mutex> lk {ackMtx_}; + seq = txAckSeq_; + count = txAckCt_; +} + +std::shared_ptr<TxPacket> +DataConnection::nextTxPacket(TimePoint& when) +{ + auto packet_ts = Clock::now(); + std::shared_ptr<TxPacket> pkt; + + if (!protocol_) { + // non-negotiated server does nothing here + if (!isClient_) + return {}; + + // Client: force INIT packet until negotiated + pkt = std::make_shared<TxPacket>(); + Packet::initProto(pkt->bytes, id_, PROTOCOL_VERSION); + } else { + // Old missed packets are re-sent in priority over new packets. + auto rtt = getCumRtt(); + { + std::unique_lock<std::mutex> lk1 {txMtx_, std::defer_lock}; + std::unique_lock<std::mutex> lk2 {ackWaitMapMutex_, std::defer_lock}; + std::lock(lk1, lk2); + if (!ackWaitMap_.empty()) { + auto pair = ackWaitMap_.cbegin(); + if (txBuffer_->empty()) { + pkt = std::move(pair->second); + ackWaitMap_.erase(pair); + txDelay_ = (rtt * 7 + txDelay_) / 8; + goto end; + } + + auto delay = ((Clock::now() - pair->second->time_tx) - rtt).count(); + if (delay > 0) { + pkt = std::move(pair->second); + ackWaitMap_.erase(pair); + txDelay_ = (rtt / 2 + txTargetSpeed_ * 15) / 16; + goto end; + } + } + + if (!pkt and txBuffer_) { + pkt = txBuffer_->pop(); + txDelay_ = (txDelay_ * 15 + txTargetSpeed_) / 16; + } + } + if (pkt) + ioCv_.notify_one(); // see waitWrite() + } + + if (!pkt) { + //RING_WARN("no more pkt"); + return {}; + } + +end: + // Delay next call + when = packet_ts + txDelay_; + //RING_DBG("next in %zuus", cast_to_us(txDelay_).count()); + return pkt; +} + +bool +DataConnection::pushPacket(std::shared_ptr<TxPacket> pkt) +{ + std::unique_lock<std::mutex> lk {txMtx_}; + if (txBuffer_ and txBuffer_->push(pkt)) { + lk.unlock(); + txQueue_->update(); + return true; + } + return false; +} + +ssize_t +DataConnection::sendPkt(std::vector<uint8_t>& pkt, SeqNum& pkt_seq, + SeqNum ack_seq, unsigned ack_count, Clock::time_point& tp) +{ +#ifndef NDEBUG + if (pkt.size() < PACKET_HEADER_BYTES) { + RING_ERR("FATAL: TxQueue::send() : pkt.size() < PACKET_HEADER_BYTES"); + std::terminate(); + } +#endif + + if (pkt.size() > maxPayload_) { + RING_ERR("[FTP:%zu] packet oversized (%zu > %u)", id_, pkt.size(), maxPayload_); + return -1; + } + + std::lock_guard<std::mutex> tx_lk {txMtx_}; + + // disconnected? + if (!tls_) + return -1; + + if (!txSeq_) { + RING_ERR("[FTP:%zu] sequence number overflow", id_); + stopConnection(); + return -1; + } + + auto now = Clock::now(); + tp = now; + pkt_seq = txSeq_++; + + // Reset network statistics when marked sequence reached + if (pkt_seq == netStatSeq_) { + netStatTime_ = now; + netAckCount_ = 0; + netStatSent_ = pkt_seq - rxAckSeq_; // number of packets sent since last acknowledgment + } + + Packet::setTsn(pkt, pkt_seq); + Packet::setAsnAckCnt(pkt, ack_seq, ack_count); + + //RING_DBG("tx: seq=%lx, ack=%lx-%u", pkt_seq, ack_seq, ack_count); + //Packet::dump(pkt); + auto res = tls_->send(pkt); + + if (res > 0) + cc_.txBytes_ += pkt.size(); + + return res; +} + +void +DataConnection::transmit(std::shared_ptr<TxPacket> pkt) +{ + std::unique_lock<std::mutex> lk {ackWaitMapMutex_}; + //RING_ERR("-transmit: %zu", usdelay(tp_[0])); + //auto start = Clock::now(); + if (sendPkt(pkt->bytes, pkt->full_seq, pkt->time_tx) > 0) { + if (delayedAck_) { + delayedAck_ = 0; + doAck_ = false; + } + if (pkt->ack) + ackWaitMap_.emplace(pkt->full_seq, pkt); + } + //RING_ERR("+transmit: %zu", usdelay(start)); + //tp_[0] = Clock::now(); +} + +ssize_t +DataConnection::sendPkt(std::vector<uint8_t>& pkt, SeqNum& pkt_seq, Clock::time_point& tp) +{ + SeqNum ack_seq; + unsigned ack_count; + getTxAckState(ack_seq, ack_count); + return sendPkt(pkt, pkt_seq, ack_seq, ack_count, tp); +} + +ssize_t +DataConnection::sendPkt(std::vector<uint8_t>& pkt) +{ + SeqNum pkt_seq; // needed but not used + SeqNum ack_seq; + unsigned ack_count; + Clock::time_point tp; + getTxAckState(ack_seq, ack_count); + return sendPkt(pkt, pkt_seq, ack_seq, ack_count, tp); +} + +void +DataConnection::sendProto() +{ + std::vector<uint8_t> pkt; + Packet::initProto(pkt, id_, PROTOCOL_VERSION); + sendPkt(pkt); +} + +void +DataConnection::sendHeartbeat() +{ + std::vector<uint8_t> pkt; + Packet::initHeartbeat(pkt); + RING_DBG("[FTP:%zu] send heartbeat", id_); + sendPkt(pkt); +} + +void +DataConnection::sendAck(SeqNum ack_seq, unsigned ack_count) +{ + SeqNum pkt_seq; // needed but not used + std::vector<uint8_t> pkt; + Packet::initHeartbeat(pkt); + Clock::time_point tp; + sendPkt(pkt, pkt_seq, ack_seq, ack_count, tp); +} + +void +DataConnection::processIncomingPacket(std::vector<uint8_t>&& pkt) +{ + auto now = Clock::now(); + + // Drop packet without enought data to decode it + if (pkt.size() < PACKET_HEADER_BYTES) { + std::lock_guard<std::mutex> lk {txMtx_}; + ++cc_.rxBadPkt_; + return; + } + + // Obtain information about current peer state and update our own state with + auto pkt_seq = processTransmissionFields(pkt); + if (pkt_seq == 0) + return; + //RING_DBG("rx: seq=%lx", pkt_seq); + processAcknowledgmentFields(pkt, now); + + // Handle special packet immediatly (like INIT packet during protocol negotiation) + // This method returns false if the whole packet must be drop without acknowledgement + bool has_data; + if (!processChannelFlags(pkt_seq, pkt, has_data)) + return; + + { + std::lock_guard<std::mutex> lk {txMtx_}; + cc_.rxBytes_ += pkt.size(); + } + + // Check the need for sending an acknowledgment packet + bool ack_ack = pkt.size() == PACKET_HEADER_BYTES; // do not ACK an heartbeat/ack-only packets + checkForAcknowledgment(pkt_seq, ack_ack); + + // Finish by any transmit application data to upper layer + if (has_data) { + if (rxQueue_) { + std::lock_guard<std::mutex> lk {txMtx_}; + rxQueue_->push(std::move(pkt)); + } + } +} + +DataConnection::SeqNum +DataConnection::processTransmissionFields(const std::vector<uint8_t>& pkt) +{ + // Infer the global packet sequence number from its partial representation (TSN) + uint32_t peer_tsn = Packet::getTsn(pkt); + int32_t seq_delta = ((static_cast<int32_t>(peer_tsn) << 4) - (static_cast<int32_t>(rxSeq_) << 4)) >> 4; + SeqNum pkt_seq = rxSeq_ + seq_delta; + + // As sequence progression are monotonic without wraparound, + // a negative seq delta means out-of-order or duplicate packets. + // Dup and too old OOO packets are dropped. + if (seq_delta <= 0) { + if (seq_delta <= -MISS_ORDERING_LIMIT) { + RING_WARN("[FTP:%zu] drop too old pkt %016lx", id_, pkt_seq); + std::lock_guard<std::mutex> lk {txMtx_}; + ++cc_.rxOldPkt_; + return 0; + } + + // Duplicate? + if (rxMask_[-seq_delta]) { + RING_WARN("[FTP:%zu] drop dup pkt %016lx", id_, pkt_seq); + std::lock_guard<std::mutex> lk {txMtx_}; + ++cc_.rxDupPkt_; + return 0; + } + + // Accepted out-of-order packet + rxMask_.set(-seq_delta); + } else { // seq_delta > 0 + // Make sure we don't wraparound, all the design is based on this assumption. + // This should never append as our tx engine protect against that. + // But... in case of a foolish peer. + if (pkt_seq < rxSeq_) { + RING_ERR("[FTP:%zu] fatal error, rx sequence number has wraparound!", id_); + { + std::lock_guard<std::mutex> lk {txMtx_}; + ++cc_.rxBadPkt_; + } + stopConnection(); + return 0; + } + + // Increment bigger than 1 means possible loss or OoO. + // At this stage, just consider them as lost + if (seq_delta > 1) { + std::lock_guard<std::mutex> lk {txMtx_}; + cc_.rxLossPkt_ += seq_delta - 1; + } + + // This is a new packet: update our internal rx state + rxSeq_ = pkt_seq; + rxMask_ <<= seq_delta; + rxMask_.set(0); + } + + return pkt_seq; +} + +void +DataConnection::processAcknowledgmentFields(const std::vector<uint8_t>& pkt, Clock::time_point tp) +{ + // As for TSN, infer the global acknowledgment sequence number of this packet + uint32_t pkt_asn; + uint8_t pkt_ackct; + Packet::getAsnAckCt(pkt, pkt_asn, pkt_ackct); + int32_t ack_delta = ((static_cast<int32_t>(pkt_asn) << 4) - (static_cast<int32_t>(rxAckSeq_) << 4)) >> 4; + SeqNum ack_seq = rxAckSeq_ + ack_delta; + + //RING_DBG("rx ack: %lx-%u, d=%d, m=%016llx", ack_seq, pkt_ackct, ack_delta, rxAckMask_.to_ullong()); + ++pkt_ackct; // 0 means 1 packet acknowledged + + unsigned ack_count = 0; // counter of number of packets acknowlegded by this one + + // New acknowledgments? + if (ack_delta > 0) { + // Update out internal ack state + { + std::lock_guard<std::mutex> lk {txMtx_}; + rxAckSeq_ = ack_seq; + } + rxAckMask_ <<= ack_delta; + + // Compute number of contiguous acknowledged sequences and record that in the ack window + // note: ackct = 0 if 1 packet is acknowledged + // note2: we don't take in account possible OoO packets that contains ACK info + ack_count += std::min(ack_delta, static_cast<int32_t>(pkt_ackct)); + rxAckMask_ |= (1 << ack_count) - 1; // set all bits from ack_count-1 downto 0 + + // Acknowledge all contiguous packets + onAckSeq(ack_seq, tp, ack_count); + + ack_delta = 0; // reset the delta before OoO processing just below + } + + // Handle acknowledgement data in a OoO packet. + // note: an OoO packet acknowledges an in-order packet. + unsigned long ack_mask = (1ul << (-ack_delta + pkt_ackct)) - 1; + if ((rxAckMask_.to_ullong() & ack_mask) != ack_mask) { + for (unsigned i=0; i < pkt_ackct; ++i) { + unsigned bit = -ack_delta + i; + if (bit >= rxAckMask_.size()) + break; + if (!rxAckMask_[bit]) { + rxAckMask_.set(bit); + ++ack_count; + + // Acknowledge one packet + onAckSeq(ack_seq, tp); + } + } + } + + std::unique_lock<std::mutex> tx_lk {txMtx_}; + netAckCount_ += ack_count; + + // Network statistics is trigged when the marked sequence as made (or supposed to) a round-trip + if (ack_seq >= netStatSeq_) { + netStatSeq_ = txSeq_; // next trig point + + // RTT (of the group of acked sequences), clamped to [1, MAX_RTT] + auto rtt = cast_to_us(tp - netStatTime_); + rtt = clamp(std::chrono::microseconds(1), rtt, cast_to_us(MAX_RTT)); + cc_.cumRtt_ = ((cc_.cumRtt_ * 7) + rtt) / 8; + + // RTT variance measure + auto rtt_var = rtt >= cc_.cumRtt_ ? rtt - cc_.cumRtt_ : cc_.cumRtt_ - rtt; + cc_.cumRttVar_ = ((cc_.cumRttVar_ * 7) + rtt_var) / 8; + + // Number of unique ACK during this round-trip + auto pps = netAckCount_ * 1000000ul / rtt.count(); + cc_.cumPps_ = ((cc_.cumPps_ * 7) + pps) / 8; + + // Packet loss rate during this round-trip (OoO not accounted) + //float loss = static_cast<float>(netStatSent_ - cumAckCount_) / netStatSent_; + //loss = clamp(0.f, loss, 1.f); + //cc_.cumLoss_ = ((cc_.cumLoss_ * 7.f) + loss) / 8.f; + } + + tx_lk.unlock(); + + constexpr unsigned long long ooomask = (1ull << MISS_ORDERING_LIMIT) - 1; + if (!rxAckMask_.all() and (rxAckMask_.to_ullong() & ooomask) == ooomask) + txQueue_->update(); +} + +bool +DataConnection::processChannelFlags(SeqNum pkt_seq, const std::vector<uint8_t>& pkt, bool& has_data) +{ + auto flags = Packet::getChannelFlags(pkt); + has_data = flags.test(Packet::ChannelFlags::FRAME); + + // protocol not negotiated? + if (!protocol_) { + if (isClient_) { + // client protocol refused by server? + if (flags.test(Packet::ChannelFlags::INIT)) { + RING_ERR("[FTP:%zu] fatal error, protocol refused by server", id_); + stopConnection(); // current implementation stops the connection + return false; + } + + // commit the end of negotiation + protocol_ = PROTOCOL_VERSION; + } else { // server side + // during protocol negotiation, drop non-INIT packet or without enough information + auto size = Packet::getChannelPayloadSize(pkt); + if (!flags.test(Packet::ChannelFlags::INIT) or size < MINIMAL_INIT_PAYLOAD_SIZE) { + std::lock_guard<std::mutex> lk {txMtx_}; + ++cc_.rxBadPkt_; + return false; + } + + // check for any supported client protocol + auto payload = Packet::getChannelPayload<uint32_t>(pkt); + unsigned count = Packet::getChannelPayloadSize<uint32_t>(pkt); + uint32_t peer_proto = 0; + for (unsigned i=2; i < count; ++i) { + peer_proto = ntohl(payload[i]); + if (peer_proto == PROTOCOL_VERSION) { + // save peer ConnectionId + peerConnectionId_ = ntohl(payload[0]); + peerConnectionId_ <<= 32; + peerConnectionId_ += ntohl(payload[1]); + + RING_DBG("[FTP:%zu] client protocol %08x accepted, client ID: %016lx", + id_, peer_proto, peerConnectionId_); + protocol_ = PROTOCOL_VERSION; // commit the end of negotiation + return true; + } + + RING_WARN("[FTP:%zu] client protocol %08x refused", id_, peer_proto); + } + + // nothing supported. + // send an INIT packet with our protocol version and drop this one + sendProto(); + return false; + } + } else if (!isClient_ and flags.test(Packet::ChannelFlags::INIT)) { + // server drops all INIT packets after negotation, but ACK them + sendAck(pkt_seq, 0); + return false; + } + + // An INIT flagged packet shall not have application data and a data packet must have data + if (has_data and (flags.test(Packet::ChannelFlags::INIT) or + pkt.size() == PACKET_HEADER_BYTES)) { + std::lock_guard<std::mutex> lk {txMtx_}; + ++cc_.rxBadPkt_; + return false; + } + + return true; +} + +void +DataConnection::checkForAcknowledgment(SeqNum pkt_seq, bool ack_ack) +{ + // note: txAckCt_ and txAckSeq_ not mutex protected, as read in same context of write op. + int32_t ack_seq_delta = pkt_seq - txAckSeq_; + if (ack_seq_delta == 1) { + // increment AckCt and record current ack state + updateTxAckState(pkt_seq, std::min(txAckCt_ + 1, MAX_ACK_CT)); + ++delayedAck_; + + // Do not ACK acks before a minimal unsent ack count + if (ack_ack /*and delayedAck_ < MAX_UNACKED_PACKET*/) + return; + + if (delayedAck_ < MAX_UNACKED_PACKET) { + // We let the idle loop send the ACK until a threshold + // This let the Tx layer a chance to combine ACK with useful data + // After we flush immediatly + if (delayedAck_ < MIN_UNACKED_PACKET) { + ackTime_ = Clock::now() + ACK_DELAY; + doAck_ = true; + return; + } + } + + flushDelayedAck(); + } else if (ack_seq_delta > 1) { + // non-contiguous sequences + + // flush delayed ACK + if (delayedAck_ > 0) + flushDelayedAck(); + + // reset ack state + updateTxAckState(pkt_seq, 0); + + // acknowledge the current state immediately (if not a ACK packet) + if (!ack_ack) + sendAck(pkt_seq, 0); + } else { // ack_seq_delta < 0 (OoO packet) + // flush delayed ACK + if (delayedAck_ > 0) + flushDelayedAck(); + + // acknowledge this packet immediately + if (!ack_ack) + sendAck(pkt_seq, 0); + } +} + +void +DataConnection::flushDelayedAck() +{ + delayedAck_ = 0; + doAck_ = false; + sendAck(txAckSeq_, txAckCt_); +} + +void +DataConnection::onAckSeq(SeqNum base_seq, Clock::time_point tp, unsigned count) +{ + std::list<std::shared_ptr<TxPacket>> acked; + + { + std::lock_guard<std::mutex> lk {ackWaitMapMutex_}; + do { + auto seq = base_seq - (--count); + //RING_WARN("ack %lx", seq); + auto pair = ackWaitMap_.find(seq); + if (pair == ackWaitMap_.cend()) + continue; + //RING_WARN("ack %lx", seq); + auto pkt = pair->second; + if (pkt->stream) { + pkt->time_ack = tp; + acked.emplace_back(pkt); // defer stream ack out of locked region + } + ackWaitMap_.erase(seq); + } while (count); + } + + for (auto pkt : acked) + pkt->stream->onAck(pkt); +} + +void +DataConnection::onExpiredSeq(SeqNum last_expired, Clock::time_point tp) +{ + std::list<std::shared_ptr<TxPacket>> expired; + + { + std::lock_guard<std::mutex> lk {ackWaitMapMutex_}; + for (auto& pair : ackWaitMap_) { + if (pair.first > last_expired) + break; + auto pkt = pair.second; + if (pkt->stream) { + pkt->time_ack = tp; + expired.emplace_back(pkt); + } + ackWaitMap_.erase(pair.first); + } + } + + for (auto pkt : expired) + pkt->stream->onExpire(pkt); +} + +void +DataConnection::attachStream(std::shared_ptr<DataStream> stream) +{ + std::lock_guard<std::mutex> lk {txMtx_}; + rxQueue_->attachStream(stream); + stream->setDataConnection(this); +} + +void +DataConnection::detachStream(std::shared_ptr<DataStream> stream) +{ + std::lock_guard<std::mutex> lk {txMtx_}; + rxQueue_->detachStream(stream); + stream->setDataConnection(nullptr); +} + +void +DataConnection::waitWrite() +{ + std::unique_lock<std::mutex> lk {txMtx_}; + ioCv_.wait(lk, [this]{ return !txBuffer_ or txBuffer_->available() > 0; }); +} + +//----- Statistics --------------------------------------------------------------------------------- + +// Return a SI prefixed value +static std::string +getPrefix(std::size_t n) noexcept +{ + std::ostringstream os; + os << std::setprecision(3); + + if (n < 1000ul) + return to_string(n); + if (n < 1000000ul) { + os << (n / 1e3) << 'K'; + return os.str(); + } + if (n < 1000000000ul) { + os << (n / 1e6) << 'M'; + return os.str(); + } + if (n < 1000000000000ul) { + os << (n / 1e9) << 'G'; + return os.str(); + } + if (n < 1000000000000ul) { + os << (n / 1e12) << 'T'; + return os.str(); + } + os << (n / 1e16) << 'P'; + return os.str(); +} + +void +DataConnection::logStatistics() +{ +#if 0 + if (protocol_) { + // Send Heartbeat + // TODO: put it into TxScheduler + auto now = Clock::now(); + if (now - lastHeartbeat_ >= HEARTBEAT_DELAY) { + lastHeartbeat_ = now; + sendHeartbeat(); + } else if (doAck_ and (now - ackTime_) >= ACK_DELAY) { + doAck_ = false; + if (delayedAck_ > 0) + flushDelayedAck(); + } + } +#endif + + auto cc = getCC(); + auto now = Clock::now(); + auto usdelay = cast_to_us(now - lastStats_).count(); + lastStats_ = now; + + RING_WARN("\n[TX] Bytes: %sB, RTT (ms): %lu, RTT var.: %lu, PPS: %u, Bw: %sb/s" + "\n[RX] Bytes: %sB, Bad: %zu, Loss/OoO: %zu, Dup: %zu" + , getPrefix(cc.txBytes_).c_str(), cc.cumRtt_.count() / 1000 + , cc.cumRttVar_.count() / 1000, cc.cumPps_ + , getPrefix((cc.txBytes_ - prevTxBytes_) * 8000000 / usdelay).c_str() + , getPrefix(cc.rxBytes_).c_str(), cc.rxBadPkt_, cc.rxLossPkt_, cc.rxDupPkt_); + + pktSnd_ = pktLate_ = pktForced_ = 0; + prevTxBytes_ = cc.txBytes_; + + if (!disconnected()) { + auto shared = sharedPtr(); + Manager::instance().scheduleTask([shared]{ shared->logStatistics(); }, STATISTIC_DELAY); + } +} + +//----- Tests -------------------------------------------------------------------------------------- + +void +DataConnection::testChannelFlood() +{ + // This test floods the channel #1 in transmission + // Used to see network capacity and CPU usage + auto pkt = std::make_shared<TxPacket>(); + // Create the biggest frame packet (zero initialized) + uint8_t sid_size, off_size; + uint16_t data_length = maxPayload_ - Packet::getStreamFrameHeaderSize(sid_size, off_size, 0, 1, 0, 0xffff); + Packet::packStreamFrame(pkt->bytes, 0, 1, 0, data_length - PACKET_HEADER_BYTES); + if (!pushPacket(pkt)) + waitWrite(); +} + +void +DataConnection::testAppSendData() +{ + RING_WARN("testAppSendDataJob started"); + std::array<uint8_t, 1024 * 400> buf; + auto stream = std::make_shared<DataStream>(1); // SID = 1 + attachStream(stream); + std::size_t cumBytes = 0; + Clock::duration cumTime; + + while (testLoop_->isRunning()) { + auto t1 = Clock::now(); + if (stream->sendData(buf.data(), buf.size()) < 0) { + RING_ERR("sendData() error, errno=%s", strerror(errno)); + break; + } + cumTime += Clock::now() - t1; + cumBytes += buf.size(); + auto delay = cast_to_s(cumTime).count(); + if (delay >= 5) { + RING_ERR("%.3fMb/s", 8. * cumBytes / cast_to_us(cumTime).count()); + cumBytes = 0; + cumTime = Clock::duration::zero(); + } + } + RING_WARN("test finished"); + testLoop_->stop(); +} + +void +DataConnection::testAppRecvData() +{ + RING_WARN("testAppRecvData started"); + std::array<uint8_t, 1024 * 8> buf; + auto stream = std::make_shared<DataStream>(1); // SID = 1 + attachStream(stream); + std::size_t cumBytes = 0; + Clock::duration cumTime; + + while (testLoop_->isRunning()) { + auto t1 = Clock::now(); + auto res = stream->recvData(buf.data(), buf.size()); + if (res < 0) { + RING_ERR("sendData() error, errno=%s", strerror(errno)); + break; + } + cumTime += Clock::now() - t1; + cumBytes += res; + auto delay = cast_to_s(cumTime).count(); + if (delay >= 5) { + RING_ERR("%.3fMb/s", 8. * cumBytes / cast_to_us(cumTime).count()); + cumBytes = 0; + cumTime = Clock::duration::zero(); + } + } + RING_WARN("test finished"); + testLoop_->stop(); +} + +//================================================================================================== + +DataStream::~DataStream() +{ + // unlock applications + std::lock_guard<std::mutex> lk {mtx_}; + dc_ = nullptr; + flags_.set(FlagsBit::ACK); + flags_.set(FlagsBit::BYTES_AVAILABLE); + cv_.notify_all(); +} + +void +DataStream::setDataConnection(DataConnection* dc) +{ + std::lock_guard<std::mutex> lk {mtx_}; + flags_.set(FlagsBit::ACK); + cv_.notify_all(); + dc_ = dc; +} + +ssize_t +DataStream::sendData(const void* buffer, std::size_t buffer_size) +{ + std::unique_lock<std::mutex> lk {mtx_}; + if (!dc_) { + errno = ENOTCONN; + return -1; + } + + std::size_t max_size = dc_->maxPayload_ - MAX_FRAME_HEADER_BYTES; + + // Segment application buffer into transmit packets + RING_DBG("[MST] tx %zu\n>>>>>>>>%s<<<<<<<<\n", buffer_size, std::string((char*)buffer, buffer_size).c_str()); + std::size_t bytes_send = 0; + auto src = static_cast<const uint8_t*>(buffer); + while (buffer_size) { + if (!dc_) { + errno = EINTR; + return -1; + } + + ackwait_.emplace_back(std::make_shared<TxPacket>()); + auto pkt = ackwait_.back(); + pkt->stream = this; + pkt->ack = true; + + auto bytes_to_send = std::min(max_size, buffer_size); + uint8_t extra_flags = 0; + if (bytes_to_send == buffer_size) + extra_flags |= Packet::FrameExtraFlags::FRAMEXF_PUSH; + Packet::packStreamFrame(pkt->bytes, extra_flags, sid_, offset_, bytes_to_send, src); + + // Try to wait for empty tx slot and push the packet + do { + lk.unlock(); + dc_->waitWrite(); + if (dc_->disconnected()) { + errno = ENOTCONN; + return -1; + } + lk.lock(); + } while (!dc_->pushPacket(pkt)); + + src += bytes_to_send; + buffer_size -= bytes_to_send; + offset_ += bytes_to_send; + bytes_send += bytes_to_send; + } + + // Wait ack's for all pushed packet + //RING_DBG("wait acks (%zu)+", ackwait_.size()); + cv_.wait(lk, [this]{ return ackwait_.empty(); }); + //RING_DBG("wait acks-"); + + return bytes_send; +} + +ssize_t +DataStream::recvData(void* buffer, std::size_t buffer_size) +{ + std::unique_lock<std::mutex> lk {mtx_}; + + // Wait for timeout or received contiguous blocks + if (!cv_.wait_for(lk, std::chrono::seconds(60), [this] { + return !dc_ or flags_[FlagsBit::BYTES_AVAILABLE]; + })) { + errno = EINTR; + return -1; + } + + if (!dc_) { + errno = ENOTCONN; + return -1; + } + + if (buffer_size == 0) + return 0; + + auto dst = static_cast<uint8_t*>(buffer); + + // Loop on offset-ordered received segments until a discontinuity + auto it = reorderBuffer_.cbegin(); + auto next_off = it->first; + while (it != reorderBuffer_.cend() and it->first <= next_off) { + auto seg_off = it->first; + auto& segment = it->second; + + // Stop on output buffer full + std::size_t bytes_read = dst - static_cast<uint8_t*>(buffer); + if (bytes_read >= buffer_size) { + RING_WARN("stop, app buffer full"); + break; + } + + auto buf_max = buffer_size - bytes_read; + auto bytes_to_copy = std::min(segment.size(), buf_max); + RING_DBG("buf(%zu) <<< %zu B <<< segment[%lu]", buf_max, segment.size(), seg_off); + dst = std::copy_n(segment.data(), bytes_to_copy, dst); + next_off = seg_off + bytes_to_copy; + + // Re-insert non-consumed data + if (bytes_to_copy < segment.size()) + reorderBuffer_.emplace(next_off, std::vector<uint8_t>(segment.cbegin() + bytes_to_copy, + segment.cend())); + + it = reorderBuffer_.erase(it); + } + + RING_DBG("next_off=%lu, gapOffset_=%lu", next_off, gapOffset_); + if (next_off > gapOffset_) + gapOffset_ = next_off; + + auto bytes_read = dst - static_cast<uint8_t*>(buffer); + readOffset_ += bytes_read; + + if (reorderBuffer_.empty()) + flags_.reset(FlagsBit::BYTES_AVAILABLE); + return bytes_read; +} + +bool +DataStream::canRead() const +{ + std::lock_guard<std::mutex> lk {mtx_}; + return dc_ and flags_[FlagsBit::BYTES_AVAILABLE]; +} + +void +DataStream::onAck(std::shared_ptr<TxPacket> pkt) +{ + std::lock_guard<std::mutex> lk {mtx_}; + ackwait_.remove(pkt); + //RING_WARN("ack %lx (%zu)", pkt->full_seq, ackwait_.size()); + if (ackwait_.empty()) + cv_.notify_one(); +} + +void +DataStream::onExpire(std::shared_ptr<TxPacket>) +{} + +void +DataStream::onCloseStream() +{ + // Note: Called in the context of RxQueue's thread +} + +void +DataStream::onRxPayload(const uint8_t* buffer, std::size_t length, uint64_t offset) +{ + std::lock_guard<std::mutex> lk {mtx_}; + + // Drop already pushed bytes + if (offset < readOffset_) + return; + + // Insert data by copy only if vector has been inserted + auto& segment = reorderBuffer_[offset]; + if (segment.empty()) { + segment.reserve(length); + segment.resize(length); + std::copy_n(buffer, length, segment.data()); + RING_WARN("reorderBuffer(%zu, %lu) <<< %zu B @ %lu", reorderBuffer_.size()-1, gapOffset_ + , segment.size(), offset); + } + + // Wake-up application when continuity change + if (offset == gapOffset_) { + flags_.set(FlagsBit::BYTES_AVAILABLE); + cv_.notify_one(); + } +} + +}} // namespace ring::ReliableSocket diff --git a/src/data_transfer.h b/src/data_transfer.h new file mode 100644 index 0000000000000000000000000000000000000000..c8efd2d0249550d747138fa2708ba2209356f21c --- /dev/null +++ b/src/data_transfer.h @@ -0,0 +1,321 @@ +/* + * Copyright (C) 2016 Savoir-faire Linux Inc. + * + * Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#pragma once + +#include "datatransfer_interface.h" +#include "threadloop.h" +#include "noncopyable.h" +#include "ip_utils.h" + +#include <string> +#include <fstream> +#include <mutex> +#include <condition_variable> +#include <memory> +#include <ios> +#include <vector> +#include <list> +#include <functional> +#include <utility> +#include <atomic> +#include <thread> +#include <chrono> +#include <array> +#include <bitset> +#include <unordered_map> + +// Forward declarations +namespace ring { namespace tls { +class TlsSession; +}} // namespace ring::tls + +namespace ring { namespace ReliableSocket { + +class DataConnection; +class DataTransfer; +class DataStream; +class RxQueue; +class TxFrameBuffer; +class TxPacket; +template <class T> class TxScheduler; + +using StreamId = uint32_t; // Storage type for a stream id, only lowest 24-bits are used + +static constexpr uint32_t PROTOCOL_VERSION {0xABADCAFE}; + +template<typename T> +struct Timestamp : T +{ + static T get_now() { + const auto now = std::chrono::system_clock::now().time_since_epoch(); + return std::chrono::duration_cast<T>(now); + } + + Timestamp() : T(get_now()) {} + + inline bool newer(const T& other) const { + return other < *this; + } + + inline bool older(const T& other) const { + return other > *this; + } +}; + +// Packet headers must be 32bit aligned and always made of PACKET_HEADER_WORDS 32bit words +// Control Field are also 32bit aligned, but not payload data. + +static constexpr unsigned PACKET_HEADER_WORDS {2}; +static constexpr unsigned PACKET_HEADER_BYTES {4 * PACKET_HEADER_WORDS}; + +/** + * A class to carries all congestion control needed information + * Values are filled by DataConnection class during connection. + * No more changed after disconnection, reset at each connection. + */ +class CongestionControl { +private: + friend class DataConnection; + + // These values are computed from low-pass filtered instant values + std::chrono::microseconds cumRtt_; // cumulative RTT, start at 500ms + std::chrono::microseconds cumRttVar_; // cumulative RTT variance + unsigned cumPps_; // cumulative packet that made a round-trip + unsigned cumLoss_; // cumulative loss packet + + // Cummulative values + std::size_t txBytes_; + std::size_t rxBytes_; + std::size_t rxLossPkt_; + std::size_t rxBadPkt_; + std::size_t rxOldPkt_; + std::size_t rxDupPkt_; + + void reset(); +}; + +/** + * A Channel instance represents a low-level link between local and remote peer. + * It's build over the transport protocol and manage all packets coming from. + * All application streams are build over a channel. + * The existance of a channel depends on the packet sequence number. This number + * starts at 1 and cannot overlap. Befor its maximal limit is reach, all application streams + * are encouraged to migrate on another channel, or be definitively closed (that will be forced + * by the channel protocol if the maximal packet sequence number is reached). + */ + +class DataConnection : public std::enable_shared_from_this<DataConnection> +{ +private: + NON_COPYABLE(DataConnection); + DRing::DataConnectionId genUUID(); + + friend class DataStream; + +public: // ctor/dtor + DataConnection(const std::string& account_id, const std::string& peer_id, bool is_client); + ~DataConnection(); + +public: + using SeqNum = uint64_t; // Full Sequence Number, 0 is invalid + + std::shared_ptr<DataConnection> sharedPtr() { return shared_from_this(); } + + DRing::DataConnectionId getId() const noexcept { return id_; } + void getInfo(DRing::DataConnectionInfo& info) const; + void setStatus(DRing::DataConnectionCode code); + + void stopConnection(); + bool isFailed() const { return false; } + bool disconnected() const; + + void attachStream(std::shared_ptr<DataStream> stream); + void detachStream(std::shared_ptr<DataStream> stream); + + // Called by underlaying transport + void connect(tls::TlsSession* tls); + void disconnect(); + + // Remote peer information + IpAddr getRemoteAddress() const; + + // Entry point for packets comming from underlaying transport + void processIncomingPacket(std::vector<uint8_t>&& buf); + + ssize_t addStreamFrame(StreamId sid, const void* buffer, std::size_t len); + +private: + using Clock = std::chrono::high_resolution_clock; + using TimePoint = Clock::time_point; + using Duration = Clock::duration; + + const DRing::DataConnectionId id_; + const bool isClient_; + tls::TlsSession* tls_ {nullptr}; + uint32_t maxPayload_ {0}; // initialized at TLS connection, then negotiated by handshake + std::atomic<uint32_t> protocol_ {0}; // negotiated communication protocol, 0 is invalid (non negotiated) + + mutable std::mutex infoMutex_ {}; + DRing::DataConnectionInfo info_ {}; + +private: // Peer info + uint64_t peerConnectionId_ {0}; + +private: // Tx related members + mutable std::mutex txMtx_ {}; + std::unique_ptr<TxScheduler<Clock>> txQueue_; + friend class TxScheduler<Clock>; + + std::unique_ptr<TxFrameBuffer> txBuffer_; + SeqNum txSeq_ {1}; // overall sequence number, monotonicaly increased, no wrap + std::chrono::microseconds txDelay_ {1000}; + std::chrono::microseconds txTargetSpeed_ {1000}; + SeqNum netStatSeq_ {1}; // sequence number to trig network statistics + SeqNum netStatSent_ {0}; + unsigned netAckCount_ {0}; + Clock::time_point netStatTime_ {}; + std::condition_variable ioCv_ {}; + void waitWrite(); + + std::size_t pktSnd_ {0}; + std::size_t pktLate_ {0}; + std::size_t pktForced_ {0}; + +#ifndef NDEBUG + unsigned txLossSimu_ {0}; +#endif + + std::shared_ptr<TxPacket> nextTxPacket(TimePoint&); + ssize_t sendPkt(std::vector<uint8_t>&, SeqNum&, SeqNum, unsigned, Clock::time_point&); + ssize_t sendPkt(std::vector<uint8_t>&, SeqNum&, Clock::time_point&); + ssize_t sendPkt(std::vector<uint8_t>&); + void sendProto(); + void sendHeartbeat(); + bool pushPacket(std::shared_ptr<TxPacket> pkt); + void transmit(std::shared_ptr<TxPacket> pkt); + +private: // Tx ack related members + std::mutex ackMtx_ {}; + SeqNum txAckSeq_ {0}; // last acknowledged sequence number + unsigned txAckCt_ {0}; // number of contiguous received sequence number below txAckSeq_ + std::atomic<unsigned> delayedAck_ {0}; // number of contiguous ACK not send yet + std::atomic_bool doAck_ {false}; + Clock::time_point ackTime_ {}; + + void sendAck(SeqNum seq, unsigned count); + void updateTxAckState(SeqNum seq, unsigned count); + void getTxAckState(SeqNum& seq, unsigned& count); + void flushDelayedAck(); + +private: // Rx related members + static constexpr int MISS_ORDERING_LIMIT = 32; // maximal accepted distance of out-of-order packet + + SeqNum rxSeq_ {0}; // highest received sequence number + std::bitset<MISS_ORDERING_LIMIT> rxMask_ {}; // out-of-order packets window + + SeqNum rxAckSeq_ {0}; // highest received ACK sequence number + std::bitset<64> rxAckMask_ {}; // out-of-order ack packets window + + std::mutex ackWaitMapMutex_ {}; + std::map<SeqNum, std::shared_ptr<TxPacket>> ackWaitMap_ {}; // packets waiting for acknowledgment + + std::unique_ptr<RxQueue> rxQueue_; + friend class RxQueue; + + SeqNum processTransmissionFields(const std::vector<uint8_t>&); + void processAcknowledgmentFields(const std::vector<uint8_t>&, Clock::time_point); + void checkForAcknowledgment(SeqNum seq, bool force); + bool processChannelFlags(SeqNum seq, const std::vector<uint8_t>& pkt, bool& has_data); + void onAckSeq(SeqNum, Clock::time_point, unsigned count=1); + void onExpiredSeq(SeqNum, Clock::time_point); + +private: // Congestion control and connection statistics + CongestionControl cc_; + TimePoint lastStats_ {}; + std::size_t prevTxBytes_ {0}; + + CongestionControl getCC() const; // make a safe copy of CongestionControl data + std::chrono::microseconds getCumRtt() const; // safe CongestionControl.cumRtt_ accessor + void logStatistics(); + +private: // Tests + std::unique_ptr<ThreadLoop> testLoop_; + void testChannelFlood(); + void testAppSendData(); + void testAppRecvData(); +}; + +extern std::shared_ptr<DataConnection> makeDataConnection(const std::string& account_id, + const std::string& peer_id, + bool is_client); + +// DataStream +// Hi-level protocol class connecting application streams to low-level data connection. +class DataStream +{ +public: + using Clock = std::chrono::high_resolution_clock; +public: + DataStream(StreamId sid) : sid_(sid) {} + ~DataStream(); + +public: // global API + StreamId getId() const noexcept { return sid_; } + void setDataConnection(DataConnection* dc); + +public: // application side API + // send a close packet to peer and detach + void close() {}; + + // Byte-oriented + ssize_t sendData(const void* buffer, std::size_t buffer_size); + ssize_t recvData(void* buffer, std::size_t buffer_size); + bool canRead() const; + + // Message-oriented + ssize_t sendMsg(const uint8_t* buffer, std::size_t buffer_size); + ssize_t recvMsg(uint8_t* buffer, std::size_t buffer_size); + +private: // DataConnection side API + friend class RxQueue; + friend class DataConnection; + + void onAck(std::shared_ptr<TxPacket>); + void onExpire(std::shared_ptr<TxPacket>); + void onRxPayload(const uint8_t*, std::size_t, uint64_t); + void onCloseStream(); + +private: + enum FlagsBit {ACK=0, BYTES_AVAILABLE=1}; + StreamId sid_; + mutable std::mutex mtx_ {}; + std::condition_variable cv_ {}; + std::bitset<2> flags_ {0}; + DataConnection::SeqNum ackSeq_ {0}; + DataConnection* dc_ {nullptr}; + std::list<std::shared_ptr<TxPacket>> ackwait_ {}; + uint64_t offset_ {0}; + uint64_t gapOffset_ {0}; // offset of first byte not received yet + uint64_t readOffset_ {0}; // offset of first byte not pushed to application + std::map<uint64_t, std::vector<uint8_t>> reorderBuffer_ {}; +}; + +}} // namespace ring::ReliableSocket diff --git a/src/dring/datatransfer_interface.h b/src/dring/datatransfer_interface.h new file mode 100644 index 0000000000000000000000000000000000000000..19ff1219d7d200951049210af669984821068c79 --- /dev/null +++ b/src/dring/datatransfer_interface.h @@ -0,0 +1,121 @@ +/* + * Copyright (C) 2016 Savoir-faire Linux Inc. + * + * Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef DRING_DATATRANSFERI_H +#define DRING_DATATRANSFERI_H + +#include "dring.h" + +#include <string> +#include <cstdlib> // std::size_t +#include <map> +#include <memory> +#include <ios> // std::streamsize_t + +namespace DRing { + +/* File transfer */ +using DataConnectionId = uint64_t; +using DataTransferId = uint64_t; +using DataTransferSize = std::size_t; + +enum DataConnectionCode { + CODE_UNKNOWN=0, + + // 1xx Informational + CODE_TRYING=100, + CODE_PROGRESSING=183, + + // 2xx Success + CODE_OK=200, + CODE_CREATED=201, + CODE_ACCEPTED=202, + + // 4xx Request Error + CODE_BAD_REQUEST=400, + CODE_UNAUTHORIZED=401, + CODE_NOT_FOUND=404, + + // 5xx Process Error + CODE_INTERNAL=500, + CODE_NOT_IMPLEMENTED=501, + CODE_SERVICE_UNAVAILABLE=503, + CODE_DISCONNECTED=504 + + // more may be defined +}; + +struct DataConnectionInfo { + std::string account; // account used for transfer + std::string peer; // remote identification + bool isClient; // true if we are initiator of the connection (client vs server) + int code; // latest status code (set by DataConnectionStatus signal change) +}; + +struct DataTransferInfo { + DataConnectionId connectionId; // used connection id + std::string name; // public name sent to peer, (e.g.: a filename for file transfer) + std::streamsize size; // total size to transfer, -1 if unknown + int code; // latest status code (set by DataTransferStatus signal change) +}; + +// strings used to serialize structs +namespace DataTransfer { + constexpr static const char ACCOUNT [] = "Account"; + constexpr static const char PEER [] = "Peer"; + constexpr static const char CODE [] = "Code"; + constexpr static const char CONNECTION_ID [] = "ConnectionID"; + constexpr static const char NAME [] = "Name"; + constexpr static const char SIZE [] = "Size"; +} //namespace DRing::DataTransfer + +// Signal handlers registration +void registerDataXferHandlers(const std::map<std::string, std::shared_ptr<CallbackWrapperBase>>&); + +// Data connection API +DataConnectionId connectToPeer(const std::string& accountId, const std::string& peerUri); +bool dataConnectionInfo(const DataConnectionId& id, const DataConnectionInfo& info); +bool closeDataConnection(const DataConnectionId& id); + +// Generic data transfer API +bool cancelDataTransfer(const DataTransferId& id); // could be used by local or remote (refuse or stop) +std::streamsize dataTransferSentBytes(const DataTransferId& id); +bool dataTransferInfo(const DataTransferId& id, DataTransferInfo& info); + +// File transfer API +DataTransferId sendFile(const std::string& accountId, const std::string& peerUri, + const std::string& pathname, const std::string& xfer_name={}); +void acceptFileTransfer(const DataTransferId& id, const std::string& pathname); + +// Signals +struct DataTransferSignal { + struct DataConnectionStatus { + constexpr static const char* name = "DataConnectionStatus"; + using cb_type = void(const DataConnectionId& /* connectionId */, int /* code */); + }; + struct DataTransferStatus { + constexpr static const char* name = "DataTransferStatus"; + using cb_type = void(const DataTransferId& /* transferId */, int /* code */); + }; +}; + +} // namespace DRing + +#endif // DRING_DATATRANSFERI_H diff --git a/src/ice_socket.h b/src/ice_socket.h index 57b83c73f67561d70bbe770f53ffe3663a24ebcc..0088ffda1995b18736d8a854187b79017814edbe 100644 --- a/src/ice_socket.h +++ b/src/ice_socket.h @@ -20,6 +20,8 @@ #ifndef ICE_SOCKET_H #define ICE_SOCKET_H +#include "ip_utils.h" + #include <memory> #include <functional> @@ -39,11 +41,12 @@ class IceSocket : ice_transport_(iceTransport), compId_(compId) {} void close(); - ssize_t recv(unsigned char* buf, size_t len); - ssize_t send(const unsigned char* buf, size_t len); + ssize_t recv(uint8_t* buf, size_t len); + ssize_t send(const uint8_t* buf, size_t len); ssize_t getNextPacketSize() const; ssize_t waitForData(unsigned int timeout); void setOnRecv(IceRecvCb cb); + IpAddr getRemoteAddress() const; }; }; diff --git a/src/ice_transport.cpp b/src/ice_transport.cpp index f768cd92252db320a2a3de70913b436b5efa24c3..746c9f7e93dd5e3d0c19c3810bc1820e53f164c4 100644 --- a/src/ice_transport.cpp +++ b/src/ice_transport.cpp @@ -94,7 +94,6 @@ IceTransport::cb_on_ice_complete(pj_ice_strans* ice_st, RING_WARN("null IceTransport"); } - IceTransport::IceTransport(const char* name, int component_count, bool master, const IceTransportOptions& options) : pool_(nullptr, pj_pool_release) @@ -105,6 +104,9 @@ IceTransport::IceTransport(const char* name, int component_count, bool master, , initiatorSession_(master) , thread_() { + RING_DBG("[ICE:%p] new ('%s', %d component(s), %s)", this, name, component_count, + master?"master":"slave"); + if (options.upnpEnable) upnp_.reset(new upnp::Controller()); @@ -132,7 +134,7 @@ IceTransport::IceTransport(const char* name, int component_count, bool master, pj_cstr(&config_.stun.server, options.stunServer.c_str()); config_.stun.port = PJ_STUN_PORT; } - RING_WARN("ICE: STUN='%s', PORT=%d", options.stunServer.c_str(), + RING_WARN("[ICE:%p] STUN='%s', PORT=%d", this, options.stunServer.c_str(), config_.stun.port); } else config_.stun.port = 0; @@ -161,7 +163,7 @@ IceTransport::IceTransport(const char* name, int component_count, bool master, // Only UDP yet config_.turn.conn_type = PJ_TURN_TP_UDP; - RING_WARN("ICE: TURN='%s', PORT=%d", options.turnServer.c_str(), + RING_WARN("[ICE:%p] TURN='%s', PORT=%d", this, options.turnServer.c_str(), config_.turn.port); } else config_.turn.port = 0; @@ -189,6 +191,7 @@ IceTransport::IceTransport(const char* name, int component_count, bool master, IceTransport::~IceTransport() { + RING_DBG("[ICE:%p] destruction", this); register_thread(); threadTerminateFlags_ = true; @@ -204,6 +207,12 @@ IceTransport::~IceTransport() pj_timer_heap_destroy(config_.stun_cfg.timer_heap); } +bool +IceTransport::calledFromThread() const +{ + return std::this_thread::get_id() == thread_.get_id(); +} + void IceTransport::handleEvents(unsigned max_msec) { @@ -771,7 +780,7 @@ IceTransport::getCandidateFromSDP(const std::string& line, IceCandidate& cand) } ssize_t -IceTransport::recv(int comp_id, unsigned char* buf, size_t len) +IceTransport::recv(int comp_id, uint8_t* buf, size_t len) { register_thread(); auto& io = compIO_[comp_id]; @@ -804,7 +813,7 @@ IceTransport::setOnRecv(unsigned comp_id, IceRecvCb cb) } ssize_t -IceTransport::send(int comp_id, const unsigned char* buf, size_t len) +IceTransport::send(int comp_id, const uint8_t* buf, size_t len) { register_thread(); auto remote = getRemoteAddress(comp_id); @@ -931,7 +940,7 @@ IceSocket::close() } ssize_t -IceSocket::recv(unsigned char* buf, size_t len) +IceSocket::recv(uint8_t* buf, size_t len) { if (!ice_transport_.get()) return -1; @@ -939,7 +948,7 @@ IceSocket::recv(unsigned char* buf, size_t len) } ssize_t -IceSocket::send(const unsigned char* buf, size_t len) +IceSocket::send(const uint8_t* buf, size_t len) { if (!ice_transport_.get()) return -1; @@ -971,4 +980,12 @@ IceSocket::setOnRecv(IceRecvCb cb) return ice_transport_->setOnRecv(compId_, cb); } +IpAddr +IceSocket::getRemoteAddress() const +{ + if (!ice_transport_.get()) + return {}; + return ice_transport_->getRemoteAddress(compId_); +} + } // namespace ring diff --git a/src/ice_transport.h b/src/ice_transport.h index a6cff0eaebd23d26af32311ec194409d6c516189..8916166a90dc4c8f1529d905f82e876ad5ac0d85 100644 --- a/src/ice_transport.h +++ b/src/ice_transport.h @@ -18,8 +18,7 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#ifndef ICE_TRANSPORT_H -#define ICE_TRANSPORT_H +#pragma once #include "ice_socket.h" #include "ip_utils.h" @@ -61,7 +60,7 @@ struct IceTransportOptions { std::string turnServerRealm {}; //!< non-empty for long-term credential }; -class IceTransport { +class IceTransport : public std::enable_shared_from_this<IceTransport> { public: using Attribute = struct { std::string ufrag; @@ -181,9 +180,10 @@ class IceTransport { void setOnRecv(unsigned comp_id, IceRecvCb cb); - ssize_t recv(int comp_id, unsigned char* buf, size_t len); + ssize_t recv(int comp_id, uint8_t* buf, size_t len); + std::vector<uint8_t> recv(int comp_id); - ssize_t send(int comp_id, const unsigned char* buf, size_t len); + ssize_t send(int comp_id, const uint8_t* buf, size_t len); ssize_t getNextPacketSize(int comp_id); @@ -195,6 +195,14 @@ class IceTransport { unsigned getComponentCount() const {return component_count_;} + std::shared_ptr<IceTransport> getSharedPtr() { return shared_from_this(); } + + /** + * Return True if called in the thread that handles IOs + * Usefull to prevent destroying of the transport by itself + */ + bool calledFromThread() const; + private: static constexpr int MAX_CANDIDATES {32}; @@ -313,6 +321,4 @@ class IceTransportFactory { pj_ice_strans_cfg ice_cfg_; }; -}; - -#endif /* ICE_TRANSPORT_H */ +} // namespace ring diff --git a/src/im/message_engine.cpp b/src/im/message_engine.cpp index b33bc62bfa7db956a6064c6d8d5bf81139f376ab..76a8f7d03c9600e3af4fb2a19d8ac0b3dba9b420 100644 --- a/src/im/message_engine.cpp +++ b/src/im/message_engine.cpp @@ -65,13 +65,13 @@ MessageEngine::reschedule() std::lock_guard<std::mutex> lock(messagesMutex_); if (messages_.empty()) return; - std::weak_ptr<Account> w = std::static_pointer_cast<Account>(account_.shared_from_this()); + auto weak_acc = account_.weak_from_this(); auto next = nextEvent(); if (next != clock::time_point::max()) - Manager::instance().scheduleTask([w,this](){ - if (auto s = w.lock()) - retrySend(); - }, next); + Manager::instance().scheduleTask([weak_acc, this](){ + if (auto s = weak_acc.lock()) + retrySend(); + }, next); } MessageEngine::clock::time_point diff --git a/src/manager.cpp b/src/manager.cpp index bb863b7171a2f01b9c79b10de92eb3074ed82f5c..f615f0c09238c9a18ef449ecd9c0304e3f84fa2c 100644 --- a/src/manager.cpp +++ b/src/manager.cpp @@ -77,6 +77,8 @@ using random_device = dht::crypto::random_device; #include "libav_utils.h" #include "video/sinkclient.h" +#include "data_transfer.h" + #include <cerrno> #include <algorithm> #include <ctime> @@ -355,9 +357,13 @@ Manager::finish() noexcept } ice_tf_.reset(); + RING_DBG("/!\\ Shutdown PJSIP /!\\"); pj_shutdown(); } catch (const VoipLinkException &err) { - RING_ERR("%s", err.what()); + RING_ERR("VoipLinkException: %s", err.what()); + } catch (const std::exception& e) { + RING_ERR("Fatal exception: %s", e.what()); + std::terminate(); } } @@ -1383,24 +1389,23 @@ Manager::addTask(const std::function<bool()>&& task) } std::shared_ptr<Manager::Runnable> -Manager::scheduleTask(const std::function<void()>&& task, std::chrono::steady_clock::time_point when) +Manager::scheduleTask(const std::function<void()>&& callable, std::chrono::steady_clock::time_point when) { - auto runnable = std::make_shared<Runnable>(std::move(task)); + auto runnable = std::make_shared<Runnable>(std::move(callable)); scheduleTask(runnable, when); return runnable; } - void Manager::scheduleTask(std::shared_ptr<Runnable> task, std::chrono::steady_clock::time_point when) { std::lock_guard<std::mutex> lock(scheduledTasksMutex_); scheduledTasks_.emplace(when, task); - RING_DBG("Task scheduled. Next in %" PRId64, std::chrono::duration_cast<std::chrono::seconds>(scheduledTasks_.begin()->first - std::chrono::steady_clock::now()).count()); } // Must be invoked periodically by a timer from the main event loop -void Manager::pollEvents() +void +Manager::pollEvents() { //-- Handlers { @@ -2882,4 +2887,15 @@ Manager::getSinkClient(const std::string& id) } #endif // RING_VIDEO +DRing::DataTransferId +Manager::sendFile(const std::string& accountId, + const std::string& peerUri, + const std::string& pathname, + const std::string& name) +{ + if (auto acc = getAccount(accountId)) + return acc->sendFile(peerUri, pathname, name); + return {}; +} + } // namespace ring diff --git a/src/manager.h b/src/manager.h index 886718480f7ca67770c69e89aa55279c638bf468..46328e4e5f870a5fcc16819d4981d86173d88a99 100644 --- a/src/manager.h +++ b/src/manager.h @@ -48,6 +48,8 @@ #include "audio/audiolayer.h" #include "audio/tonecontrol.h" +#include "datatransfer_interface.h" + #include "preferences.h" #include "noncopyable.h" @@ -769,6 +771,15 @@ class Manager { */ std::vector<std::string> loadAccountOrder() const; + /** + * Send a file to a peer using given account + * @return a transfer id + */ + DRing::DataTransferId sendFile(const std::string& accountId, + const std::string& peerUri, + const std::string& pathname, + const std::string& name); + private: std::atomic_bool autoAnswer_ {false}; @@ -949,8 +960,7 @@ class Manager { /** * Call periodically to poll for VoIP events */ - void - pollEvents(); + void pollEvents(); /** * Create a new outgoing call @@ -988,9 +998,19 @@ class Manager { std::function<void()> cb; Runnable(const std::function<void()>&& t) : cb(std::move(t)) {} }; - std::shared_ptr<Runnable> scheduleTask(const std::function<void()>&& task, std::chrono::steady_clock::time_point when); + + std::shared_ptr<Runnable> scheduleTask(const std::function<void()>&& callable, std::chrono::steady_clock::time_point when); + + template <class T> + std::shared_ptr<Manager::Runnable> + scheduleTask(const std::function<void()>&& callable, const T& delay) { + return scheduleTask(std::move(callable), std::chrono::steady_clock::now() + delay); + } + void scheduleTask(std::shared_ptr<Runnable> task, std::chrono::steady_clock::time_point when); + std::mt19937_64& getRandomEngine() noexcept { return rand_; } + #ifdef RING_VIDEO std::shared_ptr<video::SinkClient> createSinkClient(const std::string& id="", bool mixer=false); std::shared_ptr<video::SinkClient> getSinkClient(const std::string& id); @@ -1004,6 +1024,7 @@ class Manager { decltype(eventHandlerMap_)::iterator nextEventHandler_; std::list<std::function<bool()>> pendingTaskList_; + std::multimap<std::chrono::steady_clock::time_point, std::shared_ptr<Runnable>> scheduledTasks_; std::mutex scheduledTasksMutex_; diff --git a/src/media/socket_pair.cpp b/src/media/socket_pair.cpp index cd731869f2d09adbca1507a1c2a346a564f6d5b2..11e6fb6eb340dc97e88fe8771c2ad7c162ddb398 100644 --- a/src/media/socket_pair.cpp +++ b/src/media/socket_pair.cpp @@ -22,7 +22,6 @@ #include "libav_deps.h" // MUST BE INCLUDED FIRST #include "socket_pair.h" -#include "ice_socket.h" #include "libav_utils.h" #include "logger.h" @@ -55,6 +54,8 @@ extern "C" { #include <fcntl.h> #endif +#include "ice_socket.h" + namespace ring { static constexpr int NET_POLL_TIMEOUT = 100; /* poll() timeout in ms */ diff --git a/src/ringdht/ringaccount.cpp b/src/ringdht/ringaccount.cpp index 95baebbe63e1830d6131acb6310109c3f12ab9eb..a7adc9d36aa799c5610ce9be418b86c470e8ee32 100644 --- a/src/ringdht/ringaccount.cpp +++ b/src/ringdht/ringaccount.cpp @@ -45,6 +45,7 @@ #include "logger.h" #include "manager.h" #include "utf8_utils.h" +#include "noncopyable.h" #ifdef RING_VIDEO #include "libav_utils.h" @@ -56,9 +57,15 @@ #include "config/yamlparser.h" #include "security/certstore.h" +#include "security/tls_session.h" + +#include "threadloop.h" +#include "data_transfer.h" #include <yaml-cpp/yaml.h> +// libstdc(++) +#include <utility> #include <algorithm> #include <array> #include <memory> @@ -66,11 +73,15 @@ #include <cctype> #include <cstdarg> #include <string> +#include <fstream> +#include <future> + namespace ring { using sip_utils::CONST_PJ_STR; using std::chrono::system_clock; +using std::chrono::high_resolution_clock; static constexpr int ICE_COMPONENTS {1}; static constexpr int ICE_COMP_SIP_TRANSPORT {0}; @@ -78,6 +89,9 @@ static constexpr int ICE_INIT_TIMEOUT {10}; static constexpr auto ICE_NEGOTIATION_TIMEOUT = std::chrono::seconds(60); static constexpr auto TLS_TIMEOUT = std::chrono::seconds(30); +// Limit number of ICE data msg request/msg that waiting for handling +static constexpr int PENDING_DATA_MSG_LENGHT {15}; + static constexpr const char * const RING_URI_PREFIX = "ring:"; constexpr const char * const RingAccount::ACCOUNT_TYPE; @@ -85,6 +99,8 @@ constexpr const char * const RingAccount::ACCOUNT_TYPE; static std::uniform_int_distribution<dht::Value::Id> udist; +dht::ValueType IceDataCandidates::TYPE; + static const std::string parseRingUri(const std::string& toUrl) { @@ -139,6 +155,7 @@ RingAccount::createIceTransport(const Args&... args) RingAccount::RingAccount(const std::string& accountID, bool /* presenceEnabled */) : SIPAccountBase(accountID), via_addr_() { + IceDataCandidates::TYPE = dht::IceCandidates::TYPE; // Prevent gvar init order fiasco cachePath_ = fileutils::get_cache_dir()+DIR_SEPARATOR_STR+getAccountID(); dataPath_ = cachePath_ + DIR_SEPARATOR_STR "values"; idPath_ = fileutils::get_data_dir()+DIR_SEPARATOR_STR+getAccountID(); @@ -146,8 +163,10 @@ RingAccount::RingAccount(const std::string& accountID, bool /* presenceEnabled * RingAccount::~RingAccount() { + RING_DBG("delete ring account %s", getAccountID().c_str()); Manager::instance().unregisterEventHandler((uintptr_t)this); dht_.join(); + RING_DBG("bye ring account %s", getAccountID().c_str()); } std::shared_ptr<SIPCall> @@ -462,20 +481,19 @@ RingAccount::checkIdentityPath() dht::crypto::Identity RingAccount::loadIdentity() { - dht::crypto::Certificate dht_cert; - dht::crypto::PrivateKey dht_key; + if (dht_id_.second) + return dht_id_; try { #if TARGET_OS_IPHONE const auto path = fileutils::get_data_dir() + DIR_SEPARATOR_STR + getAccountID() + DIR_SEPARATOR_STR; - dht_cert = dht::crypto::Certificate(fileutils::loadFile(path + tlsCertificateFile_)); - dht_key = dht::crypto::PrivateKey(fileutils::loadFile(path + tlsPrivateKeyFile_), tlsPassword_); + dht_id_.second = std::make_shared<dht::crypto::Certificate>(fileutils::loadFile(path + tlsCertificateFile_)); + dht_id_.first = std::make_shared<dht::crypto::PrivateKey>(fileutils::loadFile(path + tlsPrivateKeyFile_), tlsPassword_); #else - dht_cert = dht::crypto::Certificate(fileutils::loadFile(tlsCertificateFile_)); - dht_key = dht::crypto::PrivateKey(fileutils::loadFile(tlsPrivateKeyFile_), tlsPassword_); + dht_id_.second = std::make_shared<dht::crypto::Certificate>(fileutils::loadFile(tlsCertificateFile_)); + dht_id_.first = std::make_shared<dht::crypto::PrivateKey>(fileutils::loadFile(tlsPrivateKeyFile_), tlsPassword_); #endif - } - catch (const std::exception& e) { + } catch (const std::exception& e) { RING_ERR("Error loading identity: %s", e.what()); auto ca = dht::crypto::generateIdentity("Ring CA"); if (!ca.first || !ca.second) { @@ -502,14 +520,13 @@ RingAccount::loadIdentity() tlsPassword_ = {}; username_ = RING_URI_PREFIX+id.second->getId().toString(); + + dht_id_= std::move(id); return id; } - username_ = RING_URI_PREFIX+dht_cert.getId().toString(); - return { - std::make_shared<dht::crypto::PrivateKey>(std::move(dht_key)), - std::make_shared<dht::crypto::Certificate>(std::move(dht_cert)) - }; + username_ = RING_URI_PREFIX + dht_id_.second->getId().toString(); + return dht_id_; } void @@ -829,21 +846,24 @@ RingAccount::doRegister_() dht_.run((in_port_t)dhtPortUsed_, identity, false); + RING_DBG("[DHT:%s] RingID: %s", getAccountID().c_str(), dht_.getId().toString().c_str()); + dht_.setLocalCertificateStore([](const dht::InfoHash& pk_id) { - auto& store = tls::CertificateStore::instance(); - auto cert = store.getCertificate(pk_id.toString()); - std::vector<std::shared_ptr<dht::crypto::Certificate>> ret; - if (cert) - ret.emplace_back(std::move(cert)); - RING_DBG("Query for local certificate store: %s: %zu found.", pk_id.toString().c_str(), ret.size()); - return ret; + RING_DBG("Query for local certificate store: %s", pk_id.toString().c_str()); + auto& store = tls::CertificateStore::instance(); + auto cert = store.getCertificate(pk_id.toString()); + std::vector<std::shared_ptr<dht::crypto::Certificate>> ret; + if (cert) + ret.emplace_back(std::move(cert)); + return ret; }); #if 0 // enable if dht_ logging is needed + static const auto log_head = std::string("DHT| "); dht_.setLoggers( - [](char const* m, va_list args){ vlogger(LOG_ERR, m, args); }, - [](char const* m, va_list args){ vlogger(LOG_WARNING, m, args); }, - [](char const* m, va_list args){ vlogger(LOG_DEBUG, m, args); } + [](char const* m, va_list args){ auto f{log_head+m+ENDL}; vlogger(LOG_ERR, f.c_str(), args); }, + [](char const* m, va_list args){ auto f{log_head+m+ENDL}; vlogger(LOG_WARNING, f.c_str(), args); }, + [](char const* m, va_list args){ auto f{log_head+m+ENDL}; vlogger(LOG_DEBUG, f.c_str(), args); } ); #endif @@ -882,7 +902,8 @@ RingAccount::doRegister_() // Listen for incoming calls auto shared = std::static_pointer_cast<RingAccount>(shared_from_this()); callKey_ = dht::InfoHash::get("callto:"+dht_.getId().toString()); - RING_DBG("Listening on callto:%s : %s", dht_.getId().toString().c_str(), callKey_.toString().c_str()); + RING_DBG("[DHT:%s] callto key: %s", getAccountID().c_str(), callKey_.toString().c_str()); + dht_.listen<dht::IceCandidates>( callKey_, [shared] (dht::IceCandidates&& msg) { @@ -937,6 +958,8 @@ RingAccount::doRegister_() ); auto inboxKey = dht::InfoHash::get("inbox:"+dht_.getId().toString()); + RING_DBG("[DHT:%s] inbox key: %s", getAccountID().c_str(), inboxKey.toString().c_str()); + dht_.listen<dht::TrustRequest>( inboxKey, [shared](dht::TrustRequest&& v) { @@ -990,8 +1013,18 @@ RingAccount::doRegister_() return true; } ); - } - catch (const std::exception& e) { + + // Listen to data channel + dht_.listen<IceDataCandidates>( + inboxKey, + [shared] (IceDataCandidates&& msg) { + if (msg.id & 1) + shared->onDataTransactionReply(msg); + else + shared->onDataTransactionRequest(std::move(msg)); + return true; + }); + } catch (const std::exception& e) { RING_ERR("Error registering DHT account: %s", e.what()); setRegistrationState(RegistrationState::ERROR_GENERIC); } @@ -1359,8 +1392,7 @@ RingAccount::getContactHeader(pjsip_transport* t) */ void RingAccount::enablePresence(const bool& /* enabled */) -{ -} +{} /** * Set the presence (PUBLISH/SUBSCRIBE) support flags @@ -1368,8 +1400,7 @@ RingAccount::enablePresence(const bool& /* enabled */) */ void RingAccount::supportPresence(int /* function */, bool /* enabled*/) -{ -} +{} /* trust requests */ std::map<std::string, std::string> @@ -1512,4 +1543,448 @@ RingAccount::sendTextMessage(const std::string& to, const std::map<std::string, messageEngine_.onMessageSent(token, false); } +//================================================================================================== + +class SecureIceTransport +{ +public: + SecureIceTransport(RingAccount& account, const std::string& pid, dht::Value::Id tid, + std::shared_ptr<ReliableSocket::DataConnection> dc, + IceDataCandidates&& remote_cands); + ~SecureIceTransport(); + + inline explicit operator bool() const { + return peer_id.empty(); + } + + void createTlsSession(dht::crypto::Identity& id, + std::shared_future<tls::DhParams> dh_params); + +private: + NON_COPYABLE(SecureIceTransport); + + RingAccount& account; + const std::string peer_id; // DHT peer id + const dht::Value::Id tid; // DHT transaction id + IceDataCandidates remote_candidates; + std::shared_ptr<IceTransport> transport {}; + std::chrono::seconds creationTimepoint; + + std::shared_ptr<ReliableSocket::DataConnection> dataConnection_ {}; + + // DTLS session + std::unique_ptr<tls::TlsSession> tls_; + void onCertificatesUpdate(const gnutls_datum_t*, const gnutls_datum_t*, unsigned int); + int onVerifyCertificate(gnutls_session_t); + + friend class RingAccount; +}; + +/** + * obtainDataConnection searches for already created reliable data connection + * with given peer id, or if doesn't exist, create it using all other parameters. + * On RingAccount, a DataConnection is owned by a secure ICE tranport. + * So, the search consists to find the peer's related transport. + */ +std::shared_ptr<ReliableSocket::DataConnection> +RingAccount::obtainDataConnection(const std::string& peer_id, bool create) +{ + std::lock_guard<std::mutex> lock(sitMutex_); + + // Start previous existance by looking up into established transports + auto item = establishedSITMap_.find(peer_id); + if (item != establishedSITMap_.cend()) + return item->second->dataConnection_; + + if (create) { + // Emplace a new SIT ptr holder if nothing valid existing yet + auto result = pendingSITMap_.emplace(peer_id, nullptr); + auto& sit_ptr = result.first->second; + if (result.second or !sit_ptr) { + if (auto sit = newSecureIceTransport(peer_id)) { + sit_ptr = std::move(sit); + return sit_ptr->dataConnection_; + } + return nullptr; // creation failure + } + return sit_ptr->dataConnection_; + } + + item = pendingSITMap_.find(peer_id); + if (item != pendingSITMap_.cend()) + return item->second->dataConnection_; + return nullptr; // nothing found +} + +std::unique_ptr<SecureIceTransport> +RingAccount::newSecureIceTransport(const std::string& peer_id, bool is_initiator, + IceDataCandidates&& remote_ice) +{ + dht::Value::Id tid; + + if (is_initiator) + tid = udist(rand_) << 1; // request: random id with LSB at 0 + else + tid = remote_ice.id | 1; // reply: LSB = 1 + + auto trx = std::unique_ptr<SecureIceTransport>(new SecureIceTransport {*this, peer_id, tid, + ReliableSocket::makeDataConnection(getAccountID(), peer_id, is_initiator), + std::move(remote_ice)}); + + auto acc = std::static_pointer_cast<RingAccount>(shared_from_this()); + runOnMainThread([acc, peer_id]{ acc->initSecureIceTransport(peer_id); }); + + return std::move(trx); +} + +DRing::DataTransferId +RingAccount::sendFile(const std::string& peer_uri, const std::string& pathname, + UNUSED const std::string& name) +{ + // check filename existance first + std::ifstream stream {pathname, std::ios::in | std::ios::binary}; + if (not stream.good()) { + RING_ERR("[DHT:%s] Cannot open file '%s'", getAccountID().c_str(), pathname.c_str()); + return {}; + } + + /** + * throw: + * std::invalid_argument: if uri is not a recognisable RingID + */ + const auto peer_id = parseRingUri(peer_uri); + + auto dc = obtainDataConnection(peer_id); + + //auto ftx = FileTransfer::newFileTransfer(cid, name, std::move(stream)); + return 0; +} + +void +RingAccount::onDataTransactionRequest(IceDataCandidates&& msg) +{ + /** Request handling algorithm: + * + * A data transaction as to be unique between 2 peers, but each one + * can send a request in same time and even be received in same time. + * Two ICE transports on same peer-pair will be negotiated in this context. + * It's a waste of ressources but hard to solve as breaking the symmetrie is + * not easy: on which element both side can decide with transport has to be abandoned? + * + * In case of existing transaction (in progress or established), the newer wins. + */ + std::lock_guard<std::mutex> lock(sitMutex_); + const auto& peer_id = msg.from.toString(); + + // Check pending list first + auto item = pendingSITMap_.find(peer_id); + if (item != pendingSITMap_.cend()) { + if (item->second->creationTimepoint > msg.timestamp) + return; + else if (msg.timestamp == item->second->creationTimepoint) { + // Symmetric Requests! Biggest NodeID wins. + if (msg.from < dht_.getId()) + return; + } + item->second = newSecureIceTransport(peer_id, false, std::move(msg)); + return; + } else { + auto item = establishedSITMap_.find(peer_id); + if (item != establishedSITMap_.cend()) { + if (item->second->creationTimepoint > msg.timestamp) + return; + } + } + + auto result = pendingSITMap_.emplace(peer_id, newSecureIceTransport(peer_id, false, std::move(msg))); + if (not result.second) + return; +} + +void +RingAccount::onDataTransactionReply(const IceDataCandidates& msg) +{ + std::lock_guard<std::mutex> lock(sitMutex_); + const auto peer_id = msg.from.toString(); + + auto item = pendingSITMap_.find(peer_id); + if (item == pendingSITMap_.cend()) + return; + + // Drop replies not related to the pending request + auto& trx = *item->second; + if ((trx.tid | 1) != msg.id) + return; + + RING_DBG("[DHT:%s] rx data reply/0x%lx by %s:\n%s", getAccountID().c_str(), msg.id, + peer_id.c_str(), std::string(std::begin(msg.ice_data), + std::end(msg.ice_data)).c_str()); + + if (not trx.transport->start(msg.ice_data)) + pendingSITMap_.erase(item); +} + +bool +RingAccount::initSecureIceTransport(const std::string& peer_id) +{ + std::lock_guard<std::mutex> lock(sitMutex_); + auto item = pendingSITMap_.find(peer_id); + if (item == pendingSITMap_.cend()) + return false; + + auto& trx = *item->second; + if (trx.transport) + return true; + + try { + auto weak_acc = weak_from_this(); + auto ice_opts = getIceOptions(); + + // this lambda may run in ICE thread or in the current thread + ice_opts.onInitDone = [weak_acc, peer_id](IceTransport&, bool done) mutable { + if (auto shared_acc = std::static_pointer_cast<RingAccount>(weak_acc.lock())) { + if (done) + runOnMainThread([shared_acc, peer_id]() { shared_acc->onDataIceInitSuccess(peer_id); }); + else { + shared_acc->cancelSecureIceTransport(peer_id); + RING_ERR("ice init failed"); + } + } + }; + + // same remark as onInitDone + ice_opts.onNegoDone = [weak_acc, peer_id](IceTransport&, bool done) { + if (auto shared_acc = std::static_pointer_cast<RingAccount>(weak_acc.lock())) { + if (done) + runOnMainThread([shared_acc, peer_id] { shared_acc->onDataIceNegoSuccess(peer_id); }); + else { + shared_acc->cancelSecureIceTransport(peer_id); + RING_ERR("ice nego failed"); + } + } + }; + + if (auto ice = createIceTransport("data", 1, (trx.tid & 1) == 0, ice_opts)) + trx.transport = std::move(ice); + else + throw std::runtime_error("ICE transport creation failed"); + } catch (const std::exception& e) { + RING_ERR("[DHT:%s] exception in data transaction 0x%lx creation: %s", + getAccountID().c_str(), trx.tid, e.what()); + cancelSecureIceTransport(peer_id); + return false; + } + + return true; +} + +void +RingAccount::cancelSecureIceTransport(const std::string& peer_id) +{ + std::lock_guard<std::mutex> lock(sitMutex_); + pendingSITMap_.erase(peer_id); +} + +void +RingAccount::onDataIceInitSuccess(const std::string& peer_id) +{ + dht::Value::Id tid; + std::vector<uint8_t> candidates; + + { + std::lock_guard<std::mutex> lock(sitMutex_); + auto item = pendingSITMap_.find(peer_id); + if (item == pendingSITMap_.cend()) { + RING_WARN("no pending data transaction with peer %s", peer_id.c_str()); + return; + } + auto& trx = *item->second; + if (not trx.transport) { + RING_ERR("data transaction without transport!"); + return; + } + auto& ice = *trx.transport; + + tid = trx.tid; + candidates = ice.getLocalAttributesAndCandidates(); + + // We can now wait for a remote ICE data or try to start ICE negotiation. + // In both cases we are trying to solve the global connecting state. + //emitDataCnxStatus(getAccountID(), trx.peer_id, DRing::DataTransferCode::CODE_TRYING); + + // Start negotiation if we're slave + if (tid & 1) { + if (!ice.start(trx.remote_candidates.ice_data)) { + cancelSecureIceTransport(peer_id); + return; + } + } + } + + // Send our ICE info to peer through DHT at its inbox key + const auto destination = dht::InfoHash(peer_id); + const auto key = dht::InfoHash::get("inbox:" + peer_id); + dht::Value value {IceDataCandidates(tid, candidates)}; + const auto value_id = value.id; + auto acc = std::static_pointer_cast<RingAccount>(shared_from_this()); + + RING_DBG("[DHT:%s] Tx ICE data/0x%lx at %s (key:%s)", getAccountID().c_str(), tid, + destination.toString().c_str(), key.toString().c_str()); + + dht_.putEncrypted( + key, destination, std::move(value), + // on done callback + [acc, key, value_id, tid, peer_id](bool done) { + if (!done) { + RING_ERR("[DHT:%s] put failed for ICE data/0x%lx", + acc->getAccountID().c_str(), tid); + acc->cancelSecureIceTransport(peer_id); + } + }); +} + +void +RingAccount::onDataIceNegoSuccess(const std::string& peer_id) +{ + // Move pending transaction to running transaction list + { + std::lock_guard<std::mutex> lock(sitMutex_); + + auto new_item = pendingSITMap_.find(peer_id); + if (new_item == pendingSITMap_.cend()) + return; + + RING_DBG("[SIT:%s] ICE connected to %s", getAccountID().c_str(), peer_id.c_str()); + + try { + // Time to negotiate a TLS session + new_item->second->createTlsSession(dht_id_, dhParams_); + } catch (...) { + RING_ERR("[SIT:%s] TLS session failed on peer %s", getAccountID().c_str(), + peer_id.c_str()); + cancelSecureIceTransport(peer_id); + return; + } + + // Replace existing transaction by this newest + auto old_item = establishedSITMap_.find(peer_id); + if (old_item != establishedSITMap_.cend()) + old_item->second = std::move(new_item->second); + else + establishedSITMap_.emplace(std::move(*new_item)); + pendingSITMap_.erase(new_item); + } +} + +//================================================================================================== + +SecureIceTransport::SecureIceTransport(RingAccount& account, const std::string& pid, + dht::Value::Id tid, + std::shared_ptr<ReliableSocket::DataConnection> dc, + IceDataCandidates&& remote_cands) + : account(account) + , peer_id(pid) + , tid(tid) + , remote_candidates(remote_cands) + , dataConnection_(dc) +{ + creationTimepoint = std::chrono::duration_cast<std::chrono::seconds>(system_clock::now().time_since_epoch()); + RING_DBG("[SIT:%lx] created (tid=%lx) for peer %s", dc->getId(), tid, peer_id.c_str()); +} + +SecureIceTransport::~SecureIceTransport() +{ + RING_DBG("[SIT:%lx] destruction", dataConnection_->getId()); + dataConnection_->disconnect(); + + if (transport) { + if (transport->calledFromThread()) { + auto tr = transport; + runOnMainThread([tr]() mutable { tr.reset(); }); + } else + transport.reset(); + } +} + +void +SecureIceTransport::createTlsSession(dht::crypto::Identity& id, + std::shared_future<tls::DhParams> dh_params) +{ + if (tls_) + throw std::logic_error("re-creating a tls session is forbidden"); + + // Try to establish a TLS session over the ICE transport + + dht::InfoHash remote_h {peer_id}; + tls::TlsParams tls_params { + .ca_list = "", // TODO + .cert = id.second, + .cert_key = id.first, + .dh_params = dh_params, + .timeout = std::chrono::duration_cast<decltype(tls::TlsParams::timeout)>(TLS_TIMEOUT), + .cert_check = {} + }; + tls::TlsSession::TlsSessionCallbacks tls_cbs = { + .onStateChange = [this](tls::TlsSessionState state) { + if (state == tls::TlsSessionState::ESTABLISHED) + dataConnection_->connect(tls_.get()); + else if (state == tls::TlsSessionState::SHUTDOWN) + dataConnection_->disconnect(); + }, + .onRxData = [this](std::vector<uint8_t>&& buf) { + dataConnection_->processIncomingPacket(std::move(buf)); + }, + .onCertificatesUpdate = [this](const gnutls_datum_t* l, const gnutls_datum_t* r, + unsigned int n){ onCertificatesUpdate(l, r, n); }, + .verifyCertificate = [this](gnutls_session_t session) { return onVerifyCertificate(session); } + }; + tls_.reset(new tls::TlsSession(transport, 0, tls_params, tls_cbs)); +} + +/* Update local & remote certificates info. This function should be + * called after handshake or re-negotiation successfully completed. + * + * - DO NOT BLOCK - (Called in TlsSession thread) + */ +void +SecureIceTransport::onCertificatesUpdate(UNUSED const gnutls_datum_t* local_raw, + UNUSED const gnutls_datum_t* remote_raw, + UNUSED unsigned int remote_count) +{ + // TODO +} + +int +SecureIceTransport::onVerifyCertificate(gnutls_session_t session) +{ + // Support only x509 format + if (gnutls_certificate_type_get(session) != GNUTLS_CRT_X509) { + return GNUTLS_E_CERTIFICATE_ERROR; + } + + // Store verification status + unsigned int status = 0; + auto ret = gnutls_certificate_verify_peers2(session, &status); + if (ret < 0 or (status & GNUTLS_CERT_SIGNATURE_FAILURE) != 0) { + return GNUTLS_E_CERTIFICATE_ERROR; + } + + unsigned int cert_list_size = 0; + auto cert_list = gnutls_certificate_get_peers(session, &cert_list_size); + if (cert_list == nullptr) { + return GNUTLS_E_CERTIFICATE_ERROR; + } + + dht::InfoHash remote_h {peer_id}; + try { + check_peer_certificate(remote_h, status, cert_list, cert_list_size); + } catch (const std::exception& e) { + RING_ERR("[data:%s] TLS certificate check exception: %s", peer_id.c_str(), e.what()); + return GNUTLS_E_CERTIFICATE_ERROR; + } + + // notify GnuTLS to continue handshake normally + return GNUTLS_E_SUCCESS; +} + } // namespace ring diff --git a/src/ringdht/ringaccount.h b/src/ringdht/ringaccount.h index dc3e3b6a46c0eb567ab802d84e49f4f6f2334526..15680e9ff9e64b2833a6e93398c43b16c861b23b 100644 --- a/src/ringdht/ringaccount.h +++ b/src/ringdht/ringaccount.h @@ -30,6 +30,7 @@ #include "noncopyable.h" #include "ip_utils.h" #include "ring_types.h" // enable_if_base_of +#include "ice_socket.h" #include <opendht/dhtrunner.h> #include <opendht/default_types.h> @@ -41,6 +42,8 @@ #include <chrono> #include <list> #include <future> +#include <deque> +#include <fstream> /** * @file ringaccount.h @@ -54,6 +57,44 @@ class Emitter; namespace ring { +class FileSender; + +class IceDataCandidates : public dht::EncryptedValue<IceDataCandidates> +{ +public: + static dht::ValueType TYPE; + + IceDataCandidates() {} + IceDataCandidates(dht::Value::Id msg_id, dht::Blob ice) : id(msg_id), ice_data(ice) { + auto now = std::chrono::system_clock::now().time_since_epoch(); + timestamp = std::chrono::duration_cast<std::chrono::seconds>(now); + } + + // Fully re-implement BaseClass as we need to pack 3 values, not 2 + template <typename Packer> + void msgpack_pack(Packer& pk) const { + pk.pack_array(3); + pk.pack(id); + + pk.pack_bin(ice_data.size()); + pk.pack_bin_body((const char*)ice_data.data(), ice_data.size()); + + pk.pack(static_cast<uint64_t>(timestamp.count())); + } + + void msgpack_unpack(msgpack::object o) { + if (o.type != msgpack::type::ARRAY) throw msgpack::type_error(); + if (o.via.array.size < 3) throw msgpack::type_error(); + id = o.via.array.ptr[0].as<dht::Value::Id>(); + ice_data = dht::unpackBlob(o.via.array.ptr[1]); + timestamp = std::chrono::seconds(o.via.array.ptr[2].as<uint64_t>()); + } + + dht::Value::Id id; + dht::Blob ice_data; + std::chrono::seconds timestamp; +}; + namespace Conf { const char *const DHT_PORT_KEY = "dhtPort"; const char *const DHT_VALUES_PATH_KEY = "dhtValuesPath"; @@ -65,7 +106,12 @@ const char *const DHT_ALLOW_PEERS_FROM_CONTACT = "allowPeersFromContact"; const char *const DHT_ALLOW_PEERS_FROM_TRUSTED = "allowPeersFromTrusted"; } +namespace ReliableSocket { +class DataConnection; +} + class IceTransport; +class SecureIceTransport; class RingAccount : public SIPAccountBase { public: @@ -256,6 +302,12 @@ class RingAccount : public SIPAccountBase { void connectivityChanged() override; + DRing::DataTransferId sendFile(const std::string& peer_uri, + const std::string& pathname, + const std::string& name) override; + + void cancelDataTransaction(const std::string& peer_id); + private: NON_COPYABLE(RingAccount); @@ -288,6 +340,7 @@ class RingAccount : public SIPAccountBase { bool mapPortUPnP(); void igdChanged(); + dht::crypto::Identity dht_id_; // set by calling loadIdentity dht::DhtRunner dht_ {}; @@ -415,6 +468,27 @@ class RingAccount : public SIPAccountBase { template <class... Args> std::shared_ptr<IceTransport> createIceTransport(const Args&... args); + + /** + * Data connection + */ + + void onDataTransactionReply(const IceDataCandidates&); + void onDataTransactionRequest(IceDataCandidates&&); + + void onDataIceInitSuccess(const std::string&); + void onDataIceNegoSuccess(const std::string&); + + std::shared_ptr<ReliableSocket::DataConnection> obtainDataConnection(const std::string&, bool creation=true); + std::unique_ptr<SecureIceTransport> newSecureIceTransport(const std::string&, bool is_initiator=true, + IceDataCandidates&& remote_ice={}); + + bool initSecureIceTransport(const std::string&); + void cancelSecureIceTransport(const std::string&); + + std::mutex sitMutex_ {}; // protect following maps + std::map<std::string, std::unique_ptr<SecureIceTransport>> pendingSITMap_; // not peer connected SIT + std::map<std::string, std::unique_ptr<SecureIceTransport>> establishedSITMap_; // connected SIT }; } // namespace ring diff --git a/src/security/tls_session.cpp b/src/security/tls_session.cpp index 75976f0bf9dae24d3e3b3714c4261ab527a17638..44c3700560b583ac7319da47e633d98d9ba24871 100644 --- a/src/security/tls_session.cpp +++ b/src/security/tls_session.cpp @@ -784,6 +784,12 @@ TlsSession::process() callbacks_.onStateChange(new_state); } +IpAddr +TlsSession::getRemoteAddress() const +{ + return socket_->getRemoteAddress(); +} + DhParams DhParams::generate() { diff --git a/src/security/tls_session.h b/src/security/tls_session.h index b895690f3a0cf1c166723c05095aa08ae9f18409..255f948e8ddb47b7d87b2c362dbdce0ff520da74 100644 --- a/src/security/tls_session.h +++ b/src/security/tls_session.h @@ -22,6 +22,7 @@ #pragma once #include "threadloop.h" +#include "ip_utils.h" #include <gnutls/gnutls.h> #include <gnutls/dtls.h> @@ -148,6 +149,8 @@ public: // Request TLS thread to stop and quit. IO are not possible after that. void shutdown(); + IpAddr getRemoteAddress() const; + // Return maximum application payload size in bytes // Returned value must be checked and considered valid only if not 0 (session is initialized) unsigned int getMaxPayload() const { return maxPayload_; } diff --git a/src/sip/sipcall.cpp b/src/sip/sipcall.cpp index d7b10818737d9e8166f12dca4274eefb06b58e5b..d13165e873a12371c7c2a5f69568452d854dbf4e 100644 --- a/src/sip/sipcall.cpp +++ b/src/sip/sipcall.cpp @@ -64,7 +64,7 @@ getVideoSettings() #endif static constexpr int DEFAULT_ICE_INIT_TIMEOUT {10}; // seconds -static constexpr int DEFAULT_ICE_NEGO_TIMEOUT {60}; // seconds +static constexpr auto DEFAULT_ICE_NEGO_TIMEOUT = std::chrono::seconds(60); // SDP media Ids static constexpr int SDP_AUDIO_MEDIA_ID {0}; @@ -929,7 +929,7 @@ SIPCall::onMediaUpdate() if (startIce()) { auto this_ = std::static_pointer_cast<SIPCall>(shared_from_this()); auto ice = iceTransport_; - auto iceTimeout = std::chrono::steady_clock::now() + std::chrono::seconds(10); + auto iceTimeout = std::chrono::steady_clock::now() + DEFAULT_ICE_NEGO_TIMEOUT; Manager::instance().addTask([=] { if (ice != this_->iceTransport_) { RING_WARN("[call:%s] ICE transport replaced", getCallId().c_str()); @@ -937,7 +937,7 @@ SIPCall::onMediaUpdate() } /* First step: wait for an ICE transport for SIP channel */ if (this_->iceTransport_->isFailed() or std::chrono::steady_clock::now() >= iceTimeout) { - RING_DBG("[call:%s] ICE init failed (or timeout)", getCallId().c_str()); + RING_ERR("[call:%s] ICE init failed (or timeout)", getCallId().c_str()); this_->onFailure(ETIMEDOUT); return false; }