-
When a put is performed just after a get, the get will not include the value that was in the put.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
dht.cpp 106.20 KiB
/*
* Copyright (C) 2014-2016 Savoir-faire Linux Inc.
* Author(s) : Adrien Béraud <adrien.beraud@savoirfairelinux.com>
* Simon Désaulniers <sim.desaulniers@gmail.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "dht.h"
#include "rng.h"
#include "request.h"
#include <msgpack.hpp>
extern "C" {
#include <gnutls/gnutls.h>
}
#ifndef _WIN32
#include <arpa/inet.h>
#else
#include <ws2tcpip.h>
#endif
#include <algorithm>
#include <random>
#include <sstream>
#ifndef _WIN32
#include <unistd.h>
#else
#include <io.h>
#endif
#include <fcntl.h>
#include <cstring>
#ifdef _WIN32
/**
 * Switches a socket between blocking and non-blocking mode (Windows variant).
 *
 * @param fd           socket handle to configure.
 * @param nonblocking  non-zero to enable non-blocking mode, zero to disable it.
 *
 * @return true when ioctlsocket() reports success (returns 0).
 */
static bool
set_nonblocking(int fd, int nonblocking)
{
    /* FIONBIO expects 0 (blocking) or non-zero (non-blocking). */
    unsigned long flag = nonblocking ? 1UL : 0UL;
    return ioctlsocket(fd, FIONBIO, &flag) == 0;
}
extern const char *inet_ntop(int, const void *, char *, socklen_t);
#else
/**
 * Switches a file descriptor between blocking and non-blocking mode (POSIX variant).
 *
 * @param fd           descriptor to configure.
 * @param nonblocking  non-zero to set O_NONBLOCK, zero to clear it.
 *
 * @return true if both the F_GETFL and the F_SETFL calls succeeded.
 */
static bool
set_nonblocking(int fd, int nonblocking)
{
    const int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0)
        return false;

    /* Only the O_NONBLOCK bit is touched; every other status flag is kept. */
    const int wanted = nonblocking ? (flags | O_NONBLOCK) : (flags & ~O_NONBLOCK);
    return fcntl(fd, F_SETFL, wanted) >= 0;
}
#endif
/* File-local pseudo-random engine, seeded once from dht::crypto::random_device. */
static std::mt19937 rd {dht::crypto::random_device{}()};
/* Uniform distribution over one byte's range, [0, 255].
 * The standard only allows short/int/long/long long (and their unsigned
 * forms) as the IntType of std::uniform_int_distribution; instantiating it
 * with uint8_t is undefined behaviour. The int-based form, previously used
 * on Windows only, is therefore used on every platform. Callers narrow the
 * result to a byte themselves. */
static std::uniform_int_distribution<int> rand_byte {0, 0xFF};
namespace dht {
using namespace std::placeholders;
/* Out-of-class definitions for Dht's static constexpr data members. No value is
   given here, so the initializers are presumably on the in-class declarations
   (dht.h) — TODO confirm. Before C++17 such a namespace-scope definition is
   required whenever the member is odr-used (for instance when it is bound to a
   const reference, as std::min/std::max do). */
constexpr std::chrono::minutes Dht::MAX_STORAGE_MAINTENANCE_EXPIRE_TIME;
constexpr std::chrono::minutes Dht::SEARCH_EXPIRE_TIME;
constexpr std::chrono::seconds Dht::LISTEN_EXPIRE_TIME;
constexpr std::chrono::seconds Dht::REANNOUNCE_MARGIN;
// internal structures definition
/**
 * Values stored locally for one hash, together with the remote and local
 * listeners registered on that hash. Move-only.
 */
struct Dht::Storage {
    InfoHash id;
    time_point maintenance_time {};
    std::map<std::shared_ptr<Node>, Listener> listeners {};
    std::map<size_t, LocalListener> local_listeners {};
    size_t listener_token {1};

    Storage() {}
    Storage(InfoHash id, time_point now) : id(id), maintenance_time(now+MAX_STORAGE_MAINTENANCE_EXPIRE_TIME) {}

#if defined(__GNUC__) && __GNUC__ == 4 && __GNUC_MINOR__ <= 9 || defined(_WIN32)
    // GCC-bug: remove me when support of GCC < 4.9.2 is abandoned
    Storage(Storage&& o) noexcept
        : id(std::move(o.id))
        , maintenance_time(std::move(o.maintenance_time))
        , listeners(std::move(o.listeners))
        , local_listeners(std::move(o.local_listeners))
        , listener_token(std::move(o.listener_token))
        , values(std::move(o.values))
        , total_size(std::move(o.total_size)) {}
#else
    Storage(Storage&& o) noexcept = default;
#endif
    Storage& operator=(Storage&& o) = default;

    /** @return true when no value is stored. */
    bool empty() const { return values.empty(); }

    void clear();

    /** @return the number of stored values. */
    size_t valueCount() const { return values.size(); }

    /** @return the accumulated size tracked in total_size. */
    size_t totalSize() const { return total_size; }

    const std::vector<ValueStorage>& getValues() const { return values; }

    /**
     * Looks a stored value up by its id.
     *
     * @return the first stored value whose id equals vid, or an empty pointer.
     */
    std::shared_ptr<Value> getById(Value::Id vid) const {
        const auto match = std::find_if(values.cbegin(), values.cend(),
            [vid](const ValueStorage& stored) {
                return stored.data->id == vid;
            });
        if (match == values.cend())
            return {};
        return match->data;
    }

    /**
     * Collects the stored values accepted by a filter.
     *
     * @param f  optional filter; when it is empty every value is returned.
     *
     * @return the matching values, in storage order.
     */
    std::vector<std::shared_ptr<Value>> get(Value::Filter f = {}) const {
        std::vector<std::shared_ptr<Value>> selected {};
        const bool unfiltered = not f;
        /* Without a filter the result size is known in advance. */
        if (unfiltered)
            selected.reserve(values.size());
        for (const auto& stored : values) {
            if (unfiltered or f(*stored.data))
                selected.push_back(stored.data);
        }
        return selected;
    }

    /**
     * Stores a new value in this storage, or replace a previous value
     *
     * @return <storage, change_size, change_value_num>
     *      storage: set if a change happened
     *      change_size: size difference
     *      change_value_num: change of value number (0 or 1)
     */
    std::tuple<ValueStorage*, ssize_t, ssize_t>
    store(const std::shared_ptr<Value>& value, time_point created, ssize_t size_left);

    std::pair<ssize_t, ssize_t> expire(const std::map<ValueType::Id, ValueType>& types, time_point now);

private:
    /* Copying is disabled; a Storage can only be moved. */
    Storage(const Storage&) = delete;
    Storage& operator=(const Storage&) = delete;

    std::vector<ValueStorage> values {};
    size_t total_size {};
};
/**
 * Per-node state kept by a search: the requests sent to that node and the
 * token it handed back to us.
 */
struct Dht::SearchNode {
    using AnnounceStatus = std::map<Value::Id, std::shared_ptr<Request>>;
    using SyncStatus = std::map<std::shared_ptr<Query>, std::shared_ptr<Request>>;
    using PaginationQueries = std::map<std::shared_ptr<Query>, std::vector<std::shared_ptr<Query>>>;

    std::shared_ptr<Node> node {};                 /* the node info */
    PaginationQueries pagination_queries {};       /* 'get' query -> the sub-queries it was split into */
    SyncStatus getStatus {};                       /* get/sync status */
    SyncStatus listenStatus {};                    /* listen status */
    AnnounceStatus acked {};                       /* announcement status for a given value id */
    Blob token {};                                 /* last token the node sent to us after a get request */
    time_point last_get_reply {time_point::min()}; /* last time received valid token */
    bool candidate {false};                        /* A search node is candidate if the search is/was synced and this
                                                      node is a new candidate for inclusion. */

    SearchNode() : node() {}
    SearchNode(const std::shared_ptr<Node>& node) : node(node) {}

    /**
     * Can we use this node to listen/announce now ?
     *
     * True when the node is not expired, a token is held and the last valid
     * reply is not older than Node::NODE_EXPIRE_TIME.
     */
    bool isSynced(time_point now) const {
        return not node->isExpired() and
               not token.empty() and last_get_reply >= now - Node::NODE_EXPIRE_TIME;
    }

    /**
     * Could a particular "get" request be sent to this node now ?
     *
     * A 'get' request can be sent when all of the following requirements are
     * met:
     *
     *  - The node is not expired;
     *  - The last reply is older than NODE_EXPIRE_TIME, or older than the
     *    last "get" operation of the search;
     *  - The request must not already have been sent;
     *  - No other pending request satisfies this query;
     *  - The pagination process for this particular 'get' must not have begun;
     *
     * @param now     The time reference to now.
     * @param update  Time of the last "get" op for the search.
     * @param q       The query defining the "get" operation we're referring to.
     *
     * @return true if we can send get, else false.
     */
    bool canGet(time_point now, time_point update, std::shared_ptr<Query> q = {}) const {
        /* Find request status for the given query */
        const auto& get_status = getStatus.find(q);
        /* Find request status for a query satisfying the initial query */
        const auto& sq_status = std::find_if(getStatus.cbegin(), getStatus.cend(),
            [&q](const SyncStatus::value_type& s) {
                return s.first and q and q->isSatisfiedBy(*s.first) and s.second and s.second->pending();
            }
        );
        return not node->isExpired() and (now > last_get_reply + Node::NODE_EXPIRE_TIME or update > last_get_reply)
            and not hasStartedPagination(q)
            and (get_status == getStatus.cend() or not get_status->second)
            and sq_status == getStatus.cend();
    }

    /**
     * Tells if we have started sending a 'get' request in paginated form.
     *
     * @param q  The query as an id for a given 'get' request.
     *
     * @return true if pagination process has started, else false.
     */
    bool hasStartedPagination(const std::shared_ptr<Query>& q) const {
        const auto& pqs = pagination_queries.find(q);
        if (pqs == pagination_queries.cend() or pqs->second.empty())
            return false;
        /* Started as soon as one sub-query has an associated request. */
        return std::find_if(pqs->second.cbegin(), pqs->second.cend(),
            [this](const std::shared_ptr<Query>& query) {
                const auto& req = getStatus.find(query);
                return req != getStatus.cend() and req->second;
            }) != pqs->second.cend();
    }

    /**
     * Tell if the node has finished responding to a given 'get' request.
     *
     * A 'get' request can be divided in multiple requests called "pagination
     * requests". If this is the case, we have to check if they're all finished.
     * Otherwise, we only check for the single request.
     *
     * @param get  The 'get' request data structure;
     *
     * @return true if it has finished, else false.
     */
    bool isDone(const Get& get) const {
        if (hasStartedPagination(get.query)) {
            const auto& pqs = pagination_queries.find(get.query);
            auto paginationPending = std::find_if(pqs->second.cbegin(), pqs->second.cend(),
                [this](const std::shared_ptr<Query>& query) {
                    const auto& req = getStatus.find(query);
                    return req != getStatus.cend() and req->second and req->second->pending();
                }) != pqs->second.cend();
            return not paginationPending;
        }
        else { /* no pagination yet */
            const auto& gs = get.query ? getStatus.find(get.query) : getStatus.cend();
            return gs != getStatus.end() and gs->second and not gs->second->pending();
        }
    }

    /**
     * Tells if a request in the status map is expired.
     *
     * @param status  A SyncStatus reference.
     *
     * @return true if there exists an expired request, else false.
     */
    bool expired(const SyncStatus& status) const {
        return std::find_if(status.begin(), status.end(),
            [](const SyncStatus::value_type& r){
                return r.second and r.second->expired();
            }) != status.end();
    }

    /**
     * Tells if a request in the status map is pending.
     *
     * @param status  A SyncStatus reference.
     *
     * @return true if there exists a pending request, else false.
     */
    bool pending(const SyncStatus& status) const {
        return std::find_if(status.begin(), status.end(),
            [](const SyncStatus::value_type& r){
                return r.second and r.second->pending();
            }) != status.end();
    }

    /**
     * Is the value still announced on this node ?
     *
     * @return true when a request is recorded for vid and its reply_time plus
     *         the type's expiration is still in the future.
     */
    bool isAnnounced(Value::Id vid, const ValueType& type, time_point now) const {
        auto ack = acked.find(vid);
        if (ack == acked.end() or not ack->second) {
            return false;
        }
        return ack->second->reply_time + type.expiration > now;
    }

    /** @return true if at least one listen request is still valid at `now`. */
    bool isListening(time_point now) const {
        auto ls = listenStatus.begin();
        for ( ; ls != listenStatus.end() ; ++ls) {
            if (isListening(now, ls)) {
                break;
            }
        }
        return ls != listenStatus.end();
    }

    /** @return true if the listen request for query q is still valid at `now`. */
    bool isListening(time_point now, const std::shared_ptr<Query>& q) const {
        const auto& ls = listenStatus.find(q);
        if (ls == listenStatus.end())
            return false;
        else
            return isListening(now, ls);
    }

    /**
     * @param listen_status  iterator into listenStatus (may be end()).
     *
     * @return true if the entry holds a request whose reply_time is less than
     *         LISTEN_EXPIRE_TIME old; false for end() or an empty request.
     */
    bool isListening(time_point now, SyncStatus::const_iterator listen_status) const {
        /* An entry without a request is treated like a missing entry, as the
           other accessors of this struct do. */
        if (listen_status == listenStatus.end() or not listen_status->second)
            return false;
        return listen_status->second->reply_time + LISTEN_EXPIRE_TIME > now;
    }

    /**
     * Assuming the node is synced, when should a "put" request be sent to this node ?
     *
     * @return time_point::min() if nothing was sent yet (send now),
     *         time_point::max() while a request is pending, otherwise the
     *         expiration time of the last announce minus REANNOUNCE_MARGIN.
     */
    time_point getAnnounceTime(AnnounceStatus::const_iterator ack, const ValueType& type) const {
        if (ack == acked.end() or not ack->second)
            return time_point::min();
        return ack->second->pending() ? time_point::max() : ack->second->reply_time + type.expiration - REANNOUNCE_MARGIN;
    }
    time_point getAnnounceTime(Value::Id vid, const ValueType& type) const {
        return getAnnounceTime(acked.find(vid), type);
    }

    /**
     * Assuming the node is synced, when should a "listen" request be sent to this node ?
     *
     * @return the earliest listen time over all recorded listen requests, or
     *         time_point::max() if there is none.
     */
    time_point getListenTime() const {
        time_point t {time_point::max()};
        for (auto ls = listenStatus.begin(); ls != listenStatus.end() ; ++ls) {
            t = std::min(t, getListenTime(ls));
        }
        return t;
    }
    time_point getListenTime(const std::shared_ptr<Query>& q) const {
        return getListenTime(listenStatus.find(q));
    }

    /**
     * @param listen_status  iterator into listenStatus (may be end()).
     *
     * @return time_point::min() for end() or an empty request (send now),
     *         time_point::max() while the request is pending, otherwise the
     *         listen expiration time minus REANNOUNCE_MARGIN.
     */
    time_point getListenTime(SyncStatus::const_iterator listen_status) const {
        /* Same convention as getAnnounceTime(): no request means "send now". */
        if (listen_status == listenStatus.end() or not listen_status->second)
            return time_point::min();
        return listen_status->second->pending() ? time_point::max() :
            listen_status->second->reply_time + LISTEN_EXPIRE_TIME - REANNOUNCE_MARGIN;
    }

    /**
     * Is this node expired or candidate
     */
    bool isBad() const {
        return !node || node->isExpired() || candidate;
    }
};
/**
 * State of one search for a target hash: the nodes contacted so far and the
 * pending get/put/listen operations attached to it.
 */
struct Dht::Search {
    InfoHash id {};
    /* Value-initialized so a default-constructed Search never exposes
       indeterminate values before the owner assigns them. */
    sa_family_t af {};
    uint16_t tid {};
    time_point refill_time {time_point::min()};
    time_point step_time {time_point::min()};           /* the time of the last search step */
    std::shared_ptr<Scheduler::Job> nextSearchStep {};

    bool expired {false};              /* no node, or all nodes expired */
    bool done {false};                 /* search is over, cached for later */
    std::vector<SearchNode> nodes {};

    /* pending puts */
    std::vector<Announce> announce {};

    /* pending gets */
    std::multimap<time_point, Get> callbacks {};

    /* listeners */
    std::map<size_t, LocalListener> listeners {};
    size_t listener_token = 1;

    /**
     * @returns true if the node was not present and added to the search
     */
    bool insertNode(const std::shared_ptr<Node>& n, time_point now, const Blob& token={});

    /**
     * @return a pointer to the search node wrapping n, or nullptr if n is not
     *         part of this search. The pointer refers to an element of `nodes`.
     */
    SearchNode* getNode(const std::shared_ptr<Node>& n) {
        auto srn = std::find_if(nodes.begin(), nodes.end(), [&](SearchNode& sn) {
            return n == sn.node;
        });
        return (srn == nodes.end()) ? nullptr : &(*srn);
    }

    /* number of concurrent sync requests */
    unsigned currentlySolicitedNodeCount() const {
        unsigned count = 0;
        for (const auto& n : nodes)
            if (not n.isBad() and n.pending(n.getStatus))
                count++;
        return count;
    }

    /**
     * Can we use this search to announce ?
     */
    bool isSynced(time_point now) const;

    /**
     * Get the time of the last "get" operation performed on this search,
     * or time_point::min() if no such operation have been performed.
     */
    time_point getLastGetTime() const;

    /**
     * Is this get operation done ?
     */
    bool isDone(const Get& get) const;

    time_point getUpdateTime(time_point now) const;

    bool isAnnounced(Value::Id id, const ValueType& type, time_point now) const;
    bool isListening(time_point now) const;

    /**
     * @return The number of non-good search nodes.
     */
    unsigned getNumberOfBadNodes() const;

    /**
     * Returns the time of the next "announce" event for this search,
     * or time_point::max() if no such event is planned.
     * Only makes sense when the search is synced.
     */
    time_point getAnnounceTime(const std::map<ValueType::Id, ValueType>& types, time_point now) const;

    /**
     * Returns the time of the next "listen" event for this search,
     * or time_point::max() if no such event is planned.
     * Only makes sense when the search is synced.
     */
    time_point getListenTime(time_point now) const;

    /**
     * Returns the time of the next event for this search,
     * or time_point::max() if no such event is planned.
     */
    time_point getNextStepTime(const std::map<ValueType::Id, ValueType>& types, time_point now) const;

    bool removeExpiredNode(time_point now);

    std::vector<std::shared_ptr<Node>> getNodes() const;

    /** Drops every pending operation, every node and the scheduled step. */
    void clear() {
        announce.clear();
        callbacks.clear();
        listeners.clear();
        nodes.clear();
        nextSearchStep.reset();
    }
};
/**
 * Installs the three logging callbacks used by this Dht instance.
 *
 * @param error  sink assigned to DHT_LOG.ERR.
 * @param warn   sink assigned to DHT_LOG.WARN.
 * @param debug  sink assigned to DHT_LOG.DEBUG.
 */
void
Dht::setLoggers(LogMethod error, LogMethod warn, LogMethod debug)
{
    DHT_LOG.ERR = error;
    DHT_LOG.WARN = warn;
    DHT_LOG.DEBUG = debug;
}
/**
 * Summarizes the connectivity of the routing table for one address family.
 *
 * @param af  address family to inspect; AF_INET selects the IPv4 ping
 *            counter, anything else the IPv6 one.
 *
 * @return Connected if at least one good node is known, Connecting if pings
 *         are pending or any node is known, Disconnected otherwise.
 */
NodeStatus
Dht::getStatus(sa_family_t af) const
{
    unsigned good_count = 0;
    unsigned dubious_count = 0;
    unsigned cached_count = 0;
    unsigned incoming_count = 0;
    const unsigned known_nodes = getNodesStats(af, &good_count, &dubious_count, &cached_count, &incoming_count);

    if (good_count)
        return NodeStatus::Connected;

    const auto& pending_pings = (af == AF_INET) ? pending_pings4 : pending_pings6;
    return (pending_pings or known_nodes) ? NodeStatus::Connecting : NodeStatus::Disconnected;
}
/**
 * Runs a last maintenance pass over every local storage, then reports completion.
 *
 * @param cb  optional callback invoked once the count of outstanding
 *            maintenance operations reaches zero (immediately if none was started).
 */
void
Dht::shutdown(ShutdownCallback cb) {
    /* Last store maintenance */
    scheduler.syncTime();

    /* Shared counter of maintenance operations still in flight. */
    auto pending_ops = std::make_shared<int>(0);
    auto on_storage_done = [=](bool, const std::vector<std::shared_ptr<Node>>&) {
        --*pending_ops;
        if (*pending_ops == 0 and cb) {
            cb();
        } else {
            DHT_LOG.WARN("Shuting down node: %u ops remaining.", *pending_ops);
        }
    };

    for (const auto& str : store)
        *pending_ops += maintainStorage(str->id, true, on_storage_done);

    DHT_LOG.WARN("Shuting down node: %u ops remaining.", *pending_ops);
    if (*pending_ops == 0 and cb)
        cb();
}
/**
 * @param af  address family to query.
 *
 * @return whatever the network engine reports for that family.
 */
bool
Dht::isRunning(sa_family_t af) const
{
    return network_engine.isRunning(af);
}
/* Every bucket contains an unordered list of nodes. */
std::shared_ptr<Node>
Dht::findNode(const InfoHash& id, sa_family_t af)
{
Bucket* b = findBucket(id, af);
if (!b)
return {};
for (auto& n : b->nodes)
if (n->id == id) return n;
return {};
}
const std::shared_ptr<Node>
Dht::findNode(const InfoHash& id, sa_family_t af) const
{
const Bucket* b = findBucket(id, af);
if (!b)
return {};
for (const auto& n : b->nodes)
if (n->id == id) return n;
return {};
}
/* Every bucket caches the address of a likely node. Ping it. */
/**
 * Pings the node cached by a bucket, if any, and forgets the cached entry.
 *
 * @param b  bucket whose cached node should be pinged.
 *
 * @return always 0.
 */
int
Dht::sendCachedPing(Bucket& b)
{
    /* An empty `cached` pointer means the bucket has no cached node. */
    if (b.cached) {
        DHT_LOG.DEBUG("Sending ping to cached node.");
        network_engine.sendPing(b.cached, nullptr, nullptr);
        b.cached = {};
    }
    return 0;
}
/**
 * Lists the addresses that were reported for us, most frequently reported first.
 *
 * Side effect: reported_addr itself is re-sorted by descending report count.
 *
 * @param family  address family to keep; 0 keeps every family.
 *
 * @return the reported addresses matching `family`.
 */
std::vector<Address>
Dht::getPublicAddress(sa_family_t family)
{
    const auto by_count_desc = [](const ReportedAddr& lhs, const ReportedAddr& rhs) {
        return lhs.first > rhs.first;
    };
    std::sort(reported_addr.begin(), reported_addr.end(), by_count_desc);

    std::vector<Address> addresses;
    for (const auto& reported : reported_addr) {
        const auto& address = reported.second;
        if (family and family != address.first.ss_family)
            continue;
        addresses.emplace_back(address);
    }
    return addresses;
}
bool
Dht::trySearchInsert(const std::shared_ptr<Node>& node)
{
const auto& now = scheduler.time();
if (not node) return false;
bool inserted = false;
auto family = node->getFamily();
auto& srs = family == AF_INET ? searches4 : searches6;
for (auto& srp : srs) {
auto& s = *srp.second;
if (s.insertNode(node, now)) {
inserted = true;
scheduler.edit(s.nextSearchStep, s.getNextStepTime(types, now));
}
}
return inserted;
}
/**
 * Records one report of an address under which we were seen.
 *
 * If the address is already known its counter is incremented; otherwise it
 * is added with a count of 1, as long as fewer than 32 addresses are tracked.
 *
 * @param sa      reported socket address; ignored when null.
 * @param sa_len  number of valid bytes behind `sa`; reports larger than a
 *                sockaddr_storage are ignored.
 */
void
Dht::reportedAddr(const sockaddr *sa, socklen_t sa_len)
{
    /* Reject input that cannot be stored safely. */
    if (sa == nullptr or static_cast<size_t>(sa_len) > sizeof(sockaddr_storage))
        return;

    const auto* raw = reinterpret_cast<const uint8_t*>(sa);
    auto it = std::find_if(reported_addr.begin(), reported_addr.end(), [raw, sa_len](const ReportedAddr& addr){
        return (addr.second.second == sa_len) &&
            std::equal(reinterpret_cast<const uint8_t*>(&addr.second.first),
                       reinterpret_cast<const uint8_t*>(&addr.second.first) + addr.second.second,
                       raw);
    });
    if (it == reported_addr.end()) {
        if (reported_addr.size() < 32) {
            /* Only sa_len bytes are valid behind `sa`: copy exactly those into
               zeroed storage instead of dereferencing it as a full
               sockaddr_storage, which would read past the caller's buffer. */
            sockaddr_storage ss {};
            std::memcpy(&ss, sa, sa_len);
            reported_addr.emplace_back(1, std::make_pair(ss, sa_len));
        }
    } else
        it->first++;
}
/* We just learnt about a node, not necessarily a new one. Confirm is 1 if
the node sent a message, 2 if it sent us a reply. */
/**
 * Registers a node in the routing table of its address family and offers it
 * to the running searches.
 *
 * @param node     the node we heard about.
 * @param confirm  non-zero when the node itself sent us something; a known
 *                 node is only re-offered to searches in that case, and a
 *                 confirmed node may replace the bucket's cached node.
 */
void
Dht::onNewNode(const std::shared_ptr<Node>& node, int confirm)
{
auto& list = node->getFamily() == AF_INET ? buckets : buckets6;
auto b = list.findBucket(node->id);
if (b == list.end())
return;
/* Node already in its bucket: nothing to add to the routing table. */
for (auto& n : b->nodes) {
if (n == node) {
if (confirm)
trySearchInsert(node);
return;
}
}
/* New node. */
/* Try adding the node to searches */
trySearchInsert(node);
const auto& now = scheduler.time();
/* Remember when the bucket containing our own id last received a new node. */
bool mybucket = list.contains(b, myid);
if (mybucket) {
if (node->getFamily() == AF_INET)
mybucket_grow_time = now;
else
mybucket6_grow_time = now;
//scheduler.edit(nextNodesConfirmation, now);
}
/* Try to get rid of an expired node. */
/* The first expired entry is overwritten in place by the new node. */
for (auto& n : b->nodes) {
if (not n->isExpired())
continue;
n = node;
return;
}
if (b->nodes.size() >= TARGET_NODES) {
/* Bucket full. Ping a dubious node */
bool dubious = false;
for (auto& n : b->nodes) {
/* Pick the first dubious node that has no pending message. This gives
nodes the time to reply, but tends to concentrate on the same nodes,
so that we get rid of bad nodes fast. */
if (not n->isGood(now)) {
dubious = true;
if (not n->isPendingMessage()) {
DHT_LOG.DEBUG("Sending ping to dubious node %s.", n->toString().c_str());
network_engine.sendPing(n, nullptr, nullptr);
break;
}
}
}
/* Split our own bucket (or, for a bootstrap node, any bucket shallower than
depth 6) when it holds no dubious node or is the only bucket, then retry
the insertion unconfirmed. */
if ((mybucket || (is_bootstrap and list.depth(b) < 6)) && (!dubious || list.size() == 1)) {
DHT_LOG.DEBUG("Splitting from depth %u", list.depth(b));
sendCachedPing(*b);
list.split(b);
onNewNode(node, 0);
return;
}
/* No space for this node. Cache it away for later. */
if (confirm or not b->cached)
b->cached = node;
} else {
/* Create a new node. */
b->nodes.emplace_front(node);
}
}
/* Called periodically to purge known-bad nodes. Note that we're very
conservative here: broken nodes in the table don't do much harm, we'll
recover as soon as we find better ones. */
/**
 * Drops expired nodes from every bucket of a routing table.
 *
 * A bucket that lost at least one node has its cached node pinged.
 *
 * @param list  routing table to purge.
 */
void
Dht::expireBuckets(RoutingTable& list)
{
    for (auto& bucket : list) {
        const auto size_before = bucket.nodes.size();
        bucket.nodes.remove_if([](const std::shared_ptr<Node>& n) {
            return n->isExpired();
        });
        /* A shrunken bucket has room again: try its cached candidate. */
        if (bucket.nodes.size() != size_before)
            sendCachedPing(bucket);
    }
}
/**
 * Removes at most one expired node from the search, scanning from the back.
 *
 * A node is removed when it is expired and its `time` is more than
 * Node::NODE_EXPIRE_TIME before `now`.
 *
 * @param now  current time reference.
 *
 * @return true if a node was removed.
 */
bool
Dht::Search::removeExpiredNode(time_point now)
{
    for (auto it = nodes.end(); it != nodes.cbegin(); ) {
        --it;
        const Node& candidate = *it->node;
        if (not candidate.isExpired())
            continue;
        if (candidate.time + Node::NODE_EXPIRE_TIME < now) {
            //std::cout << "Removing expired node " << n.id << " from IPv" << (af==AF_INET?'4':'6') << " search " << id << std::endl;
            nodes.erase(it);
            return true;
        }
    }
    return false;
}
/* A search contains a list of nodes, sorted by decreasing distance to the
target. We just got a new candidate, insert it at the right spot or
discard it. */
/**
 * Inserts a node into this search, or refreshes its token if already present.
 *
 * @param snode  node to insert; nodes of another address family are rejected.
 * @param now    current time reference.
 * @param token  token received from the node, if any. A non-empty token
 *               marks the node as no longer a candidate and un-expires the
 *               search; it is only stored when at most 64 bytes long.
 *
 * @return true if the node was not present and has been added.
 */
bool
Dht::Search::insertNode(const std::shared_ptr<Node>& snode, time_point now, const Blob& token)
{
auto& node = *snode;
const auto& nid = node.id;
if (node.getFamily() != af)
return false;
bool found = false;
/* Walk from the back to find either the node itself or its insertion point.
NOTE(review): assumes xorCmp() > 0 means nid is farther from the target than
n's id — confirm against InfoHash::xorCmp. */
auto n = nodes.end();
while (n != nodes.begin()) {
--n;
if (n->node == snode) {
found = true;
break;
}
if (id.xorCmp(nid, n->node->id) > 0) {
++n;
break;
}
}
bool new_search_node = false;
if (!found) {
// find if and where to trim excessive nodes
auto t = nodes.cend();
size_t bad = 0; // number of bad nodes (if search is not expired)
bool full {false}; // is the search full (has the maximum nodes)
if (expired) {
// if the search is expired, trim to SEARCH_NODES nodes
if (nodes.size() >= SEARCH_NODES) {
full = true;
t = nodes.begin() + SEARCH_NODES;
}
} else {
// otherwise, trim to SEARCH_NODES nodes, not counting bad nodes
bad = getNumberOfBadNodes();
full = nodes.size() - bad >= SEARCH_NODES;
/* Move t backwards until at most SEARCH_NODES non-bad nodes precede it. */
while (std::distance(nodes.cbegin(), t) - bad > SEARCH_NODES) {
--t;
if (t->isBad())
bad--;
}
}
if (full) {
if (t != nodes.cend())
nodes.resize(std::distance(nodes.cbegin(), t));
/* The insertion point lies at or beyond the trim point: drop the new node. */
if (n >= t)
return false;
}
// Reset search timer if the search is empty
if (nodes.empty()) {
step_time = TIME_INVALID;
}
n = nodes.insert(n, SearchNode(snode));
node.time = now;
new_search_node = true;
if (node.isExpired()) {
if (not expired)
bad++;
} else if (expired) {
/* A live node revives an expired search: every other node counts as bad. */
bad = nodes.size() - 1;
expired = false;
}
/* Drop trailing nodes until at most SEARCH_NODES non-bad nodes remain. */
while (nodes.size() - bad > SEARCH_NODES) {
if (not expired and nodes.back().isBad())
bad--;
nodes.pop_back();
}
}
if (not token.empty()) {
n->candidate = false;
n->last_get_reply = now;
if (token.size() <= 64)
n->token = token;
expired = false;
}
if (new_search_node) {
removeExpiredNode(now);
}
return new_search_node;
}
std::vector<std::shared_ptr<Node>>
Dht::Search::getNodes() const
{
std::vector<std::shared_ptr<Node>> ret {};
ret.reserve(nodes.size());
for (const auto& sn : nodes)
ret.emplace_back(sn.node);
return ret;
}
void
Dht::expireSearches()
{
auto t = scheduler.time() - SEARCH_EXPIRE_TIME;
auto expired = [&](std::pair<const InfoHash, std::shared_ptr<Search>>& srp) {
auto& sr = *srp.second;
auto b = sr.callbacks.empty() && sr.announce.empty() && sr.listeners.empty() && sr.step_time < t;
if (b) {
DHT_LOG.DEBUG("Removing search %s", srp.first.toString().c_str());
sr.clear();
return b;
} else { return false; }
};
erase_if(searches4, expired);
erase_if(searches6, expired);
}
/**
 * Completion handler of a 'get'/'find_node' request sent for a search.
 *
 * Does nothing if the search no longer exists. Otherwise the answering node
 * is (re)inserted with the token it returned, and the answer is passed on
 * to onGetValuesDone().
 *
 * @param status  the completed request.
 * @param answer  the parsed answer.
 * @param ws      weak reference to the search the request belongs to.
 * @param query   the query the request was sent for.
 */
void
Dht::searchNodeGetDone(const Request& status,
        NetworkEngine::RequestAnswer&& answer,
        std::weak_ptr<Search> ws,
        std::shared_ptr<Query> query)
{
    auto sr = ws.lock();
    if (not sr)
        return;
    sr->insertNode(status.node, scheduler.time(), answer.ntoken);
    onGetValuesDone(status, answer, sr, query);
}
/**
 * Expiration handler of a 'get'/'find_node' request sent for a search.
 *
 * Does nothing if the search no longer exists. Otherwise the node is flagged
 * as candidate while the request is not over; once over, the flag is cleared
 * and the request is forgotten. The search is then stepped as soon as possible.
 *
 * @param status  the expired request.
 * @param over    true when the request is definitively over.
 * @param ws      weak reference to the search the request belongs to.
 * @param query   the query the request was sent for.
 */
void
Dht::searchNodeGetExpired(const Request& status,
        bool over,
        std::weak_ptr<Search> ws,
        std::shared_ptr<Query> query)
{
    auto sr = ws.lock();
    if (not sr)
        return;

    if (auto srn = sr->getNode(status.node)) {
        if (over) {
            srn->candidate = false;
            srn->getStatus.erase(query);
        } else {
            srn->candidate = true;
        }
    }
    scheduler.edit(sr->nextSearchStep, scheduler.time());
}
/**
 * Sends a 'get' to a node in paginated form: a first request selects only
 * the ids of the matching values, then one request per returned id is sent.
 *
 * @param ws     weak reference to the search; nothing is done if it is gone.
 * @param query  the original 'get' query; its `where` part is reused for the
 *               id selection (an empty Where is used when query is null).
 * @param n      the search node to query. It is dereferenced without a null
 *               check, so callers must pass a valid pointer.
 */
void Dht::paginate(std::weak_ptr<Search> ws, std::shared_ptr<Query> query, SearchNode* n) {
if (auto sr = ws.lock()) {
/* First stage: ask only for the ids of the values matching the query. */
auto select_q = std::make_shared<Query>(Select {}.field(Value::Field::Id), query ? query->where : Where {});
auto onSelectDone =
[this,ws,query](const Request& status, NetworkEngine::RequestAnswer&& answer) mutable
{
if (auto sr = ws.lock()) {
if (auto sn = sr->getNode(status.node)) {
/* No field in the answer: handle it like a regular 'get' completion. */
if (answer.fields.empty()) {
searchNodeGetDone(status, std::move(answer), ws, query);
return;
} else {
/* Second stage: one 'get' per returned value id. */
for (const auto& fvi : answer.fields) {
try {
auto vid = fvi->index.at(Value::Field::Id).getInt();
if (vid == Value::INVALID_ID) continue;
auto query_for_vid = std::make_shared<Query>(Select {}, Where {}.id(vid));
/* Track the sub-query under the original query. */
sn->pagination_queries[query].push_back(query_for_vid);
DHT_LOG.WARN("[search %s IPv%c] [node %s] sending %s",
sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6',
sn->node->toString().c_str(), query_for_vid->toString().c_str());
/* Completion is reported against the original query, expiration against the sub-query. */
sn->getStatus[query_for_vid] = network_engine.sendGetValues(status.node,
sr->id,
*query_for_vid,
-1,
std::bind(&Dht::searchNodeGetDone, this, _1, _2, ws, query),
std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, query_for_vid)
);
} catch (std::out_of_range&) {
/* index.at() found no Id field in this entry. */
DHT_LOG.ERR("[search %s IPv%c] [node %s] received non-id field in response to "\
"'SELECT id' request...",
sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6',
sn->node->toString().c_str());
}
}
}
}
}
};
/* add pagination query key for tracking ongoing requests. */
n->pagination_queries[query].push_back(select_q);
DHT_LOG.WARN("[search %s IPv%c] [node %s] sending %s",
sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6',
n->node->toString().c_str(), select_q->toString().c_str());
n->getStatus[select_q] = network_engine.sendGetValues(n->node,
sr->id,
*select_q,
-1,
onSelectDone,
std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, select_q)
);
}
}
/**
 * Sends the pending 'get' queries of a search (or a 'find_node' when the
 * search has no pending get) to one suitable node per query.
 *
 * Only the oldest query is sent while the search is not synced.
 *
 * @param sr      the search.
 * @param pn      node to use; when null, the first node of the search for
 *                which canGet() holds is picked for each query.
 * @param update  when true, nodes that replied before the last "get"
 *                operation of the search are eligible again.
 *
 * @return the last node a request was sent to, or nullptr if nothing could
 *         be sent (search done, too many nodes already solicited, no eligible
 *         node for the first query, or `pn` not eligible).
 */
