diff --git a/include/opendht/dht.h b/include/opendht/dht.h index 446e91140fe1f50daf48b430a23cf4d12d7ebd54..db3b4d994dd9967956d07299f6174c3fd0fdd72a 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -353,7 +353,7 @@ private: RoutingTable buckets {}; RoutingTable buckets6 {}; - std::vector<std::unique_ptr<Storage>> store; + std::map<InfoHash, Storage> store; size_t total_values {0}; size_t total_store_size {0}; size_t max_store_size {DEFAULT_STORAGE_LIMIT}; @@ -388,13 +388,10 @@ private: void reportedAddr(const SockAddr&); // Storage - decltype(Dht::store)::iterator findStorage(const InfoHash& id); - decltype(Dht::store)::const_iterator findStorage(const InfoHash& id) const; - void storageAddListener(const InfoHash& id, const std::shared_ptr<Node>& node, size_t tid, Query&& = {}); bool storageStore(const InfoHash& id, const std::shared_ptr<Value>& value, time_point created); void expireStorage(); - void storageChanged(Storage& st, ValueStorage&); + void storageChanged(const InfoHash& id, Storage& st, ValueStorage&); /** * For a given storage, if values don't belong there anymore because this diff --git a/src/dht.cpp b/src/dht.cpp index 196c5e6436d82c98d014e00420e7476de994f77a..9bcbbd6f723da2e2bcd4ee4ff21c8ed45bd2c109 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -116,20 +116,18 @@ struct Dht::ValueStorage { }; struct Dht::Storage { - InfoHash id; time_point maintenance_time {}; std::map<std::shared_ptr<Node>, Listener> listeners {}; std::map<size_t, LocalListener> local_listeners {}; size_t listener_token {1}; Storage() {} - Storage(InfoHash id, time_point now) : id(id), maintenance_time(now+MAX_STORAGE_MAINTENANCE_EXPIRE_TIME) {} + Storage(time_point now) : maintenance_time(now+MAX_STORAGE_MAINTENANCE_EXPIRE_TIME) {} #if defined(__GNUC__) && __GNUC__ == 4 && __GNUC_MINOR__ <= 9 || defined(_WIN32) // GCC-bug: remove me when support of GCC < 4.9.2 is abandoned Storage(Storage&& o) noexcept - : id(std::move(o.id)) - , maintenance_time(std::move(o.maintenance_time)) + : maintenance_time(std::move(o.maintenance_time)) , listeners(std::move(o.listeners)) , local_listeners(std::move(o.local_listeners)) , listener_token(std::move(o.listener_token)) @@ -654,7 +652,7 @@ Dht::shutdown(ShutdownCallback cb) { }; for (const auto& str : store) { - *remaining += maintainStorage(str->id, true, str_donecb); + *remaining += maintainStorage(str.first, true, str_donecb); } DHT_LOG.WARN("Shuting down node: %u ops remaining.", *remaining); if (!*remaining && cb) { cb(); } @@ -1789,15 +1787,13 @@ Dht::listen(const InfoHash& id, GetCallback cb, Value::Filter&& f, Where&& where auto query = std::make_shared<Query>(q); auto filter = f.chain(q.where.getFilter()); - auto st = findStorage(id); size_t tokenlocal = 0; - if (st == store.end() && store.size() < MAX_HASHES) { - store.emplace_back(new Storage(id, scheduler.time())); - st = std::prev(store.end()); - } + auto st = store.find(id); + if (st == store.end() && store.size() < MAX_HASHES) + st = store.emplace(id, Storage(scheduler.time())).first; if (st != store.end()) { - if (not (*st)->empty()) { - std::vector<std::shared_ptr<Value>> newvals = (*st)->get(filter); + if (not st->second.empty()) { + std::vector<std::shared_ptr<Value>> newvals = st->second.get(filter); if (not newvals.empty()) { if (!cb(newvals)) return 0; @@ -1808,8 +1804,8 @@ Dht::listen(const InfoHash& id, GetCallback cb, Value::Filter&& f, Where&& where } } } - tokenlocal = ++(*st)->listener_token; - (*st)->local_listeners.emplace(tokenlocal, LocalListener{query, filter, gcb}); + tokenlocal = ++st->second.listener_token; + st->second.local_listeners.emplace(tokenlocal, LocalListener{query, filter, gcb}); } auto token4 = Dht::listenTo(id, AF_INET, gcb, filter, query); @@ -1831,10 +1827,10 @@ Dht::cancelListen(const InfoHash& id, size_t token) return false; } DHT_LOG.DEBUG("cancelListen %s with token %d", id.toString().c_str(), token); - auto st = findStorage(id); + auto st = store.find(id); auto tokenlocal = std::get<0>(it->second); if (st != store.end() && tokenlocal) - (*st)->local_listeners.erase(tokenlocal); + st->second.local_listeners.erase(tokenlocal); auto searches_cancel_listen = [&](std::map<InfoHash, std::shared_ptr<Search>> srs) { for (auto& sp : srs) { @@ -2039,17 +2035,17 @@ void Dht::query(const InfoHash& id, QueryCallback cb, DoneCallback done_cb, Quer std::vector<std::shared_ptr<Value>> Dht::getLocal(const InfoHash& id, Value::Filter f) const { - auto s = findStorage(id); + auto s = store.find(id); if (s == store.end()) return {}; - return (*s)->get(f); + return s->second.get(f); } std::shared_ptr<Value> Dht::getLocalById(const InfoHash& id, Value::Id vid) const { - auto s = findStorage(id); + auto s = store.find(id); if (s != store.end()) - return (*s)->getById(vid); + return s->second.getById(vid); return {}; } @@ -2119,27 +2115,12 @@ Dht::cancelPut(const InfoHash& id, const Value::Id& vid) // Storage -decltype(Dht::store)::iterator -Dht::findStorage(const InfoHash& id) -{ - return std::find_if(store.begin(), store.end(), [&](const std::unique_ptr<Storage>& st) { - return st->id == id; - }); -} -decltype(Dht::store)::const_iterator -Dht::findStorage(const InfoHash& id) const -{ - return std::find_if(store.cbegin(), store.cend(), [&](const std::unique_ptr<Storage>& st) { - return st->id == id; - }); -} - void -Dht::storageChanged(Storage& st, ValueStorage& v) +Dht::storageChanged(const InfoHash& id, Storage& st, ValueStorage& v) { - DHT_LOG.DEBUG("[Storage %s] changed.", st.id.toString().c_str()); + DHT_LOG.DEBUG("[Storage %s] changed.", id.toString().c_str()); if (not st.local_listeners.empty()) { - DHT_LOG.DEBUG("[Storage %s] %lu local listeners.", st.id.toString().c_str(), st.local_listeners.size()); + DHT_LOG.DEBUG("[Storage %s] %lu local listeners.", id.toString().c_str(), st.local_listeners.size()); std::vector<std::pair<GetCallback, std::vector<std::shared_ptr<Value>>>> cbs; for (const auto& l : st.local_listeners) { std::vector<std::shared_ptr<Value>> vals; @@ -2147,7 +2128,7 @@ Dht::storageChanged(Storage& st, ValueStorage& v) vals.push_back(v.data); if (not vals.empty()) { DHT_LOG.DEBUG("[Storage %s] Sending update local listener with token %lu.", - st.id.toString().c_str(), + id.toString().c_str(), l.first); cbs.emplace_back(l.second.get_cb, std::move(vals)); } @@ -2157,16 +2138,16 @@ Dht::storageChanged(Storage& st, ValueStorage& v) cb.first(cb.second); } - DHT_LOG.DEBUG("[Storage %s] %lu remote listeners.", st.id.toString().c_str(), st.listeners.size()); + DHT_LOG.DEBUG("[Storage %s] %lu remote listeners.", id.toString().c_str(), st.listeners.size()); for (const auto& l : st.listeners) { auto f = l.second.query.where.getFilter(); if (f and not f(*v.data)) continue; - DHT_LOG.DEBUG("[Storage %s] Sending update to %s.", st.id.toString().c_str(), l.first->toString().c_str()); + DHT_LOG.DEBUG("[Storage %s] Sending update to %s.", id.toString().c_str(), l.first->toString().c_str()); std::vector<std::shared_ptr<Value>> vals {}; vals.push_back(v.data); Blob ntoken = makeToken((const sockaddr*)&l.first->addr.first, false); - network_engine.tellListener(l.first, l.second.rid, st.id, 0, ntoken, {}, {}, + network_engine.tellListener(l.first, l.second.rid, id, 0, ntoken, {}, {}, std::move(vals), l.second.query); } } @@ -2180,19 +2161,18 @@ Dht::storageStore(const InfoHash& id, const std::shared_ptr<Value>& value, time_ if ( created + getType(value->id).expiration < clock::now() ) return false; - auto st = findStorage(id); + auto st = store.find(id); if (st == store.end()) { if (store.size() >= MAX_HASHES) return false; - store.emplace_back(new Storage(id, now)); - st = std::prev(store.end()); + st = store.emplace(id, Storage(now)).first; } - auto store = (*st)->store(value, created, max_store_size - total_store_size); + auto store = st->second.store(value, created, max_store_size - total_store_size); if (std::get<0>(store)) { total_store_size += std::get<1>(store); total_values += std::get<2>(store); - storageChanged(*(*st), *std::get<0>(store)); + storageChanged(id, st->second, *std::get<0>(store)); } if (not nextStorageMaintenance) @@ -2243,22 +2223,21 @@ void Dht::storageAddListener(const InfoHash& id, const std::shared_ptr<Node>& node, size_t rid, Query&& query) { const auto& now = scheduler.time(); - auto st = findStorage(id); + auto st = store.find(id); if (st == store.end()) { if (store.size() >= MAX_HASHES) return; - store.emplace_back(new Storage(id, now)); - st = std::prev(store.end()); + st = store.emplace(id, Storage(now)).first; } - auto l = (*st)->listeners.find(node); - if (l == (*st)->listeners.end()) { - auto vals = (*st)->get(query.where.getFilter()); + auto l = st->second.listeners.find(node); + if (l == st->second.listeners.end()) { + auto vals = st->second.get(query.where.getFilter()); if (not vals.empty()) { network_engine.tellListener(node, rid, id, WANT4 | WANT6, makeToken((sockaddr*)&node->addr.first, false), buckets.findClosestNodes(id, now, TARGET_NODES), buckets6.findClosestNodes(id, now, TARGET_NODES), std::move(vals), query); } - (*st)->listeners.emplace(node, Listener {rid, now, std::forward<Query>(query)}); + st->second.listeners.emplace(node, Listener {rid, now, std::forward<Query>(query)}); } else l->second.refresh(rid, now); @@ -2270,21 +2249,21 @@ Dht::expireStorage() const auto& now = scheduler.time(); auto i = store.begin(); while (i != store.end()) { - for (auto l = (*i)->listeners.cbegin(); l != (*i)->listeners.cend();){ + for (auto l = i->second.listeners.cbegin(); l != i->second.listeners.cend();){ bool expired = l->second.time + Node::NODE_EXPIRE_TIME < now; if (expired) { DHT_LOG.DEBUG("Discarding expired listener %s", l->first->id.toString().c_str()); - (*i)->listeners.erase(l++); + i->second.listeners.erase(l++); } else ++l; } - auto stats = (*i)->expire(types, now); + auto stats = i->second.expire(types, now); total_store_size += stats.first; total_values += stats.second; - if ((*i)->empty() && (*i)->listeners.empty() && (*i)->local_listeners.empty()) { - DHT_LOG.DEBUG("Discarding expired value %s", (*i)->id.toString().c_str()); + if (i->second.empty() && i->second.listeners.empty() && i->second.local_listeners.empty()) { + DHT_LOG.DEBUG("Discarding expired value %s", i->first.toString().c_str()); i = store.erase(i); } else @@ -2574,10 +2553,13 @@ Dht::getStorageLog() const using namespace std::chrono; std::stringstream out; for (const auto& st : store) { - out << "Storage " << (*st).id << " " << (*st).listeners.size() << " list., " << (*st).valueCount() << " values (" << (*st).totalSize() << " bytes)" << std::endl; - if (not (*st).local_listeners.empty()) - out << " " << (*st).local_listeners.size() << " local listeners" << std::endl; - for (const auto& l : (*st).listeners) { + out << "Storage " << st.first << " " + << st.second.listeners.size() << " list., " + << st.second.valueCount() << " values (" + << st.second.totalSize() << " bytes)" << std::endl; + if (not st.second.local_listeners.empty()) + out << " " << st.second.local_listeners.size() << " local listeners" << std::endl; + for (const auto& l : st.second.listeners) { out << " " << "Listener " << l.first->toString(); auto since = duration_cast<seconds>(now - l.second.time); auto expires = duration_cast<seconds>(l.second.time + Node::NODE_EXPIRE_TIME - now); @@ -2760,13 +2742,13 @@ Dht::dataPersistence() { const auto& now = scheduler.time(); auto storage_maintenance_time = time_point::max(); for (auto &str : store) { - if (now > str->maintenance_time) { + if (now > str.second.maintenance_time) { DHT_LOG.WARN("[storage %s] maintenance (%u values, %u bytes)", - str->id.toString().c_str(), str->valueCount(), str->totalSize()); - maintainStorage(str->id); - str->maintenance_time = now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME; + str.first.toString().c_str(), str.second.valueCount(), str.second.totalSize()); + maintainStorage(str.first); + str.second.maintenance_time = now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME; } - storage_maintenance_time = std::min(storage_maintenance_time, str->maintenance_time); + storage_maintenance_time = std::min(storage_maintenance_time, str.second.maintenance_time); } DHT_LOG.WARN("[store] next maintenance in %u minutes", std::chrono::duration_cast<std::chrono::minutes>(storage_maintenance_time-now)); @@ -2779,7 +2761,7 @@ size_t Dht::maintainStorage(InfoHash id, bool force, DoneCallback donecb) { const auto& now = scheduler.time(); size_t announce_per_af = 0; - auto local_storage = findStorage(id); + auto local_storage = store.find(id); if (local_storage == store.end()) { return 0; } bool want4 = true, want6 = true; @@ -2787,7 +2769,7 @@ Dht::maintainStorage(InfoHash id, bool force, DoneCallback donecb) { auto nodes = buckets.findClosestNodes(id, now); if (!nodes.empty()) { if (force || id.xorCmp(nodes.back()->id, myid) < 0) { - for (auto &value : (*local_storage)->getValues()) { + for (auto &value : local_storage->second.getValues()) { const auto& vt = getType(value.data->type); if (force || value.time + vt.expiration > now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME) { // gotta put that value there @@ -2803,7 +2785,7 @@ Dht::maintainStorage(InfoHash id, bool force, DoneCallback donecb) { auto nodes6 = buckets6.findClosestNodes(id, now); if (!nodes6.empty()) { if (force || id.xorCmp(nodes6.back()->id, myid) < 0) { - for (auto &value : (*local_storage)->getValues()) { + for (auto &value : local_storage->second.getValues()) { const auto& vt = getType(value.data->type); if (force || value.time + vt.expiration > now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME) { // gotta put that value there @@ -2818,7 +2800,7 @@ Dht::maintainStorage(InfoHash id, bool force, DoneCallback donecb) { if (not want4 and not want6) { DHT_LOG.DEBUG("Discarding storage values %s", id.toString().c_str()); - (*local_storage)->clear(); + local_storage->second.clear(); } return announce_per_af; @@ -2911,12 +2893,13 @@ Dht::exportValues() const e.reserve(store.size()); for (const auto& h : store) { ValuesExport ve; - ve.first = h->id; + ve.first = h.first; msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); - pk.pack_array(h->getValues().size()); - for (const auto& v : h->getValues()) { + const auto& vals = h.second.getValues(); + pk.pack_array(vals.size()); + for (const auto& v : vals) { pk.pack_array(2); pk.pack(v.time.time_since_epoch().count()); v.data->msgpack_pack(pk); @@ -3083,12 +3066,12 @@ Dht::onGetValues(std::shared_ptr<Node> node, InfoHash& hash, want_t, const Query } const auto& now = scheduler.time(); NetworkEngine::RequestAnswer answer {}; - auto st = findStorage(hash); + auto st = store.find(hash); answer.ntoken = makeToken((sockaddr*)&node->addr.first, false); answer.nodes4 = buckets.findClosestNodes(hash, now, TARGET_NODES); answer.nodes6 = buckets6.findClosestNodes(hash, now, TARGET_NODES); - if (st != store.end() && not (*st)->empty()) { - answer.values = (*st)->get(query.where.getFilter()); + if (st != store.end() && not st->second.empty()) { + answer.values = st->second.get(query.where.getFilter()); DHT_LOG.DEBUG("[node %s] sending %u values.", node->toString().c_str(), answer.values.size()); } else { DHT_LOG.DEBUG("[node %s] sending nodes.", node->toString().c_str());