From cb72c104fd2d787a78fd13723d6e0d16c9a01547 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Blin?= <sebastien.blin@savoirfairelinux.com> Date: Fri, 3 Nov 2017 10:32:48 -0400 Subject: [PATCH] proxy: implement basic DhtProxyClient with the ability to get and put. + Add an abstract class to describe the Dht + Now, we can modify the Dht linked to a SecureDht. So we can choose between the classic Dht and the Dht using a proxy. + Implement a basic client which can Get and Put values on the Dht using the proxy. Note: Update dhtnode to have the ability to use the DhtProxyClient. Todo: Implement Listen, ping, connectivityChanged for the client. --- CMakeLists.txt | 23 +- configure.ac | 20 ++ include/opendht/callbacks.h | 12 +- include/opendht/dht.h | 27 +-- include/opendht/dht_interface.h | 235 +++++++++++++++++++++ include/opendht/dht_proxy_client.h | 296 ++++++++++++++++++++++++++ include/opendht/dhtrunner.h | 33 ++- include/opendht/securedht.h | 190 +++++++++++++++-- src/callbacks.cpp | 12 ++ src/dht.cpp | 8 - src/dht_proxy_client.cpp | 323 +++++++++++++++++++++++++++++ src/dht_proxy_server.cpp | 6 +- src/dhtrunner.cpp | 123 +++++++---- src/securedht.cpp | 30 +-- tools/dhtnode.cpp | 24 +++ tools/tools_common.h | 5 + 16 files changed, 1255 insertions(+), 112 deletions(-) create mode 100644 include/opendht/dht_interface.h create mode 100644 include/opendht/dht_proxy_client.h create mode 100644 src/dht_proxy_client.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 2f73c475..26b8cb30 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -145,6 +145,7 @@ list (APPEND opendht_HEADERS include/opendht/node.h include/opendht/value.h include/opendht/dht.h + include/opendht/dht_interface.h include/opendht/callbacks.h include/opendht/routing_table.h include/opendht/node_cache.h @@ -170,13 +171,31 @@ if (OPENDHT_PROXY_SERVER) ) list (APPEND opendht_SOURCES src/dht_proxy_server.cpp - src/base64.h - src/base64.cpp ) else () add_definitions(-DENABLE_PROXY_SERVER=false) endif () +if (OPENDHT_PROXY_CLIENT) + add_definitions(-DOPENDHT_PROXY_CLIENT=true) + list (APPEND opendht_HEADERS + include/opendht/dht_proxy_client.h + ) + list (APPEND opendht_SOURCES + src/dht_proxy_client.cpp + ) +else () + add_definitions(-DOPENDHT_PROXY_CLIENT=false) +endif () + +if (OPENDHT_PROXY_SERVER OR OPENDHT_PROXY_CLIENT) + list (APPEND opendht_SOURCES + src/base64.h + src/base64.cpp + ) +endif () + + if(OPENDHT_ARGON2) # make sure argon2 submodule is up to date and initialized message("Initializing Argon2 submodule") diff --git a/configure.ac b/configure.ac index 819f20a9..5f94d27c 100644 --- a/configure.ac +++ b/configure.ac @@ -130,6 +130,12 @@ AM_COND_IF([ENABLE_TOOLS], [ AC_ARG_ENABLE([proxy_server], AS_HELP_STRING([--enable-proxy-server], [Enable proxy server ability]), proxy_server=yes, proxy_server=no) AM_CONDITIONAL(ENABLE_PROXY_SERVER, test x$proxy_server == xyes) +AC_ARG_ENABLE([proxy_server_identity], AS_HELP_STRING([--enable-proxy-server-identity], + [Enable proxy server ability]), proxy_server_identity=yes, proxy_server_identity=no) +AM_CONDITIONAL(ENABLE_PROXY_SERVER_IDENTITY, test x$proxy_server_identity == xyes) +AC_ARG_ENABLE([proxy_client], AS_HELP_STRING([--enable-proxy-client], [Enable proxy client ability]), proxy_client=yes, proxy_client=no) +AM_CONDITIONAL(ENABLE_PROXY_CLIENT, test x$proxy_client == xyes) + AM_COND_IF([ENABLE_PROXY_SERVER], [ AC_CHECK_LIB(restbed, exit,, AC_MSG_ERROR([Missing restbed files])) PKG_CHECK_MODULES([Jsoncpp], [jsoncpp >= 1.7.4]) @@ -146,6 +152,20 @@ AM_COND_IF([ENABLE_PROXY_SERVER_IDENTITY], [ CPPFLAGS+=" -DOPENDHT_PROXY_SERVER_IDENTITY=false" ]) +AM_COND_IF([ENABLE_PROXY_CLIENT], [ + CPPFLAGS+=" -DOPENDHT_PROXY_CLIENT=true" +], [ + CPPFLAGS+=" -DOPENDHT_PROXY_CLIENT=false" +]) + +AM_CONDITIONAL(PROXY_CLIENT_OR_SERVER, test x$proxy_client == xyes | test x$proxy_server == xyes) +AM_COND_IF([PROXY_CLIENT_OR_SERVER], [ + AC_CHECK_LIB(restbed, exit,, AC_MSG_ERROR([Missing restbed files])) + PKG_CHECK_MODULES([Jsoncpp], [jsoncpp >= 1.7.4]) + CPPFLAGS="${CPPFLAGS} ${Jsoncpp_CFLAGS}" + LDFLAGS="${LDFLAGS} ${Jsoncpp_LIBS} -lrestbed" +], []) + AC_CONFIG_FILES([doc/Doxyfile doc/Makefile]) diff --git a/include/opendht/callbacks.h b/include/opendht/callbacks.h index e354ba21..b58a8b1f 100644 --- a/include/opendht/callbacks.h +++ b/include/opendht/callbacks.h @@ -44,11 +44,11 @@ enum class NodeStatus { }; struct OPENDHT_PUBLIC NodeStats { - unsigned good_nodes, - dubious_nodes, - cached_nodes, - incoming_nodes; - unsigned table_depth; + unsigned good_nodes {0}, + dubious_nodes {0}, + cached_nodes {0}, + incoming_nodes {0}; + unsigned table_depth {0}; unsigned getKnownNodes() const { return good_nodes + dubious_nodes; } std::string toString() const; #if OPENDHT_PROXY_SERVER @@ -56,6 +56,8 @@ struct OPENDHT_PUBLIC NodeStats { * Build a json object from a NodeStats */ Json::Value toJson() const; + NodeStats() {}; + explicit NodeStats(const Json::Value& v); #endif //OPENDHT_PROXY_SERVER }; diff --git a/include/opendht/dht.h b/include/opendht/dht.h index 60794af6..33aaef39 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -26,7 +26,7 @@ #include "scheduler.h" #include "routing_table.h" #include "callbacks.h" -#include "log_enable.h" +#include "dht_interface.h" #include <string> #include <array> @@ -58,15 +58,9 @@ struct LocalListener; * Must be given open UDP sockets and ::periodic must be * called regularly. */ -class OPENDHT_PUBLIC Dht { +class OPENDHT_PUBLIC Dht : public DhtInterface { public: - // [[deprecated]] - using NodeExport = dht::NodeExport; - - // [[deprecated]] - using Status = NodeStatus; - Dht(); /** @@ -103,18 +97,6 @@ public: */ bool isRunning(sa_family_t af = 0) const; - /** - * Enable or disable logging of DHT internal messages - */ - void setLoggers(LogMethod error = NOLOG, LogMethod warn = NOLOG, LogMethod debug = NOLOG); - - /** - * Only print logs related to the given InfoHash (if given), or disable filter (if zeroes). - */ - void setLogFilter(const InfoHash& f) { - DHT_LOG.setFilter(f); - } - virtual void registerType(const ValueType& type) { types[type.id] = type; } @@ -307,11 +289,6 @@ public: std::vector<SockAddr> getPublicAddress(sa_family_t family = 0); -protected: - Logger DHT_LOG; - bool logFilerEnable_ {}; - InfoHash logFiler_ {}; - private: /* When performing a search, we search for up to SEARCH_NODES closest nodes diff --git a/include/opendht/dht_interface.h b/include/opendht/dht_interface.h new file mode 100644 index 00000000..bfaa7601 --- /dev/null +++ b/include/opendht/dht_interface.h @@ -0,0 +1,235 @@ +/* + * Copyright (C) 2014-2017 Savoir-faire Linux Inc. + * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "infohash.h" +#include "log_enable.h" + +namespace dht { + +class OPENDHT_PUBLIC DhtInterface { +public: + DhtInterface() = default; + virtual ~DhtInterface() = default; + + // [[deprecated]] + using Status = NodeStatus; + // [[deprecated]] + using NodeExport = dht::NodeExport; + + /** + * Get the current status of the node for the given family. + */ + virtual NodeStatus getStatus(sa_family_t af) const = 0; + virtual NodeStatus getStatus() const = 0; + + /** + * Get the ID of the node. + */ + virtual const InfoHash& getNodeId() const = 0; + + /** + * Performs final operations before quitting. + */ + virtual void shutdown(ShutdownCallback cb) = 0; + + /** + * Returns true if the node is running (have access to an open socket). + * + * af: address family. If non-zero, will return true if the node + * is running for the provided family. + */ + virtual bool isRunning(sa_family_t af = 0) const = 0; + + virtual void registerType(const ValueType& type) = 0; + + virtual const ValueType& getType(ValueType::Id type_id) const = 0; + + /** + * Insert a node in the main routing table. + * The node is not pinged, so this should be + * used to bootstrap efficiently from previously known nodes. + */ + virtual void insertNode(const InfoHash& id, const SockAddr&) = 0; + virtual void insertNode(const InfoHash& id, const sockaddr* sa, socklen_t salen) = 0; + virtual void insertNode(const NodeExport& n) = 0; + + virtual void pingNode(const sockaddr*, socklen_t, DoneCallbackSimple&& cb={}) = 0; + + virtual time_point periodic(const uint8_t *buf, size_t buflen, const SockAddr&) = 0; + virtual time_point periodic(const uint8_t *buf, size_t buflen, const sockaddr* from, socklen_t fromlen) = 0; + + /** + * Get a value by searching on all available protocols (IPv4, IPv6), + * and call the provided get callback when values are found at key. + * The operation will start as soon as the node is connected to the network. + * @param cb a function called when new values are found on the network. + * It should return false to stop the operation. + * @param donecb a function called when the operation is complete. + cb and donecb won't be called again afterward. + * @param f a filter function used to prefilter values. + */ + virtual void get(const InfoHash& key, GetCallback cb, DoneCallback donecb={}, Value::Filter&& f={}, Where&& w = {}) = 0; + virtual void get(const InfoHash& key, GetCallback cb, DoneCallbackSimple donecb={}, Value::Filter&& f={}, Where&& w = {}) = 0; + virtual void get(const InfoHash& key, GetCallbackSimple cb, DoneCallback donecb={}, Value::Filter&& f={}, Where&& w = {}) = 0; + virtual void get(const InfoHash& key, GetCallbackSimple cb, DoneCallbackSimple donecb, Value::Filter&& f={}, Where&& w = {}) = 0; + + /** + * Similar to Dht::get, but sends a Query to filter data remotely. + * @param key the key for which to query data for. + * @param cb a function called when new values are found on the network. + * It should return false to stop the operation. + * @param done_cb a function called when the operation is complete. + cb and done_cb won't be called again afterward. + * @param q a query used to filter values on the remotes before they send a + * response. + */ + virtual void query(const InfoHash& key, QueryCallback cb, DoneCallback done_cb = {}, Query&& q = {}) = 0; + virtual void query(const InfoHash& key, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query&& q = {}) = 0; + + /** + * Get locally stored data for the given hash. + */ + virtual std::vector<Sp<Value>> getLocal(const InfoHash& key, Value::Filter f = Value::AllFilter()) const = 0; + + /** + * Get locally stored data for the given key and value id. + */ + virtual Sp<Value> getLocalById(const InfoHash& key, Value::Id vid) const = 0; + + /** + * Announce a value on all available protocols (IPv4, IPv6). + * + * The operation will start as soon as the node is connected to the network. + * The done callback will be called once, when the first announce succeeds, or fails. + */ + virtual void put(const InfoHash& key, + Sp<Value>, + DoneCallback cb=nullptr, + time_point created=time_point::max(), + bool permanent = false) = 0; + virtual void put(const InfoHash& key, + const Sp<Value>& v, + DoneCallbackSimple cb, + time_point created=time_point::max(), + bool permanent = false) = 0; + virtual void put(const InfoHash& key, + Value&& v, + DoneCallback cb=nullptr, + time_point created=time_point::max(), + bool permanent = false) = 0; + virtual void put(const InfoHash& key, + Value&& v, + DoneCallbackSimple cb, + time_point created=time_point::max(), + bool permanent = false) = 0; + + /** + * Get data currently being put at the given hash. + */ + virtual std::vector<Sp<Value>> getPut(const InfoHash&) = 0; + + /** + * Get data currently being put at the given hash with the given id. + */ + virtual Sp<Value> getPut(const InfoHash&, const Value::Id&) = 0; + + /** + * Stop any put/announce operation at the given location, + * for the value with the given id. + */ + virtual bool cancelPut(const InfoHash&, const Value::Id&) = 0; + + /** + * Listen on the network for any changes involving a specified hash. + * The node will register to receive updates from relevent nodes when + * new values are added or removed. + * + * @return a token to cancel the listener later. + */ + virtual size_t listen(const InfoHash&, GetCallback, Value::Filter&&={}, Where&& w = {}) = 0; + virtual size_t listen(const InfoHash& key, GetCallbackSimple cb, Value::Filter f={}, Where w = {}) = 0; + + virtual bool cancelListen(const InfoHash&, size_t token) = 0; + + /** + * Inform the DHT of lower-layer connectivity changes. + * This will cause the DHT to assume a public IP address change. + * The DHT will recontact neighbor nodes, re-register for listen ops etc. + */ + virtual void connectivityChanged(sa_family_t) = 0; + virtual void connectivityChanged() = 0; + + /** + * Get the list of good nodes for local storage saving purposes + * The list is ordered to minimize the back-to-work delay. + */ + virtual std::vector<NodeExport> exportNodes() = 0; + + virtual std::vector<ValuesExport> exportValues() const = 0; + virtual void importValues(const std::vector<ValuesExport>&) = 0; + + virtual NodeStats getNodesStats(sa_family_t af) const = 0; + + virtual std::string getStorageLog() const = 0; + virtual std::string getStorageLog(const InfoHash&) const = 0; + + virtual std::string getRoutingTablesLog(sa_family_t) const = 0; + virtual std::string getSearchesLog(sa_family_t) const = 0; + virtual std::string getSearchLog(const InfoHash&, sa_family_t af = AF_UNSPEC) const = 0; + + virtual void dumpTables() const = 0; + virtual std::vector<unsigned> getNodeMessageStats(bool in = false) = 0; + + /** + * Set the in-memory storage limit in bytes + */ + virtual void setStorageLimit(size_t limit = DEFAULT_STORAGE_LIMIT) = 0; + + /** + * Returns the total memory usage of stored values and the number + * of stored values. + */ + virtual std::pair<size_t, size_t> getStoreSize() const = 0; + + virtual std::vector<SockAddr> getPublicAddress(sa_family_t family = 0) = 0; + + Logger DHT_LOG; + /** + * Enable or disable logging of DHT internal messages + */ + virtual void setLoggers(LogMethod error = NOLOG, LogMethod warn = NOLOG, LogMethod debug = NOLOG) + { + DHT_LOG.DEBUG = debug; + DHT_LOG.WARN = warn; + DHT_LOG.ERR = error; + } + + /** + * Only print logs related to the given InfoHash (if given), or disable filter (if zeroes). + */ + virtual void setLogFilter(const InfoHash& f) { + DHT_LOG.setFilter(f); + } +protected: + bool logFilerEnable_ {}; + InfoHash logFiler_ {}; +}; + +} // namespace dht diff --git a/include/opendht/dht_proxy_client.h b/include/opendht/dht_proxy_client.h new file mode 100644 index 00000000..64a2881f --- /dev/null +++ b/include/opendht/dht_proxy_client.h @@ -0,0 +1,296 @@ +/* + * Copyright (C) 2016 Savoir-faire Linux Inc. + * Author : Sébastien Blin <sebastien.blin@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#if OPENDHT_PROXY_SERVER + +#pragma once + +#include <thread> + +#include "callbacks.h" +#include "def.h" +#include "dht_interface.h" +#include "scheduler.h" + +namespace restbed +{ + class Request; +} + +namespace dht { + +class OPENDHT_PUBLIC DhtProxyClient : public DhtInterface { +public: + + DhtProxyClient() : scheduler(DHT_LOG) {} + + /** + * Initialise the DhtProxyClient with two open sockets (for IPv4 and IP6) + * and an ID for the node. + */ + explicit DhtProxyClient(const std::string& serverHost); + virtual ~DhtProxyClient(); + + /** + * Get the ID of the node. + */ + inline const InfoHash& getNodeId() const { return myid; } + + /** + * Get the current status of the node for the given family. + */ + NodeStatus getStatus(sa_family_t af) const; + NodeStatus getStatus() const { + return std::max(getStatus(AF_INET), getStatus(AF_INET6)); + } + + /** + * Performs final operations before quitting. + */ + void shutdown(ShutdownCallback cb); + + /** + * Returns true if the node is running (have access to an open socket). + * + * af: address family. If non-zero, will return true if the node + * is running for the provided family. + */ + bool isRunning(sa_family_t af = 0) const; + + /** + * Get a value by asking the proxy and call the provided get callback when + * values are found at key. + * The operation will start as soon as the node is connected to the network. + * @param cb a function called when new values are found on the network. + * It should return false to stop the operation. + * @param donecb a function called when the operation is complete. + cb and donecb won't be called again afterward. + * @param f a filter function used to prefilter values. + */ + virtual void get(const InfoHash& key, GetCallback cb, DoneCallback donecb={}, Value::Filter&& f={}, Where&& w = {}); + virtual void get(const InfoHash& key, GetCallback cb, DoneCallbackSimple donecb={}, Value::Filter&& f={}, Where&& w = {}) { + get(key, cb, bindDoneCb(donecb), std::forward<Value::Filter>(f), std::forward<Where>(w)); + } + virtual void get(const InfoHash& key, GetCallbackSimple cb, DoneCallback donecb={}, Value::Filter&& f={}, Where&& w = {}) { + get(key, bindGetCb(cb), donecb, std::forward<Value::Filter>(f), std::forward<Where>(w)); + } + virtual void get(const InfoHash& key, GetCallbackSimple cb, DoneCallbackSimple donecb, Value::Filter&& f={}, Where&& w = {}) { + get(key, bindGetCb(cb), bindDoneCb(donecb), std::forward<Value::Filter>(f), std::forward<Where>(w)); + } + + /** + * Announce a value on all available protocols (IPv4, IPv6). + * + * The operation will start as soon as the node is connected to the network. + * The done callback will be called once, when the first announce succeeds, or fails. + * NOTE: For now, created parameter is ignored. + */ + void put(const InfoHash& key, + Sp<Value>, + DoneCallback cb=nullptr, + time_point created=time_point::max(), + bool permanent = false); + void put(const InfoHash& key, + const Sp<Value>& v, + DoneCallbackSimple cb, + time_point created=time_point::max(), + bool permanent = false) + { + put(key, v, bindDoneCb(cb), created, permanent); + } + + void put(const InfoHash& key, + Value&& v, + DoneCallback cb=nullptr, + time_point created=time_point::max(), + bool permanent = false) + { + put(key, std::make_shared<Value>(std::move(v)), cb, created, permanent); + } + void put(const InfoHash& key, + Value&& v, + DoneCallbackSimple cb, + time_point created=time_point::max(), + bool permanent = false) + { + put(key, std::forward<Value>(v), bindDoneCb(cb), created, permanent); + } + + NodeStats getNodesStats(sa_family_t af) const; + + std::vector<SockAddr> getPublicAddress(sa_family_t family = 0); + + + /** + * TODO + * NOTE: For now, there is no endpoint in the DhtProxyServer to do the following methods. + * It will come in another version. + */ + + /** + * Listen on the network for any changes involving a specified hash. + * The node will register to receive updates from relevent nodes when + * new values are added or removed. + * + * @return a token to cancel the listener later. + */ + virtual size_t listen(const InfoHash&, GetCallback, Value::Filter&&={}, Where&&={}) { return 0; }; + virtual size_t listen(const InfoHash& key, GetCallbackSimple cb, Value::Filter f={}, Where w = {}) { + return listen(key, bindGetCb(cb), std::forward<Value::Filter>(f), std::forward<Where>(w)); + } + virtual bool cancelListen(const InfoHash&, size_t /*token*/) { return false; } + + /** + * Similar to Dht::get, but sends a Query to filter data remotely. + * @param key the key for which to query data for. + * @param cb a function called when new values are found on the network. + * It should return false to stop the operation. + * @param done_cb a function called when the operation is complete. + cb and done_cb won't be called again afterward. + * @param q a query used to filter values on the remotes before they send a + * response. + */ + virtual void query(const InfoHash& /*key*/, QueryCallback /*cb*/, DoneCallback /*done_cb*/ = {}, Query&& /*q*/ = {}) { } + virtual void query(const InfoHash& key, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query&& q = {}) { + query(key, cb, bindDoneCb(done_cb), std::forward<Query>(q)); + } + + /** + * Get data currently being put at the given hash. + */ + std::vector<Sp<Value>> getPut(const InfoHash&) { return {}; } + + /** + * Get data currently being put at the given hash with the given id. + */ + Sp<Value> getPut(const InfoHash&, const Value::Id&) { return {}; } + + /** + * Stop any put/announce operation at the given location, + * for the value with the given id. + */ + bool cancelPut(const InfoHash&, const Value::Id&) { return false; } + + void pingNode(const sockaddr*, socklen_t, DoneCallbackSimple&& /*cb*/={}) { } + + /** + * NOTE: The following methods will not be implemented because the + * DhtProxyClient doesn't have any storage nor synchronization process + */ + + /** + * Insert a node in the main routing table. + * The node is not pinged, so this should be + * used to bootstrap efficiently from previously known nodes. + */ + void insertNode(const InfoHash&, const SockAddr&) { } + void insertNode(const InfoHash&, const sockaddr*, socklen_t) { } + void insertNode(const NodeExport&) { } + + /** + * Returns the total memory usage of stored values and the number + * of stored values. + */ + std::pair<size_t, size_t> getStoreSize() const { return {}; } + + virtual void registerType(const ValueType&) { } + const ValueType& getType(ValueType::Id) const { } + + /** + * Get locally stored data for the given hash. + */ + std::vector<Sp<Value>> getLocal(const InfoHash&, Value::Filter) const { return {}; } + + /** + * Get locally stored data for the given key and value id. + */ + Sp<Value> getLocalById(const InfoHash&, Value::Id) const { return {}; } + + /** + * Get the list of good nodes for local storage saving purposes + * The list is ordered to minimize the back-to-work delay. + */ + std::vector<NodeExport> exportNodes() { return {}; } + + std::vector<ValuesExport> exportValues() const { return {}; } + void importValues(const std::vector<ValuesExport>&) {} + + std::string getStorageLog() const { return {}; } + std::string getStorageLog(const InfoHash&) const { return {}; } + + std::string getRoutingTablesLog(sa_family_t) const { return {}; } + std::string getSearchesLog(sa_family_t) const { return {}; } + std::string getSearchLog(const InfoHash&, sa_family_t) const { return {}; } + + void dumpTables() const {} + std::vector<unsigned> getNodeMessageStats(bool) { return {}; } + + /** + * Set the in-memory storage limit in bytes + */ + void setStorageLimit(size_t) {} + + /** + * Inform the DHT of lower-layer connectivity changes. + * This will cause the DHT to assume a public IP address change. + * The DHT will recontact neighbor nodes, re-register for listen ops etc. + */ + void connectivityChanged(sa_family_t) {} + void connectivityChanged() { + connectivityChanged(AF_INET); + connectivityChanged(AF_INET6); + } + + time_point periodic(const uint8_t*, size_t, const SockAddr&) { + scheduler.syncTime(); + return scheduler.run(); + } + time_point periodic(const uint8_t *buf, size_t buflen, const sockaddr* from, socklen_t fromlen) { + return periodic(buf, buflen, SockAddr(from, fromlen)); + } + +private: + Json::Value getProxyInfos() const; + /** + * Initialize statusIpvX_ + */ + void getConnectivityStatus(); + /** + * cancel all Operations + */ + void cancelAllOperations(); + std::string serverHost_; + NodeStatus statusIpv4_ {NodeStatus::Disconnected}; + NodeStatus statusIpv6_ {NodeStatus::Disconnected}; + + InfoHash myid {}; + struct Operation + { + std::shared_ptr<restbed::Request> req; + std::unique_ptr<std::thread> thread; + }; + std::vector<Operation> operations_; + + Scheduler scheduler; + Sp<Scheduler::Job> nextNodesConfirmation {}; + void confirmProxy(); +}; + +} + +#endif // OPENDHT_PROXY_CLIENT diff --git a/include/opendht/dhtrunner.h b/include/opendht/dhtrunner.h index c95c2eb8..d75e977f 100644 --- a/include/opendht/dhtrunner.h +++ b/include/opendht/dhtrunner.h @@ -36,6 +36,10 @@ #include <queue> #include <chrono> +#if OPENDHT_PROXY_CLIENT +#include "dht_proxy_client.h" +#endif // OPENDHT_PROXY_CLIENT + namespace dht { struct Node; @@ -297,6 +301,9 @@ public: struct Config { SecureDhtConfig dht_config; bool threaded; +#if OPENDHT_PROXY_CLIENT + std::string proxy_server; +#endif //OPENDHT_PROXY_CLIENT }; /** @@ -305,7 +312,11 @@ public: * @param threaded: If false, ::loop() must be called periodically. Otherwise a thread is launched. * @param cb: Optional callback to receive general state information. */ - void run(in_port_t port, const crypto::Identity identity, bool threaded = false, NetId network = 0) { + void run(in_port_t port, const crypto::Identity identity, bool threaded = false, NetId network = 0 +#if OPENDHT_PROXY_CLIENT + , const std::string& proxy_server = "127.0.0.1:8000" +#endif //OPENDHT_PROXY_CLIENT +) { run(port, { /*.dht_config = */{ /*.node_config = */{ @@ -316,7 +327,10 @@ public: }, /*.id = */identity }, - /*.threaded = */threaded + /*.threaded = */threaded, +#if OPENDHT_PROXY_CLIENT + /*.proxy_server = */proxy_server +#endif //OPENDHT_PROXY_CLIENT }); } void run(in_port_t port, Config config); @@ -362,6 +376,14 @@ public: */ void join(); +#if OPENDHT_PROXY_CLIENT + void setProxyServer(const std::string& url = "127.0.0.1:8000") { + config_.proxy_server = url; + } + void enableProxy(bool proxify); +#endif // OPENDHT_PROXY_CLIENT + + static std::vector<SockAddr> getAddrInfo(const std::string& host, const std::string& service); private: static constexpr std::chrono::seconds BOOTSTRAP_PERIOD {10}; @@ -381,6 +403,13 @@ private: } std::unique_ptr<SecureDht> dht_; + void resetDht(); + SecureDht* activeDht() const; +#if OPENDHT_PROXY_CLIENT + std::atomic_bool use_proxy {false}; + std::unique_ptr<SecureDht> dht_via_proxy_; + Config config_; +#endif // OPENDHT_PROXY_CLIENT mutable std::mutex dht_mtx {}; std::thread dht_thread {}; std::condition_variable cv {}; diff --git a/include/opendht/securedht.h b/include/opendht/securedht.h index 3c251941..e2efed5f 100644 --- a/include/opendht/securedht.h +++ b/include/opendht/securedht.h @@ -29,13 +29,21 @@ namespace dht { -class OPENDHT_PUBLIC SecureDht : public Dht { +class OPENDHT_PUBLIC SecureDht : public DhtInterface { public: typedef std::function<void(bool)> SignatureCheckCallback; using Config = SecureDhtConfig; + static dht::Config& getConfig(SecureDht::Config& conf) + { + auto& c = conf.node_config; + if (not c.node_id and conf.id.second) + c.node_id = InfoHash::get("node:"+conf.id.second->getId().toString()); + return c; + } + SecureDht() {} /** @@ -44,7 +52,7 @@ public: * id: the identity to use for the crypto layer and to compute * our own hash on the Dht. */ - SecureDht(int s, int s6, Config config); + SecureDht(std::unique_ptr<DhtInterface> dht, Config config); virtual ~SecureDht(); @@ -62,14 +70,17 @@ public: return secureType(std::move(tmp_type)); } - virtual void registerType(const ValueType& type) override { - Dht::registerType(secureType(type)); + void registerType(const ValueType& type) { + if (dht_) + dht_->registerType(secureType(type)); } - virtual void registerType(ValueType&& type) { - Dht::registerType(secureType(std::forward<ValueType>(type))); + void registerType(ValueType&& type) { + if (dht_) + dht_->registerType(secureType(std::forward<ValueType>(type))); } - virtual void registerInsecureType(const ValueType& type) { - Dht::registerType(type); + void registerInsecureType(const ValueType& type) { + if (dht_) + dht_->registerType(type); } /** @@ -77,18 +88,18 @@ public: * If the signature can't be checked, or if the data can't be decrypted, it is not returned. * Public, non-signed & non-encrypted data is retransmitted as-is. */ - virtual void get(const InfoHash& id, GetCallback cb, DoneCallback donecb={}, Value::Filter&& = {}, Where&& w = {}) override; - virtual void get(const InfoHash& id, GetCallback cb, DoneCallbackSimple donecb={}, Value::Filter&& f = {}, Where&& w = {}) override { + void get(const InfoHash& id, GetCallback cb, DoneCallback donecb={}, Value::Filter&& = {}, Where&& w = {}); + void get(const InfoHash& id, GetCallback cb, DoneCallbackSimple donecb={}, Value::Filter&& f = {}, Where&& w = {}) { get(id, cb, bindDoneCb(donecb), std::forward<Value::Filter>(f), std::forward<Where>(w)); } - virtual void get(const InfoHash& key, GetCallbackSimple cb, DoneCallback donecb={}, Value::Filter&& f={}, Where&& w = {}) override { + void get(const InfoHash& key, GetCallbackSimple cb, DoneCallback donecb={}, Value::Filter&& f={}, Where&& w = {}) { get(key, bindGetCb(cb), donecb, std::forward<Value::Filter>(f), std::forward<Where>(w)); } - virtual void get(const InfoHash& key, GetCallbackSimple cb, DoneCallbackSimple donecb, Value::Filter&& f={}, Where&& w = {}) override { + void get(const InfoHash& key, GetCallbackSimple cb, DoneCallbackSimple donecb, Value::Filter&& f={}, Where&& w = {}) { get(key, bindGetCb(cb), bindDoneCb(donecb), std::forward<Value::Filter>(f), std::forward<Where>(w)); } - virtual size_t listen(const InfoHash& id, GetCallback cb, Value::Filter&& = {}, Where&& w = {}) override; + size_t listen(const InfoHash& id, GetCallback cb, Value::Filter&& = {}, Where&& w = {}); /** * Will take ownership of the value, sign it using our private key and put it in the DHT. @@ -135,7 +146,160 @@ public: localQueryMethod_ = std::move(query_method); } + /** + * SecureDht to Dht proxy + */ + void shutdown(ShutdownCallback cb) { + dht_->shutdown(cb); + } + void dumpTables() const { + dht_->dumpTables(); + } + inline const InfoHash& getNodeId() const { return dht_->getNodeId(); } + std::pair<size_t, size_t> getStoreSize() const { + return dht_->getStoreSize(); + } + std::string getStorageLog() const { + return dht_->getStorageLog(); + } + std::string getStorageLog(const InfoHash& h) const { + return dht_->getStorageLog(h); + } + void setStorageLimit(size_t limit = DEFAULT_STORAGE_LIMIT) { + dht_->setStorageLimit(limit); + } + std::vector<NodeExport> exportNodes() { + return dht_->exportNodes(); + } + std::vector<ValuesExport> exportValues() const { + return dht_->exportValues(); + } + void setLoggers(LogMethod error = NOLOG, LogMethod warn = NOLOG, LogMethod debug = NOLOG) { + dht_->setLoggers(error, warn, debug); + } + void setLogFilter(const InfoHash& f) { + dht_->setLogFilter(f); + } + void importValues(const std::vector<ValuesExport>& v) { + dht_->importValues(v); + } + NodeStats getNodesStats(sa_family_t af) const { + return dht_->getNodesStats(af); + } + std::vector<unsigned> getNodeMessageStats(bool in = false) { + return dht_->getNodeMessageStats(in); + } + std::string getRoutingTablesLog(sa_family_t af) const { + return dht_->getRoutingTablesLog(af); + } + std::string getSearchesLog(sa_family_t af) const { + return dht_->getSearchesLog(af); + } + std::string getSearchLog(const InfoHash& h, sa_family_t af = AF_UNSPEC) const { + return dht_->getSearchLog(h, af); + } + std::vector<SockAddr> getPublicAddress(sa_family_t family = 0) { + return dht_->getPublicAddress(family); + } + time_point periodic(const uint8_t *buf, size_t buflen, const SockAddr& sa) { + return dht_->periodic(buf, buflen, sa); + } + time_point periodic(const uint8_t *buf, size_t buflen, const sockaddr* from, socklen_t fromlen) { + return dht_->periodic(buf, buflen, from, fromlen); + } + NodeStatus getStatus(sa_family_t af) const { + return dht_->getStatus(af); + } + NodeStatus getStatus() const { + return dht_->getStatus(); + } + bool isRunning(sa_family_t af = 0) const { + return dht_->isRunning(af); + } + const ValueType& getType(ValueType::Id type_id) const { + return dht_->getType(type_id); + } + void insertNode(const InfoHash& id, const SockAddr& sa) { + dht_->insertNode(id, sa); + } + void insertNode(const InfoHash& id, const sockaddr* sa, socklen_t salen) { + dht_->insertNode(id, sa, salen); + } + void insertNode(const NodeExport& n) { + dht_->insertNode(n); + } + void pingNode(const sockaddr* sa, socklen_t salen, DoneCallbackSimple&& cb={}) { + dht_->pingNode(sa, salen, std::move(cb)); + } + void query(const InfoHash& key, QueryCallback cb, DoneCallback done_cb = {}, Query&& q = {}) { + dht_->query(key, cb, done_cb, std::move(q)); + } + void query(const InfoHash& key, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query&& q = {}) { + dht_->query(key, cb, done_cb, std::move(q)); + } + std::vector<Sp<Value>> getLocal(const InfoHash& key, Value::Filter f = Value::AllFilter()) const { + return dht_->getLocal(key, f); + } + Sp<Value> getLocalById(const InfoHash& key, Value::Id vid) const { + return dht_->getLocalById(key, vid); + } + void put(const InfoHash& key, + Sp<Value> v, + DoneCallback cb=nullptr, + time_point created=time_point::max(), + bool permanent = false) + { + dht_->put(key, v, cb, created, permanent); + } + void put(const InfoHash& key, + const Sp<Value>& v, + DoneCallbackSimple cb, + time_point created=time_point::max(), + bool permanent = false) + { + dht_->put(key, v, cb, created, permanent); + } + + void put(const InfoHash& key, + Value&& v, + DoneCallback cb=nullptr, + time_point created=time_point::max(), + bool permanent = false) + { + dht_->put(key, std::move(v), cb, created, permanent); + } + void put(const InfoHash& key, + Value&& v, + DoneCallbackSimple cb, + time_point created=time_point::max(), + bool permanent = false) + { + dht_->put(key, std::move(v), cb, created, permanent); + } + std::vector<Sp<Value>> getPut(const InfoHash& h) { + return dht_->getPut(h); + } + Sp<Value> getPut(const InfoHash& h, const Value::Id& vid) { + return dht_->getPut(h, vid); + } + bool cancelPut(const InfoHash& h, const Value::Id& vid) { + return dht_->cancelPut(h, vid); + } + size_t listen(const InfoHash& key, GetCallbackSimple cb, Value::Filter f={}, Where w = {}) { + return dht_->listen(key, cb, f, w); + } + bool cancelListen(const InfoHash& h, size_t token) { + return dht_->cancelListen(h, token); + } + void connectivityChanged(sa_family_t af) { + dht_->connectivityChanged(af); + } + void connectivityChanged() { + dht_->connectivityChanged(); + } + private: + std::unique_ptr<DhtInterface> dht_; // prevent copy SecureDht(const SecureDht&) = delete; SecureDht& operator=(const SecureDht&) = delete; diff --git a/src/callbacks.cpp b/src/callbacks.cpp index 89719ef1..26a321f9 100644 --- a/src/callbacks.cpp +++ b/src/callbacks.cpp @@ -86,6 +86,18 @@ NodeStats::toJson() const } return val; } + +NodeStats::NodeStats(const Json::Value& val) +{ + if (val.isMember("good")) + good_nodes = static_cast<unsigned>(val["good"].asLargestUInt()); + if (val.isMember("dubious")) + dubious_nodes = static_cast<unsigned>(val["dubious"].asLargestUInt()); + if (val.isMember("incoming")) + incoming_nodes = static_cast<unsigned>(val["incoming"].asLargestUInt()); + if (val.isMember("table_depth")) + table_depth = static_cast<unsigned>(val["table_depth"].asLargestUInt()); +} #endif //OPENDHT_PROXY_SERVER } diff --git a/src/dht.cpp b/src/dht.cpp index dc66511b..418538da 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -39,14 +39,6 @@ constexpr std::chrono::minutes Dht::SEARCH_EXPIRE_TIME; constexpr std::chrono::seconds Dht::LISTEN_EXPIRE_TIME; constexpr std::chrono::seconds Dht::REANNOUNCE_MARGIN; -void -Dht::setLoggers(LogMethod error, LogMethod warn, LogMethod debug) -{ - DHT_LOG.DEBUG = debug; - DHT_LOG.WARN = warn; - DHT_LOG.ERR = error; -} - NodeStatus Dht::getStatus(sa_family_t af) const { diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp new file mode 100644 index 00000000..9f175f5b --- /dev/null +++ b/src/dht_proxy_client.cpp @@ -0,0 +1,323 @@ +/* + * Copyright (C) 2016 Savoir-faire Linux Inc. + * Author : Sébastien Blin <sebastien.blin@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#if OPENDHT_PROXY_SERVER + +#include "dht_proxy_client.h" + +#include <chrono> +#include <json/json.h> +#include <restbed> +#include <vector> + +#include "dhtrunner.h" + +constexpr const char* const HTTP_PROTO {"http://"}; + +// TODO connectivity changed +// TODO follow listen between non proxified and proxified + +namespace dht { + +DhtProxyClient::DhtProxyClient(const std::string& serverHost) +: serverHost_(serverHost), scheduler(DHT_LOG) +{ + auto confirm_nodes_time = scheduler.time() + std::chrono::seconds(5); + nextNodesConfirmation = scheduler.add(confirm_nodes_time, std::bind(&DhtProxyClient::confirmProxy, this)); + + getConnectivityStatus(); +} + +void +DhtProxyClient::confirmProxy() +{ + getConnectivityStatus(); + auto disconnected = statusIpv4_ == NodeStatus::Disconnected && statusIpv6_ == NodeStatus::Disconnected; + auto time = disconnected ? std::chrono::seconds(5) : std::chrono::seconds(600); + auto confirm_nodes_time = scheduler.time() + time; + scheduler.edit(nextNodesConfirmation, confirm_nodes_time); +} + +DhtProxyClient::~DhtProxyClient() +{ + cancelAllOperations(); +} + +void +DhtProxyClient::cancelAllOperations() +{ + for (auto& operation: operations_) { + if (operation.thread && operation.thread->joinable()) { + // Close connection to stop operation? + restbed::Http::close(operation.req); + operation.thread->join(); + } + } +} + +void +DhtProxyClient::shutdown(ShutdownCallback cb) +{ + cancelAllOperations(); + cb(); +} + +NodeStatus +DhtProxyClient::getStatus(sa_family_t af) const +{ + switch (af) + { + case AF_INET: + return statusIpv4_; + case AF_INET6: + return statusIpv6_; + default: + return NodeStatus::Disconnected; + } +} + +bool +DhtProxyClient::isRunning(sa_family_t af) const +{ + switch (af) + { + case AF_INET: + return statusIpv4_ == NodeStatus::Connected; + case AF_INET6: + return statusIpv6_ == NodeStatus::Connected; + default: + return false; + } +} + + +void +DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, + Value::Filter&& filter, Where&& where) +{ + restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + key.toString()); + auto req = std::make_shared<restbed::Request>(uri); + Query query {{}, where}; + auto filterChain = filter.chain(query.where.getFilter()); + + Operation o; + o.req = req; + o.thread = std::move(std::unique_ptr<std::thread>( + new std::thread([=](){ + // Try to contact the proxy and set the status to connected when done. + // will change the connectivity status + auto ok = std::make_shared<bool>(true); + auto future = restbed::Http::async(req, + [=](const std::shared_ptr<restbed::Request>& req, + const std::shared_ptr<restbed::Response>& reply) { + auto code = reply->get_status_code(); + + if (code == 200) { + try { + while (restbed::Http::is_open(req)) { + restbed::Http::fetch("\n", reply); + std::string body; + reply->get_body(body); + reply->set_body(""); // Reset the body for the next fetch + + Json::Value json; + Json::Reader reader; + if (reader.parse(body, json)) { + auto value = std::make_shared<Value>(json); + if (not filterChain or filterChain(*value)) + cb({value}); + } else { + *ok = false; + } + } + } catch (std::runtime_error& e) { } + } else { + *ok = false; + } + }); + future.wait(); + donecb(*ok, {}); + if (!ok) { + // Connection failed, update connectivity + getConnectivityStatus(); + } + }))); + operations_.emplace_back(std::move(o)); +} + +void +DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_point, bool permanent) +{ + restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + key.toString()); + auto req = std::make_shared<restbed::Request>(uri); + req->set_method("POST"); + Json::FastWriter writer; + auto json = val->toJson(); + if (permanent) + json["permanent"] = true; + auto body = writer.write(json); + req->set_body(body); + req->set_header("Content-Length", std::to_string(body.size())); + + Operation o; + o.req = req; + o.thread = std::move(std::unique_ptr<std::thread>( + new std::thread([=](){ + auto ok = std::make_shared<bool>(true); + auto future = restbed::Http::async(req, + [this, val, ok](const std::shared_ptr<restbed::Request>& /*req*/, + const std::shared_ptr<restbed::Response>& reply) { + auto code = reply->get_status_code(); + + if (code == 200) { + restbed::Http::fetch("\n", reply); + std::string body; + reply->get_body(body); + reply->set_body(""); // Reset the body for the next fetch + + Json::Value json; + Json::Reader reader; + if (reader.parse(body, json)) { + auto value = std::make_shared<Value>(json); + } else { + *ok = false; + } + } else { + *ok = false; + } + }); + future.wait(); + cb(*ok, {}); + if (!ok) { + // Connection failed, update connectivity + getConnectivityStatus(); + } + }))); + operations_.emplace_back(std::move(o)); +} + +NodeStats +DhtProxyClient::getNodesStats(sa_family_t af) const +{ + auto proxyInfos = getProxyInfos(); + NodeStats stats {}; + auto identifier = af == AF_INET6 ? "ipv6" : "ipv4"; + try { + stats = NodeStats(proxyInfos[identifier]); + } catch (...) { } + return stats; +} + +Json::Value +DhtProxyClient::getProxyInfos() const +{ + auto result = std::make_shared<Json::Value>(); + restbed::Uri uri(HTTP_PROTO + serverHost_ + "/"); + auto req = std::make_shared<restbed::Request>(uri); + + // Try to contact the proxy and set the status to connected when done. + // will change the connectivity status + auto future = restbed::Http::async(req, + [this, result](const std::shared_ptr<restbed::Request>&, + const std::shared_ptr<restbed::Response>& reply) { + auto code = reply->get_status_code(); + + if (code == 200) { + restbed::Http::fetch("\n", reply); + std::string body; + reply->get_body(body); + + Json::Reader reader; + reader.parse(body, *result); + } + }); + future.wait(); + return *result; +} + +std::vector<SockAddr> +DhtProxyClient::getPublicAddress(sa_family_t family) +{ + auto proxyInfos = getProxyInfos(); + // json["public_ip"] contains [ipv6:ipv4]:port or ipv4:port + if (!proxyInfos.isMember("public_ip")) { + getConnectivityStatus(); + return {}; + } + auto public_ip = proxyInfos["public_ip"].asString(); + if (public_ip.length() < 2) { + getConnectivityStatus(); + return {}; + } + std::string ipv4Address = ""; + std::string ipv6Address = ""; + std::string port = ""; + if (public_ip[0] == '[') { + // ipv6 complient + auto endIp = public_ip.find(']'); + if (public_ip.length() > endIp + 2) { + port = public_ip.substr(endIp + 2); + auto ips = public_ip.substr(1, endIp - 1); + auto ipv4And6Separator = ips.find_last_of(':'); + ipv4Address = ips.substr(ipv4And6Separator + 1); + ipv6Address = ips.substr(0, ipv4And6Separator - 1); + } + } else { + auto endIp = public_ip.find_last_of(':'); + port = public_ip.substr(endIp + 1); + ipv4Address = public_ip.substr(0, endIp - 1); + } + switch (family) + { + case AF_INET: + return DhtRunner::getAddrInfo(ipv4Address, port); + case AF_INET6: + return DhtRunner::getAddrInfo(ipv6Address, port); + default: + return {}; + } +} + +void +DhtProxyClient::getConnectivityStatus() +{ + auto proxyInfos = getProxyInfos(); + // NOTE: json["ipvX"] contains NodeStats::toJson() + try { + auto goodIpv4 = static_cast<long>(proxyInfos["ipv4"]["good"].asLargestUInt()); + auto dubiousIpv4 = static_cast<long>(proxyInfos["ipv4"]["dubious"].asLargestUInt()); + if (goodIpv4 + dubiousIpv4 > 0) { + statusIpv4_ = NodeStatus::Connected; + } + auto goodIpv6 = static_cast<long>(proxyInfos["ipv6"]["good"].asLargestUInt()); + auto dubiousIpv6 = static_cast<long>(proxyInfos["ipv6"]["dubious"].asLargestUInt()); + if (goodIpv6 + dubiousIpv6 > 0) { + statusIpv6_ = NodeStatus::Connected; + } + myid = InfoHash(proxyInfos["node_id"].asString()); + } catch (...) { + statusIpv4_ = NodeStatus::Disconnected; + statusIpv6_ = NodeStatus::Disconnected; + } + + // TODO for now, we don't handle connectivity issues. (when the proxy is down, we don't try to reconnect) +} + +} // namespace dht + +#endif // OPENDHT_PROXY_CLIENT diff --git a/src/dht_proxy_server.cpp b/src/dht_proxy_server.cpp index de1f03c4..57be7d75 100644 --- a/src/dht_proxy_server.cpp +++ b/src/dht_proxy_server.cpp @@ -28,6 +28,8 @@ #include <json/json.h> #include <limits> +#include <iostream> + using namespace std::placeholders; namespace dht { @@ -129,6 +131,7 @@ DhtProxyServer::getNodeInfo(const std::shared_ptr<restbed::Session>& session) co result["node_id"] = dht_->getNodeId().toString(); result["ipv4"] = dht_->getNodesStats(AF_INET).toJson(); result["ipv6"] = dht_->getNodesStats(AF_INET6).toJson(); + result["public_ip"] = s->get_origin(); // [ipv6:ipv4]:port or ipv4:port Json::FastWriter writer; s->close(restbed::OK, writer.write(result)); } @@ -238,6 +241,7 @@ DhtProxyServer::put(const std::shared_ptr<restbed::Session>& session) const if (parsingSuccessful) { // Build the Value from json auto value = std::make_shared<Value>(root); + auto permanent = root.isMember("permanent") ? root["permanent"].asBool() : false; dht_->put(infoHash, value, [s, value](bool ok) { if (ok) { @@ -246,7 +250,7 @@ DhtProxyServer::put(const std::shared_ptr<restbed::Session>& session) const } else { s->close(restbed::BAD_GATEWAY, "{\"err\":\"put failed\"}"); } - }); + }, time_point::max(), permanent); } else { s->close(restbed::BAD_REQUEST, "{\"err\":\"Incorrect JSON\"}"); } diff --git a/src/dhtrunner.cpp b/src/dhtrunner.cpp index d86c4071..2e8b6dc3 100644 --- a/src/dhtrunner.cpp +++ b/src/dhtrunner.cpp @@ -39,6 +39,9 @@ namespace dht { constexpr std::chrono::seconds DhtRunner::BOOTSTRAP_PERIOD; DhtRunner::DhtRunner() : dht_() +#if OPENDHT_PROXY_CLIENT +, dht_via_proxy_() +#endif //OPENDHT_PROXY_CLIENT { #ifdef _WIN32 WSADATA wsd; @@ -84,6 +87,10 @@ DhtRunner::run(const SockAddr& local4, const SockAddr& local6, DhtRunner::Config if (rcv_thread.joinable()) rcv_thread.join(); running = true; +#if OPENDHT_PROXY_CLIENT + config_.dht_config = config.dht_config; + config_.threaded = config.threaded; +#endif //OPENDHT_PROXY_CLIENT doRun(local4, local6, config.dht_config); if (not config.threaded) return; @@ -142,7 +149,7 @@ DhtRunner::join() } { std::lock_guard<std::mutex> lck(dht_mtx); - dht_.reset(); + resetDht(); status4 = NodeStatus::Disconnected; status6 = NodeStatus::Disconnected; bound4 = {}; @@ -154,40 +161,40 @@ void DhtRunner::dumpTables() const { std::lock_guard<std::mutex> lck(dht_mtx); - dht_->dumpTables(); + activeDht()->dumpTables(); } InfoHash DhtRunner::getId() const { - if (!dht_) + if (!activeDht()) return {}; - return dht_->getId(); + return activeDht()->getId(); } InfoHash DhtRunner::getNodeId() const { - if (!dht_) + if (!activeDht()) return {}; - return dht_->getNodeId(); + return activeDht()->getNodeId(); } std::pair<size_t, size_t> DhtRunner::getStoreSize() const { std::lock_guard<std::mutex> lck(dht_mtx); - if (!dht_) + if (!activeDht()) return {}; - return dht_->getStoreSize(); + return activeDht()->getStoreSize(); } void DhtRunner::setStorageLimit(size_t limit) { std::lock_guard<std::mutex> lck(dht_mtx); - if (!dht_) + if (!activeDht()) throw std::runtime_error("dht is not running"); - return dht_->setStorageLimit(limit); + return activeDht()->setStorageLimit(limit); } std::vector<NodeExport> @@ -195,46 +202,46 @@ DhtRunner::exportNodes() const { std::lock_guard<std::mutex> lck(dht_mtx); if (!dht_) return {}; - return dht_->exportNodes(); + return activeDht()->exportNodes(); } std::vector<ValuesExport> DhtRunner::exportValues() const { std::lock_guard<std::mutex> lck(dht_mtx); - if (!dht_) + if (!activeDht()) return {}; - return dht_->exportValues(); + return activeDht()->exportValues(); } void DhtRunner::setLoggers(LogMethod error, LogMethod warn, LogMethod debug) { std::lock_guard<std::mutex> lck(dht_mtx); - dht_->setLoggers(std::forward<LogMethod>(error), std::forward<LogMethod>(warn), std::forward<LogMethod>(debug)); + activeDht()->setLoggers(std::forward<LogMethod>(error), std::forward<LogMethod>(warn), std::forward<LogMethod>(debug)); } void DhtRunner::setLogFilter(const InfoHash& f) { std::lock_guard<std::mutex> lck(dht_mtx); - dht_->setLogFilter(f); + activeDht()->setLogFilter(f); } void DhtRunner::registerType(const ValueType& type) { std::lock_guard<std::mutex> lck(dht_mtx); - dht_->registerType(type); + activeDht()->registerType(type); } void DhtRunner::importValues(const std::vector<ValuesExport>& values) { std::lock_guard<std::mutex> lck(dht_mtx); - dht_->importValues(values); + activeDht()->importValues(values); } unsigned DhtRunner::getNodesStats(sa_family_t af, unsigned *good_return, unsigned *dubious_return, unsigned *cached_return, unsigned *incoming_return) const { std::lock_guard<std::mutex> lck(dht_mtx); - const auto stats = dht_->getNodesStats(af); + const auto stats = activeDht()->getNodesStats(af); if (good_return) *good_return = stats.good_nodes; if (dubious_return) @@ -250,51 +257,51 @@ NodeStats DhtRunner::getNodesStats(sa_family_t af) const { std::lock_guard<std::mutex> lck(dht_mtx); - return dht_->getNodesStats(af); + return activeDht()->getNodesStats(af); } std::vector<unsigned> DhtRunner::getNodeMessageStats(bool in) const { std::lock_guard<std::mutex> lck(dht_mtx); - return dht_->getNodeMessageStats(in); + return activeDht()->getNodeMessageStats(in); } std::string DhtRunner::getStorageLog() const { std::lock_guard<std::mutex> lck(dht_mtx); - return dht_->getStorageLog(); + return activeDht()->getStorageLog(); } std::string DhtRunner::getStorageLog(const InfoHash& f) const { std::lock_guard<std::mutex> lck(dht_mtx); - return dht_->getStorageLog(f); + return activeDht()->getStorageLog(f); } std::string DhtRunner::getRoutingTablesLog(sa_family_t af) const { std::lock_guard<std::mutex> lck(dht_mtx); - return dht_->getRoutingTablesLog(af); + return activeDht()->getRoutingTablesLog(af); } std::string DhtRunner::getSearchesLog(sa_family_t af) const { std::lock_guard<std::mutex> lck(dht_mtx); - return dht_->getSearchesLog(af); + return activeDht()->getSearchesLog(af); } std::string DhtRunner::getSearchLog(const InfoHash& f, sa_family_t af) const { std::lock_guard<std::mutex> lck(dht_mtx); - return dht_->getSearchLog(f, af); + return activeDht()->getSearchLog(f, af); } std::vector<SockAddr> DhtRunner::getPublicAddress(sa_family_t af) { std::lock_guard<std::mutex> lck(dht_mtx); - return dht_->getPublicAddress(af); + return activeDht()->getPublicAddress(af); } std::vector<std::string> DhtRunner::getPublicAddressStr(sa_family_t af) @@ -308,18 +315,18 @@ DhtRunner::getPublicAddressStr(sa_family_t af) void DhtRunner::registerCertificate(std::shared_ptr<crypto::Certificate> cert) { std::lock_guard<std::mutex> lck(dht_mtx); - dht_->registerCertificate(cert); + activeDht()->registerCertificate(cert); } void DhtRunner::setLocalCertificateStore(CertificateStoreQuery&& query_method) { std::lock_guard<std::mutex> lck(dht_mtx); - dht_->setLocalCertificateStore(std::forward<CertificateStoreQuery>(query_method)); + activeDht()->setLocalCertificateStore(std::forward<CertificateStoreQuery>(query_method)); } time_point DhtRunner::loop_() { - if (!dht_) + if (!activeDht()) return {}; decltype(pending_ops) ops {}; @@ -330,7 +337,7 @@ DhtRunner::loop_() std::move(pending_ops) : std::move(pending_ops_prio); } while (not ops.empty()) { - ops.front()(*dht_); + ops.front()(*activeDht()); ops.pop(); } @@ -345,15 +352,15 @@ DhtRunner::loop_() for (const auto& pck : received) { auto& buf = pck.first; auto& from = pck.second; - wakeup = dht_->periodic(buf.data(), buf.size()-1, from); + wakeup = activeDht()->periodic(buf.data(), buf.size()-1, from); } received.clear(); } else { - wakeup = dht_->periodic(nullptr, 0, nullptr, 0); + wakeup = activeDht()->periodic(nullptr, 0, nullptr, 0); } - NodeStatus nstatus4 = dht_->getStatus(AF_INET); - NodeStatus nstatus6 = dht_->getStatus(AF_INET6); + NodeStatus nstatus4 = activeDht()->getStatus(AF_INET); + NodeStatus nstatus6 = activeDht()->getStatus(AF_INET6); if (nstatus4 != status4 || nstatus6 != status6) { status4 = nstatus4; status6 = nstatus6; @@ -399,7 +406,7 @@ int bindSocket(const SockAddr& addr, SockAddr& bound) void DhtRunner::doRun(const SockAddr& sin4, const SockAddr& sin6, SecureDht::Config config) { - dht_.reset(); + resetDht(); int s4 = -1, s6 = -1; @@ -414,7 +421,10 @@ DhtRunner::doRun(const SockAddr& sin4, const SockAddr& sin6, SecureDht::Config c s6 = bindSocket(sin6, bound6); #endif - dht_ = std::unique_ptr<SecureDht>(new SecureDht {s4, s6, config}); + auto dht = std::unique_ptr<DhtInterface>( + new Dht(s4, s6, SecureDht::getConfig(config)) + ); + dht_ = std::unique_ptr<SecureDht>(new SecureDht(std::move(dht), config)); rcv_thread = std::thread([this,s4,s6]() { try { @@ -772,4 +782,45 @@ DhtRunner::findCertificate(InfoHash hash, std::function<void(const std::shared_p cv.notify_all(); } +void +DhtRunner::resetDht() { +#if OPENDHT_PROXY_CLIENT + use_proxy? dht_via_proxy_.reset() : dht_.reset(); +#else + dht_.reset(); +#endif // OPENDHT_PROXY_CLIENT +} + +SecureDht* +DhtRunner::activeDht() const +{ +#if OPENDHT_PROXY_CLIENT + return use_proxy? dht_via_proxy_.get() : dht_.get(); +#else + return dht_.get(); +#endif // OPENDHT_PROXY_CLIENT +} + +#if OPENDHT_PROXY_CLIENT +void +DhtRunner::enableProxy(bool proxify) { + if (proxify) { + // If no proxy url in the config, use 127.0.0.1:8000 + std::string serverHost = config_.proxy_server.empty() ? "127.0.0.1:8000" : config_.proxy_server; + // Init the proxy client + auto dht_via_proxy = std::unique_ptr<DhtInterface>( + new DhtProxyClient(serverHost) + ); + dht_via_proxy_ = std::unique_ptr<SecureDht>(new SecureDht(std::move(dht_via_proxy), config_.dht_config)); + // and use it + use_proxy = proxify; + } else { + use_proxy = proxify; + // We doesn't need to maintain the connection with the proxy. + // Delete it + dht_via_proxy_.reset(nullptr); + } +} +#endif // OPENDHT_PROXY_CLIENT + } diff --git a/src/securedht.cpp b/src/securedht.cpp index 88a1b118..c6fd389e 100644 --- a/src/securedht.cpp +++ b/src/securedht.cpp @@ -32,20 +32,10 @@ extern "C" { namespace dht { -Config& getConfig(SecureDht::Config& conf) +SecureDht::SecureDht(std::unique_ptr<DhtInterface> dht, SecureDht::Config conf) +: dht_(std::move(dht)), key_(conf.id.first), certificate_(conf.id.second) { - auto& c = conf.node_config; - if (not c.node_id and conf.id.second) - c.node_id = InfoHash::get("node:"+conf.id.second->getId().toString()); - return c; -} - -SecureDht::SecureDht(int s, int s6, SecureDht::Config conf) -: Dht(s, s6, getConfig(conf)), key_(conf.id.first), certificate_(conf.id.second) -{ - if (s < 0 && s6 < 0) - return; - + if (!dht_) return; for (const auto& type : DEFAULT_TYPES) registerType(type); @@ -59,7 +49,7 @@ SecureDht::SecureDht(int s, int s6, SecureDht::Config conf) if (key_ and certId != key_->getPublicKey().getId()) throw DhtException("SecureDht: provided certificate doesn't match private key."); - Dht::put(certId, Value { + dht_->put(certId, Value { CERTIFICATE_TYPE, *certificate_, 1 @@ -190,7 +180,7 @@ SecureDht::findCertificate(const InfoHash& node, std::function<void(const Sp<cry } auto found = std::make_shared<bool>(false); - Dht::get(node, [cb,node,found,this](const std::vector<Sp<Value>>& vals) { + dht_->get(node, [cb,node,found,this](const std::vector<Sp<Value>>& vals) { if (*found) return false; for (const auto& v : vals) { @@ -277,13 +267,13 @@ SecureDht::getCallbackFilter(GetCallback cb, Value::Filter&& filter) void SecureDht::get(const InfoHash& id, GetCallback cb, DoneCallback donecb, Value::Filter&& f, Where&& w) { - Dht::get(id, getCallbackFilter(cb, std::forward<Value::Filter>(f)), donecb, {}, std::forward<Where>(w)); + dht_->get(id, getCallbackFilter(cb, std::forward<Value::Filter>(f)), donecb, {}, std::forward<Where>(w)); } size_t SecureDht::listen(const InfoHash& id, GetCallback cb, Value::Filter&& f, Where&& w) { - return Dht::listen(id, getCallbackFilter(cb, std::forward<Value::Filter>(f)), {}, std::forward<Where>(w)); + return dht_->listen(id, getCallbackFilter(cb, std::forward<Value::Filter>(f)), {}, std::forward<Where>(w)); } void @@ -295,7 +285,7 @@ SecureDht::putSigned(const InfoHash& hash, Sp<Value> val, DoneCallback callback, } // Check if we are already announcing a value - auto p = getPut(hash, val->id); + auto p = dht_->getPut(hash, val->id); if (p && val->seq <= p->seq) { DHT_LOG.DEBUG("Found previous value being announced."); val->seq = p->seq + 1; @@ -317,7 +307,7 @@ SecureDht::putSigned(const InfoHash& hash, Sp<Value> val, DoneCallback callback, }, [hash,val,this,callback,permanent] (bool /* ok */) { sign(*val); - put(hash, val, callback, time_point::max(), permanent); + dht_->put(hash, val, callback, time_point::max(), permanent); }, Value::IdFilter(val->id) ); @@ -334,7 +324,7 @@ SecureDht::putEncrypted(const InfoHash& hash, const InfoHash& to, Sp<Value> val, } DHT_LOG.WARN("Encrypting data for PK: %s", pk->getId().toString().c_str()); try { - put(hash, encrypt(*val, *pk), callback, time_point::max(), permanent); + dht_->put(hash, encrypt(*val, *pk), callback, time_point::max(), permanent); } catch (const std::exception& e) { DHT_LOG.ERR("Error putting encrypted data: %s", e.what()); if (callback) diff --git a/tools/dhtnode.cpp b/tools/dhtnode.cpp index 8b7bb75f..6776876f 100644 --- a/tools/dhtnode.cpp +++ b/tools/dhtnode.cpp @@ -63,10 +63,17 @@ void print_help() { << " psp [port] Stop the proxy interface on port." << std::endl; #endif //OPENDHT_PROXY_SERVER +#if OPENDHT_PROXY_CLIENT + std::cout << std::endl << "Operations with the proxy:" << std::endl + << " stt [server_address] Start the proxy client." << std::endl + << " stp Stop the proxy client." << std::endl; +#endif //OPENDHT_PROXY_CLIENT + std::cout << std::endl << "Operations on the DHT:" << std::endl << " b <ip:port> Ping potential node at given IP address/port." << std::endl << " g <key> Get values at <key>." << std::endl << " l <key> Listen for value changes at <key>." << std::endl + << " cl <key> <token> Cancel listen for <token> and <key>." << std::endl << " p <key> <str> Put string value at <key>." << std::endl << " pp <key> <str> Put string value at <key> (persistent version)." << std::endl << " s <key> <str> Put string value at <key>, signed with our generated private key." << std::endl @@ -96,6 +103,12 @@ void cmd_loop(std::shared_ptr<DhtRunner>& dht, dht_params& params) proxies.emplace(params.proxyserver, new DhtProxyServer(dht, params.proxyserver)); } #endif //OPENDHT_PROXY_SERVER +#if OPENDHT_PROXY_CLIENT + if (!params.proxyclient.empty()) { + dht->setProxyServer(params.proxyclient); + dht->enableProxy(true); + } +#endif //OPENDHT_PROXY_CLIENT while (true) { @@ -191,6 +204,17 @@ void cmd_loop(std::shared_ptr<DhtRunner>& dht, dht_params& params) continue; } #endif //OPENDHT_PROXY_SERVER +#if OPENDHT_PROXY_CLIENT + else if (op == "stt") { + iss >> idstr; + dht->setProxyServer(idstr); + dht->enableProxy(true); + continue; + } else if (op == "stp") { + dht->enableProxy(false); + continue; + } +#endif //OPENDHT_PROXY_CLIENT if (op.empty()) continue; diff --git a/tools/tools_common.h b/tools/tools_common.h index 39974fad..c71fa680 100644 --- a/tools/tools_common.h +++ b/tools/tools_common.h @@ -120,6 +120,7 @@ struct dht_params { bool service {false}; std::pair<std::string, std::string> bootstrap {}; in_port_t proxyserver {0}; + std::string proxyclient {}; }; static const constexpr struct option long_options[] = { @@ -134,6 +135,7 @@ static const constexpr struct option long_options[] = { {"logfile", required_argument, nullptr, 'l'}, {"syslog", no_argument , nullptr, 'L'}, {"proxyserver",required_argument, nullptr, 'S'}, + {"proxyclient",required_argument, nullptr, 'C'}, {nullptr, 0 , nullptr, 0} }; @@ -159,6 +161,9 @@ parseArgs(int argc, char **argv) { std::cout << "Invalid port: " << port_arg << std::endl; } break; + case 'C': + params.proxyclient = optarg; + break; case 'n': params.network = strtoul(optarg, nullptr, 0); break; -- GitLab