Skip to content
Snippets Groups Projects
Unverified Commit 7c5db84f authored by Simon Désaulniers's avatar Simon Désaulniers
Browse files

dht: take multiple queries into account

The code preceding this would just not take multiple concurrent queries for a
same search into account. This is a bug fix for a bad behavior since the
queries.
parent 4fecb206
No related branches found
No related tags found
No related merge requests found
...@@ -504,8 +504,10 @@ struct Dht::Search { ...@@ -504,8 +504,10 @@ struct Dht::Search {
/** /**
* Get the time of the last "get" operation performed on this search, * Get the time of the last "get" operation performed on this search,
* or time_point::min() if no such operation have been performed. * or time_point::min() if no such operation have been performed.
*
* @param query The query identifying a 'get' request.
*/ */
time_point getLastGetTime() const; time_point getLastGetTime(std::shared_ptr<Query> query = {}) const;
/** /**
* Is this get operation done ? * Is this get operation done ?
...@@ -943,8 +945,21 @@ Dht::searchNodeGetDone(const Request& status, ...@@ -943,8 +945,21 @@ Dht::searchNodeGetDone(const Request& status,
std::weak_ptr<Search> ws, std::weak_ptr<Search> ws,
std::shared_ptr<Query> query) std::shared_ptr<Query> query)
{ {
const auto& now = scheduler.time();
if (auto sr = ws.lock()) { if (auto sr = ws.lock()) {
sr->insertNode(status.node, scheduler.time(), answer.ntoken); if (auto srn = sr->getNode(status.node)) {
/* all other get requests which are satisfied by this answer
should not be sent anymore */
for (auto& g : sr->callbacks) {
auto& q = g.second.query;
if (q->isSatisfiedBy(*query) and q != query) {
auto req = std::make_shared<Request>();
req->cancel();
srn->getStatus[q] = std::move(req);
}
}
}
sr->insertNode(status.node, now, answer.ntoken);
onGetValuesDone(status, answer, sr, query); onGetValuesDone(status, answer, sr, query);
} }
} }
...@@ -1028,18 +1043,17 @@ Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update ...@@ -1028,18 +1043,17 @@ Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update
return nullptr; return nullptr;
const auto& now = scheduler.time(); const auto& now = scheduler.time();
const time_point up = update ? sr->getLastGetTime() : time_point::min();
std::weak_ptr<Search> ws = sr; std::weak_ptr<Search> ws = sr;
SearchNode* n = nullptr;
auto cb = sr->callbacks.begin(); auto cb = sr->callbacks.begin();
do { /* for all queries to send */ do { /* for all requests to send */
SearchNode* n = nullptr;
auto query = not sr->callbacks.empty() ? cb->second.query : std::make_shared<Query>();
const time_point up = not sr->callbacks.empty() and update
? sr->getLastGetTime(query)
: time_point::min();
/* cases v 'get' v 'find_node' */ if (pn and pn->canGet(now, up, query)) {
auto query = cb != sr->callbacks.end() ? cb->second.query : std::make_shared<Query>();
if (pn) {
if (not pn->canGet(now, up, query))
return nullptr;
n = pn; n = pn;
} else { } else {
for (auto& sn : sr->nodes) { for (auto& sn : sr->nodes) {
...@@ -1048,11 +1062,12 @@ Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update ...@@ -1048,11 +1062,12 @@ Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update
break; break;
} }
} }
}
if (sr->callbacks.empty()) { /* 'find_node' request */
if (not n) if (not n)
return nullptr; return nullptr;
}
if (sr->callbacks.empty()) {
DHT_LOG.WARN("[search %s IPv%c] [node %s] sending 'find_node'", DHT_LOG.WARN("[search %s IPv%c] [node %s] sending 'find_node'",
sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6',
n->node->toString().c_str()); n->node->toString().c_str());
...@@ -1067,9 +1082,12 @@ Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update ...@@ -1067,9 +1082,12 @@ Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update
} }
searchNodeGetDone(status, std::forward<NetworkEngine::RequestAnswer>(answer), ws, query); searchNodeGetDone(status, std::forward<NetworkEngine::RequestAnswer>(answer), ws, query);
}, },
/* std::bind(&Dht::searchNodeGetDone, this, _1, _2, ws, query), */
std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, query)); std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, query));
} else {
} else { /* 'get' request */
if (not n)
continue;
if (query and not query->select.getSelection().empty()) { if (query and not query->select.getSelection().empty()) {
/* The request contains a select. No need to paginate... */ /* The request contains a select. No need to paginate... */
DHT_LOG.WARN("[search %s IPv%c] [node %s] sending 'get'", DHT_LOG.WARN("[search %s IPv%c] [node %s] sending 'get'",
...@@ -1085,10 +1103,13 @@ Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update ...@@ -1085,10 +1103,13 @@ Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update
paginate(ws, query, n); paginate(ws, query, n);
} }
if (not sr->isSynced(now) or cb == sr->callbacks.end()) /* We only try to send one request. return. */
break; /* only trying to find nodes, only send the oldest query */
} while (++cb != sr->callbacks.end());
return n; return n;
} while (++cb != sr->callbacks.end());
/* no request were sent */
return nullptr;
} }
void Dht::searchSendAnnounceValue(const std::shared_ptr<Search>& sr) { void Dht::searchSendAnnounceValue(const std::shared_ptr<Search>& sr) {
...@@ -1365,11 +1386,11 @@ unsigned Dht::Search::getNumberOfBadNodes() const { ...@@ -1365,11 +1386,11 @@ unsigned Dht::Search::getNumberOfBadNodes() const {
} }
time_point time_point
Dht::Search::getLastGetTime() const Dht::Search::getLastGetTime(std::shared_ptr<Query> q) const
{ {
time_point last = time_point::min(); time_point last = time_point::min();
for (const auto& g : callbacks) for (const auto& g : callbacks)
last = std::max(last, g.second.start); last = std::max(last, (not q or q->isSatisfiedBy(*g.second.query) ? g.second.start : time_point::min()));
return last; return last;
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment