Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
dht_proxy_client.cpp 48.19 KiB
/*
 *  Copyright (C) 2014-2020 Savoir-faire Linux Inc.
 *  Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
 *          Adrien Béraud <adrien.beraud@savoirfairelinux.com>
 *          Vsevolod Ivanov <vsevolod.ivanov@savoirfairelinux.com>
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program. If not, see <https://www.gnu.org/licenses/>.
 */

#include "dht_proxy_client.h"
#include "dhtrunner.h"
#include "op_cache.h"
#include "utils.h"

#include <http_parser.h>
#include <deque>

namespace dht {

/** State shared by the requests of one proxy info query round. */
struct DhtProxyClient::InfoState {
    // Per address family counters, consulted by the info request completion handler.
    std::atomic<unsigned> ipv4 {0};
    std::atomic<unsigned> ipv6 {0};
    // When true, requests bound to this state ignore their result.
    std::atomic<bool> cancel {false};
};

/** Flags shared between an operation's callbacks. */
struct DhtProxyClient::OperationState {
    // Cleared when the operation encounters an error.
    std::atomic<bool> ok {true};
    // Set once no further user callback must be delivered.
    std::atomic<bool> stop {false};
};

/**
 * Bookkeeping for one listen/subscribe operation registered on a search.
 */
struct DhtProxyClient::Listener
{
    /** Takes ownership of the value cache feeding the listener. */
    Listener(OpValueCache&& c):
        cache(std::move(c))
    {}

    // Value-initialized so the field never holds an indeterminate value:
    // the constructor above does not set it.
    unsigned callbackId {0};
    // Cache receiving the values reported for this listener.
    OpValueCache cache;
    // Callback forwarding received values to `cache`.
    CacheValueCallback cb;
    // Shared stop/ok flags of the current operation; may be empty.
    Sp<OperationState> opstate;
    // HTTP request backing the listener; may be empty.
    std::shared_ptr<http::Request> request;
    // Timer used to periodically re-send push subscriptions.
    std::unique_ptr<asio::steady_timer> refreshSubscriberTimer;
};

struct PermanentPut {
    PermanentPut(const Sp<Value>& v, std::unique_ptr<asio::steady_timer>&& j,
                 const Sp<std::atomic_bool>& o):
        value(v), refreshPutTimer(std::move(j)), ok(o)
    {}

    Sp<Value> value;
    std::unique_ptr<asio::steady_timer> refreshPutTimer;
    Sp<std::atomic_bool> ok;
};

/** Everything the client tracks for a single key. */
struct DhtProxyClient::ProxySearch {
    // Cached listen operations for the key.
    SearchCache ops {};
    // Timer driving the expiration of cancelled listen operations.
    std::unique_ptr<asio::steady_timer> opExpirationTimer;
    // Active listeners, indexed by listener token.
    std::map<size_t, Listener> listeners {};
    // Permanent puts, indexed by value id.
    std::map<Value::Id, PermanentPut> puts {};
    // Permanent puts whose value id is not known yet.
    std::set<Sp<Value>> pendingPuts {};
};

/**
 * Accumulates raw bytes and hands them back one delimited line at a time.
 */
struct LineSplit {
    /** Append `l` bytes starting at `d` to the pending buffer. */
    void append(const char* d, size_t l) {
        buf_.insert(buf_.end(), d, d + l);
    }

    /**
     * Extract the next line terminated by `c` from the pending buffer.
     * On success the line (delimiter included) becomes available through
     * line() and is removed from the buffer.
     * @return true if a complete line was extracted, false otherwise
     *         (the buffer and the current line are then left untouched).
     */
    bool getLine(char c) {
        const auto delimiter = std::find(buf_.begin(), buf_.end(), c);
        if (delimiter == buf_.end())
            return false;
        const auto lineEnd = delimiter + 1; // keep the delimiter in the line
        line_.assign(buf_.begin(), lineEnd);
        buf_.erase(buf_.begin(), lineEnd);
        return true;
    }

    /** Last line extracted by getLine(). */
    const std::string& line() const { return line_; }

private:
    std::deque<char> buf_ {};
    std::string line_ {};
};

std::string
getRandomSessionId(size_t length = 8) {
    static constexpr const char chars[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789!#$%&()*+,./:;<=>?@[]^_`{|}~";
    std::string str(length, 0);
    crypto::random_device rdev;
    std::uniform_int_distribution<> dist(0, (sizeof(chars)/sizeof(char)) - 2);
    std::generate_n( str.begin(), length, [&]{ return chars[dist(rdev)]; } );
    return str;
}

/** Builds a client that is not bound to any proxy and runs no http thread. */
DhtProxyClient::DhtProxyClient()
{
}

/**
 * Build a proxy client.
 * Starts the thread running the http io_context, then starts talking to the
 * proxy if a server host was provided.
 *
 * @param serverCA        certificate authority used for TLS, may be empty.
 * @param clientIdentity  client key/certificate pair used for TLS, may be empty.
 * @param signal          called to wake up the loop that runs periodic().
 * @param serverHost      proxy URL.
 * @param pushClientId    client id used for push notifications.
 * @param logger          optional logger.
 */
DhtProxyClient::DhtProxyClient(
        std::shared_ptr<dht::crypto::Certificate> serverCA, dht::crypto::Identity clientIdentity,
        std::function<void()> signal, const std::string& serverHost,
        const std::string& pushClientId, std::shared_ptr<dht::Logger> logger)
    : DhtInterface(logger)
    , proxyUrl_(serverHost)
    , clientIdentity_(clientIdentity), serverCertificate_(serverCA)
    , pushClientId_(pushClientId), pushSessionId_(getRandomSessionId())
    , loopSignal_(signal)
    , jsonReader_(Json::CharReaderBuilder{}.newCharReader())
{
    // Compact JSON output for request bodies
    jsonBuilder_["commentStyle"] = "None";
    jsonBuilder_["indentation"] = "";

    if (logger_ and serverCertificate_)
        logger_->d("[proxy:client] using ca certificate for ssl:\n%s",
                   serverCertificate_->toString(false/*chain*/).c_str());
    if (logger_ and clientIdentity_.first and clientIdentity_.second)
        logger_->d("[proxy:client] using client certificate for ssl:\n%s",
                   clientIdentity_.second->toString(false/*chain*/).c_str());

    // Dedicated thread running the http client io_context
    const auto runHttpContext = [this]() {
        try {
            if (logger_)
                logger_->d("[proxy:client] starting io_context");
            // The guard keeps run() from returning when there is no pending work
            auto keepAlive = asio::make_work_guard(httpContext_);
            httpContext_.run();
            if (logger_)
                logger_->d("[proxy:client] http client io_context stopped");
        } catch (const std::exception& error) {
            if (logger_)
                logger_->e("[proxy:client] run error: %s", error.what());
        }
    };
    httpClientThread_ = std::thread(runHttpContext);

    if (not proxyUrl_.empty())
        startProxy();
}

void
DhtProxyClient::startProxy()
{
    if (proxyUrl_.empty())
        return;

    if (logger_)
        logger_->d("[proxy:client] start proxy with %s", proxyUrl_.c_str());

    nextProxyConfirmationTimer_ = std::make_unique<asio::steady_timer>(httpContext_, std::chrono::steady_clock::now());
    nextProxyConfirmationTimer_->async_wait(std::bind(&DhtProxyClient::handleProxyConfirm, this, std::placeholders::_1));

    listenerRestartTimer_ = std::make_unique<asio::steady_timer>(httpContext_);

    loopSignal_();
}

/**
 * Timer handler checking the connectivity with the proxy.
 * @param ec timer completion status; an aborted wait is silently ignored,
 *           any other error is logged and ends the handler.
 */
void
DhtProxyClient::handleProxyConfirm(const asio::error_code &ec)
{
    if (ec == asio::error::operation_aborted)
        return;
    if (ec) {
        if (logger_)
            logger_->e("[proxy:client] confirm error: %s", ec.message().c_str());
        return;
    }
    if (not proxyUrl_.empty())
        getConnectivityStatus();
}

/** Stops the client (see stop()) before the members are destroyed. */
DhtProxyClient::~DhtProxyClient()
{
    this->stop();
}

/**
 * Stop the client: cancel listeners and pending requests, stop the http
 * io_context and join its thread. Only the first call has any effect.
 */
void
DhtProxyClient::stop()
{
    // exchange() returns the previous value: true means somebody already stopped us.
    if (isDestroying_.exchange(true))
        return;

    resolver_.reset();
    cancelAllListeners();
    if (infoState_)
        infoState_->cancel = true;
    {
        std::lock_guard<std::mutex> guard(requestLock_);
        for (auto& entry : requests_)
            entry.second->cancel();
    }
    if (not httpContext_.stopped())
        httpContext_.stop();
    if (httpClientThread_.joinable())
        httpClientThread_.join();
    // The http thread has been joined above, so the map is cleared without the lock.
    requests_.clear();
}

/**
 * Get the locally cached values for a key.
 * @param k      key to look up.
 * @param filter filter handed to the search cache.
 * @return the matching cached values, empty if the key has no search.
 */
std::vector<Sp<Value>>
DhtProxyClient::getLocal(const InfoHash& k, const Value::Filter& filter) const {
    std::lock_guard<std::mutex> guard(searchLock_);
    const auto found = searches_.find(k);
    if (found != searches_.end())
        return found->second.ops.get(filter);
    return {};
}

/**
 * Get a locally cached value by id.
 * @param k  key to look up.
 * @param id id of the requested value.
 * @return the result of the cache lookup, or an empty pointer if the key
 *         has no search.
 */
Sp<Value>
DhtProxyClient::getLocalById(const InfoHash& k, Value::Id id) const {
    std::lock_guard<std::mutex> guard(searchLock_);
    const auto found = searches_.find(k);
    if (found != searches_.end())
        return found->second.ops.get(id);
    return {};
}

void
DhtProxyClient::cancelAllListeners()
{
    std::lock_guard<std::mutex> lock(searchLock_);
    if (logger_)
        logger_->d("[proxy:client] [listeners] [%zu searches] cancel all", searches_.size());
    for (auto& s: searches_) {
        s.second.ops.cancelAll([&](size_t token){
            auto l = s.second.listeners.find(token);
            if (l == s.second.listeners.end())
                return;
            l->second.opstate->stop.store(true);
            l->second.request->cancel();
            // implicit request.reset()
            s.second.listeners.erase(token);
        });
    }
}

/**
 * Stop the client, then report completion.
 * @param cb optional callback invoked once stop() has returned.
 */
void
DhtProxyClient::shutdown(ShutdownCallback cb, bool)
{
    stop();
    if (not cb)
        return;
    cb();
}

/**
 * Get the connectivity status for an address family.
 * @param af AF_INET or AF_INET6.
 * @return the status of the family, Disconnected for any other family.
 */
NodeStatus
DhtProxyClient::getStatus(sa_family_t af) const
{
    std::lock_guard<std::mutex> guard(lockCurrentProxyInfos_);
    if (af == AF_INET)
        return statusIpv4_;
    if (af == AF_INET6)
        return statusIpv6_;
    return NodeStatus::Disconnected;
}

/**
 * Tell whether the client is running for an address family.
 * @param af AF_INET or AF_INET6.
 * @return true if the status of the family is not Disconnected,
 *         false for any other family.
 */
bool
DhtProxyClient::isRunning(sa_family_t af) const
{
    std::lock_guard<std::mutex> guard(lockCurrentProxyInfos_);
    if (af == AF_INET)
        return statusIpv4_ != NodeStatus::Disconnected;
    if (af == AF_INET6)
        return statusIpv6_ != NodeStatus::Disconnected;
    return false;
}

/**
 * Run every callback queued so far.
 * The packet related parameters are ignored.
 * @return always time_point::max().
 */
time_point
DhtProxyClient::periodic(const uint8_t*, size_t, SockAddr, const time_point& /*now*/)
{
    // Take the queue under the lock, run the callbacks without holding it.
    decltype(callbacks_) pending;
    {
        std::lock_guard<std::mutex> guard(lockCallbacks_);
        pending = std::move(callbacks_);
    }
    for (auto& task : pending)
        task();
    pending.clear();
    return time_point::max();
}

/**
 * Set the header fields common to the requests sent to the proxy:
 * any content accepted, JSON content sent.
 */
void
DhtProxyClient::setHeaderFields(http::Request& request)
{
    using field = restinio::http_field_t;
    request.set_header_field(field::accept, "*/*");
    request.set_header_field(field::content_type, "application/json");
}

/**
 * Get the values stored at a key through the proxy.
 *
 * Sends a GET request for "/<key>" and parses the response body as one JSON
 * value per line. User callbacks are not run from the http thread: they are
 * queued in callbacks_ and the loop is signaled.
 *
 * @param key     key to query.
 * @param cb      called with each batch of values passing the filter;
 *                returning false stops further deliveries.
 * @param donecb  called once the request is done, with the success flag of
 *                the operation.
 * @param f       value filter.
 * @param w       where clause; when not empty its filter is chained to `f`.
 *
 * NOTE(review): when an exception is caught while building or sending the
 * request, `donecb` is not invoked -- confirm callers do not depend on it.
 */
void
DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, Value::Filter&& f, Where&& w)
{
    if (logger_)
        logger_->d("[proxy:client] [get] [search %s]", key.to_c_str());

    // Client being stopped: fail right away
    if (isDestroying_) {
        if (donecb) donecb(false, {});
        return;
    }
    try {
        auto request = buildRequest("/" + key.toString());
        auto reqid = request->id();
        //request->set_connection_type(restinio::http_connection_header_t::keep_alive);
        request->set_method(restinio::http_method_get());
        setHeaderFields(*request);

        // State shared by the body and done callbacks
        auto opstate = std::make_shared<OperationState>();
        Value::Filter filter = w.empty() ? f : f.chain(w.getFilter());

        // Buffers the body across chunks so that only complete lines are parsed
        auto rxBuf = std::make_shared<LineSplit>();
        request->add_on_body_callback([this, key, opstate, filter, rxBuf, cb](const char* at, size_t length){
            try {
                auto& b = *rxBuf;
                b.append(at, length);
                // one value per body line
                std::vector<Sp<Value>> values;
                while (b.getLine('\n') and !opstate->stop) {
                    std::string err;
                    Json::Value json;
                    const auto& line = b.line();
                    if (!jsonReader_->parse(line.data(), line.data() + line.size(), &json, &err)){
                        // Parse failure: flag the operation as failed and drop this chunk
                        opstate->ok.store(false);
                        return;
                    }
                    auto value = std::make_shared<Value>(json);
                    if ((not filter or filter(*value)) and cb)
                        values.emplace_back(std::move(value));
                }
                if (not values.empty() and cb) {
                    {
                        std::lock_guard<std::mutex> lock(lockCallbacks_);
                        callbacks_.emplace_back([opstate, cb, values = std::move(values)](){
                            // A callback returning false stops the operation
                            if (not opstate->stop.load() and not cb(values)){
                                opstate->stop.store(true);
                            }
                        });
                    }
                    loopSignal_();
                }
            } catch(const std::exception& e) {
                if (logger_)
                    logger_->e("[proxy:client] [get %s] body parsing error: %s", key.to_c_str(), e.what());
                opstate->ok.store(false);
            }
        });
        request->add_on_done_callback([this, reqid, opstate, donecb, key] (const http::Response& response){
            if (response.status_code != 200) {
                if (logger_)
                    logger_->e("[proxy:client] [get %s] failed with code=%i", key.to_c_str(), response.status_code);
                opstate->ok.store(false);
                // Status code 0 on a request that was not aborted is reported through opFailed()
                if (not response.aborted and response.status_code == 0)
                    opFailed();
            }
            if (donecb) {
                {
                    std::lock_guard<std::mutex> lock(lockCallbacks_);
                    callbacks_.emplace_back([donecb, opstate](){
                        donecb(opstate->ok, {});
                        opstate->stop.store(true);
                    });
                }
                loopSignal_();
            }
            // While stopping, stop() clears requests_ itself
            if (not isDestroying_) {
                std::lock_guard<std::mutex> l(requestLock_);
                requests_.erase(reqid);
            }
        });
        {
            // Keep the request referenced until its done callback removes it
            std::lock_guard<std::mutex> l(requestLock_);
            requests_[reqid] = request;
        }
        request->send();
    }
    catch (const std::exception &e){
        if (logger_)
            logger_->e("[proxy:client] [get %s] error: %s", key.to_c_str(), e.what());
    }
}

/**
 * Put a value at a key through the proxy.
 *
 * @param key       key to put the value at.
 * @param val       value to put; an empty pointer fails immediately.
 * @param cb        optional callback receiving the result; it is queued in
 *                  callbacks_ rather than called from the http thread.
 * @param created   forwarded to doPut().
 * @param permanent when true the put is registered to be refreshed: with its
 *                  id when the value already has one, as a pending put otherwise.
 */
void
DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_point created, bool permanent)
{
    if (not val or isDestroying_) {
        if (cb) cb(false, {});
        return;
    }
    if (logger_)
        logger_->d("[proxy:client] [put] [search %s]", key.to_c_str());

    // Only set for permanent puts: shared with the registered PermanentPut
    std::shared_ptr<std::atomic_bool> putStatus;
    if (permanent) {
        std::lock_guard<std::mutex> guard(searchLock_);
        putStatus = std::make_shared<std::atomic_bool>(true);
        auto& search = searches_[key];
        if (not val->id) {
            // No id yet: registered as permanent once the proxy answers
            search.pendingPuts.emplace(val);
        } else {
            auto id = val->id;
            auto timer = std::make_unique<asio::steady_timer>(httpContext_, proxy::OP_TIMEOUT - proxy::OP_MARGIN);
            timer->async_wait(std::bind(&DhtProxyClient::handleRefreshPut, this, std::placeholders::_1, key, id));
            // Replace any permanent put already registered with this id
            search.puts.erase(id);
            search.puts.emplace(std::piecewise_construct,
                std::forward_as_tuple(id),
                std::forward_as_tuple(val, std::move(timer), putStatus));
        }
    }

    auto onPutDone = [this, cb, putStatus](bool result){
        if (putStatus)
            *putStatus = result;
        if (cb) {
            std::lock_guard<std::mutex> guard(lockCallbacks_);
            callbacks_.emplace_back([cb, result](){
                cb(result, {});
            });
        }
        loopSignal_();
    };
    doPut(key, val, onPutDone, created, permanent);
}

/**
 * Timer handler refreshing a permanent put.
 * Re-sends the put if it is still registered and re-arms its timer.
 *
 * @param ec  timer completion status; an aborted wait is silently ignored,
 *            any other error is logged and ends the handler.
 * @param key key of the put.
 * @param id  id of the value to refresh.
 */
void
DhtProxyClient::handleRefreshPut(const asio::error_code &ec, InfoHash key, Value::Id id)
{
    if (ec == asio::error::operation_aborted)
        return;
    if (ec) {
        if (logger_)
            logger_->e("[proxy:client] [put] [refresh %s] %s", key.toString().c_str(), ec.message().c_str());
        return;
    }
    if (logger_)
        logger_->d("[proxy:client] [put] [refresh %s]", key.to_c_str());

    std::lock_guard<std::mutex> guard(searchLock_);
    const auto search = searches_.find(key);
    if (search == searches_.end())
        return;
    auto& puts = search->second.puts;
    const auto entry = puts.find(id);
    if (entry == puts.end())
        return;

    auto& permanentPut = entry->second;
    doPut(key, permanentPut.value, [ok = permanentPut.ok](bool result){
        *ok = result;
    }, time_point::max(), true);
    // Schedule the next refresh
    permanentPut.refreshPutTimer->expires_after(proxy::OP_TIMEOUT - proxy::OP_MARGIN);
    permanentPut.refreshPutTimer->async_wait(std::bind(&DhtProxyClient::handleRefreshPut, this, std::placeholders::_1, key, id));
}

std::shared_ptr<http::Request>
DhtProxyClient::buildRequest(const std::string& target)
{
    auto resolver = resolver_;
    if (not resolver)
        resolver = std::make_shared<http::Resolver>(httpContext_, proxyUrl_, logger_);
    auto request = target.empty()
        ? std::make_shared<http::Request>(httpContext_, resolver)
        : std::make_shared<http::Request>(httpContext_, resolver, target);
    if (serverCertificate_)
        request->set_certificate_authority(serverCertificate_);
    if (clientIdentity_.first and clientIdentity_.second)
        request->set_identity(clientIdentity_);
    request->set_header_field(restinio::http_field_t::user_agent, "RESTinio client");
    return request;
}

/**
 * Send a put request to the proxy.
 *
 * Posts the JSON form of the value to "/<key>". For a permanent put the
 * "permanent" field is added to the body: `true`, or the push request
 * description when a device key is set and push notifications are built in.
 * When the value had no id, the id is read from the proxy answer, and a
 * pending permanent put gets registered with its refresh timer.
 *
 * @param key       key to put the value at.
 * @param val       value to put, must not be empty.
 * @param cb        optional callback, called from the request done callback
 *                  with true if the proxy answered with status 200.
 * @param permanent whether the put has to be kept alive by the proxy.
 *
 * NOTE(review): when an exception is caught while building or sending the
 * request, `cb` is not invoked -- confirm callers do not depend on it.
 */
void
DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallbackSimple cb, time_point /*created*/, bool permanent)
{
    if (logger_)
        logger_->d("[proxy:client] [put] [search %s] executing for %s", key.to_c_str(), val->toString().c_str());

    try {
        auto request = buildRequest("/" + key.toString());
        auto reqid = request->id();
        request->set_method(restinio::http_method_post());
        setHeaderFields(*request);

        auto json = val->toJson();
        if (permanent) {
            if (deviceKey_.empty()) {
                json["permanent"] = true;
            } else {
#ifdef OPENDHT_PUSH_NOTIFICATIONS
                Json::Value refresh;
                getPushRequest(refresh);
                json["permanent"] = refresh;
#else
                json["permanent"] = true;
#endif
            }
        }
        request->set_body(Json::writeString(jsonBuilder_, json));
        request->add_on_done_callback([this, reqid, cb, val, key, permanent] (const http::Response& response){
            bool ok = response.status_code == 200;
            if (ok) {
                if (val->id == Value::INVALID_ID) {
                    // The value had no id: read the one reported by the proxy
                    std::string err;
                    Json::Value parsedValue;
                    if (jsonReader_->parse(response.body.data(), response.body.data() + response.body.size(), &parsedValue, &err)){
                        auto id = dht::Value(parsedValue).id;
                        val->id = id;
                        if (permanent) {
                            std::lock_guard<std::mutex> lock(searchLock_);
                            auto& search = searches_[key];
                            auto it = search.pendingPuts.find(val);
                            if (it != search.pendingPuts.end()) {
                                // Promote the pending put to a permanent put, now that its id is known
                                auto sok = std::make_shared<std::atomic_bool>(ok);
                                auto refreshPutTimer = std::make_unique<asio::steady_timer>(httpContext_, proxy::OP_TIMEOUT - proxy::OP_MARGIN);
                                refreshPutTimer->async_wait(std::bind(&DhtProxyClient::handleRefreshPut, this, std::placeholders::_1, key, id));
                                search.puts.emplace(std::piecewise_construct,
                                    std::forward_as_tuple(id),
                                    std::forward_as_tuple(val, std::move(refreshPutTimer), sok));
                                search.pendingPuts.erase(it);
                            }
                        }
                    } else {
                        // The format string matches its arguments: the key and the parser error
                        if (logger_)
                            logger_->e("[proxy:client] [put %s] failed to parse value from server: %s", key.to_c_str(), err.c_str());
                    }
                }
            } else {
                if (logger_)
                    logger_->e("[proxy:client] [status] failed with code=%i", response.status_code);
                // Status code 0 on a request that was not aborted is reported through opFailed()
                if (not response.aborted and response.status_code == 0)
                    opFailed();
            }
            if (cb)
                cb(ok);
            // While stopping, stop() clears requests_ itself
            if (not isDestroying_) {
                std::lock_guard<std::mutex> l(requestLock_);
                requests_.erase(reqid);
            }
        });
        {
            // Keep the request referenced until its done callback removes it
            std::lock_guard<std::mutex> l(requestLock_);
            requests_[reqid] = request;
        }
        request->send();
    }
    catch (const std::exception &e){
        if (logger_)
            logger_->e("[proxy:client] [put %s] error: %s", key.to_c_str(), e.what());
    }
}

/**
 * Get data currently being put at the given hash.
 * @return the values of the permanent puts registered for the key,
 *         empty if the key has no search.
 *
 * NOTE(review): reads searches_ without taking searchLock_, unlike
 * getLocal() -- confirm callers serialize access.
 */
std::vector<Sp<Value>>
DhtProxyClient::getPut(const InfoHash& key) const {
    const auto search = searches_.find(key);
    if (search == searches_.end())
        return {};
    const auto& puts = search->second.puts;
    std::vector<Sp<Value>> values;
    values.reserve(puts.size());
    for (const auto& entry : puts)
        values.emplace_back(entry.second.value);
    return values;
}

/**
 * Get data currently being put at the given hash with the given id.
 * @return the value of the matching permanent put, or an empty pointer
 *         if there is none.
 *
 * NOTE(review): reads searches_ without taking searchLock_, unlike
 * getLocalById() -- confirm callers serialize access.
 */
Sp<Value>
DhtProxyClient::getPut(const InfoHash& key, const Value::Id& id) const {
    const auto search = searches_.find(key);
    if (search != searches_.end()) {
        const auto& puts = search->second.puts;
        const auto entry = puts.find(id);
        if (entry != puts.end())
            return entry->second.value;
    }
    return {};
}

/**
 * Stop any put/announce operation at the given location,
 * for the value with the given id.
 * @return true if a permanent put was removed.
 *
 * NOTE(review): modifies searches_ without taking searchLock_, unlike
 * put() -- confirm callers serialize access.
 */
bool
DhtProxyClient::cancelPut(const InfoHash& key, const Value::Id& id)
{
    const auto search = searches_.find(key);
    if (search == searches_.end())
        return false;
    if (logger_)
        logger_->d("[proxy:client] [put] [search %s] cancel", key.to_c_str());
    const auto removed = search->second.puts.erase(id);
    return removed > 0;
}

/**
 * Get the node statistics last reported by the proxy.
 * @param af address family; AF_INET selects the IPv4 statistics, any other
 *           value the IPv6 ones.
 * @return a copy of the statistics.
 */
NodeStats
DhtProxyClient::getNodesStats(sa_family_t af) const
{
    // onProxyInfos() writes the statistics under this lock, so the copy
    // has to be taken under it as well.
    std::lock_guard<std::mutex> l(lockCurrentProxyInfos_);
    return af == AF_INET ? stats4_ : stats6_;
}

void
DhtProxyClient::getProxyInfos()
{
    if (logger_)
        logger_->d("[proxy:client] [info] requesting proxy server node information");
    auto infoState = std::make_shared<InfoState>();
    {
        std::lock_guard<std::mutex> l(lockCurrentProxyInfos_);
        if (infoState_)
            infoState_->cancel = true;
        infoState_ = infoState;
        if (statusIpv4_ == NodeStatus::Disconnected)
            statusIpv4_ = NodeStatus::Connecting;
        if (statusIpv6_ == NodeStatus::Disconnected)
            statusIpv6_ = NodeStatus::Connecting;
    }
    if (logger_)
        logger_->d("[proxy:client] [status] sending request");

    auto resolver = std::make_shared<http::Resolver>(httpContext_, proxyUrl_, logger_);
    queryProxyInfo(infoState, resolver, AF_INET);
    queryProxyInfo(infoState, resolver, AF_INET6);
    resolver_ = resolver;
}

/**
 * Request the proxy node information over one address family.
 * The answer is handed to onProxyInfos(); a failed request or an answer that
 * cannot be parsed is reported as an empty JSON value.
 *
 * @param infoState state of the query round; once cancelled, the result of
 *                  the request is ignored.
 * @param resolver  resolver used by the request.
 * @param family    AF_INET or AF_INET6.
 */
void
DhtProxyClient::queryProxyInfo(const Sp<InfoState>& infoState, const Sp<http::Resolver>& resolver, sa_family_t family)
{
    if (logger_)
        logger_->d("[proxy:client] [status] query ipv%i info", family == AF_INET ? 4 : 6);
    try {
        auto request = std::make_shared<http::Request>(httpContext_, resolver, family);
        if (serverCertificate_)
            request->set_certificate_authority(serverCertificate_);
        auto reqid = request->id();
        request->set_method(restinio::http_method_get());
        setHeaderFields(*request);
        request->add_on_done_callback([this, reqid, family, infoState] (const http::Response& response){
            // A cancelled round ignores the result, but the request still has
            // to be released below: returning early here would leave it
            // referenced by requests_ until stop().
            if (not infoState->cancel.load()) {
                if (response.status_code != 200) {
                    if (logger_)
                        logger_->e("[proxy:client] [status] ipv%i failed with code=%i",
                                    family == AF_INET ? 4 : 6, response.status_code);
                    // pass along the failures
                    if ((family == AF_INET and infoState->ipv4 == 0) or (family == AF_INET6 and infoState->ipv6 == 0))
                        onProxyInfos(Json::Value{}, family);
                } else {
                    std::string err;
                    Json::Value proxyInfos;
                    if (!jsonReader_->parse(response.body.data(), response.body.data() + response.body.size(), &proxyInfos, &err)){
                        onProxyInfos(Json::Value{}, family);
                    } else if (not infoState->cancel) {
                        onProxyInfos(proxyInfos, family);
                    }
                }
            }
            // While stopping, stop() clears requests_ itself
            if (not isDestroying_) {
                std::lock_guard<std::mutex> l(requestLock_);
                requests_.erase(reqid);
            }
        });

        // Cancelled before being sent: drop the request
        if (infoState->cancel.load())
            return;
        {
            // Keep the request referenced until its done callback removes it
            std::lock_guard<std::mutex> l(requestLock_);
            requests_[reqid] = request;
        }
        request->send();
    }
    catch (const std::exception &e){
        if (logger_)
            logger_->e("[proxy:client] [status] error sending request: %s", e.what());
    }
}

/**
 * Process the node information received from the proxy for one address family.
 *
 * Updates the status of the family, the node id, the node statistics and the
 * public address, then re-arms the proxy confirmation timer according to the
 * resulting global status. When the client becomes connected, a restart of
 * the listeners is scheduled and the pending on-connect callbacks are queued.
 *
 * @param proxyInfos JSON answer of the proxy; an answer without a "node_id"
 *                   member is treated as a failed request.
 * @param family     address family the answer was received on.
 */
