Skip to content
Snippets Groups Projects
Select Git revision
  • 441cad2e86c17eb5ebbada74518d479f52f63902
  • master default
  • windows_ci_static
  • c_link
  • cpack
  • windows_ci
  • cert_pk_id
  • proxy_push_result
  • cnode_put_id
  • update-windows-build
  • proxy
  • resubscribe_on_token_change
  • actions
  • client_mode
  • llhttp
  • search_node_add
  • crypto_aes_gcm_argon2
  • ios_notifications
  • log_fmt
  • v2asio
  • fix-msvc
  • v3.4.0
  • v3.3.1
  • v3.3.1rc1
  • v3.3.1rc2
  • v3.3.0
  • v3.2.0
  • v3.1.11
  • v3.1.10
  • v3.1.9
  • v3.1.8.2
  • v3.1.8.1
  • v3.1.8
  • v3.1.7
  • v3.1.6
  • v3.1.5
  • v3.1.4
  • v3.1.3
  • v3.1.2
  • v3.1
  • v3.0.1
41 results

dhtnode.cpp

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    dht_proxy_client.cpp 48.25 KiB
    /*
     *  Copyright (C) 2014-2020 Savoir-faire Linux Inc.
     *  Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
     *          Adrien Béraud <adrien.beraud@savoirfairelinux.com>
     *          Vsevolod Ivanov <vsevolod.ivanov@savoirfairelinux.com>
     *
     *  This program is free software; you can redistribute it and/or modify
     *  it under the terms of the GNU General Public License as published by
     *  the Free Software Foundation; either version 3 of the License, or
     *  (at your option) any later version.
     *
     *  This program is distributed in the hope that it will be useful,
     *  but WITHOUT ANY WARRANTY; without even the implied warranty of
     *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     *  GNU General Public License for more details.
     *
     *  You should have received a copy of the GNU General Public License
     *  along with this program. If not, see <https://www.gnu.org/licenses/>.
     */
    
    #include "dht_proxy_client.h"
    #include "dhtrunner.h"
    #include "op_cache.h"
    #include "utils.h"
    
    #include <http_parser.h>
    #include <deque>
    
    namespace dht {
    
    /**
     * State shared by the requests of one proxy-info query round.
     * `ipv4` / `ipv6` are per-family values compared against 0 by the query
     * completion handler; `cancel` marks the whole round as obsolete.
     */
    struct DhtProxyClient::InfoState {
        std::atomic<unsigned int> ipv4 {0};
        std::atomic<unsigned int> ipv6 {0};
        std::atomic<bool> cancel {false};
    };
    
    /** Flags shared between the callbacks of a single operation. */
    struct DhtProxyClient::OperationState {
        // Cleared when the operation fails; handed to the done callback in get().
        std::atomic<bool> ok {true};
        // Once set, queued value callbacks are no longer invoked.
        std::atomic<bool> stop {false};
    };
    
    /**
     * Bookkeeping for one listen operation registered on a search:
     * the value cache fed by the proxy, the cache callback, the shared
     * operation flags, the in-flight HTTP request and an optional timer
     * used to refresh push subscriptions.
     */
    struct DhtProxyClient::Listener
    {
        /** Take ownership of the value cache `c`. */
        Listener(OpValueCache&& c):
            cache(std::move(c))
        {}

        // Explicitly zero-initialized: the constructor does not set this field,
        // so without an initializer it would hold an indeterminate value.
        unsigned callbackId {0};
        OpValueCache cache;
        CacheValueCallback cb;
        Sp<OperationState> opstate;
        std::shared_ptr<http::Request> request;
        std::unique_ptr<asio::steady_timer> refreshSubscriberTimer;
    };
    
    struct PermanentPut {
        PermanentPut(const Sp<Value>& v, std::unique_ptr<asio::steady_timer>&& j,
                     const Sp<std::atomic_bool>& o):
            value(v), refreshPutTimer(std::move(j)), ok(o)
        {}
    
        Sp<Value> value;
        std::unique_ptr<asio::steady_timer> refreshPutTimer;
        Sp<std::atomic_bool> ok;
    };
    
    /**
     * Per-key state kept by the client: cached operations, listeners and
     * permanent puts.
     */
    struct DhtProxyClient::ProxySearch {
        // Operation cache queried by getLocal()/getLocalById() and used by listen()
        SearchCache ops {};
        std::unique_ptr<asio::steady_timer> opExpirationTimer;
        // Active listeners, keyed by listener token
        std::map<size_t, Listener> listeners {};
        // Permanent puts keyed by value id, each owning its refresh timer
        std::map<Value::Id, PermanentPut> puts {};
        // Permanent puts whose value has no id yet; moved into `puts` once the
        // proxy reply provides the id (see doPut)
        std::set<Sp<Value>> pendingPuts  {};
    };
    
    /**
     * Accumulates raw bytes and hands them back one delimited line at a time.
     */
    struct LineSplit {
        /** Queue `l` bytes starting at `d` behind the data not yet consumed. */
        void append(const char* d, size_t l) {
            const char* const last = d + l;
            buf_.insert(buf_.end(), d, last);
        }
        /**
         * Extract the next line terminated by `c`.
         * On success the line (delimiter included) is available through
         * line() and is removed from the buffer. When no delimiter is
         * buffered, nothing changes and false is returned.
         */
        bool getLine(char c) {
            const auto delim = std::find(buf_.begin(), buf_.end(), c);
            if (delim == buf_.end())
                return false;
            const auto pastDelim = delim + 1;
            line_.assign(buf_.begin(), pastDelim);
            buf_.erase(buf_.begin(), pastDelim);
            return true;
        }
        /** Last line produced by a successful getLine(). */
        const std::string& line() const { return line_; }
    private:
        std::deque<char> buf_ {};
        std::string line_ {};
    };
    
    /**
     * Build a string of `length` characters, each drawn uniformly from a
     * fixed alphabet of letters, digits and punctuation.
     */
    std::string
    getRandomSessionId(size_t length = 8) {
        static constexpr const char chars[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789!#$%&()*+,./:;<=>?@[]^_`{|}~";
        crypto::random_device rdev;
        // sizeof() counts the terminating NUL, hence "- 2" for the last usable index.
        std::uniform_int_distribution<> pick(0, (sizeof(chars)/sizeof(char)) - 2);
        std::string id(length, 0);
        for (auto& ch : id)
            ch = chars[pick(rdev)];
        return id;
    }
    
    /** Default constructor: no proxy is configured and no http thread is started. */
    DhtProxyClient::DhtProxyClient() = default;
    
    /**
     * Create a proxy client and start its http worker thread.
     *
     * @param serverCA        CA certificate set on outgoing requests when non-null.
     * @param clientIdentity  Client key/certificate pair; used when both parts are set.
     * @param signal          Stored as loopSignal_; invoked after work has been queued.
     * @param serverHost      Proxy URL; when non-empty the proxy is started immediately.
     * @param pushClientId    Stored as pushClientId_.
     * @param logger          Passed to DhtInterface; may be null (all logging is guarded).
     */
    DhtProxyClient::DhtProxyClient(
            std::shared_ptr<dht::crypto::Certificate> serverCA, dht::crypto::Identity clientIdentity,
            std::function<void()> signal, const std::string& serverHost,
            const std::string& pushClientId, std::shared_ptr<dht::Logger> logger)
        : DhtInterface(logger)
        , proxyUrl_(serverHost)
        , clientIdentity_(clientIdentity), serverCertificate_(serverCA)
        , pushClientId_(pushClientId), pushSessionId_(getRandomSessionId())
        , loopSignal_(signal)
        , jsonReader_(Json::CharReaderBuilder{}.newCharReader())
    {
        // Compact JSON output: no comments, no indentation
        jsonBuilder_["commentStyle"] = "None";
        jsonBuilder_["indentation"] = "";
        if (logger_) {
            if (serverCertificate_)
                logger_->d("[proxy:client] using ca certificate for ssl:\n%s",
                           serverCertificate_->toString(false/*chain*/).c_str());
            if (clientIdentity_.first and clientIdentity_.second)
                logger_->d("[proxy:client] using client certificate for ssl:\n%s",
                           clientIdentity_.second->toString(false/*chain*/).c_str());
        }
        // run http client
        httpClientThread_ = std::thread([this](){
            try {
                if (logger_)
                    logger_->d("[proxy:client] starting io_context");
                // Ensures the httpContext_ won't run out of work
                auto work = asio::make_work_guard(httpContext_);
                httpContext_.run();
                if (logger_)
                    logger_->d("[proxy:client] http client io_context stopped");
            }
            catch(const std::exception& ex){
                // Exceptions must not escape the thread entry point
                if (logger_)
                    logger_->e("[proxy:client] run error: %s", ex.what());
            }
        });
        if (!proxyUrl_.empty())
            startProxy();
    }
    
    void
    DhtProxyClient::startProxy()
    {
        if (proxyUrl_.empty())
            return;
    
        if (logger_)
            logger_->d("[proxy:client] start proxy with %s", proxyUrl_.c_str());
    
        nextProxyConfirmationTimer_ = std::make_unique<asio::steady_timer>(httpContext_, std::chrono::steady_clock::now());
        nextProxyConfirmationTimer_->async_wait(std::bind(&DhtProxyClient::handleProxyConfirm, this, std::placeholders::_1));
    
        listenerRestartTimer_ = std::make_unique<asio::steady_timer>(httpContext_);
    
        loopSignal_();
    }
    
    /**
     * Timer handler: check the proxy connectivity unless the wait failed
     * or no proxy is configured.
     */
    void
    DhtProxyClient::handleProxyConfirm(const asio::error_code &ec)
    {
        if (ec) {
            // A cancelled wait is silent; any other failure is logged.
            const bool aborted = (ec == asio::error::operation_aborted);
            if (not aborted and logger_)
                logger_->e("[proxy:client] confirm error: %s", ec.message().c_str());
            return;
        }
        if (not proxyUrl_.empty())
            getConnectivityStatus();
    }
    
    /** Stops the client (see stop()); stop() itself ignores repeated calls. */
    DhtProxyClient::~DhtProxyClient()
    {
        stop();
    }
    
    /**
     * Stop the client: cancel listeners and pending requests, stop the
     * io_context and join the http thread.
     * Only the first call does any work; later calls return immediately.
     */
    void
    DhtProxyClient::stop()
    {
        // exchange() returns the previous value, so only the first caller proceeds
        if (not isDestroying_.exchange(true)) {
            resolver_.reset();
            cancelAllListeners();
            // NOTE(review): infoState_ is read here without lockCurrentProxyInfos_,
            // while getProxyInfos() replaces it under that lock.
            if (infoState_)
                infoState_->cancel = true;
            {
                std::lock_guard<std::mutex> lock(requestLock_);
                for (auto& request : requests_)
                    request.second->cancel();
            }
            if (not httpContext_.stopped())
                httpContext_.stop();
            if (httpClientThread_.joinable())
                httpClientThread_.join();
            // The http thread has been joined above, so it can no longer touch requests_
            requests_.clear();
        }
    }
    
    /**
     * Return the locally cached values for key `k` accepted by `filter`,
     * or an empty vector when no search exists for that key.
     */
    std::vector<Sp<Value>>
    DhtProxyClient::getLocal(const InfoHash& k, const Value::Filter& filter) const {
        std::lock_guard<std::mutex> guard(searchLock_);
        auto found = searches_.find(k);
        if (found != searches_.end())
            return found->second.ops.get(filter);
        return {};
    }
    
    /**
     * Return the locally cached value with the given `id` for key `k`,
     * or an empty pointer when no search exists for that key.
     */
    Sp<Value>
    DhtProxyClient::getLocalById(const InfoHash& k, Value::Id id) const {
        std::lock_guard<std::mutex> guard(searchLock_);
        auto found = searches_.find(k);
        if (found != searches_.end())
            return found->second.ops.get(id);
        return {};
    }
    
    /**
     * Cancel every listener of every search.
     * For each token reported by the search cache, the listener is flagged
     * as stopped, its HTTP request (if any) is cancelled and the listener
     * is removed from the search.
     */
    void
    DhtProxyClient::cancelAllListeners()
    {
        std::lock_guard<std::mutex> lock(searchLock_);
        if (logger_)
            logger_->d("[proxy:client] [listeners] [%zu searches] cancel all", searches_.size());
        for (auto& s: searches_) {
            auto& listeners = s.second.listeners;
            s.second.ops.cancelAll([&](size_t token){
                auto l = listeners.find(token);
                if (l == listeners.end())
                    return;
                // Both pointers may legitimately be empty (e.g. a listener whose
                // request was not created yet), so guard each dereference.
                if (l->second.opstate)
                    l->second.opstate->stop.store(true);
                if (l->second.request)
                    l->second.request->cancel();
                // Erasing the listener also releases its reference to the request;
                // erase by iterator to avoid a second lookup.
                listeners.erase(l);
            });
        }
    }
    
    /**
     * Stop the client, then invoke `cb` when one was provided.
     * The boolean argument is ignored.
     */
    void
    DhtProxyClient::shutdown(ShutdownCallback cb, bool)
    {
        stop();
        if (not cb)
            return;
        cb();
    }
    
    /**
     * Return the last known proxy status for the address family `af`.
     * Families other than AF_INET / AF_INET6 are reported as disconnected.
     */
    NodeStatus
    DhtProxyClient::getStatus(sa_family_t af) const
    {
        std::lock_guard<std::mutex> guard(lockCurrentProxyInfos_);
        if (af == AF_INET)
            return statusIpv4_;
        if (af == AF_INET6)
            return statusIpv6_;
        return NodeStatus::Disconnected;
    }
    
    /**
     * Tell whether the proxy status for the address family `af` is anything
     * other than disconnected. Unknown families are never running.
     */
    bool
    DhtProxyClient::isRunning(sa_family_t af) const
    {
        std::lock_guard<std::mutex> guard(lockCurrentProxyInfos_);
        if (af == AF_INET)
            return statusIpv4_ != NodeStatus::Disconnected;
        if (af == AF_INET6)
            return statusIpv6_ != NodeStatus::Disconnected;
        return false;
    }
    
    /**
     * Run every callback queued so far, on the caller's thread.
     * The queue is taken under lockCallbacks_ and executed outside of it.
     * All arguments are ignored; always returns time_point::max().
     */
    time_point
    DhtProxyClient::periodic(const uint8_t*, size_t, SockAddr, const time_point& /*now*/)
    {
        decltype(callbacks_) pending;
        {
            std::lock_guard<std::mutex> guard(lockCallbacks_);
            pending = std::move(callbacks_);
        }
        for (auto job = pending.begin(); job != pending.end(); ++job)
            (*job)();
        pending.clear();
        return time_point::max();
    }
    
    /** Set the Accept and Content-Type headers used for proxy requests. */
    void
    DhtProxyClient::setHeaderFields(http::Request& request){
        using field = restinio::http_field_t;
        request.set_header_field(field::accept, "*/*");
        request.set_header_field(field::content_type, "application/json");
    }
    
    /**
     * Fetch the values stored at `key` through the proxy (HTTP GET on "/<key>").
     *
     * The response body is consumed line by line, one JSON value per line.
     * Values accepted by the filter are handed to `cb` in batches; `donecb`
     * is called once the request completes. Both callbacks are queued in
     * callbacks_ and executed by periodic(), not on the http thread.
     *
     * @param key     Hash to query.
     * @param cb      Value callback; returning false stops further deliveries.
     * @param donecb  Completion callback, receives the success flag.
     * @param f       Value filter.
     * @param w       Optional "where" clause, chained to `f` when non-empty.
     */
    void
    DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, Value::Filter&& f, Where&& w)
    {
        if (logger_)
            logger_->d("[proxy:client] [get] [search %s]", key.to_c_str());
    
        // Client is shutting down: report failure right away
        if (isDestroying_) {
            if (donecb) donecb(false, {});
            return;
        }
        try {
            auto request = buildRequest("/" + key.toString());
            auto reqid = request->id();
            //request->set_connection_type(restinio::http_connection_header_t::keep_alive);
            request->set_method(restinio::http_method_get());
            setHeaderFields(*request);
    
            auto opstate = std::make_shared<OperationState>();
            Value::Filter filter = w.empty() ? f : f.chain(w.getFilter());
    
            // Buffer shared across body chunks so that lines split between chunks are reassembled
            auto rxBuf = std::make_shared<LineSplit>();
            request->add_on_body_callback([this, key, opstate, filter, rxBuf, cb](const char* at, size_t length){
                try {
                    auto& b = *rxBuf;
                    b.append(at, length);
                    // one value per body line
                    std::vector<Sp<Value>> values;
                    while (b.getLine('\n') and !opstate->stop) {
                        std::string err;
                        Json::Value json;
                        const auto& line = b.line();
                        if (!jsonReader_->parse(line.data(), line.data() + line.size(), &json, &err)){
                            // NOTE(review): values already parsed from this chunk are dropped here
                            opstate->ok.store(false);
                            return;
                        }
                        auto value = std::make_shared<Value>(json);
                        if ((not filter or filter(*value)) and cb)
                            values.emplace_back(std::move(value));
                    }
                    if (not values.empty() and cb) {
                        {
                            // Defer the user callback to periodic()
                            std::lock_guard<std::mutex> lock(lockCallbacks_);
                            callbacks_.emplace_back([opstate, cb, values = std::move(values)](){
                                if (not opstate->stop.load() and not cb(values)){
                                    opstate->stop.store(true);
                                }
                            });
                        }
                        loopSignal_();
                    }
                } catch(const std::exception& e) {
                    if (logger_)
                        logger_->e("[proxy:client] [get %s] body parsing error: %s", key.to_c_str(), e.what());
                    opstate->ok.store(false);
                }
            });
            request->add_on_done_callback([this, reqid, opstate, donecb, key] (const http::Response& response){
                if (response.status_code != 200) {
                    if (logger_)
                        logger_->e("[proxy:client] [get %s] failed with code=%i", key.to_c_str(), response.status_code);
                    opstate->ok.store(false);
                    // Status 0 on a request that was not aborted is reported via opFailed()
                    if (not response.aborted and response.status_code == 0)
                        opFailed();
                }
                if (donecb) {
                    {
                        std::lock_guard<std::mutex> lock(lockCallbacks_);
                        callbacks_.emplace_back([donecb, opstate](){
                            donecb(opstate->ok, {});
                            opstate->stop.store(true);
                        });
                    }
                    loopSignal_();
                }
                // During shutdown stop() clears requests_ itself
                if (not isDestroying_) {
                    std::lock_guard<std::mutex> l(requestLock_);
                    requests_.erase(reqid);
                }
            });
            {
                // Keep the request alive until its done callback removes it
                std::lock_guard<std::mutex> l(requestLock_);
                requests_[reqid] = request;
            }
            request->send();
        }
        catch (const std::exception &e){
            // NOTE(review): donecb is not invoked on this path
            if (logger_)
                logger_->e("[proxy:client] [get %s] error: %s", key.to_c_str(), e.what());
        }
    }
    
    /**
     * Put `val` at `key` through the proxy.
     *
     * For permanent puts the value is registered in the search first: with
     * its own refresh timer when it already has an id, otherwise in
     * pendingPuts. The request itself is sent by doPut(); `cb` is queued in
     * callbacks_ and therefore runs from periodic().
     *
     * @param key        Hash to put the value at.
     * @param val        Value to put; a null value fails immediately.
     * @param cb         Completion callback, receives the success flag.
     * @param created    Forwarded to doPut().
     * @param permanent  Whether the put must be kept alive through refreshes.
     */
    void
    DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_point created, bool permanent)
    {
        if (not val or isDestroying_) {
            if (cb) cb(false, {});
            return;
        }
        if (logger_)
            logger_->d("[proxy:client] [put] [search %s]", key.to_c_str());
    
        // Shared result flag; stays null for non-permanent puts
        std::shared_ptr<std::atomic_bool> ok;
        if (permanent) {
            std::lock_guard<std::mutex> lock(searchLock_);
            ok = std::make_shared<std::atomic_bool>(true);
            auto& search = searches_[key];
            if (val->id) {
                auto id = val->id;
                auto refreshPutTimer = std::make_unique<asio::steady_timer>(httpContext_, proxy::OP_TIMEOUT - proxy::OP_MARGIN);
                refreshPutTimer->async_wait(std::bind(&DhtProxyClient::handleRefreshPut, this, std::placeholders::_1, key, id));
                // Replace any previous permanent put with the same id
                search.puts.erase(id);
                search.puts.emplace(std::piecewise_construct,
                    std::forward_as_tuple(id),
                    std::forward_as_tuple(val, std::move(refreshPutTimer), ok));
            } else {
                // No id yet: doPut() registers the put once the proxy reply provides one
                search.pendingPuts.emplace(val);
            }
        }
        doPut(key, val, [this, cb, ok](bool result){
            if (ok)
                *ok = result;
            if (cb) {
                std::lock_guard<std::mutex> lock(lockCallbacks_);
                callbacks_.emplace_back([cb, result](){
                    cb(result, {});
                });
            }
            loopSignal_();
        }, created, permanent);
    }
    
    /**
     * Timer handler refreshing the permanent put `id` at `key`.
     * When the put is still registered, it is sent again and the timer is
     * re-armed; otherwise nothing happens and the refresh chain ends.
     *
     * @param ec   Result of the timer wait; any error ends the refresh.
     * @param key  Hash the value was put at.
     * @param id   Id of the value to refresh.
     */
    void
    DhtProxyClient::handleRefreshPut(const asio::error_code &ec, InfoHash key, Value::Id id)
    {
        if (ec == asio::error::operation_aborted)
            return;
        else if (ec){
            if (logger_)
                logger_->e("[proxy:client] [put] [refresh %s] %s", key.toString().c_str(), ec.message().c_str());
            return;
        }
        if (logger_)
            logger_->d("[proxy:client] [put] [refresh %s]", key.to_c_str());
        std::lock_guard<std::mutex> lock(searchLock_);
        auto search = searches_.find(key);
        if (search != searches_.end()) {
            auto p = search->second.puts.find(id);
            if (p != search->second.puts.end()){
                // Only the shared result flag is updated; no user callback is involved
                doPut(key, p->second.value, [ok = p->second.ok](bool result){
                    *ok = result;
                }, time_point::max(), true);
                p->second.refreshPutTimer->expires_after(proxy::OP_TIMEOUT - proxy::OP_MARGIN);
                p->second.refreshPutTimer->async_wait(std::bind(&DhtProxyClient::handleRefreshPut, this, std::placeholders::_1, key, id));
            }
        }
    }
    
    std::shared_ptr<http::Request>
    DhtProxyClient::buildRequest(const std::string& target)
    {
        auto resolver = resolver_;
        if (not resolver)
            resolver = std::make_shared<http::Resolver>(httpContext_, proxyUrl_, logger_);
        auto request = target.empty()
            ? std::make_shared<http::Request>(httpContext_, resolver)
            : std::make_shared<http::Request>(httpContext_, resolver, target);
        if (serverCertificate_)
            request->set_certificate_authority(serverCertificate_);
        if (clientIdentity_.first and clientIdentity_.second)
            request->set_identity(clientIdentity_);
        request->set_header_field(restinio::http_field_t::user_agent, "RESTinio client");
        return request;
    }
    
    /**
     * Send `val` to the proxy with an HTTP POST on "/<key>".
     *
     * For permanent puts the JSON body carries a "permanent" field: `true`,
     * or the push request description when a device key is set and push
     * notifications are compiled in. When the value had no id, the id is
     * read from the proxy reply; a pending permanent put is then promoted
     * to a regular permanent put with its own refresh timer.
     *
     * @param key        Hash to put the value at.
     * @param val        Value to send; must not be null.
     * @param cb         Called from the http thread with the success flag.
     * @param permanent  Whether the put is permanent.
     *
     * NOTE(review): `cb` is not invoked when building or sending the request throws.
     */
    void
    DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallbackSimple cb, time_point /*created*/, bool permanent)
    {
        if (logger_)
            logger_->d("[proxy:client] [put] [search %s] executing for %s", key.to_c_str(), val->toString().c_str());

        try {
            auto request = buildRequest("/" + key.toString());
            auto reqid = request->id();
            request->set_method(restinio::http_method_post());
            setHeaderFields(*request);

            auto json = val->toJson();
            if (permanent) {
                if (deviceKey_.empty()) {
                    json["permanent"] = true;
                } else {
    #ifdef OPENDHT_PUSH_NOTIFICATIONS
                    Json::Value refresh;
                    getPushRequest(refresh);
                    json["permanent"] = refresh;
    #else
                    json["permanent"] = true;
    #endif
                }
            }
            request->set_body(Json::writeString(jsonBuilder_, json));
            request->add_on_done_callback([this, reqid, cb, val, key, permanent] (const http::Response& response){
                bool ok = response.status_code == 200;
                if (ok) {
                    if (val->id == Value::INVALID_ID) {
                        // The value was sent without an id: adopt the one chosen by the proxy
                        std::string err;
                        Json::Value parsedValue;
                        if (jsonReader_->parse(response.body.data(), response.body.data() + response.body.size(), &parsedValue, &err)){
                            auto id = dht::Value(parsedValue).id;
                            val->id = id;
                            if (permanent) {
                                std::lock_guard<std::mutex> lock(searchLock_);
                                auto& search = searches_[key];
                                auto it = search.pendingPuts.find(val);
                                if (it != search.pendingPuts.end()) {
                                    // Promote the pending put now that its id is known
                                    auto sok = std::make_shared<std::atomic_bool>(ok);
                                    auto refreshPutTimer = std::make_unique<asio::steady_timer>(httpContext_, proxy::OP_TIMEOUT - proxy::OP_MARGIN);
                                    refreshPutTimer->async_wait(std::bind(&DhtProxyClient::handleRefreshPut, this, std::placeholders::_1, key, id));
                                    search.puts.emplace(std::piecewise_construct,
                                        std::forward_as_tuple(id),
                                        std::forward_as_tuple(val, std::move(refreshPutTimer), sok));
                                    search.pendingPuts.erase(it);
                                }
                            }
                        } else {
                            // Report the parser error instead of passing an argument
                            // that the format string never consumed.
                            if (logger_)
                                logger_->e("[proxy:client] [status] failed to parse value from server: %s", err.c_str());
                        }
                    }
                } else {
                    if (logger_)
                        logger_->e("[proxy:client] [status] failed with code=%i", response.status_code);
                    // Status 0 on a request that was not aborted is reported via opFailed()
                    if (not response.aborted and response.status_code == 0)
                        opFailed();
                }
                if (cb)
                    cb(ok);
                // During shutdown stop() clears requests_ itself
                if (not isDestroying_) {
                    std::lock_guard<std::mutex> l(requestLock_);
                    requests_.erase(reqid);
                }
            });
            {
                // Keep the request alive until its done callback removes it
                std::lock_guard<std::mutex> l(requestLock_);
                requests_[reqid] = request;
            }
            request->send();
        }
        catch (const std::exception &e){
            if (logger_)
                logger_->e("[proxy:client] [put %s] error: %s", key.to_c_str(), e.what());
        }
    }
    
    /**
     * Get data currently being put at the given hash.
     *
     * @param key  Hash to look up.
     * @return the values of all permanent puts registered for `key`;
     *         empty when no search exists for that key.
     */
    std::vector<Sp<Value>>
    DhtProxyClient::getPut(const InfoHash& key) const {
        // searches_ is modified from the http thread (put completion, refresh),
        // so it must be read under searchLock_ like every other accessor.
        std::lock_guard<std::mutex> lock(searchLock_);
        std::vector<Sp<Value>> ret;
        auto search = searches_.find(key);
        if (search != searches_.end()) {
            ret.reserve(search->second.puts.size());
            for (const auto& put : search->second.puts)
                ret.emplace_back(put.second.value);
        }
        return ret;
    }
    
    /**
     * Get data currently being put at the given hash with the given id.
     *
     * @param key  Hash to look up.
     * @param id   Id of the value.
     * @return the value, or an empty pointer when there is no such put.
     */
    Sp<Value>
    DhtProxyClient::getPut(const InfoHash& key, const Value::Id& id) const {
        // searches_ is modified from the http thread (put completion, refresh),
        // so it must be read under searchLock_ like every other accessor.
        std::lock_guard<std::mutex> lock(searchLock_);
        auto search = searches_.find(key);
        if (search == searches_.end())
            return {};
        auto val = search->second.puts.find(id);
        if (val == search->second.puts.end())
            return {};
        return val->second.value;
    }
    
    /**
     * Stop any put/announce operation at the given location,
     * for the value with the given id.
     *
     * @param key  Hash the value was put at.
     * @param id   Id of the value.
     * @return true when a permanent put was found and removed.
     */
    bool
    DhtProxyClient::cancelPut(const InfoHash& key, const Value::Id& id)
    {
        // The refresh handler looks the put up under searchLock_ on the http
        // thread; erasing it must be serialized with that lookup.
        std::lock_guard<std::mutex> lock(searchLock_);
        auto search = searches_.find(key);
        if (search == searches_.end())
            return false;
        if (logger_)
            logger_->d("[proxy:client] [put] [search %s] cancel", key.to_c_str());
        return search->second.puts.erase(id) > 0;
    }
    
    /**
     * Return a copy of the last node statistics reported by the proxy:
     * the IPv4 statistics for AF_INET, the IPv6 ones for any other family.
     */
    NodeStats
    DhtProxyClient::getNodesStats(sa_family_t af) const
    {
        // The statistics are rewritten under this lock when a proxy info
        // reply is processed, so the copy must be taken under it too.
        std::lock_guard<std::mutex> l(lockCurrentProxyInfos_);
        return af == AF_INET ? stats4_ : stats6_;
    }
    
    void
    DhtProxyClient::getProxyInfos()
    {
        if (logger_)
            logger_->d("[proxy:client] [info] requesting proxy server node information");
        auto infoState = std::make_shared<InfoState>();
        {
            std::lock_guard<std::mutex> l(lockCurrentProxyInfos_);
            if (infoState_)
                infoState_->cancel = true;
            infoState_ = infoState;
            if (statusIpv4_ == NodeStatus::Disconnected)
                statusIpv4_ = NodeStatus::Connecting;
            if (statusIpv6_ == NodeStatus::Disconnected)
                statusIpv6_ = NodeStatus::Connecting;
        }
        if (logger_)
            logger_->d("[proxy:client] [status] sending request");
    
        auto resolver = std::make_shared<http::Resolver>(httpContext_, proxyUrl_, logger_);
        queryProxyInfo(infoState, resolver, AF_INET);
        queryProxyInfo(infoState, resolver, AF_INET6);
        resolver_ = resolver;
    }
    
    /**
     * Send one proxy info request (HTTP GET) restricted to an address family.
     * The reply, or an empty JSON value on failure, is forwarded to
     * onProxyInfos() unless the query round has been cancelled.
     *
     * @param infoState  State of the query round this request belongs to.
     * @param resolver   Resolver used to reach the proxy.
     * @param family     AF_INET or AF_INET6.
     */
    void
    DhtProxyClient::queryProxyInfo(const Sp<InfoState>& infoState, const Sp<http::Resolver>& resolver, sa_family_t family)
    {
        if (logger_)
            logger_->d("[proxy:client] [status] query ipv%i info", family == AF_INET ? 4 : 6);
        try {
            auto request = std::make_shared<http::Request>(httpContext_, resolver, family);
            if (serverCertificate_)
                request->set_certificate_authority(serverCertificate_);
            auto reqid = request->id();
            request->set_method(restinio::http_method_get());
            setHeaderFields(*request);
            request->add_on_done_callback([this, reqid, family, infoState] (const http::Response& response){
                // NOTE(review): a cancelled round returns before the request is removed from requests_
                if (infoState->cancel.load())
                    return;
                if (response.status_code != 200) {
                    if (logger_)
                        logger_->e("[proxy:client] [status] ipv%i failed with code=%i",
                                    family == AF_INET ? 4 : 6, response.status_code);
                    // pass along the failures
                    if ((family == AF_INET and infoState->ipv4 == 0) or (family == AF_INET6 and infoState->ipv6 == 0))
                        onProxyInfos(Json::Value{}, family);
                } else {
                    std::string err;
                    Json::Value proxyInfos;
                    // An unparsable reply is reported as a failure (empty JSON value)
                    if (!jsonReader_->parse(response.body.data(), response.body.data() + response.body.size(), &proxyInfos, &err)){
                        onProxyInfos(Json::Value{}, family);
                    } else if (not infoState->cancel) {
                        onProxyInfos(proxyInfos, family);
                    }
                }
                // During shutdown stop() clears requests_ itself
                if (not isDestroying_) {
                    std::lock_guard<std::mutex> l(requestLock_);
                    requests_.erase(reqid);
                }
            });
    
            // The round may have been cancelled while the request was being built
            if (infoState->cancel.load())
                return;
            {
                // Keep the request alive until its done callback removes it
                std::lock_guard<std::mutex> l(requestLock_);
                requests_[reqid] = request;
            }
            request->send();
        }
        catch (const std::exception &e){
            if (logger_)
                logger_->e("[proxy:client] [status] error sending request: %s", e.what());
        }
    }
    
    /**
     * Process a proxy info reply for one address family.
     *
     * Updates the node id, the node statistics, the public address and the
     * status of `family`, then schedules the next connectivity check. When
     * the overall status becomes connected, listeners are restarted and the
     * pending on-connect callbacks are queued.
     *
     * @param proxyInfos  Parsed reply; a value without "node_id" is treated as a failure.
     * @param family      Address family the reply was obtained for.
     */
    void
    DhtProxyClient::onProxyInfos(const Json::Value& proxyInfos, const sa_family_t family)
    {
        if (isDestroying_)
            return;
        std::unique_lock<std::mutex> l(lockCurrentProxyInfos_);
        // Overall status is the greater of the two per-family statuses
        auto oldStatus = std::max(statusIpv4_, statusIpv6_);
        auto& status = family == AF_INET ? statusIpv4_ : statusIpv6_;
        if (not proxyInfos.isMember("node_id")) {
            if (logger_)
                logger_->e("[proxy:client] [info] request failed for %s", family == AF_INET ? "ipv4" : "ipv6");
            status = NodeStatus::Disconnected;
        } else {
            if (logger_)
                logger_->d("[proxy:client] [info] got proxy reply for %s",
                           family == AF_INET ? "ipv4" : "ipv6");
            try {
                myid = InfoHash(proxyInfos["node_id"].asString());
                stats4_ = NodeStats(proxyInfos["ipv4"]);
                stats6_ = NodeStats(proxyInfos["ipv6"]);
                // Status is derived from the combined node counts of both families
                if (stats4_.good_nodes + stats6_.good_nodes)
                    status = NodeStatus::Connected;
                else if (stats4_.dubious_nodes + stats6_.dubious_nodes)
                    status = NodeStatus::Connecting;
                else
                    status = NodeStatus::Disconnected;
    
                auto publicIp = parsePublicAddress(proxyInfos["public_ip"]);
                auto publicFamily = publicIp.getFamily();
                if (publicFamily == AF_INET)
                    publicAddressV4_ = publicIp;
                else if (publicFamily == AF_INET6)
                    publicAddressV6_ = publicIp;
            } catch (const std::exception& e) {
                if (logger_)
                    logger_->e("[proxy:client] [info] error processing: %s", e.what());
            }
        }
        auto newStatus = std::max(statusIpv4_, statusIpv6_);
        if (newStatus == NodeStatus::Connected) {
            // Just became connected: restart listeners and release the on-connect callbacks
            if (oldStatus == NodeStatus::Disconnected || oldStatus == NodeStatus::Connecting) {
                listenerRestartTimer_->expires_at(std::chrono::steady_clock::now());
                listenerRestartTimer_->async_wait(std::bind(&DhtProxyClient::restartListeners, this, std::placeholders::_1));
                if (not onConnectCallbacks_.empty()) {
                    std::lock_guard<std::mutex> lock(lockCallbacks_);
                    callbacks_.emplace_back([cbs = std::move(onConnectCallbacks_)]() mutable {
                        while (not cbs.empty()) {
                            cbs.front()();
                            cbs.pop();
                        }
                    });
                }
            }
            // Connected: check again in 15 minutes
            nextProxyConfirmationTimer_->expires_at(std::chrono::steady_clock::now() + std::chrono::minutes(15));
            nextProxyConfirmationTimer_->async_wait(std::bind(&DhtProxyClient::handleProxyConfirm, this, std::placeholders::_1));
        }
        else if (newStatus == NodeStatus::Disconnected) {
            // Disconnected: retry in 1 minute
            nextProxyConfirmationTimer_->expires_at(std::chrono::steady_clock::now() + std::chrono::minutes(1));
            nextProxyConfirmationTimer_->async_wait(std::bind(&DhtProxyClient::handleProxyConfirm, this, std::placeholders::_1));
        }
        // Release the lock before waking up the owner loop
        l.unlock();
        loopSignal_();
    }
    
    /**
     * Turn the "public_ip" field of a proxy reply into a socket address.
     * The port part is split off, the host is resolved and the first
     * result is returned through getMappedIPv4(); an empty address is
     * returned when resolution yields nothing.
     */
    SockAddr
    DhtProxyClient::parsePublicAddress(const Json::Value& val)
    {
        auto address = val.asString();
        auto hostPort = splitPort(address);
        auto resolved = SockAddr::resolve(hostPort.first);
        if (not resolved.empty())
            return resolved.front().getMappedIPv4();
        return {};
    }
    
    /**
     * Return the known public addresses matching `family`, IPv6 first.
     * AF_INET selects only the IPv4 address, AF_INET6 only the IPv6 one,
     * any other family selects both.
     */
    std::vector<SockAddr>
    DhtProxyClient::getPublicAddress(sa_family_t family)
    {
        std::lock_guard<std::mutex> guard(lockCurrentProxyInfos_);
        const bool wantV6 = family != AF_INET;
        const bool wantV4 = family != AF_INET6;
        std::vector<SockAddr> addresses;
        if (wantV6 && publicAddressV6_)
            addresses.emplace_back(publicAddressV6_);
        if (wantV4 && publicAddressV4_)
            addresses.emplace_back(publicAddressV4_);
        return addresses;
    }
    
    size_t
    DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filter, Where where)
    {
        if (logger_)
            logger_->d("[proxy:client] [listen] [search %s]", key.to_c_str());
        if (isDestroying_)
            return 0;
    
        std::lock_guard<std::mutex> lock(searchLock_);
        auto& search = searches_[key];
        auto query = std::make_shared<Query>(Select{}, std::move(where));
        return search.ops.listen(cb, query, filter, [this, key](Sp<Query>, ValueCallback cb, SyncCallback) -> size_t {
            // Find search
            auto search = searches_.find(key);
            if (search == searches_.end()) {
                if (logger_)
                    logger_->e("[proxy:client] [listen] [search %s] search not found", key.to_c_str());
                return 0;
            }
            if (logger_)
                logger_->d("[proxy:client] [listen] [search %s] sending %s", key.to_c_str(),
                      deviceKey_.empty() ? "listen" : "subscribe");
            // Add listener
            auto token = ++listenerToken_;
            auto l = search->second.listeners.find(token);
            if (l == search->second.listeners.end()) {
                l = search->second.listeners.emplace(std::piecewise_construct,
                        std::forward_as_tuple(token),
                        std::forward_as_tuple(std::move(cb))).first;
            } else {
                if (l->second.opstate)
                    l->second.opstate->stop = true;
            }
            // Add cache callback
            auto opstate = std::make_shared<OperationState>();
            l->second.opstate = opstate;
            l->second.cb = [this,key,token,opstate](const std::vector<Sp<Value>>& values, bool expired, system_clock::time_point t){
                if (opstate->stop)
                    return false;
                std::lock_guard<std::mutex> lock(searchLock_);
                auto s = searches_.find(key);
                if (s != searches_.end()) {
                    auto l = s->second.listeners.find(token);
                    if (l != s->second.listeners.end()) {
                        return l->second.cache.onValue(values, expired, t);
                    }
                }
                return false;
            };
            if (not deviceKey_.empty()) {
                /*
                 * Relaunch push listeners even if a timeout is not received
                 * (if the proxy crash for any reason)
                 */
                if (!l->second.refreshSubscriberTimer)
                    l->second.refreshSubscriberTimer = std::make_unique<asio::steady_timer>(httpContext_);
                l->second.refreshSubscriberTimer->expires_at(std::chrono::steady_clock::now() +
                                                             proxy::OP_TIMEOUT - proxy::OP_MARGIN);
                l->second.refreshSubscriberTimer->async_wait(std::bind(&DhtProxyClient::handleResubscribe, this,
                                                             std::placeholders::_1, key, token, opstate));
            }
            ListenMethod method;
            restinio::http_request_header_t header;
            if (deviceKey_.empty()){ // listen
                method = ListenMethod::LISTEN;
                header.method(restinio::http_method_get());
                header.request_target("/key/" + key.toString() + "/listen");
            }
            else {
                method = ListenMethod::SUBSCRIBE;
                header.method(restinio::http_method_subscribe());
                header.request_target("/" + key.toString());
            }
            sendListen(header, l->second.cb, opstate, l->second, method);
            return token;
        });
    }
    
    /**
     * Timer handler that re-sends the subscription of one listener.
     *
     * @param ec       timer completion status; an aborted wait returns silently,
     *                 any other error is logged and ends the handler.
     * @param key      hash of the search owning the listener.
     * @param token    listener token inside that search.
     * @param opstate  state of the operation the timer was armed for; nothing
     *                 is done once it is flagged as stopped.
     */
    void
    DhtProxyClient::handleResubscribe(const asio::error_code &ec, const InfoHash& key,
                                      const size_t token, std::shared_ptr<OperationState> opstate)
    {
        if (ec == asio::error::operation_aborted)
            return;
        if (ec) {
            if (logger_)
                logger_->e("[proxy:client] [resubscribe %s] %s", key.toString().c_str(), ec.message().c_str());
            return;
        }
        if (opstate->stop)
            return;

        std::lock_guard<std::mutex> guard(searchLock_);
        const auto found = searches_.find(key);
        if (found == searches_.end())
            return;

        auto& listeners = found->second.listeners;
        const auto entry = listeners.find(token);
        if (entry == listeners.end()) {
            if (logger_)
                logger_->e("[proxy:client] [resubscribe %s] token not found", key.toString().c_str());
            return;
        }
        resubscribe(key, token, entry->second);
    }
    
    /**
     * Cancel a listen operation in the search's operation cache and (re)arm
     * the expiration timer that triggers handleExpireListener().
     *
     * @param key     hash of the search.
     * @param gtoken  token passed to the operation cache's cancelListen().
     * @return false when no search exists for `key`, otherwise the result of
     *         the cache's cancelListen().
     */
    bool
    DhtProxyClient::cancelListen(const InfoHash& key, size_t gtoken)
    {
        if (logger_)
            logger_->d(key, "[proxy:client] [search %s] cancel listen %zu", key.to_c_str(), gtoken);

        std::lock_guard<std::mutex> guard(searchLock_);
        const auto found = searches_.find(key);
        if (found == searches_.end())
            return false;

        auto& state = found->second;
        const bool canceled = state.ops.cancelListen(gtoken, std::chrono::steady_clock::now());

        // The effective removal is deferred to the timer handler; the timer is
        // created on first use and only rescheduled afterwards.
        auto& timer = state.opExpirationTimer;
        if (timer)
            timer->expires_at(state.ops.getExpiration());
        else
            timer = std::make_unique<asio::steady_timer>(httpContext_, state.ops.getExpiration());
        timer->async_wait(std::bind(&DhtProxyClient::handleExpireListener, this, std::placeholders::_1, key));
        return canceled;
    }
    
    /**
     * Timer handler that removes the listeners the search's operation cache
     * (ops.expire) reports as expired.
     *
     * For every expired listener token:
     *  - the listener's operation state is flagged as stopped;
     *  - with a push device key, an UNSUBSCRIBE request is sent to the proxy;
     *    otherwise the listener's reference to its request is dropped;
     *  - the listener is erased from the search.
     * The timer is re-armed when the cache reports a further expiration. The
     * search itself is erased only once it holds neither listeners nor
     * permanent puts, so puts registered on the same key keep being refreshed.
     *
     * @param ec   timer completion status; an aborted wait returns silently,
     *             any other error is logged and ends the handler.
     * @param key  hash of the search whose listeners are expired.
     */
    void
    DhtProxyClient::handleExpireListener(const asio::error_code &ec, const InfoHash& key)
    {
        if (ec == asio::error::operation_aborted)
            return;
        else if (ec){
            if (logger_)
                logger_->e("[proxy:client] [listen %s] error in cancel: %s", key.toString().c_str(), ec.message().c_str());
            return;
        }
        if (logger_)
            logger_->d("[proxy:client] [listen %s] expire listener", key.toString().c_str());

        std::lock_guard<std::mutex> lock(searchLock_);
        auto search = searches_.find(key);
        if (search == searches_.end())
            return;

        // everytime a new expiry is set, a previous gets aborted
        time_point next = search->second.ops.expire(std::chrono::steady_clock::now(), [&](size_t ltoken) {
            auto it = search->second.listeners.find(ltoken);
            if (it == search->second.listeners.end())
                return;

            auto& listener = it->second;
            // Make callbacks already queued for this listener no-ops.
            listener.opstate->stop = true;

            if (not deviceKey_.empty()) {
                // UNSUBSCRIBE
                auto request = buildRequest("/" + key.toString());
                auto reqid = request->id();
                try {
                    request->set_method(restinio::http_method_unsubscribe());
                    setHeaderFields(*request);

                    Json::Value body;
                    body["key"] = deviceKey_;
                    body["client_id"] = pushClientId_;
                    request->set_body(Json::writeString(jsonBuilder_, body));
                    request->add_on_done_callback([this, reqid, key] (const http::Response& response){
                        if (response.status_code != 200) {
                            if (logger_)
                                logger_->e("[proxy:client] [unsubscribe %s] failed with code=%i",
                                            key.to_c_str(), response.status_code);
                            // Status 0 on a non-aborted request is treated as a proxy failure.
                            if (not response.aborted and response.status_code == 0)
                                opFailed();
                        }
                        if (not isDestroying_) {
                            std::lock_guard<std::mutex> l(requestLock_);
                            requests_.erase(reqid);
                        }
                    });
                    {
                        // Keep the request referenced until its done callback runs.
                        std::lock_guard<std::mutex> l(requestLock_);
                        requests_[reqid] = request;
                    }
                    request->send();
                }
                catch (const std::exception &e){
                    if (logger_)
                         logger_->e("[proxy:client] [unsubscribe %s] failed: %s", key.to_c_str(), e.what());
                }
            } else {
                // stop the request
                listener.request.reset();
            }
            search->second.listeners.erase(it);
            if (logger_)
                logger_->d("[proxy:client] [listen:cancel] [search %s] %zu listener remaining",
                        key.to_c_str(), search->second.listeners.size());
        });
        if (next != time_point::max()){
            search->second.opExpirationTimer->expires_at(next);
            search->second.opExpirationTimer->async_wait(std::bind(
                &DhtProxyClient::handleExpireListener, this, std::placeholders::_1, key));
        }
        // Erasing the search also destroys its permanent puts and their refresh
        // timers, so only drop it when nothing at all is left on this key.
        if (search->second.listeners.empty() and search->second.puts.empty()){
            searches_.erase(search);
        }
    }
    
    /**
     * Build and send the HTTP request backing a listener.
     *
     * The response body is consumed line by line: every line is parsed as a
     * JSON value, and for each one a task invoking `cb` is queued in
     * callbacks_ and the loop is signalled. An empty JSON value ends the
     * processing of the current chunk. Failures flag `opstate->ok` as false.
     *
     * @param header    request header (method and target) to send.
     * @param cb        callback receiving each parsed value; when it returns
     *                  false the operation is flagged as stopped.
     * @param opstate   shared stop/ok flags of the operation.
     * @param listener  listener whose `request` is set to the new request.
     * @param method    LISTEN requests use a keep-alive connection; the other
     *                  methods get a body from fillBody() when push
     *                  notifications are compiled in.
     */
    void
    DhtProxyClient::sendListen(const restinio::http_request_header_t& header,
                               const CacheValueCallback& cb,
                               const Sp<OperationState>& opstate,
                               Listener& listener, ListenMethod method)
    {
        // Routine trace: debug level, like the other traces of this client.
        if (logger_)
            logger_->d("[proxy:client] [listen] sendListen: %d", static_cast<int>(method));
        try {
            auto request = buildRequest();
            listener.request = request;
            auto reqid = request->id();
            request->set_header(header);
            setHeaderFields(*request);
            if (method == ListenMethod::LISTEN)
                request->set_connection_type(restinio::http_connection_header_t::keep_alive);
    #ifdef OPENDHT_PUSH_NOTIFICATIONS
            std::string body;
            if (method != ListenMethod::LISTEN)
                body = fillBody(method == ListenMethod::RESUBSCRIBE);
            request->set_body(body);
    #endif
            // Accumulates partial body chunks until complete lines are available.
            auto rxBuf = std::make_shared<LineSplit>();
            request->add_on_body_callback([this, reqid, opstate, rxBuf, cb](const char* at, size_t length){
                try {
                    auto& b = *rxBuf;
                    b.append(at, length);

                    // one value per body line
                    while (b.getLine('\n') and !opstate->stop) {
                        std::string err;
                        Json::Value json;
                        const auto& line = b.line();
                        if (!jsonReader_->parse(line.data(), line.data() + line.size(), &json, &err)){
                            // Report the parser diagnostic instead of failing silently.
                            if (logger_)
                                logger_->e("[proxy:client] [listen] request #%i can't parse value: %s", reqid, err.c_str());
                            opstate->ok.store(false);
                            return;
                        }
                        if (json.size() == 0) { // it's the end
                            break;
                        }

                        auto value = std::make_shared<Value>(json);
                        if (cb){
                            auto expired = json.get("expired", Json::Value(false)).asBool();
                            {
                                // The user callback is not run here: it is queued and
                                // the loop is signalled.
                                std::lock_guard<std::mutex> lock(lockCallbacks_);
                                callbacks_.emplace_back([cb, value, opstate, expired]() {
                                    if (not opstate->stop.load() and not cb({value}, expired, system_clock::time_point::min()))
                                        opstate->stop.store(true);
                                });
                            }
                            loopSignal_();
                        }
                    }
                } catch(const std::exception& e) {
                    if (logger_)
                        logger_->e("[proxy:client] [listen] request #%i error in parsing: %s", reqid, e.what());
                    opstate->ok.store(false);
                }
            });
            request->add_on_done_callback([this, opstate, reqid] (const http::Response& response) {
                if (response.status_code != 200) {
                    if (logger_)
                        logger_->e("[proxy:client] [listen] send request #%i failed with code=%i",
                                    reqid, response.status_code);
                    opstate->ok.store(false);
                    // Status 0 on a non-aborted request is treated as a proxy failure.
                    if (not response.aborted and response.status_code == 0)
                        opFailed();
                }
                if (not isDestroying_) {
                    std::lock_guard<std::mutex> l(requestLock_);
                    requests_.erase(reqid);
                }
            });
            {
                // Keep the request referenced until its done callback runs.
                std::lock_guard<std::mutex> l(requestLock_);
                requests_[reqid] = request;
            }
            request->send();
        }
        catch (const std::exception &e){
            if (logger_)
                logger_->e("[proxy:client] [listen] request failed: %s", e.what());
        }
    }
    
    /**
     * React to a failed proxy request: mark both IPv4 and IPv6 statuses as
     * disconnected, ask for the connectivity status again and signal the loop.
     * Does nothing while the client is being destroyed.
     */
    void
    DhtProxyClient::opFailed()
    {
        if (isDestroying_)
            return;
        if (logger_)
            logger_->e("[proxy:client] proxy request failed");

        // Hold the lock only while the two status fields are updated.
        std::unique_lock<std::mutex> infoLock(lockCurrentProxyInfos_);
        statusIpv4_ = NodeStatus::Disconnected;
        statusIpv6_ = NodeStatus::Disconnected;
        infoLock.unlock();

        getConnectivityStatus();
        loopSignal_();
    }
    
    /**
     * Log the request and call getProxyInfos(), unless the client is being
     * destroyed (the log line is emitted in both cases).
     */
    void
    DhtProxyClient::getConnectivityStatus()
    {
        if (logger_)
            logger_->d("[proxy:client] [connectivity] get status");
        if (isDestroying_)
            return;
        getProxyInfos();
    }
    
    /**
     * Timer handler that re-establishes every ongoing operation.
     *
     * 1. Every permanent put is sent again and its refresh timer re-armed.
     * 2. With a push device key, listeners whose operation is flagged as not
     *    ok are resubscribed, and the function returns.
     * 3. Otherwise every listener's request is cancelled, then a new
     *    GET "/key/<hash>/listen" request is sent for each of them.
     *
     * @param ec  timer completion status; an aborted wait returns silently,
     *            any other error is logged and ends the handler.
     */
    void
    DhtProxyClient::restartListeners(const asio::error_code &ec)
    {
        if (ec == asio::error::operation_aborted)
            return;
        else if (ec){
            if (logger_)
                logger_->e("[proxy:client] restart error: %s", ec.message().c_str());
            return;
        }

        if (isDestroying_)
            return;
        if (logger_)
            logger_->d("[proxy:client] [listeners] refresh permanent puts");

        std::lock_guard<std::mutex> lock(searchLock_);
        for (auto& search : searches_) {
            auto key = search.first;
            for (auto& put : search.second.puts) {
                doPut(key, put.second.value, [ok = put.second.ok](bool result){
                    *ok = result;
                }, time_point::max(), true);
                if (!put.second.refreshPutTimer) {
                    put.second.refreshPutTimer = std::make_unique<asio::steady_timer>(httpContext_);
                }
                put.second.refreshPutTimer->expires_at(std::chrono::steady_clock::now() + proxy::OP_TIMEOUT - proxy::OP_MARGIN);
                put.second.refreshPutTimer->async_wait(std::bind(&DhtProxyClient::handleRefreshPut, this,
                                                       std::placeholders::_1, key, put.first));
            }
        }
        if (not deviceKey_.empty()) {
            if (logger_)
                logger_->d("[proxy:client] [listeners] resubscribe due to a connectivity change");
            // Connectivity changed, refresh all subscribe
            for (auto& search : searches_)
                for (auto& listener : search.second.listeners)
                    if (!listener.second.opstate->ok)
                        resubscribe(search.first, listener.first, listener.second);
            return;
        }
        if (logger_)
            logger_->d("[proxy:client] [listeners] restarting listeners");
        // First pass: stop every running listen operation.
        for (auto& search: searches_) {
            for (auto& l: search.second.listeners) {
                auto& listener = l.second;
                if (auto opstate = listener.opstate)
                    opstate->stop = true;
                // The request can be null: sendListen() swallows exceptions thrown
                // while building the request, which leaves it unset after a reset.
                if (listener.request) {
                    listener.request->cancel();
                    listener.request.reset();
                }
            }
        }
        // Second pass: re-send a listen request for every listener.
        for (auto& search: searches_) {
            for (auto& l: search.second.listeners) {
                auto& listener = l.second;
                auto opstate = listener.opstate;
                // Redo listen
                opstate->stop.store(false);
                opstate->ok.store(true);
                auto cb = listener.cb;
                // define header
                restinio::http_request_header_t header;
                header.method(restinio::http_method_get());
                header.request_target("/key/" + search.first.toString() + "/listen");
                sendListen(header, cb, opstate, listener, ListenMethod::LISTEN);
            }
        }
    }
    
    /**
     * Handle a push notification forwarded by the application.
     *
     * Keys read from `notification`:
     *  - "s":       session id; when present and different from pushSessionId_
     *               the notification is ignored.
     *  - "timeout": hash of a search whose operation timed out. With "vid" the
     *               matching permanent put is scheduled for an immediate
     *               refresh, otherwise every listener of the search is
     *               resubscribed.
     *  - "key":     (when "timeout" is absent) hash of the search that received
     *               new or expired values.
     *  - "t":       optional send time, parsed as a number of milliseconds.
     *  - "exp":     optional comma-separated list of expired value ids.
     *
     * Any std::exception raised while handling the notification (unknown
     * search or put, malformed number, missing "key") is caught and logged.
     * Compiled to a no-op without OPENDHT_PUSH_NOTIFICATIONS.
     */
    void
    DhtProxyClient::pushNotificationReceived(const std::map<std::string, std::string>& notification)
    {
    #ifdef OPENDHT_PUSH_NOTIFICATIONS
        {
            // If a push notification is received, the proxy is up and running
            std::lock_guard<std::mutex> l(lockCurrentProxyInfos_);
            statusIpv4_ = NodeStatus::Connected;
            statusIpv6_ = NodeStatus::Connected;
        }
        // Set when at least one callback was queued and the loop must be woken up.
        auto launchLoop = false;
        try {
            auto sessionId = notification.find("s");
            if (sessionId != notification.end() and sessionId->second != pushSessionId_) {
                if (logger_)
                    logger_->d("[proxy:client] [push] ignoring push for other session");
                return;
            }
            std::lock_guard<std::mutex> lock(searchLock_);
            auto timeout = notification.find("timeout");
            if (timeout != notification.cend()) {
                // Timeout notification: the value of "timeout" is the search hash.
                InfoHash key(timeout->second);
                // at() throws when the search is unknown; handled by the catch below.
                auto& search = searches_.at(key);
                auto vidIt = notification.find("vid");
                if (vidIt != notification.end()) {
                    // Refresh put: make the put's refresh timer expire right now
                    auto vid = std::stoull(vidIt->second);
                    auto& put = search.puts.at(vid);
                    if (!put.refreshPutTimer)
                        put.refreshPutTimer = std::make_unique<asio::steady_timer>(httpContext_, std::chrono::steady_clock::now());
                    else
                        put.refreshPutTimer->expires_at(std::chrono::steady_clock::now());
                    put.refreshPutTimer->async_wait(std::bind(&DhtProxyClient::handleRefreshPut, this, std::placeholders::_1, key, vid));
                } else {
                    // Refresh listen: resubscribe every listener of the search
                    for (auto& list : search.listeners)
                        resubscribe(key, list.first, list.second);
                }
            } else {
                // Value notification: "key" is mandatory (at() throws otherwise).
                auto key = InfoHash(notification.at("key"));
                // Send time defaults to time_point::min() when "t" is missing or malformed.
                system_clock::time_point sendTime = system_clock::time_point::min();
                try {
                    sendTime = system_clock::time_point(std::chrono::milliseconds(std::stoull(notification.at("t"))));
                } catch (...) {}
                auto& search = searches_.at(key);
                for (auto& list : search.listeners) {
                    // Stopped listeners are skipped.
                    if (list.second.opstate->stop)
                        continue;
                    if (logger_)
                        logger_->d("[proxy:client] [push] [search %s] received", key.to_c_str());
                    auto expired = notification.find("exp");
                    auto token = list.first;
                    auto opstate = list.second.opstate;
                    if (expired == notification.end()) {
                        // No "exp" field: fetch the current values with get() and feed
                        // them to the listener callback.
                        auto cb = list.second.cb;
                        auto oldValues = list.second.cache.getValues();
                        get(key, [cb, sendTime](const std::vector<Sp<Value>>& vals) {
                            return cb(vals, false, sendTime);
                        }, [cb, oldValues, sendTime](bool /*ok*/) {
                            // Decrement old values refcount to expire values not
                            // present in the new list
                            cb(oldValues, true, sendTime);
                        });
                    } else {
                        // "exp" holds comma-separated value ids; stoull throws on a
                        // malformed or empty element (handled by the outer catch).
                        std::stringstream ss(expired->second);
                        std::vector<Value::Id> ids;
                        while(ss.good()) {
                            std::string substr;
                            getline(ss, substr, ',');
                            ids.emplace_back(std::stoull(substr));
                        }
                        {
                            // Queue the expiration; the queued task looks the listener
                            // up again since it may be gone by the time it runs.
                            std::lock_guard<std::mutex> lock(lockCallbacks_);
                            callbacks_.emplace_back([this, key, token, opstate, ids, sendTime]() {
                                if (opstate->stop)
                                    return;
                                std::lock_guard<std::mutex> lock(searchLock_);
                                auto s = searches_.find(key);
                                if (s == searches_.end())
                                    return;
                                auto l = s->second.listeners.find(token);
                                if (l == s->second.listeners.end())
                                    return;
                                if (not opstate->stop and not l->second.cache.onValuesExpired(ids, sendTime))
                                    opstate->stop = true;
                            });
                        }
                        launchLoop = true;
                    }
                }
            }
        } catch (const std::exception& e) {
            if (logger_)
                logger_->e("[proxy:client] [push] receive error: %s", e.what());
        }
        if (launchLoop)
            loopSignal_();
    #else
        (void) notification;
    #endif
    }
    
    /**
     * Re-send the push subscription of a listener.
     *
     * Drops the listener's reference to its current request, resets the
     * operation flags (stop = false, ok = true), re-arms the periodic
     * resubscribe timer and sends a SUBSCRIBE request flagged as a
     * resubscription. Does nothing when no device key is set, and is compiled
     * to a no-op without OPENDHT_PUSH_NOTIFICATIONS.
     *
     * @param key       hash of the search owning the listener.
     * @param token     listener token, forwarded to the resubscribe timer handler.
     * @param listener  listener to resubscribe.
     */
    void
    DhtProxyClient::resubscribe(const InfoHash& key, const size_t token, Listener& listener)
    {
    #ifdef OPENDHT_PUSH_NOTIFICATIONS
        if (deviceKey_.empty())
            return;
        if (logger_)
            logger_->d("[proxy:client] [resubscribe] [search %s]", key.to_c_str());

        auto opstate = listener.opstate;
        // Flag the operation as stopped while the previous request is released.
        opstate->stop = true;
        if (listener.request){
            listener.request.reset();
        }
        opstate->stop = false;
        opstate->ok = true;

        restinio::http_request_header_t header;
        header.method(restinio::http_method_subscribe());
        header.request_target("/" + key.toString());
        if (!listener.refreshSubscriberTimer){
            listener.refreshSubscriberTimer = std::make_unique<asio::steady_timer>(httpContext_);
        }
        listener.refreshSubscriberTimer->expires_at(std::chrono::steady_clock::now() +
                                                    proxy::OP_TIMEOUT - proxy::OP_MARGIN);
        listener.refreshSubscriberTimer->async_wait(std::bind(&DhtProxyClient::handleResubscribe, this,
                                                    std::placeholders::_1, key, token, opstate));
        auto vcb = listener.cb;
        sendListen(header, vcb, opstate, listener, ListenMethod::RESUBSCRIBE);
    #else
        // Silence unused-parameter warnings in builds without push support.
        (void) key;
        (void) token;
        (void) listener;
    #endif
    }
    
    #ifdef OPENDHT_PUSH_NOTIFICATIONS
    /**
     * Write the push identification fields into `body`: "key" (device key),
     * "client_id" and "session_id", plus "platform" when built for Android
     * or Apple targets.
     */
    void
    DhtProxyClient::getPushRequest(Json::Value& body) const
    {
        body["session_id"] = pushSessionId_;
        body["client_id"] = pushClientId_;
        body["key"] = deviceKey_;
    #if defined(__ANDROID__)
        body["platform"] = "android";
    #endif
    #if defined(__APPLE__)
        body["platform"] = "apple";
    #endif
    }
    
    std::string
    DhtProxyClient::fillBody(bool resubscribe)
    {
        // Fill body with
        // {
        //   "key":"device_key",
        // }
        Json::Value body;
        getPushRequest(body);
        if (resubscribe) {
            // This is the first listen, we want to retrieve previous values.
            body["refresh"] = true;
        }
        auto content = Json::writeString(jsonBuilder_, body) + "\n";
        std::replace(content.begin(), content.end(), '\n', ' ');
        return content;
    }
    #endif // OPENDHT_PUSH_NOTIFICATIONS
    
    } // namespace dht