Dht::SearchNode*
Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update)
{
    if (sr->done or sr->currentlySolicitedNodeCount() >= MAX_REQUESTED_SEARCH_NODES)
        return nullptr;

    const auto& now = scheduler.time();
    const time_point up = update ? sr->getLastGetTime() : time_point::min();
    std::weak_ptr<Search> ws = sr;

    SearchNode* n = nullptr; /* last node a request was sent to */
    auto cb = sr->callbacks.begin();
    do { /* for all queries to send */
        /* cases                                   v 'get'            v 'find_node' */
        auto query = cb != sr->callbacks.end() ? cb->second.query : std::make_shared<Query>();
        if (pn) {
            if (not pn->canGet(now, up, query))
                return nullptr;
            n = pn;
        } else {
            /* Pick a node for THIS query. The selection must start from
               scratch on every iteration: reusing the node chosen for the
               previous query would send this query to a node that failed
               canGet() and overwrite its request status. */
            SearchNode* target = nullptr;
            for (auto& sn : sr->nodes) {
                if (sn.canGet(now, up, query)) {
                    target = &sn;
                    break;
                }
            }
            if (not target)
                return n; /* nullptr if nothing was sent at all */
            n = target;
        }

        if (sr->callbacks.empty()) {
            DHT_LOG.WARN("[search %s IPv%c] [node %s] sending 'find_node'",
                    sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6',
                    n->node->toString().c_str());
            n->getStatus[query] = network_engine.sendFindNode(n->node,
                    sr->id,
                    -1,
                    [this,ws,query](const Request& status, NetworkEngine::RequestAnswer&& answer) {
                        /* The placeholder query is forgotten once answered. */
                        if (auto sr = ws.lock()) {
                            if (auto sn = sr->getNode(status.node)) {
                                sn->getStatus.erase(query);
                            }
                        }
                        searchNodeGetDone(status, std::forward<NetworkEngine::RequestAnswer>(answer), ws, query);
                    },
                    /* std::bind(&Dht::searchNodeGetDone, this, _1, _2, ws, query), */
                    std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, query));
        } else {
            if (query and not query->select.getSelection().empty()) {
                /* The request contains a select. No need to paginate... */
                DHT_LOG.WARN("[search %s IPv%c] [node %s] sending 'get'",
                        sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6',
                        n->node->toString().c_str());
                n->getStatus[query] = network_engine.sendGetValues(n->node,
                        sr->id,
                        *query,
                        -1,
                        std::bind(&Dht::searchNodeGetDone, this, _1, _2, ws, query),
                        std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, query));
            } else
                paginate(ws, query, n);
        }

        if (not sr->isSynced(now) or cb == sr->callbacks.end())
            break; /* only trying to find nodes, only send the oldest query */
    } while (++cb != sr->callbacks.end());

    return n;
}
/* When a search is in progress, we periodically call search_step to send
further requests. */
/**
 * Performs one step of a search: completes finished 'get' operations, sends
 * due 'listen' and 'put' requests once the search is synced, sends further
 * 'get'/'find_node' requests, detects expiration and schedules the next step.
 *
 * @param sr  the search to step; a null, expired or done search is ignored.
 */