void
DhtProxyClient::onProxyInfos(const Json::Value& proxyInfos, const sa_family_t family)
{
    if (isDestroying_)
        return;
    std::unique_lock<std::mutex> l(lockCurrentProxyInfos_);
    // Global status before the update: the greater of both families
    auto oldStatus = std::max(statusIpv4_, statusIpv6_);
    auto& status = family == AF_INET ? statusIpv4_ : statusIpv6_;
    if (not proxyInfos.isMember("node_id")) {
        if (logger_)
            logger_->e("[proxy:client] [info] request failed for %s", family == AF_INET ? "ipv4" : "ipv6");
        status = NodeStatus::Disconnected;
    } else {
        if (logger_)
            logger_->d("[proxy:client] [info] got proxy reply for %s",
                       family == AF_INET ? "ipv4" : "ipv6");
        try {
            myid = InfoHash(proxyInfos["node_id"].asString());
            stats4_ = NodeStats(proxyInfos["ipv4"]);
            stats6_ = NodeStats(proxyInfos["ipv6"]);
            // Connected with any good node, connecting with only dubious nodes
            if (stats4_.good_nodes + stats6_.good_nodes)
                status = NodeStatus::Connected;
            else if (stats4_.dubious_nodes + stats6_.dubious_nodes)
                status = NodeStatus::Connecting;
            else
                status = NodeStatus::Disconnected;

            // Remember the public address reported by the proxy, per family
            auto publicIp = parsePublicAddress(proxyInfos["public_ip"]);
            auto publicFamily = publicIp.getFamily();
            if (publicFamily == AF_INET)
                publicAddressV4_ = publicIp;
            else if (publicFamily == AF_INET6)
                publicAddressV6_ = publicIp;
        } catch (const std::exception& e) {
            if (logger_)
                logger_->e("[proxy:client] [info] error processing: %s", e.what());
        }
    }
    auto newStatus = std::max(statusIpv4_, statusIpv6_);
    if (newStatus == NodeStatus::Connected) {
        if (oldStatus == NodeStatus::Disconnected || oldStatus == NodeStatus::Connecting) {
            // Just connected: restart the listeners as soon as possible
            listenerRestartTimer_->expires_at(std::chrono::steady_clock::now());
            listenerRestartTimer_->async_wait(std::bind(&DhtProxyClient::restartListeners, this, std::placeholders::_1));
            if (not onConnectCallbacks_.empty()) {
                // Queue the on-connect callbacks, to be run by periodic()
                std::lock_guard<std::mutex> lock(lockCallbacks_);
                callbacks_.emplace_back([cbs = std::move(onConnectCallbacks_)]() mutable {
                    while (not cbs.empty()) {
                        cbs.front()();
                        cbs.pop();
                    }
                });
            }
        }
        // Connected: next confirmation in 15 minutes
        nextProxyConfirmationTimer_->expires_at(std::chrono::steady_clock::now() + std::chrono::minutes(15));
        nextProxyConfirmationTimer_->async_wait(std::bind(&DhtProxyClient::handleProxyConfirm, this, std::placeholders::_1));
    }
    else if (newStatus == NodeStatus::Disconnected) {
        // Disconnected: retry in 1 minute
        nextProxyConfirmationTimer_->expires_at(std::chrono::steady_clock::now() + std::chrono::minutes(1));
        nextProxyConfirmationTimer_->async_wait(std::bind(&DhtProxyClient::handleProxyConfirm, this, std::placeholders::_1));
    }
    // Release the lock before signaling the loop
    l.unlock();
    loopSignal_();
}

/**
 * Parse the public address reported by the proxy.
 * @param val JSON string holding the address; it is split with splitPort()
 *            and the host part is resolved.
 * @return the first resolved address, after getMappedIPv4(), or an empty
 *         address when nothing could be resolved.
 */
SockAddr
DhtProxyClient::parsePublicAddress(const Json::Value& val)
{
    auto reported = val.asString();
    auto hostAndService = splitPort(reported);
    auto resolved = SockAddr::resolve(hostAndService.first);
    if (resolved.empty())
        return {};
    return resolved.front().getMappedIPv4();
}

/**
 * Get the public addresses of this client, as reported by the proxy.
 * @param family AF_INET or AF_INET6 to get only the address of that family,
 *               any other value to get both.
 * @return the known addresses, IPv6 first.
 */
std::vector<SockAddr>
DhtProxyClient::getPublicAddress(sa_family_t family)
{
    std::lock_guard<std::mutex> guard(lockCurrentProxyInfos_);
    std::vector<SockAddr> addresses;
    if (publicAddressV6_ && family != AF_INET) {
        addresses.emplace_back(publicAddressV6_);
    }
    if (publicAddressV4_ && family != AF_INET6) {
        addresses.emplace_back(publicAddressV4_);
    }
    return addresses;
}

/**
 * Listen for the values stored at a key.
 *
 * The operation is registered in the search cache of the key. When the cache
 * needs an actual listener, the lambda given to it registers a Listener and
 * sends a listen request to the proxy, or a subscribe request when a device
 * key is set.
 *
 * @param key    key to listen on.
 * @param cb     called with the values found or expired.
 * @param filter value filter.
 * @param where  where clause of the query.
 * @return the token returned by the search cache, 0 if the client is stopping.
 */
size_t
DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filter, Where where)
{
    if (logger_)
        logger_->d("[proxy:client] [listen] [search %s]", key.to_c_str());
    if (isDestroying_)
        return 0;

    std::lock_guard<std::mutex> lock(searchLock_);
    auto& search = searches_[key];
    auto query = std::make_shared<Query>(Select{}, std::move(where));
    return search.ops.listen(cb, query, filter, [this, key](Sp<Query>, ValueCallback cb, SyncCallback) -> size_t {
        // Find search
        auto search = searches_.find(key);
        if (search == searches_.end()) {
            if (logger_)
                logger_->e("[proxy:client] [listen] [search %s] search not found", key.to_c_str());
            return 0;
        }
        if (logger_)
            logger_->d("[proxy:client] [listen] [search %s] sending %s", key.to_c_str(),
                  deviceKey_.empty() ? "listen" : "subscribe");
        // Add listener
        auto token = ++listenerToken_;
        auto l = search->second.listeners.find(token);
        if (l == search->second.listeners.end()) {
            l = search->second.listeners.emplace(std::piecewise_construct,
                    std::forward_as_tuple(token),
                    std::forward_as_tuple(std::move(cb))).first;
        } else {
            // Token already registered: stop the operation it was running
            if (l->second.opstate)
                l->second.opstate->stop = true;
        }
        // Add cache callback
        auto opstate = std::make_shared<OperationState>();
        l->second.opstate = opstate;
        l->second.cb = [this,key,token,opstate](const std::vector<Sp<Value>>& values, bool expired, system_clock::time_point t){
            if (opstate->stop)
                return false;
            // Forward the values to the cache of the listener, if it still exists
            std::lock_guard<std::mutex> lock(searchLock_);
            auto s = searches_.find(key);
            if (s != searches_.end()) {
                auto l = s->second.listeners.find(token);
                if (l != s->second.listeners.end()) {
                    return l->second.cache.onValue(values, expired, t);
                }
            }
            return false;
        };
        if (not deviceKey_.empty()) {
            /*
             * Relaunch push listeners even if a timeout is not received
             * (if the proxy crash for any reason)
             */
            if (!l->second.refreshSubscriberTimer)
                l->second.refreshSubscriberTimer = std::make_unique<asio::steady_timer>(httpContext_);
            l->second.refreshSubscriberTimer->expires_at(std::chrono::steady_clock::now() +
                                                         proxy::OP_TIMEOUT - proxy::OP_MARGIN);
            l->second.refreshSubscriberTimer->async_wait(std::bind(&DhtProxyClient::handleResubscribe, this,
                                                         std::placeholders::_1, key, token, opstate));
        }
        // Without a device key: GET on "/key/<key>/listen"; with one: SUBSCRIBE on "/<key>"
        ListenMethod method;
        restinio::http_request_header_t header;
        if (deviceKey_.empty()){ // listen
            method = ListenMethod::LISTEN;
            header.method(restinio::http_method_get());
            header.request_target("/key/" + key.toString() + "/listen");
        }
        else {
            method = ListenMethod::SUBSCRIBE;
            header.method(restinio::http_method_subscribe());
            header.request_target("/" + key.toString());
        }
        sendListen(header, l->second.cb, opstate, l->second, method);
        return token;
    });
}

