diff --git a/include/opendht/dht.h b/include/opendht/dht.h
index 8d1303db2cce33b757629cc2cfa781e4e970542f..736748ba279ddf96d2c21ae2008e6316568b2199 100644
--- a/include/opendht/dht.h
+++ b/include/opendht/dht.h
@@ -357,6 +357,7 @@ private:
     struct LocalListener;
     struct Search;
     struct ValueStorage;
+    struct StorageBucket;
     struct Listener;
     struct Storage;
 
@@ -384,6 +385,7 @@ private:
     RoutingTable buckets6 {};
 
     std::map<InfoHash, Storage> store;
+    std::map<SockAddr, StorageBucket, SockAddr::ipCmp> store_quota; // storage used per announcing IP (grouped by ipCmp)
     size_t total_values {0};
     size_t total_store_size {0};
     size_t max_store_size {DEFAULT_STORAGE_LIMIT};
@@ -420,10 +422,11 @@ private:
 
     // Storage
     void storageAddListener(const InfoHash& id, const std::shared_ptr<Node>& node, size_t tid, Query&& = {});
-    bool storageStore(const InfoHash& id, const std::shared_ptr<Value>& value, time_point created);
+    bool storageStore(const InfoHash& id, const std::shared_ptr<Value>& value, time_point created, const SockAddr* sa = nullptr);
     void expireStorage();
     void storageChanged(const InfoHash& id, Storage& st, ValueStorage&);
     std::string printStorageLog(const decltype(store)::value_type&) const;
+    void printStorageQuota(std::ostream& out, const decltype(store_quota)::value_type& ip) const;
 
     /**
      * For a given storage, if values don't belong there anymore because this
diff --git a/include/opendht/network_engine.h b/include/opendht/network_engine.h
index eaa11ef0a3ba80d6580ba93770fcb96080adfc21..c15da00fae40db48c246ef924b0cca885a8d01a8 100644
--- a/include/opendht/network_engine.h
+++ b/include/opendht/network_engine.h
@@ -595,7 +595,7 @@ private:
     };
     // global limiting should be triggered by at least 8 different IPs
     using IpLimiter = RateLimiter<MAX_REQUESTS_PER_SEC/8>;
-    using IpLimiterMap = std::map<SockAddr, IpLimiter, cmpSockAddr>;
+    using IpLimiterMap = std::map<SockAddr, IpLimiter, SockAddr::ipCmp>;
     IpLimiterMap address_rate_limiter {};
     RateLimiter<MAX_REQUESTS_PER_SEC> rate_limiter {};
 
diff --git a/include/opendht/sockaddr.h b/include/opendht/sockaddr.h
index 2381920e3ba222d39256ec5da48de7c411d07651..758c27db87392c3d236e0c37f9e5fca2e2b3db12 100644
--- a/include/opendht/sockaddr.h
+++ b/include/opendht/sockaddr.h
@@ -68,6 +68,36 @@ public:
         return print_addr(first, second);
     }
     sa_family_t getFamily() const { return second > sizeof(sa_family_t) ? first.ss_family : AF_UNSPEC; }
+
+    /**
+     * A comparator to classify IP addresses: ports are ignored and only
+     * the first 64 bits of an IPv6 address are considered.
+     */
+    struct ipCmp {
+        bool operator()(const SockAddr& a, const SockAddr& b) const { // const: std::map invokes its comparator through const paths
+            if (a.second != b.second)
+                return a.second < b.second;
+            socklen_t start, len;
+            switch(a.getFamily()) {
+                case AF_INET:
+                    start = offsetof(sockaddr_in, sin_addr);
+                    len = sizeof(in_addr);
+                    break;
+                case AF_INET6:
+                    start = offsetof(sockaddr_in6, sin6_addr);
+                    // don't consider more than 64 bits (IPv6)
+                    len = 8;
+                    break;
+                default:
+                    start = 0;
+                    len = a.second;
+                    break;
+            }
+
+            return std::memcmp(reinterpret_cast<const uint8_t*>(&a.first)+start, reinterpret_cast<const uint8_t*>(&b.first)+start, len) < 0;
+        }
+    };
+
 };
 
 OPENDHT_PUBLIC bool operator==(const SockAddr& a, const SockAddr& b);
