diff --git a/include/opendht/dht.h b/include/opendht/dht.h index 28f6c609560d9c2d68c4a1fa13b001951e3e47c2..cbe3e22c5dc0ba209214d710d7c990224d90369d 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -41,6 +41,56 @@ THE SOFTWARE. namespace dht { +struct NodeExport { + InfoHash id; + sockaddr_storage ss; + socklen_t sslen; +}; + +struct Node { + InfoHash id {}; + sockaddr_storage ss; + socklen_t sslen {0}; + time_point time {}; /* last time eared about */ + time_point reply_time {}; /* time of last correct reply received */ + time_point pinged_time {}; /* time of last message sent */ + unsigned pinged {0}; /* how many requests we sent since last reply */ + + Node() : ss() { + std::fill_n((uint8_t*)&ss, sizeof(ss), 0); + } + Node(const InfoHash& id, const sockaddr* sa, socklen_t salen) + : id(id), ss(), sslen(salen) { + std::copy_n((const uint8_t*)sa, salen, (uint8_t*)&ss); + } + InfoHash getId() const { + return id; + } + bool isExpired(time_point now) const; + bool isGood(time_point now) const; + NodeExport exportNode() const { return NodeExport {id, ss, sslen}; } + sa_family_t getFamily() const { return ss.ss_family; } + + void update(const sockaddr* sa, socklen_t salen); + + /** To be called when a message was sent to the noe */ + void requested(time_point now); + + /** To be called when a message was received from the node. + Answer should be true if the message was an aswer to a request we made*/ + void received(time_point now, bool answer); + + friend std::ostream& operator<< (std::ostream& s, const Node& h); + + static constexpr const std::chrono::minutes NODE_GOOD_TIME {120}; + + /* The time after which we consider a node to be expirable. */ + static constexpr const std::chrono::minutes NODE_EXPIRE_TIME {10}; + + /* Time for a request to timeout */ + static constexpr const std::chrono::seconds MAX_RESPONSE_TIME {3}; +}; + /** * Main Dht class. * Provides a Distributed Hash Table node. @@ -57,9 +107,34 @@ public: Connected // 1+ good nodes }; - struct Node; - typedef std::function<bool(const std::vector<std::shared_ptr<Value>>& values)> GetCallback; - typedef std::function<void(bool success, const std::vector<std::shared_ptr<Node>>& nodes)> DoneCallback; + // [[deprecated]] + using NodeExport = dht::NodeExport; + + //typedef std::function<bool(const std::vector<std::shared_ptr<Value>>& values)> GetCallback; + struct GetCallback : public std::function<bool(const std::vector<std::shared_ptr<Value>>& values)> + { + typedef bool (*GetCallbackRaw)(std::vector<std::shared_ptr<Value>>*, void *user_data); + + using std::function<bool(const std::vector<std::shared_ptr<Value>>& values)>::function; + GetCallback(GetCallbackRaw raw_cb, void *user_data) + : GetCallback([=](const std::vector<std::shared_ptr<Value>>& values){ + return raw_cb((std::vector<std::shared_ptr<Value>>*)&values, user_data); + }) {} + GetCallback() : GetCallback(nullptr) {} + }; + struct DoneCallback : public std::function<void(bool success, const std::vector<std::shared_ptr<Node>>& nodes)> + { + typedef void (*DoneCallbackRaw)(bool, std::vector<std::shared_ptr<Node>>*, void *user_data); + + using std::function<void(bool success, const std::vector<std::shared_ptr<Node>>& nodes)>::function; + DoneCallback(DoneCallbackRaw raw_cb, void *user_data) + : DoneCallback([=](bool success, const std::vector<std::shared_ptr<Node>>& nodes) { + return raw_cb(success, (std::vector<std::shared_ptr<Node>>*)&nodes, user_data); + }) {} + DoneCallback() : DoneCallback(nullptr) {} + }; + + //typedef std::function<void(bool success, const std::vector<std::shared_ptr<Node>>& nodes)> DoneCallback; typedef std::function<void(bool success)> DoneCallbackSimple; static DoneCallback @@ -68,11 +143,6 @@ public: return std::bind(donecb, _1); } - struct NodeExport { - InfoHash id; - sockaddr_storage ss; - socklen_t sslen; - }; using want_t = int_fast8_t; @@ -221,39 +291,6 @@ public: /* This must be provided by the user. */ static bool isBlacklisted(const sockaddr*, socklen_t) { return false; } - struct Node { - InfoHash id {}; - sockaddr_storage ss; - socklen_t sslen {0}; - time_point time {}; /* last time eared about */ - time_point reply_time {}; /* time of last correct reply received */ - time_point pinged_time {}; /* time of last message sent */ - unsigned pinged {0}; /* how many requests we sent since last reply */ - - Node() : ss() { - std::fill_n((uint8_t*)&ss, sizeof(ss), 0); - } - Node(const InfoHash& id, const sockaddr* sa, socklen_t salen) - : id(id), ss(), sslen(salen) { - std::copy_n((const uint8_t*)sa, salen, (uint8_t*)&ss); - } - bool isExpired(time_point now) const; - bool isGood(time_point now) const; - NodeExport exportNode() const { return NodeExport {id, ss, sslen}; } - sa_family_t getFamily() const { return ss.ss_family; } - - void update(const sockaddr* sa, socklen_t salen); - - /** To be called when a message was sent to the noe */ - void requested(time_point now); - - /** To be called when a message was received from the node. - Answer should be true if the message was an aswer to a request we made*/ - void received(time_point now, bool answer); - - friend std::ostream& operator<< (std::ostream& s, const Node& h); - }; - protected: LogMethod DHT_DEBUG = NOLOG; LogMethod DHT_WARN = NOLOG; @@ -278,9 +315,6 @@ private: /* The maximum number of searches we keep data about. */ static constexpr unsigned MAX_SEARCHES {1024}; - /* Time for a request to timeout */ - static constexpr std::chrono::seconds MAX_RESPONSE_TIME {3}; - /* The time after which we can send get requests for a search in case of no answers. */ static constexpr std::chrono::seconds SEARCH_GET_STEP {3}; @@ -288,14 +322,9 @@ private: /* The time after which we consider a search to be expirable. */ static constexpr std::chrono::minutes SEARCH_EXPIRE_TIME {62}; - /* The time after which we consider a node to be expirable. */ - static constexpr std::chrono::minutes NODE_EXPIRE_TIME {10}; - /* Timeout for listen */ static constexpr std::chrono::minutes LISTEN_EXPIRE_TIME {3}; - static constexpr std::chrono::minutes NODE_GOOD_TIME {120}; - static constexpr std::chrono::seconds REANNOUNCE_MARGIN {5}; static constexpr std::chrono::seconds UDP_REPLY_TIME {15}; @@ -372,16 +401,16 @@ private: RequestStatus() {}; RequestStatus(time_point q, time_point a = {}) : request_time(q), reply_time(a) {}; bool expired(time_point now) { - return (reply_time < request_time && request_time >= now - MAX_RESPONSE_TIME); + return (reply_time < request_time && now - request_time <= Node::MAX_RESPONSE_TIME); } }; typedef std::map<Value::Id, RequestStatus> AnnounceStatusMap; /** - * Can we use this node to listen/announce ? + * Can we use this node to listen/announce now ? */ bool isSynced(time_point now) const { - return not node->isExpired(now) and getStatus.reply_time >= now - NODE_EXPIRE_TIME; + return not node->isExpired(now) and getStatus.reply_time >= now - Node::NODE_EXPIRE_TIME; } bool isAnnounced(Value::Id vid, const ValueType& type, time_point now) const { @@ -397,11 +426,11 @@ private: time_point getAnnounceTime(AnnounceStatusMap::const_iterator ack, const ValueType& type) const { if (ack == acked.end()) - return getStatus.request_time + MAX_RESPONSE_TIME; + return getStatus.request_time + Node::MAX_RESPONSE_TIME; return std::max<time_point>({ ack->second.reply_time + type.expiration - REANNOUNCE_MARGIN, - ack->second.request_time + MAX_RESPONSE_TIME, - getStatus.request_time + MAX_RESPONSE_TIME + ack->second.request_time + Node::MAX_RESPONSE_TIME, + getStatus.request_time + Node::MAX_RESPONSE_TIME }); } time_point getAnnounceTime(Value::Id vid, const ValueType& type) const { @@ -410,7 +439,7 @@ private: time_point getListenTime() const { if (listenStatus.reply_time > listenStatus.request_time) return listenStatus.reply_time + LISTEN_EXPIRE_TIME - REANNOUNCE_MARGIN; - return listenStatus.request_time + MAX_RESPONSE_TIME; + return listenStatus.request_time + Node::MAX_RESPONSE_TIME; //time_point min_t = listenStatus.request_time + MAX_RESPONSE_TIME; //return listenStatus.reply_time.time_since_epoch().count() ? std::max(listenStatus.reply_time + NODE_EXPIRE_TIME - REANNOUNCE_MARGIN, min_t) : min_t; } @@ -509,7 +538,7 @@ private: bool removeExpiredNode(time_point now); - std::vector<std::shared_ptr<Dht::Node>> getNodes(time_point now) const; + std::vector<std::shared_ptr<Node>> getNodes(time_point now) const; }; struct ValueStorage { @@ -735,7 +764,7 @@ private: std::shared_ptr<Node> newNode(const InfoHash& id, const sockaddr*, socklen_t, int confirm); std::shared_ptr<Node> findNode(const InfoHash& id, sa_family_t af); const std::shared_ptr<Node> findNode(const InfoHash& id, sa_family_t af) const; - bool trySearchInsert(const std::shared_ptr<Dht::Node>& node); + bool trySearchInsert(const std::shared_ptr<Node>& node); void pinged(Node& n, Bucket *b = nullptr); diff --git a/include/opendht/dhtrunner.h b/include/opendht/dhtrunner.h index 229dcd2c1cf5923850be7e90af910830b7df6d01..1c05b6ef74b6f7f6a05ab4c7d284b83897385534 100644 --- a/include/opendht/dhtrunner.h +++ b/include/opendht/dhtrunner.h @@ -149,7 +149,7 @@ public: void bootstrap(const char* host, const char* service); void bootstrap(const std::vector<std::pair<sockaddr_storage, socklen_t>>& nodes); - void bootstrap(const std::vector<Dht::NodeExport>& nodes); + void bootstrap(const std::vector<NodeExport>& nodes); void dumpTables() const { @@ -174,7 +174,7 @@ public: return getNodeId(); } - std::vector<Dht::NodeExport> exportNodes() const { + std::vector<NodeExport> exportNodes() const { std::lock_guard<std::mutex> lck(dht_mtx); if (!dht_) return {}; diff --git a/src/dht.cpp b/src/dht.cpp index 692fa1a7f91222acec0dba85d38e48f412fdb0c5..6173c416cec5e99308e2bff53df6625093706de9 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -146,11 +146,12 @@ static constexpr InfoHash ones = {std::array<uint8_t, HASH_LEN>{ 0xFF, 0xFF, 0xFF, 0xFF }}; -constexpr std::chrono::seconds Dht::MAX_RESPONSE_TIME; +constexpr std::chrono::minutes Node::NODE_EXPIRE_TIME; +constexpr std::chrono::minutes Node::NODE_GOOD_TIME; +constexpr std::chrono::seconds Node::MAX_RESPONSE_TIME; + constexpr std::chrono::seconds Dht::SEARCH_GET_STEP; constexpr std::chrono::minutes Dht::SEARCH_EXPIRE_TIME; -constexpr std::chrono::minutes Dht::NODE_EXPIRE_TIME; -constexpr std::chrono::minutes Dht::NODE_GOOD_TIME; constexpr std::chrono::minutes Dht::LISTEN_EXPIRE_TIME; constexpr std::chrono::seconds Dht::REANNOUNCE_MARGIN; constexpr std::chrono::seconds Dht::UDP_REPLY_TIME; @@ -225,7 +226,7 @@ Dht::isMartian(const sockaddr *sa, socklen_t len) } } -std::shared_ptr<Dht::Node> +std::shared_ptr<Node> Dht::Bucket::randomNode() { if (nodes.empty()) @@ -297,7 +298,7 @@ Dht::RoutingTable::findBucket(const InfoHash& id) const } /* Every bucket contains an unordered list of nodes. */ -std::shared_ptr<Dht::Node> +std::shared_ptr<Node> Dht::findNode(const InfoHash& id, sa_family_t af) { Bucket* b = findBucket(id, af); @@ -308,7 +309,7 @@ Dht::findNode(const InfoHash& id, sa_family_t af) return {}; } -const std::shared_ptr<Dht::Node> +const std::shared_ptr<Node> Dht::findNode(const InfoHash& id, sa_family_t af) const { const Bucket* b = findBucket(id, af); @@ -321,7 +322,7 @@ Dht::findNode(const InfoHash& id, sa_family_t af) const /* This is our definition of a known-good node. */ bool -Dht::Node::isGood(time_point now) const +Node::isGood(time_point now) const { return not isExpired(now) && @@ -330,13 +331,13 @@ Dht::Node::isGood(time_point now) const } bool -Dht::Node::isExpired(time_point now) const +Node::isExpired(time_point now) const { return pinged >= 3 && reply_time < pinged_time && pinged_time + MAX_RESPONSE_TIME < now; } void -Dht::Node::update(const sockaddr* sa, socklen_t salen) +Node::update(const sockaddr* sa, socklen_t salen) { std::copy_n((const uint8_t*)sa, salen, (uint8_t*)&ss); sslen = salen; @@ -344,7 +345,7 @@ Dht::Node::update(const sockaddr* sa, socklen_t salen) /** To be called when a message was sent to the noe */ void -Dht::Node::requested(time_point now) +Node::requested(time_point now) { pinged++; if (reply_time > pinged_time || pinged_time + MAX_RESPONSE_TIME < now) @@ -354,7 +355,7 @@ Dht::Node::requested(time_point now) /** To be called when a message was received from the node. Answer should be true if the message was an aswer to a request we made*/ void -Dht::Node::received(time_point now, bool answer) +Node::received(time_point now, bool answer) { time = now; if (answer) { @@ -363,14 +364,14 @@ Dht::Node::received(time_point now, bool answer) } } -std::ostream& operator<< (std::ostream& s, const Dht::Node& h) +std::ostream& operator<< (std::ostream& s, const Node& h) { s << h.id << " " << print_addr(h.ss, h.sslen); return s; } -std::shared_ptr<Dht::Node> +std::shared_ptr<Node> Dht::NodeCache::getNode(const InfoHash& id, sa_family_t family) { auto& list = family == AF_INET ? cache_4 : cache_6; for (auto n = list.begin(); n != list.end();) { @@ -385,13 +386,13 @@ Dht::NodeCache::getNode(const InfoHash& id, sa_family_t family) { return nullptr; } -std::shared_ptr<Dht::Node> +std::shared_ptr<Node> Dht::NodeCache::getNode(const InfoHash& id, const sockaddr* sa, socklen_t sa_len, time_point now, int confirm) { auto node = getNode(id, sa->sa_family); if (not node) { node = std::make_shared<Node>(id, sa, sa_len); putNode(node); - } else if (confirm || node->time < now - NODE_EXPIRE_TIME) { + } else if (confirm || node->time < now - Node::NODE_EXPIRE_TIME) { node->update(sa, sa_len); } if (confirm) @@ -400,7 +401,7 @@ Dht::NodeCache::getNode(const InfoHash& id, const sockaddr* sa, socklen_t sa_len } void -Dht::NodeCache::putNode(std::shared_ptr<Dht::Node> n) { +Dht::NodeCache::putNode(std::shared_ptr<Node> n) { if (not n) return; auto& list = n->ss.ss_family == AF_INET ? cache_4 : cache_6; list.push_back(n); @@ -513,7 +514,7 @@ Dht::RoutingTable::split(const RoutingTable::iterator& b) } bool -Dht::trySearchInsert(const std::shared_ptr<Dht::Node>& node) +Dht::trySearchInsert(const std::shared_ptr<Node>& node) { bool inserted = false; auto family = node->getFamily(); @@ -532,7 +533,7 @@ Dht::trySearchInsert(const std::shared_ptr<Dht::Node>& node) /* We just learnt about a node, not necessarily a new one. Confirm is 1 if the node sent a message, 2 if it sent us a reply. */ -std::shared_ptr<Dht::Node> +std::shared_ptr<Node> Dht::newNode(const InfoHash& id, const sockaddr *sa, socklen_t salen, int confirm) { if (id == myid || isMartian(sa, salen) || isNodeBlacklisted(sa, salen)) @@ -552,7 +553,7 @@ Dht::newNode(const InfoHash& id, const sockaddr *sa, socklen_t salen, int confir if (n->id != id) continue; /* Known node. Update stuff. */ - if (confirm || n->time < now - NODE_EXPIRE_TIME) { + if (confirm || n->time < now - Node::NODE_EXPIRE_TIME) { n->update(sa, salen); if (confirm) { n->received(now, confirm >= 2); @@ -593,7 +594,7 @@ Dht::newNode(const InfoHash& id, const sockaddr *sa, socklen_t salen, int confir of bad nodes fast. */ if (not n->isGood(now)) { dubious = true; - if (n->pinged_time + MAX_RESPONSE_TIME < now) { + if (n->pinged_time + Node::MAX_RESPONSE_TIME < now) { DHT_DEBUG("Sending ping to dubious node."); sendPing((sockaddr*)&n->ss, n->sslen, TransId {TransPrefix::PING}); n->pinged++; @@ -672,7 +673,7 @@ Dht::Search::removeExpiredNode(time_point now) while (e != nodes.cbegin()) { e = std::prev(e); const Node& n = *e->node; - if (n.isExpired(now) and n.time + NODE_EXPIRE_TIME < now) { + if (n.isExpired(now) and n.time + Node::NODE_EXPIRE_TIME < now) { nodes.erase(e); return true; } @@ -737,7 +738,7 @@ Dht::Search::insertNode(std::shared_ptr<Node> node, time_point now, const Blob& return true; } -std::vector<std::shared_ptr<Dht::Node>> +std::vector<std::shared_ptr<Node>> Dht::Search::getNodes(time_point now) const { std::vector<std::shared_ptr<Node>> ret {}; @@ -768,11 +769,11 @@ Dht::searchSendGetValues(Search& sr, SearchNode *n, bool update) std::function<bool(const SearchNode&)> check_node; if (update) check_node = [this,sr](const SearchNode& sn) { - return not sn.node->isExpired(now) && (!sn.isSynced(now) || !sr.isUpdated(sn)) && sn.getStatus.request_time < now - MAX_RESPONSE_TIME; + return not sn.node->isExpired(now) && (!sn.isSynced(now) || !sr.isUpdated(sn)) && sn.getStatus.request_time < now - Node::MAX_RESPONSE_TIME; }; else check_node = [this](const SearchNode& sn) { - return not sn.node->isExpired(now) && !sn.isSynced(now) && sn.getStatus.request_time < now - MAX_RESPONSE_TIME; + return not sn.node->isExpired(now) && !sn.isSynced(now) && sn.getStatus.request_time < now - Node::MAX_RESPONSE_TIME; }; if (!n) { @@ -1015,7 +1016,7 @@ bool Dht::Search::isDone(const Get& get, time_point now) const { unsigned i = 0; - auto limit = std::max(get.start, now - NODE_EXPIRE_TIME); + auto limit = std::max(get.start, now - Node::NODE_EXPIRE_TIME); for (const auto& sn : nodes) { if (sn.node->isExpired(now)) continue; @@ -1036,16 +1037,16 @@ Dht::Search::getUpdateTime(time_point now) const for (const auto& sn : nodes) { if (sn.node->isExpired(now)) continue; - if (sn.getStatus.reply_time < now - NODE_EXPIRE_TIME) { // not isSynced + if (sn.getStatus.reply_time < now - Node::NODE_EXPIRE_TIME) { // not isSynced return std::min(ut, get_step_time + SEARCH_GET_STEP); } else if (sn.getStatus.reply_time < last_get) { - ut = std::min(ut, sn.getStatus.request_time + MAX_RESPONSE_TIME); + ut = std::min(ut, sn.getStatus.request_time + Node::MAX_RESPONSE_TIME); } else { d++; ut = std::min(ut, std::max( - sn.getStatus.request_time + MAX_RESPONSE_TIME, - sn.getStatus.reply_time + NODE_EXPIRE_TIME)); + sn.getStatus.request_time + Node::MAX_RESPONSE_TIME, + sn.getStatus.reply_time + Node::NODE_EXPIRE_TIME)); } if (++i == TARGET_NODES) break; @@ -1658,7 +1659,7 @@ Dht::expireStorage() std::partition(i->listeners.begin(), i->listeners.end(), [&](const Listener& l) { - bool expired = l.time + NODE_EXPIRE_TIME < now; + bool expired = l.time + Node::NODE_EXPIRE_TIME < now; if (expired) DHT_DEBUG("Discarding expired listener %s", l.id.toString().c_str()); // return false if the element should be removed @@ -1881,7 +1882,7 @@ Dht::getStorageLog() const for (const auto& l : st.listeners) { out << " " << "Listener " << l.id << " " << print_addr((sockaddr*)&l.ss, l.sslen); auto since = duration_cast<seconds>(now - l.time); - auto expires = duration_cast<seconds>(l.time + NODE_EXPIRE_TIME - now); + auto expires = duration_cast<seconds>(l.time + Node::NODE_EXPIRE_TIME - now); out << " (since " << since.count() << "s, exp in " << expires.count() << "s)" << std::endl; } for (const auto& v : st.values) { @@ -2526,7 +2527,7 @@ Dht::importValues(const std::vector<ValuesExport>& import) } -std::vector<Dht::NodeExport> +std::vector<NodeExport> Dht::exportNodes() { std::vector<NodeExport> nodes; diff --git a/src/dhtrunner.cpp b/src/dhtrunner.cpp index eba77862e4a5750fe6d646f330b92a283de95260..e63cbe3df5f58d553ab724e0fbb3be04c626c875 100644 --- a/src/dhtrunner.cpp +++ b/src/dhtrunner.cpp @@ -394,7 +394,7 @@ DhtRunner::bootstrap(const std::vector<std::pair<sockaddr_storage, socklen_t>>& } void -DhtRunner::bootstrap(const std::vector<Dht::NodeExport>& nodes) +DhtRunner::bootstrap(const std::vector<NodeExport>& nodes) { std::lock_guard<std::mutex> lck(storage_mtx); pending_ops.emplace([=](SecureDht& dht) {