Skip to content
Snippets Groups Projects
Commit 013b1dfe authored by Adrien Béraud's avatar Adrien Béraud Committed by GitHub
Browse files

Merge pull request #138 from sim590/connectivity-changes-detect

search: detect connectivity changes
parents a366d502 733f699f
Branches
Tags
No related merge requests found
...@@ -294,6 +294,16 @@ private: ...@@ -294,6 +294,16 @@ private:
the target 8 turn out to be dead. */ the target 8 turn out to be dead. */
static constexpr unsigned SEARCH_NODES {14}; static constexpr unsigned SEARCH_NODES {14};
/* The number of bad nodes is limited in order to help determine
* presence of connectivity changes. See
* https://github.com/savoirfairelinux/opendht/issues/137 for details.
*
* According to the tables, 25 is a good average value for big networks. If
* the network is small, normal search expiration process will handle the
* situation.
* */
static constexpr unsigned SEARCH_MAX_BAD_NODES {25};
/* Concurrent search nodes requested count */ /* Concurrent search nodes requested count */
static constexpr unsigned MAX_REQUESTED_SEARCH_NODES {4}; static constexpr unsigned MAX_REQUESTED_SEARCH_NODES {4};
......
...@@ -429,7 +429,7 @@ struct Dht::SearchNode { ...@@ -429,7 +429,7 @@ struct Dht::SearchNode {
* Is this node expired or candidate * Is this node expired or candidate
*/ */
bool isBad() const { bool isBad() const {
return !node || node->isExpired() || candidate; return not node or node->isExpired() or candidate;
} }
}; };
...@@ -553,7 +553,21 @@ struct Dht::Search { ...@@ -553,7 +553,21 @@ struct Dht::Search {
/** /**
* @return The number of non-good search nodes. * @return The number of non-good search nodes.
*/ */
unsigned getNumberOfBadNodes() const; unsigned getNumberOfBadNodes() const {
return std::count_if(nodes.begin(), nodes.end(), [](const SearchNode& sn) {
return sn.isBad();
});
}
unsigned getNumberOfConsecutiveBadNodes() const {
unsigned count = 0;
std::find_if(nodes.begin(), nodes.end(), [&count](const SearchNode& sn) {
if (not sn.isBad())
return true;
++count;
return false;
});
return count;
}
/** /**
* Returns the time of the next "announce" event for this search, * Returns the time of the next "announce" event for this search,
...@@ -575,7 +589,66 @@ struct Dht::Search { ...@@ -575,7 +589,66 @@ struct Dht::Search {
*/ */
time_point getNextStepTime(const std::map<ValueType::Id, ValueType>& types, time_point now) const; time_point getNextStepTime(const std::map<ValueType::Id, ValueType>& types, time_point now) const;
bool removeExpiredNode(time_point now); /**
* Removes a node which have been expired for at least
* NODE::NODE_EXPIRE_TIME minutes. The search for an expired node starts
* from the end.
*
* @param now The reference to now.
*
* @return true if a node has been removed, else false.
*/
bool removeExpiredNode(time_point now) {
auto e = nodes.end();
while (e != nodes.cbegin()) {
e = std::prev(e);
const Node& n = *e->node;
if (n.isExpired() and n.time + Node::NODE_EXPIRE_TIME < now) {
//std::cout << "Removing expired node " << n.id << " from IPv" << (af==AF_INET?'4':'6') << " search " << id << std::endl;
nodes.erase(e);
return true;
}
}
return false;
}
/**
* This method is called when we have discovered that the search is expired.
* We have to
*
* - remove all nodes from the search;
* - clear (non-permanent) callbacks;
*/
void expire() {
// no nodes or all expired nodes. This is most likely a connectivity change event.
expired = true;
nodes.clear();
if (announce.empty() && listeners.empty())
// Listening or announcing requires keeping the cluster up to date.
setDone();
{
auto get_cbs = std::move(callbacks);
for (const auto& g : get_cbs) {
if (g.second.done_cb)
g.second.done_cb(false, {});
}
}
{
std::vector<DoneCallback> a_cbs;
a_cbs.reserve(announce.size());
for (auto ait = announce.begin() ; ait != announce.end(); ) {
if (ait->callback)
a_cbs.emplace_back(std::move(ait->callback));
if (not ait->permanent)
ait = announce.erase(ait);
else
ait++;
}
for (const auto& a : a_cbs)
a(false, {});
}
}
/** /**
* If the value was just successfully announced, call the callback and erase it if not permanent. * If the value was just successfully announced, call the callback and erase it if not permanent.
...@@ -652,8 +725,12 @@ Dht::shutdown(ShutdownCallback cb) { ...@@ -652,8 +725,12 @@ Dht::shutdown(ShutdownCallback cb) {
for (const auto& str : store) { for (const auto& str : store) {
*remaining += maintainStorage(str.first, true, str_donecb); *remaining += maintainStorage(str.first, true, str_donecb);
} }
if (!*remaining) {
DHT_LOG.WARN("Shuting down node: %u ops remaining.", *remaining); DHT_LOG.WARN("Shuting down node: %u ops remaining.", *remaining);
if (!*remaining && cb) { cb(); } if (cb)
cb();
}
} }
bool bool
...@@ -836,22 +913,6 @@ Dht::expireBuckets(RoutingTable& list) ...@@ -836,22 +913,6 @@ Dht::expireBuckets(RoutingTable& list)
} }
} }
bool
Dht::Search::removeExpiredNode(time_point now)
{
auto e = nodes.end();
while (e != nodes.cbegin()) {
e = std::prev(e);
const Node& n = *e->node;
if (n.isExpired() and n.time + Node::NODE_EXPIRE_TIME < now) {
//std::cout << "Removing expired node " << n.id << " from IPv" << (af==AF_INET?'4':'6') << " search " << id << std::endl;
nodes.erase(e);
return true;
}
}
return false;
}
/* A search contains a list of nodes, sorted by decreasing distance to the /* A search contains a list of nodes, sorted by decreasing distance to the
target. We just got a new candidate, insert it at the right spot or target. We just got a new candidate, insert it at the right spot or
discard it. */ discard it. */
...@@ -872,6 +933,8 @@ Dht::Search::insertNode(const std::shared_ptr<Node>& snode, time_point now, cons ...@@ -872,6 +933,8 @@ Dht::Search::insertNode(const std::shared_ptr<Node>& snode, time_point now, cons
found = true; found = true;
break; break;
} }
/* Node not found. We could insert it after this one. */
if (id.xorCmp(nid, n->node->id) > 0) { if (id.xorCmp(nid, n->node->id) > 0) {
++n; ++n;
break; break;
...@@ -879,7 +942,7 @@ Dht::Search::insertNode(const std::shared_ptr<Node>& snode, time_point now, cons ...@@ -879,7 +942,7 @@ Dht::Search::insertNode(const std::shared_ptr<Node>& snode, time_point now, cons
} }
bool new_search_node = false; bool new_search_node = false;
if (!found) { if (not found) {
// find if and where to trim excessive nodes // find if and where to trim excessive nodes
auto t = nodes.cend(); auto t = nodes.cend();
size_t bad = 0; // number of bad nodes (if search is not expired) size_t bad = 0; // number of bad nodes (if search is not expired)
...@@ -936,9 +999,8 @@ Dht::Search::insertNode(const std::shared_ptr<Node>& snode, time_point now, cons ...@@ -936,9 +999,8 @@ Dht::Search::insertNode(const std::shared_ptr<Node>& snode, time_point now, cons
n->token = token; n->token = token;
expired = false; expired = false;
} }
if (new_search_node) { if (new_search_node)
removeExpiredNode(now); removeExpiredNode(now);
}
return new_search_node; return new_search_node;
} }
...@@ -1341,44 +1403,17 @@ Dht::searchStep(std::shared_ptr<Search> sr) ...@@ -1341,44 +1403,17 @@ Dht::searchStep(std::shared_ptr<Search> sr)
while (sent and sr->currentlySolicitedNodeCount() < MAX_REQUESTED_SEARCH_NODES); while (sent and sr->currentlySolicitedNodeCount() < MAX_REQUESTED_SEARCH_NODES);
/*DHT_LOG.DEBUG("[search %s IPv%c] step: sent %u requests (total %u).", /*DHT_LOG.DEBUG("[search %s IPv%c] step: sent %u requests (total %u).",
sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', i, sr->currentlySolicitedNodeCount());*/ sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', i, sr->currentlySolicitedNodeCount());*/
}
auto expiredn = (size_t)std::count_if(sr->nodes.begin(), sr->nodes.end(), [&](const SearchNode& sn) { if (sr->getNumberOfConsecutiveBadNodes() >= std::min(sr->nodes.size(),
return sn.candidate or sn.node->isExpired(); static_cast<size_t>(SEARCH_MAX_BAD_NODES)))
});
if (i == 0 && expiredn == sr->nodes.size())
{ {
DHT_LOG.WARN("[search %s IPv%c] expired", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6'); DHT_LOG.WARN("[search %s IPv%c] expired", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6');
// no nodes or all expired nodes sr->expire();
sr->expired = true; connectivityChanged();
if (sr->announce.empty() && sr->listeners.empty()) {
// Listening or announcing requires keeping the cluster up to date.
sr->setDone();
}
{
auto get_cbs = std::move(sr->callbacks);
for (const auto& g : get_cbs) {
if (g.second.done_cb)
g.second.done_cb(false, {});
}
}
{
std::vector<DoneCallback> a_cbs;
a_cbs.reserve(sr->announce.size());
for (auto ait = sr->announce.begin() ; ait != sr->announce.end(); ) {
if (ait->callback)
a_cbs.emplace_back(std::move(ait->callback));
if (not ait->permanent)
ait = sr->announce.erase(ait);
else
ait++;
}
for (const auto& a : a_cbs)
a(false, {});
}
}
} }
//dumpSearch(*sr, std::cout); /* dumpSearch(*sr, std::cout); */
/* periodic searchStep scheduling. */ /* periodic searchStep scheduling. */
if (not sr->done) if (not sr->done)
...@@ -1400,12 +1435,6 @@ Dht::Search::isSynced(time_point now) const ...@@ -1400,12 +1435,6 @@ Dht::Search::isSynced(time_point now) const
return i > 0; return i > 0;
} }
unsigned Dht::Search::getNumberOfBadNodes() const {
return std::count_if(nodes.begin(), nodes.end(),
[=](const SearchNode& sn) { return sn.isBad(); }
);
}
time_point time_point
Dht::Search::getLastGetTime(std::shared_ptr<Query> q) const Dht::Search::getLastGetTime(std::shared_ptr<Query> q) const
{ {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment