// Select Git revision
// Adrien Béraud authored
// Code owners
// Assign users and groups as approvers for specific file changes. Learn more.
// storage.h 10.45 KiB
/*
* Copyright (C) 2014-2022 Savoir-faire Linux Inc.
* Author(s) : Adrien Béraud <adrien.beraud@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include "infohash.h"
#include "value.h"
#include "listener.h"
#include <map>
#include <utility>
namespace dht {
/**
* Tracks storage usage per IP or IP range
*/
class StorageBucket {
public:
void insert(const InfoHash& id, const Value& value, time_point expiration) {
totalSize_ += value.size();
storedValues_.emplace(expiration, std::pair<InfoHash, Value::Id>(id, value.id));
}
void erase(const InfoHash& id, const Value& value, time_point expiration) {
auto range = storedValues_.equal_range(expiration);
for (auto rit = range.first; rit != range.second;) {
if (rit->second.first == id && rit->second.second == value.id) {
totalSize_ -= value.size();
storedValues_.erase(rit);
return;
} else
++rit;
}
// printf("StorageBucket::erase can't find value %s %016" PRIx64 "\n", id.to_c_str(), value.id);
}
void refresh(const InfoHash& id, const Value& value, time_point old_expiration, time_point expiration) {
auto range = storedValues_.equal_range(old_expiration);
for (auto rit = range.first; rit != range.second;) {
if (rit->second.first == id && rit->second.second == value.id) {
storedValues_.erase(rit);
storedValues_.emplace(expiration, std::pair<InfoHash, Value::Id>(id, value.id));
return;
} else
++rit;
}
// printf("StorageBucket::refresh can't find value %s %016" PRIx64 "\n", id.to_c_str(), value.id);
insert(id, value, expiration);
}
size_t size() const { return totalSize_; }
std::pair<InfoHash, Value::Id> getOldest() const { return storedValues_.empty() ? std::pair<InfoHash, Value::Id>{} : storedValues_.begin()->second; }
private:
std::multimap<time_point, std::pair<InfoHash, Value::Id>> storedValues_;
size_t totalSize_ {0};
};
/**
 * A stored value together with the bookkeeping attached to it.
 */
struct ValueStorage {
    /** The stored value. */
    Sp<Value> data {};
    /** Time recorded as the creation (or last refresh) of the value. */
    time_point created {};
    /** Time after which the value is considered expired. */
    time_point expiration {};
    /** Scheduled job associated with the expiration, if any. */
    Sp<Scheduler::Job> expiration_job {};
    /** Bucket accounting for this value's size, or nullptr (not owned). */
    StorageBucket* store_bucket {nullptr};

    ValueStorage() {}

    /** Builds an entry for value `v`, created at `t` and expiring at `e`. */
    ValueStorage(const Sp<Value>& v, time_point t, time_point e)
        : data {v}
        , created {t}
        , expiration {e}
    {}
};
/**
 * Values stored for a given hash, with the listeners attached to them.
 * Movable, not copyable.
 */
struct Storage {
    /** Initialized from the time_point constructor; presumably the next maintenance time (TODO confirm). */
    time_point maintenance_time {};
    /** Listeners grouped by node, then by a size_t key. */
    std::map<Sp<Node>, std::map<size_t, Listener>> listeners;
    /** Local listeners, indexed by the token returned by listen(). */
    std::map<size_t, LocalListener> local_listeners {};
    /** Last token handed out by listen() (incremented before each use). */
    size_t listener_token {1};

    /* The maximum number of values we store for a given hash. */
    static constexpr unsigned MAX_VALUES {64 * 1024};

    /**
     * Changes caused by an operation on the storage.
     */
    struct StoreDiff {
        /** Difference in stored size caused by the op */
        ssize_t size_diff;
        /** Difference in number of values */
        ssize_t values_diff;
        /** Difference in number of listeners */
        ssize_t listeners_diff;
    };

    Storage() {}
    Storage(time_point t) : maintenance_time(t) {}
    Storage(Storage&& o) noexcept = default;
    Storage& operator=(Storage&& o) = default;

    /** True when no value is stored (listeners are not considered). */
    bool empty() const {
        return values.empty();
    }

    /**
     * Drops every stored value.
     *
     * @return the resulting (negative or zero) size and value-count changes
     */
    StoreDiff clear();

    /** Number of stored values. */
    size_t valueCount() const {
        return values.size();
    }

    /** Accumulated size of the stored values. */
    size_t totalSize() const {
        return total_size;
    }

    /** Read-only access to the stored entries. */
    const std::vector<ValueStorage>& getValues() const { return values; }

    /**
     * Looks a value up by id.
     *
     * @return the first stored value whose id is `vid`, or an empty pointer
     */
    Sp<Value> getById(Value::Id vid) const {
        for (const auto& stored : values) {
            if (stored.data->id == vid)
                return stored.data;
        }
        return {};
    }

