diff --git a/include/opendht/dht.h b/include/opendht/dht.h index f49afbb35e81a6fd455daf0c38ec6f3fec563ebb..d0dee30133c9aa748cbffef06b5f0751384d0fad 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -538,30 +538,37 @@ private: /* when we receive a ping request */ NetworkEngine::RequestAnswer onPing(std::shared_ptr<Node> node); /* when we receive a "find node" request */ - NetworkEngine::RequestAnswer onFindNode(std::shared_ptr<Node> node, InfoHash& hash, want_t want); + NetworkEngine::RequestAnswer onFindNode(std::shared_ptr<Node> node, const InfoHash& hash, want_t want); void onFindNodeDone(const Request& status, NetworkEngine::RequestAnswer& a, std::shared_ptr<Search> sr); /* when we receive a "get values" request */ - NetworkEngine::RequestAnswer onGetValues(std::shared_ptr<Node> node, InfoHash& hash, want_t want, const Query& q); + NetworkEngine::RequestAnswer onGetValues(std::shared_ptr<Node> node, + const InfoHash& hash, + want_t want, + const Query& q); void onGetValuesDone(const Request& status, NetworkEngine::RequestAnswer& a, std::shared_ptr<Search>& sr, const std::shared_ptr<Query>& orig_query); /* when we receive a listen request */ NetworkEngine::RequestAnswer onListen(std::shared_ptr<Node> node, - InfoHash& hash, - Blob& token, + const InfoHash& hash, + const Blob& token, size_t rid, - Query&& query); + const Query& query); void onListenDone(const Request& status, NetworkEngine::RequestAnswer& a, std::shared_ptr<Search>& sr, const std::shared_ptr<Query>& orig_query); /* when we receive an announce request */ NetworkEngine::RequestAnswer onAnnounce(std::shared_ptr<Node> node, - InfoHash& hash, - Blob& token, - std::vector<std::shared_ptr<Value>> v, - time_point created); + const InfoHash& hash, + const Blob& token, + const std::vector<std::shared_ptr<Value>>& v, + const time_point& created); + NetworkEngine::RequestAnswer onRefresh(std::shared_ptr<Node> node, + const InfoHash& hash, + const Blob& token, + const Value::Id& vid); void onAnnounceDone(const Request& status, NetworkEngine::RequestAnswer& a, std::shared_ptr<Search>& sr); }; diff --git a/include/opendht/network_engine.h b/include/opendht/network_engine.h index 60f9a3fbbb55ad2e06c911ba74b3f53eb987e9f3..3d8a4157b8f5b805185769e7c5b0e2078bb3168c 100644 --- a/include/opendht/network_engine.h +++ b/include/opendht/network_engine.h @@ -47,6 +47,7 @@ public: // sent to another peer (http-like). static const constexpr uint16_t NON_AUTHORITATIVE_INFORMATION {203}; /* incomplete request packet. */ static const constexpr uint16_t UNAUTHORIZED {401}; /* wrong tokens. */ + static const constexpr uint16_t NOT_FOUND {404}; /* storage not found */ // for internal use (custom). static const constexpr uint16_t INVALID_TID_SIZE {421}; /* id was truncated. */ static const constexpr uint16_t UNKNOWN_TID {422}; /* unknown tid */ @@ -57,6 +58,7 @@ public: static const std::string LISTEN_WRONG_TOKEN; /* wrong token in "listen" request */ static const std::string PUT_NO_INFOHASH; /* no infohash in "put" request */ static const std::string PUT_WRONG_TOKEN; /* got "put" request with wrong token */ + static const std::string STORAGE_NOT_FOUND; /* got access request for an unknown storage */ static const std::string PUT_INVALID_ID; /* invalid id in "put" request */ DhtProtocolException(uint16_t code, const std::string& msg="", InfoHash failing_node_id={}) @@ -90,6 +92,7 @@ struct ParsedMessage; * @param onGetValues callback for "get values" request. * @param onListen callback for "listen" request. * @param onAnnounce callback for "announce" request. + * @param onRefresh callback for "refresh" request. */ class NetworkEngine final { @@ -99,6 +102,7 @@ class NetworkEngine final static const TransPrefix FIND_NODE; static const TransPrefix GET_VALUES; static const TransPrefix ANNOUNCE_VALUES; + static const TransPrefix REFRESH; static const TransPrefix LISTEN; }; public: @@ -212,7 +216,7 @@ private: * or ipv6. */ std::function<RequestAnswer(std::shared_ptr<Node>, - InfoHash&, + const InfoHash&, want_t)> onFindNode {}; /** * @brief on "get values" request callback. @@ -223,9 +227,9 @@ private: * or ipv6. */ std::function<RequestAnswer(std::shared_ptr<Node>, - InfoHash&, + const InfoHash&, want_t, - Query)> onGetValues {}; + const Query&)> onGetValues {}; /** * @brief on listen request callback. * @@ -235,10 +239,10 @@ private: * @param rid (type: uint16_t) request id. */ std::function<RequestAnswer(std::shared_ptr<Node>, - InfoHash&, - Blob&, + const InfoHash&, + const Blob&, uint16_t, - Query)> onListen {}; + const Query&)> onListen {}; /** * @brief on announce request callback. * @@ -249,10 +253,22 @@ private: * @param created (type: time_point) time when the value was created. */ std::function<RequestAnswer(std::shared_ptr<Node>, - InfoHash&, - Blob&, - std::vector<std::shared_ptr<Value>>, - time_point)> onAnnounce {}; + const InfoHash&, + const Blob&, + const std::vector<std::shared_ptr<Value>>&, + const time_point&)> onAnnounce {}; + /** + * @brief on refresh request callback. + * + * @param node (type: std::shared_ptr<Node>) the requesting node. + * @param vhash (type: InfoHash) hash of the value of interest. + * @param token (type: Blob) security token. + * @param vid (type: Value::id) the value id. + */ + std::function<RequestAnswer(std::shared_ptr<Node>, + const InfoHash&, + const Blob&, + const Value::Id&)> onRefresh {}; public: using RequestCb = std::function<void(const Request&, RequestAnswer&&)>; @@ -267,7 +283,8 @@ public: decltype(NetworkEngine::onFindNode) onFindNode, decltype(NetworkEngine::onGetValues) onGetValues, decltype(NetworkEngine::onListen) onListen, - decltype(NetworkEngine::onAnnounce) onAnnounce); + decltype(NetworkEngine::onAnnounce) onAnnounce, + decltype(NetworkEngine::onRefresh) onRefresh); virtual ~NetworkEngine(); @@ -330,6 +347,13 @@ public: const Blob& token, RequestCb on_done, RequestExpiredCb on_expired); + std::shared_ptr<Request> + sendRefreshValue(std::shared_ptr<Node> n, + const InfoHash& infohash, + const Value::Id& vid, + const Blob& token, + RequestCb on_done, + RequestExpiredCb on_expired); /** * Parses a message and calls appropriate callbacks. @@ -373,7 +397,7 @@ private: static const constexpr size_t NODE4_INFO_BUF_LEN {26}; /* the length of a node info buffer in ipv6 format */ static const constexpr size_t NODE6_INFO_BUF_LEN {38}; - /* TODO */ + /* after a UDP reply, the period during which we tell the link layer about it */ static constexpr std::chrono::seconds UDP_REPLY_TIME {15}; /* Max. time to receive a full fragmented packet */ @@ -421,6 +445,7 @@ private: unsigned get {0}; unsigned put {0}; unsigned listen {0}; + unsigned refresh {0}; }; diff --git a/src/dht.cpp b/src/dht.cpp index 08753f96b1cc1f45724f14d7463af15ad1ddca5d..cc5c909daf78cd8295dce9dcc0ee0fce4982c61e 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -182,6 +182,20 @@ struct Dht::Storage { std::tuple<ValueStorage*, ssize_t, ssize_t> store(const std::shared_ptr<Value>& value, time_point created, ssize_t size_left); + /** + * Refreshes the time point of the value's lifetime begining. + * + * @param now The reference to now; + * @param vid The value id; + */ + void refresh(const time_point& now, const Value::Id& vid) { + auto vs = std::find_if(values.begin(), values.end(), [&vid](const ValueStorage& vs) { + return vs.data->id == vid; + }); + if (vs != values.end()) + vs->time = now; + } + std::pair<ssize_t, ssize_t> expire(const std::map<ValueType::Id, ValueType>& types, time_point now); private: @@ -2648,7 +2662,8 @@ Dht::Dht(int s, int s6, Config config) std::bind(&Dht::onFindNode, this, _1, _2, _3), std::bind(&Dht::onGetValues, this, _1, _2, _3, _4), std::bind(&Dht::onListen, this, _1, _2, _3, _4, _5), - std::bind(&Dht::onAnnounce, this, _1, _2, _3, _4, _5)) + std::bind(&Dht::onAnnounce, this, _1, _2, _3, _4, _5), + std::bind(&Dht::onRefresh, this, _1, _2, _3, _4)) { scheduler.syncTime(); if (s < 0 && s6 < 0) @@ -3067,6 +3082,9 @@ Dht::onError(std::shared_ptr<Request> req, DhtProtocolException e) { break; } } + } else if (e.getCode() == DhtProtocolException::NOT_FOUND) { + DHT_LOG.ERR("[node %s] returned error 404: storage not found", req->node->id.toString().c_str()); + network_engine.cancelRequest(req); } } @@ -3086,7 +3104,7 @@ Dht::onPing(std::shared_ptr<Node>) } NetworkEngine::RequestAnswer -Dht::onFindNode(std::shared_ptr<Node> node, InfoHash& target, want_t want) +Dht::onFindNode(std::shared_ptr<Node> node, const InfoHash& target, want_t want) { const auto& now = scheduler.time(); NetworkEngine::RequestAnswer answer; @@ -3099,7 +3117,7 @@ Dht::onFindNode(std::shared_ptr<Node> node, InfoHash& target, want_t want) } NetworkEngine::RequestAnswer -Dht::onGetValues(std::shared_ptr<Node> node, InfoHash& hash, want_t, const Query& query) +Dht::onGetValues(std::shared_ptr<Node> node, const InfoHash& hash, want_t, const Query& query) { if (hash == zeroes) { DHT_LOG_WARN("[node %s] Eek! Got get_values with no info_hash.", node->toString().c_str()); @@ -3196,7 +3214,7 @@ void Dht::onGetValuesDone(const Request& status, } NetworkEngine::RequestAnswer -Dht::onListen(std::shared_ptr<Node> node, InfoHash& hash, Blob& token, size_t rid, Query&& query) +Dht::onListen(std::shared_ptr<Node> node, const InfoHash& hash, const Blob& token, size_t rid, const Query& query) { if (hash == zeroes) { DHT_LOG_WARN("[node %s] Listen with no info_hash.", node->toString().c_str()); @@ -3209,7 +3227,8 @@ Dht::onListen(std::shared_ptr<Node> node, InfoHash& hash, Blob& token, size_t ri DHT_LOG_WARN("[node %s] incorrect token %s for 'listen'.", node->toString().c_str(), hash.toString().c_str()); throw DhtProtocolException {DhtProtocolException::UNAUTHORIZED, DhtProtocolException::LISTEN_WRONG_TOKEN}; } - storageAddListener(hash, node, rid, std::forward<Query>(query)); + Query q = query; + storageAddListener(hash, node, rid, std::move(q)); return {}; } @@ -3237,10 +3256,10 @@ Dht::onListenDone(const Request& status, NetworkEngine::RequestAnswer Dht::onAnnounce(std::shared_ptr<Node> node, - InfoHash& hash, - Blob& token, - std::vector<std::shared_ptr<Value>> values, - time_point created) + const InfoHash& hash, + const Blob& token, + const std::vector<std::shared_ptr<Value>>& values, + const time_point& created) { if (hash == zeroes) { DHT_LOG_WARN("Put with no info_hash."); @@ -3302,6 +3321,28 @@ Dht::onAnnounce(std::shared_ptr<Node> node, return {}; } +NetworkEngine::RequestAnswer +Dht::onRefresh(std::shared_ptr<Node> node, const InfoHash& hash, const Blob& token, const Value::Id& vid) +{ + const auto& now = scheduler.time(); + if (not tokenMatch(token, (sockaddr*)&node->addr.first)) { + DHT_LOG.WARN("[node %s] incorrect token %s for 'put'.", node->toString().c_str(), hash.toString().c_str()); + throw DhtProtocolException {DhtProtocolException::UNAUTHORIZED, DhtProtocolException::PUT_WRONG_TOKEN}; + } + + auto s = store.find(hash); + if (s != store.end()) { + DHT_LOG.DEBUG("[Storage %s] refreshed value (vid: %d).", hash.toString().c_str(), vid); + s->second.refresh(now, vid); + } else { + DHT_LOG.DEBUG("[node %s] got refresh value for unknown storage (id: %s).", + node->id.toString().c_str(), + hash.toString().c_str()); + throw DhtProtocolException {DhtProtocolException::NOT_FOUND, DhtProtocolException::STORAGE_NOT_FOUND}; + } + return {}; +} + void Dht::onAnnounceDone(const Request& req, NetworkEngine::RequestAnswer& answer, std::shared_ptr<Search>& sr) { diff --git a/src/network_engine.cpp b/src/network_engine.cpp index fbd330c792de90b3e1c4a3f1fab3dce49501238e..f5c444f75617035847df20378b2ea6a66c37c333 100644 --- a/src/network_engine.cpp +++ b/src/network_engine.cpp @@ -33,6 +33,7 @@ const std::string DhtProtocolException::LISTEN_WRONG_TOKEN {"Listen with wrong t const std::string DhtProtocolException::PUT_NO_INFOHASH {"Put with no info_hash"}; const std::string DhtProtocolException::PUT_WRONG_TOKEN {"Put with wrong token"}; const std::string DhtProtocolException::PUT_INVALID_ID {"Put with invalid id"}; +const std::string DhtProtocolException::STORAGE_NOT_FOUND {"Access operation for unknown storage"}; constexpr std::chrono::seconds NetworkEngine::UDP_REPLY_TIME; constexpr std::chrono::seconds NetworkEngine::RX_MAX_PACKET_TIME; @@ -43,10 +44,11 @@ const constexpr uint16_t NetworkEngine::TransId::INVALID; std::mt19937 NetworkEngine::rd_device {dht::crypto::random_device{}()}; const NetworkEngine::TransPrefix NetworkEngine::TransPrefix::PING = {"pn"}; -const NetworkEngine::TransPrefix NetworkEngine::TransPrefix::FIND_NODE = {"fn"}; -const NetworkEngine::TransPrefix NetworkEngine::TransPrefix::GET_VALUES = {"gt"}; -const NetworkEngine::TransPrefix NetworkEngine::TransPrefix::ANNOUNCE_VALUES = {"pt"}; -const NetworkEngine::TransPrefix NetworkEngine::TransPrefix::LISTEN = {"lt"}; +const NetworkEngine::TransPrefix NetworkEngine::TransPrefix::FIND_NODE = {"fn"}; +const NetworkEngine::TransPrefix NetworkEngine::TransPrefix::GET_VALUES = {"gt"}; +const NetworkEngine::TransPrefix NetworkEngine::TransPrefix::ANNOUNCE_VALUES = {"pt"}; +const NetworkEngine::TransPrefix NetworkEngine::TransPrefix::REFRESH = {"rf"}; +const NetworkEngine::TransPrefix NetworkEngine::TransPrefix::LISTEN = {"lt"}; constexpr long unsigned NetworkEngine::MAX_REQUESTS_PER_SEC; static const uint8_t v4prefix[16] = { @@ -62,6 +64,7 @@ enum class MessageType { FindNode, GetValues, AnnounceValue, + Refresh, Listen, ValueData }; @@ -139,10 +142,11 @@ NetworkEngine::NetworkEngine(InfoHash& myid, NetId net, int s, int s6, Logger& l decltype(NetworkEngine::onFindNode) onFindNode, decltype(NetworkEngine::onGetValues) onGetValues, decltype(NetworkEngine::onListen) onListen, - decltype(NetworkEngine::onAnnounce) onAnnounce) : + decltype(NetworkEngine::onAnnounce) onAnnounce, + decltype(NetworkEngine::onRefresh) onRefresh) : onError(onError), onNewNode(onNewNode), onReportedAddr(onReportedAddr), onPing(onPing), onFindNode(onFindNode), - onGetValues(onGetValues), onListen(onListen), onAnnounce(onAnnounce), myid(myid), network(net), - dht_socket(s), dht_socket6(s6), DHT_LOG(log), scheduler(scheduler) + onGetValues(onGetValues), onListen(onListen), onAnnounce(onAnnounce), onRefresh(onRefresh), myid(myid), + network(net), dht_socket(s), dht_socket6(s6), DHT_LOG(log), scheduler(scheduler) { transaction_id = std::uniform_int_distribution<decltype(transaction_id)>{1}(rd_device); } @@ -460,14 +464,14 @@ NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& fro switch (msg->type) { case MessageType::Error: { - if (msg->error_code == DhtProtocolException::UNAUTHORIZED - && msg->id != zeroes - && (msg->tid.matches(TransPrefix::ANNOUNCE_VALUES) - || msg->tid.matches(TransPrefix::LISTEN))) + if (msg->id != zeroes and ( + (msg->error_code == DhtProtocolException::NOT_FOUND and msg->tid.matches(TransPrefix::REFRESH)) or + (msg->error_code == DhtProtocolException::UNAUTHORIZED and (msg->tid.matches(TransPrefix::ANNOUNCE_VALUES) + or msg->tid.matches(TransPrefix::LISTEN))))) { req->last_try = TIME_INVALID; req->reply_time = TIME_INVALID; - onError(req, DhtProtocolException {DhtProtocolException::UNAUTHORIZED}); + onError(req, DhtProtocolException {msg->error_code}); } else { DHT_LOG_WARN("[node %s %s] received unknown error message %u", msg->id.toString().c_str(), from.toString().c_str(), msg->error_code); @@ -530,6 +534,14 @@ NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& fro } break; } + case MessageType::Refresh: + DHT_LOG.DEBUG("[node %s %s] got 'refresh value' request for %s.", + msg->id.toString().c_str(), from.toString().c_str(), + msg->info_hash.toString().c_str()); + onRefresh(node, msg->info_hash, msg->token, msg->value_id); + /* Same note as above in MessageType::AnnounceValue applies. */ + sendValueAnnounced(from, msg->tid, msg->value_id); + break; case MessageType::Listen: { DHT_LOG_DEBUG("[node %s] got 'listen' request for %s.", node->toString().c_str(), msg->info_hash.toString().c_str()); ++in_stats.listen; @@ -1010,8 +1022,14 @@ NetworkEngine::sendListenConfirmation(const SockAddr& addr, TransId tid) { } std::shared_ptr<Request> -NetworkEngine::sendAnnounceValue(std::shared_ptr<Node> n, const InfoHash& infohash, const std::shared_ptr<Value>& value, time_point created, - const Blob& token, RequestCb on_done, RequestExpiredCb on_expired) { +NetworkEngine::sendAnnounceValue(std::shared_ptr<Node> n, + const InfoHash& infohash, + const std::shared_ptr<Value>& value, + time_point created, + const Blob& token, + RequestCb on_done, + RequestExpiredCb on_expired) +{ auto tid = TransId {TransPrefix::ANNOUNCE_VALUES, getNewTid()}; msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); @@ -1062,6 +1080,59 @@ NetworkEngine::sendAnnounceValue(std::shared_ptr<Node> n, const InfoHash& infoha return req; } +std::shared_ptr<Request> +NetworkEngine::sendRefreshValue(std::shared_ptr<Node> n, + const InfoHash& infohash, + const Value::Id& vid, + const Blob& token, + RequestCb on_done, + RequestExpiredCb on_expired) +{ + auto tid = TransId {TransPrefix::REFRESH, getNewTid()}; + msgpack::sbuffer buffer; + msgpack::packer<msgpack::sbuffer> pk(&buffer); + pk.pack_map(5+(network?1:0)); + + pk.pack(std::string("a")); pk.pack_map(4); + pk.pack(std::string("id")); pk.pack(myid); + pk.pack(std::string("h")); pk.pack(infohash); + pk.pack(std::string("vid")); pk.pack(vid); + pk.pack(std::string("token")); pk.pack(token); + + pk.pack(std::string("q")); pk.pack(std::string("refresh")); + pk.pack(std::string("t")); pk.pack_bin(tid.size()); + pk.pack_bin_body((const char*)tid.data(), tid.size()); + pk.pack(std::string("y")); pk.pack(std::string("q")); + pk.pack(std::string("v")); pk.pack(my_v); + if (network) { + pk.pack(std::string("n")); pk.pack(network); + } + + Blob b {buffer.data(), buffer.data() + buffer.size()}; + std::shared_ptr<Request> req(new Request {tid.getTid(), n, std::move(b), + [=](const Request& req_status, ParsedMessage&& msg) { /* on done */ + if (msg.value_id == Value::INVALID_ID) { + DHT_LOG.DEBUG("Unknown search or announce!"); + } else { + if (on_done) { + RequestAnswer answer {}; + answer.vid = msg.value_id; + on_done(req_status, std::move(answer)); + } + } + }, + [=](const Request& req_status, bool done) { /* on expired */ + if (on_expired) { + on_expired(req_status, done); + } + } + }); + sendRequest(req); + ++out_stats.refresh; + return req; + +} + void NetworkEngine::sendValueAnnounced(const SockAddr& addr, TransId tid, Value::Id vid) { msgpack::sbuffer buffer; @@ -1156,6 +1227,8 @@ ParsedMessage::msgpack_unpack(msgpack::object msg) type = MessageType::Listen; else if (q == "put") type = MessageType::AnnounceValue; + else if (q == "refresh") + type = MessageType::Refresh; else throw msgpack::type_error();