diff --git a/include/opendht/dht.h b/include/opendht/dht.h index 1a7a991e4634f0533322e24ac50797e6e2ebbd59..c4f1b388bf076cd31d230e0d7942cd0c0765c8c0 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -423,7 +423,10 @@ private: // Storage void storageAddListener(const InfoHash& id, const std::shared_ptr<Node>& node, size_t tid, Query&& = {}); bool storageStore(const InfoHash& id, const std::shared_ptr<Value>& value, time_point created, const SockAddr* sa = nullptr); - void expireStorage(); + void expireStore(); + void expireStorage(InfoHash h); + void expireStore(decltype(store)::iterator); + void storageChanged(const InfoHash& id, Storage& st, ValueStorage&); std::string printStorageLog(const decltype(store)::value_type&) const; void printStorageQuota(std::ostream& out, const decltype(store_quota)::value_type& ip) const; diff --git a/src/dht.cpp b/src/dht.cpp index 3c5b91e8bba47da108457c4b3fe0babc6beb73b8..5dd3e3a62568debd0d2ecd868054704356c5c96d 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -152,6 +152,18 @@ struct Dht::Storage { std::map<size_t, LocalListener> local_listeners {}; size_t listener_token {1}; + /** + * Changes caused by an operation on the storage. + */ + struct StoreDiff { + /** Difference in stored size caused by the op */ + ssize_t size_diff; + /** Difference in number of values */ + ssize_t values_diff; + /** Difference in number of listeners */ + ssize_t listeners_diff; + }; + Storage() {} Storage(time_point now) : maintenance_time(now+MAX_STORAGE_MAINTENANCE_EXPIRE_TIME) {} @@ -174,7 +186,7 @@ struct Dht::Storage { return values.empty(); } - std::pair<ssize_t, ssize_t> clear(); + StoreDiff clear(); size_t valueCount() const { return values.size(); @@ -210,7 +222,7 @@ struct Dht::Storage { * change_size: size difference * change_value_num: change of value number (0 or 1) */ - std::tuple<ValueStorage*, ssize_t, ssize_t> + std::pair<ValueStorage*, StoreDiff> store(const InfoHash& id, const std::shared_ptr<Value>&, time_point created, time_point expiration, StorageBucket*); /** @@ -229,9 +241,9 @@ struct Dht::Storage { return false; } - std::pair<ssize_t, ssize_t> remove(const InfoHash& id, Value::Id); + StoreDiff remove(const InfoHash& id, Value::Id); - std::pair<ssize_t, ssize_t> expire(const InfoHash& id, time_point now); + StoreDiff expire(const InfoHash& id, time_point now); private: Storage(const Storage&) = delete; @@ -2294,11 +2306,12 @@ Dht::storageStore(const InfoHash& id, const std::shared_ptr<Value>& value, time_ store_bucket = &store_quota.emplace(*sa, StorageBucket{}).first->second; auto store = st->second.store(id, value, created, expiration, store_bucket); - if (auto vs = std::get<0>(store)) { - total_store_size += std::get<1>(store); - total_values += std::get<2>(store); + if (auto vs = store.first) { + total_store_size += store.second.size_diff; + total_values += store.second.values_diff; + scheduler.add(expiration, std::bind(&Dht::expireStorage, this, id)); if (total_store_size > max_store_size) { - expireStorage(); + expireStore(); } storageChanged(id, st->second, *vs); } @@ -2306,13 +2319,13 @@ Dht::storageStore(const InfoHash& id, const std::shared_ptr<Value>& value, time_ return std::get<0>(store); } -std::tuple<Dht::ValueStorage*, ssize_t, ssize_t> +std::pair<Dht::ValueStorage*, Dht::Storage::StoreDiff> Dht::Storage::store(const InfoHash& id, const std::shared_ptr<Value>& value, time_point created, time_point expiration, StorageBucket* sb) { auto it = std::find_if (values.begin(), values.end(), [&](const ValueStorage& vr) { return vr.data == value || vr.data->id == value->id; }); - size_t size_new = value->size(); + ssize_t size_new = value->size(); if (it != values.end()) { /* Already there, only need to refresh */ it->created = created; @@ -2330,9 +2343,9 @@ Dht::Storage::store(const InfoHash& id, const std::shared_ptr<Value>& value, tim sb->insert(id, *value, expiration); it->data = value; total_size += size_diff; - return std::make_tuple(&(*it), size_diff, 0); + return std::make_pair(&(*it), StoreDiff{size_diff, 0, 0}); } - return std::make_tuple(nullptr, 0, 0); + return std::make_pair(nullptr, StoreDiff{}); } else { //DHT_LOG.DEBUG("Storing %s -> %s", id.toString().c_str(), value->toString().c_str()); if (values.size() < MAX_VALUES) { @@ -2341,36 +2354,36 @@ Dht::Storage::store(const InfoHash& id, const std::shared_ptr<Value>& value, tim values.back().store_bucket = sb; if (sb) sb->insert(id, *value, expiration); - return std::make_tuple(&values.back(), size_new, 1); + return std::make_pair(&values.back(), StoreDiff{size_new, 1, 0}); } - return std::make_tuple(nullptr, 0, 0); + return std::make_pair(nullptr, StoreDiff{}); } } -std::pair<ssize_t, ssize_t> +Dht::Storage::StoreDiff Dht::Storage::remove(const InfoHash& id, Value::Id vid) { auto it = std::find_if (values.begin(), values.end(), [&](const ValueStorage& vr) { return vr.data->id == vid; }); if (it == values.end()) - return {0, 0}; + return {}; ssize_t size = it->data->size(); if (it->store_bucket) it->store_bucket->erase(id, *it->data, it->expiration); total_size -= size; values.erase(it); - return {-size, -1}; + return {-size, -1, 0}; } -std::pair<ssize_t, ssize_t> +Dht::Storage::StoreDiff Dht::Storage::clear() { ssize_t num_values = values.size(); ssize_t tot_size = total_size; values.clear(); total_size = 0; - return std::make_pair(-tot_size, -num_values); + return {-tot_size, -num_values, 0}; } void @@ -2399,38 +2412,31 @@ Dht::storageAddListener(const InfoHash& id, const std::shared_ptr<Node>& node, s } void -Dht::expireStorage() +Dht::expireStore(decltype(store)::iterator i) { - const auto& now = scheduler.time(); + auto stats = i->second.expire(i->first, scheduler.time()); + total_store_size += stats.size_diff; + total_values += stats.values_diff; + if (stats.values_diff) { + DHT_LOG.d(i->first, "[store %s] discarded %ld expired values (%ld bytes)", i->first.toString().c_str(), -stats.values_diff, -stats.size_diff); + } +} + +void +Dht::expireStorage(InfoHash h) +{ + auto i = store.find(h); + if (i != store.end()) + expireStore(i); +} +void +Dht::expireStore() +{ // removing expired values auto i = store.begin(); while (i != store.end()) { - // iterate over node listener entries - for (auto nl_it = i->second.listeners.begin(); nl_it != i->second.listeners.end();) { - auto& node_listeners = nl_it->second; - for (auto l = node_listeners.cbegin(); l != node_listeners.cend();) { - bool expired = l->second.time + Node::NODE_EXPIRE_TIME < now; - if (expired) { - DHT_LOG.d(i->first, nl_it->first->id, "[store %s] [node %s] discarding expired listener", - i->first.toString().c_str(), - nl_it->first->toString().c_str()); - l = node_listeners.erase(l); - } else - ++l; - } - if (node_listeners.empty()) - nl_it = i->second.listeners.erase(nl_it); - else - ++nl_it; - } - - auto stats = i->second.expire(i->first, now); - total_store_size += stats.first; - total_values += stats.second; - if (stats.second) { - DHT_LOG.d(i->first, "[store %s] discarded %ld expired values (%ld bytes)", i->first.toString().c_str(), -stats.second, -stats.first); - } + expireStore(i); if (i->second.empty() && i->second.listeners.empty() && i->second.local_listeners.empty()) { DHT_LOG.d(i->first, "[store %s] discarding empty storage", i->first.toString().c_str()); @@ -2458,10 +2464,10 @@ Dht::expireStorage() auto storage = store.find(exp_value.first); if (storage != store.end()) { auto ret = storage->second.remove(exp_value.first, exp_value.second); - total_store_size += ret.first; - total_values += ret.second; + total_store_size += ret.size_diff; + total_values += ret.values_diff; DHT_LOG.w("Discarded %ld bytes, still %ld used", largest->first.toString().c_str(), total_store_size); - if (ret.second) + if (ret.values_diff) break; } else std::cout << "exp_value not found " << exp_value.first << std::endl; @@ -2469,13 +2475,33 @@ Dht::expireStorage() } } -std::pair<ssize_t, ssize_t> +Dht::Storage::StoreDiff Dht::Storage::expire(const InfoHash& id, time_point now) { + // expire listeners + ssize_t del_listen {0}; + for (auto nl_it = listeners.begin(); nl_it != listeners.end();) { + auto& node_listeners = nl_it->second; + for (auto l = node_listeners.cbegin(); l != node_listeners.cend();) { + bool expired = l->second.time + Node::NODE_EXPIRE_TIME < now; + if (expired) + l = node_listeners.erase(l); + else + ++l; + } + if (node_listeners.empty()) { + nl_it = listeners.erase(nl_it); + del_listen--; + } + else + ++nl_it; + } + + // expire values auto r = std::partition(values.begin(), values.end(), [&](const ValueStorage& v) { return v.expiration > now; }); - ssize_t del_num = std::distance(r, values.end()); + ssize_t del_num = -std::distance(r, values.end()); ssize_t size_diff {}; std::for_each(r, values.end(), [&](const ValueStorage& v) { size_diff -= v.data->size(); @@ -2484,7 +2510,7 @@ Dht::Storage::expire(const InfoHash& id, time_point now) }); total_size += size_diff; values.erase(r, values.end()); - return {size_diff, -del_num}; + return {size_diff, del_num, del_listen}; } void @@ -3055,8 +3081,8 @@ Dht::maintainStorage(decltype(store)::value_type& storage, bool force, DoneCallb if (not want4 and not want6) { DHT_LOG.d(storage.first, "Discarding storage values %s", storage.first.toString().c_str()); auto diff = storage.second.clear(); - total_store_size += diff.first; - total_values += diff.second; + total_store_size += diff.size_diff; + total_values += diff.values_diff; } return announce_per_af; @@ -3099,7 +3125,7 @@ Dht::expire() expireBuckets(buckets4); expireBuckets(buckets6); - expireStorage(); + expireStore(); expireSearches(); scheduler.add(expire_stuff_time, std::bind(&Dht::expire, this)); }