diff --git a/include/opendht/dht.h b/include/opendht/dht.h index 8bbe2463a378b0bf9555f98dd6e0b0f7a2db6d7f..6e8132f4f31d087903658200aadc03dde5ba01d8 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -447,12 +447,12 @@ private: // Storage void storageAddListener(const InfoHash& id, const Sp<Node>& node, size_t tid, Query&& = {}, int version = 0); bool storageStore(const InfoHash& id, const Sp<Value>& value, time_point created, const SockAddr& sa = {}, bool permanent = false); - bool storageErase(const InfoHash& id, Value::Id vid); bool storageRefresh(const InfoHash& id, Value::Id vid); void expireStore(); void expireStorage(InfoHash h); void expireStore(decltype(store)::iterator); + void storageRemoved(const InfoHash& id, Storage& st, const std::vector<Sp<Value>>& values, size_t totalSize); void storageChanged(const InfoHash& id, Storage& st, const Sp<Value>&, bool newValue); std::string printStorageLog(const decltype(store)::value_type&) const; diff --git a/src/dht.cpp b/src/dht.cpp index ddee14e05a887cda1d718236276c3cd702539bf9..3755b4b25fca9b7cfd5598b40308c4ae5531709c 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -1194,8 +1194,13 @@ Dht::cancelPut(const InfoHash& id, const Value::Id& vid) }; canceled |= sr_cancel_put(dht4.searches); canceled |= sr_cancel_put(dht6.searches); - if (canceled) - storageErase(id, vid); + if (canceled) { + auto st = store.find(id); + if (st != store.end()) { + if (auto value = st->second.remove(id, vid)) + storageRemoved(id, st->second, {value}, value->size()); + } + } return canceled; } @@ -1294,18 +1299,6 @@ Dht::storageStore(const InfoHash& id, const Sp<Value>& value, time_point created return std::get<0>(store); } -bool -Dht::storageErase(const InfoHash& id, Value::Id vid) -{ - auto st = store.find(id); - if (st == store.end()) - return false; - auto ret = st->second.remove(id, vid); - total_store_size += ret.size_diff; - total_values += ret.values_diff; - return ret.values_diff; -} - void Dht::storageAddListener(const InfoHash& id, const Sp<Node>& node, size_t socket_id, Query&& query, int version) { @@ -1337,36 +1330,8 @@ Dht::expireStore(decltype(store)::iterator i) const auto& id = i->first; auto& st = i->second; auto stats = st.expire(id, scheduler.time()); - total_store_size += stats.first; - total_values -= stats.second.size(); if (not stats.second.empty()) { - if (logger_) - logger_->d(id, "[store %s] discarded %ld expired values (%ld bytes)", - id.toString().c_str(), stats.second.size(), -stats.first); - - if (not st.listeners.empty()) { - if (logger_) - logger_->d(id, "[store %s] %lu remote listeners", id.toString().c_str(), st.listeners.size()); - - std::vector<Value::Id> ids; - ids.reserve(stats.second.size()); - for (const auto& v : stats.second) - ids.emplace_back(v->id); - - for (const auto& node_listeners : st.listeners) { - for (const auto& l : node_listeners.second) { - if (logger_) - logger_->w(id, node_listeners.first->id, "[store %s] [node %s] sending expired", - id.toString().c_str(), - node_listeners.first->toString().c_str()); - Blob ntoken = makeToken(node_listeners.first->getAddr(), false); - network_engine.tellListenerExpired(node_listeners.first, l.first, id, ntoken, ids, l.second.version); - } - } - } - for (const auto& local_listeners : st.local_listeners) { - local_listeners.second.get_cb(stats.second, true); - } + storageRemoved(id, st, stats.second, -stats.first); } } @@ -1378,6 +1343,41 @@ Dht::expireStorage(InfoHash h) expireStore(i); } +void +Dht::storageRemoved(const InfoHash& id, Storage& st, const std::vector<Sp<Value>>& values, size_t totalSize) +{ + if (logger_) + logger_->d(id, "[store %s] discarded %ld values (%ld bytes)", + id.toString().c_str(), values.size(), totalSize); + + total_store_size -= totalSize; + total_values -= values.size(); + + if (not st.listeners.empty()) { + if (logger_) + logger_->d(id, "[store %s] %lu remote listeners", id.toString().c_str(), st.listeners.size()); + + std::vector<Value::Id> ids; + ids.reserve(values.size()); + for (const auto& v : values) + ids.emplace_back(v->id); + + for (const auto& node_listeners : st.listeners) { + for (const auto& l : node_listeners.second) { + if (logger_) + logger_->w(id, node_listeners.first->id, "[store %s] [node %s] sending expired", + id.toString().c_str(), + node_listeners.first->toString().c_str()); + Blob ntoken = makeToken(node_listeners.first->getAddr(), false); + network_engine.tellListenerExpired(node_listeners.first, l.first, id, ntoken, ids, l.second.version); + } + } + } + for (const auto& local_listeners : st.local_listeners) { + local_listeners.second.get_cb(values, true); + } +} + void Dht::expireStore() { @@ -1414,11 +1414,11 @@ Dht::expireStore() if (storage != store.end()) { if (logger_) logger_->w("Storage quota full: discarding value from %s at %s %016" PRIx64, largest->first.toString().c_str(), exp_value.first.to_c_str(), exp_value.second); - auto ret = storage->second.remove(exp_value.first, exp_value.second); - total_store_size += ret.size_diff; - total_values += ret.values_diff; - if (ret.values_diff) + + if (auto value = storage->second.remove(exp_value.first, exp_value.second)) { + storageRemoved(storage->first, storage->second, {value}, value->size()); break; + } } } } @@ -1668,7 +1668,7 @@ Dht::dumpTables() const std::string Dht::getStorageLog() const { - std::stringstream out; + std::ostringstream out; for (const auto& s : store) out << printStorageLog(s); out << std::endl << std::endl; diff --git a/src/storage.h b/src/storage.h index ef4e2f048c93d58ae7236248285437d4a587061f..bff04ab5cdcdcdb56a7f0faeefbb659e79dbeba0 100644 --- a/src/storage.h +++ b/src/storage.h @@ -177,7 +177,7 @@ struct Storage { local_listeners.erase(token); } - StoreDiff remove(const InfoHash& id, Value::Id); + Sp<Value> remove(const InfoHash& id, Value::Id); std::pair<ssize_t, std::vector<Sp<Value>>> expire(const InfoHash& id, time_point now); @@ -246,7 +246,7 @@ Storage::store(const InfoHash& id, const Sp<Value>& value, time_point created, t return std::make_pair(nullptr, StoreDiff{}); } -Storage::StoreDiff +Sp<Value> Storage::remove(const InfoHash& id, Value::Id vid) { auto it = std::find_if (values.begin(), values.end(), [&](const ValueStorage& vr) { @@ -260,8 +260,9 @@ Storage::remove(const InfoHash& id, Value::Id vid) if (it->expiration_job) it->expiration_job->cancel(); total_size -= size; + auto value = it->data; values.erase(it); - return {-size, -1, 0}; + return value; } Storage::StoreDiff