Skip to content
Snippets Groups Projects
Select Git revision
  • 8243bc4c3fa7ee691643825162f79d288bc117a2
  • master default
  • windows_ci_static
  • c_link
  • cpack
  • windows_ci
  • cert_pk_id
  • proxy_push_result
  • cnode_put_id
  • update-windows-build
  • proxy
  • resubscribe_on_token_change
  • actions
  • client_mode
  • llhttp
  • search_node_add
  • crypto_aes_gcm_argon2
  • ios_notifications
  • log_fmt
  • v2asio
  • fix-msvc
  • v3.4.0
  • v3.3.1
  • v3.3.1rc1
  • v3.3.1rc2
  • v3.3.0
  • v3.2.0
  • v3.1.11
  • v3.1.10
  • v3.1.9
  • v3.1.8.2
  • v3.1.8.1
  • v3.1.8
  • v3.1.7
  • v3.1.6
  • v3.1.5
  • v3.1.4
  • v3.1.3
  • v3.1.2
  • v3.1
  • v3.0.1
41 results

dht_proxy_client.cpp

Blame
  • Adrien Béraud's avatar
    Adrien Béraud authored
    Use shared flags to avoid blocking for the callback
    while respecting the callback return value.
    8243bc4c
    History
    Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    dht_proxy_client.cpp 20.67 KiB
    /*
     *  Copyright (C) 2016 Savoir-faire Linux Inc.
     *  Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
     *
     *  This program is free software; you can redistribute it and/or modify
     *  it under the terms of the GNU General Public License as published by
     *  the Free Software Foundation; either version 3 of the License, or
     *  (at your option) any later version.
     *
     *  This program is distributed in the hope that it will be useful,
     *  but WITHOUT ANY WARRANTY; without even the implied warranty of
     *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     *  GNU General Public License for more details.
     *
     *  You should have received a copy of the GNU General Public License
     *  along with this program. If not, see <https://www.gnu.org/licenses/>.
     */
    
    #if OPENDHT_PROXY_CLIENT
    
    #include "dht_proxy_client.h"
    
    #include <chrono>
    #include <json/json.h>
    #include <restbed>
    #include <vector>
    
    #include "dhtrunner.h"
    
    #include <iostream>
    
    constexpr const char* const HTTP_PROTO {"http://"};
    
    namespace dht {
    
    DhtProxyClient::DhtProxyClient(const std::string& serverHost)
    : serverHost_(serverHost), lockCurrentProxyInfos_(new std::mutex()),
      scheduler(DHT_LOG), currentProxyInfos_(new Json::Value())
    {
        if (!serverHost_.empty())
            startProxy(serverHost_);
    }
    
    void
    DhtProxyClient::confirmProxy()
    {
        if (serverHost_.empty()) return;
        // Retrieve the connectivity each hours if connected, else every 5 seconds.
        auto disconnected_old_status =  statusIpv4_ == NodeStatus::Disconnected && statusIpv6_ == NodeStatus::Disconnected;
        getConnectivityStatus();
        auto disconnected_new_status = statusIpv4_ == NodeStatus::Disconnected && statusIpv6_ == NodeStatus::Disconnected;
        auto time = disconnected_new_status ? std::chrono::seconds(5) : std::chrono::hours(1);
        if (disconnected_old_status && !disconnected_new_status) {
            restartListeners();
        }
        auto confirm_proxy_time = scheduler.time() + time;
        scheduler.edit(nextProxyConfirmation, confirm_proxy_time);
    }
    
    void
    DhtProxyClient::startProxy(const std::string& serverHost)
    {
        serverHost_ = serverHost;
        if (serverHost_.empty()) return;
        auto confirm_proxy_time = scheduler.time() + std::chrono::seconds(5);
        nextProxyConfirmation = scheduler.add(confirm_proxy_time, std::bind(&DhtProxyClient::confirmProxy, this));
        auto confirm_connectivity = scheduler.time() + std::chrono::seconds(5);
        nextConnectivityConfirmation = scheduler.add(confirm_connectivity, std::bind(&DhtProxyClient::confirmConnectivity, this));
    
        getConnectivityStatus();
    }
    
    void
    DhtProxyClient::confirmConnectivity()
    {
        // The scheduler must get if the proxy is disconnected
        auto confirm_connectivity = scheduler.time() + std::chrono::seconds(3);
        scheduler.edit(nextConnectivityConfirmation, confirm_connectivity);
    }
    
    DhtProxyClient::~DhtProxyClient()
    {
        cancelAllOperations();
        cancelAllListeners();
    }
    
    void
    DhtProxyClient::cancelAllOperations()
    {
        std::lock_guard<std::mutex> lock(lockOperations_);
        auto operation = operations_.begin();
        while (operation != operations_.end()) {
            if (operation->thread.joinable()) {
                // Close connection to stop operation?
                restbed::Http::close(operation->req);
                operation->thread.join();
                operation = operations_.erase(operation);
            } else {
                ++operation;
            }
        }
    }
    
    void
    DhtProxyClient::cancelAllListeners()
    {
        std::lock_guard<std::mutex> lock(lockListener_);
        for (auto& listener: listeners_) {
            if (listener.thread.joinable()) {
                // Close connection to stop listener?
                if (listener.req)
                    restbed::Http::close(listener.req);
                listener.thread.join();
            }
        }
    }
    
    void
    DhtProxyClient::shutdown(ShutdownCallback cb)
    {
        cancelAllOperations();
        cancelAllListeners();
        if (cb)
            cb();
    }
    
    NodeStatus
    DhtProxyClient::getStatus(sa_family_t af) const
    {
        switch (af)
        {
        case AF_INET:
            return statusIpv4_;
        case AF_INET6:
            return statusIpv6_;
        default:
            return NodeStatus::Disconnected;
        }
    }
    
    bool
    DhtProxyClient::isRunning(sa_family_t af) const
    {
        switch (af)
        {
        case AF_INET:
            return statusIpv4_ == NodeStatus::Connected;
        case AF_INET6:
            return statusIpv6_ == NodeStatus::Connected;
        default:
            return false;
        }
    }
    
    time_point
    DhtProxyClient::periodic(const uint8_t*, size_t, const SockAddr&)
    {
        // Exec all currently stored callbacks
        scheduler.syncTime();
        if (!callbacks_.empty()) {
            std::lock_guard<std::mutex> lock(lockCallbacks);
            for (auto& callback : callbacks_)
                callback();
            callbacks_.clear();
        }
        // Remove finished operations
        {
            std::lock_guard<std::mutex> lock(lockOperations_);
            auto operation = operations_.begin();
            while (operation != operations_.end()) {
                if (*(operation->finished)) {
                    if (operation->thread.joinable()) {
                        // Close connection to stop operation?
                        restbed::Http::close(operation->req);
                        operation->thread.join();
                    }
                    operation = operations_.erase(operation);
                } else {
                    ++operation;
                }
            }
        }
        return scheduler.run();
    }
    
    void
    DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb,
                        Value::Filter&& filter, Where&& where)
    {
        restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + key.toString());
        auto req = std::make_shared<restbed::Request>(uri);
        Query query {{}, where};
        auto filterChain = filter.chain(query.where.getFilter());
    
        auto finished = std::make_shared<bool>(false);
        Operation o;
        o.req = req;
        o.finished = finished;
        o.thread = std::thread([=](){
            // Try to contact the proxy and set the status to connected when done.
            // will change the connectivity status
            auto ok = std::make_shared<bool>(true);
            restbed::Http::async(req,
                [=](const std::shared_ptr<restbed::Request>& req,
                    const std::shared_ptr<restbed::Response>& reply) {
                auto code = reply->get_status_code();
    
                if (code == 200) {
                    try {
                        while (restbed::Http::is_open(req) and not *finished) {
                            restbed::Http::fetch("\n", reply);
                            if (*finished)
                                break;
                            std::string body;
                            reply->get_body(body);
                            reply->set_body(""); // Reset the body for the next fetch
    
                            Json::Value json;
                            Json::Reader reader;
                            if (reader.parse(body, json)) {
                                auto value = std::make_shared<Value>(json);
                                if ((not filterChain or filterChain(*value)) && cb) {
                                    std::lock_guard<std::mutex> lock(lockCallbacks);
                                    callbacks_.emplace_back([cb, value, finished]() {
                                        if (not *finished and not cb({value}))
                                            *finished = true;
                                    });
                                }
                            } else {
                                *ok = false;
                            }
                        }
                    } catch (std::runtime_error& e) { }
                } else {
                    *ok = false;
                }
            }).wait();
            if (donecb) {
                std::lock_guard<std::mutex> lock(lockCallbacks);
                callbacks_.emplace_back([=](){
                    donecb(*ok, {});
                });
            }
            if (!ok) {
                // Connection failed, update connectivity
                getConnectivityStatus();
            }
            *finished = true;
        });
        {
            std::lock_guard<std::mutex> lock(lockOperations_);
            operations_.emplace_back(std::move(o));
        }
    }
    
    void
    DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_point, bool permanent)
    {
        restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + key.toString());
        auto req = std::make_shared<restbed::Request>(uri);
        req->set_method("POST");
        Json::FastWriter writer;
        auto json = val->toJson();
        if (permanent)
            json["permanent"] = true;
        auto body = writer.write(json);
        req->set_body(body);
        req->set_header("Content-Length", std::to_string(body.size()));
    
        auto finished = std::make_shared<bool>(false);
        Operation o;
        o.req = req;
        o.finished = finished;
        o.thread = std::thread([=](){
            auto ok = std::make_shared<bool>(true);
            restbed::Http::async(req,
                [this, ok](const std::shared_ptr<restbed::Request>& /*req*/,
                            const std::shared_ptr<restbed::Response>& reply) {
                auto code = reply->get_status_code();
    
                if (code == 200) {
                    restbed::Http::fetch("\n", reply);
                    std::string body;
                    reply->get_body(body);
                    reply->set_body(""); // Reset the body for the next fetch
    
                    Json::Value json;
                    Json::Reader reader;
                    try {
                        if (!reader.parse(body, json))
                            *ok = false;
                    } catch (...) {
                        *ok = false;
                    }
                } else {
                    *ok = false;
                }
            }).wait();
            if (cb) {
                std::lock_guard<std::mutex> lock(lockCallbacks);
                callbacks_.emplace_back([=](){
                    cb(*ok, {});
                });
            }
            if (!ok) {
                // Connection failed, update connectivity
                getConnectivityStatus();
            }
            *finished = true;
        });
        {
            std::lock_guard<std::mutex> lock(lockOperations_);
            operations_.emplace_back(std::move(o));
        }
    }
    
    NodeStats
    DhtProxyClient::getNodesStats(sa_family_t af) const
    {
        lockCurrentProxyInfos_->lock();
        auto proxyInfos = *currentProxyInfos_;
        lockCurrentProxyInfos_->unlock();
        NodeStats stats {};
        auto identifier = af == AF_INET6 ? "ipv6" : "ipv4";
        try {
            stats = NodeStats(proxyInfos[identifier]);
        } catch (...) { }
        return stats;
    }
    
    Json::Value
    DhtProxyClient::getProxyInfos() const
    {
        restbed::Uri uri(HTTP_PROTO + serverHost_ + "/");
        auto req = std::make_shared<restbed::Request>(uri);
    
        // Try to contact the proxy and set the status to connected when done.
        // will change the connectivity status
        restbed::Http::async(req,
            [this](const std::shared_ptr<restbed::Request>&,
                           const std::shared_ptr<restbed::Response>& reply) {
            auto code = reply->get_status_code();
    
            if (code == 200) {
                restbed::Http::fetch("\n", reply);
                std::string body;
                reply->get_body(body);
    
                Json::Reader reader;
                lockCurrentProxyInfos_->lock();
                try {
                    reader.parse(body, *currentProxyInfos_);
                } catch (...) {
                    *currentProxyInfos_ = Json::Value();
                }
                lockCurrentProxyInfos_->unlock();
            } else {
                lockCurrentProxyInfos_->lock();
                *currentProxyInfos_ = Json::Value();
                lockCurrentProxyInfos_->unlock();
            }
        }).wait();
        lockCurrentProxyInfos_->lock();
        auto result = *currentProxyInfos_;
        lockCurrentProxyInfos_->unlock();
        return result;
    }
    
    std::vector<SockAddr>
    DhtProxyClient::getPublicAddress(sa_family_t family)
    {
        lockCurrentProxyInfos_->lock();
        auto proxyInfos = *currentProxyInfos_;
        lockCurrentProxyInfos_->unlock();
        // json["public_ip"] contains [ipv6:ipv4]:port or ipv4:port
        auto public_ip = proxyInfos["public_ip"].asString();
        if (!proxyInfos.isMember("public_ip") || (public_ip.length() < 2)) return {};
        std::string ipv4Address = "", ipv6Address = "", port = "";
        if (public_ip[0] == '[') {
            // ipv6 complient
            auto endIp = public_ip.find(']');
            if (public_ip.length() > endIp + 2) {
                port = public_ip.substr(endIp + 2);
                auto ips = public_ip.substr(1, endIp - 1);
                auto ipv4And6Separator = ips.find_last_of(':');
                ipv4Address = ips.substr(ipv4And6Separator + 1);
                ipv6Address = ips.substr(0, ipv4And6Separator - 1);
            }
        } else {
            auto endIp = public_ip.find_last_of(':');
            port = public_ip.substr(endIp + 1);
            ipv4Address = public_ip.substr(0, endIp - 1);
        }
        switch (family)
        {
        case AF_INET:
            return SockAddr::resolve(ipv4Address, port);
        case AF_INET6:
            return SockAddr::resolve(ipv6Address, port);
        default:
            return {};
        }
    }
    
    size_t
    DhtProxyClient::listen(const InfoHash& key, GetCallback cb, Value::Filter&& filter, Where&& where)
    {
        restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + key.toString());
        auto req = std::make_shared<restbed::Request>(uri);
        req->set_method("LISTEN");
    
        Query query {{}, where};
        auto filterChain = filter.chain(query.where.getFilter());
    
        Listener l;
        ++listener_token_;
        l.key = key.toString();
        l.token = listener_token_;
        l.req = req;
        l.cb = cb;
        l.filterChain = std::move(filterChain);
        l.thread = std::thread([=]()
            {
                auto settings = std::make_shared<restbed::Settings>();
                std::chrono::milliseconds timeout(std::numeric_limits<int>::max());
                settings->set_connection_timeout(timeout); // Avoid the client to close the socket after 5 seconds.
    
                struct State {
                    bool ok {true};
                    bool cancel {false};
                };
                auto state = std::make_shared<State>();
                restbed::Http::async(req,
                    [this, filterChain, cb, state](const std::shared_ptr<restbed::Request>& req,
                                                const std::shared_ptr<restbed::Response>& reply) {
                    auto code = reply->get_status_code();
    
                    if (code == 200) {
                        try {
                            while (restbed::Http::is_open(req) and not state->cancel) {
                                restbed::Http::fetch("\n", reply);
                                if (state->cancel)
                                    break;
                                std::string body;
                                reply->get_body(body);
                                reply->set_body(""); // Reset the body for the next fetch
    
                                Json::Value json;
                                Json::Reader reader;
                                if (reader.parse(body, json)) {
                                    auto value = std::make_shared<Value>(json);
                                    if ((not filterChain or filterChain(*value)) && cb)  {
                                        std::lock_guard<std::mutex> lock(lockCallbacks);
                                        callbacks_.emplace_back([cb, value, state]() {
                                            if (not state->cancel and not cb({value})) {
                                                state->cancel = true;
                                            }
                                        });
                                    }
                                } else {
                                    state->ok = false;
                                }
                            }
                        } catch (std::runtime_error&) {
                            state->ok = false;
                        }
                    } else {
                        state->ok = false;
                    }
                }, settings).get();
                if (not state->ok) {
                    getConnectivityStatus();
                }
            }
        );
        {
            std::lock_guard<std::mutex> lock(lockListener_);
            listeners_.emplace_back(std::move(l));
        }
        return listener_token_;
    }
    
    bool
    DhtProxyClient::cancelListen(const InfoHash&, size_t token)
    {
        std::lock_guard<std::mutex> lock(lockListener_);
        for (auto it = listeners_.begin(); it != listeners_.end(); ++it) {
            auto& listener = *it;
            if (listener.token == token) {
                if (listener.thread.joinable()) {
                    // Close connection to stop listener?
                    if (listener.req)
                        restbed::Http::close(listener.req);
                    listener.thread.join();
                    listeners_.erase(it);
                    return true;
                }
            }
        }
        return false;
    }
    
    void
    DhtProxyClient::getConnectivityStatus()
    {
        auto proxyInfos = getProxyInfos();
        // NOTE: json["ipvX"] contains NodeStats::toJson()
        try {
            auto goodIpv4 = static_cast<long>(proxyInfos["ipv4"]["good"].asLargestUInt());
            auto dubiousIpv4 = static_cast<long>(proxyInfos["ipv4"]["dubious"].asLargestUInt());
            statusIpv4_ = (goodIpv4 + dubiousIpv4 > 0) ?  NodeStatus::Connected : NodeStatus::Disconnected;
    
            auto goodIpv6 = static_cast<long>(proxyInfos["ipv6"]["good"].asLargestUInt());
            auto dubiousIpv6 = static_cast<long>(proxyInfos["ipv6"]["dubious"].asLargestUInt());
            statusIpv6_ = (goodIpv6 + dubiousIpv6 > 0) ?  NodeStatus::Connected : NodeStatus::Disconnected;
    
            myid = InfoHash(proxyInfos["node_id"].asString());
            if (statusIpv4_ == NodeStatus::Disconnected && statusIpv6_ == NodeStatus::Disconnected) {
                const auto& now = scheduler.time();
                scheduler.edit(nextProxyConfirmation, now);
            }
        } catch (...) {
            statusIpv4_ = NodeStatus::Disconnected;
            statusIpv6_ = NodeStatus::Disconnected;
            const auto& now = scheduler.time();
            scheduler.edit(nextProxyConfirmation, now);
        }
    }
    
    void
    DhtProxyClient::restartListeners()
    {
        std::lock_guard<std::mutex> lock(lockListener_);
        for (auto& listener: listeners_) {
            if (listener.thread.joinable())
                listener.thread.join();
            // Redo listen
            auto filterChain = listener.filterChain;
            auto cb = listener.cb;
            restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + listener.key);
            auto req = std::make_shared<restbed::Request>(uri);
            req->set_method("LISTEN");
            listener.req = req;
            listener.thread = std::thread([this, filterChain, cb, req]()
                {
                    auto settings = std::make_shared<restbed::Settings>();
                    std::chrono::milliseconds timeout(std::numeric_limits<int>::max());
                    settings->set_connection_timeout(timeout); // Avoid the client to close the socket after 5 seconds.
    
                    restbed::Http::async(req,
                        [this, filterChain, cb](const std::shared_ptr<restbed::Request>& req,
                                         const std::shared_ptr<restbed::Response>& reply) {
                        auto code = reply->get_status_code();
    
                        if (code == 200) {
                            try {
                                while (restbed::Http::is_open(req)) {
                                    restbed::Http::fetch("\n", reply);
                                    std::string body;
                                    reply->get_body(body);
                                    reply->set_body(""); // Reset the body for the next fetch
    
                                    Json::Value json;
                                    Json::Reader reader;
                                    if (reader.parse(body, json)) {
                                        auto value = std::make_shared<Value>(json);
                                        if ((not filterChain or filterChain(*value)) && cb) {
                                            auto okCb = std::make_shared<std::promise<bool>>();
                                            auto futureCb = okCb->get_future();
                                            {
                                                std::lock_guard<std::mutex> lock(lockCallbacks);
                                                callbacks_.emplace_back([cb, value, okCb](){
                                                    okCb->set_value(cb({value}));
                                                });
                                            }
                                            futureCb.wait();
                                            if (!futureCb.get()) {
                                                return;
                                            }
                                        }
                                    }
                                }
                            } catch (std::runtime_error&) {
                                // NOTE: Http::close() can occurs here. Ignore this.
                            }
    
                        } else {
                            this->statusIpv4_ = NodeStatus::Disconnected;
                            this->statusIpv6_ = NodeStatus::Disconnected;
                        }
                    }, settings).get();
                    getConnectivityStatus();
                }
            );
        }
    }
    
    
    } // namespace dht
    
    #endif // OPENDHT_PROXY_CLIENT