diff --git a/src/dht.cpp b/src/dht.cpp
index 28af1858dcdfcbb347ca245e24221b230d112587..1ea19e89231574260455170c4256e96276388ed5 100644
--- a/src/dht.cpp
+++ b/src/dht.cpp
@@ -107,12 +107,48 @@ struct Dht::Listener {
     }
 };
 
+/**
+ * Tracks storage usage per IP or IP range
+ */
+struct Dht::StorageBucket {
+public:
+    /** Accounts for a stored value and indexes it by its expiration time. */
+    void insert(const InfoHash& id, const Value& value, time_point expiration) {
+        totalSize_ += value.size();
+        storedValues_.emplace(expiration, std::pair<InfoHash, Value::Id>(id, value.id));
+    }
+    /** Stops tracking a value; its size is only released if the entry was found. */
+    void erase(const InfoHash& id, const Value& value, time_point expiration) {
+        auto range = storedValues_.equal_range(expiration);
+        for (auto rit = range.first; rit != range.second; ++rit) {
+            if (rit->second.first == id && rit->second.second == value.id) {
+                // clamp so the unsigned counter can never wrap around
+                totalSize_ -= std::min<size_t>(totalSize_, value.size());
+                storedValues_.erase(rit);
+                break;
+            }
+        }
+    }
+    size_t size() const { return totalSize_; }
+    bool empty() const { return storedValues_.empty(); }
+    /** Entry with the earliest expiration; must not be called on an empty() bucket. */
+    std::pair<InfoHash, Value::Id> getOldest() const { return storedValues_.begin()->second; }
+    /** Forgets the entry with the earliest expiration; the accounted size is left untouched. */
+    void popOldest() { if (not storedValues_.empty()) storedValues_.erase(storedValues_.begin()); }
+private:
+    std::multimap<time_point, std::pair<InfoHash, Value::Id>> storedValues_;
+    size_t totalSize_ {0};
+};
+
 struct Dht::ValueStorage {
     std::shared_ptr<Value> data {};
     time_point created {};
+    time_point expiration {}; // the value is discarded by Storage::expire() once this is not after "now"
+    StorageBucket* store_bucket {nullptr}; // quota bucket accounting for this value, if any (not owned)
 
     ValueStorage() {}
-    ValueStorage(const std::shared_ptr<Value>& v, time_point t) : data(v), created(t) {}
+    ValueStorage(const std::shared_ptr<Value>& v, time_point t, time_point e)
+        : data(v), created(t), expiration(e) {}
 };
 
 struct Dht::Storage {
@@ -180,7 +216,7 @@ struct Dht::Storage {
      * change_value_num: change of value number (0 or 1)
      */
     std::tuple<ValueStorage*, ssize_t, ssize_t>
-    store(const std::shared_ptr<Value>& value, time_point created, ssize_t size_left);
+    store(const InfoHash& id, const std::shared_ptr<Value>&, time_point created, time_point expiration, StorageBucket*);
 
     /**
      * Refreshes the time point of the value's lifetime begining.
@@ -198,7 +234,9 @@ struct Dht::Storage {
         return false;
     }
 
-    std::pair<ssize_t, ssize_t> expire(const std::map<ValueType::Id, ValueType>& types, time_point now);
+    std::pair<ssize_t, ssize_t> remove(const InfoHash& id, Value::Id);
+
+    std::pair<ssize_t, ssize_t> expire(const InfoHash& id, time_point now);
 
 private:
     Storage(const Storage&) = delete;
@@ -2225,17 +2263,14 @@ Dht::storageChanged(const InfoHash& id, Storage& st, ValueStorage& v)
 }
 
 bool
-Dht::storageStore(const InfoHash& id, const std::shared_ptr<Value>& value, time_point created)
+Dht::storageStore(const InfoHash& id, const std::shared_ptr<Value>& value, time_point created, const SockAddr* sa)
 {
     const auto& now = scheduler.time();
-    auto expiration = created + getType(value->type).expiration;
-    if (expiration < now) {
-        using namespace std::chrono;
-        DHT_LOG.w(id, "[store %s] won't store already expired value (created %lld s ago, expired %lld s ago)",
-            id.toString().c_str(), duration_cast<seconds>(now - created).count(),
-            duration_cast<seconds>(now - expiration).count());
+    created = std::min(created, now); // a creation date in the future must not extend the lifetime
+    auto expiration = created + getType(value->type).expiration; // the type is looked up by type id, not by value id
+
+    if (expiration < now)
         return false;
-    }
 
     auto st = store.find(id);
     if (st == store.end()) {
@@ -2247,45 +2282,80 @@ Dht::storageStore(const InfoHash& id, const std::shared_ptr<Value>& value, time_
         scheduler.add(st->second.maintenance_time, std::bind(&Dht::dataPersistence, this, id));
     }
 
-    auto store = st->second.store(value, created, max_store_size - total_store_size);
-    if (std::get<0>(store)) {
+    StorageBucket* store_bucket {nullptr};
+    if (sa)
+        store_bucket = &store_quota.emplace(*sa, StorageBucket{}).first->second;
+
+    auto store = st->second.store(id, value, created, expiration, store_bucket);
+    if (auto vs = std::get<0>(store)) {
         total_store_size += std::get<1>(store);
         total_values += std::get<2>(store);
-        storageChanged(id, st->second, *std::get<0>(store));
+        // notify listeners first: expireStorage() may erase values, leaving vs (and st) dangling
+        storageChanged(id, st->second, *vs);
+        if (total_store_size > max_store_size)
+            expireStorage();
     }
 
     return std::get<0>(store);
 }
 
 std::tuple<Dht::ValueStorage*, ssize_t, ssize_t>
-Dht::Storage::store(const std::shared_ptr<Value>& value, time_point created, ssize_t size_left) {
-
+Dht::Storage::store(const InfoHash& id, const std::shared_ptr<Value>& value, time_point created, time_point expiration, StorageBucket* sb)
+{
     auto it = std::find_if (values.begin(), values.end(), [&](const ValueStorage& vr) {
         return vr.data == value || vr.data->id == value->id;
     });
+    size_t size_new = value->size();
     if (it != values.end()) {
         /* Already there, only need to refresh */
         it->created = created;
-        ssize_t size_diff = value->size() - it->data->size();
-        if (size_diff <= size_left and it->data != value) {
-            //DHT_LOG_DEBUG("Updating %s -> %s", id.toString().c_str(), value->toString().c_str());
+        size_t size_old = it->data->size();
+        ssize_t size_diff = size_new - (ssize_t)size_old;
+        // clear quota for previous value: it was accounted with its own size, not the new one
+        if (it->store_bucket)
+            it->store_bucket->erase(id, *it->data, it->expiration);
+        it->expiration = expiration; // refreshed even when the very same value object is stored again
+        // update quota for new value
+        it->store_bucket = sb;
+        if (sb)
+            sb->insert(id, *value, expiration);
+        if (it->data != value) {
+            //DHT_LOG.DEBUG("Updating %s -> %s", id.toString().c_str(), value->toString().c_str());
             it->data = value;
             total_size += size_diff;
             return std::make_tuple(&(*it), size_diff, 0);
         }
         return std::make_tuple(nullptr, 0, 0);
     } else {
-        //DHT_LOG_DEBUG("Storing %s -> %s", id.toString().c_str(), value->toString().c_str());
-        ssize_t size = value->size();
-        if (size <= size_left and values.size() < MAX_VALUES) {
-            total_size += size;
-            values.emplace_back(value, created);
-            return std::make_tuple(&values.back(), size, 1);
+        //DHT_LOG.DEBUG("Storing %s -> %s", id.toString().c_str(), value->toString().c_str());
+        if (values.size() < MAX_VALUES) {
+            total_size += size_new;
+            values.emplace_back(value, created, expiration);
+            values.back().store_bucket = sb;
+            if (sb)
+                sb->insert(id, *value, expiration);
+            return std::make_tuple(&values.back(), size_new, 1);
         }
         return std::make_tuple(nullptr, 0, 0);
     }
 }
 
+std::pair<ssize_t, ssize_t>
+Dht::Storage::remove(const InfoHash& id, Value::Id vid)
+{
+    auto it = std::find_if (values.begin(), values.end(), [&](const ValueStorage& vr) {
+        return vr.data->id == vid;
+    });
+    if (it == values.end())
+        return {0, 0}; // unknown value: nothing changed
+    ssize_t size = it->data->size();
+    if (it->store_bucket)
+        it->store_bucket->erase(id, *it->data, it->expiration);
+    total_size -= size;
+    values.erase(it);
+    return {-size, -1}; // (size change, value count change)
+}
+
 std::pair<ssize_t, ssize_t>
 Dht::Storage::clear()
 {
@@ -2325,8 +2395,11 @@ void
 Dht::expireStorage()
 {
     const auto& now = scheduler.time();
+
+    // removing expired values
     auto i = store.begin();
     while (i != store.end()) {
+        // iterate over node listener entries
         for (auto nl_it = i->second.listeners.begin(); nl_it != i->second.listeners.end();) {
             auto& node_listeners = nl_it->second;
             for (auto l = node_listeners.cbegin(); l != node_listeners.cend();) {
@@ -2345,9 +2418,12 @@ Dht::expireStorage()
                 ++nl_it;
         }
 
-        auto stats = i->second.expire(types, now);
+        auto stats = i->second.expire(i->first, now);
         total_store_size += stats.first;
         total_values += stats.second;
+        if (stats.second) {
+            DHT_LOG.d(i->first, "[store %s] discarded %ld expired values (%ld bytes)", i->first.toString().c_str(), -stats.second, -stats.first);
+        }
 
         if (i->second.empty() && i->second.listeners.empty() && i->second.local_listeners.empty()) {
             DHT_LOG.d(i->first, "[store %s] discarding empty storage", i->first.toString().c_str());
@@ -2356,24 +2432,53 @@ Dht::expireStorage()
         else
             ++i;
     }
+
+    // remove more values if storage limit is exceeded
+    while (total_store_size > max_store_size) {
+        // find the IP using the most storage among those still tracking values
+        auto largest = store_quota.end();
+        for (auto it = store_quota.begin(); it != store_quota.end(); ++it) {
+            if (it->second.empty())
+                continue;
+            if (largest == store_quota.end() or it->second.size() > largest->second.size())
+                largest = it;
+        }
+        if (largest == store_quota.end()) {
+            DHT_LOG.w("No space left: local data consumes all the quota!");
+            break;
+        }
+        DHT_LOG.w("No space left: discarding value of largest consumer %s", largest->first.toString().c_str());
+        // walk the entries, earliest expiration first, until one value is really removed
+        while (not largest->second.empty()) {
+            auto exp_value = largest->second.getOldest();
+            std::pair<ssize_t, ssize_t> ret {0, 0};
+            auto storage = store.find(exp_value.first);
+            if (storage != store.end())
+                ret = storage->second.remove(exp_value.first, exp_value.second);
+            if (ret.second) {
+                total_store_size += ret.first;
+                total_values += ret.second;
+                DHT_LOG.w("Discarded %ld bytes, still %ld used", (long)-ret.first, (long)total_store_size);
+                break;
+            }
+            // stale entry: the value is already gone, forget it so that the loop always progresses
+            largest->second.popOldest();
+        }
+    }
 }
 
 std::pair<ssize_t, ssize_t>
-Dht::Storage::expire(const std::map<ValueType::Id, ValueType>& types, time_point now)
+Dht::Storage::expire(const InfoHash& id, time_point now)
 {
     auto r = std::partition(values.begin(), values.end(), [&](const ValueStorage& v) {
-        if (!v.data) return false; // should not happen
-        auto type_it = types.find(v.data->type);
-        const ValueType& type = (type_it == types.end()) ? ValueType::USER_DATA : type_it->second;
-        bool expired = v.created + type.expiration < now;
-        //if (expired)
-        //    DHT_LOG_DEBUG("Discarding expired value %s", v.data->toString().c_str());
-        return !expired;
+        return v.expiration > now; // values still alive are moved before r
     });
     ssize_t del_num = std::distance(r, values.end());
     ssize_t size_diff {};
-    std::for_each(r, values.end(), [&](const ValueStorage& v){
+    std::for_each(r, values.end(), [&](const ValueStorage& v) {
         size_diff -= v.data->size();
+        if (v.store_bucket)
+            v.store_bucket->erase(id, *v.data, v.expiration);
     });
     total_size += size_diff;
     values.erase(r, values.end());
@@ -2633,13 +2738,28 @@ Dht::dumpTables() const
     DHT_LOG.d("%s", out.str().c_str());
 }
 
+void
+Dht::printStorageQuota(std::ostream& out, const decltype(store_quota)::value_type& ip) const
+{
+    out << "IP " << ip.first.toString() << " uses " << ip.second.size() << " bytes" << std::endl;
+}
+
 std::string
 Dht::getStorageLog() const
 {
     std::stringstream out;
     for (const auto& s : store)
         out << printStorageLog(s);
-    out << "Total " << store.size() << " storages, " << total_values << " values (" << (total_store_size/1024) << " KB)" << std::endl;
+    out << "Total " << store.size() << " storages, " << total_values << " values (";
+    if (total_store_size < 1024)
+        out << total_store_size << " bytes)";
+    else
+        out << (total_store_size/1024) << " KB)";
+    out << std::endl << std::endl;
+    for (const auto& ip : store_quota) {
+        printStorageQuota(out, ip);
+    }
+    out << std::endl;
     return out.str();
 }
 
@@ -2742,7 +2862,8 @@ Dht::Dht() : store(), scheduler(DHT_LOG), network_engine(DHT_LOG, scheduler) {}
 
 Dht::Dht(int s, int s6, Config config)
  : myid(config.node_id), is_bootstrap(config.is_bootstrap),
-    maintain_storage(config.maintain_storage), store(), scheduler(DHT_LOG),
+    maintain_storage(config.maintain_storage), store(), store_quota(),
+    scheduler(DHT_LOG),
     network_engine(myid, config.network, s, s6, DHT_LOG, scheduler,
         std::bind(&Dht::onError, this, _1, _2),
         std::bind(&Dht::onNewNode, this, _1, _2),
@@ -3377,7 +3498,7 @@ Dht::onAnnounce(std::shared_ptr<Node> node,
                 if (type.editPolicy(hash, lv, vc, node->id, (sockaddr*)&node->addr.first, node->addr.second)) {
                     DHT_LOG.d(hash, node->id, "[store %s] editing %s",
                             hash.toString().c_str(), vc->toString().c_str());
-                    storageStore(hash, vc, created);
+                    storageStore(hash, vc, created, &node->addr);
                 } else {
                     DHT_LOG.d(hash, node->id, "[store %s] rejecting edition of %s because of storage policy",
                             hash.toString().c_str(), vc->toString().c_str());
@@ -3388,7 +3509,7 @@ Dht::onAnnounce(std::shared_ptr<Node> node,
             const auto& type = getType(vc->type);
             if (type.storePolicy(hash, vc, node->id, (sockaddr*)&node->addr.first, node->addr.second)) {
                 DHT_LOG.d(hash, node->id, "[store %s] storing %s", hash.toString().c_str(), vc->toString().c_str());
-                storageStore(hash, vc, created);
+                storageStore(hash, vc, created, &node->addr);
             } else {
                 DHT_LOG.d(hash, node->id, "[store %s] rejecting storage of %s",
                         hash.toString().c_str(), vc->toString().c_str());