-
Adrien Béraud authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
dht.cpp 92.29 KiB
/*
* Copyright (C) 2014-2020 Savoir-faire Linux Inc.
* Author(s) : Adrien Béraud <adrien.beraud@savoirfairelinux.com>
* Simon Désaulniers <simon.desaulniers@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#include "dht.h"
#include "rng.h"
#include "search.h"
#include "storage.h"
#include "request.h"
#include <msgpack.hpp>
#include <algorithm>
#include <random>
#include <sstream>
#include <fstream>
namespace dht {
using namespace std::placeholders;
// Out-of-line definitions of static constexpr data members declared in class
// Dht. Before C++17 these are required whenever the members are odr-used
// (e.g. bound to a const reference); from C++17 on they are redundant but valid.
constexpr std::chrono::minutes Dht::MAX_STORAGE_MAINTENANCE_EXPIRE_TIME;
constexpr std::chrono::minutes Dht::SEARCH_EXPIRE_TIME;
constexpr duration Dht::LISTEN_EXPIRE_TIME;
constexpr duration Dht::LISTEN_EXPIRE_TIME_PUBLIC;
constexpr duration Dht::REANNOUNCE_MARGIN;
// File-local request-rate cap (8192/s). Not referenced in the visible part of
// this file — presumably used further down; TODO confirm.
static constexpr size_t MAX_REQUESTS_PER_SEC {8 * 1024};
/* Recompute the connectivity status of the `af` routing table.
 * On a change, onDisconnected() fires when both families are now
 * disconnected, or onConnected() when at least one of them is connected.
 * Returns the (new) status of `af`. */
NodeStatus
Dht::updateStatus(sa_family_t af)
{
    auto& table = dht(af);
    const auto previous = table.status;
    table.status = table.getStatus(scheduler.time());
    if (table.status == previous)
        return table.status;

    const auto& peer = dht(af == AF_INET ? AF_INET6 : AF_INET);
    if (table.status == NodeStatus::Disconnected and peer.status == NodeStatus::Disconnected)
        onDisconnected();
    else if (table.status == NodeStatus::Connected or peer.status == NodeStatus::Connected)
        onConnected();
    return table.status;
}
/* Status of this routing table at `now`:
 * Connected as soon as one good node is found; otherwise Connecting if any
 * node is not yet expired or pings are pending; Disconnected otherwise. */
NodeStatus
Dht::Kad::getStatus(time_point now) const
{
    bool hasDubious = false;
    for (const auto& bucket : buckets) {
        for (const auto& node : bucket.nodes) {
            if (node->isGood(now))
                return NodeStatus::Connected;
            if (not node->isExpired())
                hasDubious = true;
        }
    }
    if (hasDubious or pending_pings)
        return NodeStatus::Connecting;
    return NodeStatus::Disconnected;
}
/* Shut the node down.
 * Saves state to persistPath when set. With `stop`, every pending get and
 * announce callback of every search is invoked with `false`, search
 * listeners are dropped and the network engine is cleared.
 * Without storage maintenance, `cb` is invoked right away. Otherwise a last
 * maintenance pass runs over local storage, and `cb` is invoked when the
 * outstanding-operation count returns to zero (or immediately if
 * maintainStorage() reported no operations). */
void
Dht::shutdown(ShutdownCallback cb, bool stop)
{
    if (not persistPath.empty())
        saveState(persistPath);

    if (stop) {
        for (auto* kad : {&dht4, &dht6}) {
            for (auto& entry : kad->searches) {
                auto& s = *entry.second;
                for (const auto& pending : s.callbacks)
                    pending.second.done_cb(false, {});
                s.callbacks.clear();
                for (const auto& ann : s.announce)
                    if (ann.callback)
                        ann.callback(false, {});
                s.announce.clear();
                s.listeners.clear();
            }
        }
        network_engine.clear();
    }

    if (not maintain_storage) {
        if (cb)
            cb();
        return;
    }

    // Last store maintenance
    scheduler.syncTime();
    auto remaining = std::make_shared<int>(0);
    auto onOpDone = [=](bool, const std::vector<Sp<Node>>&) {
        --*remaining;
        if (logger_)
            logger_->w("shuting down node: %u ops remaining", *remaining);
        if (!*remaining && cb)
            cb();
    };
    for (auto& entry : store)
        *remaining += maintainStorage(entry, true, onOpDone);
    if (logger_)
        logger_->w("shuting down node: after storage, %u ops", *remaining);
    if (!*remaining and cb)
        cb();
}
/* Whether the network engine is running for address family `af`. */
bool
Dht::isRunning(sa_family_t af) const
{
    return network_engine.isRunning(af);
}
/* Look up node `id` in the bucket of the `af` routing table that covers it.
 * Every bucket contains an unordered list of nodes, so the bucket is scanned
 * linearly. Returns an empty pointer when not found. */
const Sp<Node>
Dht::findNode(const InfoHash& id, sa_family_t af) const
{
    const Bucket* bucket = findBucket(id, af);
    if (not bucket)
        return {};
    for (const auto& node : bucket->nodes) {
        if (node->id == id)
            return node;
    }
    return {};
}
/* Every bucket caches the address of a likely node: ask the bucket to ping
 * it (logging the target when there is a cached node and a logger). */
void
Dht::sendCachedPing(Bucket& b)
{
    if (b.cached and logger_)
        logger_->d(b.cached->id, "[node %s] sending ping to cached node", b.cached->toString().c_str());
    b.sendCachedPing(network_engine);
}
/* Addresses other nodes reported for us, most-reported first.
 * `family` == 0 returns all of them; otherwise only that family.
 * Side effect: reorders reported_addr by descending report count. */
std::vector<SockAddr>
Dht::getPublicAddress(sa_family_t family)
{
    std::sort(reported_addr.begin(), reported_addr.end(),
              [](const ReportedAddr& lhs, const ReportedAddr& rhs) { return lhs.first > rhs.first; });
    std::vector<SockAddr> addresses;
    addresses.reserve(family ? reported_addr.size() / 2 : reported_addr.size());
    for (const auto& reported : reported_addr) {
        if (family == 0 or reported.second.getFamily() == family)
            addresses.emplace_back(reported.second);
    }
    return addresses;
}
bool
Dht::trySearchInsert(const Sp<Node>& node)
{
const auto& now = scheduler.time();
if (not node) return false;
auto& srs = searches(node->getFamily());
auto closest = srs.lower_bound(node->id);
bool inserted {false};
// insert forward
for (auto it = closest; it != srs.end(); it++) {
auto& s = *it->second;
if (s.insertNode(node, now)) {
inserted = true;
scheduler.edit(s.nextSearchStep, now);
} else if (not s.expired and not s.done)
break;
}
// insert backward
for (auto it = closest; it != srs.begin();) {
--it;
auto& s = *it->second;
if (s.insertNode(node, now)) {
inserted = true;
scheduler.edit(s.nextSearchStep, now);
} else if (not s.expired and not s.done)
break;
}
return inserted;
}
/* Record that a peer reported `addr` as our address: bump its counter if
 * already known, otherwise add it (at most 32 distinct addresses kept). */
void
Dht::reportedAddr(const SockAddr& addr)
{
    auto known = std::find_if(reported_addr.begin(), reported_addr.end(),
                              [&addr](const ReportedAddr& entry) { return entry.second == addr; });
    if (known != reported_addr.end()) {
        ++known->first;
        return;
    }
    if (reported_addr.size() < 32)
        reported_addr.emplace_back(1, addr);
}
/* We just learnt about a node, not necessarily a new one. `confirm` is 1 if
 * the node sent a message, 2 if it sent us a reply.
 * If the routing table accepted the node (or it was confirmed), it is offered
 * to pending searches. If confirm < 2 and the table has not grown for 5
 * minutes, node confirmation is rescheduled to one second from now. */
void
Dht::onNewNode(const Sp<Node>& node, int confirm)
{
    const auto& now = scheduler.time();
    auto& table = buckets(node->getFamily());
    const bool stale = confirm < 2 && table.grow_time < now - std::chrono::minutes(5);
    const bool accepted = table.onNewNode(node, confirm, now, myid, network_engine) or confirm;
    if (not accepted)
        return;
    trySearchInsert(node);
    if (stale)
        scheduler.edit(nextNodesConfirmation, now + std::chrono::seconds(1));
}
/* Called periodically to purge expired nodes from every bucket of `list`.
 * Note that we're very conservative here: broken nodes in the table don't do
 * much harm, we'll recover as soon as we find better ones. A bucket that lost
 * nodes pings its cached candidate. */
void
Dht::expireBuckets(RoutingTable& list)
{
    for (auto& bucket : list) {
        bool removedAny = false;
        bucket.nodes.remove_if([&removedAny](const Sp<Node>& node) {
            const bool expired = node->isExpired();
            if (expired)
                removedAny = true;
            return expired;
        });
        if (removedAny)
            sendCachedPing(bucket);
    }
}
void
Dht::expireSearches()
{
auto t = scheduler.time() - SEARCH_EXPIRE_TIME;
auto expired = [&](std::pair<const InfoHash, Sp<Search>>& srp) {
auto& sr = *srp.second;
auto b = sr.callbacks.empty() && sr.announce.empty() && sr.listeners.empty() && sr.step_time < t;
if (b) {
if (logger_)
logger_->d(srp.first, "[search %s] removing search", srp.first.toString().c_str());
sr.clear();
return b;
} else { return false; }
};
erase_if(dht4.searches, expired);
erase_if(dht6.searches, expired);
}
/* Handle a reply to a 'get'/'find_node' sent for `query`.
 * Records the node (and its token) in the search. Other get queries satisfied
 * by `query` are marked as answered for that node with a cancelled
 * placeholder request, so they are not sent anymore. The node's sync job is
 * then (re)scheduled and the answer forwarded to onGetValuesDone().
 * Does nothing if the search no longer exists. */
void
Dht::searchNodeGetDone(const net::Request& req,
        net::RequestAnswer&& answer,
        std::weak_ptr<Search> ws,
        Sp<Query> query)
{
    const auto& now = scheduler.time();
    auto sr = ws.lock();
    if (not sr)
        return;
    sr->insertNode(req.node, now, answer.ntoken);
    if (auto srn = sr->getNode(req.node)) {
        for (auto& entry : sr->callbacks) {
            const auto& otherQuery = entry.second.query;
            if (not otherQuery->isSatisfiedBy(*query) or otherQuery == query)
                continue;
            auto placeholder = std::make_shared<net::Request>();
            placeholder->cancel();
            srn->getStatus[otherQuery] = std::move(placeholder);
        }
        auto syncTime = srn->getSyncTime(scheduler.time());
        if (srn->syncJob)
            scheduler.edit(srn->syncJob, syncTime);
        else
            srn->syncJob = scheduler.add(syncTime, std::bind(&Dht::searchStep, this, sr));
    }
    onGetValuesDone(req.node, answer, sr, query);
}
/* Handle expiration of a request sent for `query` to `status.node`.
 * The node stays a candidate only while the request is not definitively
 * `over`; once over, its status for `query` is forgotten. The search is
 * stepped again right away. */
void
Dht::searchNodeGetExpired(const net::Request& status,
        bool over,
        std::weak_ptr<Search> ws,
        Sp<Query> query)
{
    auto sr = ws.lock();
    if (not sr)
        return;
    if (auto srn = sr->getNode(status.node)) {
        srn->candidate = not over;
        if (over)
            srn->getStatus.erase(query);
    }
    scheduler.edit(sr->nextSearchStep, scheduler.time());
}
/* Paginated 'get' for `query` on search node `n`.
 *
 * First sends "SELECT id WHERE <query->where>" (empty WHERE when `query` is
 * null) to the node. In the reply handler, one "get WHERE id = <vid>" request
 * is sent per returned value id (INVALID_ID entries are skipped); their
 * replies go to searchNodeGetDone() under the parent `query`, while their
 * expirations are tracked per sub-query.
 * The select and every per-id sub-query are appended to
 * n->pagination_queries[query], so they can be dropped along with `query`.
 * An answer without fields is handled as a regular 'get' reply (backward
 * compatibility). Does nothing if the search is already gone. */
void Dht::paginate(std::weak_ptr<Search> ws, Sp<Query> query, SearchNode* n) {
auto sr = ws.lock();
if (not sr) return;
auto select_q = std::make_shared<Query>(Select {}.field(Value::Field::Id), query ? query->where : Where {});
auto onSelectDone = [this,ws,query](const net::Request& status,
net::RequestAnswer&& answer) mutable {
// Retrieve search
auto sr = ws.lock();
if (not sr) return;
const auto& id = sr->id;
// Retrieve search node
auto sn = sr->getNode(status.node);
if (not sn) return;
// backward compatibility
if (answer.fields.empty()) {
searchNodeGetDone(status, std::move(answer), ws, query);
return;
}
for (const auto& fvi : answer.fields) {
try {
// index.at() throws std::out_of_range when the field has no id (handled below).
auto vid = fvi->index.at(Value::Field::Id).getInt();
if (vid == Value::INVALID_ID) continue;
auto query_for_vid = std::make_shared<Query>(Select {}, Where {}.id(vid));
sn->pagination_queries[query].push_back(query_for_vid);
if (logger_)
logger_->d(id, sn->node->id, "[search %s] [node %s] sending %s",
id.toString().c_str(), sn->node->toString().c_str(), query_for_vid->toString().c_str());
sn->getStatus[query_for_vid] = network_engine.sendGetValues(status.node,
id,
*query_for_vid,
-1,
std::bind(&Dht::searchNodeGetDone, this, _1, _2, ws, query),
std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, query_for_vid)
);
} catch (const std::out_of_range&) {
if (logger_)
logger_->e(id, sn->node->id, "[search %s] [node %s] received non-id field in response to "\
"'SELECT id' request...",
id.toString().c_str(), sn->node->toString().c_str());
}
}
};
/* add pagination query key for tracking ongoing requests. */
n->pagination_queries[query].push_back(select_q);
if (logger_)
logger_->d(sr->id, n->node->id, "[search %s] [node %s] sending %s",
sr->id.toString().c_str(), n->node->toString().c_str(), select_q->toString().c_str());
n->getStatus[select_q] = network_engine.sendGetValues(n->node,
sr->id,
*select_q,
-1,
onSelectDone,
std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, select_q)
);
}
/* Send at most one request for search `sr`.
 *
 * Without get callbacks, a 'find_node' is sent under a shared catch-all
 * query (ANY_QUERY). Otherwise the callbacks are tried in order, and for the
 * first one with an eligible node a 'get' is sent: directly when its query
 * has a SELECT clause, through paginate() otherwise.
 * The preferred node `pn` is used when eligible (per SearchNode::canGet);
 * otherwise the first eligible node of sr->nodes is used.
 * With `update` (and get callbacks), canGet() receives
 * sr->getLastGetTime(*query) as reference time instead of time_point::min().
 *
 * Returns the node the request was sent to, or nullptr if nothing was sent
 * (search done, MAX_REQUESTED_SEARCH_NODES already solicited, or no eligible node). */
Dht::SearchNode*
Dht::searchSendGetValues(Sp<Search> sr, SearchNode* pn, bool update)
{
if (sr->done or sr->currentlySolicitedNodeCount() >= MAX_REQUESTED_SEARCH_NODES)
return nullptr;
const auto& now = scheduler.time();
std::weak_ptr<Search> ws = sr;
auto cb = sr->callbacks.begin();
static const auto ANY_QUERY = std::make_shared<Query>(Select {}, Where {}, true);
do { /* for all requests to send */
SearchNode* n = nullptr;
// When there are no callbacks, `cb` is end() and must not be dereferenced:
// the body below always returns in that case.
auto& query = sr->callbacks.empty() ? ANY_QUERY : cb->second.query;
const time_point up = (not sr->callbacks.empty() and update)
? sr->getLastGetTime(*query)
: time_point::min();
if (pn and pn->canGet(now, up, query)) {
n = pn;
} else {
for (auto& sn : sr->nodes) {
if (sn->canGet(now, up, query)) {
n = sn.get();
break;
}
}
}
if (sr->callbacks.empty()) { /* 'find_node' request */
if (not n)
return nullptr;
/* if (logger_)
logger_->d(sr->id, n->node->id, "[search %s] [node %s] sending 'find_node'",
sr->id.toString().c_str(), n->node->toString().c_str());*/
n->getStatus[query] = network_engine.sendFindNode(n->node,
sr->id,
-1,
std::bind(&Dht::searchNodeGetDone, this, _1, _2, ws, query),
std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, query));
} else { /* 'get' request */
// No eligible node for this callback's query: jump to the loop condition
// and try the next callback.
if (not n)
continue;
if (query and not query->select.empty()) {
/* The request contains a select. No need to paginate... */
/* if (logger_)
logger_->d(sr->id, n->node->id, "[search %s] [node %s] sending 'get'",
sr->id.toString().c_str(), n->node->toString().c_str());*/
n->getStatus[query] = network_engine.sendGetValues(n->node,
sr->id,
*query,
-1,
std::bind(&Dht::searchNodeGetDone, this, _1, _2, ws, query),
std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, query));
} else
paginate(ws, query, n);
}
/* We only try to send one request. return. */
return n;
} while (++cb != sr->callbacks.end());
/* no request were sent */
return nullptr;
}
/* Send the pending announces (puts) of `sr` to its synced nodes.
 *
 * Nodes are visited in order. Nodes that are not synced, or that already have
 * a probe pending, are skipped. Visiting stops once TARGET_NODES
 * non-candidate nodes have been handled. For each due announce
 * (announce time <= now):
 *  - a non-permanent value is sent directly with a 'put';
 *  - if any due value is permanent, a probe query (SELECT id, seq) is sent.
 *    Its reply (onSelectDone) decides per value:
 *      - 'put' when the node lacks the value or has an older seq;
 *      - 'refresh' when it has it and the value is permanent;
 *      - otherwise the value is marked acked locally.
 *    A 'refresh' answered with NOT_FOUND falls back to a full 'put'.
 *    Permanent values also get a searchStep scheduled
 *    REANNOUNCE_MARGIN before their refresh time.
 *
 * Value ids are logged through an explicit unsigned long long cast with %llu:
 * passing the id straight to a %d conversion is a printf format/argument
 * mismatch unless Value::Id is int. */
void Dht::searchSendAnnounceValue(const Sp<Search>& sr) {
    if (sr->announce.empty())
        return;
    unsigned i = 0;
    std::weak_ptr<Search> ws = sr;
    auto onDone = [this,ws](const net::Request& req, net::RequestAnswer&& answer)
    { /* when put done */
        if (auto sr = ws.lock()) {
            onAnnounceDone(req.node, answer, sr);
            scheduler.edit(sr->nextSearchStep, scheduler.time());
        }
    };
    auto onExpired = [this,ws](const net::Request&, bool over)
    { /* when put expired */
        if (over)
            if (auto sr = ws.lock())
                scheduler.edit(sr->nextSearchStep, scheduler.time());
    };
    auto onSelectDone =
        [this,ws,onDone,onExpired](const net::Request& req, net::RequestAnswer&& answer) mutable
    { /* on probing done */
        auto sr = ws.lock();
        if (not sr) return;
        const auto& now = scheduler.time();
        sr->insertNode(req.node, scheduler.time(), answer.ntoken);
        auto sn = sr->getNode(req.node);
        if (not sn) return;
        if (not sn->isSynced(now)) {
            /* Search is now unsynced. Let's call searchStep to sync again. */
            scheduler.edit(sr->nextSearchStep, now);
            return;
        }
        for (auto& a : sr->announce) {
            if (sn->getAnnounceTime(a.value->id) > now)
                continue;
            bool hasValue {false};
            uint16_t seq_no = 0;
            try {
                // Null entries are skipped before being dereferenced; a field
                // without an id makes index.at() throw (caught below).
                const auto& f = std::find_if(answer.fields.cbegin(), answer.fields.cend(),
                        [&a](const Sp<FieldValueIndex>& i){
                            return i and i->index.at(Value::Field::Id).getInt() == a.value->id;
                        });
                if (f != answer.fields.cend() and *f) {
                    hasValue = true;
                    seq_no = static_cast<uint16_t>((*f)->index.at(Value::Field::SeqNum).getInt());
                }
            } catch (std::out_of_range&) { }
            auto next_refresh_time = now + getType(a.value->type).expiration;
            /* only put the value if the node doesn't already have it */
            if (not hasValue or seq_no < a.value->seq) {
                if (logger_)
                    logger_->d(sr->id, sn->node->id, "[search %s] [node %s] sending 'put' (vid: %llu)",
                            sr->id.toString().c_str(), sn->node->toString().c_str(),
                            static_cast<unsigned long long>(a.value->id));
                auto created = a.permanent ? time_point::max() : a.created;
                sn->acked[a.value->id] = {
                    network_engine.sendAnnounceValue(sn->node, sr->id, a.value, created, sn->token, onDone, onExpired),
                    next_refresh_time
                };
            } else if (hasValue and a.permanent) {
                if (logger_)
                    logger_->w(sr->id, sn->node->id, "[search %s] [node %s] sending 'refresh' (vid: %llu)",
                            sr->id.toString().c_str(), sn->node->toString().c_str(),
                            static_cast<unsigned long long>(a.value->id));
                sn->acked[a.value->id] = {
                    network_engine.sendRefreshValue(sn->node, sr->id, a.value->id, sn->token, onDone,
                        [this, ws, node=sn->node, v=a.value,
                         onDone,
                         onExpired,
                         created = a.permanent ? time_point::max() : a.created,
                         next_refresh_time
                        ](const net::Request& /*req*/, net::DhtProtocolException&& e){
                            // The node lost the value: fall back to a full 'put'.
                            if (e.getCode() == net::DhtProtocolException::NOT_FOUND) {
                                if (logger_)
                                    logger_->e(node->id, "[node %s] returned error 404: storage not found", node->toString().c_str());
                                if (auto sr = ws.lock()) {
                                    if (auto sn = sr->getNode(node)) {
                                        sn->acked[v->id] = {
                                            network_engine.sendAnnounceValue(sn->node, sr->id, v, created, sn->token, onDone, onExpired),
                                            next_refresh_time
                                        };
                                        scheduler.edit(sr->nextSearchStep, scheduler.time());
                                        return true;
                                    }
                                }
                            }
                            return false;
                        }, onExpired),
                    next_refresh_time
                };
            } else {
                if (logger_)
                    logger_->w(sr->id, sn->node->id, "[search %s] [node %s] already has value (vid: %llu). Aborting.",
                            sr->id.toString().c_str(), sn->node->toString().c_str(),
                            static_cast<unsigned long long>(a.value->id));
                auto ack_req = std::make_shared<net::Request>(net::Request::State::COMPLETED);
                ack_req->reply_time = now;
                sn->acked[a.value->id] = std::make_pair(std::move(ack_req), next_refresh_time);
                /* step to clear announces */
                scheduler.edit(sr->nextSearchStep, now);
            }
            if (a.permanent) {
                scheduler.add(next_refresh_time - REANNOUNCE_MARGIN, [this,ws] {
                    if (auto sr = ws.lock()) {
                        searchStep(sr);
                    }
                });
            }
        }
    };
    static const auto PROBE_QUERY = std::make_shared<Query>(Select {}.field(Value::Field::Id).field(Value::Field::SeqNum));
    const auto& now = scheduler.time();
    for (auto& np : sr->nodes) {
        auto& n = *np;
        if (not n.isSynced(now))
            continue;
        // A probe is already in flight for this node: wait for its answer.
        auto gs = n.probe_query ? n.getStatus.find(n.probe_query) : n.getStatus.end();
        if (gs != n.getStatus.end() and gs->second and gs->second->pending()) {
            continue;
        }
        bool sendQuery = false;
        for (auto& a : sr->announce) {
            if (n.getAnnounceTime(a.value->id) <= now) {
                if (a.permanent) {
                    sendQuery = true;
                } else {
                    if (logger_)
                        logger_->w(sr->id, n.node->id, "[search %s] [node %s] sending 'put' (vid: %llu)",
                                sr->id.toString().c_str(), n.node->toString().c_str(),
                                static_cast<unsigned long long>(a.value->id));
                    n.acked[a.value->id] = {
                        network_engine.sendAnnounceValue(n.node, sr->id, a.value, a.created, n.token, onDone, onExpired),
                        now + getType(a.value->type).expiration
                    };
                }
            }
        }
        if (sendQuery) {
            n.probe_query = PROBE_QUERY;
            if (logger_)
                logger_->d(sr->id, n.node->id, "[search %s] [node %s] sending %s",
                        sr->id.toString().c_str(), n.node->toString().c_str(), n.probe_query->toString().c_str());
            auto req = network_engine.sendGetValues(n.node,
                    sr->id,
                    *PROBE_QUERY,
                    -1,
                    onSelectDone,
                    std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, PROBE_QUERY));
            n.getStatus[PROBE_QUERY] = std::move(req);
        }
        if (not n.candidate and ++i == TARGET_NODES)
            break;
    }
}
/* Send (or renew) 'listen' requests to synced node `n`, one per listener of
 * `sr` whose listen time (n.getListenTime with getListenExpiration()) has come.
 *
 * The first time a query is listened on this node, an entry is added to
 * n.listenStatus. It holds the listener's get/sync callbacks and a socket,
 * opened on the node, whose incoming values are fed to SearchNode::onValues().
 * It also holds a cache-expiration job, initially scheduled at
 * time_point::max(), that calls SearchNode::expireValues().
 * When the listen is answered, the search is stepped and another step is
 * scheduled at the node's next listen time. When it definitively expires,
 * the node's listen entry for the query is removed. */
