Skip to content
Snippets Groups Projects
Commit 42671be2 authored by Adrien Béraud's avatar Adrien Béraud Committed by GitHub
Browse files

Merge pull request #168 from sim590/announce-op-fixes

Announce op fixes
parents 435cd554 efa6930d
No related branches found
No related tags found
No related merge requests found
......@@ -289,7 +289,7 @@ struct Dht::SearchNode {
/**
* Foreach value id, we keep track of a pair (net::Request, time_point) where the
* request is the request returned by the network engine and the time_point
* is the last time at which the value has been refreshed.
* is the next time at which the value must be refreshed.
*/
using AnnounceStatus = std::map<Value::Id, std::pair<std::shared_ptr<net::Request>, time_point>>;
/**
......@@ -440,11 +440,11 @@ struct Dht::SearchNode {
}) != status.end();
}
bool isAnnounced(Value::Id vid, time_point now) const {
bool isAnnounced(Value::Id vid) const {
auto ack = acked.find(vid);
if (ack == acked.end() or not ack->second.first)
return false;
return ack->second.second > now;
return ack->second.first->completed();
}
bool isListening(time_point now) const {
......@@ -618,7 +618,7 @@ struct Dht::Search {
time_point getUpdateTime(time_point now) const;
bool isAnnounced(Value::Id id, time_point now) const;
bool isAnnounced(Value::Id id) const;
bool isListening(time_point now) const;
/**
......@@ -728,20 +728,20 @@ struct Dht::Search {
* @param types The sequence of existing types.
* @param now The time reference to now.
*/
void checkAnnounced(time_point now, Value::Id vid = Value::INVALID_ID) {
auto announced = std::remove_if(announce.begin(), announce.end(),
[this,&vid,&now](Announce& a) {
void checkAnnounced(Value::Id vid = Value::INVALID_ID) {
auto announced = std::partition(announce.begin(), announce.end(),
[this,&vid](Announce& a) {
if (vid != Value::INVALID_ID and (!a.value || a.value->id != vid))
return false;
if (isAnnounced(a.value->id, now)) {
return true;
if (isAnnounced(a.value->id)) {
if (a.callback) {
a.callback(true, getNodes());
a.callback = nullptr;
}
if (not a.permanent)
return true;
}
return false;
}
return true;
});
// remove acked for cleared annouces
for (auto it = announced; it != announce.end(); ++it) {
......@@ -1427,7 +1427,7 @@ Dht::searchStep(std::shared_ptr<Search> sr)
}
/* clearing callbacks for announced values */
sr->checkAnnounced(now);
sr->checkAnnounced();
if (sr->callbacks.empty() && sr->announce.empty() && sr->listeners.empty())
sr->setDone();
......@@ -1591,7 +1591,7 @@ Dht::Search::getUpdateTime(time_point now) const
}
bool
Dht::Search::isAnnounced(Value::Id id, time_point now) const
Dht::Search::isAnnounced(Value::Id id) const
{
if (nodes.empty())
return false;
......@@ -1599,10 +1599,10 @@ Dht::Search::isAnnounced(Value::Id id, time_point now) const
for (const auto& n : nodes) {
if (n.isBad())
continue;
if (not n.isAnnounced(id, now))
if (not n.isAnnounced(id))
return false;
if (++i == TARGET_NODES)
break;
return true;
}
return i;
}
......@@ -1837,7 +1837,7 @@ Dht::announce(const InfoHash& id,
n.probe_query.reset();
}
}
if (sr->isAnnounced(value->id, now)) {
if (sr->isAnnounced(value->id)) {
if (a_sr->callback)
a_sr->callback(true, {});
a_sr->callback = {};
......@@ -2674,9 +2674,9 @@ Dht::dumpSearch(const Search& sr, std::ostream& out) const
out << *l.second.query << std::endl;
}
for (const auto& n : sr.announce) {
bool announced = sr.isAnnounced(n.value->id, now);
out << "Announcement: " << *n.value << (announced ? " [announced]" : "") << std::endl;
for (const auto& a : sr.announce) {
bool announced = sr.isAnnounced(a.value->id);
out << "Announcement: " << *a.value << (announced ? " [announced]" : "") << std::endl;
}
out << " Common bits InfoHash Conn. Get Ops IP" << std::endl;
......@@ -3566,14 +3566,13 @@ Dht::onRefresh(std::shared_ptr<Node> node, const InfoHash& hash, const Blob& tok
void
Dht::onAnnounceDone(const std::shared_ptr<Node>& node, net::NetworkEngine::RequestAnswer& answer, std::shared_ptr<Search>& sr)
{
const auto& now = scheduler.time();
DHT_LOG.d(sr->id, node->id, "[search %s] [node %s] got reply to put!",
sr->id.toString().c_str(), node->toString().c_str());
searchSendGetValues(sr);
/* if (auto sn = sr->getNode(req->node)) { */
/* sn->setRefreshTime(answer.vid, now + answer) */
/* } */
sr->checkAnnounced(now, answer.vid);
sr->checkAnnounced(answer.vid);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment