Skip to content
Snippets Groups Projects
Commit 8cce9fcb authored by Adrien Béraud's avatar Adrien Béraud
Browse files

search: remove getNextStepTime

parent 56caae01
Branches
Tags
No related merge requests found
/*
* Copyright (C) 2014-2017 Savoir-faire Linux Inc.
* Copyright (C) 2014-2018 Savoir-faire Linux Inc.
* Author(s) : Adrien Béraud <adrien.beraud@savoirfairelinux.com>
* Simon Désaulniers <simon.desaulniers@savoirfairelinux.com>
*
......@@ -130,7 +130,7 @@ Dht::trySearchInsert(const Sp<Node>& node)
auto& s = *it->second;
if (s.insertNode(node, now)) {
inserted = true;
scheduler.edit(s.nextSearchStep, s.getNextStepTime(now));
scheduler.edit(s.nextSearchStep, now);
} else if (not s.expired and not s.done)
break;
++it;
......@@ -142,7 +142,7 @@ Dht::trySearchInsert(const Sp<Node>& node)
auto& s = *it->second;
if (s.insertNode(node, now)) {
inserted = true;
scheduler.edit(s.nextSearchStep, s.getNextStepTime(now));
scheduler.edit(s.nextSearchStep, now);
} else if (not s.expired and not s.done)
break;
}
......@@ -217,6 +217,7 @@ Dht::searchNodeGetDone(const net::Request& req,
{
const auto& now = scheduler.time();
if (auto sr = ws.lock()) {
sr->insertNode(req.node, now, answer.ntoken);
if (auto srn = sr->getNode(req.node)) {
/* all other get requests which are satisfied by this answer
should not be sent anymore */
......@@ -228,8 +229,12 @@ Dht::searchNodeGetDone(const net::Request& req,
srn->getStatus[q] = std::move(dummy_req);
}
}
auto syncTime = srn->getSyncTime(scheduler.time());
if (srn->syncJob)
scheduler.edit(srn->syncJob, syncTime);
else
srn->syncJob = scheduler.add(syncTime, std::bind(&Dht::searchStep, this, sr));
}
sr->insertNode(req.node, now, answer.ntoken);
onGetValuesDone(req.node, answer, sr, query);
}
}
......@@ -411,7 +416,7 @@ void Dht::searchSendAnnounceValue(const Sp<Search>& sr) {
const auto& now = scheduler.time();
if (not sn->isSynced(now)) {
/* Search is now unsynced. Let's call searchStep to sync again. */
scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(now));
scheduler.edit(sr->nextSearchStep, now);
return;
}
for (auto& a : sr->announce) {
......@@ -549,6 +554,8 @@ Dht::searchStep(Sp<Search> sr)
{ /* on done */
if (auto sr = ws.lock()) {
scheduler.edit(sr->nextSearchStep, scheduler.time());
if (auto sn = sr->getNode(req.node))
scheduler.add(sn->getListenTime(query), std::bind(&Dht::searchStep, this, sr));
onListenDone(req.node, answer, sr);
}
},
......@@ -606,8 +613,8 @@ Dht::searchStep(Sp<Search> sr)
/* dumpSearch(*sr, std::cout); */
/* periodic searchStep scheduling. */
if (not sr->done)
scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(now));
//if (not sr->done)
// scheduler.edit(sr->nextSearchStep, now);
}
unsigned Dht::refill(Dht::Search& sr) {
......@@ -695,7 +702,7 @@ Dht::search(const InfoHash& id, sa_family_t af, GetCallback gcb, QueryCallback q
refill(*sr);
if (sr->nextSearchStep)
scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(scheduler.time()));
scheduler.edit(sr->nextSearchStep, scheduler.time());
else
sr->nextSearchStep = scheduler.add(scheduler.time(), std::bind(&Dht::searchStep, this, sr));
......@@ -782,7 +789,7 @@ Dht::listenTo(const InfoHash& id, sa_family_t af, GetCallback cb, Value::Filter
sr->done = false;
auto token = ++sr->listener_token;
sr->listeners.emplace(token, LocalListener{q, f, cb});
scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(now));
scheduler.edit(sr->nextSearchStep, now);
return token;
}
......@@ -1434,10 +1441,8 @@ Dht::dumpSearch(const Search& sr, std::ostream& out) const
out << " [expired]";
bool synced = sr.isSynced(now);
out << (synced ? " [synced]" : " [not synced]");
if (synced && sr.isListening(now)) {
auto lt = sr.getListenTime(now);
out << " [listening, next in " << duration_cast<seconds>(lt-now).count() << " s]";
}
if (synced && sr.isListening(now))
out << " [listening]";
out << std::endl;
/*printing the queries*/
......@@ -2059,6 +2064,7 @@ Dht::onError(Sp<net::Request> req, net::DhtProtocolException e) {
n.token.clear();
n.last_get_reply = time_point::min();
searchSendGetValues(sr);
scheduler.edit(sr->nextSearchStep, scheduler.time());
break;
}
}
......@@ -2221,7 +2227,7 @@ Dht::onListenDone(const Sp<Node>& node,
if (not sr->done) {
const auto& now = scheduler.time();
searchSendGetValues(sr);
scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(now));
scheduler.edit(sr->nextSearchStep, now);
}
}
......
/*
* Copyright (C) 2014-2017 Savoir-faire Linux Inc.
* Copyright (C) 2014-2018 Savoir-faire Linux Inc.
* Author(s) : Adrien Béraud <adrien.beraud@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
......@@ -74,6 +74,7 @@ struct Dht::SearchNode {
time_point last_get_reply {time_point::min()}; /* last time received valid token */
bool candidate {false}; /* A search node is candidate if the search is/was synced and this
node is a new candidate for inclusion. */
Sp<Scheduler::Job> syncJob {};
SearchNode() : node() {}
SearchNode(const SearchNode&) = delete;
......@@ -93,11 +94,17 @@ struct Dht::SearchNode {
/**
* Can we use this node to listen/announce now ?
*/
bool isSynced(time_point now) const {
bool isSynced(const time_point& now) const {
return not node->isExpired() and
not token.empty() and last_get_reply >= now - Node::NODE_EXPIRE_TIME;
}
time_point getSyncTime(const time_point& now) const {
if (node->isExpired() or token.empty())
return now;
return last_get_reply + Node::NODE_EXPIRE_TIME;
}
/**
* Could a particular "get" request be sent to this node now ?
*
......@@ -421,8 +428,6 @@ struct Dht::Search {
done = true;
}
time_point getUpdateTime(time_point now) const;
bool isAnnounced(Value::Id id) const;
bool isListening(time_point now) const;
......@@ -460,26 +465,6 @@ struct Dht::Search {
return count;
}
/**
* Returns the time of the next "announce" event for this search,
* or time_point::max() if no such event is planned.
* Only makes sense when the search is synced.
*/
time_point getAnnounceTime(time_point now) const;
/**
* Returns the time of the next "listen" event for this search,
* or time_point::max() if no such event is planned.
* Only makes sense when the search is synced.
*/
time_point getListenTime(time_point now) const;
/**
* Returns the time of the next event for this search,
* or time_point::max() if no such event is planned.
*/
time_point getNextStepTime(time_point now) const;
/**
* Removes a node which have been expired for at least
* NODE::NODE_EXPIRE_TIME minutes. The search for an expired node starts
......@@ -721,36 +706,6 @@ Dht::Search::isDone(const Get& get) const
return true;
}
time_point
Dht::Search::getUpdateTime(time_point now) const
{
time_point ut = time_point::max();
const auto last_get = getLastGetTime();
unsigned i = 0, t = 0, d = 0;
const auto solicited_nodes = currentlySolicitedNodeCount();
for (const auto& sn : nodes) {
if (sn.node->isExpired() or (sn.candidate and t >= TARGET_NODES))
continue;
auto pending = sn.pendingGet();
if (sn.last_get_reply < std::max(now - Node::NODE_EXPIRE_TIME, last_get) or pending) {
// not isSynced
if (not pending and solicited_nodes < MAX_REQUESTED_SEARCH_NODES)
ut = std::min(ut, now);
if (not sn.candidate)
d++;
} else
ut = std::min(ut, sn.last_get_reply + Node::NODE_EXPIRE_TIME);
t++;
if (not sn.candidate and ++i == TARGET_NODES)
break;
}
if (not callbacks.empty() and d == 0)
// If all synced/updated but some callbacks remain, step now to clear them
return now;
return ut;
}
bool
Dht::Search::isAnnounced(Value::Id id) const
{
......@@ -790,78 +745,4 @@ Dht::Search::isListening(time_point now) const
return i;
}
time_point
Dht::Search::getAnnounceTime(time_point now) const
{
if (nodes.empty())
return time_point::max();
time_point ret {time_point::max()};
for (const auto& a : announce) {
if (!a.value) continue;
unsigned i = 0, t = 0;
for (const auto& n : nodes) {
if (not n.isSynced(now) or (n.candidate and t >= TARGET_NODES))
continue;
ret = std::min(ret, n.getAnnounceTime(a.value->id));
t++;
if (not n.candidate and ++i == TARGET_NODES)
break;
}
}
return ret;
}
time_point
Dht::Search::getListenTime(time_point now) const
{
if (listeners.empty())
return time_point::max();
time_point listen_time {time_point::max()};
unsigned i = 0, t = 0;
for (const auto& sn : nodes) {
if (not sn.isSynced(now) or (sn.candidate and t >= LISTEN_NODES))
continue;
for (auto& l : listeners)
listen_time = std::min(listen_time, sn.getListenTime(l.second.query));
t++;
if (not sn.candidate and ++i == LISTEN_NODES)
break;
}
return listen_time;
}
time_point
Dht::Search::getNextStepTime(time_point now) const
{
auto next_step = time_point::max();
if (expired or done)
return next_step;
auto ut = getUpdateTime(now);
if (ut != time_point::max()) {
//std::cout << id.toString() << " IPv" << (af==AF_INET?"4":"6") << " update time in " << print_dt(ut - now) << " s" << std::endl;
next_step = std::min(next_step, ut);
}
if (isSynced(now))
{
auto at = getAnnounceTime(now);
if (at != time_point::max()) {
//std::cout << id.toString() << " IPv" << (af==AF_INET?"4":"6") << " announce time in " << print_dt(at - now) << " s" << std::endl;
next_step = std::min(next_step, at);
}
auto lt = getListenTime(now);
if (lt != time_point::max()) {
//std::cout << id.toString() << " IPv" << (af==AF_INET?"4":"6") << " listen time in " << print_dt(lt - now) << " s" << std::endl;
next_step = std::min(next_step, lt);
}
}
return next_step;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment