Skip to content
Snippets Groups Projects
Select Git revision
  • cpack
  • master default
  • windows_ci
  • cert_pk_id
  • proxy_push_result
  • cnode_put_id
  • update-windows-build
  • proxy
  • resubscribe_on_token_change
  • actions
  • client_mode
  • llhttp
  • search_node_add
  • crypto_aes_gcm_argon2
  • ios_notifications
  • log_fmt
  • v2asio
  • fix-msvc
  • message_split
  • meson
  • v3.4.0
  • v3.3.1
  • v3.3.1rc1
  • v3.3.1rc2
  • v3.3.0
  • v3.2.0
  • v3.1.11
  • v3.1.10
  • v3.1.9
  • v3.1.8.2
  • v3.1.8.1
  • v3.1.8
  • v3.1.7
  • v3.1.6
  • v3.1.5
  • v3.1.4
  • v3.1.3
  • v3.1.2
  • v3.1
  • v3.0.1
40 results

op_cache.h

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    op_cache.h 5.96 KiB
    /*
     *  Copyright (C) 2019 Savoir-faire Linux Inc.
     *  Author(s) : Adrien Béraud <adrien.beraud@savoirfairelinux.com>
     *
     *  This program is free software; you can redistribute it and/or modify
     *  it under the terms of the GNU General Public License as published by
     *  the Free Software Foundation; either version 3 of the License, or
     *  (at your option) any later version.
     *
     *  This program is distributed in the hope that it will be useful,
     *  but WITHOUT ANY WARRANTY; without even the implied warranty of
     *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     *  GNU General Public License for more details.
     *
     *  You should have received a copy of the GNU General Public License
     *  along with this program. If not, see <https://www.gnu.org/licenses/>.
     */
    #pragma once
    
    #include "value.h"
    #include "value_cache.h"
    #include "listener.h"
    
    namespace dht {
    
    struct OpCacheValueStorage
    {
        Sp<Value> data {};
        unsigned refCount {1};
        OpCacheValueStorage(Sp<Value> val) : data(val) {}
    };
    
    class OpValueCache {
    public:
        OpValueCache(ValueCallback&& cb) noexcept : callback(std::forward<ValueCallback>(cb)) {}
        OpValueCache(OpValueCache&& o) noexcept : values(std::move(o.values)), callback(std::move(o.callback)) {
            o.callback = {};
        }
    
        static ValueCallback cacheCallback(ValueCallback&& cb, std::function<void()>&& onCancel) {
            auto cache = std::make_shared<OpValueCache>(std::forward<ValueCallback>(cb));
            return [cache, onCancel](const std::vector<Sp<Value>>& vals, bool expired){
                auto ret = cache->onValue(vals, expired);
                if (not ret)
                    onCancel();
                return ret;
            };
        }
    
        bool onValue(const std::vector<Sp<Value>>& vals, bool expired) {
            if (expired)
                return onValuesExpired(vals);
            else
                return onValuesAdded(vals);
        }
    
        bool onValuesAdded(const std::vector<Sp<Value>>& vals);
        bool onValuesExpired(const std::vector<Sp<Value>>& vals);
        bool onValuesExpired(const std::vector<Value::Id>& vals);
    
        void onNodeChanged(ListenSyncStatus status) {
            switch (status) {
                case ListenSyncStatus::ADDED: nodes++; break;
                case ListenSyncStatus::REMOVED: nodes--; break;
                case ListenSyncStatus::SYNCED : syncedNodes++; break;
                case ListenSyncStatus::UNSYNCED: syncedNodes--; break;
            }
        }
    
        bool isSynced() const { return nodes > 0 and syncedNodes == nodes; }
    
        std::vector<Sp<Value>> get(const Value::Filter& filter) const;
        Sp<Value> get(Value::Id id) const;
        std::vector<Sp<Value>> getValues() const;
    
    private:
        OpValueCache(const OpValueCache&) = delete;
        OpValueCache& operator=(const OpValueCache&) = delete;
    
        size_t nodes {0};
        size_t syncedNodes {0};
        std::map<Value::Id, OpCacheValueStorage> values {};
        ValueCallback callback;
    };
    
    class OpCache {
    public:
        OpCache() : cache([this](const std::vector<Sp<Value>>& vals, bool expired){
            if (expired)
                onValuesExpired(vals);
            else
                onValuesAdded(vals);
            return true;
        }) {}
    
        bool onValue(const std::vector<Sp<Value>>& vals, bool expired) {
            cache.onValue(vals, expired);
            return not listeners.empty();
        }
        void onNodeChanged(ListenSyncStatus status) {
            cache.onNodeChanged(status);
        }
    
        void onValuesAdded(const std::vector<Sp<Value>>& vals);
        void onValuesExpired(const std::vector<Sp<Value>>& vals);
    
        bool addListener(size_t token, const ValueCallback& cb, const Sp<Query>& q, const Value::Filter& filter) {
            listeners.emplace(token, LocalListener{q, filter, cb});
            auto cached = cache.get(filter);
            if (not cached.empty()) {
                auto ret = cb(cached, false);
                if (not ret)
                    listeners.erase(token);
                return ret;
            }
            return true;
        }
    
        bool removeListener(size_t token, const time_point& now) {
            lastRemoved = now;
            return listeners.erase(token) > 0;
        }
    
        void removeAll() {
            listeners.clear();
        }
    
        bool isDone() {
            return listeners.empty();
        }
    
        std::vector<Sp<Value>> get(const Value::Filter& filter) const {
            return cache.get(filter);
        }
    
        Sp<Value> get(Value::Id id) const {
            return cache.get(id);
        }
    
        bool isSynced() const {
            return cache.isSynced();
        }
    
        bool isExpired(const time_point& now) const {
            return listeners.empty() and (lastRemoved + EXPIRATION < now);
        }
        time_point getExpiration() const;
    
        size_t searchToken;
    private:
        constexpr static const std::chrono::seconds EXPIRATION {60};
        OpCache(const OpCache&) = delete;
        OpCache& operator=(const OpCache&) = delete;
    
        OpValueCache cache;
        std::map<size_t, LocalListener> listeners;
        time_point lastRemoved {clock::now()};
    };
    
    class SearchCache {
    public:
        SearchCache() {}
        SearchCache(SearchCache&&) = default;
    
        using OnListen = std::function<size_t(Sp<Query>, ValueCallback, SyncCallback)>;
        size_t listen(const ValueCallback& get_cb, const Sp<Query>& q, const Value::Filter& filter, const OnListen& onListen);
    
        bool cancelListen(size_t gtoken, const time_point& now);
        void cancelAll(const std::function<void(size_t)>& onCancel);
    
        time_point expire(const time_point& now, const std::function<void(size_t)>& onCancel);
        time_point getExpiration() const {
            return nextExpiration_;
        }
    
        bool get(const Value::Filter& f, const Sp<Query>& q, const GetCallback& gcb, const DoneCallback& dcb) const;
        std::vector<Sp<Value>> get(const Value::Filter& filter) const;
        Sp<Value> get(Value::Id id) const;
    
    private:
        SearchCache(const SearchCache&) = delete;
        SearchCache& operator=(const SearchCache&) = delete;
    
        using OpMap = std::map<Sp<Query>, std::unique_ptr<OpCache>>;
        OpMap ops {};
        OpMap::iterator getOp(const Sp<Query>& q);
        OpMap::const_iterator getOp(const Sp<Query>& q) const;
    
        size_t nextToken_ {1};
        time_point nextExpiration_ {time_point::max()};
    };
    
    }