void
Dht::searchStep(std::shared_ptr<Search> sr)
{
    if (not sr or sr->expired or sr->done) return;

    const auto& now = scheduler.time();
    DHT_LOG.DEBUG("[search %s IPv%c] step (%d requests)",
            sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', sr->currentlySolicitedNodeCount());
    sr->step_time = now;

    if (sr->refill_time + Node::NODE_EXPIRE_TIME < now and sr->nodes.size()-sr->getNumberOfBadNodes() < SEARCH_NODES)
        refill(*sr);

    /* Check if the first TARGET_NODES (8) live nodes have replied. */
    if (sr->isSynced(now)) {
        if (not sr->callbacks.empty()) {
            // search is synced but some (newer) get operations are not complete
            // Call callbacks when done
            for (auto b = sr->callbacks.begin(); b != sr->callbacks.end();) {
                if (sr->isDone(b->second)) {
                    if (b->second.done_cb)
                        b->second.done_cb(true, sr->getNodes());
                    for (auto& n : sr->nodes)
                        n.getStatus.erase(b->second.query);
                    b = sr->callbacks.erase(b);
                }
                else
                    ++b;
            }
            if (sr->callbacks.empty() && sr->announce.empty() && sr->listeners.empty())
                sr->done = true;
        }

        // true if this node is part of the target nodes cluster.
        bool in = sr->id.xorCmp(myid, sr->nodes.back().node->id) < 0;

        DHT_LOG.DEBUG("[search %s IPv%c] synced%s",
                sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', in ? ", in" : "");

        // Listen requests
        if (not sr->listeners.empty()) {
            unsigned i = 0;
            for (auto& n : sr->nodes) {
                if (not n.isSynced(now))
                    continue;
                for (const auto& l : sr->listeners) {
                    const auto& query = l.second.query;
                    if (n.getListenTime(query) <= now) {
                        DHT_LOG.WARN("[search %s IPv%c] [node %s] sending 'listen'",
                                sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6',
                                n.node->toString().c_str());
                        //std::cout << "Sending listen to " << n.node->id << " " << print_addr(n.node->ss, n.node->sslen) << std::endl;

                        /* Keep the previous request so it can be cancelled once the new one resolves. */
                        const auto& r = n.listenStatus.find(query);
                        auto last_req = r != n.listenStatus.end() ? r->second : std::shared_ptr<Request> {};

                        std::weak_ptr<Search> ws = sr;
                        n.listenStatus[query] = network_engine.sendListen(n.node, sr->id, *query, n.token,
                            [this,ws,last_req,query](const Request& req,
                                                     NetworkEngine::RequestAnswer&& answer) mutable
                            { /* on done */
                                network_engine.cancelRequest(last_req);
                                if (auto sr = ws.lock()) {
                                    onListenDone(req, answer, sr, query);
                                    searchStep(sr);
                                }
                            },
                            [this,ws,last_req,query](const Request& req, bool over) mutable
                            { /* on expired */
                                network_engine.cancelRequest(last_req);
                                if (auto sr = ws.lock()) {
                                    searchStep(sr);
                                    if (over)
                                        if (auto sn = sr->getNode(req.node))
                                            sn->listenStatus.erase(query);
                                }
                            }
                        );
                    }
                }
                if (not n.candidate and ++i == LISTEN_NODES)
                    break;
            }
        }

        // Announce requests
        for (auto ait = sr->announce.begin(); ait != sr->announce.end();) {
            auto& a = *ait;
            if (!a.value) {
                /* This loop has no increment expression: the iterator must be
                   advanced here, otherwise an announce without value would
                   make the loop spin forever. */
                ++ait;
                continue;
            }
            auto vid = a.value->id;
            const auto& type = getType(a.value->type);
            if (sr->isAnnounced(vid, type, now)) {
                if (a.callback) {
                    a.callback(true, sr->getNodes());
                    a.callback = nullptr;
                }
                if (not a.permanent) {
                    ait = sr->announce.erase(ait);
                    continue;
                }
            }

            unsigned i = 0;
            for (auto& n : sr->nodes) {
                if (not n.isSynced(now))
                    continue;
                if (n.getAnnounceTime(vid, type) <= now) {
                    DHT_LOG.WARN("[search %s IPv%c] [node %s] sending 'put' (vid: %d)",
                            sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', n.node->toString().c_str(), vid);
                    std::weak_ptr<Search> ws = sr;
                    n.acked[vid] = network_engine.sendAnnounceValue(n.node, sr->id, *a.value, a.created, n.token,
                        [this,ws](const Request& status, NetworkEngine::RequestAnswer&& answer)
                        { /* on done */
                            if (auto sr = ws.lock()) {
                                onAnnounceDone(status, answer, sr);
                                searchStep(sr);
                            }
                        },
                        [this,ws](const Request&, bool over)
                        { /* on expired */
                            if (over)
                                if (auto sr = ws.lock())
                                    scheduler.edit(sr->nextSearchStep, scheduler.time());
                        }
                    );
                }
                if (not n.candidate and ++i == TARGET_NODES)
                    break;
            }
            ++ait;
        }

        if (sr->callbacks.empty() && sr->announce.empty() && sr->listeners.empty())
            sr->done = true;
    }

    if (sr->currentlySolicitedNodeCount() < MAX_REQUESTED_SEARCH_NODES) {
        unsigned i = 0;
        SearchNode* sent;
        do {
            sent = searchSendGetValues(sr);
            if (sent and not sent->candidate)
                i++;
        }
        while (sent and sr->currentlySolicitedNodeCount() < MAX_REQUESTED_SEARCH_NODES);
        /*DHT_LOG.DEBUG("[search %s IPv%c] step: sent %u requests (total %u).",
            sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', i, sr->currentlySolicitedNodeCount());*/

        auto expiredn = (size_t)std::count_if(sr->nodes.begin(), sr->nodes.end(), [&](const SearchNode& sn) {
                    return sn.candidate or sn.node->isExpired();
                });
        if (i == 0 && expiredn == sr->nodes.size())
        {
            DHT_LOG.WARN("[search %s IPv%c] expired", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6');
            // no nodes or all expired nodes
            sr->expired = true;
            if (sr->announce.empty() && sr->listeners.empty()) {
                // Listening or announcing requires keeping the cluster up to date.
                sr->done = true;
            }
            {
                /* Pending gets fail; the list is moved out first so the search no longer holds them. */
                auto get_cbs = std::move(sr->callbacks);
                for (const auto& g : get_cbs) {
                    if (g.second.done_cb)
                        g.second.done_cb(false, {});
                }
            }
            {
                /* Pending puts fail too; only permanent announces are kept. */
                std::vector<DoneCallback> a_cbs;
                a_cbs.reserve(sr->announce.size());
                for (auto ait = sr->announce.begin() ; ait != sr->announce.end(); ) {
                    if (ait->callback)
                        a_cbs.emplace_back(std::move(ait->callback));
                    if (not ait->permanent)
                        ait = sr->announce.erase(ait);
                    else
                        ait++;
                }
                for (const auto& a : a_cbs)
                    a(false, {});
            }
        }
    }

    //dumpSearch(*sr, std::cout);

    /* periodic searchStep scheduling. */
    if (not sr->done)
        scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(types, now));
}
/**
 * Check whether the first usable nodes of this search are all synced.
 *
 * Bad nodes are skipped. At most TARGET_NODES non-bad nodes are examined,
 * in the order they are stored in `nodes`.
 *
 * @param now  reference time forwarded to SearchNode::isSynced.
 * @return true if at least one non-bad node was examined and every examined
 *         node reported itself as synced.
 */
bool
Dht::Search::isSynced(time_point now) const
{
    unsigned examined = 0;
    for (auto sn = nodes.begin(); sn != nodes.end(); ++sn) {
        if (sn->isBad())
            continue;
        if (not sn->isSynced(now))
            return false;
        if (++examined == TARGET_NODES)
            break;
    }
    return examined != 0;
}
/** Count the nodes of this search for which SearchNode::isBad() is true. */
unsigned Dht::Search::getNumberOfBadNodes() const {
    unsigned bad = 0;
    for (const auto& sn : nodes) {
        if (sn.isBad())
            ++bad;
    }
    return bad;
}
/**
 * Return the most recent start time among the registered 'get' callbacks.
 *
 * @return the latest `start` found in `callbacks`, or time_point::min()
 *         when there is no callback.
 */
time_point
Dht::Search::getLastGetTime() const
{
    auto latest = time_point::min();
    for (auto cb = callbacks.begin(); cb != callbacks.end(); ++cb) {
        if (latest < cb->second.start)
            latest = cb->second.start;
    }
    return latest;
}
/**
 * Tell whether the given 'get' operation is done on the first usable nodes.
 *
 * Bad nodes are skipped. At most TARGET_NODES non-bad nodes are examined.
 *
 * @param get  the operation forwarded to SearchNode::isDone.
 * @return false as soon as an examined node reports the operation as not
 *         done, true otherwise (including when no node was examined).
 */
bool
Dht::Search::isDone(const Get& get) const
{
    unsigned examined = 0;
    for (auto sn = nodes.begin(); sn != nodes.end(); ++sn) {
        if (sn->isBad())
            continue;
        if (not sn->isDone(get))
            return false;
        if (++examined == TARGET_NODES)
            break;
    }
    return true;
}
/**
 * Compute the next time at which this search should run a step to refresh
 * its nodes with 'get' requests.
 *
 * @param now  current time.
 * @return `now` when a refresh can be sent immediately or when callbacks
 *         remain while no counted node needs an update; otherwise the
 *         earliest expiry of a node's last 'get' reply, or time_point::max()
 *         when nothing is scheduled.
 */
time_point
Dht::Search::getUpdateTime(time_point now) const
{
time_point ut = time_point::max();
const auto last_get = getLastGetTime();
// i: non-candidate nodes visited, t: all nodes visited,
// d: non-candidate nodes that still need an update.
unsigned i = 0, t = 0, d = 0;
const auto solicited_nodes = currentlySolicitedNodeCount();
for (const auto& sn : nodes) {
// Skip expired nodes, and candidates once TARGET_NODES nodes were visited.
if (sn.node->isExpired() or (sn.candidate and t >= TARGET_NODES))
continue;
auto pending = sn.pending(sn.getStatus);
// The node needs an update when its last reply is older than both the
// expiration window and the most recent 'get', or when a request is pending.
if (sn.last_get_reply < std::max(now - Node::NODE_EXPIRE_TIME, last_get) or pending) {
// not isSynced
// A new request can be sent right away only if none is pending for this
// node and the search is below its limit of solicited nodes.
if (not pending and solicited_nodes < MAX_REQUESTED_SEARCH_NODES)
ut = std::min(ut, now);
if (not sn.candidate)
d++;
} else
// Up-to-date node: it has to be refreshed when its last reply expires.
ut = std::min(ut, sn.last_get_reply + Node::NODE_EXPIRE_TIME);
t++;
// Stop after TARGET_NODES non-candidate nodes.
if (not sn.candidate and ++i == TARGET_NODES)
break;
}
if (not callbacks.empty() and d == 0)
// If all synced/updated but some callbacks remain, step now to clear them
return now;
return ut;
}
/**
 * Tell whether a value is announced on the first usable nodes of the search.
 *
 * Bad nodes are skipped. At most TARGET_NODES non-bad nodes are examined.
 *
 * @param id    id of the value.
 * @param type  type of the value, forwarded to SearchNode::isAnnounced.
 * @param now   current time.
 * @return true if at least one non-bad node was examined and all examined
 *         nodes report the value as announced.
 */
bool
Dht::Search::isAnnounced(Value::Id id, const ValueType& type, time_point now) const
{
    if (nodes.empty())
        return false;

    unsigned examined = 0;
    for (auto sn = nodes.begin(); sn != nodes.end(); ++sn) {
        if (sn->isBad())
            continue;
        if (not sn->isAnnounced(id, type, now))
            return false;
        if (++examined == TARGET_NODES)
            break;
    }
    return examined != 0;
}
/**
 * Tell whether the search is currently listening on its first usable nodes.
 *
 * Bad nodes are skipped. At most LISTEN_NODES non-bad nodes are examined;
 * each must have at least one entry of its listenStatus for which
 * SearchNode::isListening(now, entry) is true.
 *
 * @param now  current time.
 * @return false when there is no node or no local listener, or when an
 *         examined node is not listening; true if at least one node was
 *         examined and all examined nodes are listening.
 */
bool
Dht::Search::isListening(time_point now) const
{
    if (listeners.empty() or nodes.empty())
        return false;

    unsigned examined = 0;
    for (const auto& sn : nodes) {
        if (sn.isBad())
            continue;

        bool listening = false;
        SearchNode::SyncStatus::const_iterator status = sn.listenStatus.begin();
        for (; status != sn.listenStatus.end(); ++status) {
            if (sn.isListening(now, status)) {
                listening = true;
                break;
            }
        }
        if (not listening)
            return false;

        if (++examined == LISTEN_NODES)
            break;
    }
    return examined != 0;
}
/**
 * Compute the earliest time at which one of the announced values has to be
 * (re)announced to a node of this search.
 *
 * For every announce holding a value, synced nodes are visited in order.
 * Candidates are ignored once TARGET_NODES nodes were visited, and the visit
 * stops after TARGET_NODES non-candidate nodes.
 *
 * @param types  known value types; unknown types fall back to
 *               ValueType::USER_DATA.
 * @param now    current time.
 * @return the smallest SearchNode::getAnnounceTime found, or
 *         time_point::max() when there is nothing to announce.
 */
time_point
Dht::Search::getAnnounceTime(const std::map<ValueType::Id, ValueType>& types, time_point now) const
{
    auto earliest = time_point::max();
    if (nodes.empty())
        return earliest;

    for (const auto& ann : announce) {
        if (!ann.value)
            continue;

        const auto known = types.find(ann.value->type);
        const ValueType& vtype = (known == types.end()) ? ValueType::USER_DATA : known->second;

        unsigned counted = 0, visited = 0;
        for (const auto& sn : nodes) {
            const bool extra_candidate = sn.candidate and visited >= TARGET_NODES;
            if (not sn.isSynced(now) or extra_candidate)
                continue;

            const auto at = sn.getAnnounceTime(ann.value->id, vtype);
            if (at < earliest)
                earliest = at;

            ++visited;
            if (not sn.candidate and ++counted == TARGET_NODES)
                break;
        }
    }
    return earliest;
}
/**
 * Compute the earliest time at which a 'listen' request has to be sent to a
 * node of this search.
 *
 * Synced nodes are visited in order. Candidates are ignored once
 * LISTEN_NODES nodes were visited, and the visit stops after LISTEN_NODES
 * non-candidate nodes.
 *
 * @param now  current time.
 * @return the smallest SearchNode::getListenTime found, or
 *         time_point::max() when there is no local listener or no eligible
 *         node.
 */
time_point
Dht::Search::getListenTime(time_point now) const
{
    auto earliest = time_point::max();
    if (listeners.empty())
        return earliest;

    unsigned counted = 0, visited = 0;
    for (auto sn = nodes.begin(); sn != nodes.end(); ++sn) {
        const bool extra_candidate = sn->candidate and visited >= LISTEN_NODES;
        if (not sn->isSynced(now) or extra_candidate)
            continue;

        const auto lt = sn->getListenTime();
        if (lt < earliest)
            earliest = lt;

        ++visited;
        if (not sn->candidate and ++counted == LISTEN_NODES)
            break;
    }
    return earliest;
}
/**
 * Compute when the next step of this search should be scheduled.
 *
 * @param types  known value types, forwarded to getAnnounceTime.
 * @param now    current time.
 * @return time_point::max() for an expired or done search; otherwise the
 *         earliest of the update time and, when the search is synced, the
 *         announce and listen times.
 */
time_point
Dht::Search::getNextStepTime(const std::map<ValueType::Id, ValueType>& types, time_point now) const
{
    if (expired or done)
        return time_point::max();

    // A value of time_point::max() never wins a std::min, so the individual
    // "is there something scheduled" checks are not needed.
    auto next = getUpdateTime(now);
    if (isSynced(now)) {
        next = std::min(next, getAnnounceTime(types, now));
        next = std::min(next, getListenTime(now));
    }
    return next;
}
/**
 * Refill a search with nodes taken from the network engine's node cache.
 *
 * Up to SEARCH_NODES cached nodes are requested for the search's hash and
 * address family, and each one is offered to Search::insertNode.
 * `refill_time` is only updated when the cache returned at least one node.
 *
 * @param sr  the search to refill.
 * @return the number of nodes for which insertNode returned true.
 */
unsigned Dht::refill(Dht::Search& sr) {
    auto now = scheduler.time();

    /* we search for up to SEARCH_NODES good nodes. */
    auto candidates = network_engine.getCachedNodes(sr.id, sr.af, SEARCH_NODES);
    if (candidates.empty()) {
        DHT_LOG.ERR("[search %s IPv%c] no nodes from cache while refilling search",
                sr.id.toString().c_str(), (sr.af == AF_INET) ? '4' : '6');
        return 0;
    }

    /* Search::insertNode decides which of the offered nodes are kept. */
    unsigned accepted = 0;
    for (auto& node : candidates)
        accepted += sr.insertNode(node, now) ? 1 : 0;

    DHT_LOG.DEBUG("[search %s IPv%c] refilled search with %u nodes from node cache",
            sr.id.toString().c_str(), (sr.af == AF_INET) ? '4' : '6', accepted);
    sr.refill_time = now;
    return accepted;
}
/**
 * Start a search for `id` on address family `af`, or reuse the existing one.
 *
 * When no search exists for `id`, a new one is created if the total number
 * of searches is below MAX_SEARCHES. Otherwise a finished (done or expired)
 * search without announces nor listeners is recycled and re-registered
 * under `id`.
 *
 * @param id   hash to search for.
 * @param af   address family (AF_INET or AF_INET6).
 * @param gcb  callback receiving values (may be empty).
 * @param qcb  callback receiving field indexes (may be empty).
 * @param dcb  callback invoked when the operation ends. It is called with
 *             `false` right away when `af` is not running, and is otherwise
 *             only registered when `gcb` or `qcb` is set.
 * @param f    filter stored with the registered 'get'.
 * @param q    query stored with the registered 'get'.
 * @return the search, or an empty pointer when `af` is not running.
 * @throws DhtException when no search can be created nor recycled.
 */
std::shared_ptr<Dht::Search>
Dht::search(const InfoHash& id, sa_family_t af, GetCallback gcb, QueryCallback qcb, DoneCallback dcb, Value::Filter f, Query q)
{
    if (!isRunning(af)) {
        DHT_LOG.ERR("[search %s IPv%c] unsupported protocol", id.toString().c_str(), (af == AF_INET) ? '4' : '6');
        if (dcb)
            dcb(false, {});
        return {};
    }

    auto& srs = af == AF_INET ? searches4 : searches6;
    const auto srp = srs.find(id);
    std::shared_ptr<Search> sr {};

    if (srp != srs.end()) {
        /* Reuse the search already registered for this hash. */
        sr = srp->second;
        sr->done = false;
        sr->expired = false;
    } else {
        if (searches4.size() + searches6.size() < MAX_SEARCHES) {
            sr = std::make_shared<Search>();
            srs.emplace(id, sr);
        } else {
            /* Too many searches: recycle a finished one that nobody uses.
             * The iterator must advance on every turn, otherwise the loop
             * never ends when the first search is not recyclable. */
            for (auto it = srs.begin(); it != srs.end(); ++it) {
                auto& s = *it->second;
                if ((s.done or s.expired) and s.announce.empty() and s.listeners.empty()) {
                    sr = it->second;
                    /* Register the recycled search under its new hash so
                     * later lookups by `id` find it. */
                    srs.erase(it);
                    srs.emplace(id, sr);
                    break;
                }
            }
            if (not sr)
                throw DhtException("Can't create search");
        }

        /* (Re)initialize the new or recycled search. */
        sr->af = af;
        sr->tid = search_id++;
        sr->step_time = TIME_INVALID;
        sr->id = id;
        sr->done = false;
        sr->expired = false;
        sr->nodes.clear();
        sr->nodes.reserve(SEARCH_NODES+1);
        DHT_LOG.WARN("[search %s IPv%c] new search", id.toString().c_str(), (af == AF_INET) ? '4' : '6');
        /* Skip 0 when the search id counter wraps around. */
        if (search_id == 0)
            search_id++;
    }

    if (gcb or qcb) {
        /* Register the 'get' operation, keyed by its start time. */
        auto now = scheduler.time();
        sr->callbacks.insert(std::make_pair<time_point, Get>(
            std::move(now),
            Get { scheduler.time(), f, std::make_shared<Query>(q), {},
                qcb ? qcb : QueryCallback {}, gcb ? gcb : GetCallback {}, dcb
            }
        ));
    }

    refill(*sr);

    /* Reschedule the existing step job, or create it on first use. */
    if (sr->nextSearchStep)
        scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(types, scheduler.time()));
    else
        sr->nextSearchStep = scheduler.add(scheduler.time(), std::bind(&Dht::searchStep, this, sr));

    return sr;
}
/**
 * Store a value locally and register it for announcement on the search for
 * `id` on address family `af`, creating the search when needed.
 *
 * @param id         hash under which the value is announced.
 * @param af         address family (AF_INET or AF_INET6).
 * @param value      value to announce; a null value makes the call fail.
 * @param callback   called with the outcome of the announce (may be empty).
 * @param created    creation time of the value (capped to the current time).
 * @param permanent  stored in the Announce entry.
 */
void
Dht::announce(const InfoHash& id, sa_family_t af, std::shared_ptr<Value> value, DoneCallback callback,
time_point created, bool permanent)
{
const auto& now = scheduler.time();
// Nothing to announce: report the failure right away.
if (!value) {
if (callback)
callback(false, {});
return;
}
storageStore(id, value, created);
auto& srs = af == AF_INET ? searches4 : searches6;
auto srp = srs.find(id);
// Reuse the search for this hash or start a new one.
auto sr = srp == srs.end() ? search(id, af) : srp->second;
// search() returns an empty pointer when the address family is not running.
if (!sr) {
if (callback)
callback(false, {});
return;
}
sr->done = false;
sr->expired = false;
// Look for an announce already registered with the same value id.
auto a_sr = std::find_if(sr->announce.begin(), sr->announce.end(), [&](const Announce& a){
return a.value->id == value->id;
});
if (a_sr == sr->announce.end()) {
// New announce: register it and clear the per-node acknowledgement
// entry for this value id.
sr->announce.emplace_back(Announce {permanent, value, std::min(now, created), callback});
for (auto& n : sr->nodes)
n.acked[value->id].reset();
}
else {
// Same id but another Value object: replace it and clear the per-node
// acknowledgement entries.
if (a_sr->value != value) {
a_sr->value = value;
for (auto& n : sr->nodes)
n.acked[value->id].reset();
}
if (sr->isAnnounced(value->id, getType(value->type), now)) {
// Already announced: both the previously registered callback and the
// new one are told about the success; nothing has to be scheduled.
if (a_sr->callback)
a_sr->callback(true, {});
a_sr->callback = {};
if (callback) {
callback(true, {});
}
return;
} else {
// Not announced yet: the previously registered callback is called
// with a failure and replaced by the new one.
if (a_sr->callback)
a_sr->callback(false, {});
a_sr->callback = callback;
}
}
// Run a search step as soon as possible.
scheduler.edit(sr->nextSearchStep, scheduler.time());
//TODO
//if (tm < search_time) {
// DHT_LOG.ERR("[search %s IPv%c] search_time is now in %lfs", sr->id.toString().c_str(),l
// (sr->af == AF_INET) ? '4' : '6', print_dt(tm-clock::now()));
// search_time = tm;
//}
}
/**
 * Register a local listener on the search for `id` on one address family.
 *
 * The search is created when it does not exist yet, and its next step is
 * rescheduled according to Search::getNextStepTime.
 *
 * @param id  hash to listen to.
 * @param af  address family (AF_INET or AF_INET6).
 * @param cb  callback stored in the listener.
 * @param f   filter stored in the listener.
 * @param q   query stored in the listener.
 * @return the listener token for this search, or 0 when `af` is not running.
 * @throws DhtException when no search could be obtained.
 */
size_t
Dht::listenTo(const InfoHash& id, sa_family_t af, GetCallback cb, Value::Filter f, const std::shared_ptr<Query>& q)
{
    const auto& now = scheduler.time();
    if (!isRunning(af))
        return 0;

    auto& family_searches = (af == AF_INET) ? searches4 : searches6;

    std::shared_ptr<Search> sr;
    auto existing = family_searches.find(id);
    if (existing == family_searches.end())
        sr = search(id, af);
    else
        sr = existing->second;
    if (!sr)
        throw DhtException("Can't create search");

    DHT_LOG.ERR("[search %s IPv%c] listen", id.toString().c_str(), (af == AF_INET) ? '4' : '6');

    sr->done = false;
    const auto token = ++sr->listener_token;
    sr->listeners.emplace(token, LocalListener{q, f, cb});

    scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(types, now));
    return token;
}
/**
 * Listen for values stored under `id`, locally and on both address families.
 *
 * @param id     hash to listen to.
 * @param cb     called with each batch of new or changed values; returning
 *               false cancels the listen operation.
 * @param f      filter applied to the values, chained with the filter built
 *               from `where`.
 * @param where  restriction used to build the query.
 * @return a token identifying this listen operation (to be given to
 *         cancelListen), or 0 when `cb` returned false on the values found
 *         in the local storage.
 */
size_t
Dht::listen(const InfoHash& id, GetCallback cb, Value::Filter&& f, Where&& where)
{
scheduler.syncTime();
Query q {{}, where};
// Values already reported to `cb`, indexed by value id; shared by every copy
// of the callback below.
auto vals = std::make_shared<std::map<Value::Id, std::shared_ptr<Value>>>();
auto token = ++listener_token;
// Wrapper around `cb` that only forwards values that are new or that differ
// from the one last reported with the same id.
auto gcb = [=](const std::vector<std::shared_ptr<Value>>& values) {
std::vector<std::shared_ptr<Value>> newvals {};
for (const auto& v : values) {
auto it = vals->find(v->id);
if (it == vals->cend() || !(*it->second == *v))
newvals.push_back(v);
}
if (!newvals.empty()) {
// The user asked to stop: cancel the whole listen operation.
if (!cb(newvals)) {
cancelListen(id, token);
return false;
}
// Remember what was reported (insert, or overwrite an older version).
for (const auto& v : newvals) {
auto it = vals->emplace(v->id, v);
if (not it.second)
it.first->second = v;
}
}
return true;
};
auto query = std::make_shared<Query>(q);
auto filter = f.chain(q.where.getFilter());
auto st = findStorage(id);
size_t tokenlocal = 0;
// Create the local storage for this hash when there is room for it.
if (st == store.end() && store.size() < MAX_HASHES) {
store.emplace_back(new Storage(id, scheduler.time()));
st = std::prev(store.end());
}
if (st != store.end()) {
// Report the matching values already present in the local storage.
if (not (*st)->empty()) {
std::vector<std::shared_ptr<Value>> newvals = (*st)->get(filter);
if (not newvals.empty()) {
// The user stopped at once: nothing has been registered yet.
if (!cb(newvals))
return 0;
for (const auto& v : newvals) {
auto it = vals->emplace(v->id, v);
if (not it.second)
it.first->second = v;
}
}
}
// Register a local listener on the storage.
tokenlocal = ++(*st)->listener_token;
(*st)->local_listeners.emplace(tokenlocal, LocalListener{query, filter, gcb});
}
// Register the listener on the searches of both address families
// (listenTo returns 0 for a family that is not running).
auto token4 = Dht::listenTo(id, AF_INET, gcb, filter, query);
auto token6 = Dht::listenTo(id, AF_INET6, gcb, filter, query);
DHT_LOG.DEBUG("Added listen : %d -> %d %d %d", token, tokenlocal, token4, token6);
// Map the public token to the three internal ones.
listeners.emplace(token, std::make_tuple(tokenlocal, token4, token6));
return token;
}
/**
 * Cancel a listen operation previously started with listen().
 *
 * Removes the local storage listener and the per-search listeners
 * associated with `token`, then forgets the token.
 *
 * @param id     hash that was listened to.
 * @param token  token returned by listen().
 * @return false when the token is unknown, true otherwise.
 */
bool
Dht::cancelListen(const InfoHash& id, size_t token)
{
    scheduler.syncTime();

    auto it = listeners.find(token);
    if (it == listeners.end()) {
        DHT_LOG.WARN("Listen token not found: %d", token);
        return false;
    }
    DHT_LOG.DEBUG("cancelListen %s with token %d", id.toString().c_str(), token);

    /* Local storage listener. */
    auto st = findStorage(id);
    auto tokenlocal = std::get<0>(it->second);
    if (st != store.end() && tokenlocal)
        (*st)->local_listeners.erase(tokenlocal);

    /* The map is taken by const reference: the searches are reached through
     * shared pointers, so no copy of the map is needed to modify them. */
    auto searches_cancel_listen = [&](const std::map<InfoHash, std::shared_ptr<Search>>& srs) {
        for (const auto& sp : srs) {
            const auto& s = sp.second;
            if (s->id != id) continue;
            auto af_token = s->af == AF_INET ? std::get<1>(it->second) : std::get<2>(it->second);
            if (af_token == 0)
                continue;
            std::shared_ptr<Query> query;
            const auto ll = s->listeners.find(af_token);
            if (ll != s->listeners.cend())
                query = ll->second.query;
            /* NOTE(review): `listeners` is tested before the token is erased
             * below, so the "cancel everything" branch is not taken while
             * this listener is still registered - confirm this is intended. */
            for (auto& sn : s->nodes) {
                if (s->listeners.empty()) { /* also erase requests for all searchnodes. */
                    for (auto& ls : sn.listenStatus)
                        network_engine.cancelRequest(ls.second);
                    sn.listenStatus.clear();
                } else if (query)
                    sn.listenStatus.erase(query);
            }
            s->listeners.erase(af_token);
        }
    };
    searches_cancel_listen(searches4);
    searches_cancel_listen(searches6);

    listeners.erase(it);
    return true;
}
/**
 * Announce a value on both address families.
 *
 * A value without id gets a random one. `callback` is called exactly once:
 * as soon as the value is announced on one of the address families, or with
 * the combined outcome once both announces have ended.
 *
 * @param id         hash under which the value is put.
 * @param val        value to put; a null value makes the call fail.
 * @param callback   completion callback (may be empty).
 * @param created    creation time of the value.
 * @param permanent  forwarded to announce().
 */
void
Dht::put(const InfoHash& id, std::shared_ptr<Value> val, DoneCallback callback, time_point created, bool permanent)
{
    scheduler.syncTime();

    /* A null value cannot be given an id nor be announced. */
    if (not val) {
        if (callback)
            callback(false, {});
        return;
    }

    if (val->id == Value::INVALID_ID) {
        crypto::random_device rdev;
        std::uniform_int_distribution<Value::Id> rand_id {};
        val->id = rand_id(rdev);
    }

    DHT_LOG.DEBUG("put: adding %s -> %s", id.toString().c_str(), val->toString().c_str());

    /* State shared by the two per-family callbacks. */
    auto ok = std::make_shared<bool>(false);
    auto done = std::make_shared<bool>(false);
    auto done4 = std::make_shared<bool>(false);
    auto done6 = std::make_shared<bool>(false);
    auto donecb = [=](const std::vector<std::shared_ptr<Node>>& nodes) {
        // Callback as soon as the value is announced on one of the available networks
        // or once both networks have reported their outcome.
        if (callback && !*done && (*ok || (*done4 && *done6))) {
            *done = true; // set first so that the callback is never re-entered
            callback(*ok, nodes);
        }
    };
    announce(id, AF_INET, val, [=](bool ok4, const std::vector<std::shared_ptr<Node>>& nodes) {
        DHT_LOG.DEBUG("Announce done IPv4 %d", ok4);
        *done4 = true;
        *ok |= ok4;
        donecb(nodes);
    }, created, permanent);
    announce(id, AF_INET6, val, [=](bool ok6, const std::vector<std::shared_ptr<Node>>& nodes) {
        DHT_LOG.DEBUG("Announce done IPv6 %d", ok6);
        *done6 = true;
        *ok |= ok6;
        donecb(nodes);
    }, created, permanent);
}
/**
 * Bookkeeping shared by the callbacks of an operation that runs on both
 * address families (see Dht::get and Dht::query).
 *
 * @tparam T  type of the results collected by the operation.
 */
template <typename T>
struct OpStatus {
    /** Completion flag and outcome of one part of the operation. */
    struct Status {
        bool done = false;
        bool ok = false;
    };
    Status status;   /* whole operation */
    Status status4;  /* IPv4 search */
    Status status6;  /* IPv6 search */
    std::vector<std::shared_ptr<T>> values;     /* results gathered so far */
    std::vector<std::shared_ptr<Node>> nodes;   /* nodes reported by the done callbacks */
};
/**
 * Record the nodes of a finished sub-operation and call the user's done
 * callback once the whole operation is over.
 *
 * The operation is over when its overall status is ok, or when both the
 * IPv4 and IPv6 parts are done. Nothing happens if it was already over.
 *
 * @param dcb    user callback (may be empty).
 * @param nodes  nodes to append to the operation's node list.
 * @param op     shared operation state.
 */
template <typename T>
void doneCallbackWrapper(DoneCallback dcb, const std::vector<std::shared_ptr<Node>>& nodes, std::shared_ptr<OpStatus<T>> op) {
    if (op->status.done)
        return;

    op->nodes.insert(op->nodes.end(), nodes.begin(), nodes.end());

    const bool both_families_done = op->status4.done and op->status6.done;
    if (not op->status.ok and not both_families_done)
        return;

    const bool success = op->status.ok || op->status4.ok || op->status6.ok;
    op->status.done = true;
    if (dcb)
        dcb(success, op->nodes);
}
/**
 * Forward new results of an operation to the user's callback.
 *
 * @param get_cb      user callback; returning false marks the operation as
 *                    ok, which ends it.
 * @param done_cb     user done callback, given to doneCallbackWrapper.
 * @param values      results that were just received.
 * @param add_values  selects which of `values` are new for this operation.
 * @param op          shared operation state.
 * @return false when the operation was already over or has just ended,
 *         true when more results are wanted.
 */
template <typename T, typename Cb>
bool callbackWrapper(Cb get_cb,
        DoneCallback done_cb,
        const std::vector<std::shared_ptr<T>>& values,
        std::function<std::vector<std::shared_ptr<T>>(const std::vector<std::shared_ptr<T>>&)> add_values,
        std::shared_ptr<OpStatus<T>> op)
{
    if (op->status.done)
        return false;

    auto fresh = add_values(values);
    if (not fresh.empty()) {
        const bool wants_more = get_cb(fresh);
        op->status.ok = not wants_more;
        op->values.insert(op->values.end(), fresh.begin(), fresh.end());
    }

    doneCallbackWrapper(done_cb, {}, op);
    return not op->status.ok;
}
/**
 * Get the values stored under `id`, from the local storage first and then
 * from searches on both address families.
 *
 * @param id      hash to look up.
 * @param getcb   called with each batch of new values (see callbackWrapper
 *                for the meaning of its return value).
 * @param donecb  called once when the operation is over.
 * @param filter  filter applied to the values, chained with the filter built
 *                from `where`.
 * @param where   restriction used to build the query.
 */
void
Dht::get(const InfoHash& id, GetCallback getcb, DoneCallback donecb, Value::Filter&& filter, Where&& where)
{
scheduler.syncTime();
Query q {{}, where};
// State shared by all the callbacks of this operation.
auto op = std::make_shared<OpStatus<Value>>();
auto f = filter.chain(q.where.getFilter());
// Keeps the values that were not already collected by this operation
// (same pointer or equal content) and that pass the filter.
auto add_values = [=](const std::vector<std::shared_ptr<Value>>& values) {
std::vector<std::shared_ptr<Value>> newvals {};
for (const auto& v : values) {
auto it = std::find_if(op->values.cbegin(), op->values.cend(), [&](const std::shared_ptr<Value>& sv) {
return sv == v or *sv == *v;
});
if (it == op->values.cend()) {
if (not f or f(*v))
newvals.push_back(v);
}
}
return newvals;
};
auto gcb = std::bind(callbackWrapper<Value, GetCallback>, getcb, donecb, _1, add_values, op);
/* Try to answer this search locally. */
gcb(getLocal(id, f));
// Then search on each address family; each done callback records the
// outcome for its family and lets doneCallbackWrapper decide whether the
// whole operation is over.
Dht::search(id, AF_INET, gcb, {}, [=](bool ok, const std::vector<std::shared_ptr<Node>>& nodes) {
//DHT_LOG.WARN("DHT done IPv4");
op->status4.done = true;
op->status4.ok = ok;
doneCallbackWrapper(donecb, nodes, op);
}, f, q);
Dht::search(id, AF_INET6, gcb, {}, [=](bool ok, const std::vector<std::shared_ptr<Node>>& nodes) {
//DHT_LOG.WARN("DHT done IPv6");
op->status6.done = true;
op->status6.ok = ok;
doneCallbackWrapper(donecb, nodes, op);
}, f, q);
}
/**
 * Query the fields of the values stored under `id`, from the local storage
 * first and then from searches on both address families.
 *
 * @param id       hash to look up.
 * @param cb       called with each batch of new field indexes (see
 *                 callbackWrapper for the meaning of its return value).
 * @param done_cb  called once when the operation is over.
 * @param q        query; its `where` part builds the filter and its `select`
 *                 part builds the field index of each local value.
 */
void Dht::query(const InfoHash& id, QueryCallback cb, DoneCallback done_cb, Query&& q)
{
scheduler.syncTime();
// State shared by all the callbacks of this operation.
auto op = std::make_shared<OpStatus<FieldValueIndex>>();
auto f = q.where.getFilter();
auto values = getLocal(id, f);
// Keeps the field indexes that are not contained in one already collected.
// When a new index contains a collected one, that collected one is removed
// (only the first such match is erased).
auto add_fields = [=](const std::vector<std::shared_ptr<FieldValueIndex>>& fields) {
std::vector<std::shared_ptr<FieldValueIndex>> newvals {};
for (const auto& f : fields) {
auto it = std::find_if(op->values.cbegin(), op->values.cend(),
[&](const std::shared_ptr<FieldValueIndex>& sf) {
return sf == f or f->containedIn(*sf);
});
if (it == op->values.cend()) {
auto lesser = std::find_if(op->values.begin(), op->values.end(),
[&](const std::shared_ptr<FieldValueIndex>& sf) {
return sf->containedIn(*f);
});
if (lesser != op->values.end())
op->values.erase(lesser);
newvals.push_back(f);
}
}
return newvals;
};
// Build the field index of every matching local value.
std::vector<std::shared_ptr<FieldValueIndex>> local_fields(values.size());
std::transform(values.begin(), values.end(), local_fields.begin(), [&q](const std::shared_ptr<Value>& v) {
return std::make_shared<FieldValueIndex>(*v, q.select);
});
auto qcb = std::bind(callbackWrapper<FieldValueIndex, QueryCallback>, cb, done_cb, _1, add_fields, op);
/* Try to answer this search locally. */
qcb(local_fields);
// Then search on each address family; each done callback records the
// outcome for its family and lets doneCallbackWrapper decide whether the
// whole operation is over.
Dht::search(id, AF_INET, {}, qcb, [=](bool ok, const std::vector<std::shared_ptr<Node>>& nodes) {
//DHT_LOG.WARN("DHT done IPv4");
op->status4.done = true;
op->status4.ok = ok;
doneCallbackWrapper(done_cb, nodes, op);
}, f, q);
Dht::search(id, AF_INET6, {}, qcb, [=](bool ok, const std::vector<std::shared_ptr<Node>>& nodes) {
//DHT_LOG.WARN("DHT done IPv6");
op->status6.done = true;
op->status6.ok = ok;
doneCallbackWrapper(done_cb, nodes, op);
}, f, q);
}
/**
 * Get the values of the local storage for `id` that pass a filter.
 *
 * @param id  hash to look up.
 * @param f   filter forwarded to Storage::get.
 * @return the matching values, or an empty vector when there is no local
 *         storage for `id`.
 */
std::vector<std::shared_ptr<Value>>
Dht::getLocal(const InfoHash& id, Value::Filter f) const
{
    const auto storage = findStorage(id);
    if (storage != store.end())
        return (*storage)->get(f);
    return {};
}
/**
 * Get one value of the local storage for `id`.
 *
 * @param id   hash to look up.
 * @param vid  id of the value, forwarded to Storage::getById.
 * @return the result of Storage::getById, or an empty pointer when there is
 *         no local storage for `id`.
 */
std::shared_ptr<Value>
Dht::getLocalById(const InfoHash& id, Value::Id vid) const
{
    const auto storage = findStorage(id);
    if (storage == store.end())
        return {};
    return (*storage)->getById(vid);
}
std::vector<std::shared_ptr<Value>>
Dht::getPut(const InfoHash& id)
{
std::vector<std::shared_ptr<Value>> ret;
auto find_values = [&](std::map<InfoHash, std::shared_ptr<Search>> srs) {
auto srp = srs.find(id);
if (srp == srs.end())
return;
auto& search = srp->second;
ret.reserve(ret.size() + search->announce.size());
for (const auto& a : search->announce)
ret.push_back(a.value);
};
find_values(searches4);
find_values(searches6);
return ret;
}
/**
 * Get a value currently registered for announcement under `id`.
 *
 * @param id   hash to look up.
 * @param vid  id of the value.
 * @return the value found in the announces of the IPv4 search, else in
 *         those of the IPv6 search, else an empty pointer.
 */
std::shared_ptr<Value>
Dht::getPut(const InfoHash& id, const Value::Id& vid)
{
    /* The map is only read: take it by const reference instead of copying it. */
    auto find_value = [&](const std::map<InfoHash, std::shared_ptr<Search>>& srs) -> std::shared_ptr<Value> {
        auto srp = srs.find(id);
        if (srp == srs.end())
            return {};
        const auto& search = srp->second;
        for (const auto& a : search->announce) {
            if (a.value->id == vid)
                return a.value;
        }
        return {};
    };
    auto v4 = find_value(searches4);
    if (v4) return v4;
    auto v6 = find_value(searches6);
    if (v6) return v6;
    return {};
}
/**
 * Stop announcing a value.
 *
 * Removes every announce with value id `vid` from the IPv4 and IPv6
 * searches registered for `id`.
 *
 * @param id   hash under which the value was put.
 * @param vid  id of the value.
 * @return true if at least one announce was removed.
 */
bool
Dht::cancelPut(const InfoHash& id, const Value::Id& vid)
{
    bool canceled {false};
    /* The searches are reached through shared pointers, so the map itself
     * can be taken by const reference instead of being copied. */
    auto sr_cancel_put = [&](const std::map<InfoHash, std::shared_ptr<Search>>& srs) {
        auto srp = srs.find(id);
        if (srp == srs.end())
            return;

        const auto& sr = srp->second;
        for (auto it = sr->announce.begin(); it != sr->announce.end();) {
            if (it->value->id == vid) {
                canceled = true;
                it = sr->announce.erase(it);
            }
            else
                ++it;
        }
    };
    sr_cancel_put(searches4);
    sr_cancel_put(searches6);
    return canceled;
}
// Storage
/**
 * Find the local storage for a hash.
 *
 * @param id  hash to look up.
 * @return an iterator to the first storage whose id equals `id`, or
 *         store.end() when there is none.
 */
decltype(Dht::store)::iterator
Dht::findStorage(const InfoHash& id)
{
    auto it = store.begin();
    while (it != store.end() and not ((*it)->id == id))
        ++it;
    return it;
}
/**
 * Find the local storage for a hash (read-only access).
 *
 * @param id  hash to look up.
 * @return a const iterator to the first storage whose id equals `id`, or
 *         store.cend() when there is none.
 */
decltype(Dht::store)::const_iterator
Dht::findStorage(const InfoHash& id) const
{
    auto it = store.cbegin();
    while (it != store.cend() and not ((*it)->id == id))
        ++it;
    return it;
}
/**
 * Notify the listeners of a storage that one of its values changed.
 *
 * Local listeners whose filter accepts the value get their callback called;
 * remote listeners whose query filter accepts it are sent the value through
 * the network engine.
 *
 * @param st  storage that changed.
 * @param v   stored value that was added or updated.
 */
void
Dht::storageChanged(Storage& st, ValueStorage& v)
{
DHT_LOG.DEBUG("[Storage %s] changed.", st.id.toString().c_str());
if (not st.local_listeners.empty()) {
DHT_LOG.DEBUG("[Storage %s] %lu local listeners.", st.id.toString().c_str(), st.local_listeners.size());
// Callbacks to run, each with the values it has to receive.
std::vector<std::pair<GetCallback, std::vector<std::shared_ptr<Value>>>> cbs;
for (const auto& l : st.local_listeners) {
std::vector<std::shared_ptr<Value>> vals;
// A listener without filter receives every value.
if (not l.second.filter or l.second.filter(*v.data))
vals.push_back(v.data);
if (not vals.empty()) {
DHT_LOG.DEBUG("[Storage %s] Sending update local listener with token %lu.",
st.id.toString().c_str(),
l.first);
cbs.emplace_back(l.second.get_cb, std::move(vals));
}
}
// listeners are copied: they may be deleted by the callback
for (auto& cb : cbs)
cb.first(cb.second);
}
DHT_LOG.DEBUG("[Storage %s] %lu remote listeners.", st.id.toString().c_str(), st.listeners.size());
for (const auto& l : st.listeners) {
// Skip the remote listeners whose query does not match the value.
auto f = l.second.query.where.getFilter();
if (f and not f(*v.data))
continue;
DHT_LOG.DEBUG("[Storage %s] Sending update to %s.", st.id.toString().c_str(), l.first->toString().c_str());
std::vector<std::shared_ptr<Value>> vals {};
vals.push_back(v.data);
// Token computed from the listener's address with the current secret.
Blob ntoken = makeToken((const sockaddr*)&l.first->ss, false);
network_engine.tellListener(l.first, l.second.rid, st.id, 0, ntoken, {}, {},
std::move(vals), l.second.query);
}
}
/**
 * Store a value in the local storage for `id`.
 *
 * The storage is created when needed (up to MAX_HASHES storages). On
 * success the global size and value counters are updated and the listeners
 * of the storage are notified through storageChanged.
 *
 * @param id       hash under which the value is stored.
 * @param value    value to store (must not be null).
 * @param created  creation time of the value, capped to the current time.
 * @return true if the value was added or updated; false if it is already
 *         expired, if no storage could be created, or if Storage::store
 *         did not change anything.
 */
bool
Dht::storageStore(const InfoHash& id, const std::shared_ptr<Value>& value, time_point created)
{
    const auto& now = scheduler.time();
    created = std::min(created, now);

    /* The expiration depends on the type of the value, so the lookup must
     * use the value's type id, not the id of the value itself. */
    if ( created + getType(value->type).expiration < clock::now() )
        return false;

    auto st = findStorage(id);
    if (st == store.end()) {
        if (store.size() >= MAX_HASHES)
            return false;
        store.emplace_back(new Storage(id, now));
        st = std::prev(store.end());
    }

    /* (stored value or nullptr, size difference, value count difference) */
    auto result = (*st)->store(value, created, max_store_size - total_store_size);
    auto* stored = std::get<0>(result);
    if (stored) {
        total_store_size += std::get<1>(result);
        total_values += std::get<2>(result);
        storageChanged(*(*st), *stored);
    }
    return stored != nullptr;
}
/**
 * Add a value to this storage, or update the stored value with the same id.
 *
 * An entry matches when it holds the same Value object or a value with the
 * same id. A matching entry always gets its time refreshed; its data is
 * replaced only when `value` is a different object and the size difference
 * fits in `size_left`. A new value is added only when it fits in
 * `size_left` and the storage holds fewer than MAX_VALUES values.
 *
 * @param value      value to store.
 * @param created    time recorded for the value.
 * @param size_left  number of bytes that may still be stored.
 * @return (entry, size difference, value count difference) when something
 *         was added or replaced, (nullptr, 0, 0) otherwise.
 */
std::tuple<Dht::ValueStorage*, ssize_t, ssize_t>
Dht::Storage::store(const std::shared_ptr<Value>& value, time_point created, ssize_t size_left) {
    auto existing = std::find_if (values.begin(), values.end(), [&](const ValueStorage& vr) {
        return vr.data == value || vr.data->id == value->id;
    });

    if (existing == values.end()) {
        /* New value */
        ssize_t needed = value->size();
        if (needed > size_left or values.size() >= MAX_VALUES)
            return std::make_tuple(nullptr, 0, 0);
        total_size += needed;
        values.emplace_back(value, created);
        return std::make_tuple(&values.back(), needed, 1);
    }

    /* Already there, only need to refresh */
    existing->time = created;
    ssize_t delta = value->size() - existing->data->size();
    if (delta > size_left or existing->data == value)
        return std::make_tuple(nullptr, 0, 0);

    existing->data = value;
    total_size += delta;
    return std::make_tuple(&(*existing), delta, 0);
}
/** Remove every value from this storage and reset its total size to 0. */
void
Dht::Storage::clear()
{
    total_size = 0;
    values.clear();
}
/**
 * Register a remote node as a listener of the storage for `id`, or refresh
 * its registration.
 *
 * The storage is created when needed (up to MAX_HASHES storages). A newly
 * registered listener is sent the stored values matching its query right
 * away, if there are any.
 *
 * @param id     hash the node listens to.
 * @param node   listening node.
 * @param rid    request id, stored with the listener.
 * @param query  query of the listener; moved into the new Listener entry.
 */
void
Dht::storageAddListener(const InfoHash& id, const std::shared_ptr<Node>& node, size_t rid, Query&& query)
{
    const auto& now = scheduler.time();

    auto st = findStorage(id);
    if (st == store.end()) {
        if (store.size() >= MAX_HASHES)
            return;
        store.emplace_back(new Storage(id, now));
        st = std::prev(store.end());
    }

    auto known = (*st)->listeners.find(node);
    if (known != (*st)->listeners.end()) {
        /* Already listening: only refresh the registration. */
        known->second.refresh(rid, now);
        return;
    }

    auto matching = (*st)->get(query.where.getFilter());
    if (not matching.empty()) {
        network_engine.tellListener(node, rid, id, WANT4 | WANT6, makeToken((sockaddr*)&node->ss, false),
                buckets.findClosestNodes(id, now, TARGET_NODES), buckets6.findClosestNodes(id, now, TARGET_NODES),
                std::move(matching), query);
    }
    (*st)->listeners.emplace(node, Listener {rid, now, std::forward<Query>(query)});
}
/**
 * Expire the content of the local storages.
 *
 * For each storage: remote listeners not refreshed for more than
 * Node::NODE_EXPIRE_TIME are dropped, expired values are removed (the global
 * counters are updated accordingly), and the storage itself is discarded
 * when it holds no value and has no listener left.
 */
void
Dht::expireStorage()
{
    const auto& now = scheduler.time();

    for (auto st = store.begin(); st != store.end(); ) {
        auto& remote = (*st)->listeners;
        for (auto l = remote.cbegin(); l != remote.cend(); ) {
            const bool outdated = l->second.time + Node::NODE_EXPIRE_TIME < now;
            if (not outdated) {
                ++l;
                continue;
            }
            DHT_LOG.DEBUG("Discarding expired listener %s", l->first->id.toString().c_str());
            remote.erase(l++);
        }

        const auto delta = (*st)->expire(types, now);
        total_store_size += delta.first;
        total_values += delta.second;

        const bool unused = (*st)->empty() && (*st)->listeners.empty() && (*st)->local_listeners.empty();
        if (unused) {
            DHT_LOG.DEBUG("Discarding expired value %s", (*st)->id.toString().c_str());
            st = store.erase(st);
        } else {
            ++st;
        }
    }
}
/**
 * Remove the expired values of this storage.
 *
 * A value is expired when its stored time plus the expiration of its type
 * is before `now`; unknown types use ValueType::USER_DATA. Entries without
 * data are removed as well.
 *
 * @param types  known value types.
 * @param now    current time.
 * @return (size difference, value count difference), both zero or negative.
 */
std::pair<ssize_t, ssize_t>
Dht::Storage::expire(const std::map<ValueType::Id, ValueType>& types, time_point now)
{
    /* Move the entries to keep to the front; `r` is the first to remove. */
    auto r = std::partition(values.begin(), values.end(), [&](const ValueStorage& v) {
        if (!v.data) return false; // should not happen
        auto type_it = types.find(v.data->type);
        const ValueType& type = (type_it == types.end()) ? ValueType::USER_DATA : type_it->second;
        bool expired = v.time + type.expiration < now;
        //if (expired)
        //    DHT_LOG.DEBUG("Discarding expired value %s", v.data->toString().c_str());
        return !expired;
    });
    ssize_t del_num = std::distance(r, values.end());
    ssize_t size_diff {};
    std::for_each(r, values.end(), [&](const ValueStorage& v){
        /* Entries without data are in this range too: they have no size
         * and must not be dereferenced. */
        if (v.data)
            size_diff -= v.data->size();
    });
    total_size += size_diff;
    values.erase(r, values.end());
    return {size_diff, -del_num};
}
/**
 * React to a change of network connectivity.
 *
 * Schedules the confirmation of the nodes right away, resets the bucket
 * grow times and the reported addresses, notifies the network engine and
 * clears the listen status of every node of every search.
 */
void
Dht::connectivityChanged()
{
    const auto& now = scheduler.time();
    scheduler.edit(nextNodesConfirmation, now);
    mybucket_grow_time = now;
    mybucket6_grow_time = now;
    reported_addr.clear();
    network_engine.connectivityChanged();
    /* The searches are reached through shared pointers, so the map itself
     * can be taken by const reference instead of being copied. */
    auto stop_listen = [&](const std::map<InfoHash, std::shared_ptr<Search>>& srs) {
        for (const auto& sp : srs)
            for (auto& sn : sp.second->nodes)
                sn.listenStatus.clear();
    };
    stop_listen(searches4);
    stop_listen(searches6);
}
void
Dht::rotateSecrets()
{
const auto& now = scheduler.time();
uniform_duration_distribution<> time_dist(std::chrono::minutes(15), std::chrono::minutes(45));
auto rotate_secrets_time = now + time_dist(rd);
oldsecret = secret;
{
crypto::random_device rdev;
std::generate_n(secret.begin(), secret.size(), std::bind(rand_byte, std::ref(rdev)));
}
scheduler.add(rotate_secrets_time, std::bind(&Dht::rotateSecrets, this));
}
/**
 * Build the token associated with a socket address.
 *
 * The token is the SHA-512 fingerprint (computed by gnutls_fingerprint) of
 * the secret followed by the bytes of the IP address and the two bytes of
 * the port.
 *
 * @param sa   IPv4 or IPv6 socket address.
 * @param old  use the previous secret instead of the current one.
 * @return the token, or an empty Blob for any other address family.
 * @throws DhtException when the fingerprint cannot be computed.
 */
Blob
Dht::makeToken(const sockaddr *sa, bool old) const
{
    void *ip;
    size_t iplen;
    in_port_t port;

    switch (sa->sa_family) {
    case AF_INET: {
        sockaddr_in *sin = (sockaddr_in*)sa;
        ip = &sin->sin_addr;
        iplen = 4;
        port = htons(sin->sin_port);
        break;
    }
    case AF_INET6: {
        sockaddr_in6 *sin6 = (sockaddr_in6*)sa;
        ip = &sin6->sin6_addr;
        iplen = 16;
        port = htons(sin6->sin6_port);
        break;
    }
    default:
        return {};
    }

    const auto& key = old ? oldsecret : secret;

    /* secret | ip | port */
    Blob data;
    data.reserve(sizeof(secret)+2+iplen);
    data.insert(data.end(), key.begin(), key.end());
    data.insert(data.end(), (uint8_t*)ip, (uint8_t*)ip+iplen);
    data.insert(data.end(), (uint8_t*)&port, ((uint8_t*)&port)+2);

    size_t sz = TOKEN_SIZE;
    Blob token {};
    token.resize(sz);
    gnutls_datum_t gnudata = {data.data(), (unsigned int)data.size()};
    if (gnutls_fingerprint(GNUTLS_DIG_SHA512, &gnudata, token.data(), &sz) != GNUTLS_E_SUCCESS)
        throw DhtException("Can't compute SHA512");
    /* gnutls_fingerprint wrote the size of the digest in sz. */
    token.resize(sz);
    return token;
}
/**
 * Check a token received from a socket address.
 *
 * @param token  token to check.
 * @param sa     address the token came from.
 * @return true if `token` has TOKEN_SIZE bytes and equals the token built
 *         for `sa` with the current or the previous secret; false
 *         otherwise, including when `sa` is null.
 */
bool
Dht::tokenMatch(const Blob& token, const sockaddr *sa) const
{
    if (!sa || token.size() != TOKEN_SIZE)
        return false;
    return token == makeToken(sa, false) or token == makeToken(sa, true);
}
/**
 * Count the nodes of the routing table of an address family.
 *
 * @param af               AF_INET selects the IPv4 table, anything else the
 *                         IPv6 one.
 * @param good_return      if not null, receives the number of good nodes.
 * @param dubious_return   if not null, receives the number of nodes that
 *                         are not good.
 * @param cached_return    if not null, receives the number of buckets with
 *                         a cached node.
 * @param incoming_return  if not null, receives the number of good nodes
 *                         whose `time` is later than their `reply_time`.
 * @return the total number of nodes (good + dubious).
 */
unsigned
Dht::getNodesStats(sa_family_t af, unsigned *good_return, unsigned *dubious_return, unsigned *cached_return, unsigned *incoming_return) const
{
    const auto& now = scheduler.time();
    const auto& table = (af == AF_INET) ? buckets : buckets6;

    unsigned n_good = 0, n_dubious = 0, n_cached = 0, n_incoming = 0;
    for (const auto& bucket : table) {
        for (const auto& node : bucket.nodes) {
            if (not node->isGood(now)) {
                ++n_dubious;
                continue;
            }
            ++n_good;
            if (node->time > node->reply_time)
                ++n_incoming;
        }
        if (bucket.cached)
            ++n_cached;
    }

    if (good_return)     *good_return = n_good;
    if (dubious_return)  *dubious_return = n_dubious;
    if (cached_return)   *cached_return = n_cached;
    if (incoming_return) *incoming_return = n_incoming;
    return n_good + n_dubious;
}
/**
 * Write a description of a bucket and of its nodes to a stream.
 *
 * @param b    bucket to describe.
 * @param out  destination stream.
 */
void
Dht::dumpBucket(const Bucket& b, std::ostream& out) const
{
    using namespace std::chrono;
    const auto& now = scheduler.time();

    /* Bucket header */
    out << b.first << " count " << b.nodes.size() << " age " << duration_cast<seconds>(now - b.time).count() << " sec";
    if (b.cached)
        out << " (cached)";
    out << std::endl;

    /* One line per node */
    for (const auto& node : b.nodes) {
        out << " Node " << node->toString();
        out << " age " << duration_cast<seconds>(now - node->time).count();
        if (node->time != node->reply_time)
            out << ", reply: " << duration_cast<seconds>(now - node->reply_time).count();

        if (node->isExpired())
            out << " [expired]";
        else if (node->isGood(now))
            out << " [good]";
        out << std::endl;
    }
}
/**
 * Write a description of a search to a stream: its state, its queries, its
 * announcements and one line per node.
 *
 * @param sr   search to describe.
 * @param out  destination stream.
 */
void
Dht::dumpSearch(const Search& sr, std::ostream& out) const
{
const auto& now = scheduler.time();
using namespace std::chrono;
// Header: address family, hash, number of 'get' operations, age and state.
out << std::endl << "Search IPv" << (sr.af == AF_INET6 ? '6' : '4') << ' ' << sr.id << " gets: " << sr.callbacks.size();
out << ", age: " << duration_cast<seconds>(now - sr.step_time).count() << " s";
if (sr.done)
out << " [done]";
if (sr.expired)
out << " [expired]";
bool synced = sr.isSynced(now);
out << (synced ? " [synced]" : " [not synced]");
if (synced && sr.isListening(now)) {
auto lt = sr.getListenTime(now);
out << " [listening, next in " << duration_cast<seconds>(lt-now).count() << " s]";
}
out << std::endl;
/*printing the queries*/
if (sr.callbacks.size() + sr.listeners.size() > 0)
out << "Queries:" << std::endl;
for (const auto& cb : sr.callbacks) {
out << *cb.second.query << std::endl;
}
for (const auto& l : sr.listeners) {
out << *l.second.query << std::endl;
}
// Announced values, flagged when Search::isAnnounced reports them announced.
for (const auto& n : sr.announce) {
bool announced = sr.isAnnounced(n.value->id, getType(n.value->type), now);
out << "Announcement: " << *n.value << (announced ? " [announced]" : "") << std::endl;
}
// Column titles of the node table.
out << " Common bits InfoHash Conn. Get Ops IP" << std::endl;
unsigned i = 0;
auto last_get = sr.getLastGetTime();
for (const auto& n : sr.nodes) {
i++;
out << std::setfill (' ') << std::setw(3) << InfoHash::commonBits(sr.id, n.node->id) << ' ' << n.node->id;
// '*' marks the nodes that findNode finds for this address family.
out << ' ' << (findNode(n.node->id, sr.af) ? '*' : ' ');
// Connection: number of pending messages (if any) and 'x' when expired.
out << " [";
if (auto pendingCount = n.node->getPendingMessageCount())
out << pendingCount;
else
out << ' ';
out << (n.node->isExpired() ? 'x' : ' ') << "]";
// Get status
{
// Pending 'get' request: 'c' for a candidate node, 'f' otherwise.
char g_i = n.pending(n.getStatus) ? (n.candidate ? 'c' : 'f') : ' ';
// 'u' when synced with a reply later than the last 'get', 's' when
// only synced, '-' when not synced.
char s_i = n.isSynced(now) ? (n.last_get_reply > last_get ? 'u' : 's') : '-';
out << " [" << s_i << g_i << "] ";
}
// Listen status
if (not sr.listeners.empty()) {
if (n.listenStatus.empty())
out << " ";
else
out << "["
<< (n.isListening(now) ? 'l' : (n.pending(n.listenStatus) ? 'f' : ' ')) << "] ";
}
// Announce status
if (not sr.announce.empty()) {
if (n.acked.empty()) {
out << " ";
for (size_t a=0; a < sr.announce.size(); a++)
out << ' ';
} else {
out << "[";
// One character per announce: 'a' when the acknowledgement is still
// within the expiration of the value's type, 'f' when the request is
// pending, ' ' when there is no acknowledgement entry.
// NOTE(review): nothing is printed when an entry exists but is neither
// valid nor pending, which shifts the following columns.
for (const auto& a : sr.announce) {
auto ack = n.acked.find(a.value->id);
if (ack == n.acked.end() or not ack->second) {
out << ' ';
} else {
if (ack->second->reply_time + getType(a.value->type).expiration > now)
out << 'a';
else if (ack->second->pending())
out << 'f';
}
}
out << "] ";
}
}
out << print_addr(n.node->ss, n.node->sslen);
out << std::endl;
}
}
/**
 * Log (at debug level) a description of the routing tables, of the
 * searches and of the local storage.
 */
void
Dht::dumpTables() const
{
    std::stringstream out;
    out << "My id " << myid << std::endl;

    out << "Buckets IPv4 :" << std::endl;
    for (const auto& b : buckets)
        dumpBucket(b, out);
    out << "Buckets IPv6 :" << std::endl;
    for (const auto& b : buckets6)
        dumpBucket(b, out);

    /* The map is only read: take it by const reference instead of copying it. */
    auto dump_searches = [&](const std::map<InfoHash, std::shared_ptr<Search>>& srs) {
        for (const auto& srp : srs)
            dumpSearch(*srp.second, out);
    };
    dump_searches(searches4);
    dump_searches(searches6);
    out << std::endl;

    out << getStorageLog() << std::endl;

    DHT_LOG.DEBUG("%s", out.str().c_str());
}
/**
 * Describe the local storage.
 *
 * For each storage: its id, its number of remote listeners, values and
 * bytes, its number of local listeners (when not zero) and one line per
 * remote listener. The last line gives the totals.
 *
 * @return the description, one item per line.
 */
std::string
Dht::getStorageLog() const
{
    const auto& now = scheduler.time();
    using namespace std::chrono;
    std::stringstream out;
    for (const auto& st : store) {
        out << "Storage " << st->id << " " << st->listeners.size() << " list., " << st->valueCount() << " values (" << st->totalSize() << " bytes)" << std::endl;
        if (not st->local_listeners.empty())
            out << " " << st->local_listeners.size() << " local listeners" << std::endl;
        for (const auto& l : st->listeners) {
            out << " " << "Listener " << l.first->toString();
            auto since = duration_cast<seconds>(now - l.second.time);
            auto expires = duration_cast<seconds>(l.second.time + Node::NODE_EXPIRE_TIME - now);
            out << " (since " << since.count() << "s, exp in " << expires.count() << "s)" << std::endl;
        }
    }
    /* The total size is reported in units of 1024 bytes. */
    out << "Total " << store.size() << " storages, " << total_values << " values (" << (total_store_size/1024) << " KB)" << std::endl;
    return out.str();
}
/**
 * Dumps every bucket of one routing table into a string.
 *
 * @param af  AF_INET selects the IPv4 table; any other value selects the
 *            IPv6 table.
 * @return the text produced by dumpBucket() for each bucket, in order.
 */
std::string
Dht::getRoutingTablesLog(sa_family_t af) const
{
    std::stringstream log;
    const auto& table = (af == AF_INET) ? buckets : buckets6;
    for (auto bucket = table.begin(); bucket != table.end(); ++bucket)
        dumpBucket(*bucket, log);
    return log.str();
}
/**
 * Dumps the ongoing searches into a string, preceded by a legend line
 * explaining the status letters.
 *
 * @param af  AF_INET or AF_INET6 to restrict the dump to one family;
 *            0 dumps both families (IPv4 first).
 * @return the formatted log text.
 */
std::string
Dht::getSearchesLog(sa_family_t af) const
{
    const bool with_v4 = (af == 0) or (af == AF_INET);
    const bool with_v6 = (af == 0) or (af == AF_INET6);

    std::stringstream log;
    log << "s:synched, u:updated, a:announced, c:candidate, f:cur req, x:expired, *:known" << std::endl;
    if (with_v4) {
        for (auto it = searches4.begin(); it != searches4.end(); ++it)
            dumpSearch(*it->second, log);
    }
    if (with_v6) {
        for (auto it = searches6.begin(); it != searches6.end(); ++it)
            dumpSearch(*it->second, log);
    }
    return log.str();
}
/**
 * Destructor: calls clear() on every IPv4 search, then on every IPv6 search.
 */
Dht::~Dht()
{
    for (auto it = searches4.begin(); it != searches4.end(); ++it)
        it->second->clear();
    for (auto it = searches6.begin(); it != searches6.end(); ++it)
        it->second->clear();
}
/**
 * Default constructor: empty storage and a network engine built only from
 * the logger and the scheduler (no sockets, no request handlers).
 */
Dht::Dht()
    : store(),
      network_engine(DHT_LOG, scheduler)
{
}
/**
 * Builds a DHT node on top of already-opened sockets.
 *
 * @param s       IPv4 socket descriptor; a negative value skips IPv4 setup.
 * @param s6      IPv6 socket descriptor; a negative value skips IPv6 setup.
 * @param config  supplies the node id, the bootstrap flag and the network id.
 *
 * When both descriptors are negative the constructor returns right after
 * synchronising the scheduler clock. Otherwise each provided socket is
 * switched to non-blocking mode, the first node confirmation is scheduled
 * 3 to 5 seconds ahead, the token secret is filled with random bytes and
 * rotated, and the periodic expiration job is started.
 *
 * @throws DhtException if a socket cannot be set to non-blocking mode.
 */
Dht::Dht(int s, int s6, Config config)
    : myid(config.node_id), is_bootstrap(config.is_bootstrap), store(),
      network_engine(myid, config.network, s, s6, DHT_LOG, scheduler,
          std::bind(&Dht::onError, this, _1, _2),
          std::bind(&Dht::onNewNode, this, _1, _2),
          std::bind(&Dht::onReportedAddr, this, _1, _2, _3),
          std::bind(&Dht::onPing, this, _1),
          std::bind(&Dht::onFindNode, this, _1, _2, _3),
          std::bind(&Dht::onGetValues, this, _1, _2, _3, _4),
          std::bind(&Dht::onListen, this, _1, _2, _3, _4, _5),
          std::bind(&Dht::onAnnounce, this, _1, _2, _3, _4, _5))
{
    scheduler.syncTime();

    const bool has_v4 = s >= 0;
    const bool has_v6 = s6 >= 0;
    if (not has_v4 and not has_v6)
        return;

    if (has_v4) {
        buckets = {Bucket {AF_INET}};
        if (not set_nonblocking(s, 1))
            throw DhtException("Can't set socket to non-blocking mode");
    }
    if (has_v6) {
        buckets6 = {Bucket {AF_INET6}};
        if (not set_nonblocking(s6, 1))
            throw DhtException("Can't set socket to non-blocking mode");
    }

    search_id = std::uniform_int_distribution<decltype(search_id)>{}(rd);

    // First node confirmation happens a few seconds after start-up.
    uniform_duration_distribution<> first_confirm_delay {std::chrono::seconds(3), std::chrono::seconds(5)};
    const auto first_confirm_time = scheduler.time() + first_confirm_delay(rd);
    DHT_LOG.DEBUG("Scheduling %s", myid.toString().c_str());
    nextNodesConfirmation = scheduler.add(first_confirm_time, std::bind(&Dht::confirmNodes, this));

    // Seed the secret with random bytes before the first rotation.
    {
        crypto::random_device rdev;
        std::generate_n(secret.begin(), secret.size(), std::bind(rand_byte, std::ref(rdev)));
    }
    rotateSecrets();

    expire();

    DHT_LOG.DEBUG("DHT initialised with node ID %s", myid.toString().c_str());
}
/**
 * Sends a find-node request for an id next to our own, to keep the part of
 * the routing table around this node populated.
 *
 * The target id is our id with its last byte randomised. The request goes to
 * a random node of our own bucket, or - when the current choice is empty, or
 * with probability 1/8 - of the following or the preceding bucket.
 *
 * @param list  routing table to maintain.
 * @return false when no bucket holds our id in `list`; true otherwise, even
 *         if no node was available to query.
 */
bool
Dht::neighbourhoodMaintenance(RoutingTable& list)
{
    const auto own = list.findBucket(myid);
    if (own == list.end())
        return false;

    InfoHash target = myid;
    target[HASH_LEN-1] = rand_byte(rd);

    std::bernoulli_distribution one_in_eight(1./8.);
    auto chosen = own;
    const auto following = std::next(own);
    if (following != list.end() && (chosen->nodes.empty() || one_in_eight(rd)))
        chosen = following;
    if (own != list.begin() && (chosen->nodes.empty() || one_in_eight(rd))) {
        const auto preceding = std::prev(own);
        if (!preceding->nodes.empty())
            chosen = preceding;
    }

    /* Since our node-id is the same in both DHTs, it's probably
       profitable to query both families. */
    if (auto n = chosen->randomNode()) {
        DHT_LOG.DEBUG("[find %s IPv%c] sending find for neighborhood maintenance.", target.toString().c_str(), chosen->af == AF_INET6 ? '6' : '4');
        network_engine.sendFindNode(n, target, network_engine.want(), nullptr, nullptr);
    }

    return true;
}
/**
 * Refreshes the first bucket of `list` that is empty or whose `time` is more
 * than ten minutes old, by sending a find-node request for a random id in
 * that bucket's range.
 *
 * @param list  routing table to maintain.
 * @return true as soon as one request has been sent (at most one per call,
 *         so that queries are not sent back-to-back); false when no request
 *         was sent.
 */
bool
Dht::bucketMaintenance(RoutingTable& list)
{
    std::bernoulli_distribution rand_trial(1./8.);
    std::bernoulli_distribution rand_trial_38(1./38.);

    for (auto b = list.begin(); b != list.end(); ++b) {
        const bool stale = b->time < scheduler.time() - std::chrono::minutes(10);
        if (not stale and not b->nodes.empty())
            continue;

        /* This bucket hasn't seen any positive confirmation for a long
           time, or is empty. Pick a random id in this bucket's range, and
           send a request to a random node. */
        InfoHash id = list.randomId(b);

        /* If the bucket is empty, we try to fill it from a neighbour.
           We also sometimes do it gratuitously to recover from
           buckets full of broken nodes. */
        auto q = b;
        if (std::next(b) != list.end() && (q->nodes.empty() || rand_trial(rd)))
            q = std::next(b);
        if (b != list.begin() && (q->nodes.empty() || rand_trial(rd))) {
            auto r = std::prev(b);
            if (!r->nodes.empty())
                q = r;
        }

        auto n = q->randomNode();
        if (not n)
            continue;

        want_t want = -1;
        if (network_engine.want() != want) {
            auto otherbucket = findBucket(id, q->af == AF_INET ? AF_INET6 : AF_INET);
            /* Query both families when the corresponding bucket in the
               other family is emptyish. Otherwise this mostly adds
               overhead, but it might help stitch back one of the DHTs
               after a network collapse, so still do it very occasionally. */
            const bool other_is_sparse = otherbucket && otherbucket->nodes.size() < TARGET_NODES;
            if (other_is_sparse || rand_trial_38(rd))
                want = WANT4 | WANT6;
        }

        DHT_LOG.DEBUG("[find %s IPv%c] sending for bucket maintenance.", id.toString().c_str(), q->af == AF_INET6 ? '6' : '4');
        network_engine.sendFindNode(n, id, want, nullptr, nullptr);
        /* In order to avoid sending queries back-to-back,
           give up for now and reschedule us soon. */
        return true;
    }
    return false;
}
void
Dht::dataPersistence() {
const auto& now = scheduler.time();
auto storage_maintenance_time = time_point::max();
for (auto &str : store) {
if (now > str->maintenance_time) {
maintainStorage(str->id);
str->maintenance_time = now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME;
}
storage_maintenance_time = std::min(storage_maintenance_time, str->maintenance_time);
}
scheduler.add(storage_maintenance_time, std::bind(&Dht::dataPersistence, this));
}
/**
 * Re-announces the values stored locally under `id` when this node no longer
 * appears to be responsible for them, and drops them afterwards.
 *
 * For each address family the closest known nodes to `id` are looked up.
 * When `force` is set, or when xorCmp() ranks the last of these nodes before
 * our own id (presumably: it is closer to `id` than we are), every value is
 * announced on that family if `force` is set or if its expiration lies
 * beyond now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME.
 * The local storage is cleared unless, for at least one family, nodes were
 * found and no re-announce was triggered.
 *
 * @param id      hash of the storage to maintain.
 * @param force   re-announce regardless of distance and expiration.
 * @param donecb  completion callback handed to every announce() call.
 * @return number of announce() calls issued (both families summed); 0 when
 *         no storage exists for `id`.
 */
size_t
Dht::maintainStorage(InfoHash id, bool force, DoneCallback donecb) {
    const auto& now = scheduler.time();
    size_t announce_per_af = 0;

    auto local_storage = findStorage(id);
    if (local_storage == store.end())
        return 0;

    // Handles one address family; returns whether this family still wants
    // the values to be kept locally.
    auto republish = [&](RoutingTable& table, sa_family_t af) -> bool {
        auto closest = table.findClosestNodes(id, now);
        if (closest.empty())
            return false;
        if (not force and not (id.xorCmp(closest.back()->id, myid) < 0))
            return true;
        for (auto& value : (*local_storage)->getValues()) {
            const auto& vt = getType(value.data->type);
            if (force || value.time + vt.expiration > now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME) {
                // gotta put that value there
                announce(id, af, value.data, donecb, value.time);
                ++announce_per_af;
            }
        }
        return false;
    };

    const bool want4 = republish(buckets, AF_INET);
    const bool want6 = republish(buckets6, AF_INET6);

    if (not want4 and not want6) {
        DHT_LOG.DEBUG("Discarding storage values %s", id.toString().c_str());
        (*local_storage)->clear();
    }

    return announce_per_af;
}
/**
 * Hands an incoming datagram to the network engine.
 *
 * Empty buffers are ignored. Any std::exception raised while processing is
 * logged together with the sender address and swallowed.
 *
 * @param buf      received bytes.
 * @param buflen   number of bytes in `buf`.
 * @param from     sender address.
 * @param fromlen  size of `from`.
 */
void
Dht::processMessage(const uint8_t *buf, size_t buflen, const sockaddr *from, socklen_t fromlen)
{
    if (buflen != 0) {
        try {
            network_engine.processMessage(buf, buflen, from, fromlen);
        } catch (const std::exception& e) {
            DHT_LOG.ERR("Can't parse message from %s: %s", print_addr(from, fromlen).c_str(), e.what());
            /* NOTE: blacklisting the sender on INVALID_TID_SIZE or
               WRONG_NODE_INFO_BUF_LEN errors (such a node makes all the
               searches going through it time out) is currently disabled:
               the corresponding code was commented out. */
        }
    }
}
/**
 * Main entry point of the processing loop: synchronises the scheduler clock,
 * processes the given datagram (if any bytes were received), then runs the
 * scheduler.
 *
 * @param buf      received bytes.
 * @param buflen   number of bytes in `buf`; 0 means nothing to process.
 * @param from     sender address.
 * @param fromlen  size of `from`.
 * @return the time point returned by the scheduler run.
 */
time_point
Dht::periodic(const uint8_t *buf, size_t buflen, const sockaddr *from, socklen_t fromlen)
{
    scheduler.syncTime();
    processMessage(buf, buflen, from, fromlen);
    const auto scheduler_result = scheduler.run();
    return scheduler_result;
}
void
Dht::expire()
{
uniform_duration_distribution<> time_dis(std::chrono::minutes(2), std::chrono::minutes(6));
auto expire_stuff_time = scheduler.time() + duration(time_dis(rd));
expireBuckets(buckets);
expireBuckets(buckets6);
expireStorage();
expireSearches();
scheduler.add(expire_stuff_time, std::bind(&Dht::expire, this));
}
void
Dht::confirmNodes()
{
using namespace std::chrono;
bool soon = false;
const auto& now = scheduler.time();
if (searches4.empty() and getStatus(AF_INET) == NodeStatus::Connected) {
DHT_LOG.DEBUG("[confirm nodes] initial IPv4 'get' for my id (%s).", myid.toString().c_str());
search(myid, AF_INET);
}
if (searches6.empty() and getStatus(AF_INET6) == NodeStatus::Connected) {
DHT_LOG.DEBUG("[confirm nodes] initial IPv6 'get' for my id (%s).", myid.toString().c_str());
search(myid, AF_INET6);
}
soon |= bucketMaintenance(buckets);
soon |= bucketMaintenance(buckets6);
if (!soon) {
if (mybucket_grow_time >= now - seconds(150))
soon |= neighbourhoodMaintenance(buckets);
if (mybucket6_grow_time >= now - seconds(150))
soon |= neighbourhoodMaintenance(buckets6);
}
/* In order to maintain all buckets' age within 600 seconds, worst
case is roughly 27 seconds, assuming the table is 22 bits deep.
We want to keep a margin for neighborhood maintenance, so keep
this within 25 seconds. */
auto time_dis = soon ?
uniform_duration_distribution<> {seconds(5) , seconds(25)}
: uniform_duration_distribution<> {seconds(60), seconds(180)};
auto confirm_nodes_time = now + time_dis(rd);
nextNodesConfirmation = scheduler.add(confirm_nodes_time, std::bind(&Dht::confirmNodes, this));
}
std::vector<ValuesExport>
Dht::exportValues() const
{
std::vector<ValuesExport> e {};
e.reserve(store.size());
for (const auto& h : store) {
ValuesExport ve;
ve.first = h->id;
msgpack::sbuffer buffer;
msgpack::packer<msgpack::sbuffer> pk(&buffer);
pk.pack_array(h->getValues().size());
for (const auto& v : h->getValues()) {
pk.pack_array(2);
pk.pack(v.time.time_since_epoch().count());
v.data->msgpack_pack(pk);
}
ve.second = {buffer.data(), buffer.data()+buffer.size()};
e.push_back(std::move(ve));
}
return e;
}
/**
 * Loads values previously produced by exportValues() into the local storage.
 *
 * Each entry is expected to hold a msgpack array of [creation time, value]
 * pairs. Entries with an empty buffer are skipped. A pair that cannot be
 * decoded is logged and skipped; a value already past its type's expiration
 * is discarded. A structurally invalid buffer (not an array, or an element
 * that is not an array of at least two items) aborts the import of that
 * entry only; values stored before the error are kept.
 *
 * @param import  exported storages to load.
 */
void
Dht::importValues(const std::vector<ValuesExport>& import)
{
    for (const auto& h : import) {
        if (h.second.empty())
            continue;

        try {
            msgpack::unpacked msg;
            msgpack::unpack(msg, (const char*)h.second.data(), h.second.size());
            auto valarr = msg.get();
            if (valarr.type != msgpack::type::ARRAY)
                throw msgpack::type_error();
            for (unsigned i = 0; i < valarr.via.array.size; i++) {
                auto& valel = valarr.via.array.ptr[i];
                // `via` is a union: check the element really is an array
                // before reading via.array, otherwise malformed input makes
                // us read a size/pointer out of unrelated bytes.
                if (valel.type != msgpack::type::ARRAY or valel.via.array.size < 2)
                    throw msgpack::type_error();
                time_point val_time;
                Value tmp_val;
                try {
                    val_time = time_point{time_point::duration{valel.via.array.ptr[0].as<time_point::duration::rep>()}};
                    tmp_val.msgpack_unpack(valel.via.array.ptr[1]);
                } catch (const std::exception&) {
                    DHT_LOG.ERR("Error reading value at %s", h.first.toString().c_str());
                    continue;
                }
                if (val_time + getType(tmp_val.type).expiration < scheduler.time()) {
                    DHT_LOG.DEBUG("Discarding expired value at %s", h.first.toString().c_str());
                    continue;
                }
                storageStore(h.first, std::make_shared<Value>(std::move(tmp_val)), val_time);
            }
        } catch (const std::exception&) {
            DHT_LOG.ERR("Error reading values at %s", h.first.toString().c_str());
            continue;
        }
    }
}
std::vector<NodeExport>
Dht::exportNodes()
{
const auto& now = scheduler.time();
std::vector<NodeExport> nodes;
const auto b4 = buckets.findBucket(myid);
if (b4 != buckets.end()) {
for (auto& n : b4->nodes)
if (n->isGood(now))
nodes.push_back(n->exportNode());
}
const auto b6 = buckets6.findBucket(myid);
if (b6 != buckets6.end()) {
for (auto& n : b6->nodes)
if (n->isGood(now))
nodes.push_back(n->exportNode());
}
for (auto b = buckets.begin(); b != buckets.end(); ++b) {
if (b == b4) continue;
for (auto& n : b->nodes)
if (n->isGood(now))
nodes.push_back(n->exportNode());
}
for (auto b = buckets6.begin(); b != buckets6.end(); ++b) {
if (b == b6) continue;
for (auto& n : b->nodes)
if (n->isGood(now))
nodes.push_back(n->exportNode());
}
return nodes;
}
/**
 * Inserts a node with a known id and address through the network engine.
 *
 * @param id     id of the node.
 * @param sa     address of the node; must be AF_INET or AF_INET6.
 * @param salen  size of `sa`.
 * @return false when the address family is unsupported or the network
 *         engine returned no node; true otherwise.
 */
bool
Dht::insertNode(const InfoHash& id, const sockaddr* sa, socklen_t salen)
{
    const auto family = sa->sa_family;
    const bool supported = (family == AF_INET) || (family == AF_INET6);
    if (not supported)
        return false;

    scheduler.syncTime();
    const auto inserted = network_engine.insertNode(id, sa, salen);
    return inserted != nullptr;
}
int
Dht::pingNode(const sockaddr* sa, socklen_t salen)
{
scheduler.syncTime();
DHT_LOG.DEBUG("Sending ping to %s", print_addr(sa, salen).c_str());
auto& count = sa->sa_family == AF_INET ? pending_pings4 : pending_pings6;
count++;
network_engine.sendPing(sa, salen, [&](const Request&, NetworkEngine::RequestAnswer&&){
count--;
}, [&](const Request&, bool last){
if (last)
count--;
});
return -1;
}
/**
 * Handles a protocol error received in reply to one of our requests.
 *
 * Only UNAUTHORIZED is acted upon: the request is cancelled and, in every
 * search of the node's address family that contains this node, the stored
 * token is cleared, the last 'get' reply time is reset and a new 'get' is
 * sent for that search. Other error codes are ignored.
 *
 * @param req  the request that triggered the error.
 * @param e    the error reported by the remote node.
 */
void
Dht::onError(std::shared_ptr<Request> req, DhtProtocolException e) {
    if (e.getCode() == DhtProtocolException::UNAUTHORIZED) {
        network_engine.cancelRequest(req);
        unsigned cleared = 0;
        for (auto& srp : req->node->getFamily() == AF_INET ? searches4 : searches6) {
            auto& sr = srp.second;
            for (auto& n : sr->nodes) {
                if (n.node != req->node) continue;
                n.token.clear();
                n.last_get_reply = time_point::min();
                cleared++;
                searchSendGetValues(sr);
                break;
            }
        }
        // `cleared` is unsigned: use the matching "%u" conversion.
        DHT_LOG.WARN("[node %s] token flush (%u searches affected)", req->node->toString().c_str(), cleared);
    }
}
/**
 * Called when a remote node reports the address it sees us at.
 *
 * Refreshes the `time` of the bucket that covers `id` in the routing table
 * of the address's family, then forwards the address to reportedAddr().
 *
 * @param id           id used to locate the bucket to refresh.
 * @param addr         reported address; a null pointer is ignored.
 * @param addr_length  size of `addr`; 0 skips the reportedAddr() call.
 */
void
Dht::onReportedAddr(const InfoHash& id, sockaddr* addr , socklen_t addr_length)
{
    // The family is read from `addr`, so it must be checked before use.
    if (not addr)
        return;

    auto& table = addr->sa_family == AF_INET ? buckets : buckets6;
    const auto b = table.findBucket(id);
    // findBucket() can return end() (e.g. empty table): never dereference it.
    if (b != table.end())
        b->time = scheduler.time();

    if (addr_length)
        reportedAddr(addr, addr_length);
}
/**
 * Handles an incoming ping: the answer carries no payload.
 *
 * @return a default-constructed answer.
 */
NetworkEngine::RequestAnswer
Dht::onPing(std::shared_ptr<Node>)
{
    return NetworkEngine::RequestAnswer {};
}
/**
 * Handles an incoming find-node request.
 *
 * @param node    requesting node; a token is generated for its address.
 * @param target  id the requester is looking for.
 * @param want    WANT4 / WANT6 bits selecting which families to answer with.
 * @return an answer holding the token and, for each requested family, up to
 *         TARGET_NODES closest known nodes to `target`.
 */
NetworkEngine::RequestAnswer
Dht::onFindNode(std::shared_ptr<Node> node, InfoHash& target, want_t want)
{
    const auto& now = scheduler.time();
    const bool with_v4 = want & WANT4;
    const bool with_v6 = want & WANT6;

    NetworkEngine::RequestAnswer reply;
    reply.ntoken = makeToken((sockaddr*)&node->ss, false);
    if (with_v4)
        reply.nodes4 = buckets.findClosestNodes(target, now, TARGET_NODES);
    if (with_v6)
        reply.nodes6 = buckets6.findClosestNodes(target, now, TARGET_NODES);
    return reply;
}
/**
 * Handles an incoming 'get' request.
 *
 * @param node   requesting node; a token is generated for its address.
 * @param hash   requested info-hash; must not be the all-zero hash.
 * @param query  query whose `where` filter selects the values to return.
 * @return an answer with the token, the closest known IPv4 and IPv6 nodes
 *         (up to TARGET_NODES each) and, when a non-empty storage exists for
 *         `hash`, the stored values matching the filter.
 * @throws DhtProtocolException (NON_AUTHORITATIVE_INFORMATION) when `hash`
 *         is the all-zero hash.
 */
NetworkEngine::RequestAnswer
Dht::onGetValues(std::shared_ptr<Node> node, InfoHash& hash, want_t, const Query& query)
{
    if (hash == zeroes) {
        DHT_LOG.WARN("[node %s] Eek! Got get_values with no info_hash.", node->toString().c_str());
        throw DhtProtocolException {
            DhtProtocolException::NON_AUTHORITATIVE_INFORMATION,
            DhtProtocolException::GET_NO_INFOHASH
        };
    }
    const auto& now = scheduler.time();
    NetworkEngine::RequestAnswer answer {};
    auto st = findStorage(hash);
    answer.ntoken = makeToken((sockaddr*)&node->ss, false);
    answer.nodes4 = buckets.findClosestNodes(hash, now, TARGET_NODES);
    answer.nodes6 = buckets6.findClosestNodes(hash, now, TARGET_NODES);
    if (st != store.end() && not (*st)->empty()) {
        answer.values = (*st)->get(query.where.getFilter());
        // size() yields size_t; convert explicitly so the variadic argument
        // matches the "%u" conversion.
        DHT_LOG.DEBUG("[node %s] sending %u values.", node->toString().c_str(),
                static_cast<unsigned>(answer.values.size()));
    } else {
        DHT_LOG.DEBUG("[node %s] sending nodes.", node->toString().c_str());
    }
    return answer;
}
/**
 * Handles the reply to a 'get' request issued for search `sr`.
 *
 * When the reply carries a token, the received values / fields are passed
 * to the search's pending get and query callbacks, then to its local
 * listeners, each time after applying the callback's own filter. Callbacks
 * whose query is not satisfied by `orig_query` are skipped. A reply without
 * token is ignored and its sender is blacklisted.
 * Unless the search is done, a new 'get' step is sent and the next search
 * step is re-scheduled to the current time.
 *
 * @param status      the request this reply answers.
 * @param a           the decoded answer.
 * @param sr          the search the request belongs to; ignored when null.
 * @param orig_query  the query that was sent with the request, if any.
 */
void Dht::onGetValuesDone(const Request& status,
        NetworkEngine::RequestAnswer& a,
        std::shared_ptr<Search>& sr,
        const std::shared_ptr<Query>& orig_query)
{
    if (not sr) {
        DHT_LOG.WARN("[search unknown] got reply to 'get'. Ignoring.");
        return;
    }

    // size() yields size_t; convert explicitly so the variadic argument
    // matches the "%u" conversion.
    DHT_LOG.DEBUG("[search %s IPv%c] got reply to 'get' from %s with %u nodes",
            sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6',
            status.node->toString().c_str(), static_cast<unsigned>(a.nodes4.size()));

    if (not a.ntoken.empty()) {
        if (not a.values.empty() or not a.fields.empty()) {
            DHT_LOG.DEBUG("[search %s IPv%c] found %u values",
                    sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6',
                    static_cast<unsigned>(a.values.size()));
            for (auto& getp : sr->callbacks) { /* call all callbacks for this search */
                auto& get = getp.second;
                if (not (get.get_cb or get.query_cb) or
                        (orig_query and get.query and not get.query->isSatisfiedBy(*orig_query)))
                    continue;

                if (get.query_cb) { /* in case of a request with query */
                    if (not a.fields.empty()) {
                        get.query_cb(a.fields);
                    } else if (not a.values.empty()) {
                        // Only full values were returned: build the field
                        // indexes locally from the original query's selection.
                        std::vector<std::shared_ptr<FieldValueIndex>> fields(a.values.size());
                        std::transform(a.values.begin(), a.values.end(), fields.begin(),
                            [&](const std::shared_ptr<Value>& v) {
                                return std::make_shared<FieldValueIndex>(*v, orig_query ? orig_query->select : Select {});
                            });
                        get.query_cb(fields);
                    }
                } else if (get.get_cb) { /* in case of a vanilla get request */
                    std::vector<std::shared_ptr<Value>> tmp;
                    std::copy_if(a.values.begin(), a.values.end(), std::back_inserter(tmp),
                        [&](const std::shared_ptr<Value>& v) {
                            return not static_cast<bool>(get.filter) or get.filter(*v);
                        }
                    );
                    if (not tmp.empty())
                        get.get_cb(tmp);
                }
            }

            /* callbacks for local search listeners */
            // Collect (callback, values) pairs first and invoke them only
            // after the iteration over sr->listeners has finished.
            std::vector<std::pair<GetCallback, std::vector<std::shared_ptr<Value>>>> tmp_lists;
            for (auto& l : sr->listeners) {
                if (!l.second.get_cb or (orig_query and l.second.query and not l.second.query->isSatisfiedBy(*orig_query)))
                    continue;
                std::vector<std::shared_ptr<Value>> tmp;
                std::copy_if(a.values.begin(), a.values.end(), std::back_inserter(tmp),
                    [&](const std::shared_ptr<Value>& v) {
                        return not static_cast<bool>(l.second.filter) or l.second.filter(*v);
                    }
                );
                if (not tmp.empty())
                    tmp_lists.emplace_back(l.second.get_cb, tmp);
            }
            for (auto& l : tmp_lists)
                l.first(l.second);
        }
    } else {
        DHT_LOG.WARN("[node %s] no token provided. Ignoring response content.", status.node->toString().c_str());
        network_engine.blacklistNode(status.node);
    }

    if (not sr->done) {
        searchSendGetValues(sr);

        // Force to recompute the next step time
        scheduler.edit(sr->nextSearchStep, scheduler.time());
    }
}
/**
 * Handles an incoming 'listen' request by registering the requester as a
 * listener on the storage for `hash`.
 *
 * @param node   requesting node.
 * @param hash   info-hash to listen on; must not be the all-zero hash.
 * @param token  token presented by the requester, checked against its address.
 * @param rid    request id forwarded to storageAddListener().
 * @param query  query attached to the listener.
 * @return an empty answer.
 * @throws DhtProtocolException NON_AUTHORITATIVE_INFORMATION when `hash` is
 *         the all-zero hash, UNAUTHORIZED when the token does not match.
 */
NetworkEngine::RequestAnswer
Dht::onListen(std::shared_ptr<Node> node, InfoHash& hash, Blob& token, size_t rid, Query&& query)
{
    if (hash == zeroes) {
        DHT_LOG.WARN("Listen with no info_hash.");
        throw DhtProtocolException {
            DhtProtocolException::NON_AUTHORITATIVE_INFORMATION,
            DhtProtocolException::LISTEN_NO_INFOHASH
        };
    }

    const bool token_ok = tokenMatch(token, (sockaddr*)&node->ss);
    if (not token_ok) {
        DHT_LOG.WARN("[node %s] incorrect token %s for 'listen'.", node->toString().c_str(), hash.toString().c_str());
        throw DhtProtocolException {DhtProtocolException::UNAUTHORIZED, DhtProtocolException::LISTEN_WRONG_TOKEN};
    }

    storageAddListener(hash, node, rid, std::forward<Query>(query));
    return NetworkEngine::RequestAnswer {};
}
/**
 * Handles the reply to a 'listen' request issued for search `sr`.
 *
 * Values carried by the reply are processed like a 'get' reply. Unless the
 * search is done, a new 'get' step is sent and the next search step is
 * re-scheduled at the time computed by the search.
 *
 * @param status      the request this reply answers.
 * @param answer      the decoded answer.
 * @param sr          the search the request belongs to; when null the reply
 *                    is only logged.
 * @param orig_query  the query that was sent with the request, if any.
 */
void
Dht::onListenDone(const Request& status,
        NetworkEngine::RequestAnswer& answer,
        std::shared_ptr<Search>& sr,
        const std::shared_ptr<Query>& orig_query)
{
    // `sr` may be null: check it before any dereference, including logging.
    if (not sr) {
        DHT_LOG.DEBUG("Unknown search or announce!");
        return;
    }

    DHT_LOG.DEBUG("[search %s] Got reply to listen.", sr->id.toString().c_str());
    if (not answer.values.empty()) { /* got new values from listen request */
        DHT_LOG.DEBUG("[listen %s] Got new values.", sr->id.toString().c_str());
        onGetValuesDone(status, answer, sr, orig_query);
    }

    if (not sr->done) {
        const auto& now = scheduler.time();
        searchSendGetValues(sr);
        scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(types, now));
    }
}
/**
 * Handles an incoming 'put' (announce) request.
 *
 * The request is silently dropped when at least TARGET_NODES closest nodes
 * are known for `hash` in the requester's family and xorCmp() ranks the last
 * of them before our own id (presumably: we are too far from the target).
 * Otherwise each value is either stored (new id, accepted by the type's
 * store policy), replaced (known id, different content, accepted by the
 * type's edit policy), or left untouched.
 *
 * @param node     requesting node.
 * @param hash     target info-hash; must not be the all-zero hash.
 * @param token    token presented by the requester, checked against its address.
 * @param values   values to store.
 * @param created  creation time recorded with the stored values.
 * @return an empty answer.
 * @throws DhtProtocolException NON_AUTHORITATIVE_INFORMATION when `hash` is
 *         the all-zero hash or a value has Value::INVALID_ID; UNAUTHORIZED
 *         when the token does not match.
 */