    /**
     * Collects the stored values accepted by a filter.
     *
     * @param f  filter to apply; an empty filter accepts every value
     * @return the matching values, in storage order
     */
    std::vector<Sp<Value>> get(const Value::Filter& f = {}) const {
        std::vector<Sp<Value>> matching {};
        const bool unfiltered = not f;
        // Without a filter the result size is known up front.
        if (unfiltered)
            matching.reserve(values.size());
        for (const auto& stored : values) {
            if (unfiltered || f(*stored.data))
                matching.push_back(stored.data);
        }
        return matching;
    }

    /**
     * Stores a new value in this storage, or replaces a previous value
     * carrying the same id.
     *
     * @return <storage, diff>
     *      storage: the affected entry, or nullptr if nothing was stored
     *               (same value object already present, or MAX_VALUES reached)
     *      diff:    size and value-count changes caused by the operation
     */
    std::pair<ValueStorage*, StoreDiff>
    store(const InfoHash& id, const Sp<Value>&, time_point created, time_point expiration, StorageBucket*);

    /**
     * Refreshes the time point of the value's lifetime beginning.
     * The expiration is only ever pushed later, never brought forward.
     *
     * @param id    The hash, forwarded to the value's storage bucket
     * @param now   The reference to now
     * @param vid   The value id
     * @param types Used to look up the expiration delay of the value's type
     * @return <storage, expiration>: the refreshed entry and its expiration,
     *         or <nullptr, time_point::max()> if no value has this id
     */
    std::pair<ValueStorage*, time_point>
    refresh(const InfoHash& id, const time_point& now, const Value::Id& vid, const TypeStore& types) {
        for (auto& stored : values) {
            if (stored.data->id == vid) {
                const auto previousExpiration = stored.expiration;
                stored.created = now;
                stored.expiration = std::max(previousExpiration,
                                             now + types.getType(stored.data->type).expiration);
                // Keep the bucket's expiration index in sync with the entry.
                if (auto* bucket = stored.store_bucket)
                    bucket->refresh(id, *stored.data, previousExpiration, stored.expiration);
                return {&stored, stored.expiration};
            }
        }
        return {nullptr, time_point::max()};
    }

    /**
     * Registers a local listener.
     *
     * @return the listener token, or 0 if the callback declined to listen
     */
    size_t listen(ValueCallback& cb, Value::Filter& f, const Sp<Query>& q);

    /** Unregisters the local listener registered under `token`, if any. */
    void cancelListen(size_t token) {
        local_listeners.erase(token);
    }

    /**
     * Removes the value with the given id.
     *
     * @return the removed value, or an empty pointer if it was not found
     */
    Sp<Value> remove(const InfoHash& id, Value::Id);

    /**
     * Drops expired listeners and values.
     *
     * @return <size difference, values that expired>
     */
    std::pair<ssize_t, std::vector<Sp<Value>>> expire(const InfoHash& id, time_point now);

private:
    Storage(const Storage&) = delete;
    Storage& operator=(const Storage&) = delete;

    std::vector<ValueStorage> values {};
    size_t total_size {};
};
/**
 * Registers a local listener on this storage.
 *
 * If values matching `filter` are already stored, they are delivered to
 * `gcb` first; when the callback returns false, no listener is registered.
 *
 * Defined `inline` because this definition lives in a header: without it,
 * including the header from two translation units yields duplicate symbols.
 *
 * @param gcb    callback receiving the values (second argument false: not expired)
 * @param filter filter selecting the values of interest
 * @param query  query stored with the listener
 * @return the token identifying the listener, or 0 if the callback declined
 */
inline size_t
Storage::listen(ValueCallback& gcb, Value::Filter& filter, const Sp<Query>& query)
{
    if (not empty()) {
        std::vector<Sp<Value>> newvals = get(filter);
        if (not newvals.empty() and not gcb(newvals, false))
            return 0;
    }
    const auto tokenlocal = ++listener_token;
    local_listeners.emplace(tokenlocal, LocalListener{query, filter, gcb});
    return tokenlocal;
}
/**
 * Stores a value, or replaces the stored value carrying the same id.
 *
 * - Same value object already stored: only its creation time is updated and
 *   no change is reported.
 * - Another value with the same id: it is replaced, and the quota bucket of
 *   the previous value is released before the new one is charged to `sb`.
 * - New id: appended unless MAX_VALUES entries are already stored.
 *
 * Defined `inline` because this definition lives in a header: without it,
 * including the header from two translation units yields duplicate symbols.
 *
 * @param id         hash under which the value is stored (used for quota bookkeeping)
 * @param value      the value to store
 * @param created    creation time to record
 * @param expiration expiration time to record
 * @param sb         bucket to charge the value to, may be nullptr
 * @return <storage, diff>: the affected entry and the resulting changes, or
 *         <nullptr, zero diff> when nothing was stored
 */
