diff --git a/src/dht.cpp b/src/dht.cpp index 6e1241c020e8adb428487f1738636f5a31a740c4..8f5f95463ba0efe8afb4f2fe3b9e2e9f99522bb0 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -240,7 +240,16 @@ struct Dht::LocalListener { }; struct Dht::SearchNode { - using AnnounceStatus = std::map<Value::Id, std::shared_ptr<Request>>; + /** + * Foreach value id, we keep track of a pair (Request, time_point) where the + * request is the request returned by the network engine and the time_point + * is the last time at which the value has been refreshed. + */ + using AnnounceStatus = std::map<Value::Id, std::pair<std::shared_ptr<Request>, time_point>>; + /** + * Foreach Query, we keep track of the request returned by the network + * engine when we sent the "get". + */ using SyncStatus = std::map<std::shared_ptr<Query>, std::shared_ptr<Request>>; std::shared_ptr<Node> node {}; /* the node info */ @@ -385,11 +394,11 @@ struct Dht::SearchNode { }) != status.end(); } - bool isAnnounced(Value::Id vid, const ValueType& type, time_point now) const { + bool isAnnounced(Value::Id vid, time_point now) const { auto ack = acked.find(vid); - if (ack == acked.end() or not ack->second) + if (ack == acked.end() or not ack->second.first) return false; - return ack->second->reply_time + type.expiration > now; + return ack->second.second > now; } bool isListening(time_point now) const { @@ -417,16 +426,16 @@ struct Dht::SearchNode { /** * Assumng the node is synced, should a "put" request be sent to this node now ? */ - time_point getAnnounceTime(Value::Id vid, const ValueType& type) const { + time_point getAnnounceTime(Value::Id vid) const { const auto& ack = acked.find(vid); const auto& gs = probe_query ? getStatus.find(probe_query) : getStatus.cend(); - if ((ack == acked.cend() or not ack->second) and (gs == getStatus.cend() + if ((ack == acked.cend() or not ack->second.first) and (gs == getStatus.cend() or not gs->second or not gs->second->pending())) return time_point::min(); return ((gs != getStatus.cend() and gs->second and gs->second->pending()) - or ack == acked.cend() or not ack->second or ack->second->pending()) + or ack == acked.cend() or not ack->second.first or ack->second.first->pending()) ? time_point::max() - : ack->second->reply_time + type.expiration - REANNOUNCE_MARGIN; + : ack->second.second - REANNOUNCE_MARGIN; } /** @@ -563,7 +572,7 @@ struct Dht::Search { time_point getUpdateTime(time_point now) const; - bool isAnnounced(Value::Id id, const ValueType& type, time_point now) const; + bool isAnnounced(Value::Id id, time_point now) const; bool isListening(time_point now) const; /** @@ -590,7 +599,7 @@ struct Dht::Search { * or time_point::max() if no such event is planned. * Only makes sense when the search is synced. */ - time_point getAnnounceTime(const std::map<ValueType::Id, ValueType>& types, time_point now) const; + time_point getAnnounceTime(time_point now) const; /** * Returns the time of the next "listen" event for this search, @@ -603,7 +612,7 @@ struct Dht::Search { * Returns the time of the next event for this search, * or time_point::max() if no such event is planned. */ - time_point getNextStepTime(const std::map<ValueType::Id, ValueType>& types, time_point now) const; + time_point getNextStepTime(time_point now) const; /** * Removes a node which have been expired for at least @@ -673,14 +682,12 @@ struct Dht::Search { * @param types The sequence of existing types. * @param now The time reference to now. */ - void checkAnnounced(const std::map<ValueType::Id, ValueType>& types, time_point now, Value::Id vid = Value::INVALID_ID) { + void checkAnnounced(time_point now, Value::Id vid = Value::INVALID_ID) { announce.erase(std::remove_if(announce.begin(), announce.end(), - [this,&vid,&now,&types](Announce& a) { + [this,&vid,&now](Announce& a) { if (vid != Value::INVALID_ID and (!a.value || a.value->id != vid)) return false; - const auto& type_it = types.find(a.value->type); - const ValueType& type = (type_it == types.end()) ? ValueType::USER_DATA : type_it->second; - if (isAnnounced(a.value->id, type, now)) { + if (isAnnounced(a.value->id, now)) { if (a.callback) { a.callback(true, getNodes()); a.callback = nullptr; @@ -815,7 +822,7 @@ Dht::trySearchInsert(const std::shared_ptr<Node>& node) auto& s = *srp.second; if (s.insertNode(node, now)) { inserted = true; - scheduler.edit(s.nextSearchStep, s.getNextStepTime(types, now)); + scheduler.edit(s.nextSearchStep, s.getNextStepTime(now)); } } return inserted; @@ -1220,7 +1227,7 @@ void Dht::searchSendAnnounceValue(const std::shared_ptr<Search>& sr) { for (auto& n : sr->nodes) { auto something_to_announce = std::find_if(sr->announce.cbegin(), sr->announce.cend(), [this,&now,&sr,&n](const Announce& a) { - return n.isSynced(now) and n.getAnnounceTime(a.value->id, getType(a.value->type)) <= now; + return n.isSynced(now) and n.getAnnounceTime(a.value->id) <= now; }) != sr->announce.cend(); if (not something_to_announce) continue; @@ -1246,8 +1253,7 @@ void Dht::searchSendAnnounceValue(const std::shared_ptr<Search>& sr) { if (auto sn = sr->getNode(status.node)) { for (auto ait = sr->announce.begin(); ait != sr->announce.end();) { auto& a = *ait; - const auto& type = getType(a.value->type); - if (sn->isSynced(now) and sn->getAnnounceTime(a.value->id, type) <= now) { + if (sn->isSynced(now) and sn->getAnnounceTime(a.value->id) <= now) { bool hasValue {false}; uint16_t seq_no = 0; try { @@ -1261,39 +1267,40 @@ void Dht::searchSendAnnounceValue(const std::shared_ptr<Search>& sr) { } } catch (std::out_of_range&) { } + auto next_refresh_time = now + getType(a.value->type).expiration; /* only put the value if the node doesn't already have it */ if (not hasValue or seq_no < a.value->seq) { DHT_LOG_WARN("[search %s] [node %s] sending 'put' (vid: %d)", sr->id.toString().c_str(), sn->node->toString().c_str(), a.value->id); - sn->acked[a.value->id] = network_engine.sendAnnounceValue(sn->node, + sn->acked[a.value->id] = std::make_pair(network_engine.sendAnnounceValue(sn->node, sr->id, a.value, a.permanent ? time_point::max() : a.created, sn->token, onDone, - onExpired); + onExpired), next_refresh_time); } else if (hasValue and a.permanent) { DHT_LOG_WARN("[search %s] [node %s] sending 'refresh' (vid: %d)", sr->id.toString().c_str(), sn->node->toString().c_str(), a.value->id); - sn->acked[a.value->id] = network_engine.sendRefreshValue(sn->node, + sn->acked[a.value->id] = std::make_pair(network_engine.sendRefreshValue(sn->node, sr->id, a.value->id, sn->token, onDone, - onExpired); + onExpired), next_refresh_time); } else { DHT_LOG.WARN("[search %s] [node %s] already has value (vid: %d). Aborting.", sr->id.toString().c_str(), sn->node->toString().c_str(), a.value->id); auto ack_req = std::make_shared<Request>(); ack_req->reply_time = now; - sn->acked[a.value->id] = std::move(ack_req); + sn->acked[a.value->id] = std::make_pair(std::move(ack_req), next_refresh_time); /* step to clear announces */ - scheduler.edit(sr->nextSearchStep, now); + scheduler.edit(sr->nextSearchStep, next_refresh_time); } } else { /* Search is now unsynced. Let's call searchStep to sync again. */ - scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(types, now)); + scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(now)); } ++ait; } @@ -1346,7 +1353,7 @@ Dht::searchStep(std::shared_ptr<Search> sr) } /* clearing callbacks for announced values */ - sr->checkAnnounced(types, now); + sr->checkAnnounced(now); if (sr->callbacks.empty() && sr->announce.empty() && sr->listeners.empty()) sr->setDone(); @@ -1432,7 +1439,7 @@ Dht::searchStep(std::shared_ptr<Search> sr) /* periodic searchStep scheduling. */ if (not sr->done) - scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(types, now)); + scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(now)); } bool @@ -1505,7 +1512,7 @@ Dht::Search::getUpdateTime(time_point now) const } bool -Dht::Search::isAnnounced(Value::Id id, const ValueType& type, time_point now) const +Dht::Search::isAnnounced(Value::Id id, time_point now) const { if (nodes.empty()) return false; @@ -1513,7 +1520,7 @@ Dht::Search::isAnnounced(Value::Id id, const ValueType& type, time_point now) co for (const auto& n : nodes) { if (n.isBad()) continue; - if (not n.isAnnounced(id, type, now)) + if (not n.isAnnounced(id, now)) return false; if (++i == TARGET_NODES) break; @@ -1544,20 +1551,18 @@ Dht::Search::isListening(time_point now) const } time_point -Dht::Search::getAnnounceTime(const std::map<ValueType::Id, ValueType>& types, time_point now) const +Dht::Search::getAnnounceTime(time_point now) const { if (nodes.empty()) return time_point::max(); time_point ret {time_point::max()}; for (const auto& a : announce) { if (!a.value) continue; - auto type_it = types.find(a.value->type); - const ValueType& type = (type_it == types.end()) ? ValueType::USER_DATA : type_it->second; unsigned i = 0, t = 0; for (const auto& n : nodes) { if (not n.isSynced(now) or (n.candidate and t >= TARGET_NODES)) continue; - ret = std::min(ret, n.getAnnounceTime(a.value->id, type)); + ret = std::min(ret, n.getAnnounceTime(a.value->id)); t++; if (not n.candidate and ++i == TARGET_NODES) break; @@ -1587,7 +1592,7 @@ Dht::Search::getListenTime(time_point now) const } time_point -Dht::Search::getNextStepTime(const std::map<ValueType::Id, ValueType>& types, time_point now) const +Dht::Search::getNextStepTime(time_point now) const { auto next_step = time_point::max(); if (expired or done) @@ -1601,7 +1606,7 @@ Dht::Search::getNextStepTime(const std::map<ValueType::Id, ValueType>& types, ti if (isSynced(now)) { - auto at = getAnnounceTime(types, now); + auto at = getAnnounceTime(now); if (at != time_point::max()) { //std::cout << id.toString() << " IPv" << (af==AF_INET?"4":"6") << " announce time in " << print_dt(at - now) << " s" << std::endl; next_step = std::min(next_step, at); @@ -1701,7 +1706,7 @@ Dht::search(const InfoHash& id, sa_family_t af, GetCallback gcb, QueryCallback q refill(*sr); if (sr->nextSearchStep) - scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(types, scheduler.time())); + scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(scheduler.time())); else sr->nextSearchStep = scheduler.add(scheduler.time(), std::bind(&Dht::searchStep, this, sr)); @@ -1742,18 +1747,18 @@ Dht::announce(const InfoHash& id, sr->announce.emplace_back(Announce {permanent, value, std::min(now, created), callback}); for (auto& n : sr->nodes) { n.probe_query.reset(); - n.acked[value->id].reset(); + n.acked[value->id].first.reset(); } } else { if (a_sr->value != value) { a_sr->value = value; for (auto& n : sr->nodes) { - n.acked[value->id].reset(); + n.acked[value->id].first.reset(); n.probe_query.reset(); } } - if (sr->isAnnounced(value->id, getType(value->type), now)) { + if (sr->isAnnounced(value->id, now)) { if (a_sr->callback) a_sr->callback(true, {}); a_sr->callback = {}; @@ -1788,7 +1793,7 @@ Dht::listenTo(const InfoHash& id, sa_family_t af, GetCallback cb, Value::Filter sr->done = false; auto token = ++sr->listener_token; sr->listeners.emplace(token, LocalListener{q, f, cb}); - scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(types, now)); + scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(now)); return token; } @@ -2498,7 +2503,7 @@ Dht::dumpSearch(const Search& sr, std::ostream& out) const } for (const auto& n : sr.announce) { - bool announced = sr.isAnnounced(n.value->id, getType(n.value->type), now); + bool announced = sr.isAnnounced(n.value->id, now); out << "Announcement: " << *n.value << (announced ? " [announced]" : "") << std::endl; } @@ -2542,12 +2547,12 @@ Dht::dumpSearch(const Search& sr, std::ostream& out) const out << "["; for (const auto& a : sr.announce) { auto ack = n.acked.find(a.value->id); - if (ack == n.acked.end() or not ack->second) { + if (ack == n.acked.end() or not ack->second.first) { out << ' '; } else { - if (ack->second->reply_time + getType(a.value->type).expiration > now) + if (ack->second.first->reply_time + getType(a.value->type).expiration > now) out << 'a'; - else if (ack->second->pending()) + else if (ack->second.first->pending()) out << 'f'; } } @@ -3237,7 +3242,7 @@ Dht::onListenDone(const Request& status, if (not sr->done) { const auto& now = scheduler.time(); searchSendGetValues(sr); - scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(types, now)); + scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(now)); } } else DHT_LOG_DEBUG("Unknown search or announce!"); @@ -3339,7 +3344,10 @@ Dht::onAnnounceDone(const Request& req, NetworkEngine::RequestAnswer& answer, st DHT_LOG_DEBUG("[search %s] [node %s] got reply to put!", sr->id.toString().c_str(), req.node->toString().c_str()); searchSendGetValues(sr); - sr->checkAnnounced(types, now, answer.vid); + /* if (auto sn = sr->getNode(req->node)) { */ + /* sn->setRefreshTime(answer.vid, now + answer) */ + /* } */ + sr->checkAnnounced(now, answer.vid); } }