diff --git a/include/opendht/dht.h b/include/opendht/dht.h index 5db34f396af17796bb4144a6b33368c02cccf21a..e8b6425abe9166344eb9031f6233c9be95c1963b 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -59,19 +59,14 @@ public: // [[deprecated]] using Status = NodeStatus; - Dht() : network_engine(DHT_LOG, scheduler) {} + Dht(); /** * Initialise the Dht with two open sockets (for IPv4 and IP6) * and an ID for the node. */ Dht(int s, int s6, Config config); - virtual ~Dht() { - for (auto& s : searches4) - s.second->clear(); - for (auto& s : searches6) - s.second->clear(); - } + virtual ~Dht(); /** * Get the ID of the node. @@ -291,73 +286,7 @@ private: static constexpr size_t TOKEN_SIZE {64}; - struct SearchNode { - SearchNode(std::shared_ptr<Node> node) : node(node) {} - - using AnnounceStatusMap = std::map<Value::Id, std::shared_ptr<Request>>; - - /** - * Can we use this node to listen/announce now ? - */ - bool isSynced(time_point now) const { - return not node->isExpired() and - not token.empty() and last_get_reply >= now - Node::NODE_EXPIRE_TIME; - } - bool canGet(time_point now, time_point update) const { - return not node->isExpired() and - (now > last_get_reply + Node::NODE_EXPIRE_TIME or update > last_get_reply) - and (not getStatus or not getStatus->pending()); - } - - bool isAnnounced(Value::Id vid, const ValueType& type, time_point now) const { - auto ack = acked.find(vid); - if (ack == acked.end() or not ack->second) { - return false; - } - return ack->second->reply_time + type.expiration > now; - } - bool isListening(time_point now) const { - if (not listenStatus) - return false; - - return listenStatus->reply_time + LISTEN_EXPIRE_TIME > now; - } - - time_point getAnnounceTime(AnnounceStatusMap::const_iterator ack, const ValueType& type) const { - if (ack == acked.end() or not ack->second) - return time_point::min(); - return ack->second->pending() ? time_point::max() : ack->second->reply_time + type.expiration - REANNOUNCE_MARGIN; - } - - time_point getAnnounceTime(Value::Id vid, const ValueType& type) const { - return getAnnounceTime(acked.find(vid), type); - } - - time_point getListenTime() const { - if (not listenStatus) - return time_point::min(); - - return listenStatus->pending() ? time_point::max() : listenStatus->reply_time + LISTEN_EXPIRE_TIME - REANNOUNCE_MARGIN; - } - bool isBad() const { - return !node || node->isExpired() || candidate; - } - - std::shared_ptr<Node> node {}; - - time_point last_get_reply {time_point::min()}; /* last time received valid token */ - std::shared_ptr<Request> getStatus {}; /* get/sync status */ - std::shared_ptr<Request> listenStatus {}; - AnnounceStatusMap acked {}; /* announcement status for a given value id */ - - Blob token {}; - - /** - * A search node is candidate if the search is/was synced and this node is a new candidate for inclusion - * - */ - bool candidate {false}; - }; + struct SearchNode; /** * A single "get" operation data @@ -387,96 +316,10 @@ private: }; /** - * A search is a pointer to the nodes we think are responsible + * A search is a list of the nodes we think are responsible * for storing values for a given hash. */ - struct Search { - InfoHash id {}; - sa_family_t af; - - uint16_t tid; - time_point refill_time {time_point::min()}; - time_point step_time {time_point::min()}; /* the time of the last search step */ - unsigned current_get_requests {0}; /* number of concurrent sync requests */ - std::shared_ptr<Scheduler::Job> nextSearchStep {}; - - bool expired {false}; /* no node, or all nodes expired */ - bool done {false}; /* search is over, cached for later */ - std::vector<SearchNode> nodes {}; - - /* pending puts */ - std::vector<Announce> announce {}; - - /* pending gets */ - std::vector<Get> callbacks {}; - - /* listeners */ - std::map<size_t, LocalListener> listeners {}; - size_t listener_token = 1; - - /** - * @returns true if the node was not present and added to the search - */ - bool insertNode(std::shared_ptr<Node> n, time_point now, const Blob& token={}); - unsigned insertBucket(const Bucket&, time_point now); - - SearchNode* getNode(const std::shared_ptr<Node>& n) { - auto srn = std::find_if(nodes.begin(), nodes.end(), [&](SearchNode& sn) { - return n == sn.node; - }); - return (srn == nodes.end()) ? nullptr : &(*srn); - } - - /** - * Can we use this search to announce ? - */ - bool isSynced(time_point now) const; - - time_point getLastGetTime() const; - - /** - * Is this get operation done ? - */ - bool isDone(const Get& get, time_point now) const; - - time_point getUpdateTime(time_point now) const; - - bool isAnnounced(Value::Id id, const ValueType& type, time_point now) const; - bool isListening(time_point now) const; - - /** - * @return The number of non-good search nodes. - */ - unsigned getNumberOfBadNodes(); - - /** - * ret = 0 : no announce required. - * ret > 0 : (re-)announce required at time ret. - */ - time_point getAnnounceTime(const std::map<ValueType::Id, ValueType>& types, time_point now) const; - - /** - * ret = 0 : no listen required. - * ret > 0 : (re-)announce required at time ret. - */ - time_point getListenTime(time_point now) const; - - time_point getNextStepTime(const std::map<ValueType::Id, ValueType>& types, time_point now) const; - - bool removeExpiredNode(time_point now); - - unsigned refill(const RoutingTable&, time_point now); - - std::vector<std::shared_ptr<Node>> getNodes() const; - - void clear() { - announce.clear(); - callbacks.clear(); - listeners.clear(); - nodes.clear(); - nextSearchStep = {}; - } - }; + struct Search; struct ValueStorage { std::shared_ptr<Value> data {}; @@ -501,84 +344,7 @@ private: } }; - struct Storage { - InfoHash id; - time_point maintenance_time {}; - std::map<std::shared_ptr<Node>, Listener> listeners {}; - std::map<size_t, LocalListener> local_listeners {}; - size_t listener_token {1}; - - Storage() {} - Storage(InfoHash id, time_point now) : id(id), maintenance_time(now+MAX_STORAGE_MAINTENANCE_EXPIRE_TIME) {} - -#if defined(__GNUC__) && __GNUC__ == 4 && __GNUC_MINOR__ <= 9 - // GCC-bug: remove me when support of GCC < 4.9.2 is abandoned - Storage(Storage&& o) noexcept - : id(std::move(o.id)) - , maintenance_time(std::move(o.maintenance_time)) - , listeners(std::move(o.listeners)) - , local_listeners(std::move(o.local_listeners)) - , listener_token(std::move(o.listener_token)) - , values(std::move(o.values)) - , total_size(std::move(o.total_size)) {} -#else - Storage(Storage&& o) noexcept = default; -#endif - - Storage& operator=(Storage&& o) = default; - - bool empty() const { - return values.empty(); - } - - void clear(); - - size_t valueCount() const { - return values.size(); - } - - size_t totalSize() const { - return total_size; - } - - const std::vector<ValueStorage>& getValues() const { return values; } - - std::shared_ptr<Value> getById(Value::Id vid) const { - for (auto& v : values) - if (v.data->id == vid) return v.data; - return {}; - } - - std::vector<std::shared_ptr<Value>> get(Value::Filter f = {}) const { - std::vector<std::shared_ptr<Value>> newvals {}; - if (not f) newvals.reserve(values.size()); - for (auto& v : values) { - if (not f || f(*v.data)) - newvals.push_back(v.data); - } - return newvals; - } - - /** - * Stores a new value in this storage, or replace a previous value - * - * @return <storage, change_size, change_value_num> - * storage: set if a change happened - * change_size: size difference - * change_value_num: change of value number (0 or 1) - */ - std::tuple<ValueStorage*, ssize_t, ssize_t> - store(const std::shared_ptr<Value>& value, time_point created, ssize_t size_left); - - std::pair<ssize_t, ssize_t> expire(const std::map<ValueType::Id, ValueType>& types, time_point now); - - private: - Storage(const Storage&) = delete; - Storage& operator=(const Storage&) = delete; - - std::vector<ValueStorage> values {}; - size_t total_size {}; - }; + struct Storage; // prevent copy Dht(const Dht&) = delete; @@ -602,7 +368,7 @@ private: RoutingTable buckets {}; RoutingTable buckets6 {}; - std::vector<Storage> store {}; + std::vector<Storage> store; size_t total_values {0}; size_t total_store_size {0}; size_t max_store_size {DEFAULT_STORAGE_LIMIT}; @@ -634,16 +400,8 @@ private: void reportedAddr(const sockaddr *sa, socklen_t sa_len); // Storage - decltype(Dht::store)::iterator findStorage(const InfoHash& id) { - return std::find_if(store.begin(), store.end(), [&](const Storage& st) { - return st.id == id; - }); - } - decltype(Dht::store)::const_iterator findStorage(const InfoHash& id) const { - return std::find_if(store.cbegin(), store.cend(), [&](const Storage& st) { - return st.id == id; - }); - } + decltype(Dht::store)::iterator findStorage(const InfoHash& id); + decltype(Dht::store)::const_iterator findStorage(const InfoHash& id) const; void storageAddListener(const InfoHash& id, const std::shared_ptr<Node>& node, size_t tid); bool storageStore(const InfoHash& id, const std::shared_ptr<Value>& value, time_point created); diff --git a/include/opendht/network_engine.h b/include/opendht/network_engine.h index 637ed05e6f94dca4b361390d8f2b7d3e6d7a674f..5986b9a566c0e82b8c56930909e79a3f7fb0e833 100644 --- a/include/opendht/network_engine.h +++ b/include/opendht/network_engine.h @@ -28,7 +28,6 @@ #include "scheduler.h" #include "utils.h" #include "rng.h" -#include "request.h" #include <vector> #include <string> @@ -165,12 +164,7 @@ public: * Cancel a request. Setting req->cancelled = true is not enough in the case * a request is "persistent". */ - void cancelRequest(std::shared_ptr<Request>& req) { - if (req) { - req->cancel(); - requests.erase(req->tid); - } - } + void cancelRequest(std::shared_ptr<Request>& req); void connectivityChanged(); @@ -281,11 +275,7 @@ public: clear(); }; - void clear() { - for (auto& req : requests) - req.second->cancel(); - requests.clear(); - } + void clear(); /** * Sends values (with closest nodes) to a listenner. @@ -399,48 +389,13 @@ private: static bool isMartian(const sockaddr* sa, socklen_t len); bool isNodeBlacklisted(const sockaddr*, socklen_t) const; - void requestStep(std::shared_ptr<Request> req) { - if (not req->pending()) { - if (req->cancelled()) - requests.erase(req->tid); - return; - } - - auto now = scheduler.time(); - if (req->isExpired(now)) { - req->node->setExpired(); - requests.erase(req->tid); - return; - } else if (req->attempt_count == 1) { - req->on_expired(*req, false); - } - - send((char*)req->msg.data(), req->msg.size(), - (req->node->reply_time >= now - UDP_REPLY_TIME) ? 0 : MSG_CONFIRM, - (sockaddr*)&req->node->ss, req->node->sslen); - ++req->attempt_count; - req->last_try = now; - std::weak_ptr<Request> wreq = req; - scheduler.add(req->last_try + Node::MAX_RESPONSE_TIME, [this,wreq]() { - if (auto req = wreq.lock()) { - requestStep(req); - } - }); - } + void requestStep(std::shared_ptr<Request> req); /** * Sends a request to a node. Request::MAX_ATTEMPT_COUNT attempts will * be made before the request expires. */ - void sendRequest(std::shared_ptr<Request>& request) { - request->start = scheduler.time(); - auto e = requests.emplace(request->tid, request); - if (!e.second) { - DHT_LOG.ERROR("Request already existed !"); - } - request->node->requested(request); - requestStep(request); - } + void sendRequest(std::shared_ptr<Request>& request); /** * Generates a new request id, skipping the invalid id. diff --git a/src/dht.cpp b/src/dht.cpp index 702e32ce13737e970ac9002361c5dce78a09f453..02b3357318ece9ce80667ce8dd837f039e482ca1 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -21,6 +21,7 @@ #include "dht.h" #include "rng.h" +#include "request.h" #include <msgpack.hpp> extern "C" { @@ -79,6 +80,244 @@ constexpr std::chrono::minutes Dht::SEARCH_EXPIRE_TIME; constexpr std::chrono::seconds Dht::LISTEN_EXPIRE_TIME; constexpr std::chrono::seconds Dht::REANNOUNCE_MARGIN; +// internal structures definition + +struct Dht::Storage { + InfoHash id; + time_point maintenance_time {}; + std::map<std::shared_ptr<Node>, Listener> listeners {}; + std::map<size_t, LocalListener> local_listeners {}; + size_t listener_token {1}; + + Storage() {} + Storage(InfoHash id, time_point now) : id(id), maintenance_time(now+MAX_STORAGE_MAINTENANCE_EXPIRE_TIME) {} + +#if defined(__GNUC__) && __GNUC__ == 4 && __GNUC_MINOR__ <= 9 + // GCC-bug: remove me when support of GCC < 4.9.2 is abandoned + Storage(Storage&& o) noexcept + : id(std::move(o.id)) + , maintenance_time(std::move(o.maintenance_time)) + , listeners(std::move(o.listeners)) + , local_listeners(std::move(o.local_listeners)) + , listener_token(std::move(o.listener_token)) + , values(std::move(o.values)) + , total_size(std::move(o.total_size)) {} +#else + Storage(Storage&& o) noexcept = default; +#endif + + Storage& operator=(Storage&& o) = default; + + bool empty() const { + return values.empty(); + } + + void clear(); + + size_t valueCount() const { + return values.size(); + } + + size_t totalSize() const { + return total_size; + } + + const std::vector<ValueStorage>& getValues() const { return values; } + + std::shared_ptr<Value> getById(Value::Id vid) const { + for (auto& v : values) + if (v.data->id == vid) return v.data; + return {}; + } + + std::vector<std::shared_ptr<Value>> get(Value::Filter f = {}) const { + std::vector<std::shared_ptr<Value>> newvals {}; + if (not f) newvals.reserve(values.size()); + for (auto& v : values) { + if (not f || f(*v.data)) + newvals.push_back(v.data); + } + return newvals; + } + + /** + * Stores a new value in this storage, or replace a previous value + * + * @return <storage, change_size, change_value_num> + * storage: set if a change happened + * change_size: size difference + * change_value_num: change of value number (0 or 1) + */ + std::tuple<ValueStorage*, ssize_t, ssize_t> + store(const std::shared_ptr<Value>& value, time_point created, ssize_t size_left); + + std::pair<ssize_t, ssize_t> expire(const std::map<ValueType::Id, ValueType>& types, time_point now); + +private: + Storage(const Storage&) = delete; + Storage& operator=(const Storage&) = delete; + + std::vector<ValueStorage> values {}; + size_t total_size {}; +}; + + +struct Dht::SearchNode { + SearchNode(std::shared_ptr<Node> node) : node(node) {} + + using AnnounceStatusMap = std::map<Value::Id, std::shared_ptr<Request>>; + + /** + * Can we use this node to listen/announce now ? + */ + bool isSynced(time_point now) const { + return not node->isExpired() and + not token.empty() and last_get_reply >= now - Node::NODE_EXPIRE_TIME; + } + bool canGet(time_point now, time_point update) const { + return not node->isExpired() and + (now > last_get_reply + Node::NODE_EXPIRE_TIME or update > last_get_reply) + and (not getStatus or not getStatus->pending()); + } + + bool isAnnounced(Value::Id vid, const ValueType& type, time_point now) const { + auto ack = acked.find(vid); + if (ack == acked.end() or not ack->second) { + return false; + } + return ack->second->reply_time + type.expiration > now; + } + bool isListening(time_point now) const { + if (not listenStatus) + return false; + + return listenStatus->reply_time + LISTEN_EXPIRE_TIME > now; + } + + time_point getAnnounceTime(AnnounceStatusMap::const_iterator ack, const ValueType& type) const { + if (ack == acked.end() or not ack->second) + return time_point::min(); + return ack->second->pending() ? time_point::max() : ack->second->reply_time + type.expiration - REANNOUNCE_MARGIN; + } + + time_point getAnnounceTime(Value::Id vid, const ValueType& type) const { + return getAnnounceTime(acked.find(vid), type); + } + + time_point getListenTime() const { + if (not listenStatus) + return time_point::min(); + + return listenStatus->pending() ? time_point::max() : listenStatus->reply_time + LISTEN_EXPIRE_TIME - REANNOUNCE_MARGIN; + } + bool isBad() const { + return !node || node->isExpired() || candidate; + } + + std::shared_ptr<Node> node {}; + + time_point last_get_reply {time_point::min()}; /* last time received valid token */ + std::shared_ptr<Request> getStatus {}; /* get/sync status */ + std::shared_ptr<Request> listenStatus {}; + AnnounceStatusMap acked {}; /* announcement status for a given value id */ + + Blob token {}; + + /** + * A search node is candidate if the search is/was synced and this node is a new candidate for inclusion + * + */ + bool candidate {false}; +}; + +struct Dht::Search { + InfoHash id {}; + sa_family_t af; + + uint16_t tid; + time_point refill_time {time_point::min()}; + time_point step_time {time_point::min()}; /* the time of the last search step */ + unsigned current_get_requests {0}; /* number of concurrent sync requests */ + std::shared_ptr<Scheduler::Job> nextSearchStep {}; + + bool expired {false}; /* no node, or all nodes expired */ + bool done {false}; /* search is over, cached for later */ + std::vector<SearchNode> nodes {}; + + /* pending puts */ + std::vector<Announce> announce {}; + + /* pending gets */ + std::vector<Get> callbacks {}; + + /* listeners */ + std::map<size_t, LocalListener> listeners {}; + size_t listener_token = 1; + + /** + * @returns true if the node was not present and added to the search + */ + bool insertNode(std::shared_ptr<Node> n, time_point now, const Blob& token={}); + unsigned insertBucket(const Bucket&, time_point now); + + SearchNode* getNode(const std::shared_ptr<Node>& n) { + auto srn = std::find_if(nodes.begin(), nodes.end(), [&](SearchNode& sn) { + return n == sn.node; + }); + return (srn == nodes.end()) ? nullptr : &(*srn); + } + + /** + * Can we use this search to announce ? + */ + bool isSynced(time_point now) const; + + time_point getLastGetTime() const; + + /** + * Is this get operation done ? + */ + bool isDone(const Get& get, time_point now) const; + + time_point getUpdateTime(time_point now) const; + + bool isAnnounced(Value::Id id, const ValueType& type, time_point now) const; + bool isListening(time_point now) const; + + /** + * @return The number of non-good search nodes. + */ + unsigned getNumberOfBadNodes(); + + /** + * ret = 0 : no announce required. + * ret > 0 : (re-)announce required at time ret. + */ + time_point getAnnounceTime(const std::map<ValueType::Id, ValueType>& types, time_point now) const; + + /** + * ret = 0 : no listen required. + * ret > 0 : (re-)announce required at time ret. + */ + time_point getListenTime(time_point now) const; + + time_point getNextStepTime(const std::map<ValueType::Id, ValueType>& types, time_point now) const; + + bool removeExpiredNode(time_point now); + + unsigned refill(const RoutingTable&, time_point now); + + std::vector<std::shared_ptr<Node>> getNodes() const; + + void clear() { + announce.clear(); + callbacks.clear(); + listeners.clear(); + nodes.clear(); + nextSearchStep = {}; + } +}; + void Dht::setLoggers(LogMethod&& error, LogMethod&& warn, LogMethod&& debug) { @@ -335,7 +574,7 @@ Dht::Search::insertNode(std::shared_ptr<Node> node, time_point now, const Blob& const auto& nid = node->id; // Fast track for the case where the node is not relevant for this search - if (nodes.size() >= SEARCH_NODES && id.xorCmp(nid, nodes.back().node->id) > 0 && node->isExpired()) + if (node->isExpired() && nodes.size() >= SEARCH_NODES && id.xorCmp(nid, nodes.back().node->id) > 0) return false; bool found = false; @@ -1339,6 +1578,24 @@ Dht::cancelPut(const InfoHash& id, const Value::Id& vid) return canceled; } + +// Storage + +decltype(Dht::store)::iterator +Dht::findStorage(const InfoHash& id) +{ + return std::find_if(store.begin(), store.end(), [&](const Storage& st) { + return st.id == id; + }); +} +decltype(Dht::store)::const_iterator +Dht::findStorage(const InfoHash& id) const +{ + return std::find_if(store.cbegin(), store.cend(), [&](const Storage& st) { + return st.id == id; + }); +} + void Dht::storageChanged(Storage& st, ValueStorage& v) { @@ -1805,8 +2062,18 @@ Dht::getSearchesLog(sa_family_t af) const return out.str(); } +Dht::~Dht() +{ + for (auto& s : searches4) + s.second->clear(); + for (auto& s : searches6) + s.second->clear(); +} + +Dht::Dht() : store(), network_engine(DHT_LOG, scheduler) {} + Dht::Dht(int s, int s6, Config config) - : myid(config.node_id), is_bootstrap(config.is_bootstrap), + : myid(config.node_id), is_bootstrap(config.is_bootstrap), store(), network_engine(myid, s, s6, DHT_LOG, scheduler, std::bind(&Dht::onError, this, _1, _2), std::bind(&Dht::newNode, this, _1, _2), @@ -1946,7 +2213,6 @@ Dht::dataPersistence() { if (now > str.maintenance_time) { maintainStorage(str.id); str.maintenance_time = now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME; - } storage_maintenance_time = std::min(storage_maintenance_time, str.maintenance_time); } diff --git a/src/network_engine.cpp b/src/network_engine.cpp index 49ae581291a73a728e59c79410e3d057cd046355..c0ebbb32de753264ca7adeb3f568d4d614e8e049 100644 --- a/src/network_engine.cpp +++ b/src/network_engine.cpp @@ -20,6 +20,7 @@ #include "network_engine.h" +#include "request.h" #include <msgpack.hpp> @@ -110,12 +111,77 @@ NetworkEngine::isRunning(sa_family_t af) const } } +void +NetworkEngine::cancelRequest(std::shared_ptr<Request>& req) +{ + if (req) { + req->cancel(); + requests.erase(req->tid); + } +} + +void +NetworkEngine::clear() +{ + for (auto& req : requests) + req.second->cancel(); + requests.clear(); +} + void NetworkEngine::connectivityChanged() { cache.clearBadNodes(); } +void +NetworkEngine::requestStep(std::shared_ptr<Request> req) +{ + if (not req->pending()) { + if (req->cancelled()) + requests.erase(req->tid); + return; + } + + auto now = scheduler.time(); + if (req->isExpired(now)) { + req->node->setExpired(); + requests.erase(req->tid); + return; + } else if (req->attempt_count == 1) { + req->on_expired(*req, false); + } + + send((char*)req->msg.data(), req->msg.size(), + (req->node->reply_time >= now - UDP_REPLY_TIME) ? 0 : MSG_CONFIRM, + (sockaddr*)&req->node->ss, req->node->sslen); + ++req->attempt_count; + req->last_try = now; + std::weak_ptr<Request> wreq = req; + scheduler.add(req->last_try + Node::MAX_RESPONSE_TIME, [this,wreq]() { + if (auto req = wreq.lock()) { + requestStep(req); + } + }); +} + +/** + * Sends a request to a node. Request::MAX_ATTEMPT_COUNT attempts will + * be made before the request expires. + */ +void +NetworkEngine::sendRequest(std::shared_ptr<Request>& request) +{ + request->start = scheduler.time(); + auto e = requests.emplace(request->tid, request); + if (!e.second) { + DHT_LOG.ERROR("Request already existed !"); + } + request->node->requested(request); + requestStep(request); +} + + /* Rate control for requests we receive. */ bool NetworkEngine::rateLimit()