diff --git a/include/opendht/dht.h b/include/opendht/dht.h index 676c3bbc471f9c6b6d21ee8f413c6ec26dfd7031..dd6ae5faca64ca91bfb8800327570fcd03d81d85 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -138,6 +138,8 @@ public: typedef bool (*GetCallbackRaw)(std::shared_ptr<Value>, void *user_data); + static constexpr size_t DEFAULT_STORAGE_LIMIT {1024 * 1024 * 64}; + static GetCallbackSimple bindGetCb(GetCallbackRaw raw_cb, void* user_data) { if (not raw_cb) return {}; @@ -360,6 +362,21 @@ public: return stats; } + /** + * Set the in-memory storage limit in bytes + */ + void setStorageLimit(size_t limit = DEFAULT_STORAGE_LIMIT) { + max_store_size = limit; + } + + /** + * Returns the total memory usage of stored values and the number + * of stored values. + */ + std::pair<size_t, size_t> getStoreSize() const { + return {total_store_size, total_values}; + } + /* This must be provided by the user. */ static bool isBlacklisted(const sockaddr*, socklen_t) { return false; } @@ -411,7 +428,7 @@ private: static constexpr long unsigned MAX_REQUESTS_PER_SEC {1600}; - static constexpr unsigned TOKEN_SIZE {64}; + static constexpr size_t TOKEN_SIZE {64}; static const std::string my_v; @@ -593,27 +610,29 @@ private: /** * A search is a pointer to the nodes we think are responsible * for storing values for a given hash. - * - * A Search has 3 states: - * - Idle (nothing to do) - * - Syncing (Some nodes not synced) - * - Announcing (Some announces not performed on all nodes) */ struct Search { InfoHash id {}; sa_family_t af; uint16_t tid; - time_point step_time {time_point::min()}; /* the time of the last search_step */ - time_point get_step_time {time_point::min()}; /* the time of the last get time */ + time_point step_time {time_point::min()}; /* the time of the last search step */ + time_point get_step_time {time_point::min()}; /* the time of the last get step */ bool expired {false}; /* no node, or all nodes expired */ bool done {false}; /* search is over, cached for later */ bool refilled {false}; + + /* search routing table */ std::vector<SearchNode> nodes {}; + + /* pending puts */ std::vector<Announce> announce {}; + + /* pending gets */ std::vector<Get> callbacks {}; + /* listeners */ std::map<size_t, LocalListener> listeners {}; size_t listener_token = 1; @@ -694,13 +713,61 @@ private: struct Storage { InfoHash id; time_point maintenance_time {}; - std::vector<ValueStorage> values {}; std::vector<Listener> listeners {}; std::map<size_t, LocalListener> local_listeners {}; size_t listener_token {1}; Storage() {} Storage(InfoHash id, time_point now) : id(id), maintenance_time(now+MAX_STORAGE_MAINTENANCE_EXPIRE_TIME) {} + + bool empty() const { + return values.empty(); + } + + void clear(); + + size_t valueCount() const { + return values.size(); + } + + size_t totalSize() const { + return total_size; + } + + const std::vector<ValueStorage>& getValues() const { return values; } + + std::vector<std::shared_ptr<Value>> get(Value::Filter f = {}) const { + std::vector<std::shared_ptr<Value>> newvals {}; + if (not f) newvals.reserve(values.size()); + for (auto& v : values) { + if (not f || f(*v.data)) + newvals.push_back(v.data); + } + return newvals; + } + + std::shared_ptr<Value> get(Value::Id vid) const { + for (auto& v : values) + if (v.data->id == vid) return v.data; + return {}; + } + + /** + * Stores a new value in this storage, or replace a previous value + * + * @return <storage, change_size, change_value_num> + * storage: set if a change happened + * change_size: size difference + * change_value_num: change of value number (0 or 1) + */ + std::tuple<ValueStorage*, ssize_t, ssize_t> + store(const std::shared_ptr<Value>& value, time_point created, ssize_t size_left); + + std::pair<ssize_t, ssize_t> expire(const std::map<ValueType::Id, ValueType>& types, time_point now); + + private: + std::vector<ValueStorage> values {}; + size_t total_size {}; }; enum class MessageType { @@ -783,7 +850,12 @@ private: // the stuff RoutingTable buckets {}; RoutingTable buckets6 {}; + std::vector<Storage> store {}; + size_t total_values {0}; + size_t total_store_size {0}; + size_t max_store_size {DEFAULT_STORAGE_LIMIT}; + std::list<Search> searches {}; uint16_t search_id {0}; @@ -879,7 +951,7 @@ private: } void storageAddListener(const InfoHash& id, const InfoHash& node, const sockaddr *from, socklen_t fromlen, uint16_t tid); - ValueStorage* storageStore(const InfoHash& id, const std::shared_ptr<Value>& value, time_point created); + bool storageStore(const InfoHash& id, const std::shared_ptr<Value>& value, time_point created); void expireStorage(); void storageChanged(Storage& st, ValueStorage&); diff --git a/include/opendht/dhtrunner.h b/include/opendht/dhtrunner.h index 4ca5fa39ccaf2caf2f314f54d5e96d593f1f97c5..da57404bc5015b3135dc88b7e27f3aafeb0683ec 100644 --- a/include/opendht/dhtrunner.h +++ b/include/opendht/dhtrunner.h @@ -237,6 +237,20 @@ public: return ntohs(((sockaddr_in*)&getBound(f).first)->sin_port); } + std::pair<size_t, size_t> getStoreSize() const { + std::lock_guard<std::mutex> lck(dht_mtx); + if (!dht_) + return {}; + return dht_->getStoreSize(); + } + + void setStorageLimit(size_t limit = Dht::DEFAULT_STORAGE_LIMIT) { + std::lock_guard<std::mutex> lck(dht_mtx); + if (!dht_) + throw std::runtime_error("dht is not running"); + return dht_->setStorageLimit(limit); + } + std::vector<NodeExport> exportNodes() const { std::lock_guard<std::mutex> lck(dht_mtx); if (!dht_) diff --git a/include/opendht/value.h b/include/opendht/value.h index 6cf4e3517b9389228a43e453e5a7c401d2f1f08f..5211450004edd00c5593d74aa902d1e2b9fcab66 100644 --- a/include/opendht/value.h +++ b/include/opendht/value.h @@ -331,6 +331,9 @@ struct Value return ss.str(); } + /** Return the size in bytes used by this value in memory (minimum). */ + size_t size() const; + template <typename Packer> void msgpack_pack_to_sign(Packer& pk) const { diff --git a/src/dht.cpp b/src/dht.cpp index 91d35720bae60feef2cdc72e0fffd5f9901e703d..310d5f0ac4fa1f91e02670e3d92e07115b30b33c 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -1517,13 +1517,8 @@ Dht::listen(const InfoHash& id, GetCallback cb, Value::Filter f) st = std::prev(store.end()); } if (st != store.end()) { - if (not st->values.empty()) { - std::vector<std::shared_ptr<Value>> newvals {}; - newvals.reserve(st->values.size()); - for (auto& v : st->values) { - if (not f || f(*v.data)) - newvals.push_back(v.data); - } + if (not st->empty()) { + std::vector<std::shared_ptr<Value>> newvals = st->get(f); if (not newvals.empty()) { if (!cb(newvals)) return 0; @@ -1541,7 +1536,7 @@ Dht::listen(const InfoHash& id, GetCallback cb, Value::Filter f) auto token4 = Dht::listenTo(id, AF_INET, gcb, f); auto token6 = Dht::listenTo(id, AF_INET6, gcb, f); - DHT_WARN("Added listen : %d -> %d %d %d", token, tokenlocal, token4, token6); + DHT_DEBUG("Added listen : %d -> %d %d %d", token, tokenlocal, token4, token6); listeners.emplace(token, std::make_tuple(tokenlocal, token4, token6)); return token; } @@ -1556,7 +1551,7 @@ Dht::cancelListen(const InfoHash& id, size_t token) DHT_WARN("Listen token not found: %d", token); return false; } - DHT_WARN("cancelListen %s with token %d", id.toString().c_str(), token); + DHT_DEBUG("cancelListen %s with token %d", id.toString().c_str(), token); auto st = findStorage(id); auto tokenlocal = std::get<0>(it->second); if (st != store.end() && tokenlocal) @@ -1680,21 +1675,15 @@ Dht::getLocal(const InfoHash& id, Value::Filter f) const { auto s = findStorage(id); if (s == store.end()) return {}; - std::vector<std::shared_ptr<Value>> vals; - vals.reserve(s->values.size()); - for (auto& v : s->values) - if (!f || f(*v.data)) vals.push_back(v.data); - return vals; + return s->get(f); } std::shared_ptr<Value> Dht::getLocalById(const InfoHash& id, const Value::Id& vid) const { auto s = findStorage(id); - if (s != store.end()) { - for (auto& v : s->values) - if (v.data->id == vid) return v.data; - } + if (s != store.end()) + return s->get(vid); return {}; } @@ -1771,40 +1760,63 @@ Dht::storageChanged(Storage& st, ValueStorage& v) } } -Dht::ValueStorage* +bool Dht::storageStore(const InfoHash& id, const std::shared_ptr<Value>& value, time_point created) { created = std::min(created, now); auto st = findStorage(id); if (st == store.end()) { if (store.size() >= MAX_HASHES) - return nullptr; + return false; store.push_back(Storage {id, now}); st = std::prev(store.end()); } - auto it = std::find_if (st->values.begin(), st->values.end(), [&](const ValueStorage& vr) { + auto store = st->store(value, created, max_store_size - total_store_size); + if (std::get<0>(store)) { + total_store_size += std::get<1>(store); + total_values += std::get<2>(store); + storageChanged(*st, *std::get<0>(store)); + } + return std::get<0>(store); +} + +std::tuple<Dht::ValueStorage*, ssize_t, ssize_t> +Dht::Storage::store(const std::shared_ptr<Value>& value, time_point created, ssize_t size_left) { + + auto it = std::find_if (values.begin(), values.end(), [&](const ValueStorage& vr) { return vr.data == value || vr.data->id == value->id; }); - if (it != st->values.end()) { + if (it != values.end()) { /* Already there, only need to refresh */ it->time = created; - if (it->data != value) { - DHT_DEBUG("Updating %s -> %s", id.toString().c_str(), value->toString().c_str()); + ssize_t size_diff = value->size() - it->data->size(); + if (size_diff <= size_left and it->data != value) { + //DHT_DEBUG("Updating %s -> %s", id.toString().c_str(), value->toString().c_str()); it->data = value; - storageChanged(*st, *it); + total_size += size_diff; + return std::make_tuple(&(*it), size_diff, 0); } - return &*it; + return std::make_tuple(nullptr, 0, 0); } else { - DHT_DEBUG("Storing %s -> %s", id.toString().c_str(), value->toString().c_str()); - if (st->values.size() >= MAX_VALUES) - return nullptr; - st->values.emplace_back(value, created); - storageChanged(*st, st->values.back()); - return &st->values.back(); + //DHT_DEBUG("Storing %s -> %s", id.toString().c_str(), value->toString().c_str()); + ssize_t size = value->size(); + if (size <= size_left and values.size() < MAX_VALUES) { + total_size += size; + values.emplace_back(value, created); + return std::make_tuple(&values.back(), size, 1); + } + return std::make_tuple(nullptr, 0, 0); } } +void +Dht::Storage::clear() +{ + values.clear(); + total_size = 0; +} + void Dht::storageAddListener(const InfoHash& id, const InfoHash& node, const sockaddr *from, socklen_t fromlen, uint16_t tid) { @@ -1820,7 +1832,7 @@ Dht::storageAddListener(const InfoHash& id, const InfoHash& node, const sockaddr return l.ss.ss_family == af && l.id == node; }); if (l == st->listeners.end()) { - sendClosestNodes(from, fromlen, TransId {TransPrefix::GET_VALUES, tid}, st->id, WANT4 | WANT6, makeToken(from, false), st->values); + sendClosestNodes(from, fromlen, TransId {TransPrefix::GET_VALUES, tid}, st->id, WANT4 | WANT6, makeToken(from, false), st->getValues()); st->listeners.emplace_back(node, from, fromlen, tid, now); } else @@ -1846,20 +1858,11 @@ Dht::expireStorage() }), i->listeners.end()); - i->values.erase( - std::partition(i->values.begin(), i->values.end(), - [&](const ValueStorage& v) - { - if (!v.data) return false; // should not happen - const auto& type = getType(v.data->type); - bool expired = v.time + type.expiration < now; - if (expired) - DHT_DEBUG("Discarding expired value %s", v.data->toString().c_str()); - return !expired; - }), - i->values.end()); + auto stats = i->expire(types, now); + total_store_size += stats.first; + total_values += stats.second; - if (i->values.empty() && i->listeners.empty()) { + if (i->empty() && i->listeners.empty()) { DHT_DEBUG("Discarding expired value %s", i->id.toString().c_str()); i = store.erase(i); } @@ -1868,6 +1871,28 @@ Dht::expireStorage() } } +std::pair<ssize_t, ssize_t> +Dht::Storage::expire(const std::map<ValueType::Id, ValueType>& types, time_point now) +{ + auto r = std::partition(values.begin(), values.end(), [&](const ValueStorage& v) { + if (!v.data) return false; // should not happen + auto type_it = types.find(v.data->type); + const ValueType& type = (type_it == types.end()) ? ValueType::USER_DATA : type_it->second; + bool expired = v.time + type.expiration < now; + //if (expired) + // DHT_DEBUG("Discarding expired value %s", v.data->toString().c_str()); + return !expired; + }); + ssize_t del_num = std::distance(r, values.end()); + ssize_t size_diff {}; + std::for_each(r, values.end(), [&](const ValueStorage& v){ + size_diff -= v.data->size(); + }); + total_size += size_diff; + values.erase(r, values.end()); + return {size_diff, -del_num}; +} + void Dht::connectivityChanged() { @@ -2107,16 +2132,8 @@ Dht::getStorageLog() const { using namespace std::chrono; std::stringstream out; - size_t total_values {}; - size_t total_size {}; for (const auto& st : store) { - size_t storage_size {}; - for (const auto& v : st.values) - storage_size += v.data->cypher.size() + v.data->data.size() + v.data->signature.size() + v.data->user_type.size(); - total_size += storage_size; - total_values += st.values.size(); - storage_size /= 1024; - out << "Storage " << st.id << " " << st.listeners.size() << " list., " << st.values.size() << " values (" << storage_size << " kB)" << std::endl; + out << "Storage " << st.id << " " << st.listeners.size() << " list., " << st.valueCount() << " values (" << st.totalSize() << " bytes)" << std::endl; for (const auto& l : st.listeners) { out << " " << "Listener " << l.id << " " << print_addr((sockaddr*)&l.ss, l.sslen); auto since = duration_cast<seconds>(now - l.time); @@ -2124,12 +2141,10 @@ Dht::getStorageLog() const out << " (since " << since.count() << "s, exp in " << expires.count() << "s)" << std::endl; } } - total_size /= 1024; - out << "Total " << store.size() << " storages, " << total_values << " values (" << total_size << " kB)" << std::endl; + out << "Total " << store.size() << " storages, " << total_values << " values (" << (total_store_size/1024) << " ĶB)" << std::endl; return out.str(); } - std::string Dht::getRoutingTablesLog(sa_family_t af) const { @@ -2310,7 +2325,7 @@ Dht::maintainStorage(InfoHash id, bool force, DoneCallback donecb) { auto nodes = buckets.findClosestNodes(id); if (!nodes.empty()) { if (force || id.xorCmp(nodes.back()->id, myid) < 0) { - for (auto &value : local_storage->values) { + for (auto &value : local_storage->getValues()) { const auto& vt = getType(value.data->type); if (force || value.time + vt.expiration > now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME) { // gotta put that value there @@ -2326,7 +2341,7 @@ Dht::maintainStorage(InfoHash id, bool force, DoneCallback donecb) { auto nodes6 = buckets6.findClosestNodes(id); if (!nodes6.empty()) { if (force || id.xorCmp(nodes6.back()->id, myid) < 0) { - for (auto &value : local_storage->values) { + for (auto &value : local_storage->getValues()) { const auto& vt = getType(value.data->type); if (force || value.time + vt.expiration > now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME) { // gotta put that value there @@ -2341,7 +2356,7 @@ Dht::maintainStorage(InfoHash id, bool force, DoneCallback donecb) { if (not want4 and not want6) { DHT_DEBUG("Discarding storage values %s", id.toString().c_str()); - local_storage->values.clear(); + local_storage->clear(); } return announce_per_af; @@ -2605,9 +2620,9 @@ Dht::processMessage(const uint8_t *buf, size_t buflen, const sockaddr *from, soc } else { auto st = findStorage(msg.info_hash); Blob ntoken = makeToken(from, false); - if (st != store.end() && st->values.size() > 0) { - DHT_DEBUG("[node %s %s] sending %u values.", msg.id.toString().c_str(), print_addr(from, fromlen).c_str(), st->values.size()); - sendClosestNodes(from, fromlen, msg.tid, msg.info_hash, msg.want, ntoken, st->values); + if (st != store.end() && not st->empty()) { + DHT_DEBUG("[node %s %s] sending %u values.", msg.id.toString().c_str(), print_addr(from, fromlen).c_str(), st->valueCount()); + sendClosestNodes(from, fromlen, msg.tid, msg.info_hash, msg.want, ntoken, st->getValues()); } else { DHT_DEBUG("[node %s %s] sending nodes.", msg.id.toString().c_str(), print_addr(from, fromlen).c_str()); sendClosestNodes(from, fromlen, msg.tid, msg.info_hash, msg.want, ntoken); @@ -2796,8 +2811,8 @@ Dht::exportValues() const msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); - pk.pack_array(h.values.size()); - for (const auto& v : h.values) { + pk.pack_array(h.getValues().size()); + for (const auto& v : h.getValues()) { pk.pack_array(2); pk.pack(v.time.time_since_epoch().count()); v.data->msgpack_pack(pk); @@ -2838,8 +2853,7 @@ Dht::importValues(const std::vector<ValuesExport>& import) DHT_DEBUG("Discarding expired value at %s", h.first.toString().c_str()); continue; } - auto st = storageStore(h.first, std::make_shared<Value>(std::move(tmp_val)), val_time); - st->time = val_time; + storageStore(h.first, std::make_shared<Value>(std::move(tmp_val)), val_time); } } catch (const std::exception&) { DHT_ERROR("Error reading values at %s", h.first.toString().c_str()); diff --git a/src/value.cpp b/src/value.cpp index d15e01209e582d5b9037ee2ecda6cdd22e273094..6431678a06cf90dcde76443090857a43131df249 100644 --- a/src/value.cpp +++ b/src/value.cpp @@ -80,6 +80,12 @@ findMapValue(const msgpack::object& map, const std::string& key) { return nullptr; } +size_t +Value::size() const +{ + return cypher.size() + data.size() + signature.size() + user_type.size(); +} + void Value::msgpack_unpack(msgpack::object o) {