/*
* Copyright (C) 2014-2016 Savoir-faire Linux Inc.
* Author(s) : Adrien Béraud <adrien.beraud@savoirfairelinux.com>
* Simon Désaulniers <sim.desaulniers@gmail.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "dht.h"
#include "rng.h"
#include "request.h"
#include <msgpack.hpp>
extern "C" {
#include <gnutls/gnutls.h>
}
#ifndef _WIN32
#include <arpa/inet.h>
#else
#include <ws2tcpip.h>
#endif
#include <algorithm>
#include <random>
#include <sstream>
#include <unistd.h>
#include <fcntl.h>
#include <cstring>
#ifdef _WIN32
static bool
set_nonblocking(int fd, int nonblocking)
{
unsigned long mode = !!nonblocking;
int rc = ioctlsocket(fd, FIONBIO, &mode);
return rc == 0;
}
extern const char *inet_ntop(int, const void *, char *, socklen_t);
#else
static bool
set_nonblocking(int fd, int nonblocking)
{
int rc = fcntl(fd, F_GETFL, 0);
if (rc < 0)
return false;
rc = fcntl(fd, F_SETFL, nonblocking?(rc | O_NONBLOCK):(rc & ~O_NONBLOCK));
return rc >= 0;
}
#endif
static std::mt19937 rd {dht::crypto::random_device{}()};
static std::uniform_int_distribution<uint8_t> rand_byte;
namespace dht {
using namespace std::placeholders;
constexpr std::chrono::minutes Dht::MAX_STORAGE_MAINTENANCE_EXPIRE_TIME;
constexpr std::chrono::minutes Dht::SEARCH_EXPIRE_TIME;
constexpr std::chrono::seconds Dht::LISTEN_EXPIRE_TIME;
constexpr std::chrono::seconds Dht::REANNOUNCE_MARGIN;
// internal structures definition
struct Dht::Storage {
InfoHash id;
time_point maintenance_time {};
std::map<std::shared_ptr<Node>, Listener> listeners {};
std::map<size_t, LocalListener> local_listeners {};
size_t listener_token {1};
Storage() {}
Storage(InfoHash id, time_point now) : id(id), maintenance_time(now+MAX_STORAGE_MAINTENANCE_EXPIRE_TIME) {}
#if defined(__GNUC__) && __GNUC__ == 4 && __GNUC_MINOR__ <= 9
// GCC-bug: remove me when support of GCC < 4.9.2 is abandoned
Storage(Storage&& o) noexcept
: id(std::move(o.id))
, maintenance_time(std::move(o.maintenance_time))
, listeners(std::move(o.listeners))
, local_listeners(std::move(o.local_listeners))
, listener_token(std::move(o.listener_token))
, values(std::move(o.values))
, total_size(std::move(o.total_size)) {}
#else
Storage(Storage&& o) noexcept = default;
#endif
Storage& operator=(Storage&& o) = default;
bool empty() const {
return values.empty();
}
void clear();
size_t valueCount() const {
return values.size();
}
size_t totalSize() const {
return total_size;
}
const std::vector<ValueStorage>& getValues() const { return values; }
std::shared_ptr<Value> getById(Value::Id vid) const {
for (auto& v : values)
if (v.data->id == vid) return v.data;
return {};
}
std::vector<std::shared_ptr<Value>> get(Value::Filter f = {}) const {
std::vector<std::shared_ptr<Value>> newvals {};
if (not f) newvals.reserve(values.size());
for (auto& v : values) {
if (not f || f(*v.data))
newvals.push_back(v.data);
}
return newvals;
}
/**
* Stores a new value in this storage, or replace a previous value
*
* @return <storage, change_size, change_value_num>
* storage: set if a change happened
* change_size: size difference
* change_value_num: change of value number (0 or 1)
*/
std::tuple<ValueStorage*, ssize_t, ssize_t>
store(const std::shared_ptr<Value>& value, time_point created, ssize_t size_left);
std::pair<ssize_t, ssize_t> expire(const std::map<ValueType::Id, ValueType>& types, time_point now);
private:
Storage(const Storage&) = delete;
Storage& operator=(const Storage&) = delete;
std::vector<ValueStorage> values {};
size_t total_size {};
};
struct Dht::SearchNode {
SearchNode(std::shared_ptr<Node> node) : node(node) {}
using AnnounceStatusMap = std::map<Value::Id, std::shared_ptr<Request>>;
/**
* Can we use this node to listen/announce now ?
*/
bool isSynced(time_point now) const {
return not node->isExpired() and
not token.empty() and last_get_reply >= now - Node::NODE_EXPIRE_TIME;
}
bool canGet(time_point now, time_point update) const {
return not node->isExpired() and
(now > last_get_reply + Node::NODE_EXPIRE_TIME or update > last_get_reply)
and (not getStatus or not getStatus->pending());
}
bool isAnnounced(Value::Id vid, const ValueType& type, time_point now) const {
auto ack = acked.find(vid);
if (ack == acked.end() or not ack->second) {
return false;
}
return ack->second->reply_time + type.expiration > now;
}
bool isListening(time_point now) const {
if (not listenStatus)
return false;
return listenStatus->reply_time + LISTEN_EXPIRE_TIME > now;
}
time_point getAnnounceTime(AnnounceStatusMap::const_iterator ack, const ValueType& type) const {
if (ack == acked.end() or not ack->second)
return time_point::min();
return ack->second->pending() ? time_point::max() : ack->second->reply_time + type.expiration - REANNOUNCE_MARGIN;
}
time_point getAnnounceTime(Value::Id vid, const ValueType& type) const {
return getAnnounceTime(acked.find(vid), type);
}
time_point getListenTime() const {
if (not listenStatus)
return time_point::min();
return listenStatus->pending() ? time_point::max() : listenStatus->reply_time + LISTEN_EXPIRE_TIME - REANNOUNCE_MARGIN;
}
bool isBad() const {
return !node || node->isExpired() || candidate;
}
std::shared_ptr<Node> node {};
time_point last_get_reply {time_point::min()}; /* last time received valid token */
std::shared_ptr<Request> getStatus {}; /* get/sync status */
std::shared_ptr<Request> listenStatus {};
AnnounceStatusMap acked {}; /* announcement status for a given value id */
Blob token {};
/**
* A search node is a candidate if the search is/was synced and this node is a new candidate for inclusion
*
*/
bool candidate {false};
};
struct Dht::Search {
InfoHash id {};
sa_family_t af;
uint16_t tid;
time_point refill_time {time_point::min()};
time_point step_time {time_point::min()}; /* the time of the last search step */
std::shared_ptr<Scheduler::Job> nextSearchStep {};
bool expired {false}; /* no node, or all nodes expired */
bool done {false}; /* search is over, cached for later */
std::vector<SearchNode> nodes {};
/* pending puts */
std::vector<Announce> announce {};
/* pending gets */
std::vector<Get> callbacks {};
/* listeners */
std::map<size_t, LocalListener> listeners {};
size_t listener_token = 1;
/**
* @returns true if the node was not present and added to the search
*/
bool insertNode(const std::shared_ptr<Node>& n, time_point now, const Blob& token={});
unsigned insertBucket(const Bucket&, time_point now);
SearchNode* getNode(const std::shared_ptr<Node>& n) {
auto srn = std::find_if(nodes.begin(), nodes.end(), [&](SearchNode& sn) {
return n == sn.node;
});
return (srn == nodes.end()) ? nullptr : &(*srn);
}
/* number of concurrent sync requests */
unsigned currentGetRequests() const {
unsigned count = 0;
for (const auto& n : nodes)
if (not n.isBad() and n.getStatus and n.getStatus->pending())
count++;
return count;
}
/**
* Can we use this search to announce ?
*/
bool isSynced(time_point now) const;
time_point getLastGetTime() const;
/**
* Is this get operation done ?
*/
bool isDone(const Get& get, time_point now) const;
time_point getUpdateTime(time_point now) const;
bool isAnnounced(Value::Id id, const ValueType& type, time_point now) const;
bool isListening(time_point now) const;
/**
* @return The number of non-good search nodes.
*/
unsigned getNumberOfBadNodes() const;
/**
* ret = 0 : no announce required.
* ret > 0 : (re-)announce required at time ret.
*/
time_point getAnnounceTime(const std::map<ValueType::Id, ValueType>& types, time_point now) const;
/**
* ret = 0 : no listen required.
* ret > 0 : (re-)listen required at time ret.
*/
time_point getListenTime(time_point now) const;
time_point getNextStepTime(const std::map<ValueType::Id, ValueType>& types, time_point now) const;
bool removeExpiredNode(time_point now);
unsigned refill(const RoutingTable&, time_point now);
std::vector<std::shared_ptr<Node>> getNodes() const;
void clear() {
announce.clear();
callbacks.clear();
listeners.clear();
nodes.clear();
nextSearchStep = {};
}
};
void
Dht::setLoggers(LogMethod&& error, LogMethod&& warn, LogMethod&& debug)
{
DHT_LOG.DEBUG = std::move(debug);
DHT_LOG.WARN = std::move(warn);
DHT_LOG.ERROR = std::move(error);
}
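/* Summarize connectivity for an address family: no known nodes means
   Disconnected, known nodes but no good one yet means Connecting,
   otherwise Connected. */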
NodeStatus
Dht::getStatus(sa_family_t af) const
{
unsigned good = 0, dubious = 0, cached = 0, incoming = 0;
int tot = getNodesStats(af, &good, &dubious, &cached, &incoming);
if (tot < 1)
return NodeStatus::Disconnected;
else if (good < 1)
return NodeStatus::Connecting;
return NodeStatus::Connected;
}
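/* Perform a last storage maintenance pass, then call cb once every
   remaining maintenance operation has completed. */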
void
Dht::shutdown(ShutdownCallback cb) {
/****************************
* Last store maintenance *
****************************/
scheduler.syncTime();
auto remaining = std::make_shared<int>(0);
auto str_donecb = [=](bool, const std::vector<std::shared_ptr<Node>>&) {
--*remaining;
if (!*remaining && cb) { cb(); }
else DHT_LOG.WARN("Shuting down node: %u ops remaining.", *remaining);
};
for (const auto& str : store) {
*remaining += maintainStorage(str.id, true, str_donecb);
}
DHT_LOG.WARN("Shuting down node: %u ops remaining.", *remaining);
if (!*remaining && cb) { cb(); }
}
bool
Dht::isRunning(sa_family_t af) const { return network_engine.isRunning(af); }
/* Every bucket contains an unordered list of nodes. */
std::shared_ptr<Node>
Dht::findNode(const InfoHash& id, sa_family_t af)
{
Bucket* b = findBucket(id, af);
if (!b)
return {};
for (auto& n : b->nodes)
if (n->id == id) return n;
return {};
}
const std::shared_ptr<Node>
Dht::findNode(const InfoHash& id, sa_family_t af) const
{
const Bucket* b = findBucket(id, af);
if (!b)
return {};
for (const auto& n : b->nodes)
if (n->id == id) return n;
return {};
}
/* Every bucket caches the address of a likely node. Ping it. */
int
Dht::sendCachedPing(Bucket& b)
{
/* We set family to 0 when there's no cached node. */
if (!b.cached)
return 0;
DHT_LOG.DEBUG("Sending ping to cached node.");
network_engine.sendPing(b.cached, nullptr, nullptr);
b.cached = {};
return 0;
}
std::vector<Address>
Dht::getPublicAddress(sa_family_t family)
{
std::sort(reported_addr.begin(), reported_addr.end(), [](const ReportedAddr& a, const ReportedAddr& b) {
return a.first < b.first;
});
std::vector<Address> ret;
for (const auto& addr : reported_addr)
if (!family || family == addr.second.first.ss_family)
ret.emplace_back(addr.second);
return ret;
}
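/* Offer a node to every ongoing search of the same address family,
   rescheduling the next step of each search that accepts it. */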
bool
Dht::trySearchInsert(const std::shared_ptr<Node>& node)
{
const auto& now = scheduler.time();
if (not node) return false;
bool inserted = false;
auto family = node->getFamily();
auto& srs = family == AF_INET ? searches4 : searches6;
for (auto& srp : srs) {
auto& s = srp.second;
if (s->insertNode(node, now)) {
inserted = true;
scheduler.edit(s->nextSearchStep, s->getNextStepTime(types, now));
}
}
return inserted;
}
void
Dht::reportedAddr(const sockaddr *sa, socklen_t sa_len)
{
auto it = std::find_if(reported_addr.begin(), reported_addr.end(), [=](const ReportedAddr& addr){
return (addr.second.second == sa_len) &&
std::equal((uint8_t*)&addr.second.first, (uint8_t*)&addr.second.first + addr.second.second, (uint8_t*)sa);
});
if (it == reported_addr.end()) {
if (reported_addr.size() < 32)
reported_addr.emplace_back(1, std::make_pair(*((sockaddr_storage*)sa), sa_len));
} else
it->first++;
}
/* We just learnt about a node, not necessarily a new one. Confirm is 1 if
the node sent a message, 2 if it sent us a reply. */
std::shared_ptr<Node>
Dht::newNode(const std::shared_ptr<Node>& node, int confirm)
{
auto& list = node->getFamily() == AF_INET ? buckets : buckets6;
auto b = list.findBucket(node->id);
if (b == list.end())
return {};
for (auto& n : b->nodes) {
if (n == node) {
if (confirm)
trySearchInsert(node);
return n;
}
}
/* New node. */
/* Try adding the node to searches */
trySearchInsert(node);
const auto& now = scheduler.time();
bool mybucket = list.contains(b, myid);
if (mybucket) {
if (node->getFamily() == AF_INET)
mybucket_grow_time = now;
else
mybucket6_grow_time = now;
//scheduler.edit(nextNodesConfirmation, now);
}
/* Try to get rid of an expired node. */
for (auto& n : b->nodes) {
if (not n->isExpired())
continue;
n = node;
return n;
}
if (b->nodes.size() >= TARGET_NODES) {
/* Bucket full. Ping a dubious node */
bool dubious = false;
for (auto& n : b->nodes) {
/* Pick the first dubious node that we haven't pinged in the
last 9 seconds. This gives nodes the time to reply, but
tends to concentrate on the same nodes, so that we get rid
of bad nodes fast. */
if (not n->isGood(now)) {
dubious = true;
if (not n->isPendingMessage()) {
DHT_LOG.DEBUG("Sending ping to dubious node %s.", n->toString().c_str());
network_engine.sendPing(n, nullptr, nullptr);
break;
}
}
}
if ((mybucket || (is_bootstrap and list.depth(b) < 6)) && (!dubious || list.size() == 1)) {
DHT_LOG.DEBUG("Splitting from depth %u", list.depth(b));
sendCachedPing(*b);
list.split(b);
return newNode(node, 0);
}
/* No space for this node. Cache it away for later. */
if (confirm or not b->cached)
b->cached = node;
} else {
/* Create a new node. */
b->nodes.emplace_front(node);
}
return node;
}
/* Called periodically to purge known-bad nodes. Note that we're very
conservative here: broken nodes in the table don't do much harm, we'll
recover as soon as we find better ones. */
void
Dht::expireBuckets(RoutingTable& list)
{
for (auto& b : list) {
bool changed = false;
b.nodes.remove_if([this,&changed](const std::shared_ptr<Node>& n) {
if (n->isExpired()) {
changed = true;
return true;
}
return false;
});
if (changed)
sendCachedPing(b);
}
}
bool
Dht::Search::removeExpiredNode(time_point now)
{
auto e = nodes.end();
while (e != nodes.cbegin()) {
e = std::prev(e);
const Node& n = *e->node;
if (n.isExpired() and n.time + Node::NODE_EXPIRE_TIME < now) {
//std::cout << "Removing expired node " << n.id << " from IPv" << (af==AF_INET?'4':'6') << " search " << id << std::endl;
nodes.erase(e);
return true;
}
}
return false;
}
/* A search contains a list of nodes, sorted by decreasing distance to the
target. We just got a new candidate, insert it at the right spot or
discard it. */
bool
Dht::Search::insertNode(const std::shared_ptr<Node>& snode, time_point now, const Blob& token)
{
if (expired and nodes.empty())
return false;
auto& node = *snode;
const auto& nid = node.id;
if (node.getFamily() != af) {
//DHT_LOG.DEBUG("Attempted to insert node in the wrong family.");
return false;
}
// Fast track for the case where the node is not relevant for this search
if (node.isExpired() && nodes.size() >= SEARCH_NODES && id.xorCmp(nid, nodes.back().node->id) > 0)
return false;
bool found = false;
auto n = std::find_if(nodes.begin(), nodes.end(), [&](const SearchNode& sn) {
if (sn.node == snode) {
found = true;
return true;
}
return id.xorCmp(nid, sn.node->id) < 0;
});
bool new_search_node = false;
if (!found) {
// Be more restrictive if there are too many
// good or unknown nodes in this search,
unsigned num_bad_nodes = getNumberOfBadNodes();
if (nodes.size() - num_bad_nodes >= SEARCH_NODES) {
if (node.isExpired() or n == nodes.end())
return false;
}
// Reset search timer if the search is empty
if (nodes.empty()) {
step_time = TIME_INVALID;
}
n = nodes.insert(n, SearchNode(snode));
node.time = now;
new_search_node = true;
// trim good nodes
while (nodes.size() - num_bad_nodes > SEARCH_NODES) {
if (removeExpiredNode(now))
num_bad_nodes--;
auto to_remove = std::find_if(nodes.rbegin(), nodes.rend(),
[](const SearchNode& n) { return not n.isBad(); }
);
if (to_remove != nodes.rend()) {
nodes.erase(std::prev(to_remove.base()));
} // else, all nodes are expired.
}
expired = false;
}
if (not token.empty()) {
n->candidate = false;
n->last_get_reply = now;
if (token.size() <= 64)
n->token = token;
expired = false;
}
return new_search_node;
}
std::vector<std::shared_ptr<Node>>
Dht::Search::getNodes() const
{
std::vector<std::shared_ptr<Node>> ret {};
ret.reserve(nodes.size());
for (const auto& sn : nodes)
ret.emplace_back(sn.node);
return ret;
}
void
Dht::expireSearches()
{
auto t = scheduler.time() - SEARCH_EXPIRE_TIME;
auto expired = [&](std::pair<const InfoHash, std::shared_ptr<Search>>& srp) {
auto& sr = srp.second;
auto b = sr->callbacks.empty() && sr->announce.empty() && sr->listeners.empty() && sr->step_time < t;
if (b) {
DHT_LOG.DEBUG("Removing search %s ", srp.first.toString().c_str());
return b;
} else { return false; }
};
erase_if(searches4, expired);
erase_if(searches6, expired);
}
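/* Send a request to the provided node, or to the first node of the search
   that can be queried. A plain "find node" is sent when the search has no
   pending get or listen operation, a "get values" otherwise. Returns the
   queried node, or nullptr if no request was sent. */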
Dht::SearchNode*
Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update)
{
if (sr->done or sr->currentGetRequests() >= SEARCH_REQUESTS)
return nullptr;
const auto& now = scheduler.time();
const time_point up = update ? sr->getLastGetTime() : time_point::min();
SearchNode* n = nullptr;
if (pn) {
if (not pn->canGet(now, up))
return nullptr;
n = pn;
} else {
for (auto& sn : sr->nodes) {
if (sn.canGet(now, up)) {
n = &sn;
break;
}
}
if (not n)
return nullptr;
}
/*DHT_LOG.DEBUG("[search %s IPv%c] [node %s] sending 'get'",
sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6',
n->node->toString().c_str());*/
std::weak_ptr<Search> ws = sr;
auto onDone =
[this,ws](const Request& status, NetworkEngine::RequestAnswer&& answer) mutable {
if (auto sr = ws.lock()) {
sr->insertNode(status.node, scheduler.time(), answer.ntoken);
onGetValuesDone(status, answer, sr);
}
};
auto onExpired =
[this,ws](const Request& status, bool over) mutable {
if (auto sr = ws.lock()) {
if (auto srn = sr->getNode(status.node))
srn->candidate = not over;
scheduler.edit(sr->nextSearchStep, scheduler.time());
}
};
std::shared_ptr<Request> rstatus;
if (sr->callbacks.empty() and sr->listeners.empty())
rstatus = network_engine.sendFindNode(n->node, sr->id, -1, onDone, onExpired);
else
rstatus = network_engine.sendGetValues(n->node, sr->id, -1, onDone, onExpired);
n->getStatus = rstatus;
return n;
}
/* When a search is in progress, we periodically call search_step to send
further requests. */
void
Dht::searchStep(std::shared_ptr<Search> sr)
{
if (not sr or sr->expired or sr->done) return;
const auto& now = scheduler.time();
DHT_LOG.DEBUG("[search %s IPv%c] step (%d requests)", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', sr->currentGetRequests());
sr->step_time = now;
/*
* The accurate delay between two refills has not been strongly determined.
* TODO: Empirical analysis over refill timeout.
*/
if (sr->refill_time + Node::NODE_EXPIRE_TIME < now and sr->nodes.size()-sr->getNumberOfBadNodes() < SEARCH_NODES) {
if (auto added = sr->refill(sr->af == AF_INET ? buckets : buckets6, now)) {
sr->refill_time = now;
DHT_LOG.DEBUG("[search %s IPv%c] refilled with %u nodes", sr->id.toString().c_str(), (sr->af == AF_INET) ? '4' : '6', added);
}
}
/* Check if the first TARGET_NODES (8) live nodes have replied. */
if (sr->isSynced(now)) {
if (not sr->callbacks.empty()) {
// search is synced but some (newer) get operations are not complete
// Call callbacks when done
for (auto b = sr->callbacks.begin(); b != sr->callbacks.end();) {
if (sr->isDone(*b, now)) {
if (b->done_cb)
b->done_cb(true, sr->getNodes());
b = sr->callbacks.erase(b);
}
else
++b;
}
if (sr->callbacks.empty() && sr->announce.empty() && sr->listeners.empty())
sr->done = true;
}
// true if this node is part of the target nodes cluster.
bool in = sr->id.xorCmp(myid, sr->nodes.back().node->id) < 0;
DHT_LOG.DEBUG("[search %s IPv%c] synced%s", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', in ? ", in" : "");
if (not sr->listeners.empty()) {
unsigned i = 0;
for (auto& n : sr->nodes) {
if (not n.isSynced(now))
continue;
if (n.getListenTime() <= now) {
DHT_LOG.WARN("[search %s IPv%c] [node %s] sending 'listen'",
sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6',
n.node->toString().c_str());
//std::cout << "Sending listen to " << n.node->id << " " << print_addr(n.node->ss, n.node->sslen) << std::endl;
//network_engine.cancelRequest(n.listenStatus);
auto ls = n.listenStatus;
std::weak_ptr<Search> ws = sr;
n.listenStatus = network_engine.sendListen(n.node, sr->id, n.token,
[this,ws,ls](const Request& status,
NetworkEngine::RequestAnswer&& answer) mutable
{ /* on done */
// cancel previous request
network_engine.cancelRequest(ls);
if (auto sr = ws.lock()) {
onListenDone(status, answer, sr);
searchStep(sr);
}
},
[this,ws,ls](const Request&, bool over) mutable
{ /* on expired */
if (over) {
network_engine.cancelRequest(ls);
if (auto sr = ws.lock())
scheduler.edit(sr->nextSearchStep, scheduler.time());
}
}
);
}
if (not n.candidate and ++i == LISTEN_NODES)
break;
}
}
// Announce requests
for (auto ait = sr->announce.begin(); ait != sr->announce.end();) {
auto& a = *ait;
if (!a.value) { ++ait; continue; }
auto vid = a.value->id;
const auto& type = getType(a.value->type);
if (sr->isAnnounced(vid, type, now)) {
if (a.callback) {
a.callback(true, sr->getNodes());
a.callback = nullptr;
}
ait = sr->announce.erase(ait);
continue;
}
if (in) storageStore(sr->id, a.value, a.created);
unsigned i = 0;
for (auto& n : sr->nodes) {
if (not n.isSynced(now))
continue;
if (n.getAnnounceTime(vid, type) <= now) {
DHT_LOG.WARN("[search %s IPv%c] [node %s] sending 'put' (vid: %d)",
sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', n.node->toString().c_str(), vid);
std::weak_ptr<Search> ws = sr;
n.acked[vid] = network_engine.sendAnnounceValue(n.node, sr->id, *a.value, a.created, n.token,
[this,ws](const Request& status, NetworkEngine::RequestAnswer&& answer)
{ /* on done */
if (auto sr = ws.lock()) {
onAnnounceDone(status, answer, sr);
searchStep(sr);
}
},
[this,ws](const Request&, bool over)
{ /* on expired */
if (over)
if (auto sr = ws.lock())
scheduler.edit(sr->nextSearchStep, scheduler.time());
}
);
}
if (not n.candidate and ++i == TARGET_NODES)
break;
}
++ait;
}
if (sr->callbacks.empty() && sr->announce.empty() && sr->listeners.empty())
sr->done = true;
}
if (sr->currentGetRequests() < SEARCH_REQUESTS) {
unsigned i = 0;
SearchNode* sent;
do {
sent = searchSendGetValues(sr);
if (sent and not sent->candidate)
i++;
}
while (sent and sr->currentGetRequests() < SEARCH_REQUESTS);
/*DHT_LOG.DEBUG("[search %s IPv%c] step: sent %u requests (total %u).",
sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', i, sr->currentGetRequests());*/
auto expiredn = (size_t)std::count_if(sr->nodes.begin(), sr->nodes.end(), [&](const SearchNode& sn) {
return sn.candidate or sn.node->isExpired();
});
if (i == 0 && expiredn == sr->nodes.size())
{
DHT_LOG.WARN("[search %s IPv%c] expired", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6');
// no nodes or all expired nodes
sr->expired = true;
if (sr->announce.empty() && sr->listeners.empty()) {
// Listening or announcing requires keeping the cluster up to date.
sr->done = true;
}
{
auto get_cbs = std::move(sr->callbacks);
for (const auto& g : get_cbs) {
if (g.done_cb)
g.done_cb(false, {});
}
}
{
std::vector<DoneCallback> a_cbs;
a_cbs.reserve(sr->announce.size());
for (const auto& a : sr->announce)
if (a.callback)
a_cbs.emplace_back(std::move(a.callback));
sr->announce.clear();
for (const auto& a : a_cbs)
a(false, {});
}
}
}
//dumpSearch(*sr, std::cout);
/* periodic searchStep scheduling. */
if (not sr->done)
scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(types, now));
}
/* Insert the contents of a bucket into a search structure. */
unsigned
Dht::Search::insertBucket(const Bucket& b, time_point now)
{
unsigned inserted = 0;
for (auto& n : b.nodes) {
if (not n->isExpired() and insertNode(n, now))
inserted++;
}
return inserted;
}
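/* A search is synced when its closest non-bad nodes (up to TARGET_NODES)
   have all returned a valid token recently enough to announce or listen. */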
bool
Dht::Search::isSynced(time_point now) const
{
unsigned i = 0;
for (const auto& n : nodes) {
if (n.isBad())
continue;
if (not n.isSynced(now))
return false;
if (++i == TARGET_NODES)
break;
}
return i > 0;
}
unsigned Dht::Search::getNumberOfBadNodes() const {
return std::count_if(nodes.begin(), nodes.end(),
[=](const SearchNode& sn) { return sn.isBad(); }
);
}
time_point
Dht::Search::getLastGetTime() const
{
time_point last = time_point::min();
for (const auto& g : callbacks)
last = std::max(last, g.start);
return last;
}
bool
Dht::Search::isDone(const Get& get, time_point now) const
{
unsigned i = 0;
const auto limit = std::max(get.start, now - Node::NODE_EXPIRE_TIME);
for (const auto& sn : nodes) {
if (sn.isBad())
continue;
if (sn.last_get_reply < limit)
return false;
if (++i == TARGET_NODES)
break;
}
return true;
}
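/* Time at which the search should send new "get" requests, i.e. the earliest
   moment one of its closest nodes will need to be queried again. */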
time_point
Dht::Search::getUpdateTime(time_point now) const
{
time_point ut = time_point::max();
const auto last_get = getLastGetTime();
unsigned i = 0, t = 0, d = 0;
const auto reqs = currentGetRequests();
for (const auto& sn : nodes) {
if (sn.node->isExpired() or (sn.candidate and t >= TARGET_NODES))
continue;
bool pending = sn.getStatus and sn.getStatus->pending();
if (sn.last_get_reply < std::max(now - Node::NODE_EXPIRE_TIME, last_get) or pending) {
// not isSynced
if (not pending and reqs < SEARCH_REQUESTS)
ut = std::min(ut, now);
if (not sn.candidate)
d++;
} else {
ut = std::min(ut, sn.last_get_reply + Node::NODE_EXPIRE_TIME);
}
t++;
if (not sn.candidate and ++i == TARGET_NODES)
break;
}
if (not callbacks.empty() and d == 0) {
// If all synced/updated but some callbacks remain, step now to clear them
return now;
}
return ut;
}
bool
Dht::Search::isAnnounced(Value::Id id, const ValueType& type, time_point now) const
{
if (nodes.empty())
return false;
unsigned i = 0;
for (const auto& n : nodes) {
if (n.isBad())
continue;
if (not n.isAnnounced(id, type, now))
return false;
if (++i == TARGET_NODES)
break;
}
return i;
}
bool
Dht::Search::isListening(time_point now) const
{
if (nodes.empty() or listeners.empty())
return false;
unsigned i = 0;
for (const auto& n : nodes) {
if (n.isBad())
continue;
if (!n.isListening(now))
return false;
if (++i == LISTEN_NODES)
break;
}
return i;
}
time_point
Dht::Search::getAnnounceTime(const std::map<ValueType::Id, ValueType>& types, time_point now) const
{
if (nodes.empty())
return time_point::max();
time_point ret {time_point::max()};
for (const auto& a : announce) {
if (!a.value) continue;
auto type_it = types.find(a.value->type);
const ValueType& type = (type_it == types.end()) ? ValueType::USER_DATA : type_it->second;
unsigned i = 0, t = 0;
for (const auto& n : nodes) {
if (not n.isSynced(now) or (n.candidate and t >= TARGET_NODES))
continue;
ret = std::min(ret, n.getAnnounceTime(a.value->id, type));
t++;
if (not n.candidate and ++i == TARGET_NODES)
break;
}
}
return ret;
}
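/* Earliest time at which a "listen" should be sent or refreshed on one of the
   closest synced nodes, or time_point::max() if nobody is listening. */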
time_point
Dht::Search::getListenTime(time_point now) const
{
if (listeners.empty())
return time_point::max();
time_point listen_time {time_point::max()};
unsigned i = 0, t = 0;
for (const auto& sn : nodes) {
if (not sn.isSynced(now) or (sn.candidate and t >= LISTEN_NODES))
continue;
auto lt = sn.getListenTime();
listen_time = std::min(listen_time, lt);
t++;
if (not sn.candidate and ++i == LISTEN_NODES)
break;
}
return listen_time;
}
time_point
Dht::Search::getNextStepTime(const std::map<ValueType::Id, ValueType>& types, time_point now) const
{
auto next_step = time_point::max();
if (expired or done)
return next_step;
auto ut = getUpdateTime(now);
if (ut != time_point::max()) {
//std::cout << id.toString() << " IPv" << (af==AF_INET?"4":"6") << " update time in " << print_dt(ut - now) << " s" << std::endl;
next_step = std::min(next_step, ut);
}
if (isSynced(now))
{
auto at = getAnnounceTime(types, now);
if (at != time_point::max()) {
//std::cout << id.toString() << " IPv" << (af==AF_INET?"4":"6") << " announce time in " << print_dt(at - now) << " s" << std::endl;
next_step = std::min(next_step, at);
}
auto lt = getListenTime(now);
if (lt != time_point::max()) {
//std::cout << id.toString() << " IPv" << (af==AF_INET?"4":"6") << " listen time in " << print_dt(lt - now) << " s" << std::endl;
next_step = std::min(next_step, lt);
}
}
return next_step;
}
void
Dht::bootstrapSearch(Dht::Search& sr)
{
const auto& now = scheduler.time();
auto& list = (sr.af == AF_INET) ? buckets : buckets6;
if (list.empty() || (list.size() == 1 && list.front().nodes.empty()))
return;
auto b = list.findBucket(sr.id);
if (b == list.end()) {
DHT_LOG.ERROR("No bucket");
return;
}
sr.insertBucket(*b, now);
if (sr.nodes.size() < SEARCH_NODES) {
if (std::next(b) != list.end())
sr.insertBucket(*std::next(b), now);
if (b != list.begin())
sr.insertBucket(*std::prev(b), now);
}
if (sr.nodes.size() < SEARCH_NODES)
sr.insertBucket(*list.findBucket(myid), now);
sr.refill_time = now;
}
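/* Add nodes from the routing table to the search, walking outward from the
   bucket closest to the target until enough non-bad nodes are present.
   Returns the number of nodes inserted. */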
unsigned
Dht::Search::refill(const RoutingTable& r, time_point now) {
if (r.isEmpty() or r.front().af != af)
return 0;
unsigned added = 0;
auto num_bad_nodes = getNumberOfBadNodes();
auto b = r.findBucket(id);
auto n = b;
while (nodes.size()-num_bad_nodes < SEARCH_NODES && (std::next(n) != r.end() || b != r.begin())) {
if (std::next(n) != r.end()) {
added += insertBucket(*std::next(n), now);
n = std::next(n);
}
if (b != r.begin()) {
added += insertBucket(*std::prev(b), now);
b = std::prev(b);
}
}
return added;
}
/* Start a search. */
std::shared_ptr<Dht::Search>
Dht::search(const InfoHash& id, sa_family_t af, GetCallback callback, DoneCallback done_callback, Value::Filter filter)
{
if (!isRunning(af)) {
DHT_LOG.ERROR("[search %s IPv%c] unsupported protocol", id.toString().c_str(), (af == AF_INET) ? '4' : '6');
if (done_callback)
done_callback(false, {});
return {};
}
auto& srs = af == AF_INET ? searches4 : searches6;
const auto& srp = srs.find(id);
std::shared_ptr<Search> sr {};
if (srp != srs.end()) {
sr = srp->second;
sr->done = false;
sr->expired = false;
} else {
if (searches4.size() + searches6.size() < MAX_SEARCHES) {
sr = std::make_shared<Search>();
srs.emplace(id, sr);
} else {
for (auto it = srs.begin(); it != srs.end(); ++it) {
auto& s = *it->second;
if ((s.done or s.expired) and s.announce.empty() and s.listeners.empty()) {
sr = it->second;
break;
}
}
if (not sr)
throw DhtException("Can't create search");
}
sr->af = af;
sr->tid = search_id++;
sr->step_time = TIME_INVALID;
sr->id = id;
sr->done = false;
sr->expired = false;
sr->nodes.clear();
sr->nodes.reserve(SEARCH_NODES+1);
DHT_LOG.WARN("[search %s IPv%c] new search", id.toString().c_str(), (af == AF_INET) ? '4' : '6');
if (search_id == 0)
search_id++;
}
if (callback)
sr->callbacks.push_back({.start=scheduler.time(), .filter=filter, .get_cb=callback, .done_cb=done_callback});
bootstrapSearch(*sr);
if (sr->nextSearchStep)
scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(types, scheduler.time()));
else
sr->nextSearchStep = scheduler.add(scheduler.time(), std::bind(&Dht::searchStep, this, sr));
return sr;
}
void
Dht::announce(const InfoHash& id, sa_family_t af, std::shared_ptr<Value> value, DoneCallback callback, time_point created)
{
const auto& now = scheduler.time();
if (!value) {
if (callback)
callback(false, {});
return;
}
auto& srs = af == AF_INET ? searches4 : searches6;
auto srp = srs.find(id);
auto sr = srp == srs.end() ? search(id, af, nullptr, nullptr) : srp->second;
if (!sr) {
if (callback)
callback(false, {});
return;
}
sr->done = false;
sr->expired = false;
auto a_sr = std::find_if(sr->announce.begin(), sr->announce.end(), [&](const Announce& a){
return a.value->id == value->id;
});
if (a_sr == sr->announce.end()) {
sr->announce.emplace_back(Announce {value, std::min(now, created), callback});
for (auto& n : sr->nodes)
n.acked[value->id] = {};
}
else {
if (a_sr->value != value) {
a_sr->value = value;
for (auto& n : sr->nodes)
n.acked[value->id] = {};
}
if (sr->isAnnounced(value->id, getType(value->type), now)) {
if (a_sr->callback)
a_sr->callback(true, {});
a_sr->callback = {};
if (callback) {
callback(true, {});
}
return;
} else {
if (a_sr->callback)
a_sr->callback(false, {});
a_sr->callback = callback;
}
}
scheduler.edit(sr->nextSearchStep, scheduler.time());
//TODO
//if (tm < search_time) {
// DHT_LOG.ERROR("[search %s IPv%c] search_time is now in %lfs", sr->id.toString().c_str(),l
// (sr->af == AF_INET) ? '4' : '6', print_dt(tm-clock::now()));
// search_time = tm;
//}
}
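/* Add a listener to the search for the given address family, creating the
   search if needed, and return the per-search listener token. */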
size_t
Dht::listenTo(const InfoHash& id, sa_family_t af, GetCallback cb, Value::Filter f)
{
const auto& now = scheduler.time();
if (!isRunning(af))
return 0;
// DHT_LOG.ERROR("[search %s IPv%c] search_time is now in %lfs", sr->id.toString().c_str(), (sr->af == AF_INET) ? '4' : '6', print_dt(tm-clock::now()));
//DHT_LOG.WARN("listenTo %s", id.toString().c_str());
auto& srs = af == AF_INET ? searches4 : searches6;
auto srp = srs.find(id);
std::shared_ptr<Search> sr = (srp == srs.end()) ? search(id, af, nullptr, nullptr) : srp->second;
if (!sr)
throw DhtException("Can't create search");
DHT_LOG.ERROR("[search %s IPv%c] listen", id.toString().c_str(), (af == AF_INET) ? '4' : '6');
sr->done = false;
auto token = ++sr->listener_token;
sr->listeners.emplace(token, LocalListener{f, cb});
scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(types, now));
return token;
}
size_t
Dht::listen(const InfoHash& id, GetCallback cb, Value::Filter&& f)
{
scheduler.syncTime();
auto vals = std::make_shared<std::map<Value::Id, std::shared_ptr<Value>>>();
auto token = ++listener_token;
auto gcb = [=](const std::vector<std::shared_ptr<Value>>& values) {
std::vector<std::shared_ptr<Value>> newvals {};
for (const auto& v : values) {
auto it = vals->find(v->id);
if (it == vals->cend() || !(*it->second == *v))
newvals.push_back(v);
}
if (!newvals.empty()) {
if (!cb(newvals)) {
cancelListen(id, token);
return false;
}
for (const auto& v : newvals) {
auto it = vals->emplace(v->id, v);
if (not it.second)
it.first->second = v;
}
}
return true;
};
auto st = findStorage(id);
size_t tokenlocal = 0;
if (st == store.end() && store.size() < MAX_HASHES) {
store.emplace_back(id, scheduler.time());
st = std::prev(store.end());
}
if (st != store.end()) {
if (not st->empty()) {
std::vector<std::shared_ptr<Value>> newvals = st->get(f);
if (not newvals.empty()) {
if (!cb(newvals))
return 0;
for (const auto& v : newvals) {
auto it = vals->emplace(v->id, v);
if (not it.second)
it.first->second = v;
}
}
}
tokenlocal = ++st->listener_token;
st->local_listeners.emplace(tokenlocal, LocalListener{f, gcb});
}
auto token4 = Dht::listenTo(id, AF_INET, gcb, f);
auto token6 = Dht::listenTo(id, AF_INET6, gcb, f);
DHT_LOG.DEBUG("Added listen : %d -> %d %d %d", token, tokenlocal, token4, token6);
listeners.emplace(token, std::make_tuple(tokenlocal, token4, token6));
return token;
}
bool
Dht::cancelListen(const InfoHash& id, size_t token)
{
scheduler.syncTime();
auto it = listeners.find(token);
if (it == listeners.end()) {
DHT_LOG.WARN("Listen token not found: %d", token);
return false;
}
DHT_LOG.DEBUG("cancelListen %s with token %d", id.toString().c_str(), token);
auto st = findStorage(id);
auto tokenlocal = std::get<0>(it->second);
if (st != store.end() && tokenlocal)
st->local_listeners.erase(tokenlocal);
auto searches_cancel_listen = [&](std::map<InfoHash, std::shared_ptr<Search>> srs) {
for (auto& sp : srs) {
auto& s = sp.second;
if (s->id != id) continue;
auto af_token = s->af == AF_INET ? std::get<1>(it->second) : std::get<2>(it->second);
if (af_token == 0)
continue;
s->listeners.erase(af_token);
for (auto& sn : s->nodes) // also erase requests for all searchnodes.
network_engine.cancelRequest(sn.listenStatus);
}
};
searches_cancel_listen(searches4);
searches_cancel_listen(searches6);
listeners.erase(it);
return true;
}
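/* Announce a value on both the IPv4 and IPv6 networks. The callback is called
   as soon as the value is announced on one network, or when both attempts
   have completed. */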
void
Dht::put(const InfoHash& id, std::shared_ptr<Value> val, DoneCallback callback, time_point created)
{
scheduler.syncTime();
if (val->id == Value::INVALID_ID) {
crypto::random_device rdev;
std::uniform_int_distribution<Value::Id> rand_id {};
val->id = rand_id(rdev);
}
DHT_LOG.DEBUG("put: adding %s -> %s", id.toString().c_str(), val->toString().c_str());
auto ok = std::make_shared<bool>(false);
auto done = std::make_shared<bool>(false);
auto done4 = std::make_shared<bool>(false);
auto done6 = std::make_shared<bool>(false);
auto donecb = [=](const std::vector<std::shared_ptr<Node>>& nodes) {
// Callback as soon as the value is announced on one of the available networks
if (callback && !*done && (*done4 && *done6)) {
callback(*ok, nodes);
*done = true;
}
};
announce(id, AF_INET, val, [=](bool ok4, const std::vector<std::shared_ptr<Node>>& nodes) {
DHT_LOG.DEBUG("Announce done IPv4 %d", ok4);
*done4 = true;
*ok |= ok4;
donecb(nodes);
}, created);
announce(id, AF_INET6, val, [=](bool ok6, const std::vector<std::shared_ptr<Node>>& nodes) {
DHT_LOG.DEBUG("Announce done IPv6 %d", ok6);
*done6 = true;
*ok |= ok6;
donecb(nodes);
}, created);
}
struct OpStatus {
bool done {false};
bool ok {false};
};
void
Dht::get(const InfoHash& id, GetCallback getcb, DoneCallback donecb, Value::Filter&& filter)
{
scheduler.syncTime();
auto status = std::make_shared<OpStatus>();
auto status4 = std::make_shared<OpStatus>();
auto status6 = std::make_shared<OpStatus>();
auto vals = std::make_shared<std::vector<std::shared_ptr<Value>>>();
auto all_nodes = std::make_shared<std::vector<std::shared_ptr<Node>>>();
auto done_l = [=](const std::vector<std::shared_ptr<Node>>& nodes) {
if (status->done)
return;
all_nodes->insert(all_nodes->end(), nodes.begin(), nodes.end());
if (status->ok || (status4->done && status6->done)) {
bool ok = status->ok || status4->ok || status6->ok;
status->done = true;
if (donecb)
donecb(ok, *all_nodes);
}
};
auto cb = [=](const std::vector<std::shared_ptr<Value>>& values) {
if (status->done)
return false;
std::vector<std::shared_ptr<Value>> newvals {};
for (const auto& v : values) {
auto it = std::find_if(vals->cbegin(), vals->cend(), [&](const std::shared_ptr<Value>& sv) {
return sv == v || *sv == *v;
});
if (it == vals->cend()) {
if (!filter || filter(*v))
newvals.push_back(v);
}
}
if (!newvals.empty()) {
status->ok = !getcb(newvals);
vals->insert(vals->end(), newvals.begin(), newvals.end());
}
done_l({});
return !status->ok;
};
/* Try to answer this search locally. */
cb(getLocal(id, filter));
Dht::search(id, AF_INET, cb, [=](bool ok, const std::vector<std::shared_ptr<Node>>& nodes) {
//DHT_LOG.WARN("DHT done IPv4");
status4->done = true;
status4->ok = ok;
done_l(nodes);
});
Dht::search(id, AF_INET6, cb, [=](bool ok, const std::vector<std::shared_ptr<Node>>& nodes) {
//DHT_LOG.WARN("DHT done IPv6");
status6->done = true;
status6->ok = ok;
done_l(nodes);
});
}
std::vector<std::shared_ptr<Value>>
Dht::getLocal(const InfoHash& id, Value::Filter f) const
{
auto s = findStorage(id);
if (s == store.end()) return {};
return s->get(f);
}
std::shared_ptr<Value>
Dht::getLocalById(const InfoHash& id, Value::Id vid) const
{
auto s = findStorage(id);
if (s != store.end())
return s->getById(vid);
return {};
}
std::vector<std::shared_ptr<Value>>
Dht::getPut(const InfoHash& id)
{
std::vector<std::shared_ptr<Value>> ret;
auto find_values = [&](std::map<InfoHash, std::shared_ptr<Search>> srs) {
auto srp = srs.find(id);
if (srp == srs.end())
return;
auto& search = srp->second;
ret.reserve(ret.size() + search->announce.size());
for (const auto& a : search->announce)
ret.push_back(a.value);
};
find_values(searches4);
find_values(searches6);
return ret;
}
std::shared_ptr<Value>
Dht::getPut(const InfoHash& id, const Value::Id& vid)
{
auto find_value = [&](std::map<InfoHash, std::shared_ptr<Search>> srs) {
auto srp = srs.find(id);
if (srp == srs.end())
return std::shared_ptr<Value> {};
auto& search = srp->second;
for (auto& a : search->announce) {
if (a.value->id == vid)
return a.value;
}
return std::shared_ptr<Value> {};
};
auto v4 = find_value(searches4);
if (v4) return v4;
auto v6 = find_value(searches6);
if (v6) return v6;
return {};
}
bool
Dht::cancelPut(const InfoHash& id, const Value::Id& vid)
{
bool canceled {false};
auto sr_cancel_put = [&](std::map<InfoHash, std::shared_ptr<Search>> srs) {
auto srp = srs.find(id);
if (srp == srs.end())
return;
auto& sr = srp->second;
for (auto it = sr->announce.begin(); it != sr->announce.end();) {
if (it->value->id == vid) {
canceled = true;
it = sr->announce.erase(it);
}
else
++it;
}
};
sr_cancel_put(searches4);
sr_cancel_put(searches6);
return canceled;
}
// Storage
decltype(Dht::store)::iterator
Dht::findStorage(const InfoHash& id)
{
return std::find_if(store.begin(), store.end(), [&](const Storage& st) {
return st.id == id;
});
}
decltype(Dht::store)::const_iterator
Dht::findStorage(const InfoHash& id) const
{
return std::find_if(store.cbegin(), store.cend(), [&](const Storage& st) {
return st.id == id;
});
}
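/* Notify the local and remote listeners of a storage that a value was
   added or updated. */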
void
Dht::storageChanged(Storage& st, ValueStorage& v)
{
const auto& now = scheduler.time();
if (not st.local_listeners.empty()) {
std::vector<std::pair<GetCallback, std::vector<std::shared_ptr<Value>>>> cbs;
DHT_LOG.DEBUG("Storage changed. Sending update to %lu local listeners.", st.local_listeners.size());
for (const auto& l : st.local_listeners) {
std::vector<std::shared_ptr<Value>> vals;
if (not l.second.filter or l.second.filter(*v.data))
vals.push_back(v.data);
if (not vals.empty())
cbs.emplace_back(l.second.get_cb, std::move(vals));
}
// listeners are copied: they may be deleted by the callback
for (auto& cb : cbs)
cb.first(cb.second);
}
for (const auto& l : st.listeners) {
DHT_LOG.DEBUG("Storage changed. Sending update to %s.", l.first->toString().c_str());
std::vector<std::shared_ptr<Value>> vals;
vals.push_back(v.data);
Blob ntoken = makeToken((const sockaddr*)&l.first->ss, false);
network_engine.tellListener(l.first, l.second.rid, st.id, WANT4 | WANT6, ntoken,
buckets.findClosestNodes(st.id, now, TARGET_NODES), buckets6.findClosestNodes(st.id, now, TARGET_NODES),
vals);
}
}
bool
Dht::storageStore(const InfoHash& id, const std::shared_ptr<Value>& value, time_point created)
{
const auto& now = scheduler.time();
created = std::min(created, now);
auto st = findStorage(id);
if (st == store.end()) {
if (store.size() >= MAX_HASHES)
return false;
store.emplace_back(id, now);
st = std::prev(store.end());
}
auto store = st->store(value, created, max_store_size - total_store_size);
if (std::get<0>(store)) {
total_store_size += std::get<1>(store);
total_values += std::get<2>(store);
storageChanged(*st, *std::get<0>(store));
}
return std::get<0>(store);
}
std::tuple<Dht::ValueStorage*, ssize_t, ssize_t>
Dht::Storage::store(const std::shared_ptr<Value>& value, time_point created, ssize_t size_left) {
auto it = std::find_if (values.begin(), values.end(), [&](const ValueStorage& vr) {
return vr.data == value || vr.data->id == value->id;
});
if (it != values.end()) {
/* Already there, only need to refresh */
it->time = created;
ssize_t size_diff = value->size() - it->data->size();
if (size_diff <= size_left and it->data != value) {
//DHT_LOG.DEBUG("Updating %s -> %s", id.toString().c_str(), value->toString().c_str());
it->data = value;
total_size += size_diff;
return std::make_tuple(&(*it), size_diff, 0);
}
return std::make_tuple(nullptr, 0, 0);
} else {
//DHT_LOG.DEBUG("Storing %s -> %s", id.toString().c_str(), value->toString().c_str());
ssize_t size = value->size();
if (size <= size_left and values.size() < MAX_VALUES) {
total_size += size;
values.emplace_back(value, created);
return std::make_tuple(&values.back(), size, 1);
}
return std::make_tuple(nullptr, 0, 0);
}
}
void
Dht::Storage::clear()
{
values.clear();
total_size = 0;
}
void
Dht::storageAddListener(const InfoHash& id, const std::shared_ptr<Node>& node, size_t rid)
{
const auto& now = scheduler.time();
auto st = findStorage(id);
if (st == store.end()) {
if (store.size() >= MAX_HASHES)
return;
store.emplace_back(id, now);
st = std::prev(store.end());
}
auto l = st->listeners.find(node);
if (l == st->listeners.end()) {
const auto& stvalues = st->getValues();
if (not stvalues.empty()) {
std::vector<std::shared_ptr<Value>> values(stvalues.size());
std::transform(stvalues.begin(), stvalues.end(), values.begin(), [=](const ValueStorage& vs) { return vs.data; });
network_engine.tellListener(node, rid, id, WANT4 | WANT6, makeToken((sockaddr*)&node->ss, false),
buckets.findClosestNodes(id, now, TARGET_NODES), buckets6.findClosestNodes(id, now, TARGET_NODES),
values);
}
st->listeners.emplace(node, Listener {rid, now});
}
else
l->second.refresh(rid, now);
}
void
Dht::expireStorage()
{
const auto& now = scheduler.time();
auto i = store.begin();
while (i != store.end()) {
for (auto l = i->listeners.cbegin(); l != i->listeners.cend();){
bool expired = l->second.time + Node::NODE_EXPIRE_TIME < now;
if (expired) {
DHT_LOG.DEBUG("Discarding expired listener %s", l->first->id.toString().c_str());
i->listeners.erase(l++);
} else
++l;
}
auto stats = i->expire(types, now);
total_store_size += stats.first;
total_values += stats.second;
if (i->empty() && i->listeners.empty() && i->local_listeners.empty()) {
DHT_LOG.DEBUG("Discarding expired value %s", i->id.toString().c_str());
i = store.erase(i);
}
else
++i;
}
}
std::pair<ssize_t, ssize_t>
Dht::Storage::expire(const std::map<ValueType::Id, ValueType>& types, time_point now)
{
auto r = std::partition(values.begin(), values.end(), [&](const ValueStorage& v) {
if (!v.data) return false; // should not happen
auto type_it = types.find(v.data->type);
const ValueType& type = (type_it == types.end()) ? ValueType::USER_DATA : type_it->second;
bool expired = v.time + type.expiration < now;
//if (expired)
// DHT_LOG.DEBUG("Discarding expired value %s", v.data->toString().c_str());
return !expired;
});
ssize_t del_num = std::distance(r, values.end());
ssize_t size_diff {};
std::for_each(r, values.end(), [&](const ValueStorage& v){
size_diff -= v.data->size();
});
total_size += size_diff;
values.erase(r, values.end());
return {size_diff, -del_num};
}
void
Dht::connectivityChanged()
{
const auto& now = scheduler.time();
scheduler.edit(nextNodesConfirmation, now);
mybucket_grow_time = now;
mybucket6_grow_time = now;
reported_addr.clear();
network_engine.connectivityChanged();
auto stop_listen = [&](std::map<InfoHash, std::shared_ptr<Search>> srs) {
for (auto& sp : srs)
for (auto& sn : sp.second->nodes)
sn.listenStatus = {};
};
stop_listen(searches4);
stop_listen(searches6);
}
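/* Replace the token secret with fresh random bytes, keeping the previous
   secret so recently issued tokens stay valid, and schedule the next
   rotation in 15 to 45 minutes. */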
void
Dht::rotateSecrets()
{
const auto& now = scheduler.time();
uniform_duration_distribution<> time_dist(std::chrono::minutes(15), std::chrono::minutes(45));
auto rotate_secrets_time = now + time_dist(rd);
oldsecret = secret;
{
crypto::random_device rdev;
std::generate_n(secret.begin(), secret.size(), std::bind(rand_byte, std::ref(rdev)));
}
scheduler.add(rotate_secrets_time, std::bind(&Dht::rotateSecrets, this));
}
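/* Build a write token bound to the requester's address: a SHA-512 digest of
   the current (or previous) secret, the IP address and the port. */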
Blob
Dht::makeToken(const sockaddr *sa, bool old) const
{
void *ip;
size_t iplen;
in_port_t port;
if (sa->sa_family == AF_INET) {
sockaddr_in *sin = (sockaddr_in*)sa;
ip = &sin->sin_addr;
iplen = 4;
port = htons(sin->sin_port);
} else if (sa->sa_family == AF_INET6) {
sockaddr_in6 *sin6 = (sockaddr_in6*)sa;
ip = &sin6->sin6_addr;
iplen = 16;
port = htons(sin6->sin6_port);
} else {
return {};
}
const auto& c1 = old ? oldsecret : secret;
Blob data;
data.reserve(sizeof(secret)+2+iplen);
data.insert(data.end(), c1.begin(), c1.end());
data.insert(data.end(), (uint8_t*)ip, (uint8_t*)ip+iplen);
data.insert(data.end(), (uint8_t*)&port, ((uint8_t*)&port)+2);
size_t sz = TOKEN_SIZE;
Blob ret {};
ret.resize(sz);
gnutls_datum_t gnudata = {data.data(), (unsigned int)data.size()};
if (gnutls_fingerprint(GNUTLS_DIG_SHA512, &gnudata, ret.data(), &sz) != GNUTLS_E_SUCCESS)
throw DhtException("Can't compute SHA512");
ret.resize(sz);
return ret;
}
bool
Dht::tokenMatch(const Blob& token, const sockaddr *sa) const
{
if (!sa || token.size() != TOKEN_SIZE)
return false;
if (token == makeToken(sa, false))
return true;
if (token == makeToken(sa, true))
return true;
return false;
}
int
Dht::getNodesStats(sa_family_t af, unsigned *good_return, unsigned *dubious_return, unsigned *cached_return, unsigned *incoming_return) const
{
const auto& now = scheduler.time();
unsigned good = 0, dubious = 0, cached = 0, incoming = 0;
auto& list = (af == AF_INET) ? buckets : buckets6;
for (const auto& b : list) {
for (auto& n : b.nodes) {
if (n->isGood(now)) {
good++;
if (n->time > n->reply_time)
incoming++;
} else {
dubious++;
}
}
if (b.cached)
cached++;
}
if (good_return)
*good_return = good;
if (dubious_return)
*dubious_return = dubious;
if (cached_return)
*cached_return = cached;
if (incoming_return)
*incoming_return = incoming;
return good + dubious;
}
void
Dht::dumpBucket(const Bucket& b, std::ostream& out) const
{
const auto& now = scheduler.time();
using namespace std::chrono;
out << b.first << " count " << b.nodes.size() << " age " << duration_cast<seconds>(now - b.time).count() << " sec";
if (b.cached)
out << " (cached)";
out << std::endl;
for (auto& n : b.nodes) {
out << " Node " << n->toString();
if (n->time != n->reply_time)
out << " age " << duration_cast<seconds>(now - n->time).count() << ", reply: " << duration_cast<seconds>(now - n->reply_time).count();
else
out << " age " << duration_cast<seconds>(now - n->time).count();
if (n->isExpired())
out << " [expired]";
else if (n->isGood(now))
out << " [good]";
out << std::endl;
}
}
void
Dht::dumpSearch(const Search& sr, std::ostream& out) const
{
const auto& now = scheduler.time();
using namespace std::chrono;
out << std::endl << "Search IPv" << (sr.af == AF_INET6 ? '6' : '4') << ' ' << sr.id << " gets: " << sr.callbacks.size();
out << ", age: " << duration_cast<seconds>(now - sr.step_time).count() << " s";
if (sr.done)
out << " [done]";
if (sr.expired)
out << " [expired]";
bool synced = sr.isSynced(now);
out << (synced ? " [synced]" : " [not synced]");
if (synced && sr.isListening(now)) {
auto lt = sr.getListenTime(now);
out << " [listening, next in " << duration_cast<seconds>(lt-now).count() << " s]";
}
out << std::endl;
for (const auto& n : sr.announce) {
bool announced = sr.isAnnounced(n.value->id, getType(n.value->type), now);
out << "Announcement: " << *n.value << (announced ? " [announced]" : "") << std::endl;
}
out << " Common bits InfoHash Conn. Get Ops IP" << std::endl;
unsigned i = 0;
auto last_get = sr.getLastGetTime();
for (const auto& n : sr.nodes) {
i++;
out << std::setfill (' ') << std::setw(3) << InfoHash::commonBits(sr.id, n.node->id) << ' ' << n.node->id;
out << ' ' << (findNode(n.node->id, sr.af) ? '*' : ' ');
out << " [";
if (auto pendingCount = n.node->getPendingMessageCount())
out << pendingCount;
else
out << ' ';
out << (n.node->isExpired() ? 'x' : ' ') << "]";
// Get status
{
char g_i = (n.getStatus && n.getStatus->pending()) ? (n.candidate ? 'c' : 'f') : ' ';
char s_i = n.isSynced(now) ? (n.last_get_reply > last_get ? 'u' : 's') : '-';
out << " [" << s_i << g_i << "] ";
}
// Listen status
if (not sr.listeners.empty()) {
if (not n.listenStatus)
out << " ";
else
out << "["
<< (n.isListening(now) ? 'l' : (n.listenStatus->pending() ? 'f' : ' ')) << "] ";
}
// Announce status
if (not sr.announce.empty()) {
if (n.acked.empty()) {
out << " ";
for (size_t a=0; a < sr.announce.size(); a++)
out << ' ';
} else {
out << "[";
for (const auto& a : sr.announce) {
auto ack = n.acked.find(a.value->id);
if (ack == n.acked.end() or not ack->second) {
out << ' ';
} else {
if (ack->second->reply_time + getType(a.value->type).expiration > now)
out << 'a';
else if (ack->second->pending())
out << 'f';
}
}
out << "] ";
}
}
out << print_addr(n.node->ss, n.node->sslen);
out << std::endl;
}
}
void
Dht::dumpTables() const
{
std::stringstream out;
out << "My id " << myid << std::endl;
out << "Buckets IPv4 :" << std::endl;
for (const auto& b : buckets)
dumpBucket(b, out);
out << "Buckets IPv6 :" << std::endl;
for (const auto& b : buckets6)
dumpBucket(b, out);
auto dump_searches = [&](std::map<InfoHash, std::shared_ptr<Search>> srs) {
for (auto& srp : srs)
dumpSearch(*srp.second, out);
};
dump_searches(searches4);
dump_searches(searches6);
out << std::endl;
out << getStorageLog() << std::endl;
DHT_LOG.DEBUG("%s", out.str().c_str());
}
std::string
Dht::getStorageLog() const
{
const auto& now = scheduler.time();
using namespace std::chrono;
std::stringstream out;
for (const auto& st : store) {
out << "Storage " << st.id << " " << st.listeners.size() << " list., " << st.valueCount() << " values (" << st.totalSize() << " bytes)" << std::endl;
if (not st.local_listeners.empty())
out << " " << st.local_listeners.size() << " local listeners" << std::endl;
for (const auto& l : st.listeners) {
out << " " << "Listener " << l.first->toString();
auto since = duration_cast<seconds>(now - l.second.time);
auto expires = duration_cast<seconds>(l.second.time + Node::NODE_EXPIRE_TIME - now);
out << " (since " << since.count() << "s, exp in " << expires.count() << "s)" << std::endl;
}
}
out << "Total " << store.size() << " storages, " << total_values << " values (" << (total_store_size/1024) << " ĶB)" << std::endl;
return out.str();
}
std::string
Dht::getRoutingTablesLog(sa_family_t af) const
{
auto& list = (af == AF_INET) ? buckets : buckets6;
std::stringstream out;
for (const auto& b : list)
dumpBucket(b, out);
return out.str();
}
std::string
Dht::getSearchesLog(sa_family_t af) const
{
std::stringstream out;
out << "s:synched, u:updated, a:announced, c:candidate, f:cur req, x:expired, *:known" << std::endl;
if (not af or af == AF_INET)
for (const auto& sr : searches4)
dumpSearch(*sr.second, out);
if (not af or af == AF_INET6)
for (const auto& sr : searches6)
dumpSearch(*sr.second, out);
return out.str();
}
Dht::~Dht()
{
for (auto& s : searches4)
s.second->clear();
for (auto& s : searches6)
s.second->clear();
}
Dht::Dht() : store(), network_engine(DHT_LOG, scheduler) {}
Dht::Dht(int s, int s6, Config config)
: myid(config.node_id), is_bootstrap(config.is_bootstrap), store(),
network_engine(myid, s, s6, DHT_LOG, scheduler,
std::bind(&Dht::onError, this, _1, _2),
std::bind(&Dht::newNode, this, _1, _2),
std::bind(&Dht::onReportedAddr, this, _1, _2, _3),
std::bind(&Dht::onPing, this, _1),
std::bind(&Dht::onFindNode, this, _1, _2, _3),
std::bind(&Dht::onGetValues, this, _1, _2, _3),
std::bind(&Dht::onListen, this, _1, _2, _3, _4),
std::bind(&Dht::onAnnounce, this, _1, _2, _3, _4, _5))
{
scheduler.syncTime();
if (s < 0 && s6 < 0)
return;
if (s >= 0) {
buckets = {Bucket {AF_INET}};
if (!set_nonblocking(s, 1))
throw DhtException("Can't set socket to non-blocking mode");
}
if (s6 >= 0) {
buckets6 = {Bucket {AF_INET6}};
if (!set_nonblocking(s6, 1))
throw DhtException("Can't set socket to non-blocking mode");
}
search_id = std::uniform_int_distribution<decltype(search_id)>{}(rd);
uniform_duration_distribution<> time_dis {std::chrono::seconds(3), std::chrono::seconds(5)};
auto confirm_nodes_time = scheduler.time() + time_dis(rd);
DHT_LOG.DEBUG("Scheduling %s", myid.toString().c_str());
nextNodesConfirmation = scheduler.add(confirm_nodes_time, std::bind(&Dht::confirmNodes, this));
// Fill old secret
{
crypto::random_device rdev;
std::generate_n(secret.begin(), secret.size(), std::bind(rand_byte, std::ref(rdev)));
}
rotateSecrets();
expire();
DHT_LOG.DEBUG("DHT initialised with node ID %s", myid.toString().c_str());
}
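/* Keep the neighbourhood of our own id fresh by sending a "find node" for an
   id close to ours to a random node of our bucket or of a neighbouring one. */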
bool
Dht::neighbourhoodMaintenance(RoutingTable& list)
{
//DHT_LOG.DEBUG("neighbourhoodMaintenance");
auto b = list.findBucket(myid);
if (b == list.end())
return false;
InfoHash id = myid;
id[HASH_LEN-1] = rand_byte(rd);
std::bernoulli_distribution rand_trial(1./8.);
auto q = b;
if (std::next(q) != list.end() && (q->nodes.empty() || rand_trial(rd)))
q = std::next(q);
if (b != list.begin() && (q->nodes.empty() || rand_trial(rd))) {
auto r = std::prev(b);
if (!r->nodes.empty())
q = r;
}
/* Since our node-id is the same in both DHTs, it's probably
profitable to query both families. */
auto n = q->randomNode();
if (n) {
DHT_LOG.DEBUG("[find %s IPv%c] sending find for neighborhood maintenance.", id.toString().c_str(), q->af == AF_INET6 ? '6' : '4');
network_engine.sendFindNode(n, id, network_engine.want(), nullptr, nullptr);
}
return true;
}
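/* Refresh buckets that have been quiet for more than ten minutes (or are
   empty) by sending a "find node" for a random id in the bucket's range to a
   random node of that bucket or of a neighbouring one. */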
bool
Dht::bucketMaintenance(RoutingTable& list)
{
std::bernoulli_distribution rand_trial(1./8.);
std::bernoulli_distribution rand_trial_38(1./38.);
for (auto b = list.begin(); b != list.end(); ++b) {
if (b->time < scheduler.time() - std::chrono::minutes(10) || b->nodes.empty()) {
/* This bucket hasn't seen any positive confirmation for a long
time. Pick a random id in this bucket's range, and send
a request to a random node. */
InfoHash id = list.randomId(b);
auto q = b;
/* If the bucket is empty, we try to fill it from a neighbour.
We also sometimes do it gratuitously to recover from
buckets full of broken nodes. */
if (std::next(b) != list.end() && (q->nodes.empty() || rand_trial(rd)))
q = std::next(b);
if (b != list.begin() && (q->nodes.empty() || rand_trial(rd))) {
auto r = std::prev(b);
if (!r->nodes.empty())
q = r;
}
auto n = q->randomNode();
if (n) {
want_t want = -1;
if (network_engine.want() != want) {
auto otherbucket = findBucket(id, q->af == AF_INET ? AF_INET6 : AF_INET);
if (otherbucket && otherbucket->nodes.size() < TARGET_NODES)
/* The corresponding bucket in the other family
is emptyish -- querying both is useful. */
want = WANT4 | WANT6;
else if (rand_trial_38(rd))
/* Most of the time, this just adds overhead.
However, it might help stitch back one of
the DHTs after a network collapse, so query
both, but only very occasionally. */
want = WANT4 | WANT6;
}
DHT_LOG.DEBUG("[find %s IPv%c] sending for bucket maintenance.", id.toString().c_str(), q->af == AF_INET6 ? '6' : '4');
network_engine.sendFindNode(n, id, want, nullptr, nullptr);
/* In order to avoid sending queries back-to-back,
give up for now and reschedule us soon. */
return true;
}
}
}
return false;
}
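/* Periodic storage maintenance: re-announce every stored hash whose
   maintenance deadline has passed, push its deadline forward, and reschedule
   this job at the earliest remaining deadline. */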
void
Dht::dataPersistence() {
const auto& now = scheduler.time();
auto storage_maintenance_time = time_point::max();
for (auto &str : store) {
if (now > str.maintenance_time) {
maintainStorage(str.id);
str.maintenance_time = now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME;
}
storage_maintenance_time = std::min(storage_maintenance_time, str.maintenance_time);
}
scheduler.add(storage_maintenance_time, std::bind(&Dht::dataPersistence, this));
}
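/* Re-announce the values stored under 'id' for each address family where we
   are no longer among the closest known nodes to 'id' (or when 'force' is
   set). Values that would expire before the next maintenance pass are
   skipped. Once the data has been handed off (or no node is known) on both
   families, the local copy is discarded. Returns the number of announces
   started. */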
size_t
Dht::maintainStorage(InfoHash id, bool force, DoneCallback donecb) {
const auto& now = scheduler.time();
size_t announce_per_af = 0;
auto local_storage = findStorage(id);
if (local_storage == store.end()) { return 0; }
bool want4 = true, want6 = true;
auto nodes = buckets.findClosestNodes(id, now);
if (!nodes.empty()) {
if (force || id.xorCmp(nodes.back()->id, myid) < 0) {
for (auto &value : local_storage->getValues()) {
const auto& vt = getType(value.data->type);
if (force || value.time + vt.expiration > now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME) {
// the value will outlive the next maintenance pass: re-announce it to closer nodes
announce(id, AF_INET, value.data, donecb, value.time);
++announce_per_af;
}
}
want4 = false;
}
}
else { want4 = false; }
auto nodes6 = buckets6.findClosestNodes(id, now);
if (!nodes6.empty()) {
if (force || id.xorCmp(nodes6.back()->id, myid) < 0) {
for (auto &value : local_storage->getValues()) {
const auto& vt = getType(value.data->type);
if (force || value.time + vt.expiration > now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME) {
// the value will outlive the next maintenance pass: re-announce it to closer nodes
announce(id, AF_INET6, value.data, donecb, value.time);
++announce_per_af;
}
}
want6 = false;
}
}
else { want6 = false; }
if (not want4 and not want6) {
DHT_LOG.DEBUG("Discarding storage values %s", id.toString().c_str());
local_storage->clear();
}
return announce_per_af;
}
void
Dht::processMessage(const uint8_t *buf, size_t buflen, const sockaddr *from, socklen_t fromlen)
{
if (buflen == 0)
return;
try {
network_engine.processMessage(buf, buflen, from, fromlen);
} catch (const std::exception& e) {
DHT_LOG.ERROR("Can't parse message from %s: %s", print_addr(from, fromlen).c_str(), e.what());
//auto code = e.getCode();
//if (code == DhtProtocolException::INVALID_TID_SIZE or code == DhtProtocolException::WRONG_NODE_INFO_BUF_LEN) {
/* This is really annoying, as it means that we will
time-out all our searches that go through this node.
Kill it. */
//const auto& id = e.getNodeId();
//blacklistNode(&id, from, fromlen);
//}
}
}
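/* Main entry point for the caller's event loop: synchronize the internal
   clock, process one incoming packet (if any), run the jobs that are due and
   return the time point at which periodic() should be called again at the
   latest.

   Minimal usage sketch (the 'waitForPacket' helper and socket handling below
   are hypothetical, not part of this file):

       auto wakeup = dht.periodic(nullptr, 0, nullptr, 0);
       for (;;) {
           sockaddr_storage from;
           socklen_t fromlen = sizeof(from);
           uint8_t buf[4096];
           // wait for a packet, but no longer than until 'wakeup'
           ssize_t len = waitForPacket(buf, sizeof(buf), (sockaddr*)&from, &fromlen, wakeup);
           if (len > 0)
               wakeup = dht.periodic(buf, len, (sockaddr*)&from, fromlen);
           else
               wakeup = dht.periodic(nullptr, 0, nullptr, 0);
       }
*/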
time_point
Dht::periodic(const uint8_t *buf, size_t buflen, const sockaddr *from, socklen_t fromlen)
{
scheduler.syncTime();
processMessage(buf, buflen, from, fromlen);
return scheduler.run();
}
void
Dht::expire()
{
uniform_duration_distribution<> time_dis(std::chrono::minutes(2), std::chrono::minutes(6));
auto expire_stuff_time = scheduler.time() + duration(time_dis(rd));
expireBuckets(buckets);
expireBuckets(buckets6);
expireStorage();
expireSearches();
scheduler.add(expire_stuff_time, std::bind(&Dht::expire, this));
}
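/* Periodic routing-table upkeep: bootstrap a search for our own id on any
   family that has no search yet and is not disconnected, then run bucket
   maintenance and, if nothing was sent and our own bucket grew recently
   (within 150 s), neighbourhood maintenance. Reschedules itself sooner
   (5-25 s) when maintenance asked to be called back, later (60-180 s)
   otherwise. */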
void
Dht::confirmNodes()
{
using namespace std::chrono;
bool soon = false;
const auto& now = scheduler.time();
if (searches4.empty() and getStatus(AF_INET) != NodeStatus::Disconnected) {
DHT_LOG.DEBUG("[confirm nodes] initial IPv4 'get' for my id (%s).", myid.toString().c_str());
search(myid, AF_INET);
}
if (searches6.empty() and getStatus(AF_INET6) != NodeStatus::Disconnected) {
DHT_LOG.DEBUG("[confirm nodes] initial IPv6 'get' for my id (%s).", myid.toString().c_str());
search(myid, AF_INET6);
}
soon |= bucketMaintenance(buckets);
soon |= bucketMaintenance(buckets6);
if (!soon) {
if (mybucket_grow_time >= now - seconds(150))
soon |= neighbourhoodMaintenance(buckets);
if (mybucket6_grow_time >= now - seconds(150))
soon |= neighbourhoodMaintenance(buckets6);
}
/* In order to maintain all buckets' age within 600 seconds, worst
case is roughly 27 seconds, assuming the table is 22 bits deep.
We want to keep a margin for neighborhood maintenance, so keep
this within 25 seconds. */
auto time_dis = soon ?
uniform_duration_distribution<> {seconds(5) , seconds(25)}
: uniform_duration_distribution<> {seconds(60), seconds(180)};
auto confirm_nodes_time = now + time_dis(rd);
nextNodesConfirmation = scheduler.add(confirm_nodes_time, std::bind(&Dht::confirmNodes, this));
}
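/* Serialize the local value storage for persistence. Each entry is exported
   as (InfoHash, blob), where the blob is a msgpack array of
   [storage_time, value] pairs, the same layout importValues() expects back. */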
std::vector<ValuesExport>
Dht::exportValues() const
{
std::vector<ValuesExport> e {};
e.reserve(store.size());
for (const auto& h : store) {
ValuesExport ve;
ve.first = h.id;
msgpack::sbuffer buffer;
msgpack::packer<msgpack::sbuffer> pk(&buffer);
pk.pack_array(h.getValues().size());
for (const auto& v : h.getValues()) {
pk.pack_array(2);
pk.pack(v.time.time_since_epoch().count());
v.data->msgpack_pack(pk);
}
ve.second = {buffer.data(), buffer.data()+buffer.size()};
e.push_back(std::move(ve));
}
return e;
}
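/* Restore values previously produced by exportValues(): unpack each blob,
   drop the values whose type-defined expiration has already passed, and put
   the remaining ones back in storage with their original storage time.

   Persistence round-trip sketch (saveToDisk/loadFromDisk are hypothetical
   helpers, not part of this file):

       saveToDisk(dht.exportValues());      // on shutdown
       dht.importValues(loadFromDisk());    // on next startup
*/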
void
Dht::importValues(const std::vector<ValuesExport>& import)
{
for (const auto& h : import) {
if (h.second.empty())
continue;
try {
msgpack::unpacked msg;
msgpack::unpack(msg, (const char*)h.second.data(), h.second.size());
auto valarr = msg.get();
if (valarr.type != msgpack::type::ARRAY)
throw msgpack::type_error();
for (unsigned i = 0; i < valarr.via.array.size; i++) {
auto& valel = valarr.via.array.ptr[i];
if (valel.via.array.size < 2)
throw msgpack::type_error();
time_point val_time;
Value tmp_val;
try {
val_time = time_point{time_point::duration{valel.via.array.ptr[0].as<time_point::duration::rep>()}};
tmp_val.msgpack_unpack(valel.via.array.ptr[1]);
} catch (const std::exception&) {
DHT_LOG.ERROR("Error reading value at %s", h.first.toString().c_str());
continue;
}
if (val_time + getType(tmp_val.type).expiration < scheduler.time()) {
DHT_LOG.DEBUG("Discarding expired value at %s", h.first.toString().c_str());
continue;
}
storageStore(h.first, std::make_shared<Value>(std::move(tmp_val)), val_time);
}
} catch (const std::exception&) {
DHT_LOG.ERROR("Error reading values at %s", h.first.toString().c_str());
continue;
}
}
}
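/* Export the good nodes currently known, listing the nodes of our own
   buckets first (for both families) so that a later re-import starts from
   our closest neighbours.

   Bootstrap-from-cache sketch (saveNodes/loadNodes are hypothetical helpers,
   and the NodeExport field names used here are assumptions):

       saveNodes(dht.exportNodes());                 // on shutdown
       for (const auto& n : loadNodes())             // on next startup
           dht.insertNode(n.id, (const sockaddr*)&n.ss, n.sslen);
*/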
std::vector<NodeExport>
Dht::exportNodes()
{
const auto& now = scheduler.time();
std::vector<NodeExport> nodes;
const auto b4 = buckets.findBucket(myid);
if (b4 != buckets.end()) {
for (auto& n : b4->nodes)
if (n->isGood(now))
nodes.push_back(n->exportNode());
}
const auto b6 = buckets6.findBucket(myid);
if (b6 != buckets6.end()) {
for (auto& n : b6->nodes)
if (n->isGood(now))
nodes.push_back(n->exportNode());
}
for (auto b = buckets.begin(); b != buckets.end(); ++b) {
if (b == b4) continue;
for (auto& n : b->nodes)
if (n->isGood(now))
nodes.push_back(n->exportNode());
}
for (auto b = buckets6.begin(); b != buckets6.end(); ++b) {
if (b == b6) continue;
for (auto& n : b->nodes)
if (n->isGood(now))
nodes.push_back(n->exportNode());
}
return nodes;
}
bool
Dht::insertNode(const InfoHash& id, const sockaddr* sa, socklen_t salen)
{
if (sa->sa_family != AF_INET && sa->sa_family != AF_INET6)
return false;
scheduler.syncTime();
auto n = network_engine.insertNode(id, sa, salen);
return !!n;
}
int
Dht::pingNode(const sockaddr *sa, socklen_t salen)
{
scheduler.syncTime();
DHT_LOG.DEBUG("Sending ping to %s", print_addr(sa, salen).c_str());
network_engine.sendPing(sa, salen, nullptr, nullptr);
return -1;
}
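/* Network-engine error callback. On an UNAUTHORIZED reply the request is
   cancelled and, in every search of that family involving the node, its
   write token and last 'get' reply time are cleared so that a fresh token is
   requested right away. */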
void
Dht::onError(std::shared_ptr<Request> req, DhtProtocolException e) {
if (e.getCode() == DhtProtocolException::UNAUTHORIZED) {
network_engine.cancelRequest(req);
unsigned cleared = 0;
for (auto& srp : req->node->getFamily() == AF_INET ? searches4 : searches6) {
auto& sr = srp.second;
for (auto& n : sr->nodes) {
if (n.node != req->node) continue;
n.token.clear();
n.last_get_reply = time_point::min();
cleared++;
searchSendGetValues(sr);
break;
}
}
DHT_LOG.WARN("[node %s] token flush (%d searches affected)", req->node->toString().c_str(), cleared);
}
}
void
Dht::onReportedAddr(const InfoHash& id, sockaddr* addr, socklen_t addr_length)
{
auto& list = (addr->sa_family == AF_INET ? buckets : buckets6);
auto b = list.findBucket(id);
if (b != list.end())
    b->time = scheduler.time();
if (addr and addr_length)
reportedAddr(addr, addr_length);
}
NetworkEngine::RequestAnswer
Dht::onPing(std::shared_ptr<Node>)
{
return {};
}
NetworkEngine::RequestAnswer
Dht::onFindNode(std::shared_ptr<Node> node, InfoHash& target, want_t want)
{
const auto& now = scheduler.time();
NetworkEngine::RequestAnswer answer;
answer.ntoken = makeToken((sockaddr*)&node->ss, false);
if (want & WANT4)
answer.nodes4 = buckets.findClosestNodes(target, now, TARGET_NODES);
if (want & WANT6)
answer.nodes6 = buckets6.findClosestNodes(target, now, TARGET_NODES);
return answer;
}
NetworkEngine::RequestAnswer
Dht::onGetValues(std::shared_ptr<Node> node, InfoHash& hash, want_t)
{
if (hash == zeroes) {
DHT_LOG.WARN("[node %s] Eek! Got get_values with no info_hash.", node->toString().c_str());
throw DhtProtocolException {DhtProtocolException::NON_AUTHORITATIVE_INFORMATION, DhtProtocolException::GET_NO_INFOHASH};
}
const auto& now = scheduler.time();
NetworkEngine::RequestAnswer answer {};
auto st = findStorage(hash);
answer.ntoken = makeToken((sockaddr*)&node->ss, false);
answer.nodes4 = buckets.findClosestNodes(hash, now, TARGET_NODES);
answer.nodes6 = buckets6.findClosestNodes(hash, now, TARGET_NODES);
if (st != store.end() && not st->empty()) {
auto values = st->getValues();
answer.values.resize(values.size());
std::transform(values.begin(), values.end(), answer.values.begin(), [](const ValueStorage& vs) {
return vs.data;
});
DHT_LOG.DEBUG("[node %s] sending %u values.", node->toString().c_str(), answer.values.size());
} else {
DHT_LOG.DEBUG("[node %s] sending nodes.", node->toString().c_str());
}
return answer;
}
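/* Handle a reply to 'get': when the node provided a write token, the
   returned values are dispatched to the search's callbacks and listeners
   (after their respective filters); without a token the node is blacklisted.
   The search is then kept moving by sending further 'get' requests and
   rescheduling its next step. */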
void
Dht::onGetValuesDone(const Request& status,
NetworkEngine::RequestAnswer& a, std::shared_ptr<Search> sr)
{
if (not sr) {
DHT_LOG.WARN("[search unknown] got reply to 'get'. Ignoring.");
return;
}
DHT_LOG.DEBUG("[search %s IPv%c] got reply to 'get' from %s with %u nodes", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', status.node->toString().c_str(), a.nodes4.size());
if (not a.ntoken.empty()) {
if (!a.values.empty()) {
DHT_LOG.DEBUG("[search %s IPv%c] found %u values",
sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6',
a.values.size());
for (auto& cb : sr->callbacks) {
if (!cb.get_cb) continue;
std::vector<std::shared_ptr<Value>> tmp;
std::copy_if(a.values.begin(), a.values.end(), std::back_inserter(tmp),
[&](const std::shared_ptr<Value>& v) {
return not static_cast<bool>(cb.filter) or cb.filter(*v);
}
);
if (not tmp.empty())
cb.get_cb(tmp);
}
std::vector<std::pair<GetCallback, std::vector<std::shared_ptr<Value>>>> tmp_lists;
for (auto& l : sr->listeners) {
if (!l.second.get_cb) continue;
std::vector<std::shared_ptr<Value>> tmp;
std::copy_if(a.values.begin(), a.values.end(), std::back_inserter(tmp),
[&](const std::shared_ptr<Value>& v) {
return not static_cast<bool>(l.second.filter) or l.second.filter(*v);
}
);
if (not tmp.empty())
tmp_lists.emplace_back(l.second.get_cb, tmp);
}
for (auto& l : tmp_lists)
l.first(l.second);
}
} else {
DHT_LOG.WARN("[node %s] no token provided. Ignoring response content.", status.node->toString().c_str());
network_engine.blacklistNode(status.node);
}
if (not sr->done) {
searchSendGetValues(sr);
// Force the next search step time to be recomputed
scheduler.edit(sr->nextSearchStep, scheduler.time());
}
}
NetworkEngine::RequestAnswer
Dht::onListen(std::shared_ptr<Node> node, InfoHash& hash, Blob& token, size_t rid)
{
if (hash == zeroes) {
DHT_LOG.WARN("Listen with no info_hash.");
throw DhtProtocolException {
DhtProtocolException::NON_AUTHORITATIVE_INFORMATION,
DhtProtocolException::LISTEN_NO_INFOHASH
};
}
if (!tokenMatch(token, (sockaddr*)&node->ss)) {
DHT_LOG.WARN("[node %s] incorrect token %s for 'listen'.", node->toString().c_str(), hash.toString().c_str());
throw DhtProtocolException {DhtProtocolException::UNAUTHORIZED, DhtProtocolException::LISTEN_WRONG_TOKEN};
}
storageAddListener(hash, node, rid);
return {};
}
void
Dht::onListenDone(const Request& status, NetworkEngine::RequestAnswer& answer, std::shared_ptr<Search>& sr)
{
DHT_LOG.DEBUG("[search %s] Got reply to listen.", sr->id.toString().c_str());
if (sr) {
if (not answer.values.empty()) { /* got new values from listen request */
DHT_LOG.DEBUG("[listen %s] Got new values.", sr->id.toString().c_str());
onGetValuesDone(status, answer, sr);
}
if (not sr->done) {
const auto& now = scheduler.time();
searchSendGetValues(sr);
scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(types, now));
}
} else
DHT_LOG.DEBUG("Unknown search or announce!");
}
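/* Handle an incoming 'put': check the write token and that we are close
   enough to the target hash, then for each value either store it (subject to
   the value type's store policy) or, if a value with the same id already
   exists locally, let the type's edit policy decide whether the existing
   value may be replaced. */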
NetworkEngine::RequestAnswer
Dht::onAnnounce(std::shared_ptr<Node> node, InfoHash& hash, Blob& token, std::vector<std::shared_ptr<Value>> values,
time_point created)
{
if (hash == zeroes) {
DHT_LOG.WARN("Put with no info_hash.");
throw DhtProtocolException {
DhtProtocolException::NON_AUTHORITATIVE_INFORMATION,
DhtProtocolException::PUT_NO_INFOHASH
};
}
if (!tokenMatch(token, (sockaddr*)&node->ss)) {
DHT_LOG.WARN("[node %s] incorrect token %s for 'put'.", node->toString().c_str(), hash.toString().c_str());
throw DhtProtocolException {DhtProtocolException::UNAUTHORIZED, DhtProtocolException::PUT_WRONG_TOKEN};
}
{
// We store a value only if we think we're part of the
// SEARCH_NODES nodes around the target id.
auto closest_nodes = (node->getFamily() == AF_INET ? buckets : buckets6)
.findClosestNodes(hash, scheduler.time(), SEARCH_NODES);
if (closest_nodes.size() >= TARGET_NODES and hash.xorCmp(closest_nodes.back()->id, myid) < 0) {
DHT_LOG.WARN("[node %s] announce too far from the target. Dropping value.", node->toString().c_str());
return {};
}
}
for (const auto& v : values) {
if (v->id == Value::INVALID_ID) {
DHT_LOG.WARN("[value %s %s] incorrect value id", hash.toString().c_str(), v->id);
throw DhtProtocolException {
DhtProtocolException::NON_AUTHORITATIVE_INFORMATION,
DhtProtocolException::PUT_INVALID_ID
};
}
auto lv = getLocalById(hash, v->id);
std::shared_ptr<Value> vc = v;
if (lv) {
if (*lv == *vc) {
DHT_LOG.WARN("[value %s %lu] nothing to do.", hash.toString().c_str(), lv->id);
} else {
const auto& type = getType(lv->type);
if (type.editPolicy(hash, lv, vc, node->id, (sockaddr*)&node->ss, node->sslen)) {
DHT_LOG.DEBUG("[value %s %lu] editing %s.",
hash.toString().c_str(), lv->id, vc->toString().c_str());
storageStore(hash, vc, created);
} else {
DHT_LOG.DEBUG("[value %s %lu] rejecting edition of %s because of storage policy.",
hash.toString().c_str(), lv->id, vc->toString().c_str());
}
}
} else {
// Allow the value to be edited by the storage policy
const auto& type = getType(vc->type);
if (type.storePolicy(hash, vc, node->id, (sockaddr*)&node->ss, node->sslen)) {
DHT_LOG.DEBUG("[value %s %lu] storing %s.", hash.toString().c_str(), vc->id, vc->toString().c_str());
storageStore(hash, vc, created);
} else {
DHT_LOG.DEBUG("[value %s %lu] rejecting storage of %s.",
hash.toString().c_str(), vc->id, vc->toString().c_str());
}
}
}
return {};
}
void
Dht::onAnnounceDone(const Request&, NetworkEngine::RequestAnswer& answer,
std::shared_ptr<Search>& sr)
{
const auto& now = scheduler.time();
DHT_LOG.DEBUG("[search %s IPv%c] got reply to put!",
sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6');
searchSendGetValues(sr);
// If the value was just successfully announced, call the callback
sr->announce.erase(std::remove_if(sr->announce.begin(), sr->announce.end(),
[&](Announce& a) {
if (!a.value || a.value->id != answer.vid)
return false;
auto type = getType(a.value->type);
if (sr->isAnnounced(answer.vid, type, now)) {
if (a.callback) {
a.callback(true, sr->getNodes());
a.callback = nullptr;
}
return true;
}
return false;
}), sr->announce.end());
}
}