diff --git a/include/opendht/dht.h b/include/opendht/dht.h index 65bceec0866a451a3f9fea2a98a45e08169497fa..d52e9b6ccde88cf2b6914cb58553df32edcd8ef3 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -452,11 +452,30 @@ private: void confirmNodes(); void expire(); + /** + * Generic function to execute when a 'get' request has completed. + * + * @param status The request passed by the network engine. + * @param answer The answer from the network engine. + * @param ws A weak pointer to the search concerned by the request. + * @param query The query sent to the node. + */ void searchNodeGetDone(const Request& status, NetworkEngine::RequestAnswer&& answer, std::weak_ptr<Search> ws, std::shared_ptr<Query> query); + + /** + * Generic function to execute when a 'get' request expires. + * + * @param status The request passed by the network engine. + * @param over Whether we're done to try sending the request to the node + * or not. This lets us mark a node as candidate. + * @param ws A weak pointer to the search concerned by the request. + * @param query The query sent to the node. + */ void searchNodeGetExpired(const Request& status, bool over, std::weak_ptr<Search> ws, std::shared_ptr<Query> query); + /** * This method recovers sends individual request for values per id. * @@ -471,6 +490,21 @@ private: */ SearchNode* searchSendGetValues(std::shared_ptr<Search> sr, SearchNode *n = nullptr, bool update = true); + /** + * Forwards an 'announce' request for a list of nodes to the network engine. + * + * @param sr The search for which we want to announce a value. + * @param announce The 'announce' structure. + */ + void searchSendAnnounceValue(const std::shared_ptr<Search>& sr); + + /** + * Main process of a Search's operations. This function will demand the + * network engine to send requests packets for all pending operations + * ('get', 'put' and 'listen'). + * + * @param sr The search to execute its operations. + */ void searchStep(std::shared_ptr<Search> sr); void dumpSearch(const Search& sr, std::ostream& out) const; diff --git a/include/opendht/value.h b/include/opendht/value.h index 70c1bb854b8b99e83e14e91381f49bb4bbb8e587..c7426f4b6f2901624cc4fb31db8ac07aa20ebf8d 100644 --- a/include/opendht/value.h +++ b/include/opendht/value.h @@ -121,6 +121,7 @@ struct Value Id, ValueType, OwnerPk, + SeqNum, UserType, }; @@ -166,6 +167,8 @@ struct Value } }; + /* Sneaky functions disguised in classes */ + static const Filter AllFilter() { return [](const Value&){return true;}; } @@ -188,24 +191,29 @@ struct Value }; } - static Filter recipientFilter(const InfoHash& r) { + static Filter RecipientFilter(const InfoHash& r) { return [r](const Value& v) { return v.recipient == r; }; } - static Filter ownerFilter(const crypto::PublicKey& pk) { - return ownerFilter(pk.getId()); + static Filter OwnerFilter(const crypto::PublicKey& pk) { + return OwnerFilter(pk.getId()); } - static Filter ownerFilter(const InfoHash& pkh) { + static Filter OwnerFilter(const InfoHash& pkh) { return [pkh](const Value& v) { return v.owner and v.owner->getId() == pkh; }; } - static Filter userTypeFilter(const std::string& ut) - { + static Filter SeqNumFilter(uint16_t seq_no) { + return [seq_no](const Value& v) { + return v.seq == seq_no; + }; + } + + static Filter UserTypeFilter(const std::string& ut) { return [ut](const Value& v) { return v.user_type == ut; }; @@ -468,6 +476,9 @@ struct Value else InfoHash().msgpack_pack(pk); break; + case Value::Field::SeqNum: + pk.pack(static_cast<uint64_t>(seq)); + break; case Value::Field::UserType: pk.pack(user_type); break; @@ -739,6 +750,18 @@ struct Where return *this; } + /** + * Adds restriction on Value::OwnerPk based on the owner_pk_hash argument. + * + * @param owner_pk_hash the owner public key fingerprint. + * + * @return the resulting Where instance. + */ + Where& seq(uint16_t seq_no) { + filters_.emplace_back(Value::Field::SeqNum, seq_no); + return *this; + } + /** * Adds restriction on Value::UserType based on the user_type argument. * diff --git a/src/dht.cpp b/src/dht.cpp index 8ba46df6b4aa5cad65a562fd3e4a43c2a71401c5..74c90f4d6bad3a9a132fd43836a44e5dfc339bc5 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -230,14 +230,17 @@ struct Dht::LocalListener { struct Dht::SearchNode { using AnnounceStatus = std::map<Value::Id, std::shared_ptr<Request>>; using SyncStatus = std::map<std::shared_ptr<Query>, std::shared_ptr<Request>>; - using PaginationQueries = std::map<std::shared_ptr<Query>, std::vector<std::shared_ptr<Query>>>; std::shared_ptr<Node> node {}; /* the node info */ - PaginationQueries pagination_queries {}; - SyncStatus getStatus {}; /* get/sync status */ - SyncStatus listenStatus {}; /* listen status */ - AnnounceStatus acked {}; /* announcement status for a given value id */ + /* queries sent for finding out values hosted by the node */ + std::shared_ptr<Query> probe_query {}; + /* queries substituting formal 'get' requests */ + std::map<std::shared_ptr<Query>, std::vector<std::shared_ptr<Query>>> pagination_queries {}; + + SyncStatus getStatus {}; /* get/sync status */ + SyncStatus listenStatus {}; /* listen status */ + AnnounceStatus acked {}; /* announcement status for a given value id */ Blob token {}; /* last token the node sent to us after a get request */ time_point last_get_reply {time_point::min()}; /* last time received valid token */ @@ -365,9 +368,8 @@ struct Dht::SearchNode { bool isAnnounced(Value::Id vid, const ValueType& type, time_point now) const { auto ack = acked.find(vid); - if (ack == acked.end() or not ack->second) { + if (ack == acked.end() or not ack->second) return false; - } return ack->second->reply_time + type.expiration > now; } @@ -396,14 +398,13 @@ struct Dht::SearchNode { /** * Assumng the node is synced, should a "put" request be sent to this node now ? */ - time_point getAnnounceTime(AnnounceStatus::const_iterator ack, const ValueType& type) const { - if (ack == acked.end() or not ack->second) - return time_point::min(); - return ack->second->pending() ? time_point::max() : ack->second->reply_time + type.expiration - REANNOUNCE_MARGIN; - } - time_point getAnnounceTime(Value::Id vid, const ValueType& type) const { - return getAnnounceTime(acked.find(vid), type); + const auto& ack = acked.find(vid); + const auto& gs = probe_query ? getStatus.find(probe_query) : getStatus.cend(); + if ((ack == acked.cend() or not ack->second) and (gs == getStatus.cend() + or not gs->second or not gs->second->pending())) + return time_point::min(); + return gs->second->pending() ? time_point::max() : gs->second->reply_time + type.expiration - REANNOUNCE_MARGIN; } /** @@ -541,6 +542,32 @@ struct Dht::Search { bool removeExpiredNode(time_point now); + /** + * If the value was just successfully announced, call the callback and erase it if not permanent. + * + * @param vid The id of the announced value. + * @param types The sequence of existing types. + * @param now The time reference to now. + */ + void announced(Value::Id vid, const std::map<ValueType::Id, ValueType>& types, time_point now) { + announce.erase(std::remove_if(announce.begin(), announce.end(), + [this,&vid,&now,&types](Announce& a) { + if (!a.value || a.value->id != vid) + return false; + const auto& type_it = types.find(a.value->type); + const ValueType& type = (type_it == types.end()) ? ValueType::USER_DATA : type_it->second; + if (isAnnounced(vid, type, now)) { + if (a.callback) { + a.callback(true, getNodes()); + a.callback = nullptr; + } + if (not a.permanent) + return true; + } + return false; + }), announce.end()); + } + std::vector<std::shared_ptr<Node>> getNodes() const; void clear() { @@ -1062,6 +1089,117 @@ Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update return n; } +void Dht::searchSendAnnounceValue(const std::shared_ptr<Search>& sr) { + if (sr->announce.empty()) + return; + unsigned i = 0; + auto probe_query = std::make_shared<Query>(Select {}.field(Value::Field::Id).field(Value::Field::SeqNum)); + std::weak_ptr<Search> ws = sr; + const auto& now = scheduler.time(); + for (auto& n : sr->nodes) { + auto something_to_announce = std::find_if(sr->announce.cbegin(), sr->announce.cend(), + [this,&now,&sr,&n](const Announce& a) { + return n.isSynced(now) and n.getAnnounceTime(a.value->id, getType(a.value->type)) <= now; + }) != sr->announce.cend(); + if (not something_to_announce) + continue; + + auto onDone = [this,ws](const Request& status, NetworkEngine::RequestAnswer&& answer) + { /* when put done */ + if (auto sr = ws.lock()) { + onAnnounceDone(status, answer, sr); + searchStep(sr); + } + }; + auto onExpired = [this,ws](const Request&, bool over) + { /* when put expired */ + if (over) + if (auto sr = ws.lock()) + scheduler.edit(sr->nextSearchStep, scheduler.time()); + }; + auto onSelectDone = + [this,ws,onDone,onExpired](const Request& status, NetworkEngine::RequestAnswer&& answer) mutable + { /* on probing done */ + const auto& now = scheduler.time(); + if (auto sr = ws.lock()) { + if (auto sn = sr->getNode(status.node)) { + for (auto ait = sr->announce.begin(); ait != sr->announce.end();) { + auto& a = *ait; + if (not (sn->isSynced(now) and sn->getAnnounceTime(a.value->id, getType(a.value->type)) <= now)) + continue; + if (!a.value) continue; + + auto hasValue {false}; + uint16_t seq_no; + try { + const auto& f = std::find_if(answer.fields.cbegin(), answer.fields.cend(), + [&a](const std::shared_ptr<FieldValueIndex>& i){ + return i->index.at(Value::Field::Id).getInt() == a.value->id; + }); + if (f != answer.fields.cend() and *f) { + hasValue = true; + seq_no = static_cast<uint16_t>((*f)->index.at(Value::Field::SeqNum).getInt()); + } + } catch (std::out_of_range&) { } + + /* only put the value if the node doesn't already have it */ + if (not hasValue or seq_no < a.value->seq) { + DHT_LOG.WARN("[search %s IPv%c] [node %s] sending 'put' (vid: %d)", + sr->id.toString().c_str(), + sr->af == AF_INET ? '4' : '6', + sn->node->toString().c_str(), + a.value->id); + sn->acked[a.value->id] = network_engine.sendAnnounceValue(sn->node, + sr->id, + *a.value, + a.created, + sn->token, + onDone, + onExpired); + } else { + DHT_LOG.WARN("[search %s IPv%c] [node %s] already has value (vid: %d). Aborting.", + sr->id.toString().c_str(), + sr->af == AF_INET ? '4' : '6', + sn->node->toString().c_str(), + a.value->id); + /* TODO: kind of a hack. Other solution? */ + auto ack_req = std::make_shared<Request>(); + ack_req->reply_time = now; + sn->acked[a.value->id] = std::move(ack_req); + if (sr->isAnnounced(a.value->id, getType(a.value->type), now)){ + if (a.callback) { + a.callback(true, sr->getNodes()); + a.callback = nullptr; + } + if (not a.permanent) { + ait = sr->announce.erase(ait); + continue; + } + } + } + ++ait; + } + } + + if (sr->callbacks.empty() && sr->announce.empty() && sr->listeners.empty()) + sr->done = true; + } + }; + DHT_LOG.WARN("[search %s IPv%c] [node %s] sending %s", + sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', + n.node->toString().c_str(), probe_query->toString().c_str()); + n.probe_query = probe_query; + n.getStatus[probe_query] = network_engine.sendGetValues(n.node, + sr->id, + *probe_query, + -1, + onSelectDone, + std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, probe_query)); + if (not n.candidate and ++i == TARGET_NODES) + break; + } +} + /* When a search is in progress, we periodically call search_step to send further requests. */ void @@ -1149,53 +1287,7 @@ Dht::searchStep(std::shared_ptr<Search> sr) } // Announce requests - for (auto ait = sr->announce.begin(); ait != sr->announce.end();) { - auto& a = *ait; - if (!a.value) continue; - auto vid = a.value->id; - const auto& type = getType(a.value->type); - if (sr->isAnnounced(vid, type, now)) { - if (a.callback) { - a.callback(true, sr->getNodes()); - a.callback = nullptr; - } - if (not a.permanent) { - ait = sr->announce.erase(ait); - continue; - } - } - - unsigned i = 0; - for (auto& n : sr->nodes) { - if (not n.isSynced(now)) - continue; - if (n.getAnnounceTime(vid, type) <= now) { - DHT_LOG.WARN("[search %s IPv%c] [node %s] sending 'put' (vid: %d)", - sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', n.node->toString().c_str(), vid); - std::weak_ptr<Search> ws = sr; - n.acked[vid] = network_engine.sendAnnounceValue(n.node, sr->id, *a.value, a.created, n.token, - [this,ws](const Request& status, NetworkEngine::RequestAnswer&& answer) - { /* on done */ - if (auto sr = ws.lock()) { - onAnnounceDone(status, answer, sr); - searchStep(sr); - } - }, - [this,ws](const Request&, bool over) - { /* on expired */ - if (over) - if (auto sr = ws.lock()) - scheduler.edit(sr->nextSearchStep, scheduler.time()); - } - ); - } - if (not n.candidate and ++i == TARGET_NODES) - break; - } - ++ait; - } - if (sr->callbacks.empty() && sr->announce.empty() && sr->listeners.empty()) - sr->done = true; + searchSendAnnounceValue(sr); } if (sr->currentlySolicitedNodeCount() < MAX_REQUESTED_SEARCH_NODES) { @@ -1532,8 +1624,12 @@ Dht::search(const InfoHash& id, sa_family_t af, GetCallback gcb, QueryCallback q } void -Dht::announce(const InfoHash& id, sa_family_t af, std::shared_ptr<Value> value, DoneCallback callback, - time_point created, bool permanent) +Dht::announce(const InfoHash& id, + sa_family_t af, + std::shared_ptr<Value> value, + DoneCallback callback, + time_point created, + bool permanent) { const auto& now = scheduler.time(); if (!value) { @@ -1559,14 +1655,18 @@ Dht::announce(const InfoHash& id, sa_family_t af, std::shared_ptr<Value> value, }); if (a_sr == sr->announce.end()) { sr->announce.emplace_back(Announce {permanent, value, std::min(now, created), callback}); - for (auto& n : sr->nodes) + for (auto& n : sr->nodes) { + n.probe_query.reset(); n.acked[value->id].reset(); + } } else { if (a_sr->value != value) { a_sr->value = value; - for (auto& n : sr->nodes) + for (auto& n : sr->nodes) { n.acked[value->id].reset(); + n.probe_query.reset(); + } } if (sr->isAnnounced(value->id, getType(value->type), now)) { if (a_sr->callback) @@ -1583,12 +1683,6 @@ Dht::announce(const InfoHash& id, sa_family_t af, std::shared_ptr<Value> value, } } scheduler.edit(sr->nextSearchStep, scheduler.time()); - //TODO - //if (tm < search_time) { - // DHT_LOG.ERR("[search %s IPv%c] search_time is now in %lfs", sr->id.toString().c_str(),l - // (sr->af == AF_INET) ? '4' : '6', print_dt(tm-clock::now())); - // search_time = tm; - //} } size_t @@ -3132,25 +3226,8 @@ Dht::onAnnounceDone(const Request&, NetworkEngine::RequestAnswer& answer, std::s const auto& now = scheduler.time(); DHT_LOG.DEBUG("[search %s IPv%c] got reply to put!", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6'); - searchSendGetValues(sr); - - // If the value was just successfully announced, call the callback - sr->announce.erase(std::remove_if(sr->announce.begin(), sr->announce.end(), - [&](Announce& a) { - if (!a.value || a.value->id != answer.vid) - return false; - auto type = getType(a.value->type); - if (sr->isAnnounced(answer.vid, type, now)) { - if (a.callback) { - a.callback(true, sr->getNodes()); - a.callback = nullptr; - } - if (not a.permanent) - return true; - } - return false; - }), sr->announce.end()); + sr->announced(answer.vid, types, now); } } diff --git a/src/value.cpp b/src/value.cpp index 5370ef14916576d160dac3bc0ab4bfc1f0f747f1..fc8eecd3374f9d45a50694bfa83da67e218a4a64 100644 --- a/src/value.cpp +++ b/src/value.cpp @@ -167,6 +167,7 @@ FieldValue::operator==(const FieldValue& vfd) const switch (field) { case Value::Field::Id: case Value::Field::ValueType: + case Value::Field::SeqNum: return intValue == vfd.intValue; case Value::Field::OwnerPk: return hashValue == vfd.hashValue; @@ -188,9 +189,11 @@ FieldValue::getLocalFilter() const case Value::Field::ValueType: return Value::TypeFilter(intValue); case Value::Field::OwnerPk: - return Value::ownerFilter(hashValue); + return Value::OwnerFilter(hashValue); + case Value::Field::SeqNum: + return Value::SeqNumFilter(intValue); case Value::Field::UserType: - return Value::userTypeFilter(std::string {blobValue.begin(), blobValue.end()}); + return Value::UserTypeFilter(std::string {blobValue.begin(), blobValue.end()}); default: return Value::AllFilter(); } @@ -206,7 +209,7 @@ FieldValueIndex::FieldValueIndex(const Value& v, Select s) }); } else { index.clear(); - for (size_t f = 1 ; f < 5 ; ++f) + for (size_t f = 1 ; f < 6 ; ++f) index[static_cast<Value::Field>(f)] = {}; } for (const auto& fvp : index) { @@ -221,6 +224,9 @@ FieldValueIndex::FieldValueIndex(const Value& v, Select s) case Value::Field::OwnerPk: index[f] = {f, v.owner ? v.owner->getId() : InfoHash() }; break; + case Value::Field::SeqNum: + index[f] = {f, v.seq}; + break; case Value::Field::UserType: index[f] = {f, Blob {v.user_type.begin(), v.user_type.end()}}; break; @@ -254,6 +260,9 @@ std::ostream& operator<<(std::ostream& os, const FieldValueIndex& fvi) { case Value::Field::OwnerPk: os << "Owner:" << v->second.getHash().toString(); break; + case Value::Field::SeqNum: + os << "Seq:" << v->second.getInt(); + break; case Value::Field::UserType: { auto ut = v->second.getBlob(); os << "UserType:" << std::string(ut.begin(), ut.end()); @@ -278,6 +287,7 @@ FieldValueIndex::msgpack_unpack_fields(const std::set<Value::Field>& fields, con switch (field) { case Value::Field::Id: case Value::Field::ValueType: + case Value::Field::SeqNum: index[field] = FieldValue(field, field_value.as<uint64_t>()); break; case Value::Field::OwnerPk: @@ -315,6 +325,8 @@ Select::Select(const std::string& q_str) { field(Value::Field::ValueType); else if (token == "owner_pk") field(Value::Field::OwnerPk); + if (token == "seq") + field(Value::Field::SeqNum); else if (token == "user_type") field(Value::Field::UserType); } @@ -342,7 +354,10 @@ Where::Where(const std::string& q_str) { std::string s {}; std::istringstream convert {value_str}; convert >> v; - if (convert.failbit and value_str.size() > 1 and value_str[0] == '\"' and value_str[value_str.size()-1] == '\"') + if (convert.failbit + and value_str.size() > 1 + and value_str[0] == '\"' + and value_str[value_str.size()-1] == '\"') s = value_str.substr(1, value_str.size()-2); else s = value_str; @@ -352,6 +367,8 @@ Where::Where(const std::string& q_str) { valueType(v); else if (field_str == "owner_pk") owner(InfoHash(s)); + else if (field_str == "seq") + seq(v); else if (field_str == "user_type") userType(s); else @@ -423,6 +440,9 @@ std::ostream& operator<<(std::ostream& s, const dht::Select& select) { case Value::Field::OwnerPk: s << "owner_public_key"; break; + case Value::Field::SeqNum: + s << "seq"; + break; default: break; } @@ -445,6 +465,9 @@ std::ostream& operator<<(std::ostream& s, const dht::Where& where) { case Value::Field::OwnerPk: s << "owner_pk_hash=" << f->getHash().toString(); break; + case Value::Field::SeqNum: + s << "seq=" << f->getInt(); + break; case Value::Field::UserType: { auto b = f->getBlob(); s << "user_type=" << std::string {b.begin(), b.end()};