diff --git a/CMakeLists.txt b/CMakeLists.txt index 560d91d10b1eba0187c9637009147868b2e93891..83a76acd8ebd35a8e99402cf1e29856d2e0ced70 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -35,7 +35,9 @@ list (APPEND opendht_SOURCES src/node.cpp src/value.cpp src/dht.cpp + src/callbacks.cpp src/routing_table.cpp + src/node_cache.cpp src/network_engine.cpp src/securedht.cpp src/dhtrunner.cpp @@ -56,7 +58,9 @@ list (APPEND opendht_HEADERS include/opendht/node.h include/opendht/value.h include/opendht/dht.h + include/opendht/callbacks.h include/opendht/routing_table.h + include/opendht/node_cache.h include/opendht/network_engine.h include/opendht/scheduler.h include/opendht/securedht.h diff --git a/include/opendht.h b/include/opendht.h index 97c2c05c04cc4199bffebe282a7789497607911b..7765476c8f0db7e8f9225642183d161eee2441c5 100644 --- a/include/opendht.h +++ b/include/opendht.h @@ -19,13 +19,6 @@ #pragma once -#include "opendht/dht.h" -#include "opendht/scheduler.h" -#include "opendht/network_engine.h" -#include "opendht/node.h" -#include "opendht/value.h" -#include "opendht/infohash.h" -#include "opendht/securedht.h" #include "opendht/dhtrunner.h" #include "opendht/log.h" #include "opendht/default_types.h" diff --git a/include/opendht/callbacks.h b/include/opendht/callbacks.h new file mode 100644 index 0000000000000000000000000000000000000000..5c70723f72b8ac494242e19242424485d88f3959 --- /dev/null +++ b/include/opendht/callbacks.h @@ -0,0 +1,76 @@ +/* + * Copyright (C) 2014-2016 Savoir-faire Linux Inc. + * Author : Adrien Béraud <adrien.beraud@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#pragma once + +#include "infohash.h" +#include "value.h" + +#include <vector> +#include <memory> +#include <functional> + +namespace dht { + +class Node; + +enum class NodeStatus { + Disconnected, // 0 nodes + Connecting, // 1+ nodes + Connected // 1+ good nodes +}; + +struct Config { + InfoHash node_id; + bool is_bootstrap; +}; + +struct SecureDhtConfig +{ + Config node_config; + crypto::Identity id; +}; + +using ValuesExport = std::pair<InfoHash, Blob>; + +using GetCallback = std::function<bool(const std::vector<std::shared_ptr<Value>>& values)>; +using GetCallbackSimple = std::function<bool(std::shared_ptr<Value> value)>; +using ShutdownCallback = std::function<void()>; + +using CertificateStoreQuery = std::function<std::vector<std::shared_ptr<crypto::Certificate>>(const InfoHash& pk_id)>; + +typedef bool (*GetCallbackRaw)(std::shared_ptr<Value>, void *user_data); + +static constexpr size_t DEFAULT_STORAGE_LIMIT {1024 * 1024 * 64}; + +GetCallbackSimple bindGetCb(GetCallbackRaw raw_cb, void* user_data); +GetCallback bindGetCb(GetCallbackSimple cb); + +using DoneCallback = std::function<void(bool success, const std::vector<std::shared_ptr<Node>>& nodes)>; +typedef void (*DoneCallbackRaw)(bool, std::vector<std::shared_ptr<Node>>*, void *user_data); +typedef void (*ShutdownCallbackRaw)(void *user_data); + +using DoneCallbackSimple = std::function<void(bool success)>; + +ShutdownCallback bindShutdownCb(ShutdownCallbackRaw shutdown_cb_raw, void* user_data); +DoneCallback bindDoneCb(DoneCallbackSimple donecb); +DoneCallback bindDoneCb(DoneCallbackRaw raw_cb, void* user_data); + + +} diff --git a/include/opendht/dht.h b/include/opendht/dht.h index 5d94a06900b9310038e5a7eff4d2d5297626720c..d3385fa68861663789864376aa30b38b4150b9c5 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -27,6 +27,8 @@ #include "network_engine.h" #include "scheduler.h" #include "routing_table.h" +#include "node_cache.h" +#include "callbacks.h" #include <string> #include <array> @@ -50,69 +52,11 @@ namespace dht { class Dht { public: - enum class Status { - Disconnected, // 0 nodes - Connecting, // 1+ nodes - Connected // 1+ good nodes - }; - - struct Config { - InfoHash node_id; - bool is_bootstrap; - }; - // [[deprecated]] using NodeExport = dht::NodeExport; - typedef std::function<bool(const std::vector<std::shared_ptr<Value>>& values)> GetCallback; - typedef std::function<bool(std::shared_ptr<Value> value)> GetCallbackSimple; - typedef std::function<void()> ShutdownCallback; - - typedef bool (*GetCallbackRaw)(std::shared_ptr<Value>, void *user_data); - - static constexpr size_t DEFAULT_STORAGE_LIMIT {1024 * 1024 * 64}; - - static GetCallbackSimple - bindGetCb(GetCallbackRaw raw_cb, void* user_data) { - if (not raw_cb) return {}; - return [=](const std::shared_ptr<Value>& value) { - return raw_cb(value, user_data); - }; - } - static GetCallback - bindGetCb(GetCallbackSimple cb) { - if (not cb) return {}; - return [=](const std::vector<std::shared_ptr<Value>>& values) { - for (const auto& v : values) - if (not cb(v)) - return false; - return true; - }; - } - - typedef std::function<void(bool success, const std::vector<std::shared_ptr<Node>>& nodes)> DoneCallback; - typedef void (*DoneCallbackRaw)(bool, std::vector<std::shared_ptr<Node>>*, void *user_data); - typedef void (*ShutdownCallbackRaw)(void *user_data); - - typedef std::function<void(bool success)> DoneCallbackSimple; - - static ShutdownCallback - bindShutdownCb(ShutdownCallbackRaw shutdown_cb_raw, void* user_data) { - return [=]() { shutdown_cb_raw(user_data); }; - } - static DoneCallback - bindDoneCb(DoneCallbackSimple donecb) { - if (not donecb) return {}; - using namespace std::placeholders; - return std::bind(donecb, _1); - } - static DoneCallback - bindDoneCb(DoneCallbackRaw raw_cb, void* user_data) { - if (not raw_cb) return {}; - return [=](bool success, const std::vector<std::shared_ptr<Node>>& nodes) { - raw_cb(success, (std::vector<std::shared_ptr<Node>>*)&nodes, user_data); - }; - } + // [[deprecated]] + using Status = NodeStatus; Dht() : network_engine(DHT_LOG, scheduler) {} @@ -136,9 +80,9 @@ public: /** * Get the current status of the node for the given family. */ - Status getStatus(sa_family_t af) const; + NodeStatus getStatus(sa_family_t af) const; - Status getStatus() const { + NodeStatus getStatus() const { return std::max(getStatus(AF_INET), getStatus(AF_INET6)); } @@ -278,7 +222,6 @@ public: */ std::vector<NodeExport> exportNodes(); - typedef std::pair<InfoHash, Blob> ValuesExport; std::vector<ValuesExport> exportValues() const; void importValues(const std::vector<ValuesExport>&); @@ -354,22 +297,6 @@ private: static constexpr size_t TOKEN_SIZE {64}; - struct NodeCache { - std::shared_ptr<Node> getNode(const InfoHash& id, sa_family_t family); - std::shared_ptr<Node> getNode(const InfoHash& id, const sockaddr* sa, socklen_t sa_len, time_point now, int confirmed); - void putNode(std::shared_ptr<Node> n); - - /** - * Reset the connectivity state of every node, - * Giving them a new chance if they where expired. - * To use in case of connectivity change etc. - */ - void clearBadNodes(sa_family_t family = 0); - private: - std::list<std::weak_ptr<Node>> cache_4; - std::list<std::weak_ptr<Node>> cache_6; - }; - struct SearchNode { SearchNode(std::shared_ptr<Node> node) : node(node) {} diff --git a/include/opendht/dhtrunner.h b/include/opendht/dhtrunner.h index da57404bc5015b3135dc88b7e27f3aafeb0683ec..3bb98ea7a59800c297dc5d36f8a73d53f278286a 100644 --- a/include/opendht/dhtrunner.h +++ b/include/opendht/dhtrunner.h @@ -20,7 +20,10 @@ #pragma once -#include "securedht.h" +//#include "securedht.h" +#include "infohash.h" +#include "value.h" +#include "callbacks.h" #include <thread> #include <mutex> @@ -33,6 +36,10 @@ namespace dht { +struct Node; +class SecureDht; +struct SecureDhtConfig; + /** * Provides a thread-safe interface to run the (secure) DHT. * The class will open sockets on the provided port and will @@ -42,28 +49,28 @@ namespace dht { class DhtRunner { public: - typedef std::function<void(Dht::Status, Dht::Status)> StatusCallback; + typedef std::function<void(NodeStatus, NodeStatus)> StatusCallback; DhtRunner(); virtual ~DhtRunner(); - void get(InfoHash id, Dht::GetCallbackSimple cb, Dht::DoneCallback donecb={}, Value::Filter f = Value::AllFilter()) { - get(id, Dht::bindGetCb(cb), donecb, f); + void get(InfoHash id, GetCallbackSimple cb, DoneCallback donecb={}, Value::Filter f = Value::AllFilter()) { + get(id, bindGetCb(cb), donecb, f); } - void get(InfoHash id, Dht::GetCallbackSimple cb, Dht::DoneCallbackSimple donecb={}, Value::Filter f = Value::AllFilter()) { - get(id, Dht::bindGetCb(cb), donecb, f); + void get(InfoHash id, GetCallbackSimple cb, DoneCallbackSimple donecb={}, Value::Filter f = Value::AllFilter()) { + get(id, bindGetCb(cb), donecb, f); } - void get(InfoHash hash, Dht::GetCallback vcb, Dht::DoneCallback dcb, Value::Filter f={}); + void get(InfoHash hash, GetCallback vcb, DoneCallback dcb, Value::Filter f={}); - void get(InfoHash id, Dht::GetCallback cb, Dht::DoneCallbackSimple donecb={}, Value::Filter f = Value::AllFilter()) { - get(id, cb, Dht::bindDoneCb(donecb), f); + void get(InfoHash id, GetCallback cb, DoneCallbackSimple donecb={}, Value::Filter f = Value::AllFilter()) { + get(id, cb, bindDoneCb(donecb), f); } - void get(const std::string& key, Dht::GetCallback vcb, Dht::DoneCallbackSimple dcb={}, Value::Filter f = Value::AllFilter()); + void get(const std::string& key, GetCallback vcb, DoneCallbackSimple dcb={}, Value::Filter f = Value::AllFilter()); template <class T> - void get(InfoHash hash, std::function<bool(std::vector<T>&&)> cb, Dht::DoneCallbackSimple dcb={}) + void get(InfoHash hash, std::function<bool(std::vector<T>&&)> cb, DoneCallbackSimple dcb={}) { get(hash, [=](const std::vector<std::shared_ptr<Value>>& vals) { return cb(unpackVector<T>(vals)); @@ -72,7 +79,7 @@ public: getFilterSet<T>()); } template <class T> - void get(InfoHash hash, std::function<bool(T&&)> cb, Dht::DoneCallbackSimple dcb={}) + void get(InfoHash hash, std::function<bool(T&&)> cb, DoneCallbackSimple dcb={}) { get(hash, [=](const std::vector<std::shared_ptr<Value>>& vals) { for (const auto& v : vals) { @@ -115,10 +122,10 @@ public: return p->get_future(); } - std::future<size_t> listen(InfoHash key, Dht::GetCallback vcb, Value::Filter f = Value::AllFilter()); - std::future<size_t> listen(const std::string& key, Dht::GetCallback vcb, Value::Filter f = Value::AllFilter()); - std::future<size_t> listen(InfoHash key, Dht::GetCallbackSimple cb, Value::Filter f = Value::AllFilter()) { - return listen(key, Dht::bindGetCb(cb), f); + std::future<size_t> listen(InfoHash key, GetCallback vcb, Value::Filter f = Value::AllFilter()); + std::future<size_t> listen(const std::string& key, GetCallback vcb, Value::Filter f = Value::AllFilter()); + std::future<size_t> listen(InfoHash key, GetCallbackSimple cb, Value::Filter f = Value::AllFilter()) { + return listen(key, bindGetCb(cb), f); } template <class T> @@ -149,40 +156,40 @@ public: void cancelListen(InfoHash h, size_t token); void cancelListen(InfoHash h, std::shared_future<size_t> token); - void put(InfoHash hash, std::shared_ptr<Value> value, Dht::DoneCallback cb={}); - void put(InfoHash hash, std::shared_ptr<Value> value, Dht::DoneCallbackSimple cb) { - put(hash, value, Dht::bindDoneCb(cb)); + void put(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb={}); + void put(InfoHash hash, std::shared_ptr<Value> value, DoneCallbackSimple cb) { + put(hash, value, bindDoneCb(cb)); } - void put(InfoHash hash, Value&& value, Dht::DoneCallback cb={}); - void put(InfoHash hash, Value&& value, Dht::DoneCallbackSimple cb) { - put(hash, std::forward<Value>(value), Dht::bindDoneCb(cb)); + void put(InfoHash hash, Value&& value, DoneCallback cb={}); + void put(InfoHash hash, Value&& value, DoneCallbackSimple cb) { + put(hash, std::forward<Value>(value), bindDoneCb(cb)); } - void put(const std::string& key, Value&& value, Dht::DoneCallbackSimple cb={}); + void put(const std::string& key, Value&& value, DoneCallbackSimple cb={}); void cancelPut(const InfoHash& h, const Value::Id& id); - void putSigned(InfoHash hash, std::shared_ptr<Value> value, Dht::DoneCallback cb={}); - void putSigned(InfoHash hash, std::shared_ptr<Value> value, Dht::DoneCallbackSimple cb) { - putSigned(hash, value, Dht::bindDoneCb(cb)); + void putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb={}); + void putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallbackSimple cb) { + putSigned(hash, value, bindDoneCb(cb)); } - void putSigned(InfoHash hash, Value&& value, Dht::DoneCallback cb={}); - void putSigned(InfoHash hash, Value&& value, Dht::DoneCallbackSimple cb) { - putSigned(hash, std::forward<Value>(value), Dht::bindDoneCb(cb)); + void putSigned(InfoHash hash, Value&& value, DoneCallback cb={}); + void putSigned(InfoHash hash, Value&& value, DoneCallbackSimple cb) { + putSigned(hash, std::forward<Value>(value), bindDoneCb(cb)); } - void putSigned(const std::string& key, Value&& value, Dht::DoneCallbackSimple cb={}); + void putSigned(const std::string& key, Value&& value, DoneCallbackSimple cb={}); - void putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, Dht::DoneCallback cb={}); - void putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, Dht::DoneCallbackSimple cb) { - putEncrypted(hash, to, value, Dht::bindDoneCb(cb)); + void putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallback cb={}); + void putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallbackSimple cb) { + putEncrypted(hash, to, value, bindDoneCb(cb)); } - void putEncrypted(InfoHash hash, InfoHash to, Value&& value, Dht::DoneCallback cb={}); - void putEncrypted(InfoHash hash, InfoHash to, Value&& value, Dht::DoneCallbackSimple cb) { - putEncrypted(hash, to, std::forward<Value>(value), Dht::bindDoneCb(cb)); + void putEncrypted(InfoHash hash, InfoHash to, Value&& value, DoneCallback cb={}); + void putEncrypted(InfoHash hash, InfoHash to, Value&& value, DoneCallbackSimple cb) { + putEncrypted(hash, to, std::forward<Value>(value), bindDoneCb(cb)); } - void putEncrypted(const std::string& key, InfoHash to, Value&& value, Dht::DoneCallback cb={}); + void putEncrypted(const std::string& key, InfoHash to, Value&& value, DoneCallback cb={}); void bootstrap(const char* host, const char* service); void bootstrap(const std::vector<std::pair<sockaddr_storage, socklen_t>>& nodes); @@ -195,31 +202,11 @@ public: */ void connectivityChanged(); - void dumpTables() const - { - std::lock_guard<std::mutex> lck(dht_mtx); - dht_->dumpTables(); - } + void dumpTables() const; - InfoHash getId() const { - if (!dht_) - return {}; - return dht_->getId(); - } + InfoHash getId() const; - InfoHash getNodeId() const { - if (!dht_) - return {}; - return dht_->getNodeId(); - } - - /** - * @deprecated Use getNodeId() - */ - //[[deprecated]] - InfoHash getRoutingId() const { - return getNodeId(); - } + InfoHash getNodeId() const; /** * Returns the currently bound address. @@ -237,107 +224,41 @@ public: return ntohs(((sockaddr_in*)&getBound(f).first)->sin_port); } - std::pair<size_t, size_t> getStoreSize() const { - std::lock_guard<std::mutex> lck(dht_mtx); - if (!dht_) - return {}; - return dht_->getStoreSize(); - } + std::pair<size_t, size_t> getStoreSize() const; - void setStorageLimit(size_t limit = Dht::DEFAULT_STORAGE_LIMIT) { - std::lock_guard<std::mutex> lck(dht_mtx); - if (!dht_) - throw std::runtime_error("dht is not running"); - return dht_->setStorageLimit(limit); - } + void setStorageLimit(size_t limit = DEFAULT_STORAGE_LIMIT); - std::vector<NodeExport> exportNodes() const { - std::lock_guard<std::mutex> lck(dht_mtx); - if (!dht_) - return {}; - return dht_->exportNodes(); - } + std::vector<NodeExport> exportNodes() const; - std::vector<Dht::ValuesExport> exportValues() const { - std::lock_guard<std::mutex> lck(dht_mtx); - if (!dht_) - return {}; - return dht_->exportValues(); - } + std::vector<ValuesExport> exportValues() const; - void setLoggers(LogMethod&& error = NOLOG, LogMethod&& warn = NOLOG, LogMethod&& debug = NOLOG) { - std::lock_guard<std::mutex> lck(dht_mtx); - dht_->setLoggers(std::forward<LogMethod>(error), std::forward<LogMethod>(warn), std::forward<LogMethod>(debug)); - } + void setLoggers(LogMethod&& error = NOLOG, LogMethod&& warn = NOLOG, LogMethod&& debug = NOLOG); - void registerType(const ValueType& type) { - std::lock_guard<std::mutex> lck(dht_mtx); - dht_->registerType(type); - } + void registerType(const ValueType& type); - void importValues(const std::vector<Dht::ValuesExport>& values) { - std::lock_guard<std::mutex> lck(dht_mtx); - dht_->importValues(values); - } + void importValues(const std::vector<ValuesExport>& values); bool isRunning() const { return running; } - int getNodesStats(sa_family_t af, unsigned *good_return, unsigned *dubious_return, unsigned *cached_return, unsigned *incoming_return) const - { - std::lock_guard<std::mutex> lck(dht_mtx); - return dht_->getNodesStats(af, good_return, dubious_return, cached_return, incoming_return); - } - - std::vector<unsigned> getNodeMessageStats(bool in = false) const - { - std::lock_guard<std::mutex> lck(dht_mtx); - return dht_->getNodeMessageStats(in); - } + int getNodesStats(sa_family_t af, unsigned *good_return, unsigned *dubious_return, unsigned *cached_return, unsigned *incoming_return) const; - std::string getStorageLog() const - { - std::lock_guard<std::mutex> lck(dht_mtx); - return dht_->getStorageLog(); - } - std::string getRoutingTablesLog(sa_family_t af) const - { - std::lock_guard<std::mutex> lck(dht_mtx); - return dht_->getRoutingTablesLog(af); - } - std::string getSearchesLog(sa_family_t af = 0) const - { - std::lock_guard<std::mutex> lck(dht_mtx); - return dht_->getSearchesLog(af); - } - std::vector<Address> getPublicAddress(sa_family_t af = 0) - { - std::lock_guard<std::mutex> lck(dht_mtx); - return dht_->getPublicAddress(af); - } - std::vector<std::string> getPublicAddressStr(sa_family_t af = 0) - { - auto addrs = getPublicAddress(af); - std::vector<std::string> ret(addrs.size()); - std::transform(addrs.begin(), addrs.end(), ret.begin(), dht::printAddr); - return ret; - } + std::vector<unsigned> getNodeMessageStats(bool in = false) const; + std::string getStorageLog() const; + std::string getRoutingTablesLog(sa_family_t af) const; + std::string getSearchesLog(sa_family_t af = 0) const; + std::vector<Address> getPublicAddress(sa_family_t af = 0); + std::vector<std::string> getPublicAddressStr(sa_family_t af = 0); // securedht methods void findCertificate(InfoHash hash, std::function<void(const std::shared_ptr<crypto::Certificate>)>); - void registerCertificate(std::shared_ptr<crypto::Certificate> cert) { - std::lock_guard<std::mutex> lck(dht_mtx); - dht_->registerCertificate(cert); - } - void setLocalCertificateStore(SecureDht::CertificateStoreQuery&& query_method) { - std::lock_guard<std::mutex> lck(dht_mtx); - dht_->setLocalCertificateStore(std::move(query_method)); - } + void registerCertificate(std::shared_ptr<crypto::Certificate> cert); + void setLocalCertificateStore(CertificateStoreQuery&& query_method); struct Config { - SecureDht::Config dht_config; + SecureDhtConfig dht_config; bool threaded; }; @@ -393,7 +314,7 @@ public: /** * Gracefuly disconnect from network. */ - void shutdown(Dht::ShutdownCallback cb); + void shutdown(ShutdownCallback cb); /** * Quit and wait for all threads to terminate. @@ -404,16 +325,16 @@ public: private: - void doRun(const sockaddr_in* sin4, const sockaddr_in6* sin6, SecureDht::Config config); + void doRun(const sockaddr_in* sin4, const sockaddr_in6* sin6, SecureDhtConfig config); time_point loop_(); static std::vector<std::pair<sockaddr_storage, socklen_t>> getAddrInfo(const char* host, const char* service); - Dht::Status getStatus() const { + NodeStatus getStatus() const { return std::max(status4, status6); } - std::unique_ptr<SecureDht> dht_ {}; + std::unique_ptr<SecureDht> dht_; mutable std::mutex dht_mtx {}; std::thread dht_thread {}; std::condition_variable cv {}; @@ -428,8 +349,8 @@ private: std::atomic<bool> running {false}; - Dht::Status status4 {Dht::Status::Disconnected}, - status6 {Dht::Status::Disconnected}; + NodeStatus status4 {NodeStatus::Disconnected}, + status6 {NodeStatus::Disconnected}; StatusCallback statusCb {nullptr}; Address bound4 {}; diff --git a/include/opendht/infohash.h b/include/opendht/infohash.h index dbde0c0819508e099976d8788c96bb446030bc13..ac54a7b7b10740e3ce9b484c35655d45f49fb3a5 100644 --- a/include/opendht/infohash.h +++ b/include/opendht/infohash.h @@ -204,4 +204,10 @@ static constexpr InfoHash ones = {std::array<uint8_t, HASH_LEN>{{ 0xFF, 0xFF, 0xFF, 0xFF }}}; +struct NodeExport { + InfoHash id; + sockaddr_storage ss; + socklen_t sslen; +}; + } diff --git a/include/opendht/node.h b/include/opendht/node.h index 7a4511c1985607249e5b62877c56730b713b9074..dfe00c6921b1b97c044650e7716c011846ad9b7d 100644 --- a/include/opendht/node.h +++ b/include/opendht/node.h @@ -28,12 +28,6 @@ namespace dht { -struct NodeExport { - InfoHash id; - sockaddr_storage ss; - socklen_t sslen; -}; - struct Node { friend class NetworkEngine; diff --git a/include/opendht/node_cache.h b/include/opendht/node_cache.h new file mode 100644 index 0000000000000000000000000000000000000000..a00685a619a68f59424843e356a1398107a411b6 --- /dev/null +++ b/include/opendht/node_cache.h @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2014-2016 Savoir-faire Linux Inc. + * Author(s) : Adrien Béraud <adrien.beraud@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#pragma once + +#include "node.h" + +#include <list> +#include <memory> + +namespace dht { + +struct NodeCache { + std::shared_ptr<Node> getNode(const InfoHash& id, sa_family_t family); + std::shared_ptr<Node> getNode(const InfoHash& id, const sockaddr* sa, socklen_t sa_len, time_point now, int confirmed); + + /** + * Reset the connectivity state of every node, + * Giving them a new chance if they where expired. + * To use in case of connectivity change etc. + */ + void clearBadNodes(sa_family_t family = 0); + +private: + struct NodeTree { + std::shared_ptr<Node> get(const InfoHash& id); + std::shared_ptr<Node> get(const InfoHash& id, const sockaddr* sa, socklen_t sa_len, time_point now, int confirmed); + + void clearBadNodes(); + + private: + std::shared_ptr<Node> getLocal(const InfoHash& id); + + std::vector<NodeTree> childs; + std::vector<std::weak_ptr<Node>> nodes; + }; + + NodeTree cache_4; + NodeTree cache_6; +}; + +} diff --git a/include/opendht/securedht.h b/include/opendht/securedht.h index fe2e47c4c92156db5bf00f6fcef8c66579c4d93b..9496ab775c5e429b57a3e42d3b243325a758ce39 100644 --- a/include/opendht/securedht.h +++ b/include/opendht/securedht.h @@ -45,11 +45,7 @@ public: typedef std::function<void(bool)> SignatureCheckCallback; - struct Config - { - Dht::Config node_config; - crypto::Identity id; - }; + using Config = SecureDhtConfig; SecureDht() {} @@ -131,7 +127,7 @@ public: const std::shared_ptr<crypto::Certificate> getCertificate(const InfoHash& node) const; - using CertificateStoreQuery = std::function<std::vector<std::shared_ptr<crypto::Certificate>>(const InfoHash& pk_id)>; + /** * Allows to set a custom callback called by the library to find a locally-stored certificate. diff --git a/include/opendht/utils.h b/include/opendht/utils.h index da56ffe0c2484aeb754f355bc10fd2b1e67151df..19154ed579d8e531cc9d1c4380fe6ad225450e70 100644 --- a/include/opendht/utils.h +++ b/include/opendht/utils.h @@ -137,7 +137,7 @@ struct Logger { // Serialization related definitions and utility functions -typedef std::vector<uint8_t> Blob; +using Blob = std::vector<uint8_t>; /** * Provides backward compatibility with msgpack 1.0 @@ -162,4 +162,4 @@ unpackMsg(Blob b) { msgpack::unpacked unpackMsg(Blob b); -} +} // namespace dht diff --git a/include/opendht/value.h b/include/opendht/value.h index 26f17bbdf911b80b05ef25d203f594e30064cd35..a3f34dc54e5d0d7d35562d4e69b8201781c36015 100644 --- a/include/opendht/value.h +++ b/include/opendht/value.h @@ -431,6 +431,8 @@ struct Value Blob cypher {}; }; +using ValuesExport = std::pair<InfoHash, Blob>; + template <typename T, typename std::enable_if<std::is_base_of<Value::SerializableBase, T>::value, T>::type* = nullptr> Value::Filter diff --git a/python/opendht.pyx b/python/opendht.pyx index d7b1c485d0b5f3dfe11c46a5ffae5a6e26bd7c93..67ff82f6286d383b12ff51ede8c69987f1ba1b6f 100644 --- a/python/opendht.pyx +++ b/python/opendht.pyx @@ -239,9 +239,9 @@ cdef class Identity(object): return c cdef class DhtConfig(object): - cdef cpp.Config _config + cdef cpp.DhtRunnerConfig _config def __init__(self): - self._config = cpp.Config() + self._config = cpp.DhtRunnerConfig() self._config.threaded = True; def setIdentity(self, Identity id): self._config.dht_config.id = id._id @@ -276,7 +276,7 @@ cdef class DhtRunner(_WithID): def shutdown(self, shutdown_cb=None): cb_obj = {'shutdown':shutdown_cb} ref.Py_INCREF(cb_obj) - self.thisptr.shutdown(cpp.Dht.bindShutdownCb(shutdown_callback, <void*>cb_obj)) + self.thisptr.shutdown(cpp.bindShutdownCb(shutdown_callback, <void*>cb_obj)) def enableLogging(self): cpp.enableLogging(self.thisptr[0]) def disableLogging(self): @@ -308,7 +308,7 @@ cdef class DhtRunner(_WithID): if get_cb: cb_obj = {'get':get_cb, 'done':done_cb} ref.Py_INCREF(cb_obj) - self.thisptr.get(key._infohash, cpp.Dht.bindGetCb(get_callback, <void*>cb_obj), cpp.Dht.bindDoneCb(done_callback, <void*>cb_obj)) + self.thisptr.get(key._infohash, cpp.bindGetCb(get_callback, <void*>cb_obj), cpp.bindDoneCb(done_callback, <void*>cb_obj)) else: lock = threading.Condition() pending = 0 @@ -337,7 +337,7 @@ cdef class DhtRunner(_WithID): """ cb_obj = {'done':done_cb} ref.Py_INCREF(cb_obj) - self.thisptr.put(key._infohash, val._value, cpp.Dht.bindDoneCb(done_callback, <void*>cb_obj)) + self.thisptr.put(key._infohash, val._value, cpp.bindDoneCb(done_callback, <void*>cb_obj)) def listen(self, InfoHash key, get_cb): t = ListenToken() t._h = key._infohash @@ -345,7 +345,7 @@ cdef class DhtRunner(_WithID): t._cb['cb'] = cb_obj # avoid the callback being destructed if the token is destroyed ref.Py_INCREF(cb_obj) - t._t = self.thisptr.listen(t._h, cpp.Dht.bindGetCb(get_callback, <void*>cb_obj)).share() + t._t = self.thisptr.listen(t._h, cpp.bindGetCb(get_callback, <void*>cb_obj)).share() return t def cancelListen(self, ListenToken token): self.thisptr.cancelListen(token._h, token._t) diff --git a/python/opendht_cpp.pxd b/python/opendht_cpp.pxd index f88797c36c0ba8c42d0e7e2d144c3dbca485f276..4c726a59411519eb9aad1199a414ea311739fede 100644 --- a/python/opendht_cpp.pxd +++ b/python/opendht_cpp.pxd @@ -91,41 +91,37 @@ cdef extern from "opendht/value.h" namespace "dht": vector[uint8_t] data string user_type -cdef extern from "opendht/dht.h" namespace "dht": +cdef extern from "opendht/node.h" namespace "dht": cdef cppclass Node: Node() except + InfoHash getId() const string getAddrStr() const bool isExpired() const + +cdef extern from "opendht/callbacks.h" namespace "dht": ctypedef void (*ShutdownCallbackRaw)(void *user_data) ctypedef bool (*GetCallbackRaw)(shared_ptr[Value] values, void *user_data) ctypedef void (*DoneCallbackRaw)(bool done, vector[shared_ptr[Node]]* nodes, void *user_data) - cdef cppclass Dht: - cppclass Config: - InfoHash node_id - bool is_bootstrap - cppclass ShutdownCallback: - ShutdownCallback() except + - cppclass GetCallback: - GetCallback() except + - #GetCallback(GetCallbackRaw cb, void *user_data) except + - cppclass DoneCallback: - DoneCallback() except + - #DoneCallback(DoneCallbackRaw, void *user_data) except + - Dht() except + - InfoHash getNodeId() const - @staticmethod - ShutdownCallback bindShutdownCb(ShutdownCallbackRaw cb, void *user_data) - @staticmethod - GetCallback bindGetCb(GetCallbackRaw cb, void *user_data) - @staticmethod - DoneCallback bindDoneCb(DoneCallbackRaw cb, void *user_data) -cdef extern from "opendht/dht.h" namespace "dht": - cdef cppclass SecureDht: - cppclass Config: - Dht.Config node_config - Identity id + cppclass ShutdownCallback: + ShutdownCallback() except + + cppclass GetCallback: + GetCallback() except + + #GetCallback(GetCallbackRaw cb, void *user_data) except + + cppclass DoneCallback: + DoneCallback() except + + #DoneCallback(DoneCallbackRaw, void *user_data) except + + + cdef ShutdownCallback bindShutdownCb(ShutdownCallbackRaw cb, void *user_data) + cdef GetCallback bindGetCb(GetCallbackRaw cb, void *user_data) + cdef DoneCallback bindDoneCb(DoneCallbackRaw cb, void *user_data) + + cppclass Config: + InfoHash node_id + bool is_bootstrap + cppclass SecureDhtConfig: + Config node_config + Identity id cdef extern from "opendht/dhtrunner.h" namespace "dht": ctypedef future[size_t] ListenToken @@ -133,7 +129,7 @@ cdef extern from "opendht/dhtrunner.h" namespace "dht": cdef cppclass DhtRunner: DhtRunner() except + cppclass Config: - SecureDht.Config dht_config + SecureDhtConfig dht_config bool threaded InfoHash getId() const InfoHash getNodeId() const @@ -141,18 +137,18 @@ cdef extern from "opendht/dhtrunner.h" namespace "dht": void run(in_port_t, Config config) void run(const char*, const char*, const char*, Config config) void join() - void shutdown(Dht.ShutdownCallback) + void shutdown(ShutdownCallback) bool isRunning() string getStorageLog() const string getRoutingTablesLog(sa_family_t af) const string getSearchesLog(sa_family_t af) const - void get(InfoHash key, Dht.GetCallback get_cb, Dht.DoneCallback done_cb) - void put(InfoHash key, shared_ptr[Value] val, Dht.DoneCallback done_cb) - ListenToken listen(InfoHash key, Dht.GetCallback get_cb) + void get(InfoHash key, GetCallback get_cb, DoneCallback done_cb) + void put(InfoHash key, shared_ptr[Value] val, DoneCallback done_cb) + ListenToken listen(InfoHash key, GetCallback get_cb) void cancelListen(InfoHash key, SharedListenToken token) vector[unsigned] getNodeMessageStats(bool i) -ctypedef DhtRunner.Config Config +ctypedef DhtRunner.Config DhtRunnerConfig cdef extern from "opendht/log.h" namespace "dht::log": void enableLogging(DhtRunner& dht) diff --git a/src/callbacks.cpp b/src/callbacks.cpp new file mode 100644 index 0000000000000000000000000000000000000000..6a84b5d7c3748aad3b04f43fd23a3d30f0e0cbca --- /dev/null +++ b/src/callbacks.cpp @@ -0,0 +1,50 @@ +#include "callbacks.h" + +namespace dht { + + +GetCallbackSimple +bindGetCb(GetCallbackRaw raw_cb, void* user_data) +{ + if (not raw_cb) return {}; + return [=](const std::shared_ptr<Value>& value) { + return raw_cb(value, user_data); + }; +} + +GetCallback +bindGetCb(GetCallbackSimple cb) +{ + if (not cb) return {}; + return [=](const std::vector<std::shared_ptr<Value>>& values) { + for (const auto& v : values) + if (not cb(v)) + return false; + return true; + }; +} + +ShutdownCallback +bindShutdownCb(ShutdownCallbackRaw shutdown_cb_raw, void* user_data) +{ + return [=]() { shutdown_cb_raw(user_data); }; +} + +DoneCallback +bindDoneCb(DoneCallbackSimple donecb) +{ + if (not donecb) return {}; + using namespace std::placeholders; + return std::bind(donecb, _1); +} + +DoneCallback +bindDoneCb(DoneCallbackRaw raw_cb, void* user_data) +{ + if (not raw_cb) return {}; + return [=](bool success, const std::vector<std::shared_ptr<Node>>& nodes) { + raw_cb(success, (std::vector<std::shared_ptr<Node>>*)&nodes, user_data); + }; +} + +} \ No newline at end of file diff --git a/src/dht.cpp b/src/dht.cpp index 1fed7ff72ddc18f81bd398550c695bffd932f8ca..426ddcc0d4c00d668f93776355efa25a6487f92e 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -102,16 +102,16 @@ Dht::setLoggers(LogMethod&& error, LogMethod&& warn, LogMethod&& debug) DHT_LOG.ERROR = std::move(error); } -Dht::Status +NodeStatus Dht::getStatus(sa_family_t af) const { unsigned good = 0, dubious = 0, cached = 0, incoming = 0; int tot = getNodesStats(af, &good, &dubious, &cached, &incoming); if (tot < 1) - return Status::Disconnected; + return NodeStatus::Disconnected; else if (good < 1) - return Status::Connecting; - return Status::Connected; + return NodeStatus::Connecting; + return NodeStatus::Connected; } void @@ -195,61 +195,6 @@ Dht::findNode(const InfoHash& id, sa_family_t af) const return {}; } -std::shared_ptr<Node> -Dht::NodeCache::getNode(const InfoHash& id, sa_family_t family) { - auto& list = family == AF_INET ? cache_4 : cache_6; - for (auto n = list.begin(); n != list.end();) { - if (auto ln = n->lock()) { - if (ln->id == id) - return ln; - ++n; - } else { - n = list.erase(n); - } - } - return nullptr; -} - -std::shared_ptr<Node> -Dht::NodeCache::getNode(const InfoHash& id, const sockaddr* sa, socklen_t sa_len, time_point now, int confirm) { - auto node = getNode(id, sa->sa_family); - if (not node) { - node = std::make_shared<Node>(id, sa, sa_len); - putNode(node); - } else if (confirm || node->time < now - Node::NODE_EXPIRE_TIME) { - node->update(sa, sa_len); - } - if (confirm) - node->received(now, confirm >= 2); - return node; -} - -void -Dht::NodeCache::putNode(std::shared_ptr<Node> n) { - if (not n) return; - auto& list = n->ss.ss_family == AF_INET ? cache_4 : cache_6; - list.push_back(n); -} - -void -Dht::NodeCache::clearBadNodes(sa_family_t family) -{ - if (family == 0) { - clearBadNodes(AF_INET); - clearBadNodes(AF_INET6); - } else { - auto& list = family == AF_INET ? cache_4 : cache_6; - for (auto n = list.begin(); n != list.end();) { - if (auto ln = n->lock()) { - ln->reset(); - ++n; - } else { - n = list.erase(n); - } - } - } -} - /* Every bucket caches the address of a likely node. Ping it. */ int Dht::sendCachedPing(Bucket& b) @@ -2228,11 +2173,11 @@ Dht::confirmNodes() bool soon = false; const auto& now = scheduler.time(); - if (searches4.empty() and getStatus(AF_INET) != Status::Disconnected) { + if (searches4.empty() and getStatus(AF_INET) != NodeStatus::Disconnected) { DHT_LOG.DEBUG("[confirm nodes] initial IPv4 'get' for my id (%s).", myid.toString().c_str()); search(myid, AF_INET); } - if (searches6.empty() and getStatus(AF_INET6) != Status::Disconnected) { + if (searches6.empty() and getStatus(AF_INET6) != NodeStatus::Disconnected) { DHT_LOG.DEBUG("[confirm nodes] initial IPv6 'get' for my id (%s).", myid.toString().c_str()); search(myid, AF_INET6); } @@ -2259,7 +2204,7 @@ Dht::confirmNodes() nextNodesConfirmation = scheduler.add(confirm_nodes_time, std::bind(&Dht::confirmNodes, this)); } -std::vector<Dht::ValuesExport> +std::vector<ValuesExport> Dht::exportValues() const { std::vector<ValuesExport> e {}; diff --git a/src/dhtrunner.cpp b/src/dhtrunner.cpp index 799208fa4a70ca40f211beb66ccf65c380814549..18088d0477bc3a1fe3d431251b704be3779985f6 100644 --- a/src/dhtrunner.cpp +++ b/src/dhtrunner.cpp @@ -19,6 +19,7 @@ */ #include "dhtrunner.h" +#include "securedht.h" #include <unistd.h> // close(fd) @@ -32,7 +33,7 @@ namespace dht { -DhtRunner::DhtRunner() +DhtRunner::DhtRunner() : dht_() { #ifdef _WIN32 WSADATA wsd; @@ -99,7 +100,7 @@ DhtRunner::run(const sockaddr_in* local4, const sockaddr_in6* local6, DhtRunner: std::lock_guard<std::mutex> lck(storage_mtx); if (not pending_ops_prio.empty()) return true; - if (not pending_ops.empty() and getStatus() >= Dht::Status::Connecting) + if (not pending_ops.empty() and getStatus() >= NodeStatus::Connecting) return true; } return false; @@ -109,7 +110,7 @@ DhtRunner::run(const sockaddr_in* local4, const sockaddr_in6* local6, DhtRunner: } void -DhtRunner::shutdown(Dht::ShutdownCallback cb) { +DhtRunner::shutdown(ShutdownCallback cb) { std::lock_guard<std::mutex> lck(storage_mtx); pending_ops_prio.emplace([=](SecureDht& dht) mutable { dht.shutdown(cb); @@ -134,13 +135,145 @@ DhtRunner::join() { std::lock_guard<std::mutex> lck(dht_mtx); dht_.reset(); - status4 = Dht::Status::Disconnected; - status6 = Dht::Status::Disconnected; + status4 = NodeStatus::Disconnected; + status6 = NodeStatus::Disconnected; bound4 = {}; bound6 = {}; } } +void +DhtRunner::dumpTables() const +{ + std::lock_guard<std::mutex> lck(dht_mtx); + dht_->dumpTables(); +} + +InfoHash +DhtRunner::getId() const +{ + if (!dht_) + return {}; + return dht_->getId(); +} + +InfoHash +DhtRunner::getNodeId() const +{ + if (!dht_) + return {}; + return dht_->getNodeId(); +} + + +std::pair<size_t, size_t> +DhtRunner::getStoreSize() const { + std::lock_guard<std::mutex> lck(dht_mtx); + if (!dht_) + return {}; + return dht_->getStoreSize(); +} + +void +DhtRunner::setStorageLimit(size_t limit) { + std::lock_guard<std::mutex> lck(dht_mtx); + if (!dht_) + throw std::runtime_error("dht is not running"); + return dht_->setStorageLimit(limit); +} + +std::vector<NodeExport> +DhtRunner::exportNodes() const { + std::lock_guard<std::mutex> lck(dht_mtx); + if (!dht_) + return {}; + return dht_->exportNodes(); +} + +std::vector<ValuesExport> +DhtRunner::exportValues() const { + std::lock_guard<std::mutex> lck(dht_mtx); + if (!dht_) + return {}; + return dht_->exportValues(); +} + +void +DhtRunner::setLoggers(LogMethod&& error, LogMethod&& warn, LogMethod&& debug) { + std::lock_guard<std::mutex> lck(dht_mtx); + dht_->setLoggers(std::forward<LogMethod>(error), std::forward<LogMethod>(warn), std::forward<LogMethod>(debug)); +} + +void +DhtRunner::registerType(const ValueType& type) { + std::lock_guard<std::mutex> lck(dht_mtx); + dht_->registerType(type); +} + +void +DhtRunner::importValues(const std::vector<ValuesExport>& values) { + std::lock_guard<std::mutex> lck(dht_mtx); + dht_->importValues(values); +} + +int +DhtRunner::getNodesStats(sa_family_t af, unsigned *good_return, unsigned *dubious_return, unsigned *cached_return, unsigned *incoming_return) const +{ + std::lock_guard<std::mutex> lck(dht_mtx); + return dht_->getNodesStats(af, good_return, dubious_return, cached_return, incoming_return); +} + +std::vector<unsigned> +DhtRunner::getNodeMessageStats(bool in) const +{ + std::lock_guard<std::mutex> lck(dht_mtx); + return dht_->getNodeMessageStats(in); +} + +std::string +DhtRunner::getStorageLog() const +{ + std::lock_guard<std::mutex> lck(dht_mtx); + return dht_->getStorageLog(); +} +std::string +DhtRunner::getRoutingTablesLog(sa_family_t af) const +{ + std::lock_guard<std::mutex> lck(dht_mtx); + return dht_->getRoutingTablesLog(af); +} +std::string +DhtRunner::getSearchesLog(sa_family_t af) const +{ + std::lock_guard<std::mutex> lck(dht_mtx); + return dht_->getSearchesLog(af); +} +std::vector<Address> +DhtRunner::getPublicAddress(sa_family_t af) +{ + std::lock_guard<std::mutex> lck(dht_mtx); + return dht_->getPublicAddress(af); +} +std::vector<std::string> +DhtRunner::getPublicAddressStr(sa_family_t af) +{ + auto addrs = getPublicAddress(af); + std::vector<std::string> ret(addrs.size()); + std::transform(addrs.begin(), addrs.end(), ret.begin(), dht::printAddr); + return ret; +} + +void +DhtRunner::registerCertificate(std::shared_ptr<crypto::Certificate> cert) { + std::lock_guard<std::mutex> lck(dht_mtx); + dht_->registerCertificate(cert); +} +void +DhtRunner::setLocalCertificateStore(CertificateStoreQuery&& query_method) { + std::lock_guard<std::mutex> lck(dht_mtx); + dht_->setLocalCertificateStore(std::forward<CertificateStoreQuery>(query_method)); +} + time_point DhtRunner::loop_() { @@ -156,7 +289,7 @@ DhtRunner::loop_() ops.front()(*dht_); ops.pop(); } - if (getStatus() >= Dht::Status::Connecting) { + if (getStatus() >= NodeStatus::Connecting) { { std::lock_guard<std::mutex> lck(storage_mtx); ops = std::move(pending_ops); @@ -185,8 +318,8 @@ DhtRunner::loop_() wakeup = dht_->periodic(nullptr, 0, nullptr, 0); } - Dht::Status nstatus4 = dht_->getStatus(AF_INET); - Dht::Status nstatus6 = dht_->getStatus(AF_INET6); + NodeStatus nstatus4 = dht_->getStatus(AF_INET); + NodeStatus nstatus6 = dht_->getStatus(AF_INET6); if (nstatus4 != status4 || nstatus6 != status6) { status4 = nstatus4; status6 = nstatus6; @@ -292,7 +425,7 @@ DhtRunner::doRun(const sockaddr_in* sin4, const sockaddr_in6* sin6, SecureDht::C } void -DhtRunner::get(InfoHash hash, Dht::GetCallback vcb, Dht::DoneCallback dcb, Value::Filter f) +DhtRunner::get(InfoHash hash, GetCallback vcb, DoneCallback dcb, Value::Filter f) { std::lock_guard<std::mutex> lck(storage_mtx); pending_ops.emplace([=](SecureDht& dht) mutable { @@ -302,13 +435,13 @@ DhtRunner::get(InfoHash hash, Dht::GetCallback vcb, Dht::DoneCallback dcb, Value } void -DhtRunner::get(const std::string& key, Dht::GetCallback vcb, Dht::DoneCallbackSimple dcb, Value::Filter f) +DhtRunner::get(const std::string& key, GetCallback vcb, DoneCallbackSimple dcb, Value::Filter f) { get(InfoHash::get(key), vcb, dcb, f); } std::future<size_t> -DhtRunner::listen(InfoHash hash, Dht::GetCallback vcb, Value::Filter f) +DhtRunner::listen(InfoHash hash, GetCallback vcb, Value::Filter f) { std::lock_guard<std::mutex> lck(storage_mtx); auto ret_token = std::make_shared<std::promise<size_t>>(); @@ -320,7 +453,7 @@ DhtRunner::listen(InfoHash hash, Dht::GetCallback vcb, Value::Filter f) } std::future<size_t> -DhtRunner::listen(const std::string& key, Dht::GetCallback vcb, Value::Filter f) +DhtRunner::listen(const std::string& key, GetCallback vcb, Value::Filter f) { return listen(InfoHash::get(key), vcb, f); } @@ -347,7 +480,7 @@ DhtRunner::cancelListen(InfoHash h, std::shared_future<size_t> token) } void -DhtRunner::put(InfoHash hash, Value&& value, Dht::DoneCallback cb) +DhtRunner::put(InfoHash hash, Value&& value, DoneCallback cb) { std::lock_guard<std::mutex> lck(storage_mtx); auto sv = std::make_shared<Value>(std::move(value)); @@ -358,7 +491,7 @@ DhtRunner::put(InfoHash hash, Value&& value, Dht::DoneCallback cb) } void -DhtRunner::put(InfoHash hash, std::shared_ptr<Value> value, Dht::DoneCallback cb) +DhtRunner::put(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb) { std::lock_guard<std::mutex> lck(storage_mtx); pending_ops.emplace([=](SecureDht& dht) { @@ -368,7 +501,7 @@ DhtRunner::put(InfoHash hash, std::shared_ptr<Value> value, Dht::DoneCallback cb } void -DhtRunner::put(const std::string& key, Value&& value, Dht::DoneCallbackSimple cb) +DhtRunner::put(const std::string& key, Value&& value, DoneCallbackSimple cb) { put(InfoHash::get(key), std::forward<Value>(value), cb); } @@ -384,7 +517,7 @@ DhtRunner::cancelPut(const InfoHash& h , const Value::Id& id) } void -DhtRunner::putSigned(InfoHash hash, std::shared_ptr<Value> value, Dht::DoneCallback cb) +DhtRunner::putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb) { std::lock_guard<std::mutex> lck(storage_mtx); pending_ops.emplace([=](SecureDht& dht) { @@ -394,19 +527,19 @@ DhtRunner::putSigned(InfoHash hash, std::shared_ptr<Value> value, Dht::DoneCallb } void -DhtRunner::putSigned(InfoHash hash, Value&& value, Dht::DoneCallback cb) +DhtRunner::putSigned(InfoHash hash, Value&& value, DoneCallback cb) { putSigned(hash, std::make_shared<Value>(std::move(value)), cb); } void -DhtRunner::putSigned(const std::string& key, Value&& value, Dht::DoneCallbackSimple cb) +DhtRunner::putSigned(const std::string& key, Value&& value, DoneCallbackSimple cb) { putSigned(InfoHash::get(key), std::forward<Value>(value), cb); } void -DhtRunner::putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, Dht::DoneCallback cb) +DhtRunner::putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallback cb) { std::lock_guard<std::mutex> lck(storage_mtx); pending_ops.emplace([=](SecureDht& dht) { @@ -416,13 +549,13 @@ DhtRunner::putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value } void -DhtRunner::putEncrypted(InfoHash hash, InfoHash to, Value&& value, Dht::DoneCallback cb) +DhtRunner::putEncrypted(InfoHash hash, InfoHash to, Value&& value, DoneCallback cb) { putEncrypted(hash, to, std::make_shared<Value>(std::move(value)), cb); } void -DhtRunner::putEncrypted(const std::string& key, InfoHash to, Value&& value, Dht::DoneCallback cb) +DhtRunner::putEncrypted(const std::string& key, InfoHash to, Value&& value, DoneCallback cb) { putEncrypted(InfoHash::get(key), to, std::forward<Value>(value), cb); } diff --git a/src/node_cache.cpp b/src/node_cache.cpp new file mode 100644 index 0000000000000000000000000000000000000000..7ed542e524ccd8a1f21ea1cbf93eb3bde3ac8bdd --- /dev/null +++ b/src/node_cache.cpp @@ -0,0 +1,125 @@ +/* + * Copyright (C) 2014-2016 Savoir-faire Linux Inc. + * Author(s) : Adrien Béraud <adrien.beraud@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "node_cache.h" + +namespace dht { + +std::shared_ptr<Node> +NodeCache::getNode(const InfoHash& id, sa_family_t family) { + return (family == AF_INET ? cache_4 : cache_6).get(id); +} + +std::shared_ptr<Node> +NodeCache::getNode(const InfoHash& id, const sockaddr* sa, socklen_t sa_len, time_point now, int confirm) { + return (sa->sa_family == AF_INET ? cache_4 : cache_6).get(id, sa, sa_len, now, confirm); +} + +void +NodeCache::clearBadNodes(sa_family_t family) +{ + if (family == 0) { + clearBadNodes(AF_INET); + clearBadNodes(AF_INET6); + } else { + (family == AF_INET ? cache_4 : cache_6).clearBadNodes(); + } +} + +std::shared_ptr<Node> +NodeCache::NodeTree::getLocal(const InfoHash& id) +{ + for (auto it = nodes.begin(); it != nodes.end();) { + if (auto n = it->lock()) { + if (n->id == id) return n; + ++it; + } else { + it = nodes.erase(it); + } + } + return {}; +} + +std::shared_ptr<Node> +NodeCache::NodeTree::get(const InfoHash& id) +{ + NodeTree* t = this; + for (auto b : id) { + if (t->childs.empty()) + return t->getLocal(id); + else + t = &t->childs[b]; + } + return {}; +} + +std::shared_ptr<Node> +NodeCache::NodeTree::get(const InfoHash& id, const sockaddr* sa, socklen_t sa_len, time_point now, int confirm) +{ + // find the bucket + NodeTree* t = this; + size_t offset = 0; + while (not t->childs.empty() and offset < 4) + t = &t->childs[id[offset++]]; + + // find node in bucket + auto node = t->getLocal(id); + if (not node) { + node = std::make_shared<Node>(id, sa, sa_len); + + // insert node in bucket + if (t->nodes.size() >= 8 && offset < 4) { + offset++; + t->childs.resize(256); + for (auto& w : t->nodes) { + if (auto tn = w.lock()) { + t->childs[tn->id[offset]].nodes.emplace_back(std::move(w)); + } + } + t->nodes = {}; + t->childs[id[offset]].nodes.emplace_back(node); + } else { + t->nodes.emplace_back(node); + } + } else if (confirm || node->time < now - Node::NODE_EXPIRE_TIME) { + node->update(sa, sa_len); + } + if (confirm) + node->received(now, confirm >= 2); + return node; +} + +void +NodeCache::NodeTree::clearBadNodes() { + if (childs.empty()) { + for (auto it = nodes.begin(); it != nodes.end();) { + if (auto n = it->lock()) { + n->reset(); + ++it; + } else { + it = nodes.erase(it); + } + } + } else { + for (auto& c : childs) + c.clearBadNodes(); + } +} + +} diff --git a/src/securedht.cpp b/src/securedht.cpp index fdc3dccc358d7327ce8ded2bf4767004a9fc0a9a..39b343c2369f68c12ccfb5a451c1725f2ed57c72 100644 --- a/src/securedht.cpp +++ b/src/securedht.cpp @@ -32,7 +32,7 @@ extern "C" { namespace dht { -Dht::Config& getConfig(SecureDht::Config& conf) +Config& getConfig(SecureDht::Config& conf) { auto& c = conf.node_config; if (c.node_id == InfoHash()) { @@ -214,7 +214,7 @@ SecureDht::findCertificate(const InfoHash& node, std::function<void(const std::s } -Dht::GetCallback +GetCallback SecureDht::getCallbackFilter(GetCallback cb, Value::Filter&& filter) { return [=](const std::vector<std::shared_ptr<Value>>& values) { diff --git a/tools/dhtscanner.cpp b/tools/dhtscanner.cpp index 7d2e67989a0fb43ad03ef45dde52b4cc05c0299f..eb323781d203d39e3f17f709e48fadbda3e8c5ba 100644 --- a/tools/dhtscanner.cpp +++ b/tools/dhtscanner.cpp @@ -19,6 +19,8 @@ */ #include "tools_common.h" +#include <opendht/node.h> + extern "C" { #include <gnutls/gnutls.h> } @@ -78,7 +80,7 @@ main(int argc, char **argv) auto crt_tmp = dht::crypto::generateIdentity("Scanner node", ca_tmp); DhtRunner dht; - dht.run(params.port, crt_tmp, true, [](dht::Dht::Status /* ipv4 */, dht::Dht::Status /* ipv6 */) {}); + dht.run(params.port, crt_tmp, true, [](dht::NodeStatus /* ipv4 */, dht::NodeStatus /* ipv6 */) {}); if (not params.bootstrap.first.empty()) dht.bootstrap(params.bootstrap.first.c_str(), params.bootstrap.second.c_str()); diff --git a/tools/tools_common.h b/tools/tools_common.h index e9cc9362106fed8dbe98f6e2c5f2cc0dfd8347ed..144bd521f394536cb9725f9b42f74f2902b8e614 100644 --- a/tools/tools_common.h +++ b/tools/tools_common.h @@ -189,7 +189,7 @@ parseArgs(int argc, char **argv) { break; case 'v': if (optarg) - params.logfile = {optarg}; + params.logfile = optarg; params.log = true; break; case 'i':