/*
 *  Copyright (C) 2014-2020 Savoir-faire Linux Inc.
 *  Author(s) : Adrien Béraud <adrien.beraud@savoirfairelinux.com>
 *              Simon Désaulniers <simon.desaulniers@savoirfairelinux.com>
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program. If not, see <https://www.gnu.org/licenses/>.
 */

#include "dht.h"
#include "rng.h"
#include "search.h"
#include "storage.h"
#include "request.h"

#include <msgpack.hpp>

#include <algorithm>
#include <random>
#include <sstream>
#include <fstream>

namespace dht {

using namespace std::placeholders;

constexpr std::chrono::minutes Dht::MAX_STORAGE_MAINTENANCE_EXPIRE_TIME;
constexpr std::chrono::minutes Dht::SEARCH_EXPIRE_TIME;
constexpr duration Dht::LISTEN_EXPIRE_TIME;
constexpr duration Dht::LISTEN_EXPIRE_TIME_PUBLIC;
constexpr duration Dht::REANNOUNCE_MARGIN;
static constexpr size_t MAX_REQUESTS_PER_SEC {8 * 1024};

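/* Recompute the connectivity status of one address family and fire the
   onConnected()/onDisconnected() callbacks when the combined IPv4+IPv6
   status changes: disconnection is only reported once both families are
   disconnected, connection as soon as either family is connected. */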
NodeStatus
Dht::updateStatus(sa_family_t af)
{
    auto& d = dht(af);
    auto old = d.status;
    d.status = d.getStatus(scheduler.time());
    if (d.status != old) {
        auto& other = dht(af == AF_INET ? AF_INET6 : AF_INET);
        if (other.status == NodeStatus::Disconnected && d.status == NodeStatus::Disconnected)
            onDisconnected();
        else if (other.status == NodeStatus::Connected || d.status == NodeStatus::Connected) {
            onConnected();
        }
    }
    return d.status;
}

NodeStatus
Dht::Kad::getStatus(time_point now) const
{
    unsigned dubious = 0;
    for (const auto& b : buckets) {
        for (auto& n : b.nodes) {
            if (n->isGood(now)) {
                return NodeStatus::Connected;
            } else if (not n->isExpired())
                dubious++;
        }
    }
    auto& ping = pending_pings;
    if (dubious or ping)
        return NodeStatus::Connecting;
    return NodeStatus::Disconnected;
}

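/* Gracefully stop the node: persist the state if a persist path is set,
   optionally fail every pending search operation, then run a last storage
   maintenance pass and invoke the shutdown callback once all remaining
   operations have completed. */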
void
Dht::shutdown(ShutdownCallback cb, bool stop)
{
    if (not persistPath.empty())
        saveState(persistPath);

    if (stop) {
        for (auto dht : {&dht4, &dht6}) {
            for (auto& sr : dht->searches) {
                for (const auto& r : sr.second->callbacks)
                    r.second.done_cb(false, {});
                sr.second->callbacks.clear();
                for (const auto& a : sr.second->announce) {
                    if (a.callback) a.callback(false, {});
                }
                sr.second->announce.clear();
                sr.second->listeners.clear();
            }
        }
        network_engine.clear();
    }

    if (not maintain_storage) {
        if (cb) cb();
        return;
    }

    // Last store maintenance
    scheduler.syncTime();
    auto remaining = std::make_shared<int>(0);
    auto str_donecb = [=](bool, const std::vector<Sp<Node>>&) {
        --*remaining;
        if (logger_)
            logger_->w("shuting down node: %u ops remaining", *remaining);
        if (!*remaining && cb) { cb(); }
    };

    for (auto& str : store)
        *remaining += maintainStorage(str, true, str_donecb);

    if (logger_)
        logger_->w("shuting down node: after storage, %u ops", *remaining);

    if (!*remaining) {
        if (cb) cb();
    }
}

bool
Dht::isRunning(sa_family_t af) const { return network_engine.isRunning(af); }

/* Every bucket contains an unordered list of nodes. */
const Sp<Node>
Dht::findNode(const InfoHash& id, sa_family_t af) const
{
    if (const Bucket* b = findBucket(id, af))
        for (const auto& n : b->nodes)
            if (n->id == id) return n;
    return {};
}

/* Every bucket caches the address of a likely node.  Ping it. */
void
Dht::sendCachedPing(Bucket& b)
{
    if (b.cached)
        if (logger_)
            logger_->d(b.cached->id, "[node %s] sending ping to cached node", b.cached->toString().c_str());
    b.sendCachedPing(network_engine);
}

std::vector<SockAddr>
Dht::getPublicAddress(sa_family_t family)
{
    std::sort(reported_addr.begin(), reported_addr.end(), [](const ReportedAddr& a, const ReportedAddr& b) {
        return a.first > b.first;
    });
    std::vector<SockAddr> ret;
    ret.reserve(!family ? reported_addr.size() : reported_addr.size()/2);
    for (const auto& addr : reported_addr)
        if (!family || family == addr.second.getFamily())
            ret.emplace_back(addr.second);
    return ret;
}

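/* Offer a freshly seen node to ongoing searches. Starting from the searches
   whose targets are nearest the node id in the ordered search map, walk in
   both directions and stop in each direction at the first active search
   (neither expired nor done) that rejects the node. */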
bool
Dht::trySearchInsert(const Sp<Node>& node)
{
    const auto& now = scheduler.time();
    if (not node) return false;

    auto& srs = searches(node->getFamily());
    auto closest = srs.lower_bound(node->id);
    bool inserted {false};

    // insert forward
    for (auto it = closest; it != srs.end(); it++) {
        auto& s = *it->second;
        if (s.insertNode(node, now)) {
            inserted = true;
            scheduler.edit(s.nextSearchStep, now);
        } else if (not s.expired and not s.done)
            break;
    }
    // insert backward
    for (auto it = closest; it != srs.begin();) {
        --it;
        auto& s = *it->second;
        if (s.insertNode(node, now)) {
            inserted = true;
            scheduler.edit(s.nextSearchStep, now);
        } else if (not s.expired and not s.done)
            break;
    }
    return inserted;
}

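/* Record a public address reported to us by a peer, counting how many peers
   reported it (at most 32 distinct addresses are tracked). getPublicAddress()
   returns the addresses sorted by that count. */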
void
Dht::reportedAddr(const SockAddr& addr)
{
    auto it = std::find_if(reported_addr.begin(), reported_addr.end(), [&](const ReportedAddr& a){
        return a.second == addr;
    });
    if (it == reported_addr.end()) {
        if (reported_addr.size() < 32)
            reported_addr.emplace_back(1, addr);
    } else
        it->first++;
}

/* We just learnt about a node, not necessarily a new one.  Confirm is 1 if
   the node sent a message, 2 if it sent us a reply. */
void
Dht::onNewNode(const Sp<Node>& node, int confirm)
{
    const auto& now = scheduler.time();
    auto& b = buckets(node->getFamily());
    auto wasEmpty = confirm < 2 && b.grow_time < now - std::chrono::minutes(5);
    if (b.onNewNode(node, confirm, now, myid, network_engine) or confirm) {
        trySearchInsert(node);
        if (wasEmpty) {
            scheduler.edit(nextNodesConfirmation, now + std::chrono::seconds(1));
        }
    }
}

/* Called periodically to purge known-bad nodes.  Note that we're very
   conservative here: broken nodes in the table don't do much harm, we'll
   recover as soon as we find better ones. */
void
Dht::expireBuckets(RoutingTable& list)
{
    for (auto& b : list) {
        bool changed = false;
        b.nodes.remove_if([&changed](const Sp<Node>& n) {
            if (n->isExpired()) {
                changed = true;
                return true;
            }
            return false;
        });
        if (changed)
            sendCachedPing(b);
    }
}

void
Dht::expireSearches()
{
    auto t = scheduler.time() - SEARCH_EXPIRE_TIME;
    auto expired = [&](std::pair<const InfoHash, Sp<Search>>& srp) {
        auto& sr = *srp.second;
        auto b = sr.callbacks.empty() && sr.announce.empty() && sr.listeners.empty() && sr.step_time < t;
        if (b) {
            if (logger_)
                logger_->d(srp.first, "[search %s] removing search", srp.first.toString().c_str());
            sr.clear();
            return b;
        } else { return false; }
    };
    erase_if(dht4.searches, expired);
    erase_if(dht6.searches, expired);
}

void
Dht::searchNodeGetDone(const net::Request& req,
        net::RequestAnswer&& answer,
        std::weak_ptr<Search> ws,
        Sp<Query> query)
{
    const auto& now = scheduler.time();
    if (auto sr = ws.lock()) {
        sr->insertNode(req.node, now, answer.ntoken);
        if (auto srn = sr->getNode(req.node)) {
            /* all other get requests which are satisfied by this answer
               should not be sent anymore */
            for (auto& g : sr->callbacks) {
                auto& q = g.second.query;
                if (q->isSatisfiedBy(*query) and q != query) {
                    auto dummy_req = std::make_shared<net::Request>();
                    dummy_req->cancel();
                    srn->getStatus[q] = std::move(dummy_req);
                }
            }
            auto syncTime = srn->getSyncTime(scheduler.time());
            if (srn->syncJob)
                scheduler.edit(srn->syncJob, syncTime);
            else
                srn->syncJob = scheduler.add(syncTime, std::bind(&Dht::searchStep, this, sr));
        }
        onGetValuesDone(req.node, answer, sr, query);
    }
}

void
Dht::searchNodeGetExpired(const net::Request& status,
        bool over,
        std::weak_ptr<Search> ws,
        Sp<Query> query)
{
    if (auto sr = ws.lock()) {
        if (auto srn = sr->getNode(status.node)) {
            srn->candidate = not over;
            if (over)
                srn->getStatus.erase(query);
        }
        scheduler.edit(sr->nextSearchStep, scheduler.time());
    }
}

void Dht::paginate(std::weak_ptr<Search> ws, Sp<Query> query, SearchNode* n) {
    auto sr = ws.lock();
    if (not sr) return;
    auto select_q = std::make_shared<Query>(Select {}.field(Value::Field::Id), query ? query->where : Where {});
    auto onSelectDone = [this,ws,query](const net::Request& status,
                                        net::RequestAnswer&& answer) mutable {
        // Retrieve search
        auto sr = ws.lock();
        if (not sr) return;
        const auto& id = sr->id;
        // Retrieve search node
        auto sn = sr->getNode(status.node);
        if (not sn) return;
        // backward compatibility
        if (answer.fields.empty()) {
            searchNodeGetDone(status, std::move(answer), ws, query);
            return;
        }
        for (const auto& fvi : answer.fields) {
            try {
                auto vid = fvi->index.at(Value::Field::Id).getInt();
                if (vid == Value::INVALID_ID) continue;
                auto query_for_vid = std::make_shared<Query>(Select {}, Where {}.id(vid));
                sn->pagination_queries[query].push_back(query_for_vid);
                if (logger_)
                    logger_->d(id, sn->node->id, "[search %s] [node %s] sending %s",
                        id.toString().c_str(), sn->node->toString().c_str(), query_for_vid->toString().c_str());
                sn->getStatus[query_for_vid] = network_engine.sendGetValues(status.node,
                        id,
                        *query_for_vid,
                        -1,
                        std::bind(&Dht::searchNodeGetDone, this, _1, _2, ws, query),
                        std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, query_for_vid)
                        );
            } catch (const std::out_of_range&) {
                if (logger_)
                    logger_->e(id, sn->node->id, "[search %s] [node %s] received non-id field in response to "\
                        "'SELECT id' request...",
                        id.toString().c_str(), sn->node->toString().c_str());
            }
        }
    };
    /* add pagination query key for tracking ongoing requests. */
    n->pagination_queries[query].push_back(select_q);

    if (logger_)
        logger_->d(sr->id, n->node->id, "[search %s] [node %s] sending %s",
            sr->id.toString().c_str(), n->node->toString().c_str(), select_q->toString().c_str());
    n->getStatus[select_q] = network_engine.sendGetValues(n->node,
            sr->id,
            *select_q,
            -1,
            onSelectDone,
            std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, select_q)
            );
}

Dht::SearchNode*
Dht::searchSendGetValues(Sp<Search> sr, SearchNode* pn, bool update)
{
    if (sr->done or sr->currentlySolicitedNodeCount() >= MAX_REQUESTED_SEARCH_NODES)
        return nullptr;

    const auto& now = scheduler.time();

    std::weak_ptr<Search> ws = sr;
    auto cb = sr->callbacks.begin();
    static const auto ANY_QUERY = std::make_shared<Query>(Select {}, Where {}, true);
    do { /* for all requests to send */
        SearchNode* n = nullptr;
        auto& query = sr->callbacks.empty() ? ANY_QUERY : cb->second.query;
        const time_point up = (not sr->callbacks.empty() and update)
                                ? sr->getLastGetTime(*query)
                                : time_point::min();

        if (pn and pn->canGet(now, up, query)) {
            n = pn;
        } else {
            for (auto& sn : sr->nodes) {
                if (sn->canGet(now, up, query)) {
                    n = sn.get();
                    break;
                }
            }
        }

        if (sr->callbacks.empty()) { /* 'find_node' request */
            if (not n)
                return nullptr;

            /* if (logger_)
                   logger_->d(sr->id, n->node->id, "[search %s] [node %s] sending 'find_node'",
                        sr->id.toString().c_str(), n->node->toString().c_str());*/
            n->getStatus[query] = network_engine.sendFindNode(n->node,
                    sr->id,
                    -1,
                    std::bind(&Dht::searchNodeGetDone, this, _1, _2, ws, query),
                    std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, query));

        } else { /* 'get' request */
            if (not n)
                continue;

            if (query and not query->select.empty()) {
                /* The request contains a select. No need to paginate... */
                /* if (logger_)
                       logger_->d(sr->id, n->node->id, "[search %s] [node %s] sending 'get'",
                            sr->id.toString().c_str(), n->node->toString().c_str());*/
                n->getStatus[query] = network_engine.sendGetValues(n->node,
                        sr->id,
                        *query,
                        -1,
                        std::bind(&Dht::searchNodeGetDone, this, _1, _2, ws, query),
                        std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, query));
            } else
                paginate(ws, query, n);
        }

        /* We only try to send one request. return. */
        return n;

    } while (++cb != sr->callbacks.end());

    /* no requests were sent */
    return nullptr;
}

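/* Send the pending announces of a search to its synced nodes. Permanent
   values are first probed with a 'SELECT id, seq' query so that values the
   node already stores are refreshed instead of being sent again; other
   values are announced directly. */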
void Dht::searchSendAnnounceValue(const Sp<Search>& sr) {
    if (sr->announce.empty())
        return;
    unsigned i = 0;
    std::weak_ptr<Search> ws = sr;

    auto onDone = [this,ws](const net::Request& req, net::RequestAnswer&& answer)
    { /* when put done */
        if (auto sr = ws.lock()) {
            onAnnounceDone(req.node, answer, sr);
            scheduler.edit(sr->nextSearchStep, scheduler.time());
        }
    };

    auto onExpired = [this,ws](const net::Request&, bool over)
    { /* when put expired */
        if (over)
            if (auto sr = ws.lock())
                scheduler.edit(sr->nextSearchStep, scheduler.time());
    };

    auto onSelectDone =
    [this,ws,onDone,onExpired](const net::Request& req, net::RequestAnswer&& answer) mutable
    { /* on probing done */
        auto sr = ws.lock();
        if (not sr) return;
        const auto& now = scheduler.time();
        sr->insertNode(req.node, scheduler.time(), answer.ntoken);
        auto sn = sr->getNode(req.node);
        if (not sn) return;

        if (not sn->isSynced(now)) {
            /* Search is now unsynced. Let's call searchStep to sync again. */
            scheduler.edit(sr->nextSearchStep, now);
            return;
        }
        for (auto& a : sr->announce) {
            if (sn->getAnnounceTime(a.value->id) > now)
                continue;
            bool hasValue {false};
            uint16_t seq_no = 0;
            try {
                const auto& f = std::find_if(answer.fields.cbegin(), answer.fields.cend(),
                        [&a](const Sp<FieldValueIndex>& i){
                            return i->index.at(Value::Field::Id).getInt() == a.value->id;
                        });
                if (f != answer.fields.cend() and *f) {
                    hasValue = true;
                    seq_no = static_cast<uint16_t>((*f)->index.at(Value::Field::SeqNum).getInt());
                }
            } catch (std::out_of_range&) { }

            auto next_refresh_time = now + getType(a.value->type).expiration;
            /* only put the value if the node doesn't already have it */
            if (not hasValue or seq_no < a.value->seq) {
                if (logger_)
                    logger_->d(sr->id, sn->node->id, "[search %s] [node %s] sending 'put' (vid: %d)",
                        sr->id.toString().c_str(), sn->node->toString().c_str(), a.value->id);
                auto created = a.permanent ? time_point::max() : a.created;
                sn->acked[a.value->id] = {
                    network_engine.sendAnnounceValue(sn->node, sr->id, a.value, created, sn->token, onDone, onExpired),
                    next_refresh_time
                };
            } else if (hasValue and a.permanent) {
                if (logger_)
                    logger_->w(sr->id, sn->node->id, "[search %s] [node %s] sending 'refresh' (vid: %d)",
                        sr->id.toString().c_str(), sn->node->toString().c_str(), a.value->id);
                sn->acked[a.value->id] = {
                    network_engine.sendRefreshValue(sn->node, sr->id, a.value->id, sn->token, onDone,
                    [this, ws, node=sn->node, v=a.value,
                     onDone,
                     onExpired,
                     created = a.permanent ? time_point::max() : a.created,
                     next_refresh_time
                    ](const net::Request& /*req*/, net::DhtProtocolException&& e){
                        if (e.getCode() == net::DhtProtocolException::NOT_FOUND) {
                            if (logger_)
                                logger_->e(node->id, "[node %s] returned error 404: storage not found", node->toString().c_str());
                            if (auto sr = ws.lock()) {
                                if (auto sn = sr->getNode(node)) {
                                    sn->acked[v->id] = {
                                        network_engine.sendAnnounceValue(sn->node, sr->id, v, created, sn->token, onDone, onExpired),
                                        next_refresh_time
                                    };
                                    scheduler.edit(sr->nextSearchStep, scheduler.time());
                                    return true;
                                }
                            }
                        }
                        return false;
                    }, onExpired),
                    next_refresh_time
                };
            } else {
                if (logger_)
                    logger_->w(sr->id, sn->node->id, "[search %s] [node %s] already has value (vid: %d). Aborting.",
                        sr->id.toString().c_str(), sn->node->toString().c_str(), a.value->id);
                auto ack_req = std::make_shared<net::Request>(net::Request::State::COMPLETED);
                ack_req->reply_time = now;
                sn->acked[a.value->id] = std::make_pair(std::move(ack_req), next_refresh_time);

                /* step to clear announces */
                scheduler.edit(sr->nextSearchStep, now);
            }
            if (a.permanent) {
                scheduler.add(next_refresh_time - REANNOUNCE_MARGIN, [this,ws] {
                    if (auto sr = ws.lock()) {
                        searchStep(sr);
                    }
                });
            }
        }
    };

    static const auto PROBE_QUERY = std::make_shared<Query>(Select {}.field(Value::Field::Id).field(Value::Field::SeqNum));

    const auto& now = scheduler.time();
    for (auto& np : sr->nodes) {
        auto& n = *np;
        if (not n.isSynced(now))
            continue;

        auto gs = n.probe_query ? n.getStatus.find(n.probe_query) : n.getStatus.end();
        if (gs != n.getStatus.end() and gs->second and gs->second->pending()) {
            continue;
        }

        bool sendQuery = false;
        for (auto& a : sr->announce) {
            if (n.getAnnounceTime(a.value->id) <= now) {
                if (a.permanent) {
                    sendQuery = true;
                } else {
                    if (logger_)
                        logger_->w(sr->id, n.node->id, "[search %s] [node %s] sending 'put' (vid: %d)",
                            sr->id.toString().c_str(), n.node->toString().c_str(), a.value->id);
                    n.acked[a.value->id] = {
                        network_engine.sendAnnounceValue(n.node, sr->id, a.value, a.created, n.token, onDone, onExpired),
                        now + getType(a.value->type).expiration
                    };
                }
            }
        }

        if (sendQuery) {
            n.probe_query = PROBE_QUERY;
            if (logger_)
                logger_->d(sr->id, n.node->id, "[search %s] [node %s] sending %s",
                    sr->id.toString().c_str(), n.node->toString().c_str(), n.probe_query->toString().c_str());
            auto req = network_engine.sendGetValues(n.node,
                    sr->id,
                    *PROBE_QUERY,
                    -1,
                    onSelectDone,
                    std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, PROBE_QUERY));
            n.getStatus[PROBE_QUERY] = std::move(req);
        }
        if (not n.candidate and ++i == TARGET_NODES)
            break;
    }
}

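/* (Re)send 'listen' requests to a synced node for every listener attached to
   the search, opening a socket per query to receive value updates and
   scheduling the expiration of the cached values. */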
void
Dht::searchSynchedNodeListen(const Sp<Search>& sr, SearchNode& n)
{
    const auto& listenExp = getListenExpiration();
    std::weak_ptr<Search> ws = sr;
    for (const auto& l : sr->listeners) {
        const auto& query = l.second.query;

        auto r = n.listenStatus.find(query);
        if (n.getListenTime(r, listenExp) > scheduler.time())
            continue;
        // if (logger_)
        //     logger_->d(sr->id, n.node->id, "[search %s] [node %s] sending 'listen'",
        //        sr->id.toString().c_str(), n.node->toString().c_str());

        if (r == n.listenStatus.end()) {
            r = n.listenStatus.emplace(std::piecewise_construct,
                std::forward_as_tuple(query),
                std::forward_as_tuple(
                    l.second.get_cb,
                    l.second.sync_cb,
                    n.node->openSocket([this,ws,query](const Sp<Node>& node, net::RequestAnswer&& answer) mutable {
                        /* on new values */
                        if (auto sr = ws.lock()) {
                            scheduler.edit(sr->nextSearchStep, scheduler.time());
                            sr->insertNode(node, scheduler.time(), answer.ntoken);
                            if (auto sn = sr->getNode(node)) {
                                sn->onValues(query, std::move(answer), types, scheduler);
                            }
                        }
                    }))).first;
            r->second.cacheExpirationJob = scheduler.add(time_point::max(), [this,ws,query,node=n.node]{
                if (auto sr = ws.lock()) {
                    if (auto sn = sr->getNode(node)) {
                        sn->expireValues(query, scheduler);
                    }
                }
            });
        }
        auto new_req = network_engine.sendListen(n.node, sr->id, *query, n.token, r->second.socketId,
            [this,ws,query](const net::Request& req, net::RequestAnswer&& answer) mutable
            { /* on done */
                if (auto sr = ws.lock()) {
                    scheduler.edit(sr->nextSearchStep, scheduler.time());
                    if (auto sn = sr->getNode(req.node)) {
                        scheduler.add(sn->getListenTime(query, getListenExpiration()), std::bind(&Dht::searchStep, this, sr));
                        sn->onListenSynced(query);
                    }
                    onListenDone(req.node, answer, sr);
                }
            },
            [this,ws,query](const net::Request& req, bool over) mutable
            { /* on request expired */
                if (auto sr = ws.lock()) {
                    scheduler.edit(sr->nextSearchStep, scheduler.time());
                    if (over)
                        if (auto sn = sr->getNode(req.node))
                            sn->listenStatus.erase(query);
                }
            }
        );
        // Here the request may have failed and the CachedListenStatus removed
        r = n.listenStatus.find(query);
        if (r != n.listenStatus.end()) {
            r->second.req = new_req;
        }
    }
}

/* When a search is in progress, we periodically call search_step to send
   further requests. */
void
Dht::searchStep(Sp<Search> sr)
{
    if (not sr or sr->expired or sr->done) return;

    const auto& now = scheduler.time();
    /*if (auto req_count = sr->currentlySolicitedNodeCount())
        if (logger_)
            logger_->d(sr->id, "[search %s IPv%c] step (%d requests)",
                sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', req_count);*/
    sr->step_time = now;

    if (sr->refill_time + Node::NODE_EXPIRE_TIME < now and sr->nodes.size()-sr->getNumberOfBadNodes() < SEARCH_NODES)
        refill(*sr);

    /* Check if the first TARGET_NODES (8) live nodes have replied. */
    if (sr->isSynced(now)) {
        if (not (sr->callbacks.empty() and sr->announce.empty())) {
            // search is synced but some (newer) get operations are not complete
            // Call callbacks when done
            std::vector<Get> completed_gets;
            for (auto b = sr->callbacks.begin(); b != sr->callbacks.end();) {
                if (sr->isDone(b->second)) {
                    sr->setDone(b->second);
                    completed_gets.emplace_back(std::move(b->second));
                    b = sr->callbacks.erase(b);
                }
                else
                    ++b;
            }
            // clear corresponding queries
            for (const auto& get : completed_gets)
                for (auto& sn : sr->nodes) {
                    sn->getStatus.erase(get.query);
                    sn->pagination_queries.erase(get.query);
                }

            /* clearing callbacks for announced values */
            sr->checkAnnounced();

            if (sr->callbacks.empty() && sr->announce.empty() && sr->listeners.empty())
                sr->setDone();
        }

        // true if this node is part of the target nodes cluster.
        /*bool in = sr->id.xorCmp(myid, sr->nodes.back().node->id) < 0;

        logger__DBG("[search %s IPv%c] synced%s",
                sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', in ? ", in" : "");*/

        if (not sr->listeners.empty()) {
            unsigned i = 0;
            for (auto& n : sr->nodes) {
                if (not n->isSynced(now))
                    continue;
                searchSynchedNodeListen(sr, *n);
                if (not n->candidate and ++i == LISTEN_NODES)
                    break;
            }
        }

        // Announce requests
        searchSendAnnounceValue(sr);

        if (sr->callbacks.empty() && sr->announce.empty() && sr->listeners.empty())
            sr->setDone();
    }

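    /* Keep soliciting nodes until the parallel request budget is reached
       or no node is eligible for another 'get'/'find_node'. */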
    while (sr->currentlySolicitedNodeCount() < MAX_REQUESTED_SEARCH_NODES and searchSendGetValues(sr));


    if (sr->getNumberOfConsecutiveBadNodes() >= std::min<size_t>(sr->nodes.size(), SEARCH_MAX_BAD_NODES))
    {
        if (logger_)
            logger_->w(sr->id, "[search %s IPv%c] expired", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6');
        sr->expire();
        if (not public_stable)
            connectivityChanged(sr->af);
    }

    /* dumpSearch(*sr, std::cout); */
}

unsigned Dht::refill(Dht::Search& sr) {
    const auto& now = scheduler.time();
    sr.refill_time = now;
    /* we search for up to SEARCH_NODES good nodes. */
    auto cached_nodes = network_engine.getCachedNodes(sr.id, sr.af, SEARCH_NODES);

    if (cached_nodes.empty()) {
        if (logger_)
            logger_->e(sr.id, "[search %s IPv%c] no nodes from cache while refilling search",
                sr.id.toString().c_str(), (sr.af == AF_INET) ? '4' : '6');
        return 0;
    }

    unsigned inserted = 0;
    for (auto& i : cached_nodes) {
        /* try to insert the nodes. Search::insertNode will know how many to insert. */
        if (sr.insertNode(i, now))
            ++inserted;
    }
    if (logger_)
        logger_->d(sr.id, "[search %s IPv%c] refilled search with %u nodes from node cache",
            sr.id.toString().c_str(), (sr.af == AF_INET) ? '4' : '6', inserted);
    return inserted;
}


/* Start a search. */
Sp<Dht::Search>
Dht::search(const InfoHash& id, sa_family_t af, GetCallback gcb, QueryCallback qcb, DoneCallback dcb, Value::Filter f, const Sp<Query>& q)
{
    if (!isRunning(af)) {
        if (logger_)
            logger_->e(id, "[search %s IPv%c] unsupported protocol", id.toString().c_str(), (af == AF_INET) ? '4' : '6');
        if (dcb)
            dcb(false, {});
        return {};
    }

    auto& srs = searches(af);
    const auto& srp = srs.find(id);
    Sp<Search> sr {};

    if (srp != srs.end()) {
        sr = srp->second;
        sr->done = false;
        sr->expired = false;
    } else {
        if (srs.size() < max_searches) {
            sr = std::make_shared<Search>();
            srs.emplace(id, sr);
        } else {
            for (auto it = srs.begin(); it != srs.end(); ++it) {
                auto& s = *it->second;
                if ((s.done or s.expired) and s.announce.empty() and s.listeners.empty()) {
                    sr = it->second;
                    srs.erase(it);
                    break;
                }
            }
            if (not sr) {
                if (logger_)
                    logger_->e(id, "[search %s IPv%c] maximum number of searches reached !", id.toString().c_str(), (af == AF_INET) ? '4' : '6');
                return {};
            }
            srs.emplace(id, sr);
        }
        sr->af = af;
        sr->tid = search_id++;
        sr->step_time = time_point::min();
        sr->id = id;
        sr->done = false;
        sr->expired = false;
        sr->nodes.clear();
        sr->nodes.reserve(SEARCH_NODES+1);
        sr->nextSearchStep = scheduler.add(time_point::max(), std::bind(&Dht::searchStep, this, sr));
        if (logger_)
            logger_->w(id, "[search %s IPv%c] new search", id.toString().c_str(), (af == AF_INET) ? '4' : '6');
        if (search_id == 0)
            search_id++;
    }

    sr->get(f, q, qcb, gcb, dcb, scheduler);
    refill(*sr);

    return sr;
}

void
Dht::announce(const InfoHash& id,
        sa_family_t af,
        Sp<Value> value,
        DoneCallback callback,
        time_point created,
        bool permanent)
{
    auto& srs = searches(af);
    auto srp = srs.find(id);
    if (auto sr = srp == srs.end() ? search(id, af) : srp->second) {
        sr->put(value, callback, created, permanent);
        scheduler.edit(sr->nextSearchStep, scheduler.time());
    } else if (callback) {
        callback(false, {});
    }
}

size_t
Dht::listenTo(const InfoHash& id, sa_family_t af, ValueCallback cb, Value::Filter f, const Sp<Query>& q)
{
    if (!isRunning(af))
        return 0;
       // logger__ERR("[search %s IPv%c] search_time is now in %lfs", sr->id.toString().c_str(), (sr->af == AF_INET) ? '4' : '6', print_dt(tm-clock::now()));

    //logger__WARN("listenTo %s", id.toString().c_str());
    auto& srs = searches(af);
    auto srp = srs.find(id);
    Sp<Search> sr = (srp == srs.end()) ? search(id, af) : srp->second;
    if (!sr)
        throw DhtException("Can't create search");
    if (logger_)
        logger_->w(id, "[search %s IPv%c] listen", id.to_c_str(), (af == AF_INET) ? '4' : '6');
    return sr->listen(cb, f, q, scheduler);
}

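/* Register a listener for a key: values are cached and deduplicated through
   OpValueCache, a local listener is attached to the storage entry and network
   listeners are started on both address families. The returned token maps to
   the three underlying tokens. */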
size_t
Dht::listen(const InfoHash& id, ValueCallback cb, Value::Filter f, Where where)
{
    if (not id) {
        if (logger_)
            logger_->w(id, "Listen called with invalid key");
        return 0;
    }
    scheduler.syncTime();

    auto token = ++listener_token;
    auto gcb = OpValueCache::cacheCallback(std::move(cb), [this, id, token]{
        cancelListen(id, token);
    });

    auto query = std::make_shared<Query>(Select{}, std::move(where));
    auto filter = f.chain(query->where.getFilter());
    auto st = store.find(id);
    if (st == store.end() && store.size() < max_store_keys)
        st = store.emplace(id, scheduler.time() + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME).first;

    size_t tokenlocal = 0;
    if (st != store.end()) {
        tokenlocal = st->second.listen(gcb, filter, query);
        if (tokenlocal == 0)
            return 0;
    }

    auto token4 = Dht::listenTo(id, AF_INET, gcb, filter, query);
    auto token6 = token4 == 0 ? 0 : Dht::listenTo(id, AF_INET6, gcb, filter, query);
    if (token6 == 0 && st != store.end()) {
        st->second.cancelListen(tokenlocal);
        return 0;
    }

    listeners.emplace(token, std::make_tuple(tokenlocal, token4, token6));
    return token;
}

bool
Dht::cancelListen(const InfoHash& id, size_t token)
{
    scheduler.syncTime();

    auto it = listeners.find(token);
    if (it == listeners.end()) {
        if (logger_)
            logger_->w(id, "Listen token not found: %d", token);
        return false;
    }
    if (logger_)
        logger_->d(id, "cancelListen %s with token %d", id.toString().c_str(), token);
    if (auto tokenlocal = std::get<0>(it->second)) {
        auto st = store.find(id);
        if (st != store.end())
            st->second.cancelListen(tokenlocal);
    }
    auto searches_cancel_listen = [this,&id](std::map<InfoHash, Sp<Search>>& srs, size_t token) {
        if (token) {
            auto srp = srs.find(id);
            if (srp != srs.end())
                srp->second->cancelListen(token, scheduler);
        }
    };
    searches_cancel_listen(dht4.searches, std::get<1>(it->second));
    searches_cancel_listen(dht6.searches, std::get<2>(it->second));
    listeners.erase(it);
    return true;
}

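/* Tracks the completion of an operation performed on both address families:
   'status4' and 'status6' hold the per-family outcome, 'status' the merged
   result reported to the user. */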
struct OpStatus {
    struct Status {
        bool done {false};
        bool ok {false};
        Status(bool done=false, bool ok=false) : done(done), ok(ok) {}
    };
    Status status;
    Status status4;
    Status status6;
};

template <typename T>
struct GetStatus : public OpStatus {
    T values;
    std::vector<Sp<Node>> nodes;
};

void
Dht::put(const InfoHash& id, Sp<Value> val, DoneCallback callback, time_point created, bool permanent)
{
    if (not id or not val) {
        if (logger_)
            logger_->w(id, "Put called with invalid key or value");
        if (callback)
            callback(false, {});
        return;
    }
    if (val->id == Value::INVALID_ID)
        val->id = std::uniform_int_distribution<Value::Id>{1}(rd);
    scheduler.syncTime();
    const auto& now = scheduler.time();
    created = std::min(now, created);
    storageStore(id, val, created, {}, permanent);

    if (logger_)
        logger_->d(id, "put: adding %s -> %s", id.toString().c_str(), val->toString().c_str());

    auto op = std::make_shared<OpStatus>();
    auto donecb = [callback](const std::vector<Sp<Node>>& nodes, OpStatus& op) {
        // Call back once the announce has completed on both network families; report success if either succeeded
        if (callback and not op.status.done and (op.status4.done && op.status6.done)) {
            callback(op.status4.ok or op.status6.ok, nodes);
            op.status.done = true;
        }
    };
    announce(id, AF_INET, val, [=](bool ok4, const std::vector<Sp<Node>>& nodes) {
        if (logger_)
            logger_->d(id, "Announce done IPv4 %d", ok4);
        auto& o = *op;
        o.status4 = {true, ok4};
        donecb(nodes, o);
    }, created, permanent);
    announce(id, AF_INET6, val, [=](bool ok6, const std::vector<Sp<Node>>& nodes) {
        if (logger_)
            logger_->d(id, "Announce done IPv6 %d", ok6);
        auto& o = *op;
        o.status6 = {true, ok6};
        donecb(nodes, o);
    }, created, permanent);
}

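/* Helpers merging the results of the IPv4 and IPv6 searches: values are
   forwarded to the user callback only once, and the done callback fires when
   both families have completed or the user callback asked to stop early. */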
template <typename T>
void doneCallbackWrapper(DoneCallback dcb, const std::vector<Sp<Node>>& nodes, GetStatus<T>& op) {
    if (op.status.done)
        return;
    op.nodes.insert(op.nodes.end(), nodes.begin(), nodes.end());
    if (op.status.ok or (op.status4.done and op.status6.done)) {
        bool ok = op.status.ok or op.status4.ok or op.status6.ok;
        op.status.done = true;
        if (dcb)
            dcb(ok, op.nodes);
    }
}

template <typename T, typename St, typename Cb, typename Av, typename Cv>
bool callbackWrapper(Cb get_cb, DoneCallback done_cb, const std::vector<Sp<T>>& values,
    Av add_values, Cv cache_values, GetStatus<St>& op)
{
    if (op.status.done)
        return false;
    auto newvals = add_values(values);
    if (not newvals.empty()) {
        op.status.ok = !get_cb(newvals);
        cache_values(newvals);
    }
    doneCallbackWrapper(done_cb, {}, op);
    return !op.status.ok;
}

void
Dht::get(const InfoHash& id, GetCallback getcb, DoneCallback donecb, Value::Filter&& filter, Where&& where)
{
    if (not id) {
        if (logger_)
            logger_->w(id, "Get called with invalid key");
        if (donecb)
            donecb(false, {});
        return;
    }
    scheduler.syncTime();

    auto op = std::make_shared<GetStatus<std::map<Value::Id, Sp<Value>>>>();
    auto gcb = [getcb, donecb, op](const std::vector<Sp<Value>>& vals) {
        auto& o = *op;
        return callbackWrapper(getcb, donecb, vals, [&o](const std::vector<Sp<Value>>& values) {
            std::vector<Sp<Value>> newvals {};
            for (const auto& v : values) {
                auto it = o.values.find(v->id);
                if (it == o.values.cend() or (it->second != v && !(*it->second == *v))) {
                    newvals.push_back(v);
                }
            }
            return newvals;
        }, [&o](const std::vector<Sp<Value>>& newvals) {
            for (const auto& v : newvals)
                o.values[v->id] = v;
        }, o);
    };

    auto q = std::make_shared<Query>(Select {}, std::move(where));
    auto f = filter.chain(q->where.getFilter());

    /* Try to answer this search locally. */
    gcb(getLocal(id, f));

    Dht::search(id, AF_INET, gcb, {}, [=](bool ok, const std::vector<Sp<Node>>& nodes) {
        //logger__WARN("DHT done IPv4");
        op->status4 = {true, ok};
        doneCallbackWrapper(donecb, nodes, *op);
    }, f, q);
    Dht::search(id, AF_INET6, gcb, {}, [=](bool ok, const std::vector<Sp<Node>>& nodes) {
        //logger__WARN("DHT done IPv6");
        op->status6 = {true, ok};
        doneCallbackWrapper(donecb, nodes, *op);
    }, f, q);
}

void Dht::query(const InfoHash& id, QueryCallback cb, DoneCallback done_cb, Query&& q)
{
    if (not id) {
        if (logger_)
            logger_->w(id, "Query called with invalid key");
        if (done_cb)
            done_cb(false, {});
        return;
    }
    scheduler.syncTime();
    auto op = std::make_shared<GetStatus<std::vector<Sp<FieldValueIndex>>>>();
    auto f = q.where.getFilter();
    auto qcb = [cb, done_cb, op](const std::vector<Sp<FieldValueIndex>>& fields){
        auto& o = *op;
        return callbackWrapper(cb, done_cb, fields, [&](const std::vector<Sp<FieldValueIndex>>& fields) {
            std::vector<Sp<FieldValueIndex>> newvals {};
            for (const auto& f : fields) {
                auto it = std::find_if(o.values.cbegin(), o.values.cend(),
                    [&](const Sp<FieldValueIndex>& sf) {
                        return sf == f or f->containedIn(*sf);
                    });
                if (it == o.values.cend()) {
                    auto lesser = std::find_if(o.values.begin(), o.values.end(),
                        [&](const Sp<FieldValueIndex>& sf) {
                            return sf->containedIn(*f);
                        });
                    if (lesser != o.values.end())
                        o.values.erase(lesser);
                    newvals.push_back(f);
                }
            }
            return newvals;
        }, [&](const std::vector<Sp<FieldValueIndex>>& fields){
            o.values.insert(o.values.end(), fields.begin(), fields.end());
        }, o);
    };

    /* Try to answer this search locally. */
    auto values = getLocal(id, f);
    std::vector<Sp<FieldValueIndex>> local_fields(values.size());
    std::transform(values.begin(), values.end(), local_fields.begin(), [&q](const Sp<Value>& v) {
        return std::make_shared<FieldValueIndex>(*v, q.select);
    });
    qcb(local_fields);

    auto sq = std::make_shared<Query>(std::move(q));
    Dht::search(id, AF_INET, {}, qcb, [=](bool ok, const std::vector<Sp<Node>>& nodes) {
        //logger__WARN("DHT done IPv4");
        op->status4 = {true, ok};
        doneCallbackWrapper(done_cb, nodes, *op);
    }, f, sq);
    Dht::search(id, AF_INET6, {}, qcb, [=](bool ok, const std::vector<Sp<Node>>& nodes) {
        //logger__WARN("DHT done IPv6");
        op->status6 = {true, ok};
        doneCallbackWrapper(done_cb, nodes, *op);
    }, f, sq);
}

std::vector<Sp<Value>>
Dht::getLocal(const InfoHash& id, const Value::Filter& f) const
{
    auto s = store.find(id);
    if (s == store.end()) return {};
    return s->second.get(f);
}

Sp<Value>
Dht::getLocalById(const InfoHash& id, Value::Id vid) const
{
    auto s = store.find(id);
    if (s != store.end())
        return s->second.getById(vid);
    return {};
}

std::vector<Sp<Value>>
Dht::getPut(const InfoHash& id) const
{
    std::vector<Sp<Value>> ret;
    auto find_values = [&](const std::map<InfoHash, Sp<Search>>& srs) {
        auto srp = srs.find(id);
        if (srp == srs.end()) return;
        auto vals = srp->second->getPut();
        ret.insert(ret.end(), vals.begin(), vals.end());
    };
    find_values(dht4.searches);
    find_values(dht6.searches);
    return ret;
}

Sp<Value>
Dht::getPut(const InfoHash& id, const Value::Id& vid) const
{
    auto find_value = [&](const std::map<InfoHash, Sp<Search>>& srs) {
        auto srp = srs.find(id);
        return (srp != srs.end()) ? srp->second->getPut(vid) : Sp<Value> {};
    };
    if (auto v4 = find_value(dht4.searches))
        return v4;
    if (auto v6 = find_value(dht6.searches))
        return v6;
    return {};
}

bool
Dht::cancelPut(const InfoHash& id, const Value::Id& vid)
{
    bool canceled {false};
    auto sr_cancel_put = [&](std::map<InfoHash, Sp<Search>>& srs) {
        auto srp = srs.find(id);
        return (srp != srs.end()) ? srp->second->cancelPut(vid) : false;
    };
    canceled |= sr_cancel_put(dht4.searches);
    canceled |= sr_cancel_put(dht6.searches);
    if (canceled)
        storageErase(id, vid);
    return canceled;
}

// Storage

void
Dht::storageChanged(const InfoHash& id, Storage& st, ValueStorage& v, bool newValue)
{
    if (newValue) {
        if (not st.local_listeners.empty()) {
            if (logger_)
                logger_->d(id, "[store %s] %lu local listeners", id.toString().c_str(), st.local_listeners.size());
            std::vector<std::pair<ValueCallback, std::vector<Sp<Value>>>> cbs;
            cbs.reserve(st.local_listeners.size());
            for (const auto& l : st.local_listeners) {
                std::vector<Sp<Value>> vals;
                if (not l.second.filter or l.second.filter(*v.data))
                    vals.push_back(v.data);
                if (not vals.empty()) {
                    if (logger_)
                        logger_->d(id, "[store %s] sending update local listener with token %lu",
                            id.toString().c_str(),
                            l.first);
                    cbs.emplace_back(l.second.get_cb, std::move(vals));
                }
            }
            // listeners are copied: they may be deleted by the callback
            for (auto& cb : cbs)
                cb.first(cb.second, false);
        }
    }

    if (not st.listeners.empty()) {
        if (logger_)
            logger_->d(id, "[store %s] %lu remote listeners", id.toString().c_str(), st.listeners.size());
        for (const auto& node_listeners : st.listeners) {
            for (const auto& l : node_listeners.second) {
                auto f = l.second.query.where.getFilter();
                if (f and not f(*v.data))
                    continue;
                if (logger_)
                    logger_->w(id, node_listeners.first->id, "[store %s] [node %s] sending update",
                        id.toString().c_str(),
                        node_listeners.first->toString().c_str());
                std::vector<Sp<Value>> vals {};
                vals.push_back(v.data);
                Blob ntoken = makeToken(node_listeners.first->getAddr(), false);
                network_engine.tellListener(node_listeners.first, l.first, id, 0, ntoken, {}, {},
                        std::move(vals), l.second.query, l.second.version);
            }
        }
    }
}

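/* Store a value locally, creating the storage entry if needed (bounded by
   max_store_keys), charging its size to the sender's per-IP quota bucket,
   scheduling its expiration (unless permanent) and notifying listeners. */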
bool
Dht::storageStore(const InfoHash& id, const Sp<Value>& value, time_point created, const SockAddr& sa, bool permanent)
{
    const auto& now = scheduler.time();
    created = std::min(created, now);
    auto expiration = permanent ? time_point::max() : created + getType(value->type).expiration;
    if (expiration < now)
        return false;

    auto st = store.find(id);
    if (st == store.end()) {
        if (store.size() >= max_store_keys)
            return false;
        auto st_i = store.emplace(id, now);
        st = st_i.first;
        if (maintain_storage and st_i.second)
            scheduler.add(st->second.maintenance_time, std::bind(&Dht::dataPersistence, this, id));
    }

    StorageBucket* store_bucket {nullptr};
    if (sa)
        store_bucket = &store_quota[sa];

    auto store = st->second.store(id, value, created, expiration, store_bucket);
    if (auto vs = store.first) {
        total_store_size += store.second.size_diff;
        total_values += store.second.values_diff;
        if (not permanent) {
            scheduler.add(expiration, std::bind(&Dht::expireStorage, this, id));
        }
        if (total_store_size > max_store_size) {
            expireStore();
        }
        storageChanged(id, st->second, *vs, store.second.values_diff > 0);
    }

    return std::get<0>(store);
}

bool
Dht::storageErase(const InfoHash& id, Value::Id vid)
{
    auto st = store.find(id);
    if (st == store.end())
        return false;
    auto ret = st->second.remove(id, vid);
    total_store_size += ret.size_diff;
    total_values += ret.values_diff;
    return ret.values_diff;
}

void
Dht::storageAddListener(const InfoHash& id, const Sp<Node>& node, size_t socket_id, Query&& query, int version)
{
    const auto& now = scheduler.time();
    auto st = store.find(id);
    if (st == store.end()) {
        if (store.size() >= max_store_keys)
            return;
        st = store.emplace(id, now).first;
    }
    auto& node_listeners = st->second.listeners[node];
    auto l = node_listeners.find(socket_id);
    if (l == node_listeners.end()) {
        auto vals = st->second.get(query.where.getFilter());
        if (not vals.empty()) {
            network_engine.tellListener(node, socket_id, id, WANT4 | WANT6, makeToken(node->getAddr(), false),
                    dht4.buckets.findClosestNodes(id, now, TARGET_NODES), dht6.buckets.findClosestNodes(id, now, TARGET_NODES),
                    std::move(vals), query, version);
        }
        node_listeners.emplace(socket_id, Listener {now, std::forward<Query>(query), version});
    }
    else
        l->second.refresh(now, std::forward<Query>(query));
}

void
Dht::expireStore(decltype(store)::iterator i)
{
    const auto& id = i->first;
    auto& st = i->second;
    auto stats = st.expire(id, scheduler.time());
    total_store_size += stats.first;
    total_values -= stats.second.size();
    if (not stats.second.empty()) {
        if (logger_)
            logger_->d(id, "[store %s] discarded %ld expired values (%ld bytes)",
            id.toString().c_str(), stats.second.size(), -stats.first);

        if (not st.listeners.empty()) {
            if (logger_)
                logger_->d(id, "[store %s] %lu remote listeners", id.toString().c_str(), st.listeners.size());

            std::vector<Value::Id> ids;
            ids.reserve(stats.second.size());
            for (const auto& v : stats.second)
                ids.emplace_back(v->id);

            for (const auto& node_listeners : st.listeners) {
                for (const auto& l : node_listeners.second) {
                    if (logger_)
                        logger_->w(id, node_listeners.first->id, "[store %s] [node %s] sending expired",
                            id.toString().c_str(),
                            node_listeners.first->toString().c_str());
                    Blob ntoken = makeToken(node_listeners.first->getAddr(), false);
                    network_engine.tellListenerExpired(node_listeners.first, l.first, id, ntoken, ids, l.second.version);
                }
            }
        }
        for (const auto& local_listeners : st.local_listeners) {
            local_listeners.second.get_cb(stats.second, true);
        }
    }
}

void
Dht::expireStorage(InfoHash h)
{
    auto i = store.find(h);
    if (i != store.end())
        expireStore(i);
}

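/* Remove expired values and empty storages, then, while the global storage
   limit is still exceeded, evict the oldest values of the IP address that
   consumes the most quota. */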
void
Dht::expireStore()
{
    // removing expired values
    for (auto i = store.begin(); i != store.end();) {
        expireStore(i);

        if (i->second.empty() && i->second.listeners.empty() && i->second.local_listeners.empty()) {
            if (logger_)
                logger_->d(i->first, "[store %s] discarding empty storage", i->first.toString().c_str());
            i = store.erase(i);
        }
        else
            ++i;
    }

    // remove more values if storage limit is exceeded
    while (total_store_size > max_store_size) {
        // find IP using the most storage
        if (store_quota.empty()) {
            if (logger_)
                logger_->w("No space left: local data consumes all the quota!");
            break;
        }
        auto largest = store_quota.begin();
        for (auto it = largest; it != store_quota.end(); ++it) {
            if (it->second.size() > largest->second.size())
                largest = it;
        }
        if (logger_)
            logger_->w("No space left: discarding value of largest consumer %s", largest->first.toString().c_str());
        while (true) {
            auto exp_value = largest->second.getOldest();
            auto storage = store.find(exp_value.first);
            if (storage != store.end()) {
                auto ret = storage->second.remove(exp_value.first, exp_value.second);
                total_store_size += ret.size_diff;
                total_values += ret.values_diff;
                if (logger_)
                    logger_->w("Discarded %ld bytes, still %ld used", largest->first.toString().c_str(), total_store_size);
                if (ret.values_diff)
                    break;
            }
        }
    }

    // remove unused quota entries
    for (auto i = store_quota.begin(); i != store_quota.end();) {
        if (i->second.size() == 0)
            i = store_quota.erase(i);
        else
            ++i;
    }
}

void
Dht::connectivityChanged(sa_family_t af)
{
    const auto& now = scheduler.time();
    scheduler.edit(nextNodesConfirmation, now);
    buckets(af).connectivityChanged(now);
    network_engine.connectivityChanged(af);
    reported_addr.erase(std::remove_if(reported_addr.begin(), reported_addr.end(), [&](const ReportedAddr& addr){
        return addr.second.getFamily() == af;
    }), reported_addr.end());
}

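/* Security tokens given to peers are digests of a local secret and the peer's
   IP and port. The secret is rotated at random intervals (15 to 45 minutes);
   tokenMatch() accepts tokens derived from the current or previous secret. */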
void
Dht::rotateSecrets()
{
    oldsecret = secret;
    secret = std::uniform_int_distribution<uint64_t>{}(rd);
    uniform_duration_distribution<> time_dist(std::chrono::minutes(15), std::chrono::minutes(45));
    auto rotate_secrets_time = scheduler.time() + time_dist(rd);
    scheduler.add(rotate_secrets_time, std::bind(&Dht::rotateSecrets, this));
}

Blob
Dht::makeToken(const SockAddr& addr, bool old) const
{
    const void *ip;
    size_t iplen;
    in_port_t port;

    auto family = addr.getFamily();
    if (family == AF_INET) {
        const auto& sin = addr.getIPv4();
        ip = &sin.sin_addr;
        iplen = 4;
        port = sin.sin_port;
    } else if (family == AF_INET6) {
        const auto& sin6 = addr.getIPv6();
        ip = &sin6.sin6_addr;
        iplen = 16;
        port = sin6.sin6_port;
    } else {
        return {};
    }

    const auto& c1 = old ? oldsecret : secret;
    Blob data;
    data.reserve(sizeof(secret)+sizeof(in_port_t)+iplen);
    data.insert(data.end(), (uint8_t*)&c1, ((uint8_t*)&c1) + sizeof(c1));
    data.insert(data.end(), (uint8_t*)ip, (uint8_t*)ip+iplen);
    data.insert(data.end(), (uint8_t*)&port, ((uint8_t*)&port)+sizeof(in_port_t));
    return crypto::hash(data, TOKEN_SIZE);
}

bool
Dht::tokenMatch(const Blob& token, const SockAddr& addr) const
{
    if (not addr or token.size() != TOKEN_SIZE)
        return false;
    if (token == makeToken(addr, false))
        return true;
    if (token == makeToken(addr, true))
        return true;
    return false;
}

NodeStats
Dht::getNodesStats(sa_family_t af) const
{
    NodeStats stats = dht(af).getNodesStats(scheduler.time(), myid);
    stats.node_cache_size = network_engine.getNodeCacheSize(af);
    return stats;
}

NodeStats
Dht::Kad::getNodesStats(time_point now, const InfoHash& myid) const
{
    NodeStats stats {};
    for (const auto& b : buckets) {
        for (auto& n : b.nodes) {
            if (n->isGood(now)) {
                stats.good_nodes++;
                if (n->isIncoming())
                    stats.incoming_nodes++;
            } else if (not n->isExpired())
                stats.dubious_nodes++;
        }
        if (b.cached)
            stats.cached_nodes++;
    }
    stats.table_depth = buckets.depth(buckets.findBucket(myid));
    stats.searches = searches.size();
    return stats;
}

void
Dht::dumpBucket(const Bucket& b, std::ostream& out) const
{
    const auto& now = scheduler.time();
    using namespace std::chrono;
    out << b.first << " count: " << b.nodes.size() << " updated: " << print_time_relative(now, b.time);
    if (b.cached)
        out << " (cached)";
    out  << std::endl;
    for (auto& n : b.nodes) {
        out << "    Node " << n->toString();
        const auto& t = n->getTime();
        const auto& r = n->getReplyTime();
        if (t != r)
            out << " updated: " << print_time_relative(now, t) << ", replied: " << print_time_relative(now, r);
        else
            out << " updated: " << print_time_relative(now, t);
        if (n->isExpired())
            out << " [expired]";
        else if (n->isGood(now))
            out << " [good]";
        out << std::endl;
    }
}

void
Dht::dumpSearch(const Search& sr, std::ostream& out) const
{
    const auto& now = scheduler.time();
    const auto& listen_expire = getListenExpiration();
    using namespace std::chrono;
    out << std::endl << "Search IPv" << (sr.af == AF_INET6 ? '6' : '4') << ' ' << sr.id << " gets: " << sr.callbacks.size();
    out << ", last step: " << print_time_relative(now, sr.step_time);
    if (sr.done)
        out << " [done]";
    if (sr.expired)
        out << " [expired]";
    bool synced = sr.isSynced(now);
    out << (synced ? " [synced]" : " [not synced]");
    if (synced && sr.isListening(now, listen_expire))
        out << " [listening]";
    out << std::endl;

    /*printing the queries*/
    if (sr.callbacks.size() + sr.listeners.size() > 0)
        out << "Queries:" << std::endl;
    for (const auto& cb : sr.callbacks) {
        out << *cb.second.query << std::endl;
    }
    for (const auto& l : sr.listeners) {
        out << *l.second.query << std::endl;
    }

    for (const auto& a : sr.announce) {
        bool announced = sr.isAnnounced(a.value->id);
        out << "Announcement: " << *a.value << (announced ? " [announced]" : "") << std::endl;
    }

    out << " Common bits    InfoHash                       Conn. Get   Ops  IP" << std::endl;
    auto last_get = sr.getLastGetTime();
    for (const auto& np : sr.nodes) {
        auto& n = *np;
        out << std::setfill (' ') << std::setw(3) << InfoHash::commonBits(sr.id, n.node->id) << ' ' << n.node->id;
        out << ' ' << (findNode(n.node->id, sr.af) ? '*' : ' ');
        out << " [";
        if (auto pendingCount = n.node->getPendingMessageCount())
            out << pendingCount;
        else
            out << ' ';
        out << (n.node->isExpired() ? 'x' : ' ') << "]";

        // Get status
        {
            char g_i = n.pending(n.getStatus) ? (n.candidate ? 'c' : 'f') : ' ';
            char s_i = n.isSynced(now) ? (n.last_get_reply > last_get ? 'u' : 's') : '-';
            out << " [" << s_i << g_i << "] ";
        }

        // Listen status
        if (not sr.listeners.empty()) {
            if (n.listenStatus.empty())
                out << "    ";
            else
                out << "["
                    << (n.isListening(now,listen_expire) ? 'l' : (n.pending(n.listenStatus) ? 'f' : ' ')) << "] ";
        }

        // Announce status
        if (not sr.announce.empty()) {
            if (n.acked.empty()) {
                out << "   ";
                for (size_t a=0; a < sr.announce.size(); a++)
                    out << ' ';
            } else {
                out << "[";
                for (const auto& a : sr.announce) {
                    auto ack = n.acked.find(a.value->id);
                    if (ack == n.acked.end() or not ack->second.first) {
                        out << ' ';
                    } else {
                        out << ack->second.first->getStateChar();
                    }
                }
                out << "] ";
            }
        }
        out << n.node->getAddrStr() << std::endl;
    }
}

void
Dht::dumpTables() const
{
    std::stringstream out;
    out << "My id " << myid << std::endl;

    out << "Buckets IPv4 :" << std::endl;
    for (const auto& b : dht4.buckets)
        dumpBucket(b, out);
    out << "Buckets IPv6 :" << std::endl;
    for (const auto& b : dht6.buckets)
        dumpBucket(b, out);

    auto dump_searches = [&](std::map<InfoHash, Sp<Search>> srs) {
        for (auto& srp : srs)
            dumpSearch(*srp.second, out);
    };
    dump_searches(dht4.searches);
    dump_searches(dht6.searches);
    out << std::endl;

    out << getStorageLog() << std::endl;

    if (logger_)
        logger_->d("%s", out.str().c_str());
}

std::string
Dht::getStorageLog() const
{
    std::stringstream out;
    for (const auto& s : store)
        out << printStorageLog(s);
    out << std::endl << std::endl;
    std::multimap<size_t, const SockAddr*> q_map;
    for (const auto& ip : store_quota)
        if (ip.second.size())
            q_map.emplace(ip.second.size(), &ip.first);
    for (auto ip = q_map.rbegin(); ip != q_map.rend(); ++ip)
        out << "IP " << ip->second->toString() << " uses " << ip->first << " bytes" << std::endl;
    out << std::endl;
    out << "Total " << store.size() << " storages, " << total_values << " values (";
    if (total_store_size < 1024)
        out << total_store_size << " bytes)";
    else
        out << (total_store_size/1024) << " / " << (max_store_size/1024) << " KB)";
    out << std::endl;
    return out.str();
}

std::string
Dht::getStorageLog(const InfoHash& h) const
{
    auto s = store.find(h);
    if (s == store.end()) {
        std::stringstream out;
        out << "Storage " << h << " empty" << std::endl;
        return out.str();
    }
    return printStorageLog(*s);
}

std::string
Dht::printStorageLog(const decltype(store)::value_type& s) const
{
    std::stringstream out;
    using namespace std::chrono;
    const auto& st = s.second;
    out << "Storage " << s.first << " "
                      << st.listeners.size() << " list., "
                      << st.valueCount() << " values ("
                      << st.totalSize() << " bytes)" << std::endl;
    if (not st.local_listeners.empty())
        out << "   " << st.local_listeners.size() << " local listeners" << std::endl;
    for (const auto& node_listeners : st.listeners) {
        const auto& node = node_listeners.first;
        out << "   " << "Listener " << node->toString() << " : " << node_listeners.second.size() << " entries" << std::endl;
    }
    return out.str();
}

std::string
Dht::getRoutingTablesLog(sa_family_t af) const
{
    std::stringstream out;
    for (const auto& b : buckets(af))
        dumpBucket(b, out);
    return out.str();
}

std::string
Dht::getSearchesLog(sa_family_t af) const
{
    std::stringstream out;
    auto num_searches = dht4.searches.size() + dht6.searches.size();
    if (num_searches > 8) {
        if (not af or af == AF_INET)
            for (const auto& sr : dht4.searches)
                out << "[search " << sr.first << " IPv4]" << std::endl;
        if (not af or af == AF_INET6)
            for (const auto& sr : dht6.searches)
                out << "[search " << sr.first << " IPv6]" << std::endl;
    } else {
        out << "s:synched, u:updated, a:announced, c:candidate, f:cur req, x:expired, *:known" << std::endl;
        if (not af or af == AF_INET)
            for (const auto& sr : dht4.searches)
                dumpSearch(*sr.second, out);
        if (not af or af == AF_INET6)
            for (const auto& sr : dht6.searches)
                dumpSearch(*sr.second, out);
    }
    out << "Total: " << num_searches << " searches (" << dht4.searches.size() << " IPv4, " << dht6.searches.size() << " IPv6)." << std::endl;
    return out.str();
}

std::string
Dht::getSearchLog(const InfoHash& id, sa_family_t af) const
{
    std::stringstream out;
    if (af == AF_UNSPEC) {
        out << getSearchLog(id, AF_INET) << getSearchLog(id, AF_INET6);
    } else {
        auto& srs = searches(af);
        auto sr = srs.find(id);
        if (sr != srs.end())
            dumpSearch(*sr->second, out);
    }
    return out.str();
}

Dht::~Dht()
{
    for (auto& s : dht4.searches)
        s.second->clear();
    for (auto& s : dht6.searches)
        s.second->clear();
}

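/* Derive the network engine configuration from the DHT configuration,
   falling back to MAX_REQUESTS_PER_SEC for the global rate limit and to
   one eighth of that value for the per-peer limit when unset. */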
net::NetworkConfig
fromDhtConfig(const Config& config)
{
    net::NetworkConfig netConf;
    netConf.network = config.network;
    netConf.max_req_per_sec = config.max_req_per_sec ? config.max_req_per_sec : MAX_REQUESTS_PER_SEC;
    netConf.max_peer_req_per_sec = config.max_peer_req_per_sec
        ? config.max_peer_req_per_sec
        : netConf.max_req_per_sec/8;
    return netConf;
}

Dht::Dht(std::unique_ptr<net::DatagramSocket>&& sock, const Config& config, const Sp<Logger>& l)
    : DhtInterface(l),
    myid(config.node_id ? config.node_id : InfoHash::getRandom(rd)),
    store(),
    store_quota(),
    max_store_keys(config.max_store_size ? (int)config.max_store_size : MAX_HASHES),
    max_searches(config.max_searches ? (int)config.max_searches : MAX_SEARCHES),
    network_engine(myid, fromDhtConfig(config), std::move(sock), logger_, rd, scheduler,
            std::bind(&Dht::onError, this, _1, _2),
            std::bind(&Dht::onNewNode, this, _1, _2),
            std::bind(&Dht::onReportedAddr, this, _1, _2),
            std::bind(&Dht::onPing, this, _1),
            std::bind(&Dht::onFindNode, this, _1, _2, _3),
            std::bind(&Dht::onGetValues, this, _1, _2, _3, _4),
            std::bind(&Dht::onListen, this, _1, _2, _3, _4, _5, _6),
            std::bind(&Dht::onAnnounce, this, _1, _2, _3, _4, _5),
            std::bind(&Dht::onRefresh, this, _1, _2, _3, _4)),
    persistPath(config.persist_path),
    is_bootstrap(config.is_bootstrap),
    maintain_storage(config.maintain_storage),
    public_stable(config.public_stable)
{
    scheduler.syncTime();
    auto s = network_engine.getSocket();
    if (not s or (not s->hasIPv4() and not s->hasIPv6()))
        throw DhtException("Opened socket required");
    if (s->hasIPv4()) {
        dht4.buckets = {Bucket {AF_INET}};
        dht4.buckets.is_client = config.is_bootstrap;
    }
    if (s->hasIPv6()) {
        dht6.buckets = {Bucket {AF_INET6}};
        dht6.buckets.is_client = config.is_bootstrap;
    }

    search_id = std::uniform_int_distribution<decltype(search_id)>{}(rd);

    uniform_duration_distribution<> time_dis {std::chrono::seconds(3), std::chrono::seconds(5)};
    nextNodesConfirmation = scheduler.add(scheduler.time() + time_dis(rd), std::bind(&Dht::confirmNodes, this));

    // Fill old secret
    secret = std::uniform_int_distribution<uint64_t>{}(rd);
    rotateSecrets();

    if (not persistPath.empty())
        loadState(persistPath);

    expire();

    if (logger_)
        logger_->d("DHT node initialised with ID %s", myid.toString().c_str());
}

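/* Keep the buckets around our own id fresh: send a find_node for an id close
   to ours (only the last byte is randomized) to a random node picked from our
   own bucket or, sometimes, from a neighbouring one. */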
bool
Dht::neighbourhoodMaintenance(RoutingTable& list)
{
    //logger__DBG("neighbourhoodMaintenance");
    auto b = list.findBucket(myid);
    if (b == list.end())
        return false;

    InfoHash id = myid;
#ifdef _WIN32
    std::uniform_int_distribution<int> rand_byte{ 0, std::numeric_limits<uint8_t>::max() };
#else
    std::uniform_int_distribution<uint8_t> rand_byte;
#endif
    id[HASH_LEN-1] = rand_byte(rd);

    std::bernoulli_distribution rand_trial(1./8.);
    auto q = b;
    if (std::next(q) != list.end() && (q->nodes.empty() || rand_trial(rd)))
        q = std::next(q);
    if (b != list.begin() && (q->nodes.empty() || rand_trial(rd))) {
        auto r = std::prev(b);
        if (!r->nodes.empty())
            q = r;
    }

    auto n = q->randomNode(rd);
    if (n) {
        if (logger_)
            logger_->d(id, n->id, "[node %s] sending [find %s] for neighborhood maintenance",
                n->toString().c_str(), id.toString().c_str());
        /* Since our node-id is the same in both DHTs, it's probably
           profitable to query both families. */
        network_engine.sendFindNode(n, id, network_engine.want());
    }

    return true;
}

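/* Refresh buckets that are empty or have not seen a positive confirmation for
   more than ten minutes: pick a random id in the bucket's range and send a
   find_node for it to a random node, occasionally requesting both families. */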
bool
Dht::bucketMaintenance(RoutingTable& list)
{
    std::bernoulli_distribution rand_trial(1./8.);
    std::bernoulli_distribution rand_trial_38(1./38.);

    bool sent {false};
    for (auto b = list.begin(); b != list.end(); ++b) {
        if (b->time < scheduler.time() - std::chrono::minutes(10) || b->nodes.empty()) {
            /* This bucket hasn't seen any positive confirmation for a long
               time. Pick a random id in this bucket's range, and send a request
               to a random node. */
            InfoHash id = list.randomId(b, rd);
            auto q = b;
            /* If the bucket is empty, we try to fill it from a neighbour.
               We also sometimes do it gratuitously to recover from
               buckets full of broken nodes. */
            if (std::next(b) != list.end() && (q->nodes.empty() || rand_trial(rd)))
                q = std::next(b);
            if (b != list.begin() && (q->nodes.empty() || rand_trial(rd))) {
                auto r = std::prev(b);
                if (!r->nodes.empty())
                    q = r;
            }

            auto n = q->randomNode(rd);
            if (n and not n->isPendingMessage()) {
                want_t want = -1;

                if (network_engine.want() != want) {
                    auto otherbucket = findBucket(id, q->af == AF_INET ? AF_INET6 : AF_INET);
                    if (otherbucket && otherbucket->nodes.size() < TARGET_NODES)
                        /* The corresponding bucket in the other family
                           is emptyish -- querying both is useful. */
                        want = WANT4 | WANT6;
                    else if (rand_trial_38(rd))
                        /* Most of the time, this just adds overhead.
                           However, it might help stitch back one of
                           the DHTs after a network collapse, so query
                           both, but only very occasionally. */
                        want = WANT4 | WANT6;
                }

                if (logger_)
                    logger_->d(id, n->id, "[node %s] sending find %s for bucket maintenance", n->toString().c_str(), id.toString().c_str());
                //auto start = scheduler.time();
                network_engine.sendFindNode(n, id, want, nullptr, [this,n](const net::Request&, bool over) {
                    if (over) {
                        const auto& end = scheduler.time();
                        // using namespace std::chrono;
                        // if (logger_)
                        //     logger_->d(n->id, "[node %s] bucket maintenance op expired after %s", n->toString().c_str(), print_duration(end-start).c_str());
                        scheduler.edit(nextNodesConfirmation, end + Node::MAX_RESPONSE_TIME);
                    }
                });
                sent = true;
            }
        }
    }
    return sent;
}

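/* Scheduled maintenance job for a storage entry: once its maintenance time is
   reached, re-announce its values through maintainStorage() and reschedule
   after MAX_STORAGE_MAINTENANCE_EXPIRE_TIME. */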
void
Dht::dataPersistence(InfoHash id)
{
    const auto& now = scheduler.time();
    auto str = store.find(id);
    if (str != store.end() and now > str->second.maintenance_time) {
        if (logger_)
            logger_->d(id, "[storage %s] maintenance (%u values, %u bytes)",
                id.toString().c_str(), str->second.valueCount(), str->second.totalSize());
        maintainStorage(*str);
        str->second.maintenance_time = now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME;
        scheduler.add(str->second.maintenance_time, std::bind(&Dht::dataPersistence, this, id));
    }
}

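/* Re-announce the values of a storage on each address family where we are no
   longer among the closest nodes to the key (or unconditionally if `force` is
   set). If the data was handed off on both families, the local copy is
   discarded. Returns the number of values re-announced. */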
size_t
Dht::maintainStorage(decltype(store)::value_type& storage, bool force, const DoneCallback& donecb)
{
    const auto& now = scheduler.time();
    size_t announce_per_af = 0;

    auto maintain = [&](sa_family_t af){
        bool want = true;
        auto nodes = buckets(af).findClosestNodes(storage.first, now);
        if (!nodes.empty()) {
            if (force || storage.first.xorCmp(nodes.back()->id, myid) < 0) {
                for (auto &value : storage.second.getValues()) {
                    const auto& vt = getType(value.data->type);
                    if (force || value.created + vt.expiration > now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME) {
                        // Re-announce this value so it stays stored on nodes closer to the key
                        announce(storage.first, af, value.data, donecb, value.created);
                        ++announce_per_af;
                    }
                }
                want = false;
            }
        }
        return want;
    };
    bool want4 = maintain(AF_INET), want6 = maintain(AF_INET6);

    if (not want4 and not want6) {
        if (logger_)
            logger_->d(storage.first, "Discarding storage values %s", storage.first.toString().c_str());
        auto diff = storage.second.clear();
        total_store_size += diff.size_diff;
        total_values += diff.values_diff;
    }

    return announce_per_af;
}

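/* Main entry point for the network thread: process an incoming packet when
   `buflen` is non-zero, then run the scheduled jobs. Returns the time point
   at which periodic() should be called again at the latest. A minimal
   receive-loop sketch (names are illustrative):

       auto next = dht.periodic(buf, received, std::move(from), clock::now());

   Calling it with buflen == 0 only runs the scheduler. */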
time_point
Dht::periodic(const uint8_t *buf, size_t buflen, SockAddr from, const time_point& now)
{
    scheduler.syncTime(now);
    if (buflen) {
        try {
            network_engine.processMessage(buf, buflen, std::move(from));
        } catch (const std::exception& e) {
            if (logger_)
                logger_->w("Can't process message: %s", e.what());
        }
    }
    return scheduler.run();
}

void
Dht::expire()
{
    uniform_duration_distribution<> time_dis(std::chrono::minutes(2), std::chrono::minutes(6));
    auto expire_stuff_time = scheduler.time() + duration(time_dis(rd));

    expireBuckets(dht4.buckets);
    expireBuckets(dht6.buckets);
    expireStore();
    expireSearches();
    scheduler.add(expire_stuff_time, std::bind(&Dht::expire, this));
}

void
Dht::onConnected()
{
    if (bootstrapJob) {
        bootstrapJob->cancel();
        bootstrapJob.reset();
    }
    bootstrap_period = std::chrono::seconds(10);
    auto callbacks = std::move(onConnectCallbacks_);
    while (not callbacks.empty()) {
        callbacks.front()();
        callbacks.pop();
    }
}
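
/* Bootstrap procedure, run while both address families are disconnected:
   resolve and ping the configured bootstrap nodes, then reschedule itself
   with an exponentially growing period (reset on reconnection). */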
void
Dht::onDisconnected()
{
    if (dht4.status != NodeStatus::Disconnected || dht6.status != NodeStatus::Disconnected)
        return;
    if (logger_)
        logger_->d(myid, "Bootstraping");
    for (const auto& bootstrap : bootstrap_nodes) {
        try {
            auto ips = network_engine.getSocket()->resolve(bootstrap.first, bootstrap.second);
            for (auto& ip : ips) {
                if (ip.getPort() == 0)
                    ip.setPort(net::DHT_DEFAULT_PORT);
                pingNode(ip);
            }
        } catch (const std::exception& e) {
            if (logger_)
                logger_->e(myid, "Can't resolve %s:%s: %s", bootstrap.first.c_str(), bootstrap.second.c_str(), e.what());
        }
    }
    if (bootstrapJob)
        bootstrapJob->cancel();
    bootstrapJob = scheduler.add(scheduler.time() + bootstrap_period, std::bind(&Dht::onDisconnected, this));
    bootstrap_period *= 2;
}

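/* Periodic routing table maintenance: start a search for our own id on a
   freshly connected family, run bucket and neighbourhood maintenance, and
   reschedule itself (sooner when maintenance traffic was sent). */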
void
Dht::confirmNodes()
{
    using namespace std::chrono;
    bool soon = false;
    const auto& now = scheduler.time();

    if (dht4.searches.empty() and dht4.status == NodeStatus::Connected) {
        if (logger_)
            logger_->d(myid, "[confirm nodes] initial IPv4 'get' for my id (%s)", myid.toString().c_str());
        search(myid, AF_INET);
    }
    if (dht6.searches.empty() and dht6.status == NodeStatus::Connected) {
        if (logger_)
            logger_->d(myid, "[confirm nodes] initial IPv6 'get' for my id (%s)", myid.toString().c_str());
        search(myid, AF_INET6);
    }

    soon |= bucketMaintenance(dht4.buckets);
    soon |= bucketMaintenance(dht6.buckets);

    if (!soon) {
        if (dht4.buckets.grow_time >= now - seconds(150))
            soon |= neighbourhoodMaintenance(dht4.buckets);
        if (dht6.buckets.grow_time >= now - seconds(150))
            soon |= neighbourhoodMaintenance(dht6.buckets);
    }

    /* In order to maintain all buckets' age within 600 seconds, worst
       case is roughly 27 seconds, assuming the table is 22 bits deep.
       We want to keep a margin for neighborhood maintenance, so keep
       this within 25 seconds. */
    auto time_dis = soon
        ? uniform_duration_distribution<> {seconds(5) , seconds(25)}
        : uniform_duration_distribution<> {seconds(60), seconds(180)};
    auto confirm_nodes_time = now + time_dis(rd);

    scheduler.edit(nextNodesConfirmation, confirm_nodes_time);
}

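/* Serialize the local storage for persistence: for each hash, the values are
   packed as a msgpack array of [creation time, value] pairs. */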
std::vector<ValuesExport>
Dht::exportValues() const
{
    std::vector<ValuesExport> e {};
    e.reserve(store.size());
    for (const auto& h : store) {
        ValuesExport ve;
        ve.first = h.first;

        msgpack::sbuffer buffer;
        msgpack::packer<msgpack::sbuffer> pk(&buffer);
        const auto& vals = h.second.getValues();
        pk.pack_array(vals.size());
        for (const auto& v : vals) {
            pk.pack_array(2);
            pk.pack(v.created.time_since_epoch().count());
            v.data->msgpack_pack(pk);
        }
        ve.second = {buffer.data(), buffer.data()+buffer.size()};
        e.push_back(std::move(ve));
    }
    return e;
}

void
Dht::importValues(const std::vector<ValuesExport>& import)
{
    const auto& now = scheduler.time();

    for (const auto& value : import) {
        if (value.second.empty())
            continue;

        try {
            msgpack::unpacked msg;
            msgpack::unpack(msg, (const char*)value.second.data(), value.second.size());
            auto valarr = msg.get();
            if (valarr.type != msgpack::type::ARRAY)
                throw msgpack::type_error();
            for (unsigned i = 0; i < valarr.via.array.size; i++) {
                auto& valel = valarr.via.array.ptr[i];
                if (valel.type != msgpack::type::ARRAY or valel.via.array.size < 2)
                    throw msgpack::type_error();
                time_point val_time;
                Value tmp_val;
                try {
                    val_time = time_point{time_point::duration{valel.via.array.ptr[0].as<time_point::duration::rep>()}};
                    tmp_val.msgpack_unpack(valel.via.array.ptr[1]);
                } catch (const std::exception&) {
                    if (logger_)
                        logger_->e(value.first, "Error reading value at %s", value.first.toString().c_str());
                    continue;
                }
                val_time = std::min(val_time, now);
                storageStore(value.first, std::make_shared<Value>(std::move(tmp_val)), val_time);
            }
        } catch (const std::exception&) {
            if (logger_)
                logger_->e(value.first, "Error reading values at %s", value.first.toString().c_str());
            continue;
        }
    }
}


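/* Export the good nodes of both routing tables for persistence, listing the
   nodes of our own bucket in each family first. */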
std::vector<NodeExport>
Dht::exportNodes() const
{
    const auto& now = scheduler.time();
    std::vector<NodeExport> nodes;
    const auto b4 = dht4.buckets.findBucket(myid);
    if (b4 != dht4.buckets.end()) {
        for (auto& n : b4->nodes)
            if (n->isGood(now))
                nodes.push_back(n->exportNode());
    }
    const auto b6 = dht6.buckets.findBucket(myid);
    if (b6 != dht6.buckets.end()) {
        for (auto& n : b6->nodes)
            if (n->isGood(now))
                nodes.push_back(n->exportNode());
    }
    for (auto b = dht4.buckets.begin(); b != dht4.buckets.end(); ++b) {
        if (b == b4) continue;
        for (auto& n : b->nodes)
            if (n->isGood(now))
                nodes.push_back(n->exportNode());
    }
    for (auto b = dht6.buckets.begin(); b != dht6.buckets.end(); ++b) {
        if (b == b6) continue;
        for (auto& n : b->nodes)
            if (n->isGood(now))
                nodes.push_back(n->exportNode());
    }
    return nodes;
}

void
Dht::insertNode(const InfoHash& id, const SockAddr& addr)
{
    if (addr.getFamily() != AF_INET && addr.getFamily() != AF_INET6)
        return;
    scheduler.syncTime();
    network_engine.insertNode(id, addr);
}

void
Dht::pingNode(SockAddr sa, DoneCallbackSimple&& cb)
{
    scheduler.syncTime();
    if (logger_)
        logger_->d("Sending ping to %s", sa.toString().c_str());
    auto& count = dht(sa.getFamily()).pending_pings;
    count++;
    network_engine.sendPing(std::move(sa), [&count,cb](const net::Request&, net::RequestAnswer&&) {
        count--;
        if (cb)
            cb(true);
    }, [&count,cb](const net::Request&, bool last){
        if (last) {
            count--;
            if (cb)
                cb(false);
        }
    });
}

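/* Handle protocol errors reported by the network engine: on UNAUTHORIZED,
   drop the tokens we hold for that node and retry the affected searches;
   on NOT_FOUND (refresh of an unknown storage), cancel the request. */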
void
Dht::onError(Sp<net::Request> req, net::DhtProtocolException e) {
    const auto& node = req->node;
    if (e.getCode() == net::DhtProtocolException::UNAUTHORIZED) {
        if (logger_)
            logger_->e(node->id, "[node %s] token flush", node->toString().c_str());
        node->authError();
        for (auto& srp : searches(node->getFamily())) {
            auto& sr = srp.second;
            for (auto& n : sr->nodes) {
                if (n->node != node) continue;
                n->token.clear();
                n->last_get_reply = time_point::min();
                searchSendGetValues(sr);
                scheduler.edit(sr->nextSearchStep, scheduler.time());
                break;
            }
        }
    } else if (e.getCode() == net::DhtProtocolException::NOT_FOUND) {
        if (logger_)
            logger_->e(node->id, "[node %s] returned error 404: storage not found", node->toString().c_str());
        node->cancelRequest(req);
    }
}

void
Dht::onReportedAddr(const InfoHash& /*id*/, const SockAddr& addr)
{
    if (addr)
        reportedAddr(addr);
}

net::RequestAnswer
Dht::onPing(Sp<Node>)
{
    return {};
}

net::RequestAnswer
Dht::onFindNode(Sp<Node> node, const InfoHash& target, want_t want)
{
    const auto& now = scheduler.time();
    net::RequestAnswer answer;
    answer.ntoken = makeToken(node->getAddr(), false);
    if (want & WANT4)
        answer.nodes4 = dht4.buckets.findClosestNodes(target, now, TARGET_NODES);
    if (want & WANT6)
        answer.nodes6 = dht6.buckets.findClosestNodes(target, now, TARGET_NODES);
    return answer;
}

net::RequestAnswer
Dht::onGetValues(Sp<Node> node, const InfoHash& hash, want_t, const Query& query)
{
    if (not hash) {
        if (logger_)
            logger_->w("[node %s] Eek! Got get_values with no info_hash", node->toString().c_str());
        throw net::DhtProtocolException {
            net::DhtProtocolException::NON_AUTHORITATIVE_INFORMATION,
            net::DhtProtocolException::GET_NO_INFOHASH
        };
    }
    const auto& now = scheduler.time();
    net::RequestAnswer answer {};
    auto st = store.find(hash);
    answer.ntoken = makeToken(node->getAddr(), false);
    answer.nodes4 = dht4.buckets.findClosestNodes(hash, now, TARGET_NODES);
    answer.nodes6 = dht6.buckets.findClosestNodes(hash, now, TARGET_NODES);
    if (st != store.end() && not st->second.empty()) {
        answer.values = st->second.get(query.where.getFilter());
        if (logger_)
            logger_->d(hash, "[node %s] sending %u values", node->toString().c_str(), answer.values.size());
    }
    return answer;
}

void Dht::onGetValuesDone(const Sp<Node>& node,
        net::RequestAnswer& a,
        Sp<Search>& sr,
        const Sp<Query>& orig_query)
{
    if (not sr) {
        if (logger_)
            logger_->w("[search unknown] got reply to 'get'. Ignoring.");
        return;
    }
    /* if (logger_)
           logger_->d(sr->id, "[search %s] [node %s] got reply to 'get' with %u nodes",
            sr->id.toString().c_str(), node->toString().c_str(), a.nodes4.size()+a.nodes6.size());*/

    if (not a.ntoken.empty()) {
        if (not a.values.empty() or not a.fields.empty()) {
            if (logger_)
                logger_->d(sr->id, node->id, "[search %s] [node %s] found %u values",
                      sr->id.toString().c_str(), node->toString().c_str(), a.values.size());
            for (auto& getp : sr->callbacks) { /* call all callbacks for this search */
                auto& get = getp.second;
                if (not (get.get_cb or get.query_cb) or
                        (orig_query and get.query and not get.query->isSatisfiedBy(*orig_query)))
                    continue;

                if (get.query_cb) { /* in case of a request with query */
                    if (not a.fields.empty()) {
                        get.query_cb(a.fields);
                    } else if (not a.values.empty()) {
                        std::vector<Sp<FieldValueIndex>> fields;
                        fields.reserve(a.values.size());
                        for (const auto& v : a.values)
                            fields.emplace_back(std::make_shared<FieldValueIndex>(*v, orig_query ? orig_query->select : Select {}));
                        get.query_cb(fields);
                    }
                } else if (get.get_cb) { /* in case of a vanilla get request */
                    std::vector<Sp<Value>> tmp;
                    for (const auto& v : a.values)
                        if (not get.filter or get.filter(*v))
                            tmp.emplace_back(v);
                    if (not tmp.empty())
                        get.get_cb(tmp);
                }
            }

            /* callbacks for local search listeners */
            /*std::vector<std::pair<ValueCallback, std::vector<Sp<Value>>>> tmp_lists;
            for (auto& l : sr->listeners) {
                if (!l.second.get_cb or (orig_query and l.second.query and not l.second.query->isSatisfiedBy(*orig_query)))
                    continue;
                std::vector<Sp<Value>> tmp;
                for (const auto& v : a.values)
                    if (not l.second.filter or l.second.filter(*v))
                        tmp.emplace_back(v);
                if (not tmp.empty())
                    tmp_lists.emplace_back(l.second.get_cb, std::move(tmp));
            }
            for (auto& l : tmp_lists)
                l.first(l.second, false);*/
        } else if (not a.expired_values.empty()) {
            if (logger_)
                logger_->w(sr->id, node->id, "[search %s] [node %s] %u expired values",
                      sr->id.toString().c_str(), node->toString().c_str(), a.expired_values.size());
        }
    } else {
        if (logger_)
            logger_->w(sr->id, "[node %s] no token provided. Ignoring response content.", node->toString().c_str());
        network_engine.blacklistNode(node);
    }

    if (not sr->done) {
        searchSendGetValues(sr);

        // Force to recompute the next step time
        scheduler.edit(sr->nextSearchStep, scheduler.time());
    }
}

net::RequestAnswer
Dht::onListen(Sp<Node> node, const InfoHash& hash, const Blob& token, size_t socket_id, const Query& query, int version)
{
    if (not hash) {
        if (logger_)
            logger_->w(node->id, "[node %s] listen with no info_hash", node->toString().c_str());
        throw net::DhtProtocolException {
            net::DhtProtocolException::NON_AUTHORITATIVE_INFORMATION,
            net::DhtProtocolException::LISTEN_NO_INFOHASH
        };
    }
    if (not tokenMatch(token, node->getAddr())) {
        if (logger_)
            logger_->w(hash, node->id, "[node %s] incorrect token %s for 'listen'", node->toString().c_str(), hash.toString().c_str());
        throw net::DhtProtocolException {net::DhtProtocolException::UNAUTHORIZED, net::DhtProtocolException::LISTEN_WRONG_TOKEN};
    }
    Query q = query;
    storageAddListener(hash, node, socket_id, std::move(q), version);
    return {};
}

void
Dht::onListenDone(const Sp<Node>& /* node */, net::RequestAnswer& /* answer */, Sp<Search>& sr)
{
    // if (logger_)
    //     logger_->d(sr->id, node->id, "[search %s] [node %s] got listen confirmation",
    //            sr->id.toString().c_str(), node->toString().c_str(), answer.values.size());

    if (not sr->done) {
        const auto& now = scheduler.time();
        searchSendGetValues(sr);
        scheduler.edit(sr->nextSearchStep, now);
    }
}

net::RequestAnswer
Dht::onAnnounce(Sp<Node> n,
        const InfoHash& hash,
        const Blob& token,
        const std::vector<Sp<Value>>& values,
        const time_point& creation_date)
{
    auto& node = *n;
    if (not hash) {
        if (logger_)
            logger_->w(node.id, "put with no info_hash");
        throw net::DhtProtocolException {
            net::DhtProtocolException::NON_AUTHORITATIVE_INFORMATION,
            net::DhtProtocolException::PUT_NO_INFOHASH
        };
    }
    if (!tokenMatch(token, node.getAddr())) {
        if (logger_)
            logger_->w(hash, node.id, "[node %s] incorrect token %s for 'put'", node.toString().c_str(), hash.toString().c_str());
        throw net::DhtProtocolException {net::DhtProtocolException::UNAUTHORIZED, net::DhtProtocolException::PUT_WRONG_TOKEN};
    }
    {
        // We store a value only if we think we're part of the
        // SEARCH_NODES nodes around the target id.
        auto closest_nodes = buckets(node.getFamily()).findClosestNodes(hash, scheduler.time(), SEARCH_NODES);
        if (closest_nodes.size() >= TARGET_NODES and hash.xorCmp(closest_nodes.back()->id, myid) < 0) {
            if (logger_)
                logger_->w(hash, node.id, "[node %s] announce too far from the target. Dropping value.", node.toString().c_str());
            return {};
        }
    }

    auto created = std::min(creation_date, scheduler.time());
    for (const auto& v : values) {
        if (v->id == Value::INVALID_ID) {
            if (logger_)
                logger_->w(hash, node.id, "[value %s] incorrect value id", hash.toString().c_str());
            throw net::DhtProtocolException {
                net::DhtProtocolException::NON_AUTHORITATIVE_INFORMATION,
                net::DhtProtocolException::PUT_INVALID_ID
            };
        }
        auto lv = getLocalById(hash, v->id);
        Sp<Value> vc = v;
        if (lv) {
            if (*lv == *vc) {
                storageRefresh(hash, v->id);
                if (logger_)
                    logger_->d(hash, node.id, "[store %s] [node %s] refreshed value %s", hash.toString().c_str(), node.toString().c_str(), std::to_string(v->id).c_str());
            } else {
                const auto& type = getType(lv->type);
                if (type.editPolicy(hash, lv, vc, node.id, node.getAddr())) {
                    if (logger_)
                        logger_->d(hash, node.id, "[store %s] editing %s",
                            hash.toString().c_str(), vc->toString().c_str());
                    storageStore(hash, vc, created, node.getAddr());
                } else {
                    if (logger_)
                        logger_->d(hash, node.id, "[store %s] rejecting edition of %s because of storage policy",
                            hash.toString().c_str(), vc->toString().c_str());
                }
            }
        } else {
            // Allow the value to be edited by the storage policy
            const auto& type = getType(vc->type);
            if (type.storePolicy(hash, vc, node.id, node.getAddr())) {
                // if (logger_)
                //     logger_->d(hash, node.id, "[store %s] storing %s", hash.toString().c_str(), std::to_string(vc->id).c_str());
                storageStore(hash, vc, created, node.getAddr());
            } else {
                if (logger_)
                    logger_->d(hash, node.id, "[store %s] rejecting storage of %s",
                        hash.toString().c_str(), vc->toString().c_str());
            }
        }
    }
    return {};
}

net::RequestAnswer
Dht::onRefresh(Sp<Node> node, const InfoHash& hash, const Blob& token, const Value::Id& vid)
{
    using namespace net;

    if (not tokenMatch(token, node->getAddr())) {
        if (logger_)
            logger_->w(hash, node->id, "[node %s] incorrect token %s for 'put'", node->toString().c_str(), hash.toString().c_str());
        throw DhtProtocolException {DhtProtocolException::UNAUTHORIZED, DhtProtocolException::PUT_WRONG_TOKEN};
    }
    if (storageRefresh(hash, vid)) {
        if (logger_)
            logger_->d(hash, node->id, "[store %s] [node %s] refreshed value %s", hash.toString().c_str(), node->toString().c_str(), std::to_string(vid).c_str());
    } else {
        if (logger_)
            logger_->d(hash, node->id, "[store %s] [node %s] got refresh for unknown value",
                hash.toString().c_str(), node->toString().c_str());
        throw DhtProtocolException {DhtProtocolException::NOT_FOUND, DhtProtocolException::STORAGE_NOT_FOUND};
    }
    return {};
}

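/* Refresh the expiration of a stored value (typically for a permanent put)
   and tell remote listeners that the value was refreshed. Returns false if
   the storage is unknown. */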
bool
Dht::storageRefresh(const InfoHash& id, Value::Id vid)
{
    const auto& now = scheduler.time();
    auto s = store.find(id);
    if (s != store.end()) {
        // Values from a permanent put can be refreshed, so inform remote
        // listeners that the value needs to be refreshed.
        auto& st = s->second;
        if (not st.listeners.empty()) {
            if (logger_)
                logger_->d(id, "[store %s] %lu remote listeners", id.toString().c_str(), st.listeners.size());
            std::vector<Value::Id> ids = {vid};
            for (const auto& node_listeners : st.listeners) {
                for (const auto& l : node_listeners.second) {
                    if (logger_)
                        logger_->w(id, node_listeners.first->id, "[store %s] [node %s] sending refresh",
                            id.toString().c_str(),
                            node_listeners.first->toString().c_str());
                    Blob ntoken = makeToken(node_listeners.first->getAddr(), false);
                    network_engine.tellListenerRefreshed(node_listeners.first, l.first, id, ntoken, ids, l.second.version);
                }
            }
        }

        auto expiration = s->second.refresh(now, vid, types);
        if (expiration != time_point::max())
            scheduler.add(expiration, std::bind(&Dht::expireStorage, this, id));
        return true;
    }
    return false;
}

void
Dht::onAnnounceDone(const Sp<Node>& node, net::RequestAnswer& answer, Sp<Search>& sr)
{
    if (logger_)
        logger_->d(sr->id, node->id, "[search %s] [node %s] got reply to put!",
            sr->id.toString().c_str(), node->toString().c_str());
    searchSendGetValues(sr);
    sr->checkAnnounced(answer.vid);
}


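/* On-disk state written by saveState() and read back by loadState(): a
   msgpack map holding a format version, our node id, and the exported
   nodes and values. */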
struct DhtState {
    unsigned v {1};
    InfoHash id;
    std::vector<NodeExport> nodes;
    std::vector<ValuesExport> values;

    MSGPACK_DEFINE_MAP(v, id, nodes, values)
};

void
Dht::saveState(const std::string& path) const
{
    DhtState state;
    state.id = myid;
    state.nodes = exportNodes();
    state.values = exportValues();
    std::ofstream file(path);
    msgpack::pack(file, state);
}

void
Dht::loadState(const std::string& path)
{
    if (logger_)
        logger_->d("Importing state from %s", path.c_str());
    try {
        // Import persisted state from the binary file
        msgpack::unpacker pac;
        {
            // Read whole file
            std::ifstream file(path, std::ios::binary|std::ios::ate);
            if (!file.is_open()) {
                return;
            }
            auto size = file.tellg();
            file.seekg (0, std::ios::beg);
            pac.reserve_buffer(size);
            file.read (pac.buffer(), size);
            pac.buffer_consumed(size);
        }
        // Import nodes and values
        msgpack::object_handle oh;
        if (pac.next(oh)) {
            auto state = oh.get().as<DhtState>();
            if (logger_)
                logger_->d("Importing %zu nodes", state.nodes.size());
            if (state.id)
                myid = state.id;
            std::vector<Sp<Node>> tmpNodes;
            tmpNodes.reserve(state.nodes.size());
            for (const auto& node : state.nodes)
                tmpNodes.emplace_back(network_engine.insertNode(node.id, SockAddr(node.ss, node.sslen)));
            importValues(state.values);
        }
    } catch (const std::exception& e) {
        if (logger_)
            logger_->w("Error importing state from %s: %s", path.c_str(), e.what());
    }
}

}