inline std::pair<ValueStorage*, Storage::StoreDiff>
Storage::store(const InfoHash& id, const Sp<Value>& value, time_point created, time_point expiration, StorageBucket* sb)
{
    auto it = std::find_if(values.begin(), values.end(), [&](const ValueStorage& vr) {
        return vr.data == value || vr.data->id == value->id;
    });
    const ssize_t size_new = value->size();
    if (it != values.end()) {
        /* Already there, only need to refresh */
        it->created = created;
        if (it->data != value) {
            const ssize_t size_diff = size_new - static_cast<ssize_t>(it->data->size());
            // Clear quota for the previous value. The bucket subtracts the size of
            // the value it is handed, so it must be given the value being replaced:
            // passing the new one would leave the bucket total off by the size
            // difference (and could wrap its unsigned counter).
            if (it->store_bucket)
                it->store_bucket->erase(id, *it->data, it->expiration);
            it->expiration = expiration;
            // update quota for new value
            it->store_bucket = sb;
            if (sb)
                sb->insert(id, *value, expiration);
            it->data = value;
            total_size += size_diff;
            return std::make_pair(&(*it), StoreDiff{size_diff, 0, 0});
        }
    } else if (values.size() < MAX_VALUES) {
        total_size += size_new;
        values.emplace_back(value, created, expiration);
        values.back().store_bucket = sb;
        if (sb)
            sb->insert(id, *value, expiration);
        return std::make_pair(&values.back(), StoreDiff{size_new, 1, 0});
    }
    // Nothing stored: same object already present, or storage is full.
    return std::make_pair(nullptr, StoreDiff{});
}
/**
 * Removes the stored value whose id is `vid`.
 *
 * The value's quota bucket entry is released and its expiration job, if any,
 * is cancelled before the entry is dropped.
 *
 * Defined `inline` because this definition lives in a header: without it,
 * including the header from two translation units yields duplicate symbols.
 *
 * @param id  hash under which the value is stored (used for quota bookkeeping)
 * @param vid id of the value to remove
 * @return the removed value, or an empty pointer if no value has this id
 */
inline Sp<Value>
Storage::remove(const InfoHash& id, Value::Id vid)
{
    auto it = std::find_if(values.begin(), values.end(), [&](const ValueStorage& vr) {
        return vr.data->id == vid;
    });
    if (it == values.end())
        return {};
    if (it->store_bucket)
        it->store_bucket->erase(id, *it->data, it->expiration);
    if (it->expiration_job)
        it->expiration_job->cancel();
    total_size -= it->data->size();
    // The entry is erased right after, so its pointer can be taken over.
    auto value = std::move(it->data);
    values.erase(it);
    return value;
}
/**
 * Drops every stored value and resets the accumulated size.
 * Listeners are left untouched (the reported listener difference is 0).
 *
 * NOTE(review): unlike remove() and expire(), this neither releases the
 * values' quota bucket entries nor cancels their expiration jobs; releasing
 * the buckets would require the hash, which is not available here. Confirm
 * that callers handle this.
 *
 * Defined `inline` because this definition lives in a header: without it,
 * including the header from two translation units yields duplicate symbols.
 *
 * @return the (negative or zero) size and value-count differences
 */
inline Storage::StoreDiff
Storage::clear()
{
    const ssize_t num_values = values.size();
    const ssize_t tot_size = total_size;
    values.clear();
    total_size = 0;
    return {-tot_size, -num_values, 0};
}
/**
 * Drops the listeners and the values that have expired at `now`.
 *
 * A listener expires once its time plus Node::NODE_EXPIRE_TIME is before
 * `now`; nodes left without listeners are removed. A value expires when its
 * expiration is not after `now`; its quota bucket entry is released and its
 * expiration job cancelled. The relative order of the remaining values is
 * not preserved (std::partition).
 *
 * Defined `inline` because this definition lives in a header: without it,
 * including the header from two translation units yields duplicate symbols.
 *
 * @param id  hash under which the values are stored (used for quota bookkeeping)
 * @param now the reference time
 * @return <size difference (negative or zero), values that expired>
 */
inline std::pair<ssize_t, std::vector<Sp<Value>>>
Storage::expire(const InfoHash& id, time_point now)
{
    // expire listeners
    for (auto nl_it = listeners.begin(); nl_it != listeners.end();) {
        auto& node_listeners = nl_it->second;
        for (auto l = node_listeners.cbegin(); l != node_listeners.cend();) {
            if (l->second.time + Node::NODE_EXPIRE_TIME < now)
                l = node_listeners.erase(l);
            else
                ++l;
        }
        if (node_listeners.empty())
            nl_it = listeners.erase(nl_it);
        else
            ++nl_it;
    }

    // expire values: live ones are gathered first, expired ones in [r, end)
    auto r = std::partition(values.begin(), values.end(), [&](const ValueStorage& v) {
        return v.expiration > now;
    });
    std::vector<Sp<Value>> ret;
    ret.reserve(std::distance(r, values.end()));
    ssize_t size_diff {0};
    // Non-const reference: the entries are erased below, so their value
    // pointers can really be moved out instead of copied.
    std::for_each(r, values.end(), [&](ValueStorage& v) {
        size_diff -= static_cast<ssize_t>(v.data->size());
        if (v.store_bucket)
            v.store_bucket->erase(id, *v.data, v.expiration);
        if (v.expiration_job)
            v.expiration_job->cancel();
        ret.emplace_back(std::move(v.data));
    });
    total_size += size_diff;
    values.erase(r, values.end());
    return {size_diff, std::move(ret)};
}
}