diff --git a/include/opendht/node.h b/include/opendht/node.h index 5ecb9a61e1e4853e2e01605840a26a409f79db99..14e8ae7c1929243ac9c026a5b3365680a5e9a3dc 100644 --- a/include/opendht/node.h +++ b/include/opendht/node.h @@ -50,6 +50,18 @@ struct Node { std::string getAddrStr() const { return addr.toString(); } + + /** + * Makes notice about an additionnal authentication error with this node. Up + * to MAX_AUTH_ERRORS errors are accepted in order to let the node recover. + * Upon this limit, the node expires. + */ + void authError() { + if (++auth_errors > MAX_AUTH_ERRORS) + setExpired(); + } + void authSuccess() { auth_errors = 0; } + bool isExpired() const { return expired_; } bool isGood(time_point now) const; bool isPendingMessage() const; @@ -83,7 +95,11 @@ struct Node { static constexpr const std::chrono::seconds MAX_RESPONSE_TIME {1}; private: + /* Number of times we accept authentication errors from this node. */ + static const constexpr unsigned MAX_AUTH_ERRORS {3}; + std::list<std::weak_ptr<Request>> requests_ {}; + unsigned auth_errors {0}; bool expired_ {false}; void clearPendingQueue() { diff --git a/include/opendht/value.h b/include/opendht/value.h index 320fd27e5fa7ee1dcada200b89e4b46a8adcd4a2..31e3ac418d426f3263db29eb75535e5ecd6d071d 100644 --- a/include/opendht/value.h +++ b/include/opendht/value.h @@ -116,13 +116,15 @@ struct ValueType { */ struct Value { - enum class Field { + enum class Field : int { None = 0, - Id, - ValueType, - OwnerPk, - SeqNum, - UserType, + Id, /* Value::id */ + ValueType, /* Value::type */ + OwnerPk, /* Value::owner */ + SeqNum, /* Value::seq */ + UserType, /* Value::user_type */ + + COUNT /* the total number of fields */ }; typedef uint64_t Id; @@ -814,13 +816,13 @@ struct Query { static const std::string QUERY_PARSE_ERROR; - Query(Select s = {}, Where w = {}) : select(s), where(w) { }; + Query(Select s = {}, Where w = {}, bool none = false) : select(s), where(w), none(none) { }; /** * Initializes a query based on a SQL-ish formatted string. The abstract * form of such a string is the following: * - * [SELECT <$field$> [WHERE <$field$=$value$>]] + * [SELECT $field$ [WHERE $field$=$value$]] * * where * @@ -864,6 +866,7 @@ struct Query Select select {}; Where where {}; + bool none {false}; /* When true, any query satisfies this. */ }; /*! diff --git a/src/dht.cpp b/src/dht.cpp index 9bcbbd6f723da2e2bcd4ee4ff21c8ed45bd2c109..1782f870a681ff534a312e2527fdaf4a4005770a 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -275,19 +275,25 @@ struct Dht::SearchNode { * @return true if we can send get, else false. */ bool canGet(time_point now, time_point update, std::shared_ptr<Query> q = {}) const { - if (node->isExpired() or not (now > last_get_reply + Node::NODE_EXPIRE_TIME or update > last_get_reply)) + if (node->isExpired()) return false; - auto completed_sq_status {false}, pending_sq_status {false}; + auto pending {false}, completed_sq_status {false}, pending_sq_status {false}; for (const auto& s : getStatus) { + if (s.second and s.second->pending()) + pending = true; if (s.first and q and q->isSatisfiedBy(*s.first) and s.second) { if (s.second->pending() and not pending_sq_status) pending_sq_status = true; - if (s.second->reply_time > update and not completed_sq_status) + if (s.second->completed() and not (update > s.second->reply_time) and not completed_sq_status) completed_sq_status = true; + if (completed_sq_status and pending_sq_status) + break; } } - return not (hasStartedPagination(q) or completed_sq_status or pending_sq_status); + + return (not pending and now > last_get_reply + Node::NODE_EXPIRE_TIME) or + not (hasStartedPagination(q) or completed_sq_status or pending_sq_status); } /** @@ -402,25 +408,17 @@ struct Dht::SearchNode { or not gs->second or not gs->second->pending())) return time_point::min(); return ((gs != getStatus.cend() and gs->second and gs->second->pending()) - or ack == acked.cend() or not ack->second or ack->second->pending()) ? - time_point::max() : - ack->second->reply_time + type.expiration - REANNOUNCE_MARGIN; + or ack == acked.cend() or not ack->second or ack->second->pending()) + ? time_point::max() + : ack->second->reply_time + type.expiration - REANNOUNCE_MARGIN; } /** - * Assumng the node is synced, should a "listen" request be sent to this node now ? + * Assumng the node is synced, should the "listen" request with Query q be + * sent to this node now ? */ - time_point getListenTime() const { - time_point t {time_point::max()}; - for (auto ls = listenStatus.begin(); ls != listenStatus.end() ; ++ls) { - t = std::min(t, getListenTime(ls)); - } - return t; - } time_point getListenTime(const std::shared_ptr<Query>& q) const { - return getListenTime(listenStatus.find(q)); - } - time_point getListenTime(SyncStatus::const_iterator listen_status) const { + auto listen_status = listenStatus.find(q); if (listen_status == listenStatus.end()) return time_point::min(); return listen_status->second->pending() ? time_point::max() : @@ -1080,7 +1078,7 @@ Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update auto cb = sr->callbacks.begin(); do { /* for all requests to send */ SearchNode* n = nullptr; - auto query = not sr->callbacks.empty() ? cb->second.query : std::make_shared<Query>(); + auto query = not sr->callbacks.empty() ? cb->second.query : std::make_shared<Query>(Select {}, Where {}, true); const time_point up = not sr->callbacks.empty() and update ? sr->getLastGetTime(query) : time_point::min(); @@ -1106,14 +1104,7 @@ Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update n->getStatus[query] = network_engine.sendFindNode(n->node, sr->id, -1, - [this,ws,query](const Request& status, NetworkEngine::RequestAnswer&& answer) { - if (auto sr = ws.lock()) { - if (auto sn = sr->getNode(status.node)) { - sn->getStatus.erase(query); - } - } - searchNodeGetDone(status, std::forward<NetworkEngine::RequestAnswer>(answer), ws, query); - }, + std::bind(&Dht::searchNodeGetDone, this, _1, _2, ws, query), std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, query)); } else { /* 'get' request */ @@ -1215,7 +1206,6 @@ void Dht::searchSendAnnounceValue(const std::shared_ptr<Search>& sr) { sr->af == AF_INET ? '4' : '6', sn->node->toString().c_str(), a.value->id); - /* TODO: kind of a hack. Other solution? */ auto ack_req = std::make_shared<Request>(); ack_req->reply_time = now; sn->acked[a.value->id] = std::move(ack_req); @@ -1223,6 +1213,9 @@ void Dht::searchSendAnnounceValue(const std::shared_ptr<Search>& sr) { /* step to clear announces */ scheduler.edit(sr->nextSearchStep, now); } + } else { + /* Search is now unsynced. Let's call searchStep to sync again. */ + scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(types, now)); } ++ait; } @@ -1304,8 +1297,7 @@ Dht::searchStep(std::shared_ptr<Search> sr) std::weak_ptr<Search> ws = sr; n.listenStatus[query] = network_engine.sendListen(n.node, sr->id, *query, n.token, - [this,ws,last_req,query](const Request& req, - NetworkEngine::RequestAnswer&& answer) mutable + [this,ws,last_req,query](const Request& req, NetworkEngine::RequestAnswer&& answer) mutable { /* on done */ network_engine.cancelRequest(last_req); if (auto sr = ws.lock()) { @@ -1535,13 +1527,14 @@ Dht::Search::getListenTime(time_point now) const { if (listeners.empty()) return time_point::max(); + time_point listen_time {time_point::max()}; unsigned i = 0, t = 0; for (const auto& sn : nodes) { if (not sn.isSynced(now) or (sn.candidate and t >= LISTEN_NODES)) continue; - auto lt = sn.getListenTime(); - listen_time = std::min(listen_time, lt); + for (auto& l : listeners) + listen_time = std::min(listen_time, sn.getListenTime(l.second.query)); t++; if (not sn.candidate and ++i == LISTEN_NODES) break; @@ -3009,20 +3002,19 @@ Dht::pingNode(const sockaddr* sa, socklen_t salen) void Dht::onError(std::shared_ptr<Request> req, DhtProtocolException e) { if (e.getCode() == DhtProtocolException::UNAUTHORIZED) { + DHT_LOG.ERR("[node %s] token flush", req->node->toString().c_str()); + req->node->authError(); network_engine.cancelRequest(req); - unsigned cleared = 0; for (auto& srp : req->node->getFamily() == AF_INET ? searches4 : searches6) { auto& sr = srp.second; for (auto& n : sr->nodes) { if (n.node != req->node) continue; n.token.clear(); n.last_get_reply = time_point::min(); - cleared++; searchSendGetValues(sr); break; } } - DHT_LOG.WARN("[node %s] token flush (%d searches affected)", req->node->toString().c_str(), cleared); } } diff --git a/src/network_engine.cpp b/src/network_engine.cpp index 8d69afa5702700bfe065b628f8c48cf34c75d031..4ada3962fb26832344bfcb82898abff14b692a82 100644 --- a/src/network_engine.cpp +++ b/src/network_engine.cpp @@ -64,23 +64,23 @@ enum class MessageType { struct ParsedMessage { MessageType type; - InfoHash id; /* the id of the sender */ - NetId network {0}; /* network id */ - InfoHash info_hash; /* hash for which values are requested */ - InfoHash target; /* target id around which to find nodes */ - NetworkEngine::TransId tid; /* transaction id */ - Blob token; /* security token */ - Value::Id value_id; /* the value id */ - time_point created { time_point::max() }; /* time when value was first created */ - Blob nodes4_raw, nodes6_raw; /* IPv4 nodes in response to a 'find' request */ + InfoHash id; /* the id of the sender */ + NetId network {0}; /* network id */ + InfoHash info_hash; /* hash for which values are requested */ + InfoHash target; /* target id around which to find nodes */ + NetworkEngine::TransId tid; /* transaction id */ + Blob token; /* security token */ + Value::Id value_id; /* the value id */ + time_point created { time_point::max() }; /* time when value was first created */ + Blob nodes4_raw, nodes6_raw; /* IPv4 nodes in response to a 'find' request */ std::vector<std::shared_ptr<Node>> nodes4, nodes6; - std::vector<std::shared_ptr<Value>> values; /* values for a 'get' request */ + std::vector<std::shared_ptr<Value>> values; /* values for a 'get' request */ std::vector<std::shared_ptr<FieldValueIndex>> fields; /* index for fields values */ - Query query; /* query describing a filter to apply on values. */ - want_t want; /* states if ipv4 or ipv6 request */ - uint16_t error_code; /* error code in case of error */ + Query query; /* query describing a filter to apply on values. */ + want_t want; /* states if ipv4 or ipv6 request */ + uint16_t error_code; /* error code in case of error */ std::string ua; - SockAddr addr; /* reported address by the distant node */ + SockAddr addr; /* reported address by the distant node */ void msgpack_unpack(msgpack::object o); }; @@ -377,6 +377,9 @@ NetworkEngine::processMessage(const uint8_t *buf, size_t buflen, const SockAddr& break; } case MessageType::Reply: + if (msg.type == MessageType::AnnounceValue or msg.type == MessageType::Listen) + req->node->authSuccess(); + // erase before calling callback to make sure iterator is still valid if (not req->persistent) requests.erase(reqp); @@ -824,12 +827,14 @@ NetworkEngine::sendListen(std::shared_ptr<Node> n, const InfoHash& infohash, con msgpack::packer<msgpack::sbuffer> pk(&buffer); pk.pack_map(5+(network?1:0)); - pk.pack(std::string("a")); pk.pack_map(3 + - (query.where.getFilter() or not query.select.getSelection().empty() ? 1:0)); + auto has_query = query.where.getFilter() or not query.select.getSelection().empty(); + pk.pack(std::string("a")); pk.pack_map(3 + has_query); pk.pack(std::string("id")); pk.pack(myid); pk.pack(std::string("h")); pk.pack(infohash); - pk.pack(std::string("q")); pk.pack(query); pk.pack(std::string("token")); packToken(pk, token); + if (has_query) { + pk.pack(std::string("q")); pk.pack(query); + } pk.pack(std::string("q")); pk.pack(std::string("listen")); pk.pack(std::string("t")); pk.pack_bin(tid.size()); diff --git a/src/value.cpp b/src/value.cpp index fc8eecd3374f9d45a50694bfa83da67e218a4a64..a1f4b628254dc01bff919ef7951a687b9ee5b832 100644 --- a/src/value.cpp +++ b/src/value.cpp @@ -209,7 +209,7 @@ FieldValueIndex::FieldValueIndex(const Value& v, Select s) }); } else { index.clear(); - for (size_t f = 1 ; f < 6 ; ++f) + for (size_t f = 1 ; f < static_cast<int>(Value::Field::COUNT) ; ++f) index[static_cast<Value::Field>(f)] = {}; } for (const auto& fvp : index) { @@ -421,7 +421,7 @@ bool Where::isSatisfiedBy(const Where& ow) const { } bool Query::isSatisfiedBy(const Query& q) const { - return where.isSatisfiedBy(q.where) and select.isSatisfiedBy(q.select); + return none or (where.isSatisfiedBy(q.where) and select.isSatisfiedBy(q.select)); } std::ostream& operator<<(std::ostream& s, const dht::Select& select) {