/**
 * Timer handler re-sending the subscription of a listener.
 *
 * @param ec      timer completion status; an aborted wait is silently
 *                ignored, any other error is logged and ends the handler.
 * @param key     key of the listener.
 * @param token   token of the listener.
 * @param opstate state of the operation; nothing is done once it is stopped.
 */
void
DhtProxyClient::handleResubscribe(const asio::error_code &ec, const InfoHash& key,
                                  const size_t token, std::shared_ptr<OperationState> opstate)
{
    if (ec == asio::error::operation_aborted)
        return;
    if (ec) {
        if (logger_)
            logger_->e("[proxy:client] [resubscribe %s] %s", key.toString().c_str(), ec.message().c_str());
        return;
    }
    if (opstate->stop)
        return;

    std::lock_guard<std::mutex> guard(searchLock_);
    const auto search = searches_.find(key);
    if (search == searches_.end())
        return;
    auto& listeners = search->second.listeners;
    const auto listener = listeners.find(token);
    if (listener == listeners.end()) {
        if (logger_)
            logger_->e("[proxy:client] [resubscribe %s] token not found", key.toString().c_str());
        return;
    }
    resubscribe(key, token, listener->second);
}

/**
 * Cancel a listen operation.
 * The operation is cancelled in the search cache, and the expiration timer
 * of the search is (re)armed with the expiration reported by the cache.
 *
 * @param key    key of the listen operation.
 * @param gtoken token returned by listen().
 * @return the result of the cache cancellation, false if the key has no search.
 */
bool
DhtProxyClient::cancelListen(const InfoHash& key, size_t gtoken)
{
    if (logger_)
        logger_->d(key, "[proxy:client] [search %s] cancel listen %zu", key.to_c_str(), gtoken);

    std::lock_guard<std::mutex> guard(searchLock_);
    // find the listener in cache
    const auto found = searches_.find(key);
    if (found == searches_.end())
        return false;
    auto& search = found->second;
    const bool canceled = search.ops.cancelListen(gtoken, std::chrono::steady_clock::now());

    // define real cancel listen only once
    auto& expirationTimer = search.opExpirationTimer;
    if (expirationTimer)
        expirationTimer->expires_at(search.ops.getExpiration());
    else
        expirationTimer = std::make_unique<asio::steady_timer>(httpContext_, search.ops.getExpiration());
    expirationTimer->async_wait(std::bind(&DhtProxyClient::handleExpireListener, this, std::placeholders::_1, key));
    return canceled;
}

/**
 * Completion handler of a search's opExpirationTimer.
 *
 * Under searchLock_, asks the search's operation cache (`ops.expire`) to run
 * the given callback for listener tokens; each such listener is flagged as
 * stopped, unsubscribed from the proxy (push mode) or has its request dropped
 * (plain listen mode), and is removed from the search. The timer is re-armed
 * when `ops.expire` reports a further deadline, and the search itself is
 * removed once it has no listener left.
 *
 * @param ec  result of the timer wait; an aborted wait returns silently, any
 *            other error is logged and ends the handler
 * @param key hash of the search whose listeners are checked
 */
void
DhtProxyClient::handleExpireListener(const asio::error_code &ec, const InfoHash& key)
{
    if (ec == asio::error::operation_aborted)
        return;
    else if (ec){
        if (logger_)
            logger_->e("[proxy:client] [listen %s] error in cancel: %s", key.toString().c_str(), ec.message().c_str());
        return;
    }
    if (logger_)
        logger_->d("[proxy:client] [listen %s] expire listener", key.toString().c_str());

    std::lock_guard<std::mutex> lock(searchLock_);
    auto search = searches_.find(key);
    if (search == searches_.end())
        return;

    // every time a new expiry is set, a previous gets aborted
    // NOTE(review): the callback is presumably invoked once per expired
    // listener token, and `next` is presumably the next deadline (or
    // time_point::max() when there is none) -- confirm against ops.expire().
    time_point next = search->second.ops.expire(std::chrono::steady_clock::now(), [&](size_t ltoken) {
        auto it = search->second.listeners.find(ltoken);
        if (it == search->second.listeners.end())
            return;

        auto& listener = it->second;
        // Tell pending callbacks of this listen operation to stop.
        listener.opstate->stop = true;

        if (not deviceKey_.empty()) {
            // UNSUBSCRIBE
            auto request = buildRequest("/" + key.toString());
            auto reqid = request->id();
            try {
                request->set_method(restinio::http_method_unsubscribe());
                setHeaderFields(*request);

                // The body carries the device key and the push client id.
                Json::Value body;
                body["key"] = deviceKey_;
                body["client_id"] = pushClientId_;
                request->set_body(Json::writeString(jsonBuilder_, body));
                request->add_on_done_callback([this, reqid, key] (const http::Response& response){
                    if (response.status_code != 200) {
                        if (logger_)
                            logger_->e("[proxy:client] [unsubscribe %s] failed with code=%i",
                                        key.to_c_str(), response.status_code);
                        // Status code 0 on a non-aborted response is reported
                        // as a proxy failure.
                        if (not response.aborted and response.status_code == 0)
                            opFailed();
                    }
                    // Unregister the finished request unless the client is
                    // being destroyed.
                    if (not isDestroying_) {
                        std::lock_guard<std::mutex> l(requestLock_);
                        requests_.erase(reqid);
                    }
                });
                // Register the request in requests_ before sending it; the
                // done callback above removes it again.
                {
                    std::lock_guard<std::mutex> l(requestLock_);
                    requests_[reqid] = request;
                }
                request->send();
            }
            catch (const std::exception &e){
                if (logger_)
                     logger_->e("[proxy:client] [unsubscribe %s] failed: %s", key.to_c_str(), e.what());
            }
        } else {
            // stop the request
            listener.request.reset();
        }
        // `listener` refers to the erased element and must not be used below.
        search->second.listeners.erase(it);
        if (logger_)
            logger_->d("[proxy:client] [listen:cancel] [search %s] %zu listener remaining",
                    key.to_c_str(), search->second.listeners.size());
    });
    // Re-arm the timer when a further deadline was reported.
    if (next != time_point::max()){
        search->second.opExpirationTimer->expires_at(next);
        search->second.opExpirationTimer->async_wait(std::bind(
            &DhtProxyClient::handleExpireListener, this, std::placeholders::_1, key));
    }
    // Drop the whole search entry once no listener is left on it.
    if (search->second.listeners.empty()){
        searches_.erase(search);
    }
}

/**
 * Build and send the HTTP request backing a listener.
 *
 * The request is stored in `listener.request` and registered in requests_
 * until its done callback runs. Each line of the response body is parsed as
 * one JSON value; for every value a call to `cb` is queued in callbacks_ and
 * the loop is signalled. `opstate->ok` is cleared on a parse error or on a
 * non-200 response, and `opstate->stop` is set when `cb` returns false.
 *
 * Exceptions thrown while building or sending the request are caught and
 * logged; in that case `listener.request` may be left unchanged.
 *
 * @param header   request line (method and target) to use
 * @param cb       callback receiving the values, may be empty
 * @param opstate  shared state of the listen operation
 * @param listener listener owning the request
 * @param method   kind of request; LISTEN requests are sent keep-alive, other
 *                 kinds carry a body built by fillBody() when push
 *                 notifications are compiled in
 */
void
DhtProxyClient::sendListen(const restinio::http_request_header_t& header,
                           const CacheValueCallback& cb,
                           const Sp<OperationState>& opstate,
                           Listener& listener, ListenMethod method)
{
    // Plain trace of the call, not a failure: log it at debug level.
    if (logger_)
        logger_->d("[proxy:client] [listen] sendListen: %d", static_cast<int>(method));
    try {
        auto request = buildRequest();
        listener.request = request;
        auto reqid = request->id();
        request->set_header(header);
        setHeaderFields(*request);
        if (method == ListenMethod::LISTEN)
            request->set_connection_type(restinio::http_connection_header_t::keep_alive);
#ifdef OPENDHT_PUSH_NOTIFICATIONS
        std::string body;
        if (method != ListenMethod::LISTEN)
            body = fillBody(method == ListenMethod::RESUBSCRIBE);
        request->set_body(body);
#endif
        // Accumulates body chunks so that values split across chunks are
        // only parsed once a full line is available.
        auto rxBuf = std::make_shared<LineSplit>();
        request->add_on_body_callback([this, reqid, opstate, rxBuf, cb](const char* at, size_t length){
            try {
                auto& b = *rxBuf;
                b.append(at, length);

                // one value per body line
                while (b.getLine('\n') and !opstate->stop) {
                    std::string err;
                    Json::Value json;
                    const auto& line = b.line();
                    if (!jsonReader_->parse(line.data(), line.data() + line.size(), &json, &err)){
                        // Report the parser diagnostic instead of dropping it.
                        if (logger_)
                            logger_->e("[proxy:client] [listen] request #%i can't parse value: %s",
                                        reqid, err.c_str());
                        opstate->ok.store(false);
                        return;
                    }
                    if (json.size() == 0) { // it's the end
                        break;
                    }

                    auto value = std::make_shared<Value>(json);
                    if (cb){
                        auto expired = json.get("expired", Json::Value(false)).asBool();
                        {
                            // The user callback is not run here: it is queued
                            // and the loop is signalled to process it.
                            std::lock_guard<std::mutex> lock(lockCallbacks_);
                            callbacks_.emplace_back([cb, value, opstate, expired]() {
                                if (not opstate->stop.load() and not cb({value}, expired, system_clock::time_point::min()))
                                    opstate->stop.store(true);
                            });
                        }
                        loopSignal_();
                    }
                }
            } catch(const std::exception& e) {
                if (logger_)
                    logger_->e("[proxy:client] [listen] request #%i error in parsing: %s", reqid, e.what());
                opstate->ok.store(false);
            }
        });
        request->add_on_done_callback([this, opstate, reqid] (const http::Response& response) {
            if (response.status_code != 200) {
                if (logger_)
                    logger_->e("[proxy:client] [listen] send request #%i failed with code=%i",
                                reqid, response.status_code);
                opstate->ok.store(false);
                // Status code 0 on a non-aborted response is reported as a
                // proxy failure.
                if (not response.aborted and response.status_code == 0)
                    opFailed();
            }
            // Unregister the finished request unless the client is being
            // destroyed.
            if (not isDestroying_) {
                std::lock_guard<std::mutex> l(requestLock_);
                requests_.erase(reqid);
            }
        });
        {
            std::lock_guard<std::mutex> l(requestLock_);
            requests_[reqid] = request;
        }
        request->send();
    }
    catch (const std::exception &e){
        if (logger_)
            logger_->e("[proxy:client] [listen] request failed: %s", e.what());
    }
}

/**
 * React to a failed proxy request: mark both IPv4 and IPv6 statuses as
 * disconnected, trigger a connectivity check and signal the loop.
 * Does nothing while the client is being destroyed.
 */
void
DhtProxyClient::opFailed()
{
    if (not isDestroying_) {
        if (logger_)
            logger_->e("[proxy:client] proxy request failed");
        {
            // Both statuses are updated under the same lock.
            std::lock_guard<std::mutex> statusGuard(lockCurrentProxyInfos_);
            statusIpv6_ = NodeStatus::Disconnected;
            statusIpv4_ = NodeStatus::Disconnected;
        }
        getConnectivityStatus();
        loopSignal_();
    }
}

/**
 * Log the status request and, unless the client is being destroyed, query the
 * proxy through getProxyInfos().
 */
void
DhtProxyClient::getConnectivityStatus()
{
    if (logger_)
        logger_->d("[proxy:client] [connectivity] get status");
    if (isDestroying_)
        return;
    getProxyInfos();
}

/**
 * Timer completion handler that re-issues permanent puts and listen
 * operations.
 *
 * Under searchLock_:
 *  - every permanent put of every search is sent again and its refresh timer
 *    is re-armed at now + OP_TIMEOUT - OP_MARGIN;
 *  - when a device key is set (push mode), listeners whose operation state is
 *    not ok are resubscribed and the function returns;
 *  - otherwise every listener's current request is cancelled and a new
 *    "/key/<hash>/listen" GET request is sent for it.
 *
 * @param ec result of the timer wait; an aborted wait returns silently, any
 *           other error is logged and ends the handler
 */
void
DhtProxyClient::restartListeners(const asio::error_code &ec)
{
    if (ec == asio::error::operation_aborted)
        return;
    else if (ec){
        if (logger_)
            logger_->e("[proxy:client] restart error: %s", ec.message().c_str());
        return;
    }

    if (isDestroying_)
        return;
    if (logger_)
        logger_->d("[proxy:client] [listeners] refresh permanent puts");

    std::lock_guard<std::mutex> lock(searchLock_);
    for (auto& search : searches_) {
        auto key = search.first;
        for (auto& put : search.second.puts) {
            // The shared `ok` flag of the put records the result of the new put.
            doPut(key, put.second.value, [ok = put.second.ok](bool result){
                *ok = result;
            }, time_point::max(), true);
            if (!put.second.refreshPutTimer) {
                put.second.refreshPutTimer = std::make_unique<asio::steady_timer>(httpContext_);
            }
            put.second.refreshPutTimer->expires_at(std::chrono::steady_clock::now() + proxy::OP_TIMEOUT - proxy::OP_MARGIN);
            put.second.refreshPutTimer->async_wait(std::bind(&DhtProxyClient::handleRefreshPut, this,
                                                   std::placeholders::_1, key, put.first));
        }
    }
    if (not deviceKey_.empty()) {
        if (logger_)
            logger_->d("[proxy:client] [listeners] resubscribe due to a connectivity change");
        // Connectivity changed, refresh all subscribe
        for (auto& search : searches_)
            for (auto& listener : search.second.listeners)
                if (!listener.second.opstate->ok)
                    resubscribe(search.first, listener.first, listener.second);
        return;
    }
    if (logger_)
        logger_->d("[proxy:client] [listeners] restarting listeners");
    // First pass: stop every running listen request.
    for (auto& search: searches_) {
        for (auto& l: search.second.listeners) {
            auto& listener = l.second;
            if (auto opstate = listener.opstate)
                opstate->stop = true;
            // The request is null when a previous sendListen() failed before
            // a new request could be stored in the listener.
            if (listener.request) {
                listener.request->cancel();
                listener.request.reset();
            }
        }
    }
    // Second pass: start a fresh listen request for every listener.
    for (auto& search: searches_) {
        for (auto& l: search.second.listeners) {
            auto& listener = l.second;
            auto opstate = listener.opstate;
            // Redo listen
            opstate->stop.store(false);
            opstate->ok.store(true);
            auto cb = listener.cb;
            // define header
            restinio::http_request_header_t header;
            header.method(restinio::http_method_get());
            header.request_target("/key/" + search.first.toString() + "/listen");
            sendListen(header, cb, opstate, listener, ListenMethod::LISTEN);
        }
    }
}

/**
 * Handle a push notification forwarded by the application.
 *
 * Only active when built with OPENDHT_PUSH_NOTIFICATIONS; otherwise the
 * notification is ignored.
 *
 * Fields read from `notification`:
 *  - "s":       session id; when present and different from pushSessionId_
 *               the notification is ignored.
 *  - "timeout": hash of a search whose operation has to be refreshed. With a
 *               "vid" field the matching permanent put is refreshed right
 *               away, otherwise every listener of the search is resubscribed.
 *  - "key":     hash of a search (required when "timeout" is absent).
 *  - "t":       optional time in milliseconds, converted to a
 *               system_clock::time_point and passed to the callbacks.
 *  - "exp":     optional comma-separated list of value ids reported as
 *               expired.
 *
 * Any std::exception raised while processing (missing field, unknown search
 * or put, malformed number) is caught and logged.
 *
 * @param notification key/value pairs of the received notification
 */
void
DhtProxyClient::pushNotificationReceived(const std::map<std::string, std::string>& notification)
{
#ifdef OPENDHT_PUSH_NOTIFICATIONS
    {
        // If a push notification is received, the proxy is up and running
        std::lock_guard<std::mutex> l(lockCurrentProxyInfos_);
        statusIpv4_ = NodeStatus::Connected;
        statusIpv6_ = NodeStatus::Connected;
    }
    // Set when a callback was queued, so that the loop is signalled at the end.
    auto launchLoop = false;
    try {
        auto sessionId = notification.find("s");
        if (sessionId != notification.end() and sessionId->second != pushSessionId_) {
            if (logger_)
                logger_->d("[proxy:client] [push] ignoring push for other session");
            return;
        }
        std::lock_guard<std::mutex> lock(searchLock_);
        auto timeout = notification.find("timeout");
        if (timeout != notification.cend()) {
            InfoHash key(timeout->second);
            // at() throws when the search is unknown; handled by the catch below.
            auto& search = searches_.at(key);
            auto vidIt = notification.find("vid");
            if (vidIt != notification.end()) {
                // Refresh put
                auto vid = std::stoull(vidIt->second);
                auto& put = search.puts.at(vid);
                // Make the refresh timer expire immediately.
                if (!put.refreshPutTimer)
                    put.refreshPutTimer = std::make_unique<asio::steady_timer>(httpContext_, std::chrono::steady_clock::now());
                else
                    put.refreshPutTimer->expires_at(std::chrono::steady_clock::now());
                put.refreshPutTimer->async_wait(std::bind(&DhtProxyClient::handleRefreshPut, this, std::placeholders::_1, key, vid));
            } else {
                // Refresh listen
                for (auto& list : search.listeners)
                    resubscribe(key, list.first, list.second);
            }
        } else {
            auto key = InfoHash(notification.at("key"));
            // Stays at time_point::min() when "t" is missing or not a number.
            system_clock::time_point sendTime = system_clock::time_point::min();
            try {
                sendTime = system_clock::time_point(std::chrono::milliseconds(std::stoull(notification.at("t"))));
            } catch (...) {}
            auto& search = searches_.at(key);
            for (auto& list : search.listeners) {
                // Skip listeners whose operation has been stopped.
                if (list.second.opstate->stop)
                    continue;
                if (logger_)
                    logger_->d("[proxy:client] [push] [search %s] received", key.to_c_str());
                auto expired = notification.find("exp");
                auto token = list.first;
                auto opstate = list.second.opstate;
                if (expired == notification.end()) {
                    // No expiration list: fetch the current values with a get.
                    // NOTE(review): searchLock_ is held while get() is called --
                    // confirm get() does not need that lock.
                    auto cb = list.second.cb;
                    auto oldValues = list.second.cache.getValues();
                    get(key, [cb, sendTime](const std::vector<Sp<Value>>& vals) {
                        return cb(vals, false, sendTime);
                    }, [cb, oldValues, sendTime](bool /*ok*/) {
                        // Decrement old values refcount to expire values not
                        // present in the new list
                        cb(oldValues, true, sendTime);
                    });
                } else {
                    // Parse the comma-separated list of expired value ids.
                    // stoull throws on a token that is not a number (including
                    // an empty one); handled by the outer catch.
                    std::stringstream ss(expired->second);
                    std::vector<Value::Id> ids;
                    while(ss.good()) {
                        std::string substr;
                        getline(ss, substr, ',');
                        ids.emplace_back(std::stoull(substr));
                    }
                    {
                        // Queue the expiration; the queued callback looks the
                        // listener up again under searchLock_ when it runs.
                        std::lock_guard<std::mutex> lock(lockCallbacks_);
                        callbacks_.emplace_back([this, key, token, opstate, ids, sendTime]() {
                            if (opstate->stop)
                                return;
                            std::lock_guard<std::mutex> lock(searchLock_);
                            auto s = searches_.find(key);
                            if (s == searches_.end())
                                return;
                            auto l = s->second.listeners.find(token);
                            if (l == s->second.listeners.end())
                                return;
                            if (not opstate->stop and not l->second.cache.onValuesExpired(ids, sendTime))
                                opstate->stop = true;
                        });
                    }
                    launchLoop = true;
                }
            }
        }
    } catch (const std::exception& e) {
        if (logger_)
            logger_->e("[proxy:client] [push] receive error: %s", e.what());
    }
    if (launchLoop)
        loopSignal_();
#else
    (void) notification;
#endif
}

/**
 * Renew the push subscription of a listener.
 *
 * Only active when built with OPENDHT_PUSH_NOTIFICATIONS and when a device key
 * is set. Drops the listener's current request, marks the operation as ok
 * again, re-arms the listener's refresh timer at
 * now + OP_TIMEOUT - OP_MARGIN (handled by handleResubscribe()) and sends a
 * new SUBSCRIBE request on "/<hash>" with the RESUBSCRIBE method.
 *
 * @param key      hash the listener is attached to
 * @param token    listener token, passed on to the refresh timer handler
 * @param listener listener to resubscribe
 */
void
DhtProxyClient::resubscribe(const InfoHash& key, const size_t token, Listener& listener)
{
#ifdef OPENDHT_PUSH_NOTIFICATIONS
    if (deviceKey_.empty())
        return;
    if (logger_)
        logger_->d("[proxy:client] [resubscribe] [search %s]", key.to_c_str());

    auto opstate = listener.opstate;
    listener.request.reset(); // This will update ok to false
    opstate->ok = true;

    restinio::http_request_header_t header;
    header.method(restinio::http_method_subscribe());
    header.request_target("/" + key.toString());
    if (!listener.refreshSubscriberTimer){
        listener.refreshSubscriberTimer = std::make_unique<asio::steady_timer>(httpContext_);
    }
    listener.refreshSubscriberTimer->expires_at(std::chrono::steady_clock::now() +
                                                proxy::OP_TIMEOUT - proxy::OP_MARGIN);
    listener.refreshSubscriberTimer->async_wait(std::bind(&DhtProxyClient::handleResubscribe, this,
                                                std::placeholders::_1, key, token, opstate));
    auto vcb = listener.cb;
    sendListen(header, vcb, opstate, listener, ListenMethod::RESUBSCRIBE);
#else
    // Nothing to do without push notifications; silence every unused parameter.
    (void) key;
    (void) token;
    (void) listener;
#endif
}

#ifdef OPENDHT_PUSH_NOTIFICATIONS
/**
 * Add the push-related fields to a request body: device key, client id,
 * session id and, on Android or Apple builds, the platform name.
 *
 * @param body JSON object to complete
 */
void
DhtProxyClient::getPushRequest(Json::Value& body) const
{
    body["session_id"] = pushSessionId_;
    body["client_id"] = pushClientId_;
    body["key"] = deviceKey_;
    // "apple" takes precedence if both platform macros happen to be defined.
#if defined(__APPLE__)
    body["platform"] = "apple";
#elif defined(__ANDROID__)
    body["platform"] = "android";
#endif
}

std::string
DhtProxyClient::fillBody(bool resubscribe)
{
    // Fill body with
    // {
    //   "key":"device_key",
    // }
    Json::Value body;
    getPushRequest(body);
    if (resubscribe) {
        // This is the first listen, we want to retrieve previous values.
        body["refresh"] = true;
    }
    auto content = Json::writeString(jsonBuilder_, body) + "\n";
    std::replace(content.begin(), content.end(), '\n', ' ');
    return content;
}
#endif // OPENDHT_PUSH_NOTIFICATIONS

} // namespace dht