NetworkEngine::RequestAnswer
Dht::onAnnounce(std::shared_ptr<Node> node,
        InfoHash& hash,
        Blob& token,
        std::vector<std::shared_ptr<Value>> values,
        time_point created)
{
    if (hash == zeroes) {
        DHT_LOG.WARN("Put with no info_hash.");
        throw DhtProtocolException {
            DhtProtocolException::NON_AUTHORITATIVE_INFORMATION,
            DhtProtocolException::PUT_NO_INFOHASH
        };
    }
    if (!tokenMatch(token, (sockaddr*)&node->ss)) {
        DHT_LOG.WARN("[node %s] incorrect token %s for 'put'.", node->toString().c_str(), hash.toString().c_str());
        throw DhtProtocolException {DhtProtocolException::UNAUTHORIZED, DhtProtocolException::PUT_WRONG_TOKEN};
    }
    {
        // We store a value only if we think we're part of the
        // SEARCH_NODES nodes around the target id.
        auto closest_nodes = (node->getFamily() == AF_INET ? buckets : buckets6)
            .findClosestNodes(hash, scheduler.time(), SEARCH_NODES);
        if (closest_nodes.size() >= TARGET_NODES and hash.xorCmp(closest_nodes.back()->id, myid) < 0) {
            DHT_LOG.WARN("[node %s] announce too far from the target. Dropping value.", node->toString().c_str());
            return {};
        }
    }

    for (const auto& v : values) {
        if (v->id == Value::INVALID_ID) {
            // The id is an integer: format it as text so that the argument
            // really is a C string, as the second "%s" requires.
            DHT_LOG.WARN("[value %s %s] incorrect value id", hash.toString().c_str(), std::to_string(v->id).c_str());
            throw DhtProtocolException {
                DhtProtocolException::NON_AUTHORITATIVE_INFORMATION,
                DhtProtocolException::PUT_INVALID_ID
            };
        }
        auto lv = getLocalById(hash, v->id);
        std::shared_ptr<Value> vc = v;
        if (lv) {
            if (*lv == *vc) {
                DHT_LOG.WARN("[value %s %lu] nothing to do.", hash.toString().c_str(), lv->id);
            } else {
                // Known id with different content: the type of the value
                // already stored decides whether it may be replaced.
                const auto& type = getType(lv->type);
                if (type.editPolicy(hash, lv, vc, node->id, (sockaddr*)&node->ss, node->sslen)) {
                    DHT_LOG.DEBUG("[value %s %lu] editing %s.",
                            hash.toString().c_str(), lv->id, vc->toString().c_str());
                    storageStore(hash, vc, created);
                } else {
                    DHT_LOG.DEBUG("[value %s %lu] rejecting edition of %s because of storage policy.",
                            hash.toString().c_str(), lv->id, vc->toString().c_str());
                }
            }
        } else {
            // Allow the value to be edited by the storage policy
            const auto& type = getType(vc->type);
            if (type.storePolicy(hash, vc, node->id, (sockaddr*)&node->ss, node->sslen)) {
                DHT_LOG.DEBUG("[value %s %lu] storing %s.", hash.toString().c_str(), vc->id, vc->toString().c_str());
                storageStore(hash, vc, created);
            } else {
                DHT_LOG.DEBUG("[value %s %lu] rejecting storage of %s.",
                        hash.toString().c_str(), vc->id, vc->toString().c_str());
            }
        }
    }

    return {};
}
/**
 * Handles the reply to a 'put' request issued for search `sr`.
 *
 * A new 'get' step is sent for the search. Then, for the announce whose
 * value id matches the acknowledged one (`answer.vid`): once the search
 * reports the value as announced, its callback is invoked once with success
 * and reset, and the announce is removed from the search unless it is
 * permanent.
 *
 * @param answer  the decoded answer; `vid` identifies the acknowledged value.
 * @param sr      the search the request belongs to; ignored when null.
 */
void
Dht::onAnnounceDone(const Request&, NetworkEngine::RequestAnswer& answer, std::shared_ptr<Search>& sr)
{
    // Same guard as the other reply handlers: never dereference a null search.
    if (not sr) {
        DHT_LOG.WARN("[search unknown] got reply to 'put'. Ignoring.");
        return;
    }

    const auto& now = scheduler.time();
    DHT_LOG.DEBUG("[search %s IPv%c] got reply to put!",
            sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6');
    searchSendGetValues(sr);

    // If the value was just successfully announced, call the callback
    sr->announce.erase(std::remove_if(sr->announce.begin(), sr->announce.end(),
        [&](Announce& a) {
            if (!a.value || a.value->id != answer.vid)
                return false;
            // Bind by reference: the type descriptor is only read here.
            const auto& type = getType(a.value->type);
            if (sr->isAnnounced(answer.vid, type, now)) {
                if (a.callback) {
                    a.callback(true, sr->getNodes());
                    a.callback = nullptr;
                }
                if (not a.permanent)
                    return true;
            }
            return false;
    }), sr->announce.end());
}
}