Skip to content
Snippets Groups Projects
Select Git revision
  • 517bd5f22beb3894e8146a348bae598c64334df1
  • master default
  • cmake_fixes
  • pulls/1772757862/750
  • copilot/fix-770
  • windows_ci_static
  • c_link
  • cpack
  • windows_ci
  • cert_pk_id
  • proxy_push_result
  • cnode_put_id
  • update-windows-build
  • proxy
  • resubscribe_on_token_change
  • actions
  • client_mode
  • llhttp
  • search_node_add
  • crypto_aes_gcm_argon2
  • ios_notifications
  • v3.4.0
  • v3.3.1
  • v3.3.1rc1
  • v3.3.1rc2
  • v3.3.0
  • v3.2.0
  • v3.1.11
  • v3.1.10
  • v3.1.9
  • v3.1.8.2
  • v3.1.8.1
  • v3.1.8
  • v3.1.7
  • v3.1.6
  • v3.1.5
  • v3.1.4
  • v3.1.3
  • v3.1.2
  • v3.1
  • v3.0.1
41 results

dhtrunner.cpp

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    dhtrunner.cpp 17.74 KiB
    /*
     *  Copyright (C) 2014-2016 Savoir-faire Linux Inc.
     *  Author(s) : Adrien Béraud <adrien.beraud@savoirfairelinux.com>
     *              Simon Désaulniers <sim.desaulniers@gmail.com>
     *
     *  This program is free software; you can redistribute it and/or modify
     *  it under the terms of the GNU General Public License as published by
     *  the Free Software Foundation; either version 3 of the License, or
     *  (at your option) any later version.
     *
     *  This program is distributed in the hope that it will be useful,
     *  but WITHOUT ANY WARRANTY; without even the implied warranty of
     *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     *  GNU General Public License for more details.
     *
     *  You should have received a copy of the GNU General Public License
     *  along with this program; if not, write to the Free Software
     *  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA.
     */
    
    #include "dhtrunner.h"
    #include "securedht.h"
    
    #ifndef _WIN32
    #include <unistd.h>
    #else
    #include <io.h>
    #endif
    
    #ifndef _WIN32
    #include <sys/socket.h>
    #else
    #include <winsock2.h>
    #include <ws2tcpip.h>
    #define close(x) closesocket(x)
    #endif
    
    namespace dht {
    
    /**
     * Builds an idle runner; no DHT instance exists until run() is called.
     * On Windows, Winsock 2.2 is initialised first.
     *
     * @throws DhtException if WSAStartup() fails (Windows only).
     */
    DhtRunner::DhtRunner() : dht_()
    {
    #ifdef _WIN32
        WSADATA winsock_data;
        const int startup_rc = WSAStartup(MAKEWORD(2,2), &winsock_data);
        if (startup_rc != 0)
            throw DhtException("Can't initialize Winsock2");
    #endif
    }
    
    /**
     * Stops the runner through join() before the object goes away. On Windows
     * the Winsock initialisation done in the constructor is released afterwards.
     */
    DhtRunner::~DhtRunner()
    {
        join();
    #ifdef _WIN32
        WSACleanup();
    #endif
    }
    
    /**
     * Start the runner on the wildcard IPv4 and IPv6 addresses.
     *
     * @param port   UDP port (host byte order) used for both families.
     * @param config runner configuration, forwarded unchanged.
     */
    void
    DhtRunner::run(in_port_t port, DhtRunner::Config config)
    {
        // All-zero address structures: only family and port are filled in.
        sockaddr_in any4;
        memset(&any4, 0, sizeof(any4));
        any4.sin_family = AF_INET;
        any4.sin_port = htons(port);

        sockaddr_in6 any6;
        memset(&any6, 0, sizeof(any6));
        any6.sin6_family = AF_INET6;
        any6.sin6_port = htons(port);

        run(&any4, &any6, config);
    }
    
    void
    DhtRunner::run(const char* ip4, const char* ip6, const char* service, DhtRunner::Config config)
    {
        auto res4 = getAddrInfo(ip4, service);
        auto res6 = getAddrInfo(ip6, service);
        run(res4.empty() ? nullptr : (sockaddr_in*) &res4.front().first,
            res6.empty() ? nullptr : (sockaddr_in6*)&res6.front().first, config);
    }
    
    /**
     * Start the runner on the given local addresses.
     *
     * Does nothing if the runner is already running. Otherwise the sockets and
     * the DHT instance are set up through doRun(); when config.threaded is set,
     * a worker thread then calls loop_() repeatedly until `running` is cleared.
     *
     * @param local4 IPv4 address to bind, or nullptr to skip IPv4.
     * @param local6 IPv6 address to bind, or nullptr to skip IPv6.
     * @param config runner configuration (dht_config, threaded).
     * @throws anything doRun() throws (e.g. DhtException when binding fails);
     *         `running` is reset in that case so run() can be retried.
     */
    void
    DhtRunner::run(const sockaddr_in* local4, const sockaddr_in6* local6, DhtRunner::Config config)
    {
        if (running)
            return;
        // A receive thread left over from a previous run must finish first.
        if (rcv_thread.joinable())
            rcv_thread.join();
        running = true;
        try {
            doRun(local4, local6, config.dht_config);
        } catch (...) {
            // Without this reset a failed start would leave `running` set and
            // every later run() call would return early without doing anything.
            running = false;
            throw;
        }
        if (not config.threaded)
            return;
        dht_thread = std::thread([this]() {
            while (running) {
                // dht_mtx is held for the whole iteration, including loop_().
                std::unique_lock<std::mutex> lk(dht_mtx);
                auto wakeup = loop_();
                // Sleep until the time returned by loop_(), or earlier if there is
                // something to do: stop request, received packets, queued operations.
                cv.wait_until(lk, wakeup, [this]() {
                    if (not running)
                        return true;
                    {
                        std::lock_guard<std::mutex> lck(sock_mtx);
                        if (not rcv.empty())
                            return true;
                    }
                    {
                        std::lock_guard<std::mutex> lck(storage_mtx);
                        if (not pending_ops_prio.empty())
                            return true;
                        // Normal operations are only run once no longer connecting.
                        if (not pending_ops.empty() and getStatus() != NodeStatus::Connecting)
                            return true;
                    }
                    return false;
                });
            }
        });
    }
    
    /**
     * Queue a shutdown of the DHT instance on the priority queue.
     *
     * @param cb callback handed to SecureDht::shutdown() when the operation runs.
     */
    void
    DhtRunner::shutdown(ShutdownCallback cb) {
        auto op = [cb](SecureDht& dht) mutable {
            dht.shutdown(cb);
        };
        std::lock_guard<std::mutex> guard(storage_mtx);
        pending_ops_prio.emplace(std::move(op));
        cv.notify_all();
    }
    
    /**
     * Stop the runner and wait for its threads.
     *
     * Clears `running`, wakes the worker, joins the worker and receive threads,
     * drops all queued operations, then destroys the DHT instance and resets
     * the status and bound-address members.
     */
    void
    DhtRunner::join()
    {
        {
            // Clear the flag while holding the mutex the worker's condition
            // variable waits on. Otherwise the worker could evaluate its wait
            // predicate, then miss this notification just before going to
            // sleep, and only notice the stop request at its next timeout.
            std::lock_guard<std::mutex> lck(dht_mtx);
            running = false;
        }
        cv.notify_all();
        if (dht_thread.joinable())
            dht_thread.join();
        if (rcv_thread.joinable())
            rcv_thread.join();
        {
            // Discard operations that were queued but never executed.
            std::lock_guard<std::mutex> lck(storage_mtx);
            pending_ops = decltype(pending_ops)();
            pending_ops_prio = decltype(pending_ops_prio)();
        }
        {
            std::lock_guard<std::mutex> lck(dht_mtx);
            dht_.reset();
            status4 = NodeStatus::Disconnected;
            status6 = NodeStatus::Disconnected;
            bound4 = {};
            bound6 = {};
        }
    }
    
    /**
     * Ask the DHT instance to dump its tables, under dht_mtx.
     * Does nothing when no DHT instance exists (before run() or after join()),
     * instead of dereferencing a null pointer.
     */
    void
    DhtRunner::dumpTables() const
    {
        std::lock_guard<std::mutex> lck(dht_mtx);
        if (!dht_)
            return;
        dht_->dumpTables();
    }
    
    /**
     * @return the id reported by the DHT instance, or a default-constructed
     *         InfoHash when no instance exists. dht_mtx is not taken here.
     */
    InfoHash
    DhtRunner::getId() const
    {
        if (dht_)
            return dht_->getId();
        return {};
    }
    
    /**
     * @return the node id reported by the DHT instance, or a default-constructed
     *         InfoHash when no instance exists. dht_mtx is not taken here.
     */
    InfoHash
    DhtRunner::getNodeId() const
    {
        if (dht_)
            return dht_->getNodeId();
        return {};
    }
    
    
    /**
     * @return the pair reported by SecureDht::getStoreSize(), or a
     *         value-initialised pair when no DHT instance exists.
     */
    std::pair<size_t, size_t>
    DhtRunner::getStoreSize() const {
        std::lock_guard<std::mutex> guard(dht_mtx);
        if (dht_)
            return dht_->getStoreSize();
        return {};
    }
    
    /**
     * Forward a storage limit to the DHT instance.
     *
     * @param limit value passed to SecureDht::setStorageLimit().
     * @throws std::runtime_error if no DHT instance exists.
     */
    void
    DhtRunner::setStorageLimit(size_t limit) {
        std::lock_guard<std::mutex> guard(dht_mtx);
        if (dht_) {
            dht_->setStorageLimit(limit);
            return;
        }
        throw std::runtime_error("dht is not running");
    }
    
    /**
     * @return the nodes exported by the DHT instance, or an empty vector when
     *         no instance exists.
     */
    std::vector<NodeExport>
    DhtRunner::exportNodes() const {
        std::lock_guard<std::mutex> guard(dht_mtx);
        if (dht_)
            return dht_->exportNodes();
        return {};
    }
    
    /**
     * @return the values exported by the DHT instance, or an empty vector when
     *         no instance exists.
     */
    std::vector<ValuesExport>
    DhtRunner::exportValues() const {
        std::lock_guard<std::mutex> guard(dht_mtx);
        if (dht_)
            return dht_->exportValues();
        return {};
    }
    
    /**
     * Install the three log sinks on the DHT instance.
     *
     * @param error sink for error messages.
     * @param warn  sink for warnings.
     * @param debug sink for debug messages.
     * @throws std::runtime_error if no DHT instance exists (same behaviour as
     *         setStorageLimit()), instead of dereferencing a null pointer.
     */
    void
    DhtRunner::setLoggers(LogMethod error, LogMethod warn, LogMethod debug) {
        std::lock_guard<std::mutex> lck(dht_mtx);
        if (!dht_)
            throw std::runtime_error("dht is not running");
        // The parameters are by-value copies, so they can simply be moved on.
        dht_->setLoggers(std::move(error), std::move(warn), std::move(debug));
    }
    
    /**
     * Register a value type with the DHT instance.
     *
     * @param type value type forwarded to SecureDht::registerType().
     * @throws std::runtime_error if no DHT instance exists (same behaviour as
     *         setStorageLimit()), instead of dereferencing a null pointer.
     */
    void
    DhtRunner::registerType(const ValueType& type) {
        std::lock_guard<std::mutex> lck(dht_mtx);
        if (!dht_)
            throw std::runtime_error("dht is not running");
        dht_->registerType(type);
    }
    
    /**
     * Hand previously exported values back to the DHT instance.
     *
     * @param values data forwarded to SecureDht::importValues().
     * @throws std::runtime_error if no DHT instance exists (same behaviour as
     *         setStorageLimit()), instead of dereferencing a null pointer.
     */
    void
    DhtRunner::importValues(const std::vector<ValuesExport>& values) {
        std::lock_guard<std::mutex> lck(dht_mtx);
        if (!dht_)
            throw std::runtime_error("dht is not running");
        dht_->importValues(values);
    }
    
    /**
     * Query node statistics for one address family from the DHT instance.
     *
     * All arguments are forwarded to SecureDht::getNodesStats() and its result
     * is returned. When no DHT instance exists the null pointer is no longer
     * dereferenced: every non-null output counter is set to 0 and 0 is returned.
     * NOTE(review): 0 is assumed to be the natural "no nodes" result - confirm
     * against SecureDht::getNodesStats().
     */
    int
    DhtRunner::getNodesStats(sa_family_t af, unsigned *good_return, unsigned *dubious_return, unsigned *cached_return, unsigned *incoming_return) const
    {
        std::lock_guard<std::mutex> lck(dht_mtx);
        if (!dht_) {
            if (good_return)
                *good_return = 0;
            if (dubious_return)
                *dubious_return = 0;
            if (cached_return)
                *cached_return = 0;
            if (incoming_return)
                *incoming_return = 0;
            return 0;
        }
        return dht_->getNodesStats(af, good_return, dubious_return, cached_return, incoming_return);
    }
    
    /**
     * @param in forwarded to SecureDht::getNodeMessageStats().
     * @return the statistics reported by the DHT instance, or an empty vector
     *         when no instance exists (previously a null dereference).
     */
    std::vector<unsigned>
    DhtRunner::getNodeMessageStats(bool in) const
    {
        std::lock_guard<std::mutex> lck(dht_mtx);
        if (!dht_)
            return {};
        return dht_->getNodeMessageStats(in);
    }
    
    /**
     * @return the storage log produced by the DHT instance, or an empty string
     *         when no instance exists (previously a null dereference).
     */
    std::string
    DhtRunner::getStorageLog() const
    {
        std::lock_guard<std::mutex> lck(dht_mtx);
        if (!dht_)
            return {};
        return dht_->getStorageLog();
    }
    /**
     * @param af address family forwarded to SecureDht::getRoutingTablesLog().
     * @return the routing table log, or an empty string when no DHT instance
     *         exists (previously a null dereference).
     */
    std::string
    DhtRunner::getRoutingTablesLog(sa_family_t af) const
    {
        std::lock_guard<std::mutex> lck(dht_mtx);
        if (!dht_)
            return {};
        return dht_->getRoutingTablesLog(af);
    }
    /**
     * @param af address family forwarded to SecureDht::getSearchesLog().
     * @return the searches log, or an empty string when no DHT instance exists
     *         (previously a null dereference).
     */
    std::string
    DhtRunner::getSearchesLog(sa_family_t af) const
    {
        std::lock_guard<std::mutex> lck(dht_mtx);
        if (!dht_)
            return {};
        return dht_->getSearchesLog(af);
    }
    /**
     * @param af address family forwarded to SecureDht::getPublicAddress().
     * @return the addresses reported by the DHT instance, or an empty vector
     *         when no instance exists (previously a null dereference).
     */
    std::vector<Address>
    DhtRunner::getPublicAddress(sa_family_t af)
    {
        std::lock_guard<std::mutex> lck(dht_mtx);
        if (!dht_)
            return {};
        return dht_->getPublicAddress(af);
    }
    /**
     * Textual form of getPublicAddress(): one dht::printAddr() result per
     * address, in the same order.
     *
     * @param af address family forwarded to getPublicAddress().
     */
    std::vector<std::string>
    DhtRunner::getPublicAddressStr(sa_family_t af)
    {
        auto addresses = getPublicAddress(af);
        std::vector<std::string> printed;
        printed.reserve(addresses.size());
        for (auto& address : addresses)
            printed.push_back(dht::printAddr(address));
        return printed;
    }
    
    /**
     * Register a certificate with the DHT instance.
     *
     * @param cert certificate forwarded to SecureDht::registerCertificate().
     * @throws std::runtime_error if no DHT instance exists (same behaviour as
     *         setStorageLimit()), instead of dereferencing a null pointer.
     */
    void
    DhtRunner::registerCertificate(std::shared_ptr<crypto::Certificate> cert) {
        std::lock_guard<std::mutex> lck(dht_mtx);
        if (!dht_)
            throw std::runtime_error("dht is not running");
        dht_->registerCertificate(cert);
    }
    /**
     * Install the local certificate lookup callback on the DHT instance.
     *
     * @param query_method callback, moved into SecureDht::setLocalCertificateStore().
     * @throws std::runtime_error if no DHT instance exists (same behaviour as
     *         setStorageLimit()), instead of dereferencing a null pointer.
     */
    void
    DhtRunner::setLocalCertificateStore(CertificateStoreQuery&& query_method) {
        std::lock_guard<std::mutex> lck(dht_mtx);
        if (!dht_)
            throw std::runtime_error("dht is not running");
        // query_method is a plain rvalue reference, not a forwarding reference.
        dht_->setLocalCertificateStore(std::move(query_method));
    }
    
    /**
     * One iteration of the DHT main loop.
     *
     * Runs the queued priority operations, then (unless the node status is
     * Connecting) the normal operations, feeds every received packet to
     * SecureDht::periodic(), and invokes statusCb when the per-family status
     * changed.
     *
     * @return the time point returned by the last periodic() call, or a
     *         default-constructed time_point when no DHT instance exists.
     */
    time_point
    DhtRunner::loop_()
    {
        if (!dht_)
            return {};

        // Detach a whole queue while holding storage_mtx, then execute its
        // operations with the lock released. Swapping with an empty queue
        // guarantees the shared queue is left empty, which a moved-from
        // container does not formally promise.
        const auto drain = [this](decltype(pending_ops)& queue) {
            decltype(pending_ops) ops {};
            {
                std::lock_guard<std::mutex> lck(storage_mtx);
                std::swap(ops, queue);
            }
            while (not ops.empty()) {
                ops.front()(*dht_);
                ops.pop();
            }
        };

        drain(pending_ops_prio);
        // Normal operations stay queued for as long as the node is connecting.
        if (getStatus() != NodeStatus::Connecting)
            drain(pending_ops);

        time_point wakeup {};
        decltype(rcv) received {};
        {
            std::lock_guard<std::mutex> lck(sock_mtx);
            // take the packets over to the stack, leaving the shared list empty
            std::swap(received, rcv);
        }
        if (not received.empty()) {
            for (const auto& pck : received) {
                auto& buf = pck.first;
                auto& from = pck.second;
                // The receive thread stores one extra trailing byte after the
                // payload, hence size()-1 as the message length.
                wakeup = dht_->periodic(buf.data(), buf.size()-1, (sockaddr*)&from.first, from.second);
            }
            received.clear();
        } else {
            // Nothing received: still let the DHT do its periodic work.
            wakeup = dht_->periodic(nullptr, 0, nullptr, 0);
        }

        NodeStatus nstatus4 = dht_->getStatus(AF_INET);
        NodeStatus nstatus6 = dht_->getStatus(AF_INET6);
        if (nstatus4 != status4 || nstatus6 != status6) {
            status4 = nstatus4;
            status6 = nstatus6;
            if (statusCb)
                statusCb(status4, status6);
        }

        return wakeup;
    }
    
    void
    DhtRunner::doRun(const sockaddr_in* sin4, const sockaddr_in6* sin6, SecureDht::Config config)
    {
        dht_.reset();
    
        int s4 = -1,
            s6 = -1;
    
        bound4 = {};
        if (sin4) {
            s4 = socket(PF_INET, SOCK_DGRAM, 0);
            if(s4 >= 0) {
                int rc = bind(s4, (sockaddr*)sin4, sizeof(sockaddr_in));
                if(rc < 0)
                    throw DhtException("Can't bind IPv4 socket on " + dht::print_addr((sockaddr*)sin4, sizeof(sockaddr_in)));
                bound4.second = sizeof(bound4.first);
                getsockname(s4, (sockaddr*)&bound4.first, &bound4.second);
            }
        }
    
    #if 1
        bound6 = {};
        if (sin6) {
            s6 = socket(PF_INET6, SOCK_DGRAM, 0);
            if(s6 >= 0) {
                int val = 1;
                int rc = setsockopt(s6, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&val, sizeof(val));
                if(rc < 0)
                    throw DhtException("Can't set IPV6_V6ONLY");
    
                rc = bind(s6, (sockaddr*)sin6, sizeof(sockaddr_in6));
                if(rc < 0)
                    throw DhtException("Can't bind IPv6 socket on " + dht::print_addr((sockaddr*)sin6, sizeof(sockaddr_in6)));
                bound6.second = sizeof(bound6.first);
                getsockname(s6, (sockaddr*)&bound6.first, &bound6.second);
            }
        }
    #endif
    
        dht_ = std::unique_ptr<SecureDht>(new SecureDht {s4, s6, config});
    
        rcv_thread = std::thread([this,s4,s6]() {
            try {
                while (true) {
                    struct timeval tv {/*.tv_sec = */0, /*.tv_usec = */250000};
                    fd_set readfds;
    
                    FD_ZERO(&readfds);
                    if(s4 >= 0)
                        FD_SET(s4, &readfds);
                    if(s6 >= 0)
                        FD_SET(s6, &readfds);
    
                    int rc = select(s4 > s6 ? s4 + 1 : s6 + 1, &readfds, nullptr, nullptr, &tv);
                    if(rc < 0) {
                        if(errno != EINTR) {
                            perror("select");
                            std::this_thread::sleep_for( std::chrono::seconds(1) );
                        }
                    }
    
                    if(!running)
                        break;
    
                    if(rc > 0) {
                        std::array<uint8_t, 1024 * 64> buf;
                        sockaddr_storage from;
                        socklen_t fromlen {sizeof(from)};
    
                        if(s4 >= 0 && FD_ISSET(s4, &readfds))
                            rc = recvfrom(s4, (char*)buf.data(), buf.size(), 0, (struct sockaddr*)&from, &fromlen);
                        else if(s6 >= 0 && FD_ISSET(s6, &readfds))
                            rc = recvfrom(s6, (char*)buf.data(), buf.size(), 0, (struct sockaddr*)&from, &fromlen);
                        else
                            break;
                        if (rc > 0) {
                            {
                                std::lock_guard<std::mutex> lck(sock_mtx);
                                rcv.emplace_back(Blob {buf.begin(), buf.begin()+rc+1}, std::make_pair(from, fromlen));
                            }
                            cv.notify_all();
                        }
                    }
                }
            } catch (const std::exception& e) {
                std::cerr << "Error in DHT networking thread: " << e.what() << std::endl;
            }
            if (s4 >= 0)
                close(s4);
            if (s6 >= 0)
                close(s6);
        });
    }
    
    /**
     * Queue a get operation; it is executed later by loop_() through
     * SecureDht::get().
     *
     * @param hash key to look up.
     * @param vcb  callback forwarded to SecureDht::get().
     * @param dcb  completion callback forwarded to SecureDht::get().
     * @param f    value filter, moved into the call when the operation runs.
     * @param w    "where" clause, moved into the call when the operation runs.
     */
    void
    DhtRunner::get(InfoHash hash, GetCallback vcb, DoneCallback dcb, Value::Filter f, Where w)
    {
        std::lock_guard<std::mutex> guard(storage_mtx);
        auto op = [hash, vcb, dcb, f, w](SecureDht& dht) mutable {
            dht.get(hash, vcb, dcb, std::move(f), std::move(w));
        };
        pending_ops.emplace(std::move(op));
        cv.notify_all();
    }
    
    /**
     * Convenience overload: the string key is turned into an InfoHash with
     * InfoHash::get() and the call is forwarded to the InfoHash overload.
     */
    void
    DhtRunner::get(const std::string& key, GetCallback vcb, DoneCallbackSimple dcb, Value::Filter f, Where w)
    {
        const auto hashed_key = InfoHash::get(key);
        get(hashed_key, vcb, dcb, f, w);
    }
    /**
     * Queue a query operation; it is executed later by loop_() through
     * SecureDht::query().
     *
     * @param hash    key to query (copied into the queued operation).
     * @param cb      callback forwarded to SecureDht::query().
     * @param done_cb completion callback forwarded to SecureDht::query().
     * @param q       query, moved into the call when the operation runs.
     */
    void DhtRunner::query(const InfoHash& hash, QueryCallback cb, DoneCallback done_cb, Query q) {
        std::lock_guard<std::mutex> guard(storage_mtx);
        auto op = [hash, cb, done_cb, q](SecureDht& dht) mutable {
            dht.query(hash, cb, done_cb, std::move(q));
        };
        pending_ops.emplace(std::move(op));
        cv.notify_all();
    }
    
    /**
     * Queue a listen operation.
     *
     * @param hash key to listen on.
     * @param vcb  callback forwarded to SecureDht::listen().
     * @param f    value filter, moved into the call when the operation runs.
     * @param w    "where" clause, moved into the call when the operation runs.
     * @return a future that receives the value returned by SecureDht::listen()
     *         once the queued operation has been executed.
     */
    std::future<size_t>
    DhtRunner::listen(InfoHash hash, GetCallback vcb, Value::Filter f, Where w)
    {
        std::lock_guard<std::mutex> guard(storage_mtx);
        // Shared so that the copyable queued operation can fulfil the promise.
        auto token_promise = std::make_shared<std::promise<size_t>>();
        auto op = [token_promise, hash, vcb, f, w](SecureDht& dht) mutable {
            token_promise->set_value(dht.listen(hash, vcb, std::move(f), std::move(w)));
        };
        pending_ops.emplace(std::move(op));
        cv.notify_all();
        return token_promise->get_future();
    }
    
    /**
     * Convenience overload: the string key is turned into an InfoHash with
     * InfoHash::get() and the call is forwarded to the InfoHash overload.
     */
    std::future<size_t>
    DhtRunner::listen(const std::string& key, GetCallback vcb, Value::Filter f, Where w)
    {
        const auto hashed_key = InfoHash::get(key);
        return listen(hashed_key, vcb, f, w);
    }
    
    /**
     * Queue the cancellation of a listen operation.
     *
     * @param h     key the listener was registered on.
     * @param token listener token forwarded to SecureDht::cancelListen().
     */
    void
    DhtRunner::cancelListen(InfoHash h, size_t token)
    {
        std::lock_guard<std::mutex> guard(storage_mtx);
        auto op = [h, token](SecureDht& dht) {
            dht.cancelListen(h, token);
        };
        pending_ops.emplace(std::move(op));
        cv.notify_all();
    }
    
    /**
     * Queue the cancellation of a listen operation whose token is only
     * available as a future.
     *
     * @param h     key the listener was registered on.
     * @param token future token; get() is called on it when the queued
     *              operation runs, not here.
     */
    void
    DhtRunner::cancelListen(InfoHash h, std::shared_future<size_t> token)
    {
        std::lock_guard<std::mutex> guard(storage_mtx);
        auto op = [h, token](SecureDht& dht) {
            const auto resolved = token.get();
            dht.cancelListen(h, resolved);
        };
        pending_ops.emplace(std::move(op));
        cv.notify_all();
    }
    
    /**
     * Queue a put operation for a value passed by rvalue.
     *
     * The value is moved into a shared_ptr so that the queued operation can
     * hand it to SecureDht::put() later.
     *
     * @param hash      key to store under.
     * @param value     value to store (moved from).
     * @param cb        completion callback forwarded to SecureDht::put().
     * @param created   forwarded to SecureDht::put().
     * @param permanent forwarded to SecureDht::put().
     */
    void
    DhtRunner::put(InfoHash hash, Value&& value, DoneCallback cb, time_point created, bool permanent)
    {
        std::lock_guard<std::mutex> guard(storage_mtx);
        auto shared_value = std::make_shared<Value>(std::move(value));
        auto op = [hash, shared_value, cb, created, permanent](SecureDht& dht) {
            dht.put(hash, shared_value, cb, created, permanent);
        };
        pending_ops.emplace(std::move(op));
        cv.notify_all();
    }
    
    /**
     * Queue a put operation for an already shared value.
     *
     * @param hash      key to store under.
     * @param value     value to store.
     * @param cb        completion callback forwarded to SecureDht::put().
     * @param created   forwarded to SecureDht::put().
     * @param permanent forwarded to SecureDht::put().
     */
    void
    DhtRunner::put(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb, time_point created, bool permanent)
    {
        std::lock_guard<std::mutex> guard(storage_mtx);
        auto op = [hash, value, cb, created, permanent](SecureDht& dht) {
            dht.put(hash, value, cb, created, permanent);
        };
        pending_ops.emplace(std::move(op));
        cv.notify_all();
    }
    
    /**
     * Convenience overload: the string key is turned into an InfoHash with
     * InfoHash::get() and the call is forwarded to the InfoHash overload.
     */
    void
    DhtRunner::put(const std::string& key, Value&& value, DoneCallbackSimple cb, time_point created, bool permanent)
    {
        // `value` is a plain rvalue reference, so std::move is the matching cast.
        put(InfoHash::get(key), std::move(value), cb, created, permanent);
    }
    
    /**
     * Queue the cancellation of a put operation.
     *
     * @param h  key of the put (copied into the queued operation).
     * @param id value id (copied into the queued operation).
     */
    void
    DhtRunner::cancelPut(const InfoHash& h , const Value::Id& id)
    {
        std::lock_guard<std::mutex> guard(storage_mtx);
        auto op = [h, id](SecureDht& dht) {
            dht.cancelPut(h, id);
        };
        pending_ops.emplace(std::move(op));
        cv.notify_all();
    }
    
    /**
     * Queue a signed put; executed later through SecureDht::putSigned().
     *
     * @param hash  key to store under.
     * @param value value to store.
     * @param cb    completion callback forwarded to SecureDht::putSigned().
     */
    void
    DhtRunner::putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb)
    {
        std::lock_guard<std::mutex> guard(storage_mtx);
        auto op = [hash, value, cb](SecureDht& dht) {
            dht.putSigned(hash, value, cb);
        };
        pending_ops.emplace(std::move(op));
        cv.notify_all();
    }
    
    /**
     * Overload taking the value by rvalue: it is moved into a shared_ptr and
     * forwarded to the shared_ptr overload.
     */
    void
    DhtRunner::putSigned(InfoHash hash, Value&& value, DoneCallback cb)
    {
        auto shared_value = std::make_shared<Value>(std::move(value));
        putSigned(hash, shared_value, cb);
    }
    
    /**
     * Convenience overload: the string key is turned into an InfoHash with
     * InfoHash::get() and the call is forwarded to the InfoHash overload.
     */
    void
    DhtRunner::putSigned(const std::string& key, Value&& value, DoneCallbackSimple cb)
    {
        // `value` is a plain rvalue reference, so std::move is the matching cast.
        putSigned(InfoHash::get(key), std::move(value), cb);
    }
    
    /**
     * Queue an encrypted put; executed later through SecureDht::putEncrypted().
     *
     * @param hash  key to store under.
     * @param to    recipient identifier forwarded to SecureDht::putEncrypted().
     * @param value value to store.
     * @param cb    completion callback forwarded to SecureDht::putEncrypted().
     */
    void
    DhtRunner::putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallback cb)
    {
        std::lock_guard<std::mutex> guard(storage_mtx);
        auto op = [hash, to, value, cb](SecureDht& dht) {
            dht.putEncrypted(hash, to, value, cb);
        };
        pending_ops.emplace(std::move(op));
        cv.notify_all();
    }
    
    /**
     * Overload taking the value by rvalue: it is moved into a shared_ptr and
     * forwarded to the shared_ptr overload.
     */
    void
    DhtRunner::putEncrypted(InfoHash hash, InfoHash to, Value&& value, DoneCallback cb)
    {
        auto shared_value = std::make_shared<Value>(std::move(value));
        putEncrypted(hash, to, shared_value, cb);
    }
    
    /**
     * Convenience overload: the string key is turned into an InfoHash with
     * InfoHash::get() and the call is forwarded to the InfoHash overload.
     */
    void
    DhtRunner::putEncrypted(const std::string& key, InfoHash to, Value&& value, DoneCallback cb)
    {
        // `value` is a plain rvalue reference, so std::move is the matching cast.
        putEncrypted(InfoHash::get(key), to, std::move(value), cb);
    }
    
    /**
     * Resolve a host/service pair into datagram socket addresses.
     *
     * @param host    host name or address; null or empty yields an empty result.
     * @param service port number or service name; null yields an empty result.
     * @return every address returned by getaddrinfo(), in resolver order, each
     *         paired with its length.
     * @throws std::invalid_argument if getaddrinfo() reports an error.
     */
    std::vector<std::pair<sockaddr_storage, socklen_t>>
    DhtRunner::getAddrInfo(const char* host, const char* service)
    {
        std::vector<std::pair<sockaddr_storage, socklen_t>> ips {};
        if (not host or not service or strlen(host) == 0)
            return ips;

        addrinfo hints;
        memset(&hints, 0, sizeof(hints));
        hints.ai_socktype = SOCK_DGRAM;
        addrinfo* info = nullptr;
        int rc = getaddrinfo(host, service, &hints, &info);
        if(rc != 0)
            throw std::invalid_argument(std::string("Error: `") + host + ":" + service + "`: " + gai_strerror(rc));

        // Owns the result list so it is released even if copying below throws.
        std::unique_ptr<addrinfo, decltype(&freeaddrinfo)> info_owner(info, &freeaddrinfo);

        for (addrinfo* infop = info; infop; infop = infop->ai_next) {
            // Never copy more than a sockaddr_storage can hold.
            if (not infop->ai_addr or infop->ai_addrlen > sizeof(sockaddr_storage))
                continue;
            ips.emplace_back(sockaddr_storage(), infop->ai_addrlen);
            std::copy_n((uint8_t*)infop->ai_addr, infop->ai_addrlen, (uint8_t*)&ips.back().first);
        }
        return ips;
    }
    
    /**
     * Bootstrap from a host/service pair: the pair is resolved with
     * getAddrInfo() and every resulting address is handed to the address-list
     * overload.
     */
    void
    DhtRunner::bootstrap(const char* host, const char* service)
    {
        const auto resolved = getAddrInfo(host, service);
        bootstrap(resolved);
    }
    
    /**
     * Queue a priority operation that calls SecureDht::pingNode() on every
     * given address.
     *
     * @param nodes addresses with their lengths; copied into the queued operation.
     */
    void
    DhtRunner::bootstrap(const std::vector<std::pair<sockaddr_storage, socklen_t>>& nodes)
    {
        std::lock_guard<std::mutex> guard(storage_mtx);
        auto op = [nodes](SecureDht& dht) {
            for (auto& peer : nodes)
                dht.pingNode((sockaddr*)&peer.first, peer.second);
        };
        pending_ops_prio.emplace(std::move(op));
        cv.notify_all();
    }
    
    /**
     * Queue a priority operation that calls SecureDht::insertNode() for every
     * exported node.
     *
     * @param nodes nodes to insert; copied into the queued operation.
     */
    void
    DhtRunner::bootstrap(const std::vector<NodeExport>& nodes)
    {
        std::lock_guard<std::mutex> guard(storage_mtx);
        auto op = [nodes](SecureDht& dht) {
            for (auto& exported : nodes)
                dht.insertNode(exported);
        };
        pending_ops_prio.emplace(std::move(op));
        cv.notify_all();
    }
    
    /**
     * Queue a call to SecureDht::connectivityChanged() on the normal
     * operation queue.
     */
    void
    DhtRunner::connectivityChanged()
    {
        std::lock_guard<std::mutex> guard(storage_mtx);
        auto op = [](SecureDht& dht) {
            dht.connectivityChanged();
        };
        pending_ops.emplace(std::move(op));
        cv.notify_all();
    }
    
    /**
     * Queue a certificate lookup; executed later through
     * SecureDht::findCertificate().
     *
     * @param hash identifier forwarded to SecureDht::findCertificate().
     * @param cb   callback forwarded to SecureDht::findCertificate().
     */
    void
    DhtRunner::findCertificate(InfoHash hash, std::function<void(const std::shared_ptr<crypto::Certificate>)> cb) {
        std::lock_guard<std::mutex> guard(storage_mtx);
        auto op = [hash, cb](SecureDht& dht) {
            dht.findCertificate(hash, cb);
        };
        pending_ops.emplace(std::move(op));
        cv.notify_all();
    }
    
    }