Skip to content
Snippets Groups Projects
Commit cfb817d3 authored by Adrien Béraud's avatar Adrien Béraud Committed by Cyrille Béraud
Browse files

dht: add buckets() utility

parent 9440c030
No related branches found
No related tags found
No related merge requests found
......@@ -364,7 +364,7 @@ private:
const bool is_bootstrap {false};
// the stuff
RoutingTable buckets {};
RoutingTable buckets4 {};
RoutingTable buckets6 {};
std::map<InfoHash, Storage> store;
......@@ -416,12 +416,14 @@ private:
size_t maintainStorage(InfoHash id, bool force=false, DoneCallback donecb=nullptr);
// Buckets
RoutingTable& buckets(sa_family_t af) { return af == AF_INET ? buckets4 : buckets6; }
const RoutingTable& buckets(sa_family_t af) const { return af == AF_INET ? buckets4 : buckets6; }
Bucket* findBucket(const InfoHash& id, sa_family_t af) {
RoutingTable::iterator b;
switch (af) {
case AF_INET:
b = buckets.findBucket(id);
return b == buckets.end() ? nullptr : &(*b);
b = buckets4.findBucket(id);
return b == buckets4.end() ? nullptr : &(*b);
case AF_INET6:
b = buckets6.findBucket(id);
return b == buckets6.end() ? nullptr : &(*b);
......
......@@ -824,7 +824,7 @@ Dht::reportedAddr(const SockAddr& addr)
void
Dht::onNewNode(const std::shared_ptr<Node>& node, int confirm)
{
auto& list = node->getFamily() == AF_INET ? buckets : buckets6;
auto& list = buckets(node->getFamily());
auto b = list.findBucket(node->id);
if (b == list.end())
return;
......@@ -2256,7 +2256,7 @@ Dht::storageAddListener(const InfoHash& id, const std::shared_ptr<Node>& node, s
auto vals = st->second.get(query.where.getFilter());
if (not vals.empty()) {
network_engine.tellListener(node, rid, id, WANT4 | WANT6, makeToken((sockaddr*)&node->addr.first, false),
buckets.findClosestNodes(id, now, TARGET_NODES), buckets6.findClosestNodes(id, now, TARGET_NODES),
buckets4.findClosestNodes(id, now, TARGET_NODES), buckets6.findClosestNodes(id, now, TARGET_NODES),
std::move(vals), query);
}
st->second.listeners.emplace(node, Listener {rid, now, std::forward<Query>(query)});
......@@ -2322,14 +2322,16 @@ Dht::connectivityChanged(sa_family_t af)
scheduler.edit(nextNodesConfirmation, now);
auto& bucket_grow_time = (af == AF_INET) ? mybucket_grow_time : mybucket6_grow_time;
bucket_grow_time = now;
reported_addr.erase(std::remove_if(reported_addr.begin(), reported_addr.end(), [&](const ReportedAddr& addr){
return addr.second.getFamily() == af;
}));
for (auto& b : buckets(af))
b.time = time_point::min();
network_engine.connectivityChanged(af);
auto& searches = (af == AF_INET) ? searches4 : searches6;
for (auto& sp : searches)
for (auto& sn : sp.second->nodes)
sn.listenStatus.clear();
reported_addr.erase(std::remove_if(reported_addr.begin(), reported_addr.end(), [&](const ReportedAddr& addr){
return addr.second.getFamily() == af;
}));
}
void
......@@ -2402,9 +2404,7 @@ Dht::getNodesStats(sa_family_t af, unsigned *good_return, unsigned *dubious_retu
{
const auto& now = scheduler.time();
unsigned good = 0, dubious = 0, cached = 0, incoming = 0;
auto& list = (af == AF_INET) ? buckets : buckets6;
for (const auto& b : list) {
for (const auto& b : buckets(af)) {
for (auto& n : b.nodes) {
if (n->isGood(now)) {
good++;
......@@ -2547,7 +2547,7 @@ Dht::dumpTables() const
out << "My id " << myid << std::endl;
out << "Buckets IPv4 :" << std::endl;
for (const auto& b : buckets)
for (const auto& b : buckets4)
dumpBucket(b, out);
out << "Buckets IPv6 :" << std::endl;
for (const auto& b : buckets6)
......@@ -2593,9 +2593,8 @@ Dht::getStorageLog() const
std::string
Dht::getRoutingTablesLog(sa_family_t af) const
{
auto& list = (af == AF_INET) ? buckets : buckets6;
std::stringstream out;
for (const auto& b : list)
for (const auto& b : buckets(af))
dumpBucket(b, out);
return out.str();
}
......@@ -2641,7 +2640,7 @@ Dht::Dht(int s, int s6, Config config)
return;
if (s >= 0) {
buckets = {Bucket {AF_INET}};
buckets4 = {Bucket {AF_INET}};
if (!set_nonblocking(s, 1))
throw DhtException("Can't set socket to non-blocking mode");
}
......@@ -2787,7 +2786,7 @@ Dht::maintainStorage(InfoHash id, bool force, DoneCallback donecb) {
bool want4 = true, want6 = true;
auto nodes = buckets.findClosestNodes(id, now);
auto nodes = buckets4.findClosestNodes(id, now);
if (!nodes.empty()) {
if (force || id.xorCmp(nodes.back()->id, myid) < 0) {
for (auto &value : local_storage->second.getValues()) {
......@@ -2864,7 +2863,7 @@ Dht::expire()
uniform_duration_distribution<> time_dis(std::chrono::minutes(2), std::chrono::minutes(6));
auto expire_stuff_time = scheduler.time() + duration(time_dis(rd));
expireBuckets(buckets);
expireBuckets(buckets4);
expireBuckets(buckets6);
expireStorage();
expireSearches();
......@@ -2887,12 +2886,12 @@ Dht::confirmNodes()
search(myid, AF_INET6);
}
soon |= bucketMaintenance(buckets);
soon |= bucketMaintenance(buckets4);
soon |= bucketMaintenance(buckets6);
if (!soon) {
if (mybucket_grow_time >= now - seconds(150))
soon |= neighbourhoodMaintenance(buckets);
soon |= neighbourhoodMaintenance(buckets4);
if (mybucket6_grow_time >= now - seconds(150))
soon |= neighbourhoodMaintenance(buckets6);
}
......@@ -2978,8 +2977,8 @@ Dht::exportNodes()
{
const auto& now = scheduler.time();
std::vector<NodeExport> nodes;
const auto b4 = buckets.findBucket(myid);
if (b4 != buckets.end()) {
const auto b4 = buckets4.findBucket(myid);
if (b4 != buckets4.end()) {
for (auto& n : b4->nodes)
if (n->isGood(now))
nodes.push_back(n->exportNode());
......@@ -2990,7 +2989,7 @@ Dht::exportNodes()
if (n->isGood(now))
nodes.push_back(n->exportNode());
}
for (auto b = buckets.begin(); b != buckets.end(); ++b) {
for (auto b = buckets4.begin(); b != buckets4.end(); ++b) {
if (b == b4) continue;
for (auto& n : b->nodes)
if (n->isGood(now))
......@@ -3056,7 +3055,7 @@ Dht::onError(std::shared_ptr<Request> req, DhtProtocolException e) {
void
Dht::onReportedAddr(const InfoHash& id, const SockAddr& addr)
{
const auto& b = (addr.getFamily() == AF_INET ? buckets : buckets6).findBucket(id);
const auto& b = buckets(addr.getFamily()).findBucket(id);
b->time = scheduler.time();
if (addr.second)
reportedAddr(addr);
......@@ -3075,7 +3074,7 @@ Dht::onFindNode(std::shared_ptr<Node> node, InfoHash& target, want_t want)
NetworkEngine::RequestAnswer answer;
answer.ntoken = makeToken((sockaddr*)&node->addr.first, false);
if (want & WANT4)
answer.nodes4 = buckets.findClosestNodes(target, now, TARGET_NODES);
answer.nodes4 = buckets4.findClosestNodes(target, now, TARGET_NODES);
if (want & WANT6)
answer.nodes6 = buckets6.findClosestNodes(target, now, TARGET_NODES);
return answer;
......@@ -3095,7 +3094,7 @@ Dht::onGetValues(std::shared_ptr<Node> node, InfoHash& hash, want_t, const Query
NetworkEngine::RequestAnswer answer {};
auto st = store.find(hash);
answer.ntoken = makeToken((sockaddr*)&node->addr.first, false);
answer.nodes4 = buckets.findClosestNodes(hash, now, TARGET_NODES);
answer.nodes4 = buckets4.findClosestNodes(hash, now, TARGET_NODES);
answer.nodes6 = buckets6.findClosestNodes(hash, now, TARGET_NODES);
if (st != store.end() && not st->second.empty()) {
answer.values = st->second.get(query.where.getFilter());
......@@ -3239,8 +3238,7 @@ Dht::onAnnounce(std::shared_ptr<Node> node,
{
// We store a value only if we think we're part of the
// SEARCH_NODES nodes around the target id.
auto closest_nodes = (node->getFamily() == AF_INET ? buckets : buckets6)
.findClosestNodes(hash, scheduler.time(), SEARCH_NODES);
auto closest_nodes = buckets(node->getFamily()).findClosestNodes(hash, scheduler.time(), SEARCH_NODES);
if (closest_nodes.size() >= TARGET_NODES and hash.xorCmp(closest_nodes.back()->id, myid) < 0) {
DHT_LOG.WARN("[node %s] announce too far from the target. Dropping value.", node->toString().c_str());
return {};
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment