Skip to content
Snippets Groups Projects
Select Git revision
  • 4.0.0
  • master default protected
  • release/202005
  • release/202001
  • release/201912
  • release/201911
  • release/releaseWindowsTestOne
  • release/windowsReleaseTest
  • release/releaseTest
  • release/releaseWindowsTest
  • release/201910
  • release/qt/201910
  • release/windows-test/201910
  • release/201908
  • release/201906
  • release/201905
  • release/201904
  • release/201903
  • release/201902
  • release/201901
  • release/201812
  • 2.2.0
  • 2.1.0
  • 2.0.1
  • 2.0.0
  • 1.4.1
  • 1.4.0
  • 1.3.0
  • 1.2.0
  • 1.1.0
30 results

config.h

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    network_engine.cpp 46.58 KiB
    /*
     *  Copyright (C) 2014-2022 Savoir-faire Linux Inc.
     *  Author(s) : Adrien Béraud <adrien.beraud@savoirfairelinux.com>
     *              Simon Désaulniers <simon.desaulniers@savoirfairelinux.com>
     *
     *  This program is free software; you can redistribute it and/or modify
     *  it under the terms of the GNU General Public License as published by
     *  the Free Software Foundation; either version 3 of the License, or
     *  (at your option) any later version.
     *
     *  This program is distributed in the hope that it will be useful,
     *  but WITHOUT ANY WARRANTY; without even the implied warranty of
     *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     *  GNU General Public License for more details.
     *
     *  You should have received a copy of the GNU General Public License
     *  along with this program. If not, see <https://www.gnu.org/licenses/>.
     */
    
    #include "network_engine.h"
    #include "request.h"
    #include "default_types.h"
    #include "log_enable.h"
    #include "parsed_message.h"
    
    #include <msgpack.hpp>
    
    namespace dht {
    namespace net {
    using namespace std::chrono_literals;
    
    // Definitions of the static message strings declared by DhtProtocolException.
    const std::string DhtProtocolException::GET_NO_INFOHASH {"Get_values with no info_hash"};
    const std::string DhtProtocolException::LISTEN_NO_INFOHASH {"Listen with no info_hash"};
    const std::string DhtProtocolException::LISTEN_WRONG_TOKEN {"Listen with wrong token"};
    const std::string DhtProtocolException::PUT_NO_INFOHASH {"Put with no info_hash"};
    const std::string DhtProtocolException::PUT_WRONG_TOKEN {"Put with wrong token"};
    const std::string DhtProtocolException::PUT_INVALID_ID {"Put with invalid id"};
    const std::string DhtProtocolException::STORAGE_NOT_FOUND {"Access operation for unknown storage"};
    
    // Out-of-class definitions for static constexpr members of NetworkEngine
    // (their values are given at the point of declaration, outside this file).
    constexpr std::chrono::seconds NetworkEngine::UDP_REPLY_TIME;
    constexpr std::chrono::seconds NetworkEngine::RX_MAX_PACKET_TIME;
    constexpr std::chrono::seconds NetworkEngine::RX_TIMEOUT;
    
    // String packed under KEY_UA in every message built in this file.
    const std::string NetworkEngine::my_v {"RNG1"};
    
    // IPv4-mapped IPv6 prefix (::ffff:0:0/96); isMartian() compares its first 12 bytes
    // against incoming IPv6 addresses.
    static constexpr uint8_t v4prefix[16] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xFF, 0xFF, 0, 0, 0, 0};
    
    // NOTE(review): presumably the number of nodes put in a reply; not referenced in
    // the part of the file visible here -- confirm against its users.
    constexpr unsigned SEND_NODES {8};
    
    
    /**
     * Reassembly state for a message whose values arrive in several packets.
     * Entries are kept in partial_messages, keyed by the transaction id of
     * the message that opened the session (see processMessage()).
     */
    struct NetworkEngine::PartialMessage {
        SockAddr from;                      // sender of the first packet; later parts must come from it
        time_point start;                   // time the first packet of the session was received
        time_point last_part;               // time the most recent data part was appended
        std::unique_ptr<ParsedMessage> msg; // message being reassembled
    };
    
    /**
     * Packs each value of `st` with packMsg().
     *
     * @param st values to serialize.
     * @return one blob per input value, in the same order.
     */
    std::vector<Blob>
    serializeValues(const std::vector<Sp<Value>>& st)
    {
        std::vector<Blob> packed;
        packed.reserve(st.size());
        for (auto it = st.begin(), last = st.end(); it != last; ++it)
            packed.emplace_back(packMsg(*it));
        return packed;
    }
    
    /**
     * Writes `token` to `pk` as a msgpack binary object (header, then raw bytes).
     *
     * @param pk    packer receiving the token.
     * @param token bytes to write; an empty token yields an empty bin.
     */
    void
    packToken(msgpack::packer<msgpack::sbuffer>& pk, const Blob& token)
    {
        const auto length = token.size();
        pk.pack_bin(length);
        pk.pack_bin_body(reinterpret_cast<const char*>(token.data()), length);
    }
    
    /**
     * Builds an answer by moving the relevant fields out of a parsed message.
     * The token, values, refreshed/expired value ids, fields and both node
     * lists of `msg` are moved-from afterwards.
     */
    RequestAnswer::RequestAnswer(ParsedMessage&& msg)
     : ntoken(std::move(msg.token)),
       values(std::move(msg.values)),
       refreshed_values(std::move(msg.refreshed_values)),
       expired_values(std::move(msg.expired_values)),
       fields(std::move(msg.fields)),
       nodes4(std::move(msg.nodes4)),
       nodes6(std::move(msg.nodes6))
    {}
    
    /**
     * Constructs the engine and takes ownership of the socket and of all
     * callbacks.
     *
     * @param myid       id packed as KEY_REQ_ID in outgoing messages.
     * @param c          network configuration (copied into `config`).
     * @param sock       datagram socket used by send(); ownership is taken.
     * @param log        logger; may be empty, every use in this file is guarded.
     * @param rand       random engine used for retry jitter and handed to the node cache.
     * @param scheduler  provides the current time and runs delayed jobs.
     * @param onError .. onRefresh  callbacks moved into the members of the same name.
     *
     * Parameters deliberately share their names with the members they
     * initialize: inside each initializer the name refers to the parameter.
     */
    NetworkEngine::NetworkEngine(InfoHash& myid, NetworkConfig c,
            std::unique_ptr<DatagramSocket>&& sock,
            const Sp<Logger>& log,
            std::mt19937_64& rand,
            Scheduler& scheduler,
            decltype(NetworkEngine::onError)&& onError,
            decltype(NetworkEngine::onNewNode)&& onNewNode,
            decltype(NetworkEngine::onReportedAddr)&& onReportedAddr,
            decltype(NetworkEngine::onPing)&& onPing,
            decltype(NetworkEngine::onFindNode)&& onFindNode,
            decltype(NetworkEngine::onGetValues)&& onGetValues,
            decltype(NetworkEngine::onListen)&& onListen,
            decltype(NetworkEngine::onAnnounce)&& onAnnounce,
            decltype(NetworkEngine::onRefresh)&& onRefresh) :
        onError(std::move(onError)),
        onNewNode(std::move(onNewNode)),
        onReportedAddr(std::move(onReportedAddr)),
        onPing(std::move(onPing)),
        onFindNode(std::move(onFindNode)),
        onGetValues(std::move(onGetValues)),
        onListen(std::move(onListen)),
        onAnnounce(std::move(onAnnounce)),
        onRefresh(std::move(onRefresh)),
        myid(myid), config(c), dht_socket(std::move(sock)), logger_(log), rd(rand),
        // NOTE(review): `cache` and `rate_limiter` read the members `rd` and `config`;
        // this relies on those members being declared before them -- confirm in the header.
        cache(rd),
        rate_limiter(config.max_req_per_sec),
        scheduler(scheduler)
    {}
    
    /** Cancels every request still tracked in `requests` (see clear()). */
    NetworkEngine::~NetworkEngine() {
        clear();
    }
    
    void
    NetworkEngine::tellListener(const Sp<Node>& node, Tid socket_id, const InfoHash& hash, want_t want,
            const Blob& ntoken, std::vector<Sp<Node>>&& nodes,
            std::vector<Sp<Node>>&& nodes6, std::vector<Sp<Value>>&& values,
            const Query& query, int version)
    {
        auto nnodes = bufferNodes(node->getFamily(), hash, want, nodes, nodes6);
        try {
            if (version >= 1) {
                sendUpdateValues(node, hash, values, scheduler.time(), ntoken, socket_id);
            } else {
                sendNodesValues(node->getAddr(), socket_id, nnodes.first, nnodes.second, values, query, ntoken);
            }
        } catch (const std::overflow_error& e) {
            if (logger_)
                logger_->e("Can't send value: buffer not large enough !");
        }
    }
    
    void
    NetworkEngine::tellListenerRefreshed(const Sp<Node>& n, Tid socket_id, const InfoHash&, const Blob& token, const std::vector<Value::Id>& values, int version)
    {
        msgpack::sbuffer buffer;
        msgpack::packer<msgpack::sbuffer> pk(&buffer);
    
        pk.pack_map(4 + (version >= 1 ? 1 : 0) + (config.network?1:0));
    
        pk.pack(version >= 1 ? KEY_A : KEY_U);
            pk.pack_map(1 + (version >= 1 ? 1 : 0) + (not values.empty()?1:0) + (not token.empty()?1:0));
            pk.pack(KEY_REQ_ID); pk.pack(myid);
            if (version >= 1) {
                pk.pack(KEY_REQ_SID);   pk.pack(socket_id);
            }
            if (not token.empty()) {
                pk.pack(KEY_REQ_TOKEN); packToken(pk, token);
            }
            if (not values.empty()) {
                pk.pack(KEY_REQ_REFRESHED);
                pk.pack(values);
                if (logger_)
                    logger_->d(n->id, "[node %s] sending %zu refreshed values", n->toString().c_str(), values.size());
            }
    
        pk.pack(KEY_Y); pk.pack(version >= 1 ? KEY_Q : KEY_R);
        pk.pack(KEY_UA); pk.pack(my_v);
        if (config.network) {
            pk.pack(KEY_NETID); pk.pack(config.network);
        }
    
        if (version >= 1) {
            Tid tid (n->getNewTid());
    
            pk.pack(KEY_Q);   pk.pack(QUERY_UPDATE);
            pk.pack(KEY_TID); pk.pack(tid);
    
            auto req = std::make_shared<Request>(MessageType::UpdateValue, tid, n,
                Blob(buffer.data(), buffer.data() + buffer.size()),
                [=](const Request&, ParsedMessage&&) { /* on done */ },
                [=](const Request&, bool) { /* on expired */ }
            );
            sendRequest(req);
            ++out_stats.updateValue;
            return;
        }
        pk.pack(KEY_TID); pk.pack(socket_id);
    
        // send response
        send(n->getAddr(), buffer.data(), buffer.size());
    }
    
    void
    NetworkEngine::tellListenerExpired(const Sp<Node>& n, Tid socket_id, const InfoHash&, const Blob& token, const std::vector<Value::Id>& values, int version)
    {
        msgpack::sbuffer buffer;
        msgpack::packer<msgpack::sbuffer> pk(&buffer);
    
        pk.pack_map(4 + (version >= 1 ? 1 : 0) + (config.network?1:0));
    
        pk.pack(version >= 1 ? KEY_A : KEY_U);
            pk.pack_map(1 + (version >= 1 ? 1 : 0) + (not values.empty()?1:0) + (not token.empty()?1:0));
            pk.pack(KEY_REQ_ID); pk.pack(myid);
            if (version >= 1) {
                pk.pack(KEY_REQ_SID);   pk.pack(socket_id);
            }
            if (not token.empty()) {
                pk.pack(KEY_REQ_TOKEN); packToken(pk, token);
            }
            if (not values.empty()) {
                pk.pack(KEY_REQ_EXPIRED);
                pk.pack(values);
                if (logger_)
                    logger_->d(n->id, "[node %s] sending %zu expired values", n->toString().c_str(), values.size());
            }
    
        pk.pack(KEY_Y); pk.pack(version >= 1 ? KEY_Q : KEY_R);
        pk.pack(KEY_UA); pk.pack(my_v);
        if (config.network) {
            pk.pack(KEY_NETID); pk.pack(config.network);
        }
    
        if (version >= 1) {
            Tid tid (n->getNewTid());
    
            pk.pack(KEY_Q);   pk.pack(QUERY_UPDATE);
            pk.pack(KEY_TID); pk.pack(tid);
    
            auto req = std::make_shared<Request>(MessageType::UpdateValue, tid, n,
                Blob(buffer.data(), buffer.data() + buffer.size()),
                [=](const Request&, ParsedMessage&&) { /* on done */ },
                [=](const Request&, bool) { /* on expired */ }
            );
            sendRequest(req);
            ++out_stats.updateValue;
            return;
        }
        pk.pack(KEY_TID); pk.pack(socket_id);
    
        // send response
        send(n->getAddr(), buffer.data(), buffer.size());
    }
    
    
    /**
     * Reports whether the engine has a usable socket for an address family.
     *
     * @param af AF_INET, AF_INET6, or 0 meaning "either family".
     * @return true if the socket supports the requested family; false for
     *         any other family value or when the engine has no socket.
     */
    bool
    NetworkEngine::isRunning(sa_family_t af) const
    {
        // An engine without a socket is a valid state (send() reports
        // ENOTCONN for it), so it must not be dereferenced here.
        if (not dht_socket)
            return false;
        switch (af) {
        case 0:
            return dht_socket->hasIPv4() or dht_socket->hasIPv6();
        case AF_INET:
            return dht_socket->hasIPv4();
        case AF_INET6:
            return dht_socket->hasIPv6();
        default:
            return false;
        }
    }
    
    void
    NetworkEngine::clear()
    {
        for (auto& request : requests) {
            request.second->cancel();
            request.second->node->setExpired();
        }
        requests.clear();
    }
    
    /**
     * Notifies the engine that connectivity changed for address family `af`;
     * forwards to the node cache's clearBadNodes(af).
     */
    void
    NetworkEngine::connectivityChanged(sa_family_t af)
    {
        cache.clearBadNodes(af);
    }
    
    /**
     * Performs one send attempt for a request and schedules the next one.
     *
     * Does nothing if the request is no longer pending. If the request has
     * expired, or the send fails with one of the unreachable-type errors
     * listed below, the target node is marked expired and no retry is
     * scheduled. Otherwise this function is scheduled to run again at
     * last_try + attempt_duration.
     *
     * @param sreq request to process. The scheduled retry only holds a weak
     *             reference, so it is skipped if the request was released.
     */
    void
    NetworkEngine::requestStep(Sp<Request> sreq)
    {
        auto& req = *sreq;
        if (not req.pending())
            return;
    
        auto now = scheduler.time();
        auto& node = *req.node;
        if (req.isExpired(now)) {
            // if (logger_)
            //     logger_->d(node.id, "[node %s] expired !", node.toString().c_str());
            node.setExpired();
            // requests to nodes without id are the only ones registered in `requests` (see sendRequest)
            if (not node.id)
                requests.erase(req.tid);
            return;
        } else if (req.attempt_count == 1) {
            // one attempt already counted without the request completing:
            // notify the owner with done=false before retrying
            req.on_expired(req, false);
        }
    
        // last argument is send()'s `confirmed` flag: true when the node's last reply is older than UDP_REPLY_TIME
        auto err = send(node.getAddr(), (char*)req.msg.data(), req.msg.size(), node.getReplyTime() < now - UDP_REPLY_TIME);
        if (err == ENETUNREACH  ||
            err == EHOSTUNREACH ||
            err == EAFNOSUPPORT ||
            err == EPIPE        ||
            err == EPERM)
        {
            // destination can't be reached: give up on this node right away
            node.setExpired();
            if (not node.id)
                requests.erase(req.tid);
        } else {
            req.last_try = now;
            // EAGAIN does not count as an attempt: retry later with the same delay
            if (err != EAGAIN) {
                ++req.attempt_count;
                // back-off: the delay grows by itself plus a random duration
                // drawn between 0ms and MAX_RESPONSE_TIME/4
                req.attempt_duration +=
                    req.attempt_duration + uniform_duration_distribution<>(0ms, ((duration)Node::MAX_RESPONSE_TIME)/4)(rd);
                if (not req.parts.empty()){
                    sendValueParts(req.tid, req.parts, node.getAddr());
                }
            }
            std::weak_ptr<Request> wreq = sreq;
            scheduler.add(req.last_try + req.attempt_duration, [this,wreq] {
                if (auto req = wreq.lock())
                    requestStep(req);
            });
        }
    }
    
    /**
     * Starts a request: records its start time, registers it with the target
     * node and performs the first send attempt through requestStep(), which
     * also handles retries and expiration.
     *
     * Requests to a node whose id is not set are additionally indexed by
     * transaction id in `requests`.
     *
     * @param request request to send.
     */
    void
    NetworkEngine::sendRequest(const Sp<Request>& request)
    {
        auto& target = *request->node;
        const bool anonymousTarget = not target.id;
        if (anonymousTarget)
            requests.emplace(request->tid, request);
        request->start = scheduler.time();
        target.requested(request);
        requestStep(request);
    }
    
    
    /**
     * Rate control for requests we receive.
     *
     * Applies the per-address limiter first (skipped when
     * config.max_peer_req_per_sec is negative), then the global limiter.
     *
     * @param addr address the request came from.
     * @return true if the request may be processed, false if it must be dropped.
     */
    bool
    NetworkEngine::rateLimit(const SockAddr& addr)
    {
        const auto& now = scheduler.time();
    
        // occasional IP limiter maintenance (a few times every second at max rate):
        // drop the per-address limiters for which maintain() reports 0
        const bool maintenanceDue = (limiter_maintenance++ == config.max_peer_req_per_sec);
        if (maintenanceDue) {
            auto it = address_rate_limiter.begin();
            while (it != address_rate_limiter.end()) {
                if (it->second.maintain(now) == 0)
                    address_rate_limiter.erase(it++);
                else
                    ++it;
            }
            limiter_maintenance = 0;
        }
    
        // per IP limiter: a refusal here short-circuits the global limiter
        const bool perAddressDisabled = config.max_peer_req_per_sec < 0;
        if (not perAddressDisabled) {
            auto& peerLimiter = address_rate_limiter
                .emplace(addr, config.max_peer_req_per_sec).first->second;
            if (not peerLimiter.limit(now))
                return false;
        }
    
        // global limiter
        return rate_limiter.limit(now);
    }
    
    /**
     * Tells whether `addr` is an address we should never talk to.
     *
     * Rejected: port 0; IPv4 addresses whose first byte is 0 or whose three
     * top bits are set (224.0.0.0 and above); IPv6 addresses that are
     * shorter than a sockaddr_in6, start with 0xFF, are in fe80::/10, whose
     * 16 bytes equal those of InfoHash::zero(), or that start with the
     * IPv4-mapped prefix; any other address family.
     *
     * @return true if the address must be ignored.
     */
    bool
    NetworkEngine::isMartian(const SockAddr& addr)
    {
        if (addr.getPort() == 0)
            return true;
    
        const auto family = addr.getFamily();
        if (family == AF_INET) {
            const auto& sin = addr.getIPv4();
            const uint8_t* bytes = reinterpret_cast<const uint8_t*>(&sin.sin_addr);
            const bool zeroNetwork = bytes[0] == 0;
            const bool multicastOrAbove = (bytes[0] & 0xE0) == 0xE0;
            return zeroNetwork || multicastOrAbove;
        }
        if (family == AF_INET6) {
            if (addr.getLength() < sizeof(sockaddr_in6))
                return true;
            const auto& sin6 = addr.getIPv6();
            const uint8_t* bytes = reinterpret_cast<const uint8_t*>(&sin6.sin6_addr);
            if (bytes[0] == 0xFF)
                return true;
            if (bytes[0] == 0xFE && (bytes[1] & 0xC0) == 0x80)
                return true;
            if (memcmp(bytes, InfoHash::zero().data(), 16) == 0)
                return true;
            return memcmp(bytes, v4prefix, 12) == 0;
        }
        return true;
    }
    
    /* The internal blacklist is an LRU cache of nodes that have sent
       incorrect messages.
       NOTE(review): this function only marks the node expired and inserts its
       address; no eviction is visible here -- confirm the "LRU" behaviour
       against the declaration of `blacklist`. */
    void
    NetworkEngine::blacklistNode(const Sp<Node>& n)
    {
        n->setExpired();
        blacklist.emplace(n->getAddr());
    }
    
    /** Returns true if `addr` was previously added with blacklistNode(). */
    bool
    NetworkEngine::isNodeBlacklisted(const SockAddr& addr) const
    {
        const auto entry = blacklist.find(addr);
        const bool listed = entry != blacklist.end();
        return listed;
    }
    
    /**
     * Entry point for every received datagram.
     *
     * Drops packets from martian or blacklisted addresses, packets that
     * can't be parsed, packets from another network id and packets carrying
     * our own (or no) node id. Requests are subject to rate limiting.
     * Messages announcing value parts open a reassembly session in
     * partial_messages; ValueData packets feed such a session and the
     * message is processed once complete. Everything else goes straight to
     * process(). Exceptions thrown by process() are swallowed: the packet
     * is simply dropped.
     *
     * @param buf    raw packet.
     * @param buflen size of the packet in bytes.
     * @param f      address the packet was received from.
     */
    void
    NetworkEngine::processMessage(const uint8_t *buf, size_t buflen, SockAddr f)
    {
        auto from = f.getMappedIPv4();
        if (isMartian(from)) {
            if (logger_)
                logger_->w("Received packet from martian node %s", from.toString().c_str());
            return;
        }
    
        if (isNodeBlacklisted(from)) {
            if (logger_)
                logger_->w("Received packet from blacklisted node %s", from.toString().c_str());
            return;
        }
    
        std::unique_ptr<ParsedMessage> msg {new ParsedMessage};
        try {
            msgpack::unpacked msg_res = msgpack::unpack((const char*)buf, buflen);
            msg->msgpack_unpack(msg_res.get());
        } catch (const std::exception& e) {
            // %zu is the portable conversion for size_t (%lu is wrong where long is 32 bits)
            if (logger_)
                logger_->w("Can't parse message of size %zu: %s", buflen, e.what());
            // if (logger_)
            //     logger_->DBG.logPrintable(buf, buflen);
            return;
        }
    
        if (msg->network != config.network) {
            if (logger_)
                logger_->d("Received message from other config.network %u", msg->network);
            return;
        }
    
        const auto& now = scheduler.time();
    
        // partial value data
        if (msg->type == MessageType::ValueData) {
            auto pmsg_it = partial_messages.find(msg->tid);
            if (pmsg_it == partial_messages.end()) {
                if (logIncoming_)
                    if (logger_)
                        logger_->d("Can't find partial message");
                rateLimit(from);
                return;
            }
            if (!pmsg_it->second.from.equals(from)) {
                if (logger_)
                    logger_->d("Received partial message data from unexpected IP address");
                rateLimit(from);
                return;
            }
            // append data block
            if (pmsg_it->second.msg->append(*msg)) {
                pmsg_it->second.last_part = now;
                // check data completion
                if (pmsg_it->second.msg->complete()) {
                    // Close the session before processing, so that the entry is
                    // released even when process() throws (it would otherwise
                    // linger, holding a moved-from message, until the RX timeout).
                    auto full_msg = std::move(pmsg_it->second.msg);
                    partial_messages.erase(pmsg_it);
                    try {
                        // process the full message
                        process(std::move(full_msg), from);
                    } catch (...) {
                        return;
                    }
                } else
                    scheduler.add(now + RX_TIMEOUT, std::bind(&NetworkEngine::maintainRxBuffer, this, msg->tid));
            }
            return;
        }
    
        if (msg->id == myid or not msg->id) {
            if (logger_)
                logger_->d("Received message from self");
            return;
        }
    
        if (msg->type > MessageType::Reply) {
            /* Rate limit requests. */
            if (!rateLimit(from)) {
                if (logger_)
                    logger_->w("Dropping request due to rate limiting");
                return;
            }
        }
    
        if (msg->value_parts.empty()) {
            try {
                process(std::move(msg), from);
            } catch(...) {
                return;
            }
        } else {
            // starting partial message session
            auto k = msg->tid;
            auto& pmsg = partial_messages[k];
            if (not pmsg.msg) {
                pmsg.from = from;
                pmsg.msg = std::move(msg);
                pmsg.start = now;
                pmsg.last_part = now;
                // two checks: one at the overall session deadline, one at the inactivity timeout
                scheduler.add(now + RX_MAX_PACKET_TIME, std::bind(&NetworkEngine::maintainRxBuffer, this, k));
                scheduler.add(now + RX_TIMEOUT, std::bind(&NetworkEngine::maintainRxBuffer, this, k));
            } else
                if (logger_)
                    logger_->e("Partial message with given TID already exists");
        }
    }
    
    void
    NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& from)
    {
        const auto& now = scheduler.time();
        auto node = cache.getNode(msg->id, from, now, true, msg->is_client);
    
        if (msg->type == MessageType::ValueUpdate) {
            auto rsocket = node->getSocket(msg->tid);
            if (not rsocket)
                throw DhtProtocolException {DhtProtocolException::UNKNOWN_TID, "Can't find socket", msg->id};
            node->received(now, {});
            onNewNode(node, 2);
            deserializeNodes(*msg, from);
            rsocket->on_receive(node, std::move(*msg));
        }
        else if (msg->type == MessageType::Error or msg->type == MessageType::Reply) {
            auto rsocket = node->getSocket(msg->tid);
            auto req = node->getRequest(msg->tid);
    
            /* either response for a request or data for an opened socket */
            if (not req and not rsocket) {
                auto req_it = requests.find(msg->tid);
                if (req_it != requests.end() and not req_it->second->node->id) {
                    req = req_it->second;
                    req->node = node;
                    requests.erase(req_it);
                } else {
                    node->received(now, req);
                    if (not node->isClient())
                        onNewNode(node, 1);
                    if (logger_)
                        logger_->d(node->id, "[node %s] can't find transaction with id %u", node->toString().c_str(), msg->tid);
                    return;
                }
            }
    
            node->received(now, req);
    
            if (not node->isClient())
                onNewNode(node, 2);
            onReportedAddr(msg->id, msg->addr);
    
            if (req and (req->cancelled() or req->expired() or req->completed())) {
                if (logger_)
                    logger_->w(node->id, "[node %s] response to expired, cancelled or completed request", node->toString().c_str());
                return;
            }
    
            switch (msg->type) {
            case MessageType::Error: {
                if (msg->id and req and (
                    (msg->error_code == DhtProtocolException::NOT_FOUND    and req->getType() == MessageType::Refresh) or
                    (msg->error_code == DhtProtocolException::UNAUTHORIZED and (req->getType() == MessageType::AnnounceValue
                                                                             or req->getType() == MessageType::Listen))))
                {
                    req->last_try = time_point::min();
                    req->reply_time = time_point::min();
                    if (not req->setError(DhtProtocolException {msg->error_code}))
                        onError(req, DhtProtocolException {msg->error_code});
                } else {
                    if (logIncoming_)
                        if (logger_)
                            logger_->w(msg->id, "[node %s %s] received unknown error message %u",
                            msg->id.toString().c_str(), from.toString().c_str(), msg->error_code);
                }
                break;
            }
            case MessageType::Reply:
                if (req) { /* request reply */
                    auto& r = *req;
                    if (r.getType() == MessageType::AnnounceValue
                     or r.getType() == MessageType::Listen
                     or r.getType() == MessageType::Refresh) {
                        r.node->authSuccess();
                    }
                    r.reply_time = scheduler.time();
    
                    deserializeNodes(*msg, from);
                    r.setDone(std::move(*msg));
                    break;
                } else { /* request socket data */
                    deserializeNodes(*msg, from);
                    rsocket->on_receive(node, std::move(*msg));
                }
                break;
            default:
                break;
            }
        } else {
            node->received(now, {});
            if (not node->isClient())
                onNewNode(node, 1);
            try {
                switch (msg->type) {
                case MessageType::Ping:
                    ++in_stats.ping;
                    if (logIncoming_)
                        if (logger_)
                            logger_->d(node->id, "[node %s] sending pong", node->toString().c_str());
                    onPing(node);
                    sendPong(from, msg->tid);
                    break;
                case MessageType::FindNode: {
                    // if (logger_)
                    //     logger_->d(msg->target, node->id, "[node %s] got 'find' request for %s (%d)", node->toString().c_str(), msg->target.toString().c_str(), msg->want);
                    ++in_stats.find;
                    RequestAnswer answer = onFindNode(node, msg->target, msg->want);
                    auto nnodes = bufferNodes(from.getFamily(), msg->target, msg->want, answer.nodes4, answer.nodes6);
                    sendNodesValues(from, msg->tid, nnodes.first, nnodes.second, {}, {}, answer.ntoken);
                    break;
                }
                case MessageType::GetValues: {
                    // if (logger_)
                    //     logger_->d(msg->info_hash, node->id, "[node %s] got 'get' request for %s", node->toString().c_str(), msg->info_hash.toString().c_str());
                    ++in_stats.get;
                    RequestAnswer answer = onGetValues(node, msg->info_hash, msg->want, msg->query);
                    auto nnodes = bufferNodes(from.getFamily(), msg->info_hash, msg->want, answer.nodes4, answer.nodes6);
                    sendNodesValues(from, msg->tid, nnodes.first, nnodes.second, answer.values, msg->query, answer.ntoken);
                    break;
                }
                case MessageType::AnnounceValue: {
                    if (logIncoming_ and logger_)
                        logger_->d(msg->info_hash, node->id, "[node %s] got 'put' request for %s", node->toString().c_str(), msg->info_hash.toString().c_str());
                    ++in_stats.put;
                    onAnnounce(node, msg->info_hash, msg->token, msg->values, msg->created);
    
                    /* Note that if storageStore failed, we lie to the requestor.
                       This is to prevent them from backtracking, and hence
                       polluting the DHT. */
                    for (auto& v : msg->values) {
                       sendValueAnnounced(from, msg->tid, v->id);
                    }
                    break;
                }
                case MessageType::Refresh:
                    if (logIncoming_ and logger_)
                        logger_->d(msg->info_hash, node->id, "[node %s] got 'refresh' request for %s", node->toString().c_str(), msg->info_hash.toString().c_str());
                    onRefresh(node, msg->info_hash, msg->token, msg->value_id);
                    /* Same note as above in MessageType::AnnounceValue applies. */
                    sendValueAnnounced(from, msg->tid, msg->value_id);
                    break;
                case MessageType::Listen: {
                    if (logIncoming_ and logger_)
                        logger_->d(msg->info_hash, node->id, "[node %s] got 'listen' request for %s", node->toString().c_str(), msg->info_hash.toString().c_str());
                    ++in_stats.listen;
                    RequestAnswer answer = onListen(node, msg->info_hash, msg->token, msg->socket_id, std::move(msg->query), msg->version);
                    auto nnodes = bufferNodes(from.getFamily(), msg->info_hash, msg->want, answer.nodes4, answer.nodes6);
                    sendListenConfirmation(from, msg->tid);
                    break;
                }
                case MessageType::UpdateValue: {
                    if (logIncoming_ and logger_)
                        logger_->d(msg->info_hash, node->id, "[node %s] got 'update' request for %s", node->toString().c_str(), msg->info_hash.toString().c_str());
                    ++in_stats.updateValue;
                    if (auto rsocket = node->getSocket(msg->socket_id))
                        rsocket->on_receive(node, std::move(*msg));
                    else if (logger_)
                        logger_->e(msg->info_hash, node->id, "[node %s] 'update' request without socket for %s", node->toString().c_str(), msg->info_hash.toString().c_str());
                    sendListenConfirmation(from, msg->tid);
                    break;
                }
                default:
                    break;
                }
            } catch (const std::overflow_error& e) {
                if (logger_)
                    logger_->e("Can't send value: buffer not large enough !");
            } catch (const DhtProtocolException& e) {
                sendError(from, msg->tid, e.getCode(), e.getMsg().c_str(), true);
            }
        }
    }
    
    /**
     * Packs the raw IP address of `addr` under the "sa" key.
     *
     * The address is written as a binary object: the in_addr bytes for
     * AF_INET, the in6_addr bytes for any other family, truncated to
     * addr.getLength() if that is smaller.
     *
     * @param pk   packer receiving the key and the address.
     * @param addr address to pack.
     */
    void
    insertAddr(msgpack::packer<msgpack::sbuffer>& pk, const SockAddr& addr)
    {
        const bool isV4 = addr.getFamily() == AF_INET;
        const size_t rawSize = isV4 ? sizeof(in_addr) : sizeof(in6_addr);
        const size_t addr_len = std::min<size_t>(addr.getLength(), rawSize);
        const char* raw = isV4 ? reinterpret_cast<const char*>(&addr.getIPv4().sin_addr)
                               : reinterpret_cast<const char*>(&addr.getIPv6().sin6_addr);
        pk.pack("sa");
        pk.pack_bin(addr_len);
        pk.pack_bin_body(raw, addr_len);
    }
    
    /**
     * Sends a raw buffer to `addr` through the engine's socket.
     *
     * @param addr      destination.
     * @param buf       bytes to send.
     * @param len       number of bytes.
     * @param confirmed forwarded as-is to the socket's sendTo().
     * @return the value returned by sendTo(), or ENOTCONN if there is no socket.
     */
    int
    NetworkEngine::send(const SockAddr& addr, const char *buf, size_t len, bool confirmed)
    {
        if (not dht_socket)
            return ENOTCONN;
        return dht_socket->sendTo(addr, reinterpret_cast<const uint8_t*>(buf), len, confirmed);
    }
    
    /**
     * Sends a ping query to `node`.
     *
     * @param node       node to ping.
     * @param on_done    called (if set) with an empty answer when the pong arrives.
     * @param on_expired called (if set) with the request and the flag passed by the request.
     * @return the request that was sent.
     */
    Sp<Request>
    NetworkEngine::sendPing(const Sp<Node>& node, RequestCb&& on_done, RequestExpiredCb&& on_expired) {
        Tid tid (node->getNewTid());
        msgpack::sbuffer buffer;
        msgpack::packer<msgpack::sbuffer> pk(&buffer);
    
        // top level: a, q, t, y, v (+ network id)
        const int netidEntries = config.network ? 1 : 0;
        pk.pack_map(5 + netidEntries);
    
        pk.pack(KEY_A);
        pk.pack_map(1);
        pk.pack(KEY_REQ_ID);
        pk.pack(myid);
    
        pk.pack(KEY_Q);
        pk.pack(QUERY_PING);
        pk.pack(KEY_TID);
        pk.pack(tid);
        pk.pack(KEY_Y);
        pk.pack(KEY_Q);
        pk.pack(KEY_UA);
        pk.pack(my_v);
        if (netidEntries) {
            pk.pack(KEY_NETID);
            pk.pack(config.network);
        }
    
        // the callbacks are copied into the handlers kept by the request
        auto doneHandler = [this, on_done](const Request& req_status, ParsedMessage&&) {
            if (logger_)
                logger_->d(req_status.node->id, "[node %s] got pong !", req_status.node->toString().c_str());
            if (on_done)
                on_done(req_status, {});
        };
        auto expiredHandler = [on_expired](const Request& req_status, bool done) {
            if (on_expired)
                on_expired(req_status, done);
        };
    
        auto req = std::make_shared<Request>(MessageType::Ping, tid, node,
            Blob(buffer.data(), buffer.data() + buffer.size()),
            std::move(doneHandler),
            std::move(expiredHandler)
        );
        sendRequest(req);
        ++out_stats.ping;
        return req;
    }
    
    /**
     * Answers a ping: sends a reply carrying our id and the address the
     * ping was received from.
     *
     * @param addr address of the node that pinged us.
     * @param tid  transaction id of the ping being answered.
     */
    void
    NetworkEngine::sendPong(const SockAddr& addr, Tid tid) {
        msgpack::sbuffer buffer;
        msgpack::packer<msgpack::sbuffer> pk(&buffer);
    
        // top level: r, t, y, v (+ network id)
        const int netidEntries = config.network ? 1 : 0;
        pk.pack_map(4 + netidEntries);
    
        // reply body: our id + the sender's address as seen by us
        pk.pack(KEY_R);
        pk.pack_map(2);
        pk.pack(KEY_REQ_ID);
        pk.pack(myid);
        insertAddr(pk, addr);
    
        pk.pack(KEY_TID);
        pk.pack(tid);
        pk.pack(KEY_Y);
        pk.pack(KEY_R);
        pk.pack(KEY_UA);
        pk.pack(my_v);
        if (netidEntries) {
            pk.pack(KEY_NETID);
            pk.pack(config.network);
        }
    
        send(addr, buffer.data(), buffer.size());
    }
    
    /**
     * Sends a "find node" query for `target` to node `n`.
     *
     * @param n          node to query.
     * @param target     id to look for.
     * @param want       address families wanted in the answer (WANT4 / WANT6
     *                   bits); the field is omitted when not positive.
     * @param on_done    called (if set) with the answer built from the reply.
     * @param on_expired called (if set) with the request and the flag passed by the request.
     * @return the request that was sent.
     */
    Sp<Request>
    NetworkEngine::sendFindNode(const Sp<Node>& n, const InfoHash& target, want_t want,
            RequestCb&& on_done, RequestExpiredCb&& on_expired) {
        Tid tid (n->getNewTid());
        msgpack::sbuffer buffer;
        msgpack::packer<msgpack::sbuffer> pk(&buffer);
    
        // top level: a, q, t, y, v (+ network id)
        const int netidEntries = config.network ? 1 : 0;
        pk.pack_map(5 + netidEntries);
    
        // arguments: id, target (+ wanted families)
        const bool sendWant = want > 0;
        pk.pack(KEY_A);
        pk.pack_map(2 + (sendWant ? 1 : 0));
        pk.pack(KEY_REQ_ID);
        pk.pack(myid);
        pk.pack(KEY_REQ_TARGET);
        pk.pack(target);
        if (sendWant) {
            const bool wantV4 = (want & WANT4) ? true : false;
            const bool wantV6 = (want & WANT6) ? true : false;
            pk.pack(KEY_REQ_WANT);
            pk.pack_array((wantV4 ? 1 : 0) + (wantV6 ? 1 : 0));
            if (wantV4)
                pk.pack(AF_INET);
            if (wantV6)
                pk.pack(AF_INET6);
        }
    
        pk.pack(KEY_Q);
        pk.pack(QUERY_FIND);
        pk.pack(KEY_TID);
        pk.pack(tid);
        pk.pack(KEY_Y);
        pk.pack(KEY_Q);
        pk.pack(KEY_UA);
        pk.pack(my_v);
        if (netidEntries) {
            pk.pack(KEY_NETID);
            pk.pack(config.network);
        }
    
        // the callbacks are copied into the handlers kept by the request
        auto doneHandler = [on_done](const Request& req_status, ParsedMessage&& msg) {
            if (on_done)
                on_done(req_status, {std::forward<ParsedMessage>(msg)});
        };
        auto expiredHandler = [on_expired](const Request& req_status, bool done) {
            if (on_expired)
                on_expired(req_status, done);
        };
    
        auto req = std::make_shared<Request>(MessageType::FindNode, tid, n,
            Blob(buffer.data(), buffer.data() + buffer.size()),
            std::move(doneHandler),
            std::move(expiredHandler)
        );
        sendRequest(req);
        ++out_stats.find;
        return req;
    }
    
    
    /**
     * Build a QUERY_GET request for info_hash and send it to node n.
     *
     * @param n          destination node; also supplies the transaction id.
     * @param info_hash  hash packed under KEY_REQ_H.
     * @param query      packed only when its `where` or `select` part is non-empty.
     * @param want       when > 0, its WANT4 / WANT6 bits select the address
     *                   families listed under KEY_REQ_WANT.
     * @param on_done    called, if set, with the parsed reply.
     * @param on_expired called, if set, by the request's "expired" callback.
     * @return the request object handed to sendRequest().
     */
    Sp<Request>
    NetworkEngine::sendGetValues(const Sp<Node>& n, const InfoHash& info_hash, const Query& query, want_t want,
            RequestCb&& on_done, RequestExpiredCb&& on_expired) {
        Tid tid (n->getNewTid());
        const bool with_netid = static_cast<bool>(config.network);
        const bool with_query = not (query.where.empty() and query.select.empty());
        const bool with_want = want > 0;

        msgpack::sbuffer buffer;
        msgpack::packer<msgpack::sbuffer> pk(&buffer);
        pk.pack_map(with_netid ? 6 : 5);

        // Query arguments: id and hash are mandatory, query and want optional.
        pk.pack(KEY_A);
        pk.pack_map(2u + with_query + with_want);
        pk.pack(KEY_REQ_ID);
        pk.pack(myid);
        pk.pack(KEY_REQ_H);
        pk.pack(info_hash);
        if (with_query) {
            // NOTE(review): sendListen() keys the equivalent entry with
            // KEY_REQ_QUERY; confirm both constants name the same wire key.
            pk.pack(KEY_Q);
            pk.pack(query);
        }
        if (with_want) {
            const bool want4 = (want & WANT4) != 0;
            const bool want6 = (want & WANT6) != 0;
            pk.pack(KEY_REQ_WANT);
            pk.pack_array((want4 ? 1u : 0u) + (want6 ? 1u : 0u));
            if (want4)
                pk.pack(AF_INET);
            if (want6)
                pk.pack(AF_INET6);
        }

        // Message envelope.
        pk.pack(KEY_Q);
        pk.pack(QUERY_GET);
        pk.pack(KEY_TID);
        pk.pack(tid);
        pk.pack(KEY_Y);
        pk.pack(KEY_Q);
        pk.pack(KEY_UA);
        pk.pack(my_v);
        if (with_netid) {
            pk.pack(KEY_NETID);
            pk.pack(config.network);
        }

        // The callbacks are copied into the closures stored in the request.
        auto req = std::make_shared<Request>(MessageType::GetValues, tid, n,
            Blob(buffer.data(), buffer.data() + buffer.size()),
            [on_done](const Request& status, ParsedMessage&& reply) { /* on done */
                if (not on_done)
                    return;
                on_done(status, {std::move(reply)});
            },
            [on_expired](const Request& status, bool over) { /* on expired */
                if (not on_expired)
                    return;
                on_expired(status, over);
            }
        );
        sendRequest(req);
        ++out_stats.get;
        return req;
    }
    
    /**
     * Build an AF_INET SockAddr from 6 raw bytes at ni: 4 bytes copied into
     * sin_addr followed by 2 bytes copied into sin_port, both verbatim.
     */
    SockAddr deserializeIPv4(const uint8_t* ni) {
        constexpr size_t addr_len = 4;
        constexpr size_t port_len = 2;

        SockAddr result;
        result.setFamily(AF_INET);
        auto& in4 = result.getIPv4();
        std::memcpy(&in4.sin_addr, ni, addr_len);
        std::memcpy(&in4.sin_port, ni + addr_len, port_len);
        return result;
    }
    /**
     * Build an AF_INET6 SockAddr from 18 raw bytes at ni: 16 bytes copied into
     * sin6_addr followed by 2 bytes copied into sin6_port, both verbatim.
     */
    SockAddr deserializeIPv6(const uint8_t* ni) {
        constexpr size_t addr_len = 16;
        constexpr size_t port_len = 2;

        SockAddr result;
        result.setFamily(AF_INET6);
        auto& in6 = result.getIPv6();
        std::memcpy(&in6.sin6_addr, ni, addr_len);
        std::memcpy(&in6.sin6_port, ni + addr_len, port_len);
        return result;
    }
    
    /**
     * Decode the raw node lists of msg (nodes4_raw, then nodes6_raw) and append
     * the resulting nodes to msg.nodes4 / msg.nodes6.
     *
     * Each raw entry is a node id immediately followed by a packed address.
     * Entries carrying our own id, and addresses rejected by isMartian() or
     * isNodeBlacklisted(), are skipped. Every accepted node is obtained from
     * the node cache and reported through onNewNode(node, 0).
     *
     * @param msg   message whose raw node buffers are decoded in place.
     * @param from  address used to replace loopback node addresses (see below).
     * @throws DhtProtocolException (WRONG_NODE_INFO_BUF_LEN) when a raw buffer
     *         size is not a multiple of its per-node entry length.
     */
    void
    NetworkEngine::deserializeNodes(ParsedMessage& msg, const SockAddr& from) {
        const bool well_formed = msg.nodes4_raw.size() % NODE4_INFO_BUF_LEN == 0
                              && msg.nodes6_raw.size() % NODE6_INFO_BUF_LEN == 0;
        if (not well_formed) {
            throw DhtProtocolException {DhtProtocolException::WRONG_NODE_INFO_BUF_LEN};
        }

        const auto& now = scheduler.time();

        // Shared decoding loop for both address families.
        const auto decodeEntries = [&](const auto& raw, size_t entry_len, int family,
                                       SockAddr (*parseAddr)(const uint8_t*), auto& out) {
            const uint8_t* const end = raw.data() + raw.size();
            for (const uint8_t* entry = raw.data(); entry != end; entry += entry_len) {
                const auto& node_id = *reinterpret_cast<const InfoHash*>(entry);
                if (node_id == myid)
                    continue;
                SockAddr node_addr = parseAddr(entry + node_id.size());
                // A loopback address of the sender's family is replaced by the
                // sender's own address, keeping the advertised port.
                if (node_addr.isLoopback() and from.getFamily() == family) {
                    const auto port = node_addr.getPort();
                    node_addr = from;
                    node_addr.setPort(port);
                }
                if (isMartian(node_addr) || isNodeBlacklisted(node_addr))
                    continue;
                out.emplace_back(cache.getNode(node_id, node_addr, now, false));
                onNewNode(out.back(), 0);
            }
        };

        decodeEntries(msg.nodes4_raw, NODE4_INFO_BUF_LEN, AF_INET, deserializeIPv4, msg.nodes4);
        decodeEntries(msg.nodes6_raw, NODE6_INFO_BUF_LEN, AF_INET6, deserializeIPv6, msg.nodes6);
    }
    
    /**
     * Append the KEY_REQ_VALUES entry for st to buffer.
     *
     * The entry is an array with one slot per serialized value. When there are
     * fewer than 50 values and their combined size is below
     * MAX_PACKET_VALUE_SIZE, the serialized values are written directly into
     * the array and nothing is left to send. Otherwise only each value's size
     * is packed and the serialized values are handed back to the caller.
     *
     * @param buffer output buffer the entry is appended to.
     * @param st     values to serialize.
     * @return the serialized values that were NOT written into buffer
     *         (empty when everything was written inline).
     */
    std::vector<Blob>
    NetworkEngine::packValueHeader(msgpack::sbuffer& buffer, const std::vector<Sp<Value>>& st)
    {
        auto serialized = serializeValues(st);

        size_t payload_bytes = 0;
        for (const auto& blob : serialized)
            payload_bytes += blob.size();

        msgpack::packer<msgpack::sbuffer> pk(&buffer);
        pk.pack(KEY_REQ_VALUES);
        pk.pack_array(serialized.size());

        // try to put everything in a single UDP packet
        const bool fits_inline = serialized.size() < 50 && payload_bytes < MAX_PACKET_VALUE_SIZE;
        if (not fits_inline) {
            for (const auto& blob : serialized)
                pk.pack(blob.size());
            return serialized;
        }

        for (const auto& blob : serialized)
            buffer.write(reinterpret_cast<const char*>(blob.data()), blob.size());
        serialized.clear();
        return serialized;
    }
    
    /**
     * Send each blob of svals to addr as a sequence of part messages.
     *
     * A blob is cut into chunks of at most MTU bytes. Every chunk goes out in
     * its own message (type KEY_V, transaction tid) that maps the blob's index
     * in svals to the chunk's byte offset ("o") and data ("d"). An empty blob
     * still produces one message with an empty chunk.
     */
    void
    NetworkEngine::sendValueParts(Tid tid, const std::vector<Blob>& svals, const SockAddr& addr)
    {
        const bool with_netid = static_cast<bool>(config.network);
        msgpack::sbuffer buffer;

        for (unsigned index = 0; index < svals.size(); ++index) {
            const auto& blob = svals[index];
            size_t offset = 0;
            // do/while: at least one message is sent per blob, even if empty.
            do {
                const size_t chunk_end = std::min(offset + MTU, blob.size());
                const size_t chunk_len = chunk_end - offset;

                buffer.clear();
                msgpack::packer<msgpack::sbuffer> pk(&buffer);
                pk.pack_map(with_netid ? 4 : 3);
                if (with_netid) {
                    pk.pack(KEY_NETID);
                    pk.pack(config.network);
                }
                pk.pack(KEY_Y);
                pk.pack(KEY_V);
                pk.pack(KEY_TID);
                pk.pack(tid);
                pk.pack(KEY_V);
                pk.pack_map(1);
                pk.pack(index);
                pk.pack_map(2);
                pk.pack("o"sv);
                pk.pack(offset);
                pk.pack("d"sv);
                pk.pack_bin(chunk_len);
                pk.pack_bin_body(reinterpret_cast<const char*>(blob.data()) + offset, chunk_len);

                send(addr, buffer.data(), buffer.size());
                offset = chunk_end;
            } while (offset != blob.size());
        }
    }
    
    void
    NetworkEngine::sendNodesValues(const SockAddr& addr, Tid tid, const Blob& nodes, const Blob& nodes6,
            const std::vector<Sp<Value>>& st, const Query& query, const Blob& token)
    {
        msgpack::sbuffer buffer;
        msgpack::packer<msgpack::sbuffer> pk(&buffer);
        pk.pack_map(4+(config.network?1:0));
    
        pk.pack(KEY_R);
        pk.pack_map(2 + (not st.empty()?1:0) + (nodes.size()>0?1:0) + (nodes6.size()>0?1:0) + (not token.empty()?1:0));
        pk.pack(KEY_REQ_ID); pk.pack(myid);
        insertAddr(pk, addr);
        if (nodes.size() > 0) {
            pk.pack(KEY_REQ_NODES4);
            pk.pack_bin(nodes.size());
            pk.pack_bin_body((const char*)nodes.data(), nodes.size());
        }
        if (nodes6.size() > 0) {
            pk.pack(KEY_REQ_NODES6);
            pk.pack_bin(nodes6.size());
            pk.pack_bin_body((const char*)nodes6.data(), nodes6.size());
        }
        if (not token.empty()) {
            pk.pack(KEY_REQ_TOKEN); packToken(pk, token);
        }
        std::vector<Blob> svals {};
        if (not st.empty()) { /* pack complete values */
            if (query.select.empty()) {
                svals = packValueHeader(buffer, st);
            } else { /* pack fields */
                auto fields = query.select.getSelection();
                pk.pack(KEY_REQ_FIELDS);
                pk.pack_map(2);
                pk.pack("f"sv); pk.pack(fields);
                pk.pack("v"sv); pk.pack_array(st.size()*fields.size());
                for (const auto& v : st)
                    v->msgpack_pack_fields(fields, pk);
                //DHT_LOG_DBG("sending closest nodes (%d+%d nodes.), %u value headers containing %u fields",
                //        nodes.size(), nodes6.size(), st.size(), fields.size());
            }
        }
    
        pk.pack(KEY_TID); pk.pack(tid);
        pk.pack(KEY_Y); pk.pack(KEY_R);
        pk.pack(KEY_UA); pk.pack(my_v);
        if (config.network) {
            pk.pack(KEY_NETID); pk.pack(config.network);
        }
    
        // send response
        send(addr, buffer.data(), buffer.size());
    
        // send parts
        if (not svals.empty())
            sendValueParts(tid, svals, addr);
    }
    
    /**
     * Serialize up to SEND_NODES nodes of one address family into a raw buffer.
     *
     * `nodes` is sorted in place with id.xorCmp() as the ordering, then the
     * first min(SEND_NODES, nodes.size()) entries are written back to back as
     * node id (HASH_LEN bytes) + address + port.
     *
     * @param af     AF_INET or AF_INET6; any other value yields an empty
     *               buffer (the sort still happens).
     * @param id     reference hash for the ordering.
     * @param nodes  candidate nodes; reordered by this call.
     * @return the packed node entries.
     */
    Blob
    NetworkEngine::bufferNodes(sa_family_t af, const InfoHash& id, std::vector<Sp<Node>>& nodes)
    {
        const auto byXorCmp = [&id](const Sp<Node>& lhs, const Sp<Node>& rhs) {
            return id.xorCmp(lhs->id, rhs->id) < 0;
        };
        std::sort(nodes.begin(), nodes.end(), byXorCmp);

        const size_t count = std::min<size_t>(SEND_NODES, nodes.size());
        Blob packed;
        if (af == AF_INET) {
            packed.resize(NODE4_INFO_BUF_LEN * count);
            auto cursor = packed.data();
            for (size_t i = 0; i < count; ++i, cursor += NODE4_INFO_BUF_LEN) {
                const Node& node = *nodes[i];
                const auto& in4 = node.getAddr().getIPv4();
                memcpy(cursor, node.id.data(), HASH_LEN);
                memcpy(cursor + HASH_LEN, &in4.sin_addr, sizeof(in_addr));
                memcpy(cursor + HASH_LEN + sizeof(in_addr), &in4.sin_port, sizeof(in_port_t));
            }
        } else if (af == AF_INET6) {
            packed.resize(NODE6_INFO_BUF_LEN * count);
            auto cursor = packed.data();
            for (size_t i = 0; i < count; ++i, cursor += NODE6_INFO_BUF_LEN) {
                const Node& node = *nodes[i];
                const auto& in6 = node.getAddr().getIPv6();
                memcpy(cursor, node.id.data(), HASH_LEN);
                memcpy(cursor + HASH_LEN, &in6.sin6_addr, sizeof(in6_addr));
                memcpy(cursor + HASH_LEN + sizeof(in6_addr), &in6.sin6_port, sizeof(in_port_t));
            }
        }
        return packed;
    }
    
    /**
     * Serialize the IPv4 and/or IPv6 node lists selected by `want`.
     *
     * @param af      address family used to pick a default when want < 0:
     *                WANT4 for AF_INET, WANT6 otherwise.
     * @param id      reference hash forwarded to the per-family overload.
     * @param want    WANT4 / WANT6 bit mask, or a negative value for "default".
     * @param nodes4  IPv4 candidates (reordered when WANT4 is selected).
     * @param nodes6  IPv6 candidates (reordered when WANT6 is selected).
     * @return {IPv4 buffer, IPv6 buffer}; a family that was not selected
     *         yields an empty buffer.
     */
    std::pair<Blob, Blob>
    NetworkEngine::bufferNodes(sa_family_t af, const InfoHash& id, want_t want,
            std::vector<Sp<Node>>& nodes4, std::vector<Sp<Node>>& nodes6)
    {
        const want_t selected = want < 0 ? (af == AF_INET ? WANT4 : WANT6) : want;

        std::pair<Blob, Blob> buffers;
        if (selected & WANT4)
            buffers.first = bufferNodes(AF_INET, id, nodes4);
        if (selected & WANT6)
            buffers.second = bufferNodes(AF_INET6, id, nodes6);
        return buffers;
    }
    
    /**
     * Build a QUERY_LISTEN request for hash and send it to node n.
     *
     * @param n          destination node; also supplies the transaction id.
     * @param hash       hash packed under KEY_REQ_H.
     * @param query      packed under KEY_REQ_QUERY only when its `where` or
     *                   `select` part is non-empty.
     * @param token      packed under KEY_REQ_TOKEN via packToken().
     * @param socketId   packed under KEY_REQ_SID.
     * @param on_done    called, if set, with the parsed reply.
     * @param on_expired called, if set, by the request's "expired" callback.
     * @return the request object handed to sendRequest().
     */
    Sp<Request>
    NetworkEngine::sendListen(const Sp<Node>& n,
            const InfoHash& hash,
            const Query& query,
            const Blob& token,
            Tid socketId,
            RequestCb&& on_done,
            RequestExpiredCb&& on_expired)
    {
        Tid tid (n->getNewTid());
        const bool with_netid = static_cast<bool>(config.network);
        const bool with_query = not (query.where.empty() and query.select.empty());

        msgpack::sbuffer buffer;
        msgpack::packer<msgpack::sbuffer> pk(&buffer);
        pk.pack_map(with_netid ? 6 : 5);

        // Query arguments: five mandatory entries plus the optional query.
        pk.pack(KEY_A);
        pk.pack_map(with_query ? 6 : 5);
        pk.pack(KEY_REQ_ID);
        pk.pack(myid);
        pk.pack(KEY_VERSION);
        pk.pack(1);
        pk.pack(KEY_REQ_H);
        pk.pack(hash);
        pk.pack(KEY_REQ_TOKEN);
        packToken(pk, token);
        pk.pack(KEY_REQ_SID);
        pk.pack(socketId);
        if (with_query) {
            pk.pack(KEY_REQ_QUERY);
            pk.pack(query);
        }

        // Message envelope.
        pk.pack(KEY_Q);
        pk.pack(QUERY_LISTEN);
        pk.pack(KEY_TID);
        pk.pack(tid);
        pk.pack(KEY_Y);
        pk.pack(KEY_Q);
        pk.pack(KEY_UA);
        pk.pack(my_v);
        if (with_netid) {
            pk.pack(KEY_NETID);
            pk.pack(config.network);
        }

        // The callbacks are copied into the closures stored in the request.
        auto req = std::make_shared<Request>(MessageType::Listen, tid, n,
            Blob(buffer.data(), buffer.data() + buffer.size()),
            [on_done](const Request& status, ParsedMessage&& reply) { /* on done */
                if (not on_done)
                    return;
                on_done(status, {std::move(reply)});
            },
            [on_expired](const Request& status, bool over) { /* on expired */
                if (not on_expired)
                    return;
                on_expired(status, over);
            }
        );
        sendRequest(req);
        ++out_stats.listen;
        return req;
    }
    
    void
    NetworkEngine::sendListenConfirmation(const SockAddr& addr, Tid tid) {
        msgpack::sbuffer buffer;
        msgpack::packer<msgpack::sbuffer> pk(&buffer);
        pk.pack_map(4+(config.network?1:0));
    
        pk.pack(KEY_R); pk.pack_map(2);
          pk.pack(KEY_REQ_ID); pk.pack(myid);
          insertAddr(pk, addr);
    
        pk.pack(KEY_TID); pk.pack(tid);
        pk.pack(KEY_Y); pk.pack(KEY_R);
        pk.pack(KEY_UA); pk.pack(my_v);
        if (config.network) {
            pk.pack(KEY_NETID); pk.pack(config.network);
        }
    
        send(addr, buffer.data(), buffer.size());
    }
    
    /**
     * Build a QUERY_PUT request announcing `value` under `infohash` and send
     * it to node n.
     *
     * @param n          destination node; also supplies the transaction id.
     * @param infohash   hash packed under KEY_REQ_H.
     * @param value      value to announce, packed through packValueHeader().
     * @param created    creation time; packed under KEY_REQ_CREATION only when
     *                   it is earlier than scheduler.time().
     * @param token      packed under KEY_REQ_TOKEN.
     * @param on_done    called, if set, with the value id from the reply,
     *                   unless that id is Value::INVALID_ID (which is only logged).
     * @param on_expired called, if set, by the request's "expired" callback.
     * @return the request object handed to sendRequest(); any serialized value
     *         data that did not fit in the message is stored in its `parts`.
     */
    Sp<Request>
    NetworkEngine::sendAnnounceValue(const Sp<Node>& n,
            const InfoHash& infohash,
            const Sp<Value>& value,
            time_point created,
            const Blob& token,
            RequestCb&& on_done,
            RequestExpiredCb&& on_expired)
    {
        Tid tid (n->getNewTid());
        const bool with_netid = static_cast<bool>(config.network);
        // Evaluated once: the declared size of the arguments map and the
        // presence of the creation entry must always agree.
        const bool with_creation = created < scheduler.time();

        msgpack::sbuffer buffer;
        msgpack::packer<msgpack::sbuffer> pk(&buffer);
        pk.pack_map(with_netid ? 6 : 5);

        // Query arguments: id, hash, values, token and optional creation time.
        pk.pack(KEY_A);
        pk.pack_map(with_creation ? 5 : 4);
        pk.pack(KEY_REQ_ID);
        pk.pack(myid);
        pk.pack(KEY_REQ_H);
        pk.pack(infohash);
        // Appends the values entry directly to `buffer`.
        auto parts = packValueHeader(buffer, {value});
        if (with_creation) {
            pk.pack(KEY_REQ_CREATION);
            pk.pack(to_time_t(created));
        }
        pk.pack(KEY_REQ_TOKEN);
        pk.pack(token);

        // Message envelope.
        pk.pack(KEY_Q);
        pk.pack(QUERY_PUT);
        pk.pack(KEY_TID);
        pk.pack(tid);
        pk.pack(KEY_Y);
        pk.pack(KEY_Q);
        pk.pack(KEY_UA);
        pk.pack(my_v);
        if (with_netid) {
            pk.pack(KEY_NETID);
            pk.pack(config.network);
        }

        // The hash and callbacks are copied into the closures stored in the request.
        auto req = std::make_shared<Request>(MessageType::AnnounceValue, tid, n,
            Blob(buffer.data(), buffer.data() + buffer.size()),
            [this, infohash, on_done](const Request& req_status, ParsedMessage&& msg) { /* on done */
                if (msg.value_id == Value::INVALID_ID) {
                    if (logger_)
                        logger_->d(infohash, "Unknown search or announce!");
                    return;
                }
                if (on_done) {
                    RequestAnswer answer {};
                    answer.vid = msg.value_id;
                    on_done(req_status, std::move(answer));
                }
            },
            [on_expired](const Request& req_status, bool done) { /* on expired */
                if (on_expired) {
                    on_expired(req_status, done);
                }
            }
        );
        req->parts = std::move(parts);
        sendRequest(req);
        ++out_stats.put;
        return req;
    }
    
    /**
     * Build a QUERY_UPDATE request carrying `values` for `infohash` and send
     * it to node n. No completion or expiration handling is attached.
     *
     * @param n          destination node; also supplies the transaction id.
     * @param infohash   hash packed under KEY_REQ_H.
     * @param values     values to send, packed through packValueHeader().
     * @param created    creation time; packed under KEY_REQ_CREATION only when
     *                   it is earlier than scheduler.time().
     * @param token      packed under KEY_REQ_TOKEN.
     * @param socket_id  converted to a Tid and packed under KEY_REQ_SID.
     * @return the request object handed to sendRequest(); any serialized value
     *         data that did not fit in the message is stored in its `parts`.
     */
    Sp<Request>
    NetworkEngine::sendUpdateValues(const Sp<Node>& n,
                                    const InfoHash& infohash,
                                    const std::vector<Sp<Value>>& values,
                                    time_point created,
                                    const Blob& token,
                                    const size_t& socket_id)
    {
        Tid tid (n->getNewTid());
        Tid sid (socket_id);
        const bool with_netid = static_cast<bool>(config.network);
        // Evaluated once: the declared size of the arguments map and the
        // presence of the creation entry must always agree.
        const bool with_creation = created < scheduler.time();

        msgpack::sbuffer buffer;
        msgpack::packer<msgpack::sbuffer> pk(&buffer);
        pk.pack_map(with_netid ? 6 : 5);

        // Query arguments: id, version, hash, socket id, values, token and
        // optional creation time.
        pk.pack(KEY_A);
        pk.pack_map(with_creation ? 7 : 6);
        pk.pack(KEY_REQ_ID);
        pk.pack(myid);
        pk.pack(KEY_VERSION);
        pk.pack(1);
        pk.pack(KEY_REQ_H);
        pk.pack(infohash);
        pk.pack(KEY_REQ_SID);
        pk.pack(sid);
        // Appends the values entry directly to `buffer`.
        auto parts = packValueHeader(buffer, values);
        if (with_creation) {
            pk.pack(KEY_REQ_CREATION);
            pk.pack(to_time_t(created));
        }
        pk.pack(KEY_REQ_TOKEN);
        pk.pack(token);

        // Message envelope.
        pk.pack(KEY_Q);
        pk.pack(QUERY_UPDATE);
        pk.pack(KEY_TID);
        pk.pack(tid);
        pk.pack(KEY_Y);
        pk.pack(KEY_Q);
        pk.pack(KEY_UA);
        pk.pack(my_v);
        if (with_netid) {
            pk.pack(KEY_NETID);
            pk.pack(config.network);
        }

        auto req = std::make_shared<Request>(MessageType::UpdateValue, tid, n,
            Blob(buffer.data(), buffer.data() + buffer.size()),
            [](const Request&, ParsedMessage&&) { /* on done */ },
            [](const Request&, bool) { /* on expired */ }
        );
        req->parts = std::move(parts);
        sendRequest(req);
        ++out_stats.updateValue;
        return req;
    }
    
    /**
     * Build a QUERY_REFRESH request for value `vid` under `infohash` and send
     * it to node n.
     *
     * @param n          destination node; also supplies the transaction id.
     * @param infohash   hash packed under KEY_REQ_H.
     * @param vid        value id packed under KEY_REQ_VALUE_ID.
     * @param token      packed under KEY_REQ_TOKEN.
     * @param on_done    called, if set, with the value id from the reply,
     *                   unless that id is Value::INVALID_ID (which is only logged).
     * @param on_error   handed to the request as its error callback.
     * @param on_expired called, if set, by the request's "expired" callback.
     * @return the request object handed to sendRequest().
     */
    Sp<Request>
    NetworkEngine::sendRefreshValue(const Sp<Node>& n,
                    const InfoHash& infohash,
                    const Value::Id& vid,
                    const Blob& token,
                    RequestCb&& on_done,
                    RequestErrorCb&& on_error,
                    RequestExpiredCb&& on_expired)
    {
        Tid tid (n->getNewTid());
        const bool with_netid = static_cast<bool>(config.network);

        msgpack::sbuffer buffer;
        msgpack::packer<msgpack::sbuffer> pk(&buffer);
        pk.pack_map(with_netid ? 6 : 5);

        // Query arguments.
        pk.pack(KEY_A);
        pk.pack_map(4);
        pk.pack(KEY_REQ_ID);
        pk.pack(myid);
        pk.pack(KEY_REQ_H);
        pk.pack(infohash);
        pk.pack(KEY_REQ_VALUE_ID);
        pk.pack(vid);
        pk.pack(KEY_REQ_TOKEN);
        pk.pack(token);

        // Message envelope.
        pk.pack(KEY_Q);
        pk.pack(QUERY_REFRESH);
        pk.pack(KEY_TID);
        pk.pack(tid);
        pk.pack(KEY_Y);
        pk.pack(KEY_Q);
        pk.pack(KEY_UA);
        pk.pack(my_v);
        if (with_netid) {
            pk.pack(KEY_NETID);
            pk.pack(config.network);
        }

        // The hash and callbacks are copied into the closures stored in the request.
        auto req = std::make_shared<Request>(MessageType::Refresh, tid, n,
            Blob(buffer.data(), buffer.data() + buffer.size()),
            [this, infohash, on_done](const Request& status, ParsedMessage&& reply) { /* on done */
                if (reply.value_id == Value::INVALID_ID) {
                    if (logger_)
                        logger_->d(infohash, "Unknown search or announce!");
                    return;
                }
                if (not on_done)
                    return;
                RequestAnswer answer {};
                answer.vid = reply.value_id;
                on_done(status, std::move(answer));
            },
            on_error,
            [on_expired](const Request& status, bool over) { /* on expired */
                if (not on_expired)
                    return;
                on_expired(status, over);
            }
        );
        sendRequest(req);
        ++out_stats.refresh;
        return req;
    }
    
    void
    NetworkEngine::sendValueAnnounced(const SockAddr& addr, Tid tid, Value::Id vid) {
        msgpack::sbuffer buffer;
        msgpack::packer<msgpack::sbuffer> pk(&buffer);
        pk.pack_map(4+(config.network?1:0));
    
        pk.pack(KEY_R); pk.pack_map(3);
          pk.pack(KEY_REQ_ID);  pk.pack(myid);
          pk.pack(KEY_REQ_VALUE_ID); pk.pack(vid);
          insertAddr(pk, addr);
    
        pk.pack(KEY_TID); pk.pack(tid);
        pk.pack(KEY_Y); pk.pack(KEY_R);
        pk.pack(KEY_UA); pk.pack(my_v);
        if (config.network) {
            pk.pack(KEY_NETID); pk.pack(config.network);
        }
    
        send(addr, buffer.data(), buffer.size());
    }
    
    void
    NetworkEngine::sendError(const SockAddr& addr,
            Tid tid,
            uint16_t code,
            const std::string& message,
            bool include_id)
    {
        msgpack::sbuffer buffer;
        msgpack::packer<msgpack::sbuffer> pk(&buffer);
        pk.pack_map(4 + (include_id?1:0) + (config.network?1:0));
    
        pk.pack(KEY_E); pk.pack_array(2);
          pk.pack(code);
          pk.pack(message);
    
        if (include_id) {
            pk.pack(KEY_R); pk.pack_map(1);
              pk.pack(KEY_REQ_ID); pk.pack(myid);
        }
    
        pk.pack(KEY_TID); pk.pack(tid);
        pk.pack(KEY_Y); pk.pack(KEY_E);
        pk.pack(KEY_UA); pk.pack(my_v);
        if (config.network) {
            pk.pack(KEY_NETID); pk.pack(config.network);
        }
    
        send(addr, buffer.data(), buffer.size());
    }
    
    /**
     * Drop the partial message tracked under tid if it has expired, i.e. it
     * started more than RX_MAX_PACKET_TIME ago or its last part is older than
     * RX_TIMEOUT (both relative to scheduler.time()).
     * Does nothing when no partial message is tracked for tid.
     */
    void
    NetworkEngine::maintainRxBuffer(Tid tid)
    {
        auto entry = partial_messages.find(tid);
        if (entry == partial_messages.end())
            return;

        const auto& now = scheduler.time();
        const auto& partial = entry->second;
        const bool expired = partial.start + RX_MAX_PACKET_TIME < now
                          || partial.last_part + RX_TIMEOUT < now;
        if (not expired)
            return;

        if (logger_)
            logger_->w("Dropping expired partial message from %s", partial.from.toString().c_str());
        partial_messages.erase(entry);
    }
    
    
    } /* namespace net  */
    } /* namespace dht */