void
Dht::searchSynchedNodeListen(const Sp<Search>& sr, SearchNode& n)
{
const auto& listenExp = getListenExpiration();
std::weak_ptr<Search> ws = sr;
for (const auto& l : sr->listeners) {
const auto& query = l.second.query;
auto r = n.listenStatus.find(query);
// Listen still fresh on this node for this query: nothing to send.
if (n.getListenTime(r, listenExp) > scheduler.time())
continue;
// if (logger_)
// logger_->d(sr->id, n.node->id, "[search %s] [node %s] sending 'listen'",
// sr->id.toString().c_str(), n.node->toString().c_str());
if (r == n.listenStatus.end()) {
r = n.listenStatus.emplace(std::piecewise_construct,
std::forward_as_tuple(query),
std::forward_as_tuple(
l.second.get_cb,
l.second.sync_cb,
n.node->openSocket([this,ws,query](const Sp<Node>& node, net::RequestAnswer&& answer) mutable {
/* on new values */
if (auto sr = ws.lock()) {
scheduler.edit(sr->nextSearchStep, scheduler.time());
sr->insertNode(node, scheduler.time(), answer.ntoken);
if (auto sn = sr->getNode(node)) {
sn->onValues(query, std::move(answer), types, scheduler);
}
}
}))).first;
r->second.cacheExpirationJob = scheduler.add(time_point::max(), [this,ws,query,node=n.node]{
if (auto sr = ws.lock()) {
if (auto sn = sr->getNode(node)) {
sn->expireValues(query, scheduler);
}
}
});
}
auto new_req = network_engine.sendListen(n.node, sr->id, *query, n.token, r->second.socketId,
[this,ws,query](const net::Request& req, net::RequestAnswer&& answer) mutable
{ /* on done */
if (auto sr = ws.lock()) {
scheduler.edit(sr->nextSearchStep, scheduler.time());
if (auto sn = sr->getNode(req.node)) {
scheduler.add(sn->getListenTime(query, getListenExpiration()), std::bind(&Dht::searchStep, this, sr));
sn->onListenSynced(query);
}
onListenDone(req.node, answer, sr);
}
},
[this,ws,query](const net::Request& req, bool over) mutable
{ /* on request expired */
if (auto sr = ws.lock()) {
scheduler.edit(sr->nextSearchStep, scheduler.time());
if (over)
if (auto sn = sr->getNode(req.node))
sn->listenStatus.erase(query);
}
}
);
// Here the request may have failed and the CachedListenStatus removed
// (so `r` must be looked up again rather than reused).
r = n.listenStatus.find(query);
if (r != n.listenStatus.end()) {
r->second.req = new_req;
}
}
}
/* One step of search `sr`. While a search is in progress this is called
periodically (and on demand via nextSearchStep) to send further requests:
- refill the node list from the node cache when it is old and short on
  non-bad nodes;
- once synced: complete finished gets, drop announces already done, send
  listens to up to LISTEN_NODES non-candidate synced nodes, send announces,
  and mark the search done when nothing is pending;
- send get/find_node requests while fewer than MAX_REQUESTED_SEARCH_NODES
  nodes are solicited;
- expire the search when too many consecutive nodes are bad. */
void
Dht::searchStep(Sp<Search> sr)
{
if (not sr or sr->expired or sr->done) return;
const auto& now = scheduler.time();
/*if (auto req_count = sr->currentlySolicitedNodeCount())
if (logger_)
logger_->d(sr->id, "[search %s IPv%c] step (%d requests)",
sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', req_count);*/
sr->step_time = now;
if (sr->refill_time + Node::NODE_EXPIRE_TIME < now and sr->nodes.size()-sr->getNumberOfBadNodes() < SEARCH_NODES)
refill(*sr);
/* Check if the first TARGET_NODES (8) live nodes have replied. */
if (sr->isSynced(now)) {
if (not (sr->callbacks.empty() and sr->announce.empty())) {
// search is synced but some (newer) get operations are not complete
// Call callbacks when done
std::vector<Get> completed_gets;
for (auto b = sr->callbacks.begin(); b != sr->callbacks.end();) {
if (sr->isDone(b->second)) {
sr->setDone(b->second);
completed_gets.emplace_back(std::move(b->second));
b = sr->callbacks.erase(b);
}
else
++b;
}
// clear corresponding queries
for (const auto& get : completed_gets)
for (auto& sn : sr->nodes) {
sn->getStatus.erase(get.query);
sn->pagination_queries.erase(get.query);
}
/* clearing callbacks for announced values */
sr->checkAnnounced();
if (sr->callbacks.empty() && sr->announce.empty() && sr->listeners.empty())
sr->setDone();
}
// true if this node is part of the target nodes cluter.
/*bool in = sr->id.xorCmp(myid, sr->nodes.back().node->id) < 0;
logger__DBG("[search %s IPv%c] synced%s",
sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', in ? ", in" : "");*/
if (not sr->listeners.empty()) {
unsigned i = 0;
for (auto& n : sr->nodes) {
if (not n->isSynced(now))
continue;
searchSynchedNodeListen(sr, *n);
// Candidate nodes are listened to but do not count toward LISTEN_NODES.
if (not n->candidate and ++i == LISTEN_NODES)
break;
}
}
// Announce requests
searchSendAnnounceValue(sr);
if (sr->callbacks.empty() && sr->announce.empty() && sr->listeners.empty())
sr->setDone();
}
// Keep sending until the solicitation budget is used or no request can be sent.
while (sr->currentlySolicitedNodeCount() < MAX_REQUESTED_SEARCH_NODES and searchSendGetValues(sr));
// NOTE(review): with an empty node list the threshold min(0, ...) is 0, so the
// search expires immediately — confirm this is intended.
if (sr->getNumberOfConsecutiveBadNodes() >= std::min<size_t>(sr->nodes.size(), SEARCH_MAX_BAD_NODES))
{
if (logger_)
logger_->w(sr->id, "[search %s IPv%c] expired", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6');
sr->expire();
if (not public_stable)
connectivityChanged(sr->af);
}
/* dumpSearch(*sr, std::cout); */
}
/* Refill search `sr` with up to SEARCH_NODES nodes from the network engine's
 * node cache (Search::insertNode decides which ones it keeps).
 * Updates sr.refill_time. Returns the number of nodes actually inserted. */
unsigned Dht::refill(Dht::Search& sr) {
    const auto& now = scheduler.time();
    sr.refill_time = now;
    auto cached_nodes = network_engine.getCachedNodes(sr.id, sr.af, SEARCH_NODES);
    if (cached_nodes.empty()) {
        if (logger_)
            logger_->e(sr.id, "[search %s IPv%c] no nodes from cache while refilling search",
                    sr.id.toString().c_str(), (sr.af == AF_INET) ? '4' : '6');
        return 0;
    }
    const auto inserted = static_cast<unsigned>(std::count_if(cached_nodes.begin(), cached_nodes.end(),
            [&](const auto& node) { return sr.insertNode(node, now); }));
    if (logger_)
        logger_->d(sr.id, "[search %s IPv%c] refilled search with %u nodes from node cache",
                sr.id.toString().c_str(), (sr.af == AF_INET) ? '4' : '6', inserted);
    return inserted;
}
/* Start a search for `id` on family `af`, or reuse the existing one.
 *
 * An existing search is revived (done/expired flags reset). Otherwise a new
 * search is created. If max_searches is already reached, an idle search
 * (done or expired, with no announce and no listener) is recycled instead.
 * The recycled search is removed from under its previous key and, like a
 * new one, inserted under `id`, so the map key always matches the search id.
 * The get/query callbacks, filter and query are then registered on the
 * search, which is refilled from the node cache.
 *
 * Returns an empty pointer — after calling `dcb(false, {})` when set — if
 * `af` is not running or no search slot is available. Reporting the latter
 * failure through `dcb` keeps callers that wait on the done callback
 * (get(), query()) from hanging. */
Sp<Dht::Search>
Dht::search(const InfoHash& id, sa_family_t af, GetCallback gcb, QueryCallback qcb, DoneCallback dcb, Value::Filter f, const Sp<Query>& q)
{
    if (!isRunning(af)) {
        if (logger_)
            logger_->e(id, "[search %s IPv%c] unsupported protocol", id.toString().c_str(), (af == AF_INET) ? '4' : '6');
        if (dcb)
            dcb(false, {});
        return {};
    }
    auto& srs = searches(af);
    const auto& srp = srs.find(id);
    Sp<Search> sr {};
    if (srp != srs.end()) {
        sr = srp->second;
        sr->done = false;
        sr->expired = false;
    } else {
        if (srs.size() < max_searches) {
            sr = std::make_shared<Search>();
        } else {
            // Recycle an idle search. The iterator must advance on every
            // iteration, otherwise a busy first entry makes this loop spin forever.
            for (auto it = srs.begin(); it != srs.end(); ++it) {
                auto& s = *it->second;
                if ((s.done or s.expired) and s.announce.empty() and s.listeners.empty()) {
                    sr = it->second;
                    // Re-keyed under `id` below.
                    srs.erase(it);
                    break;
                }
            }
            if (not sr) {
                if (logger_)
                    logger_->e(id, "[search %s IPv%c] maximum number of searches reached !", id.toString().c_str(), (af == AF_INET) ? '4' : '6');
                if (dcb)
                    dcb(false, {});
                return {};
            }
        }
        srs.emplace(id, sr);
        sr->af = af;
        sr->tid = search_id++;
        sr->step_time = time_point::min();
        sr->id = id;
        sr->done = false;
        sr->expired = false;
        sr->nodes.clear();
        sr->nodes.reserve(SEARCH_NODES+1);
        sr->nextSearchStep = scheduler.add(time_point::max(), std::bind(&Dht::searchStep, this, sr));
        if (logger_)
            logger_->w(id, "[search %s IPv%c] new search", id.toString().c_str(), (af == AF_INET) ? '4' : '6');
        // 0 is not used as a search transaction id.
        if (search_id == 0)
            search_id++;
    }
    sr->get(f, q, qcb, gcb, dcb, scheduler);
    refill(*sr);
    return sr;
}
void
Dht::announce(const InfoHash& id,
sa_family_t af,
Sp<Value> value,
DoneCallback callback,
time_point created,
bool permanent)
{
auto& srs = searches(af);
auto srp = srs.find(id);
if (auto sr = srp == srs.end() ? search(id, af) : srp->second) {
sr->put(value, callback, created, permanent);
scheduler.edit(sr->nextSearchStep, scheduler.time());
} else if (callback) {
callback(false, {});
}
}
/* Register listener `cb` (filter `f`, query `q`) on the search for `id` on
 * family `af`, creating the search if needed.
 * Returns the search-level listen token, or 0 when `af` is not running.
 * Throws DhtException when no search could be obtained. */
size_t
Dht::listenTo(const InfoHash& id, sa_family_t af, ValueCallback cb, Value::Filter f, const Sp<Query>& q)
{
    if (!isRunning(af))
        return 0;
    auto& srs = searches(af);
    auto existing = srs.find(id);
    Sp<Search> sr;
    if (existing != srs.end())
        sr = existing->second;
    else
        sr = search(id, af);
    if (!sr)
        throw DhtException("Can't create search");
    if (logger_)
        logger_->w(id, "[search %s IPv%c] listen", id.to_c_str(), (af == AF_INET) ? '4' : '6');
    return sr->listen(cb, f, q, scheduler);
}
/* Listen for values stored under `id`, locally and on both address families.
 *
 * `cb` is wrapped with OpValueCache::cacheCallback, together with a callback
 * that cancels this listen. The filter `f` is chained with the filter of
 * `where`. A local storage listener is added first, creating the storage
 * entry if there is room under max_store_keys. Then a search listener is
 * added on IPv4 and, only if that returned a non-zero token, on IPv6.
 * Returns a token for cancelListen(), or 0 on failure: invalid key, local
 * listen refused, or no IPv6 listen while a storage entry exists.
 *
 * NOTE(review): when token4 != 0 but token6 == 0 the IPv4 search listener is
 * not cancelled before returning 0. When no storage entry exists
 * (st == store.end()), token6 == 0 does not cause a failure return and the
 * listener is registered anyway. Confirm both are intended. */
size_t
Dht::listen(const InfoHash& id, ValueCallback cb, Value::Filter f, Where where)
{
if (not id) {
if (logger_)
logger_->w(id, "Listen called with invalid key");
return 0;
}
scheduler.syncTime();
auto token = ++listener_token;
auto gcb = OpValueCache::cacheCallback(std::move(cb), [this, id, token]{
cancelListen(id, token);
});
auto query = std::make_shared<Query>(Select{}, std::move(where));
auto filter = f.chain(query->where.getFilter());
auto st = store.find(id);
if (st == store.end() && store.size() < max_store_keys)
st = store.emplace(id, scheduler.time() + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME).first;
size_t tokenlocal = 0;
if (st != store.end()) {
tokenlocal = st->second.listen(gcb, filter, query);
if (tokenlocal == 0)
return 0;
}
auto token4 = Dht::listenTo(id, AF_INET, gcb, filter, query);
auto token6 = token4 == 0 ? 0 : Dht::listenTo(id, AF_INET6, gcb, filter, query);
if (token6 == 0 && st != store.end()) {
st->second.cancelListen(tokenlocal);
return 0;
}
// Remember the per-layer tokens so cancelListen() can undo each of them.
listeners.emplace(token, std::make_tuple(tokenlocal, token4, token6));
return token;
}
bool
Dht::cancelListen(const InfoHash& id, size_t token)
{
scheduler.syncTime();
auto it = listeners.find(token);
if (it == listeners.end()) {
if (logger_)
logger_->w(id, "Listen token not found: %d", token);
return false;
}
if (logger_)
logger_->d(id, "cancelListen %s with token %d", id.toString().c_str(), token);
if (auto tokenlocal = std::get<0>(it->second)) {
auto st = store.find(id);
if (st != store.end())
st->second.cancelListen(tokenlocal);
}
auto searches_cancel_listen = [this,&id](std::map<InfoHash, Sp<Search>>& srs, size_t token) {
if (token) {
auto srp = srs.find(id);
if (srp != srs.end())
srp->second->cancelListen(token, scheduler);
}
};
searches_cancel_listen(dht4.searches, std::get<1>(it->second));
searches_cancel_listen(dht6.searches, std::get<2>(it->second));
listeners.erase(it);
return true;
}
/* Completion state of an operation run on both address families:
 * `status` is the aggregated state, `status4`/`status6` the IPv4/IPv6 ones. */
struct OpStatus {
    /* `done`: the (sub-)operation finished; `ok`: it succeeded. */
    struct Status {
        Status(bool isDone = false, bool isOk = false) : done(isDone), ok(isOk) {}
        bool done;
        bool ok;
    };
    Status status;
    Status status4;
    Status status6;
};
/* Completion state of a get/query operation: OpStatus plus the values
 * collected so far (container type T chosen by the caller) and the nodes
 * reported by the per-family done callbacks (accumulated by
 * doneCallbackWrapper). */
template <typename T>
struct GetStatus : public OpStatus {
T values;
std::vector<Sp<Node>> nodes;
};
void
Dht::put(const InfoHash& id, Sp<Value> val, DoneCallback callback, time_point created, bool permanent)
{
if (not id or not val) {
if (logger_)
logger_->w(id, "Put called with invalid key or value");
if (callback)
callback(false, {});
return;
}
if (val->id == Value::INVALID_ID)
val->id = std::uniform_int_distribution<Value::Id>{1}(rd);
scheduler.syncTime();
const auto& now = scheduler.time();
created = std::min(now, created);
storageStore(id, val, created, {}, permanent);
if (logger_)
logger_->d(id, "put: adding %s -> %s", id.toString().c_str(), val->toString().c_str());
auto op = std::make_shared<OpStatus>();
auto donecb = [callback](const std::vector<Sp<Node>>& nodes, OpStatus& op) {
// Callback as soon as the value is announced on one of the available networks
if (callback and not op.status.done and (op.status4.done && op.status6.done)) {
callback(op.status4.ok or op.status6.ok, nodes);
op.status.done = true;
}
};
announce(id, AF_INET, val, [=](bool ok4, const std::vector<Sp<Node>>& nodes) {
if (logger_)
logger_->d(id, "Announce done IPv4 %d", ok4);
auto& o = *op;
o.status4 = {true, ok4};
donecb(nodes, o);
}, created, permanent);
announce(id, AF_INET6, val, [=](bool ok6, const std::vector<Sp<Node>>& nodes) {
if (logger_)
logger_->d(id, "Announce done IPv6 %d", ok6);
auto& o = *op;
o.status6 = {true, ok6};
donecb(nodes, o);
}, created, permanent);
}
/* Accumulate `nodes` into `op` and, once the operation is complete (early
 * success via op.status.ok, or both families done), call `dcb` exactly once
 * with the overall success flag and all collected nodes. */
template <typename T>
void doneCallbackWrapper(DoneCallback dcb, const std::vector<Sp<Node>>& nodes, GetStatus<T>& op) {
    if (op.status.done)
        return;
    op.nodes.insert(op.nodes.end(), nodes.begin(), nodes.end());
    const bool complete = op.status.ok or (op.status4.done and op.status6.done);
    if (not complete)
        return;
    op.status.done = true;
    if (dcb)
        dcb(op.status.ok or op.status4.ok or op.status6.ok, op.nodes);
}
/* Deliver `values` to `get_cb` for a get/query operation.
 * `add_values` filters out what was already delivered. When something new
 * remains, it is passed to `get_cb` and then cached with `cache_values`.
 * A `false` return from `get_cb` marks the operation as successfully
 * finished. The done callback is then evaluated.
 * Returns true while more values are wanted, false once finished. */
template <typename T, typename St, typename Cb, typename Av, typename Cv>
bool callbackWrapper(Cb get_cb, DoneCallback done_cb, const std::vector<Sp<T>>& values,
    Av add_values, Cv cache_values, GetStatus<St>& op)
{
    if (op.status.done)
        return false;
    auto fresh = add_values(values);
    if (not fresh.empty()) {
        const bool wantsMore = get_cb(fresh);
        op.status.ok = not wantsMore;
        cache_values(fresh);
    }
    doneCallbackWrapper(done_cb, {}, op);
    return not op.status.ok;
}
/* Get values stored under `id`: first from local storage, then through
 * searches on IPv4 and IPv6.
 * `getcb` only receives values not delivered before: a new id, or a known id
 * whose value is a different object with different content. If `getcb`
 * returns false, the operation is considered complete (see callbackWrapper).
 * `donecb` is called once, through doneCallbackWrapper, when the operation
 * completes or both searches reported done. An invalid key calls it
 * immediately with false. */
void
Dht::get(const InfoHash& id, GetCallback getcb, DoneCallback donecb, Value::Filter&& filter, Where&& where)
{
if (not id) {
if (logger_)
logger_->w(id, "Get called with invalid key");
if (donecb)
donecb(false, {});
return;
}
scheduler.syncTime();
// Shared by the local answer and both searches; values keyed by value id.
auto op = std::make_shared<GetStatus<std::map<Value::Id, Sp<Value>>>>();
auto gcb = [getcb, donecb, op](const std::vector<Sp<Value>>& vals) {
auto& o = *op;
return callbackWrapper(getcb, donecb, vals, [&o](const std::vector<Sp<Value>>& values) {
std::vector<Sp<Value>> newvals {};
for (const auto& v : values) {
auto it = o.values.find(v->id);
if (it == o.values.cend() or (it->second != v && !(*it->second == *v))) {
newvals.push_back(v);
}
}
return newvals;
}, [&o](const std::vector<Sp<Value>>& newvals) {
for (const auto& v : newvals)
o.values[v->id] = v;
}, o);
};
auto q = std::make_shared<Query>(Select {}, std::move(where));
auto f = filter.chain(q->where.getFilter());
/* Try to answer this search locally. */
gcb(getLocal(id, f));
Dht::search(id, AF_INET, gcb, {}, [=](bool ok, const std::vector<Sp<Node>>& nodes) {
//logger__WARN("DHT done IPv4");
op->status4 = {true, ok};
doneCallbackWrapper(donecb, nodes, *op);
}, f, q);
Dht::search(id, AF_INET6, gcb, {}, [=](bool ok, const std::vector<Sp<Node>>& nodes) {
//logger__WARN("DHT done IPv6");
op->status6 = {true, ok};
doneCallbackWrapper(donecb, nodes, *op);
}, f, q);
}
/* Run query `q` (SELECT fields WHERE ...) for `id`: first against local
 * storage, then through searches on IPv4 and IPv6.
 * `cb` receives field sets not already covered by a previously delivered one.
 * When a new field set covers a cached one, that cached entry is replaced.
 * Returning false from `cb` completes the operation (see callbackWrapper).
 * `done_cb` is called once through doneCallbackWrapper. An invalid key calls
 * it immediately with false.
 *
 * NOTE(review): only the first cached entry contained in a new field set is
 * erased, and new entries are not deduplicated against each other within the
 * same batch. Confirm this is acceptable. */
void Dht::query(const InfoHash& id, QueryCallback cb, DoneCallback done_cb, Query&& q)
{
if (not id) {
if (logger_)
logger_->w(id, "Query called with invalid key");
if (done_cb)
done_cb(false, {});
return;
}
scheduler.syncTime();
auto op = std::make_shared<GetStatus<std::vector<Sp<FieldValueIndex>>>>();
auto f = q.where.getFilter();
auto qcb = [cb, done_cb, op](const std::vector<Sp<FieldValueIndex>>& fields){
auto& o = *op;
return callbackWrapper(cb, done_cb, fields, [&](const std::vector<Sp<FieldValueIndex>>& fields) {
std::vector<Sp<FieldValueIndex>> newvals {};
// Note: this loop variable `f` shadows the filter `f` declared above.
for (const auto& f : fields) {
// Already delivered (same object, or contained in a delivered field set)?
auto it = std::find_if(o.values.cbegin(), o.values.cend(),
[&](const Sp<FieldValueIndex>& sf) {
return sf == f or f->containedIn(*sf);
});
if (it == o.values.cend()) {
// Drop a cached field set that the new one supersedes.
auto lesser = std::find_if(o.values.begin(), o.values.end(),
[&](const Sp<FieldValueIndex>& sf) {
return sf->containedIn(*f);
});
if (lesser != o.values.end())
o.values.erase(lesser);
newvals.push_back(f);
}
}
return newvals;
}, [&](const std::vector<Sp<FieldValueIndex>>& fields){
o.values.insert(o.values.end(), fields.begin(), fields.end());
}, o);
};
/* Try to answer this search locally. */
auto values = getLocal(id, f);
std::vector<Sp<FieldValueIndex>> local_fields(values.size());
std::transform(values.begin(), values.end(), local_fields.begin(), [&q](const Sp<Value>& v) {
return std::make_shared<FieldValueIndex>(*v, q.select);
});
qcb(local_fields);
// `q` is moved only after its select clause was used for the local answer.
auto sq = std::make_shared<Query>(std::move(q));
Dht::search(id, AF_INET, {}, qcb, [=](bool ok, const std::vector<Sp<Node>>& nodes) {
//logger__WARN("DHT done IPv4");
op->status4 = {true, ok};
doneCallbackWrapper(done_cb, nodes, *op);
}, f, sq);
Dht::search(id, AF_INET6, {}, qcb, [=](bool ok, const std::vector<Sp<Node>>& nodes) {
//logger__WARN("DHT done IPv6");
op->status6 = {true, ok};
doneCallbackWrapper(done_cb, nodes, *op);
}, f, sq);
}
/**
 * Values stored locally under `id` that pass filter `f`.
 * Returns an empty vector when nothing is stored for `id`.
 */
std::vector<Sp<Value>>
Dht::getLocal(const InfoHash& id, const Value::Filter& f) const
{
    const auto entry = store.find(id);
    if (entry != store.end())
        return entry->second.get(f);
    return {};
}
/**
 * The locally stored value with id `vid` under key `id`, or an empty
 * pointer when the key is not stored here.
 */
Sp<Value>
Dht::getLocalById(const InfoHash& id, Value::Id vid) const
{
    const auto entry = store.find(id);
    return entry == store.end() ? Sp<Value> {} : entry->second.getById(vid);
}
/**
 * All values currently being announced (put) on key `id`, gathered from the
 * IPv4 search first and then the IPv6 search.
 */
std::vector<Sp<Value>>
Dht::getPut(const InfoHash& id) const
{
    std::vector<Sp<Value>> puts;
    for (const auto* table : {&dht4.searches, &dht6.searches}) {
        const auto sr = table->find(id);
        if (sr == table->end())
            continue;
        const auto announced = sr->second->getPut();
        puts.insert(puts.end(), announced.begin(), announced.end());
    }
    return puts;
}
/**
 * The value with id `vid` being announced on key `id`. The IPv4 search is
 * consulted before the IPv6 one. Returns an empty pointer if neither has it.
 */
Sp<Value>
Dht::getPut(const InfoHash& id, const Value::Id& vid) const
{
    for (const auto* table : {&dht4.searches, &dht6.searches}) {
        const auto sr = table->find(id);
        if (sr == table->end())
            continue;
        if (auto found = sr->second->getPut(vid))
            return found;
    }
    return {};
}
/**
 * Stop announcing value `vid` on key `id` in both address families.
 * If either search had it, the value is also erased from local storage.
 *
 * @return true if at least one search was announcing the value.
 */
bool
Dht::cancelPut(const InfoHash& id, const Value::Id& vid)
{
    bool canceled = false;
    // Visit both families unconditionally: each search must be told to cancel.
    for (auto* table : {&dht4.searches, &dht6.searches}) {
        auto sr = table->find(id);
        if (sr != table->end() and sr->second->cancelPut(vid))
            canceled = true;
    }
    if (canceled)
        storageErase(id, vid);
    return canceled;
}
// Storage
void
Dht::storageChanged(const InfoHash& id, Storage& st, ValueStorage& v, bool newValue)
{
if (newValue) {
if (not st.local_listeners.empty()) {
if (logger_)
logger_->d(id, "[store %s] %lu local listeners", id.toString().c_str(), st.local_listeners.size());
std::vector<std::pair<ValueCallback, std::vector<Sp<Value>>>> cbs;
cbs.reserve(st.local_listeners.size());
for (const auto& l : st.local_listeners) {
std::vector<Sp<Value>> vals;
if (not l.second.filter or l.second.filter(*v.data))
vals.push_back(v.data);
if (not vals.empty()) {
if (logger_)
logger_->d(id, "[store %s] sending update local listener with token %lu",
id.toString().c_str(),
l.first);
cbs.emplace_back(l.second.get_cb, std::move(vals));
}
}
// listeners are copied: they may be deleted by the callback
for (auto& cb : cbs)
cb.first(cb.second, false);
}
}
if (not st.listeners.empty()) {
if (logger_)
logger_->d(id, "[store %s] %lu remote listeners", id.toString().c_str(), st.listeners.size());
for (const auto& node_listeners : st.listeners) {
for (const auto& l : node_listeners.second) {
auto f = l.second.query.where.getFilter();
if (f and not f(*v.data))
continue;
if (logger_)
logger_->w(id, node_listeners.first->id, "[store %s] [node %s] sending update",
id.toString().c_str(),
node_listeners.first->toString().c_str());
std::vector<Sp<Value>> vals {};
vals.push_back(v.data);
Blob ntoken = makeToken(node_listeners.first->getAddr(), false);
network_engine.tellListener(node_listeners.first, l.first, id, 0, ntoken, {}, {},
std::move(vals), l.second.query, l.second.version);
}
}
}
}
bool
Dht::storageStore(const InfoHash& id, const Sp<Value>& value, time_point created, const SockAddr& sa, bool permanent)
{
const auto& now = scheduler.time();
created = std::min(created, now);
auto expiration = permanent ? time_point::max() : created + getType(value->type).expiration;
if (expiration < now)
return false;
auto st = store.find(id);
if (st == store.end()) {
if (store.size() >= max_store_keys)
return false;
auto st_i = store.emplace(id, now);
st = st_i.first;
if (maintain_storage and st_i.second)
scheduler.add(st->second.maintenance_time, std::bind(&Dht::dataPersistence, this, id));
}
StorageBucket* store_bucket {nullptr};
if (sa)
store_bucket = &store_quota[sa];
auto store = st->second.store(id, value, created, expiration, store_bucket);
if (auto vs = store.first) {
total_store_size += store.second.size_diff;
total_values += store.second.values_diff;
if (not permanent) {
scheduler.add(expiration, std::bind(&Dht::expireStorage, this, id));
}
if (total_store_size > max_store_size) {
expireStore();
}
storageChanged(id, st->second, *vs, store.second.values_diff > 0);
}
return std::get<0>(store);
}
/**
 * Remove value `vid` of key `id` from local storage and update the global
 * size/value counters.
 *
 * @return true if a value was actually removed.
 */
bool
Dht::storageErase(const InfoHash& id, Value::Id vid)
{
    auto entry = store.find(id);
    if (entry == store.end())
        return false;
    const auto diff = entry->second.remove(id, vid);
    total_store_size += diff.size_diff;
    total_values += diff.values_diff;
    return diff.values_diff != 0;
}
/**
 * Register (or refresh) remote listener `socket_id` of `node` on key `id`.
 *
 * A new listener immediately receives the stored values matching its query,
 * together with our closest nodes for both families. An existing listener is
 * refreshed with the new query. Nothing happens if the key is not stored and
 * the key quota is full.
 */
void
Dht::storageAddListener(const InfoHash& id, const Sp<Node>& node, size_t socket_id, Query&& query, int version)
{
    const auto& now = scheduler.time();
    auto st = store.find(id);
    if (st == store.end()) {
        if (store.size() >= max_store_keys)
            return;
        st = store.emplace(id, now).first;
    }

    auto& listeners = st->second.listeners[node];
    auto existing = listeners.find(socket_id);
    if (existing != listeners.end()) {
        existing->second.refresh(now, std::move(query));
        return;
    }

    auto matching = st->second.get(query.where.getFilter());
    if (not matching.empty()) {
        network_engine.tellListener(node, socket_id, id, WANT4 | WANT6, makeToken(node->getAddr(), false),
            dht4.buckets.findClosestNodes(id, now, TARGET_NODES), dht6.buckets.findClosestNodes(id, now, TARGET_NODES),
            std::move(matching), query, version);
    }
    listeners.emplace(socket_id, Listener {now, std::move(query), version});
}
/**
 * Expire outdated values of the storage at `i`, update the global counters, and
 * notify remote listeners (by value id) and local listeners (with the values,
 * flagged as expired).
 */
void
Dht::expireStore(decltype(store)::iterator i)
{
    const auto& id = i->first;
    auto& st = i->second;
    // first: byte delta added to total_store_size; second: the expired values.
    auto stats = st.expire(id, scheduler.time());
    auto& expired = stats.second;
    total_store_size += stats.first;
    total_values -= expired.size();
    if (expired.empty())
        return;

    if (logger_)
        logger_->d(id, "[store %s] discarded %ld expired values (%ld bytes)",
            id.toString().c_str(), expired.size(), -stats.first);

    if (not st.listeners.empty()) {
        if (logger_)
            logger_->d(id, "[store %s] %lu remote listeners", id.toString().c_str(), st.listeners.size());
        std::vector<Value::Id> expired_ids;
        expired_ids.reserve(expired.size());
        for (const auto& value : expired)
            expired_ids.emplace_back(value->id);
        for (const auto& nodeEntry : st.listeners) {
            const auto& node = nodeEntry.first;
            for (const auto& listener : nodeEntry.second) {
                if (logger_)
                    logger_->w(id, node->id, "[store %s] [node %s] sending expired",
                        id.toString().c_str(),
                        node->toString().c_str());
                Blob ntoken = makeToken(node->getAddr(), false);
                network_engine.tellListenerExpired(node, listener.first, id, ntoken, expired_ids, listener.second.version);
            }
        }
    }

    // NOTE(review): unlike storageChanged(), callbacks run while iterating
    // local_listeners — confirm a callback cannot remove a listener here.
    for (const auto& local : st.local_listeners)
        local.second.get_cb(expired, true);
}
/**
 * Scheduler entry point: expire the values of key `h`, if it is still stored.
 */
void
Dht::expireStorage(InfoHash h)
{
    const auto entry = store.find(h);
    if (entry == store.end())
        return;
    expireStore(entry);
}
/**
 * Expire values in all storages, drop storages that end up empty, then, while
 * the total size exceeds max_store_size, evict the oldest values of the quota
 * bucket (source address) using the most storage. Finally, drop empty quota
 * buckets.
 */
void
Dht::expireStore()
{
    // Pass 1: expire values everywhere; drop storages left with no values
    // and no remote or local listeners.
    for (auto i = store.begin(); i != store.end();) {
        expireStore(i);
        if (i->second.empty() && i->second.listeners.empty() && i->second.local_listeners.empty()) {
            if (logger_)
                logger_->d(i->first, "[store %s] discarding empty storage", i->first.toString().c_str());
            i = store.erase(i);
        }
        else
            ++i;
    }

    // Pass 2: remove more values if the storage limit is still exceeded.
    while (total_store_size > max_store_size) {
        if (store_quota.empty()) {
            if (logger_)
                logger_->w("No space left: local data consumes all the quota!");
            break;
        }
        // Find the address using the most storage. The scan must start at the
        // first bucket: the former `for (auto it = ++largest; ...)` advanced
        // `largest` itself, so the first bucket was never a candidate, and with a
        // single bucket `largest` became end() and was dereferenced.
        auto largest = store_quota.begin();
        for (auto it = std::next(largest); it != store_quota.end(); ++it) {
            if (it->second.size() > largest->second.size())
                largest = it;
        }
        if (logger_)
            logger_->w("No space left: discarding value of largest consumer %s", largest->first.toString().c_str());
        // NOTE(review): this loop only exits once a removal reports values_diff != 0;
        // confirm getOldest() cannot keep returning an entry whose storage is gone.
        while (true) {
            auto exp_value = largest->second.getOldest();
            auto storage = store.find(exp_value.first);
            if (storage != store.end()) {
                auto ret = storage->second.remove(exp_value.first, exp_value.second);
                total_store_size += ret.size_diff;
                total_values += ret.values_diff;
                // Arguments now match the format: bytes freed (size_diff is
                // presumably negative on removal) and bytes still in use.
                if (logger_)
                    logger_->w("Discarded %ld bytes, still %ld used",
                        static_cast<long>(-ret.size_diff), static_cast<long>(total_store_size));
                if (ret.values_diff)
                    break;
            }
        }
    }

    // Remove unused quota entries.
    for (auto i = store_quota.begin(); i != store_quota.end();) {
        if (i->second.size() == 0)
            i = store_quota.erase(i);
        else
            ++i;
    }
}
/**
 * React to a connectivity change on family `af`. This does four things:
 * - pulls the next node confirmation forward to now;
 * - notifies the routing table of `af`;
 * - notifies the network engine;
 * - forgets every reported address of that family.
 */
void
Dht::connectivityChanged(sa_family_t af)
{
    const auto& now = scheduler.time();
    scheduler.edit(nextNodesConfirmation, now);
    buckets(af).connectivityChanged(now);
    network_engine.connectivityChanged(af);
    const auto sameFamily = [af](const ReportedAddr& reported) {
        return reported.second.getFamily() == af;
    };
    reported_addr.erase(std::remove_if(reported_addr.begin(), reported_addr.end(), sameFamily), reported_addr.end());
}
void
Dht::rotateSecrets()
{
oldsecret = secret;
secret = std::uniform_int_distribution<uint64_t>{}(rd);
uniform_duration_distribution<> time_dist(std::chrono::minutes(15), std::chrono::minutes(45));
auto rotate_secrets_time = scheduler.time() + time_dist(rd);
scheduler.add(rotate_secrets_time, std::bind(&Dht::rotateSecrets, this));
}
/**
 * Build the token handed to peer `addr`: a TOKEN_SIZE hash of
 * (secret || IP address bytes || port). With `old`, the previous secret is used.
 * Returns an empty Blob for non-IP address families.
 */
Blob
Dht::makeToken(const SockAddr& addr, bool old) const
{
    const void* ip = nullptr;
    size_t iplen = 0;
    in_port_t port = 0;
    switch (addr.getFamily()) {
    case AF_INET: {
        const auto& sin = addr.getIPv4();
        ip = &sin.sin_addr;
        iplen = 4;
        port = sin.sin_port;
        break;
    }
    case AF_INET6: {
        const auto& sin6 = addr.getIPv6();
        ip = &sin6.sin6_addr;
        iplen = 16;
        port = sin6.sin6_port;
        break;
    }
    default:
        return {};
    }

    const auto& key = old ? oldsecret : secret;
    const auto* keyBytes = reinterpret_cast<const uint8_t*>(&key);
    const auto* ipBytes = static_cast<const uint8_t*>(ip);
    const auto* portBytes = reinterpret_cast<const uint8_t*>(&port);

    Blob data;
    data.reserve(sizeof(secret) + sizeof(in_port_t) + iplen);
    data.insert(data.end(), keyBytes, keyBytes + sizeof(key));
    data.insert(data.end(), ipBytes, ipBytes + iplen);
    data.insert(data.end(), portBytes, portBytes + sizeof(in_port_t));
    return crypto::hash(data, TOKEN_SIZE);
}
/**
 * Whether `token` is valid for `addr`, i.e. equals the token built with either
 * the current or the previous secret. Rejects unset addresses and tokens of
 * the wrong size.
 */
bool
Dht::tokenMatch(const Blob& token, const SockAddr& addr) const
{
    if (not addr or token.size() != TOKEN_SIZE)
        return false;
    return token == makeToken(addr, false) or token == makeToken(addr, true);
}
/**
 * Routing-table statistics for family `af`, including the network engine's
 * node cache size for that family.
 */
NodeStats
Dht::getNodesStats(sa_family_t af) const
{
    auto stats = dht(af).getNodesStats(scheduler.time(), myid);
    stats.node_cache_size = network_engine.getNodeCacheSize(af);
    return stats;
}
/**
 * Count the nodes of this family's routing table and report the following:
 * - good nodes, and how many of them are incoming;
 * - dubious nodes (not good, not expired);
 * - cached buckets;
 * - the depth of the bucket covering `myid`;
 * - the number of searches.
 */
NodeStats
Dht::Kad::getNodesStats(time_point now, const InfoHash& myid) const
{
    NodeStats stats {};
    for (const auto& bucket : buckets) {
        if (bucket.cached)
            stats.cached_nodes++;
        for (const auto& node : bucket.nodes) {
            if (node->isGood(now)) {
                stats.good_nodes++;
                if (node->isIncoming())
                    stats.incoming_nodes++;
            } else if (not node->isExpired()) {
                stats.dubious_nodes++;
            }
        }
    }
    stats.table_depth = buckets.depth(buckets.findBucket(myid));
    stats.searches = searches.size();
    return stats;
}
/**
 * Print bucket `b` to `out`. The output is a header line (first id, node
 * count, last update, cached flag), then one line per node with its update
 * and reply times and an [expired]/[good] tag.
 */
void
Dht::dumpBucket(const Bucket& b, std::ostream& out) const
{
    using namespace std::chrono;
    const auto& now = scheduler.time();
    out << b.first << " count: " << b.nodes.size() << " updated: " << print_time_relative(now, b.time);
    out << (b.cached ? " (cached)" : "") << std::endl;
    for (const auto& node : b.nodes) {
        const auto& updated = node->getTime();
        const auto& replied = node->getReplyTime();
        out << " Node " << node->toString() << " updated: " << print_time_relative(now, updated);
        // The reply time is shown only when it differs from the update time.
        if (replied != updated)
            out << ", replied: " << print_time_relative(now, replied);
        if (node->isExpired())
            out << " [expired]";
        else if (node->isGood(now))
            out << " [good]";
        out << std::endl;
    }
}
/**
 * Print search `sr` to `out`.
 *
 * Output: a header line (family, target id, number of get callbacks, last step
 * time, [done]/[expired]/[synced]/[listening] flags), the get and listen
 * queries, the announcements, then one line per search node with these
 * columns (legend also printed by getSearchesLog()):
 *   - common bits with the target, node id, '*' if the node is in our routing table;
 *   - [pending message count, 'x' if the node is expired];
 *   - [get sync: 'u' replied after the last get / 's' synced / '-' not synced;
 *      get request pending: 'c' candidate / 'f' other];
 *   - [listen: 'l' listening / 'f' request pending] — only if the search has listeners;
 *   - [one state char per announcement] — only if the search announces values;
 *   - the node address.
 */
void
Dht::dumpSearch(const Search& sr, std::ostream& out) const
{
const auto& now = scheduler.time();
const auto& listen_expire = getListenExpiration();
using namespace std::chrono;
out << std::endl << "Search IPv" << (sr.af == AF_INET6 ? '6' : '4') << ' ' << sr.id << " gets: " << sr.callbacks.size();
out << ", last step: " << print_time_relative(now, sr.step_time);
if (sr.done)
out << " [done]";
if (sr.expired)
out << " [expired]";
bool synced = sr.isSynced(now);
out << (synced ? " [synced]" : " [not synced]");
if (synced && sr.isListening(now, listen_expire))
out << " [listening]";
out << std::endl;
/*printing the queries*/
if (sr.callbacks.size() + sr.listeners.size() > 0)
out << "Queries:" << std::endl;
for (const auto& cb : sr.callbacks) {
out << *cb.second.query << std::endl;
}
for (const auto& l : sr.listeners) {
out << *l.second.query << std::endl;
}
for (const auto& a : sr.announce) {
bool announced = sr.isAnnounced(a.value->id);
out << "Announcement: " << *a.value << (announced ? " [announced]" : "") << std::endl;
}
out << " Common bits InfoHash Conn. Get Ops IP" << std::endl;
// Nodes whose last get reply is newer than this are marked 'u' (updated).
auto last_get = sr.getLastGetTime();
for (const auto& np : sr.nodes) {
auto& n = *np;
out << std::setfill (' ') << std::setw(3) << InfoHash::commonBits(sr.id, n.node->id) << ' ' << n.node->id;
// '*': the node is also present in our routing table for this family.
out << ' ' << (findNode(n.node->id, sr.af) ? '*' : ' ');
out << " [";
if (auto pendingCount = n.node->getPendingMessageCount())
out << pendingCount;
else
out << ' ';
out << (n.node->isExpired() ? 'x' : ' ') << "]";
// Get status
{
char g_i = n.pending(n.getStatus) ? (n.candidate ? 'c' : 'f') : ' ';
char s_i = n.isSynced(now) ? (n.last_get_reply > last_get ? 'u' : 's') : '-';
out << " [" << s_i << g_i << "] ";
}
// Listen status
if (not sr.listeners.empty()) {
if (n.listenStatus.empty())
out << " ";
else
out << "["
<< (n.isListening(now,listen_expire) ? 'l' : (n.pending(n.listenStatus) ? 'f' : ' ')) << "] ";
}
// Announce status: one character per announced value, in sr.announce order.
if (not sr.announce.empty()) {
if (n.acked.empty()) {
out << " ";
for (size_t a=0; a < sr.announce.size(); a++)
out << ' ';
} else {
out << "[";
for (const auto& a : sr.announce) {
auto ack = n.acked.find(a.value->id);
if (ack == n.acked.end() or not ack->second.first) {
out << ' ';
} else {
out << ack->second.first->getStateChar();
}
}
out << "] ";
}
}
out << n.node->getAddrStr() << std::endl;
}
}
/**
 * Log (at debug level) our id, both routing tables, every search of both
 * families and the storage summary.
 */
void
Dht::dumpTables() const
{
    std::stringstream out;
    out << "My id " << myid << std::endl;
    out << "Buckets IPv4 :" << std::endl;
    for (const auto& b : dht4.buckets)
        dumpBucket(b, out);
    out << "Buckets IPv6 :" << std::endl;
    for (const auto& b : dht6.buckets)
        dumpBucket(b, out);
    // Take the search map by const reference. The former by-value parameter
    // copied each map, and bumped every Sp<Search> refcount, only to print it.
    auto dump_searches = [&](const std::map<InfoHash, Sp<Search>>& srs) {
        for (const auto& srp : srs)
            dumpSearch(*srp.second, out);
    };
    dump_searches(dht4.searches);
    dump_searches(dht6.searches);
    out << std::endl;
    out << getStorageLog() << std::endl;
    if (logger_)
        logger_->d("%s", out.str().c_str());
}
std::string
Dht::getStorageLog() const
{
std::stringstream out;
for (const auto& s : store)
out << printStorageLog(s);
out << std::endl << std::endl;
std::multimap<size_t, const SockAddr*> q_map;
for (const auto& ip : store_quota)
if (ip.second.size())
q_map.emplace(ip.second.size(), &ip.first);
for (auto ip = q_map.rbegin(); ip != q_map.rend(); ++ip)
out << "IP " << ip->second->toString() << " uses " << ip->first << " bytes" << std::endl;
out << std::endl;
out << "Total " << store.size() << " storages, " << total_values << " values (";
if (total_store_size < 1024)
out << total_store_size << " bytes)";
else
out << (total_store_size/1024) << " / " << (max_store_size/1024) << " KB)";
out << std::endl;
return out.str();
}
/**
 * Storage report for key `h` alone, or an "empty" line if it is not stored.
 */
std::string
Dht::getStorageLog(const InfoHash& h) const
{
    const auto entry = store.find(h);
    if (entry != store.end())
        return printStorageLog(*entry);
    std::stringstream out;
    out << "Storage " << h << " empty" << std::endl;
    return out.str();
}
/**
 * Summary of one storage entry. The first line gives the key, the remote
 * listener count, the value count and the size in bytes. It is followed by the
 * local listener count (if any) and one line per remote listening node.
 */
std::string
Dht::printStorageLog(const decltype(store)::value_type& s) const
{
    const auto& key = s.first;
    const auto& storage = s.second;
    std::stringstream out;
    out << "Storage " << key << " "
        << storage.listeners.size() << " list., "
        << storage.valueCount() << " values ("
        << storage.totalSize() << " bytes)" << std::endl;
    if (not storage.local_listeners.empty())
        out << " " << storage.local_listeners.size() << " local listeners" << std::endl;
    for (const auto& nodeEntry : storage.listeners)
        out << " " << "Listener " << nodeEntry.first->toString() << " : " << nodeEntry.second.size() << " entries" << std::endl;
    return out.str();
}
/**
 * Dump of every bucket of the routing table for family `af`.
 */
std::string
Dht::getRoutingTablesLog(sa_family_t af) const
{
    std::stringstream out;
    const auto& table = buckets(af);
    for (const auto& bucket : table)
        dumpBucket(bucket, out);
    return out.str();
}
/**
 * Report the searches of family `af` (both when `af` is 0).
 *
 * With more than 8 searches in total, prints one line per search; otherwise
 * prints a legend and a detailed dump of each. Always ends with the totals.
 */
std::string
Dht::getSearchesLog(sa_family_t af) const
{
    const bool show4 = not af or af == AF_INET;
    const bool show6 = not af or af == AF_INET6;
    const auto num_searches = dht4.searches.size() + dht6.searches.size();
    std::stringstream out;
    if (num_searches <= 8) {
        out << "s:synched, u:updated, a:announced, c:candidate, f:cur req, x:expired, *:known" << std::endl;
        if (show4)
            for (const auto& sr : dht4.searches)
                dumpSearch(*sr.second, out);
        if (show6)
            for (const auto& sr : dht6.searches)
                dumpSearch(*sr.second, out);
    } else {
        if (show4)
            for (const auto& sr : dht4.searches)
                out << "[search " << sr.first << " IPv4]" << std::endl;
        if (show6)
            for (const auto& sr : dht6.searches)
                out << "[search " << sr.first << " IPv6]" << std::endl;
    }
    out << "Total: " << num_searches << " searches (" << dht4.searches.size() << " IPv4, " << dht6.searches.size() << " IPv6)." << std::endl;
    return out.str();
}
/**
 * Detailed dump of the search for `id` in family `af`. With AF_UNSPEC, the
 * IPv4 dump is followed by the IPv6 dump. Empty if there is no such search.
 */
std::string
Dht::getSearchLog(const InfoHash& id, sa_family_t af) const
{
    if (af == AF_UNSPEC)
        return getSearchLog(id, AF_INET) + getSearchLog(id, AF_INET6);
    std::stringstream out;
    const auto& srs = searches(af);
    const auto sr = srs.find(id);
    if (sr != srs.end())
        dumpSearch(*sr->second, out);
    return out.str();
}
/**
 * Clear every search of both families before the members are destroyed.
 */
Dht::~Dht()
{
    for (auto* table : {&dht4.searches, &dht6.searches})
        for (auto& entry : *table)
            entry.second->clear();
}
/**
 * Derive the network-engine configuration from a DHT configuration.
 * A zero request limit selects the default: MAX_REQUESTS_PER_SEC globally,
 * and one eighth of the global limit per peer.
 */
net::NetworkConfig
fromDhtConfig(const Config& config)
{
    net::NetworkConfig netConf;
    netConf.network = config.network;
    if (config.max_req_per_sec)
        netConf.max_req_per_sec = config.max_req_per_sec;
    else
        netConf.max_req_per_sec = MAX_REQUESTS_PER_SEC;
    if (config.max_peer_req_per_sec)
        netConf.max_peer_req_per_sec = config.max_peer_req_per_sec;
    else
        netConf.max_peer_req_per_sec = netConf.max_req_per_sec/8;
    return netConf;
}
/**
 * Create a DHT node on an already opened socket.
 *
 * Routing tables are created only for the families the socket supports.
 * Schedules the first node confirmation 3 to 5 seconds after startup,
 * initializes both token secrets, loads persisted state when
 * `config.persist_path` is set, and starts periodic expiration.
 *
 * @throws DhtException if the socket is missing or supports neither IPv4 nor IPv6.
 */
// NOTE(review): network_engine is built from myid and logger_; this relies on both
// being declared before network_engine in the class — confirm in dht.h.
Dht::Dht(std::unique_ptr<net::DatagramSocket>&& sock, const Config& config, const Sp<Logger>& l)
: DhtInterface(l),
myid(config.node_id ? config.node_id : InfoHash::getRandom(rd)),
store(),
store_quota(),
max_store_keys(config.max_store_size ? (int)config.max_store_size : MAX_HASHES),
max_searches(config.max_searches ? (int)config.max_searches : MAX_SEARCHES),
network_engine(myid, fromDhtConfig(config), std::move(sock), logger_, rd, scheduler,
std::bind(&Dht::onError, this, _1, _2),
std::bind(&Dht::onNewNode, this, _1, _2),
std::bind(&Dht::onReportedAddr, this, _1, _2),
std::bind(&Dht::onPing, this, _1),
std::bind(&Dht::onFindNode, this, _1, _2, _3),
std::bind(&Dht::onGetValues, this, _1, _2, _3, _4),
std::bind(&Dht::onListen, this, _1, _2, _3, _4, _5, _6),
std::bind(&Dht::onAnnounce, this, _1, _2, _3, _4, _5),
std::bind(&Dht::onRefresh, this, _1, _2, _3, _4)),
persistPath(config.persist_path),
is_bootstrap(config.is_bootstrap),
maintain_storage(config.maintain_storage),
public_stable(config.public_stable)
{
scheduler.syncTime();
auto s = network_engine.getSocket();
if (not s or (not s->hasIPv4() and not s->hasIPv6()))
throw DhtException("Opened socket required");
// One routing table per supported family; a bootstrap node marks its tables as client.
if (s->hasIPv4()) {
dht4.buckets = {Bucket {AF_INET}};
dht4.buckets.is_client = config.is_bootstrap;
}
if (s->hasIPv6()) {
dht6.buckets = {Bucket {AF_INET6}};
dht6.buckets.is_client = config.is_bootstrap;
}
search_id = std::uniform_int_distribution<decltype(search_id)>{}(rd);
uniform_duration_distribution<> time_dis {std::chrono::seconds(3), std::chrono::seconds(5)};
nextNodesConfirmation = scheduler.add(scheduler.time() + time_dis(rd), std::bind(&Dht::confirmNodes, this));
// Fill old secret: rotateSecrets() copies this value into oldsecret, then draws a new secret.
secret = std::uniform_int_distribution<uint64_t>{}(rd);
rotateSecrets();
if (not persistPath.empty())
loadState(persistPath);
// Runs expiration now and reschedules itself.
expire();
if (logger_)
logger_->d("DHT node initialised with ID %s", myid.toString().c_str());
}
/**
 * Send one find_node request for an id in our immediate neighbourhood, to keep
 * the buckets around our own id populated.
 *
 * @param list routing table (one address family) to maintain.
 * @return false if no bucket covers our id; true otherwise, even when no node
 *         was available to query.
 */
bool
Dht::neighbourhoodMaintenance(RoutingTable& list)
{
    auto b = list.findBucket(myid);
    if (b == list.end())
        return false;

    // Target our own id with a random last byte. std::uniform_int_distribution is
    // only defined for short/int/long/long long and their unsigned variants:
    // uint8_t is not a valid IntType (UB, and MSVC rejects it). So draw an
    // unsigned int in [0, 255] on every platform instead of only under _WIN32.
    InfoHash id = myid;
    std::uniform_int_distribution<unsigned> rand_byte {0, std::numeric_limits<uint8_t>::max()};
    id[HASH_LEN-1] = static_cast<uint8_t>(rand_byte(rd));

    // Usually query our own bucket. Fall back to the next bucket when ours is
    // empty (or 1 time in 8). Then, under the same rule, use the previous bucket
    // if it has nodes.
    std::bernoulli_distribution rand_trial(1./8.);
    auto q = b;
    if (std::next(q) != list.end() && (q->nodes.empty() || rand_trial(rd)))
        q = std::next(q);
    if (b != list.begin() && (q->nodes.empty() || rand_trial(rd))) {
        auto r = std::prev(b);
        if (!r->nodes.empty())
            q = r;
    }

    auto n = q->randomNode(rd);
    if (n) {
        if (logger_)
            logger_->d(id, n->id, "[node %s] sending [find %s] for neighborhood maintenance",
                n->toString().c_str(), id.toString().c_str());
        /* Since our node-id is the same in both DHTs, it's probably
           profitable to query both families. */
        network_engine.sendFindNode(n, id, network_engine.want());
    }
    return true;
}
/**
 * Refresh stale or empty buckets of routing table `list`.
 *
 * For each bucket not confirmed during the last 10 minutes (or empty):
 * - pick a random id in the bucket's range;
 * - pick a random node from the bucket or, when empty (or 1 time in 8), from a
 *   neighbouring bucket;
 * - send that node a find_node for the id. Nodes with a pending message are skipped.
 *
 * @return true if at least one request was sent.
 */
bool
Dht::bucketMaintenance(RoutingTable& list)
{
std::bernoulli_distribution rand_trial(1./8.);
std::bernoulli_distribution rand_trial_38(1./38.);
bool sent {false};
for (auto b = list.begin(); b != list.end(); ++b) {
if (b->time < scheduler.time() - std::chrono::minutes(10) || b->nodes.empty()) {
/* This bucket hasn't seen any positive confirmation for a long
time. Pick a random id in this bucket's range, and send a request
to a random node. */
InfoHash id = list.randomId(b, rd);
auto q = b;
/* If the bucket is empty, we try to fill it from a neighbour.
We also sometimes do it gratuitously to recover from
buckets full of broken nodes. */
if (std::next(b) != list.end() && (q->nodes.empty() || rand_trial(rd)))
q = std::next(b);
if (b != list.begin() && (q->nodes.empty() || rand_trial(rd))) {
auto r = std::prev(b);
if (!r->nodes.empty())
q = r;
}
auto n = q->randomNode(rd);
if (n and not n->isPendingMessage()) {
// NOTE(review): -1 presumably means "no explicit family preference" for
// sendFindNode — confirm; it is compared to the engine's own want() below.
want_t want = -1;
if (network_engine.want() != want) {
auto otherbucket = findBucket(id, q->af == AF_INET ? AF_INET6 : AF_INET);
if (otherbucket && otherbucket->nodes.size() < TARGET_NODES)
/* The corresponding bucket in the other family
is emptyish -- querying both is useful. */
want = WANT4 | WANT6;
else if (rand_trial_38(rd))
/* Most of the time, this just adds overhead.
However, it might help stitch back one of
the DHTs after a network collapse, so query
both, but only very occasionally. */
want = WANT4 | WANT6;
}
if (logger_)
logger_->d(id, n->id, "[node %s] sending find %s for bucket maintenance", n->toString().c_str(), id.toString().c_str());
//auto start = scheduler.time();
// When the request is over, postpone the next node confirmation by
// Node::MAX_RESPONSE_TIME from the current time.
network_engine.sendFindNode(n, id, want, nullptr, [this,n](const net::Request&, bool over) {
if (over) {
const auto& end = scheduler.time();
// using namespace std::chrono;
// if (logger_)
// logger_->d(n->id, "[node %s] bucket maintenance op expired after %s", n->toString().c_str(), print_duration(end-start).c_str());
scheduler.edit(nextNodesConfirmation, end + Node::MAX_RESPONSE_TIME);
}
});
sent = true;
}
}
}
return sent;
}
/**
 * Scheduled storage maintenance for key `id`. Once the storage's
 * maintenance_time has passed, runs maintainStorage() and reschedules itself
 * MAX_STORAGE_MAINTENANCE_EXPIRE_TIME later. Does nothing otherwise.
 */
void
Dht::dataPersistence(InfoHash id)
{
    const auto& now = scheduler.time();
    auto entry = store.find(id);
    if (entry == store.end() or not (now > entry->second.maintenance_time))
        return;
    auto& storage = entry->second;
    if (logger_)
        logger_->d(id, "[storage %s] maintenance (%u values, %u bytes)",
            id.toString().c_str(), storage.valueCount(), storage.totalSize());
    maintainStorage(*entry);
    storage.maintenance_time = now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME;
    scheduler.add(storage.maintenance_time, std::bind(&Dht::dataPersistence, this, id));
}
/**
 * Re-announce locally stored values of `storage` to the network when we no
 * longer appear to be among the closest nodes to its key.
 *
 * For each family whose routing table returns closest nodes: if `force` is set
 * or the xorCmp test below holds, announce every value that is forced or would
 * outlive MAX_STORAGE_MAINTENANCE_EXPIRE_TIME. If that happened for both
 * families, the local copy of the values is cleared.
 *
 * @param force  announce all values regardless of distance and remaining lifetime.
 * @param donecb passed to every announce() call.
 * @return number of announce() calls made, summed over both families.
 */
size_t
Dht::maintainStorage(decltype(store)::value_type& storage, bool force, const DoneCallback& donecb)
{
const auto& now = scheduler.time();
size_t announce_per_af = 0;
// Returns true when this family still wants the values kept locally.
auto maintain = [&](sa_family_t af){
bool want = true;
auto nodes = buckets(af).findClosestNodes(storage.first, now);
if (!nodes.empty()) {
// NOTE(review): presumably true when the farthest of the closest nodes is
// nearer to the key than we are — confirm xorCmp's sign convention.
if (force || storage.first.xorCmp(nodes.back()->id, myid) < 0) {
for (auto &value : storage.second.getValues()) {
const auto& vt = getType(value.data->type);
if (force || value.created + vt.expiration > now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME) {
// gotta put that value there
announce(storage.first, af, value.data, donecb, value.created);
++announce_per_af;
}
}
want = false;
}
}
return want;
};
bool want4 = maintain(AF_INET), want6 = maintain(AF_INET6);
if (not want4 and not want6) {
if (logger_)
logger_->d(storage.first, "Discarding storage values %s", storage.first.toString().c_str());
auto diff = storage.second.clear();
total_store_size += diff.size_diff;
total_values += diff.values_diff;
}
return announce_per_af;
}
/**
 * Main loop step. Syncs the scheduler clock to `now` and processes the
 * received datagram, if any. Errors while processing the datagram are logged,
 * not propagated. Then runs due jobs and returns the scheduler's result
 * (presumably the time of the next scheduled job).
 */
time_point
Dht::periodic(const uint8_t *buf, size_t buflen, SockAddr from, const time_point& now)
{
    scheduler.syncTime(now);
    if (buflen != 0) {
        try {
            network_engine.processMessage(buf, buflen, std::move(from));
        } catch (const std::exception& err) {
            if (logger_)
                logger_->w("Can't process message: %s", err.what());
        }
    }
    return scheduler.run();
}
void
Dht::expire()
{
uniform_duration_distribution<> time_dis(std::chrono::minutes(2), std::chrono::minutes(6));
auto expire_stuff_time = scheduler.time() + duration(time_dis(rd));
expireBuckets(dht4.buckets);
expireBuckets(dht6.buckets);
expireStore();
expireSearches();
scheduler.add(expire_stuff_time, std::bind(&Dht::expire, this));
}
/**
 * Called when a family becomes connected. Cancels the bootstrap retry job,
 * resets the bootstrap backoff to 10 s, and runs the pending on-connect
 * callbacks in FIFO order.
 */
void
Dht::onConnected()
{
    if (bootstrapJob) {
        bootstrapJob->cancel();
        bootstrapJob.reset();
    }
    bootstrap_period = std::chrono::seconds(10);
    // Detach the pending callbacks from the member before invoking them.
    auto pending = std::move(onConnectCallbacks_);
    for (; not pending.empty(); pending.pop())
        pending.front()();
}
/**
 * Bootstrap when both families are disconnected. Pings every resolved address
 * of every bootstrap node, using DHT_DEFAULT_PORT when no port was given. Then
 * reschedules itself after bootstrap_period, which doubles on each attempt.
 */
void
Dht::onDisconnected()
{
    if (dht4.status != NodeStatus::Disconnected || dht6.status != NodeStatus::Disconnected)
        return;
    if (logger_)
        logger_->d(myid, "Bootstraping");
    for (const auto& bootstrap : bootstrap_nodes) {
        const auto& host = bootstrap.first;
        const auto& service = bootstrap.second;
        try {
            auto addrs = network_engine.getSocket()->resolve(host, service);
            for (auto& addr : addrs) {
                if (addr.getPort() == 0)
                    addr.setPort(net::DHT_DEFAULT_PORT);
                pingNode(addr);
            }
        } catch (const std::exception& err) {
            if (logger_)
                logger_->e(myid, "Can't resolve %s:%s: %s", host.c_str(), service.c_str(), err.what());
        }
    }
    if (bootstrapJob)
        bootstrapJob->cancel();
    bootstrapJob = scheduler.add(scheduler.time() + bootstrap_period, std::bind(&Dht::onDisconnected, this));
    bootstrap_period *= 2;
}
/**
 * Periodic routing-table maintenance. Steps, in order:
 * - if a connected family has no search, start one for our own id;
 * - run bucket maintenance on both families;
 * - if that sent nothing, run neighbourhood maintenance on tables that grew
 *   in the last 150 s;
 * - reschedule: 5-25 s later if maintenance work was sent, else 60-180 s later.
 */
void
Dht::confirmNodes()
{
    using namespace std::chrono;
    const auto& now = scheduler.time();
    if (dht4.searches.empty() and dht4.status == NodeStatus::Connected) {
        if (logger_)
            logger_->d(myid, "[confirm nodes] initial IPv4 'get' for my id (%s)", myid.toString().c_str());
        search(myid, AF_INET);
    }
    if (dht6.searches.empty() and dht6.status == NodeStatus::Connected) {
        if (logger_)
            logger_->d(myid, "[confirm nodes] initial IPv6 'get' for my id (%s)", myid.toString().c_str());
        search(myid, AF_INET6);
    }

    // Both families are always maintained, so evaluate both before combining.
    const bool sent4 = bucketMaintenance(dht4.buckets);
    const bool sent6 = bucketMaintenance(dht6.buckets);
    bool soon = sent4 or sent6;
    if (not soon) {
        if (dht4.buckets.grow_time >= now - seconds(150))
            soon |= neighbourhoodMaintenance(dht4.buckets);
        if (dht6.buckets.grow_time >= now - seconds(150))
            soon |= neighbourhoodMaintenance(dht6.buckets);
    }

    /* In order to maintain all buckets' age within 600 seconds, worst
       case is roughly 27 seconds, assuming the table is 22 bits deep.
       We want to keep a margin for neighborhood maintenance, so keep
       this within 25 seconds. */
    auto delay = soon
        ? uniform_duration_distribution<> {seconds(5) , seconds(25)}
        : uniform_duration_distribution<> {seconds(60), seconds(180)};
    scheduler.edit(nextNodesConfirmation, now + delay(rd));
}
std::vector<ValuesExport>
Dht::exportValues() const
{
std::vector<ValuesExport> e {};
e.reserve(store.size());
for (const auto& h : store) {
ValuesExport ve;
ve.first = h.first;
msgpack::sbuffer buffer;
msgpack::packer<msgpack::sbuffer> pk(&buffer);
const auto& vals = h.second.getValues();
pk.pack_array(vals.size());
for (const auto& v : vals) {
pk.pack_array(2);
pk.pack(v.created.time_since_epoch().count());
v.data->msgpack_pack(pk);
}
ve.second = {buffer.data(), buffer.data()+buffer.size()};
e.push_back(std::move(ve));
}
return e;
}
/**
 * Store values in the format produced by exportValues(): per key, a msgpack
 * array of [created, value] pairs, where `created` is a time_point tick count.
 *
 * Each value is stored through storageStore() with that function's default
 * remaining arguments, and its creation time is clamped to now. Errors are
 * handled per key:
 * - a value whose fields fail to decode is logged and skipped;
 * - a malformed structure (payload not an array, or an element that is not a
 *   2+ item array) abandons the rest of that key, is logged, and the next key
 *   is processed.
 */
void
Dht::importValues(const std::vector<ValuesExport>& import)
{
const auto& now = scheduler.time();
for (const auto& value : import) {
if (value.second.empty())
continue;
try {
msgpack::unpacked msg;
msgpack::unpack(msg, (const char*)value.second.data(), value.second.size());
auto valarr = msg.get();
if (valarr.type != msgpack::type::ARRAY)
throw msgpack::type_error();
for (unsigned i = 0; i < valarr.via.array.size; i++) {
auto& valel = valarr.via.array.ptr[i];
// Structural error: caught by the outer handler, skipping the rest of this key.
if (valel.type != msgpack::type::ARRAY or valel.via.array.size < 2)
throw msgpack::type_error();
time_point val_time;
Value tmp_val;
try {
val_time = time_point{time_point::duration{valel.via.array.ptr[0].as<time_point::duration::rep>()}};
tmp_val.msgpack_unpack(valel.via.array.ptr[1]);
} catch (const std::exception&) {
// Undecodable value: skip only this one.
if (logger_)
logger_->e(value.first, "Error reading value at %s", value.first.toString().c_str());
continue;
}
val_time = std::min(val_time, now);
storageStore(value.first, std::make_shared<Value>(std::move(tmp_val)), val_time);
}
} catch (const std::exception&) {
if (logger_)
logger_->e(value.first, "Error reading values at %s", value.first.toString().c_str());
continue;
}
}
}
std::vector<NodeExport>
Dht::exportNodes() const
{
const auto& now = scheduler.time();
std::vector<NodeExport> nodes;
const auto b4 = dht4.buckets.findBucket(myid);
if (b4 != dht4.buckets.end()) {
for (auto& n : b4->nodes)
if (n->isGood(now))
nodes.push_back(n->exportNode());
}
const auto b6 = dht6.buckets.findBucket(myid);
if (b6 != dht6.buckets.end()) {
for (auto& n : b6->nodes)
if (n->isGood(now))
nodes.push_back(n->exportNode());
}
for (auto b = dht4.buckets.begin(); b != dht4.buckets.end(); ++b) {
if (b == b4) continue;
for (auto& n : b->nodes)
if (n->isGood(now))
nodes.push_back(n->exportNode());
}
for (auto b = dht6.buckets.begin(); b != dht6.buckets.end(); ++b) {
if (b == b6) continue;
for (auto& n : b->nodes)
if (n->isGood(now))
nodes.push_back(n->exportNode());
}
return nodes;
}
/**
 * Add a known node to the network engine. Addresses that are neither IPv4
 * nor IPv6 are ignored.
 */
void
Dht::insertNode(const InfoHash& id, const SockAddr& addr)
{
    const auto family = addr.getFamily();
    if (family == AF_INET or family == AF_INET6) {
        scheduler.syncTime();
        network_engine.insertNode(id, addr);
    }
}
/**
 * Ping address `sa`, counting the ping in the family's pending_pings until it
 * gets a reply or its final failure. `cb`, if set, receives true on reply and
 * false on final failure.
 */
void
Dht::pingNode(SockAddr sa, DoneCallbackSimple&& cb)
{
    scheduler.syncTime();
    if (logger_)
        logger_->d("Sending ping to %s", sa.toString().c_str());
    auto& pending = dht(sa.getFamily()).pending_pings;
    ++pending;
    auto onReply = [&pending, cb](const net::Request&, net::RequestAnswer&&) {
        --pending;
        if (cb)
            cb(true);
    };
    auto onExpired = [&pending, cb](const net::Request&, bool last) {
        if (not last)
            return;
        --pending;
        if (cb)
            cb(false);
    };
    network_engine.sendPing(std::move(sa), std::move(onReply), std::move(onExpired));
}
/**
 * Handle a protocol error returned for request `req`.
 *
 * UNAUTHORIZED: flag the node, discard its token in every search of its family
 * that contains it, re-send the get and step that search now.
 * NOT_FOUND: cancel the request. Other codes are ignored.
 */
void
Dht::onError(Sp<net::Request> req, net::DhtProtocolException e) {
    const auto& node = req->node;
    const auto code = e.getCode();
    if (code == net::DhtProtocolException::UNAUTHORIZED) {
        if (logger_)
            logger_->e(node->id, "[node %s] token flush", node->toString().c_str());
        node->authError();
        for (auto& entry : searches(node->getFamily())) {
            auto& sr = entry.second;
            for (auto& sn : sr->nodes) {
                if (sn->node == node) {
                    sn->token.clear();
                    sn->last_get_reply = time_point::min();
                    searchSendGetValues(sr);
                    scheduler.edit(sr->nextSearchStep, scheduler.time());
                    break;
                }
            }
        }
        return;
    }
    if (code == net::DhtProtocolException::NOT_FOUND) {
        if (logger_)
            logger_->e(node->id, "[node %s] returned error 404: storage not found", node->toString().c_str());
        node->cancelRequest(req);
    }
}
/**
 * Record an address a peer reports for us, ignoring addresses that evaluate
 * to false (presumably unset).
 */
void
Dht::onReportedAddr(const InfoHash& /*id*/, const SockAddr& addr)
{
    if (not addr)
        return;
    reportedAddr(addr);
}
/**
 * Answer a ping with an empty answer.
 */
net::RequestAnswer
Dht::onPing(Sp<Node>)
{
    return net::RequestAnswer {};
}
/**
 * Answer a find_node for `target`. The answer carries a token bound to the
 * requester's address (see makeToken()) and our TARGET_NODES closest nodes for
 * each family requested in `want`.
 */
net::RequestAnswer
Dht::onFindNode(Sp<Node> node, const InfoHash& target, want_t want)
{
    const auto& now = scheduler.time();
    net::RequestAnswer answer;
    answer.ntoken = makeToken(node->getAddr(), false);
    const bool want4 = want & WANT4;
    const bool want6 = want & WANT6;
    if (want4)
        answer.nodes4 = dht4.buckets.findClosestNodes(target, now, TARGET_NODES);
    if (want6)
        answer.nodes6 = dht6.buckets.findClosestNodes(target, now, TARGET_NODES);
    return answer;
}
/**
 * Answer a get request on `hash`. The answer carries a token for the
 * requester, our closest nodes for both families, and the stored values
 * matching the query's `where` filter (if the key is stored and not empty).
 *
 * @throws net::DhtProtocolException (GET_NO_INFOHASH) when `hash` is empty.
 */
net::RequestAnswer
Dht::onGetValues(Sp<Node> node, const InfoHash& hash, want_t, const Query& query)
{
    if (not hash) {
        if (logger_)
            logger_->w("[node %s] Eek! Got get_values with no info_hash", node->toString().c_str());
        throw net::DhtProtocolException {
            net::DhtProtocolException::NON_AUTHORITATIVE_INFORMATION,
            net::DhtProtocolException::GET_NO_INFOHASH
        };
    }
    const auto& now = scheduler.time();
    net::RequestAnswer answer {};
    answer.ntoken = makeToken(node->getAddr(), false);
    answer.nodes4 = dht4.buckets.findClosestNodes(hash, now, TARGET_NODES);
    answer.nodes6 = dht6.buckets.findClosestNodes(hash, now, TARGET_NODES);
    const auto st = store.find(hash);
    if (st == store.end() or st->second.empty())
        return answer;
    answer.values = st->second.get(query.where.getFilter());
    if (logger_)
        logger_->d(hash, "[node %s] sending %u values", node->toString().c_str(), answer.values.size());
    return answer;
}
void Dht::onGetValuesDone(const Sp<Node>& node,
net::RequestAnswer& a,
Sp<Search>& sr,
const Sp<Query>& orig_query)
{
if (not sr) {
if (logger_)
logger_->w("[search unknown] got reply to 'get'. Ignoring.");
return;
}
/* if (logger_)
logger_->d(sr->id, "[search %s] [node %s] got reply to 'get' with %u nodes",
sr->id.toString().c_str(), node->toString().c_str(), a.nodes4.size()+a.nodes6.size());*/
if (not a.ntoken.empty()) {
if (not a.values.empty() or not a.fields.empty()) {
if (logger_)
logger_->d(sr->id, node->id, "[search %s] [node %s] found %u values",
sr->id.toString().c_str(), node->toString().c_str(), a.values.size());
for (auto& getp : sr->callbacks) { /* call all callbacks for this search */
auto& get = getp.second;
if (not (get.get_cb or get.query_cb) or
(orig_query and get.query and not get.query->isSatisfiedBy(*orig_query)))
continue;
if (get.query_cb) { /* in case of a request with query */
if (not a.fields.empty()) {
get.query_cb(a.fields);
} else if (not a.values.empty()) {
std::vector<Sp<FieldValueIndex>> fields;
fields.reserve(a.values.size());
for (const auto& v : a.values)
fields.emplace_back(std::make_shared<FieldValueIndex>(*v, orig_query ? orig_query->select : Select {}));
get.query_cb(fields);
}
} else if (get.get_cb) { /* in case of a vanilla get request */
std::vector<Sp<Value>> tmp;
for (const auto& v : a.values)
if (not get.filter or get.filter(*v))
tmp.emplace_back(v);
if (not tmp.empty())
get.get_cb(tmp);
}
}
/* callbacks for local search listeners */
/*std::vector<std::pair<ValueCallback, std::vector<Sp<Value>>>> tmp_lists;
for (auto& l : sr->listeners) {
if (!l.second.get_cb or (orig_query and l.second.query and not l.second.query->isSatisfiedBy(*orig_query)))
continue;
std::vector<Sp<Value>> tmp;
for (const auto& v : a.values)
if (not l.second.filter or l.second.filter(*v))
tmp.emplace_back(v);
if (not tmp.empty())
tmp_lists.emplace_back(l.second.get_cb, std::move(tmp));
}
for (auto& l : tmp_lists)
l.first(l.second, false);*/
} else if (not a.expired_values.empty()) {
if (logger_)
logger_->w(sr->id, node->id, "[search %s] [node %s] %u expired values",
sr->id.toString().c_str(), node->toString().c_str(), a.expired_values.size());
}
} else {
if (logger_)
logger_->w(sr->id, "[node %s] no token provided. Ignoring response content.", node->toString().c_str());
network_engine.blacklistNode(node);
}
if (not sr->done) {
searchSendGetValues(sr);
// Force to recompute the next step time
scheduler.edit(sr->nextSearchStep, scheduler.time());
}
}
/*
 * Handle a 'listen' request: after validating the key and the requester's
 * token, register `node` (on `socket_id`) as a remote listener of `hash` with a
 * copy of `query` and the requester's protocol `version`.
 *
 * Throws net::DhtProtocolException with LISTEN_NO_INFOHASH when `hash` is empty,
 * or UNAUTHORIZED / LISTEN_WRONG_TOKEN when `token` does not match the node's
 * address. Returns an empty answer on success.
 */
net::RequestAnswer
Dht::onListen(Sp<Node> node, const InfoHash& hash, const Blob& token, size_t socket_id, const Query& query, int version)
{
    if (not hash) {
        if (logger_)
            logger_->w(node->id, "[node %s] listen with no info_hash", node->toString().c_str());
        throw net::DhtProtocolException {
            net::DhtProtocolException::NON_AUTHORITATIVE_INFORMATION,
            net::DhtProtocolException::LISTEN_NO_INFOHASH
        };
    }

    const bool tokenValid = tokenMatch(token, node->getAddr());
    if (not tokenValid) {
        if (logger_)
            logger_->w(hash, node->id, "[node %s] incorrect token %s for 'listen'", node->toString().c_str(), hash.toString().c_str());
        throw net::DhtProtocolException {net::DhtProtocolException::UNAUTHORIZED, net::DhtProtocolException::LISTEN_WRONG_TOKEN};
    }

    // storageAddListener() is handed a temporary copy of the query (rvalue).
    storageAddListener(hash, node, socket_id, Query(query), version);
    return {};
}
/*
 * Handle a remote node's confirmation of a 'listen' request for search `sr`.
 * Completed searches are left alone; otherwise more 'get' requests are sent and
 * the search's next step is rescheduled to the current time.
 */
void
Dht::onListenDone(const Sp<Node>& /* node */, net::RequestAnswer& /* answer */, Sp<Search>& sr)
{
    if (sr->done)
        return;

    const auto& now = scheduler.time();
    searchSendGetValues(sr);
    scheduler.edit(sr->nextSearchStep, now);
}
// Handle an incoming announce ('put') request from node `n` for key `hash`.
//
// Each value in `values` is either:
//  - refreshed through storageRefresh(), when a local value with the same id
//    compares equal to it;
//  - stored as an edit, when a local value with the same id differs and the
//    local value's type editPolicy() accepts the change;
//  - stored as new, when no local value has that id and the value type's
//    storePolicy() accepts it.
// Rejected edits/stores are only logged.
//
// Throws net::DhtProtocolException when `hash` is empty (PUT_NO_INFOHASH), when
// `token` does not match the node's address (UNAUTHORIZED / PUT_WRONG_TOKEN), or
// when a value has Value::INVALID_ID (PUT_INVALID_ID). In the last case, values
// that come earlier in `values` have already been processed when it throws.
//
// Returns an empty answer, including when the announce is dropped because this
// node does not consider itself close enough to `hash`.
net::RequestAnswer
Dht::onAnnounce(Sp<Node> n,
const InfoHash& hash,
const Blob& token,
const std::vector<Sp<Value>>& values,
const time_point& creation_date)
{
auto& node = *n;
if (not hash) {
if (logger_)
logger_->w(node.id, "put with no info_hash");
throw net::DhtProtocolException {
net::DhtProtocolException::NON_AUTHORITATIVE_INFORMATION,
net::DhtProtocolException::PUT_NO_INFOHASH
};
}
if (!tokenMatch(token, node.getAddr())) {
if (logger_)
logger_->w(hash, node.id, "[node %s] incorrect token %s for 'put'", node.toString().c_str(), hash.toString().c_str());
throw net::DhtProtocolException {net::DhtProtocolException::UNAUTHORIZED, net::DhtProtocolException::PUT_WRONG_TOKEN};
}
{
// We store a value only if we think we're part of the
// SEARCH_NODES nodes around the target id.
// Drop the announce when we know at least TARGET_NODES nodes and the last of
// the closest nodes is closer to `hash` than our own id.
// NOTE(review): assumes findClosestNodes() returns nodes sorted by distance and
// that xorCmp(a, b) < 0 means `a` is closer to `hash` than `b` — confirm.
auto closest_nodes = buckets(node.getFamily()).findClosestNodes(hash, scheduler.time(), SEARCH_NODES);
if (closest_nodes.size() >= TARGET_NODES and hash.xorCmp(closest_nodes.back()->id, myid) < 0) {
if (logger_)
logger_->w(hash, node.id, "[node %s] announce too far from the target. Dropping value.", node.toString().c_str());
return {};
}
}
// A creation date in the future is clamped to the current time.
auto created = std::min(creation_date, scheduler.time());
for (const auto& v : values) {
if (v->id == Value::INVALID_ID) {
if (logger_)
logger_->w(hash, node.id, "[value %s] incorrect value id", hash.toString().c_str());
throw net::DhtProtocolException {
net::DhtProtocolException::NON_AUTHORITATIVE_INFORMATION,
net::DhtProtocolException::PUT_INVALID_ID
};
}
auto lv = getLocalById(hash, v->id);
Sp<Value> vc = v;
if (lv) {
// A value with this id is already stored locally.
if (*lv == *vc) {
// Same content: only refresh the stored value.
storageRefresh(hash, v->id);
if (logger_)
logger_->d(hash, node.id, "[store %s] [node %s] refreshed value %s", hash.toString().c_str(), node.toString().c_str(), std::to_string(v->id).c_str());
} else {
// Different content: an edit, subject to the stored value's type edit policy.
// vc is passed to editPolicy() as a non-const Sp, so the policy may replace it
// before it is stored (NOTE(review): depends on editPolicy's signature — confirm).
const auto& type = getType(lv->type);
if (type.editPolicy(hash, lv, vc, node.id, node.getAddr())) {
if (logger_)
logger_->d(hash, node.id, "[store %s] editing %s",
hash.toString().c_str(), vc->toString().c_str());
storageStore(hash, vc, created, node.getAddr());
} else {
if (logger_)
logger_->d(hash, node.id, "[store %s] rejecting edition of %s because of storage policy",
hash.toString().c_str(), vc->toString().c_str());
}
}
} else {
// Allow the value to be edited by the storage policy
const auto& type = getType(vc->type);
if (type.storePolicy(hash, vc, node.id, node.getAddr())) {
// if (logger_)
// logger_->d(hash, node.id, "[store %s] storing %s", hash.toString().c_str(), std::to_string(vc->id).c_str());
storageStore(hash, vc, created, node.getAddr());
} else {
if (logger_)
logger_->d(hash, node.id, "[store %s] rejecting storage of %s",
hash.toString().c_str(), vc->toString().c_str());
}
}
}
return {};
}
/*
 * Handle a 'refresh' request for value `vid` stored under `hash`.
 *
 * Throws DhtProtocolException with UNAUTHORIZED / PUT_WRONG_TOKEN when `token`
 * does not match the node's address, or NOT_FOUND / STORAGE_NOT_FOUND when
 * storageRefresh() reports that no matching storage exists. Returns an empty
 * answer on success.
 */
net::RequestAnswer
Dht::onRefresh(Sp<Node> node, const InfoHash& hash, const Blob& token, const Value::Id& vid)
{
    using namespace net;

    const bool authorized = tokenMatch(token, node->getAddr());
    if (not authorized) {
        if (logger_)
            logger_->w(hash, node->id, "[node %s] incorrect token %s for 'put'", node->toString().c_str(), hash.toString().c_str());
        throw DhtProtocolException {DhtProtocolException::UNAUTHORIZED, DhtProtocolException::PUT_WRONG_TOKEN};
    }

    const bool refreshed = storageRefresh(hash, vid);
    if (not refreshed) {
        if (logger_)
            logger_->d(hash, node->id, "[store %s] [node %s] got refresh for unknown value",
                hash.toString().c_str(), node->toString().c_str());
        throw DhtProtocolException {DhtProtocolException::NOT_FOUND, DhtProtocolException::STORAGE_NOT_FOUND};
    }

    if (logger_)
        logger_->d(hash, node->id, "[store %s] [node %s] refreshed value %s", hash.toString().c_str(), node->toString().c_str(), std::to_string(vid).c_str());
    return {};
}
bool
Dht::storageRefresh(const InfoHash& id, Value::Id vid)
{
const auto& now = scheduler.time();
auto s = store.find(id);
if (s != store.end()) {
// Values like for a permanent put can be refreshed. So, inform remote listeners that the value
// need to be refreshed
auto& st = s->second;
if (not st.listeners.empty()) {
if (logger_)
logger_->d(id, "[store %s] %lu remote listeners", id.toString().c_str(), st.listeners.size());
std::vector<Value::Id> ids = {vid};
for (const auto& node_listeners : st.listeners) {
for (const auto& l : node_listeners.second) {
if (logger_)
logger_->w(id, node_listeners.first->id, "[store %s] [node %s] sending refresh",
id.toString().c_str(),
node_listeners.first->toString().c_str());
Blob ntoken = makeToken(node_listeners.first->getAddr(), false);
network_engine.tellListenerRefreshed(node_listeners.first, l.first, id, ntoken, ids, l.second.version);
}
}
}
auto expiration = s->second.refresh(now, vid, types);
if (expiration != time_point::max())
scheduler.add(expiration, std::bind(&Dht::expireStorage, this, id));
return true;
}
return false;
}
/*
 * Handle a remote node's reply to a 'put' sent for search `sr`: log it, send
 * further 'get' requests for the search, then let the search record value
 * `answer.vid` as acknowledged via checkAnnounced().
 */
void
Dht::onAnnounceDone(const Sp<Node>& node, net::RequestAnswer& answer, Sp<Search>& sr)
{
    if (logger_) {
        logger_->d(sr->id, node->id, "[search %s] [node %s] got reply to put!",
                   sr->id.toString().c_str(), node->toString().c_str());
    }

    searchSendGetValues(sr);
    sr->checkAnnounced(answer.vid);
}
// Snapshot of the node's persistent state, written by Dht::saveState() and read
// back by Dht::loadState().
// MSGPACK_DEFINE_MAP serializes it as a msgpack map keyed by the member names,
// so renaming members changes the on-disk format and breaks reading older files.
struct DhtState {
// Format version (defaults to 1); loadState() in this file does not check it.
unsigned v {1};
// Our node id (myid) at save time; restored on load when non-zero.
InfoHash id;
// Known nodes, as returned by exportNodes().
std::vector<NodeExport> nodes;
// Locally stored values, as returned by exportValues().
std::vector<ValuesExport> values;
MSGPACK_DEFINE_MAP(v, id, nodes, values)
};
void
Dht::saveState(const std::string& path) const
{
DhtState state;
state.id = myid;
state.nodes = exportNodes();
state.values = exportValues();
std::ofstream file(path);
msgpack::pack(file, state);
}
/*
 * Load a state previously written by saveState() from `path`.
 *
 * Does nothing if the file cannot be opened or is empty. Otherwise it decodes
 * one DhtState, restores our node id (when non-zero), inserts the saved nodes
 * into the network engine and imports the saved values. Errors while reading
 * or decoding are logged (when a logger is set) and not propagated.
 */
void
Dht::loadState(const std::string& path)
{
    if (logger_)
        logger_->d("Importing state from %s", path.c_str());
    try {
        // Import nodes from binary file
        msgpack::unpacker pac;
        {
            // Read whole file: opening at the end lets tellg() report its size.
            std::ifstream file(path, std::ios::binary|std::ios::ate);
            if (!file.is_open()) {
                return;
            }
            const std::streamoff size = file.tellg();
            if (size < 0) {
                // tellg() returns -1 on failure; don't turn it into a huge buffer request.
                if (logger_)
                    logger_->w("Error importing state from %s: %s", path.c_str(), "can't determine file size");
                return;
            }
            file.seekg(0, std::ios::beg);
            pac.reserve_buffer(static_cast<size_t>(size));
            file.read(pac.buffer(), size);
            // Only hand msgpack the bytes actually read, in case of a short read.
            pac.buffer_consumed(static_cast<size_t>(file.gcount()));
        }
        // Import nodes
        msgpack::object_handle oh;
        if (pac.next(oh)) {
            auto state = oh.get().as<DhtState>();
            if (logger_)
                logger_->d("Importing %zu nodes", state.nodes.size());
            if (state.id)
                myid = state.id;
            // Holds the inserted nodes until the end of this scope (after
            // importValues()). NOTE(review): presumably keeps them alive while
            // importing — confirm intent.
            std::vector<Sp<Node>> tmpNodes;
            tmpNodes.reserve(state.nodes.size());
            for (const auto& node : state.nodes)
                tmpNodes.emplace_back(network_engine.insertNode(node.id, SockAddr(node.ss, node.sslen)));
            importValues(state.values);
        }
    } catch (const std::exception& e) {
        if (logger_)
            logger_->w("Error importing state from %s: %s", path.c_str(), e.what());
    }
}
}