diff --git a/include/opendht/dht.h b/include/opendht/dht.h index 24176029900d8dd0e5707fb89fabbb401199d8e4..30e5f8b447d9f294d24154f289e0cc288aef83ad 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -388,7 +388,7 @@ private: RoutingTable buckets {}; RoutingTable buckets6 {}; - std::vector<Storage> store; + std::vector<std::unique_ptr<Storage>> store; size_t total_values {0}; size_t total_store_size {0}; size_t max_store_size {DEFAULT_STORAGE_LIMIT}; diff --git a/src/dht.cpp b/src/dht.cpp index 25e04ff46f30df0d13145a791bedfbb65bc9d56c..0ceffa9b753c54c62c73cdaa2187521c44cc770f 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -362,7 +362,7 @@ Dht::shutdown(ShutdownCallback cb) { }; for (const auto& str : store) { - *remaining += maintainStorage(str.id, true, str_donecb); + *remaining += maintainStorage(str->id, true, str_donecb); } DHT_LOG.WARN("Shuting down node: %u ops remaining.", *remaining); if (!*remaining && cb) { cb(); } @@ -1336,12 +1336,12 @@ Dht::listen(const InfoHash& id, GetCallback cb, Value::Filter&& f) auto st = findStorage(id); size_t tokenlocal = 0; if (st == store.end() && store.size() < MAX_HASHES) { - store.emplace_back(id, scheduler.time()); + store.emplace_back(new Storage(id, scheduler.time())); st = std::prev(store.end()); } if (st != store.end()) { - if (not st->empty()) { - std::vector<std::shared_ptr<Value>> newvals = st->get(f); + if (not (*st)->empty()) { + std::vector<std::shared_ptr<Value>> newvals = (*st)->get(f); if (not newvals.empty()) { if (!cb(newvals)) return 0; @@ -1352,8 +1352,8 @@ Dht::listen(const InfoHash& id, GetCallback cb, Value::Filter&& f) } } } - tokenlocal = ++st->listener_token; - st->local_listeners.emplace(tokenlocal, LocalListener{f, gcb}); + tokenlocal = ++(*st)->listener_token; + (*st)->local_listeners.emplace(tokenlocal, LocalListener{f, gcb}); } auto token4 = Dht::listenTo(id, AF_INET, gcb, f); @@ -1378,7 +1378,7 @@ Dht::cancelListen(const InfoHash& id, size_t token) auto st = findStorage(id); auto tokenlocal = std::get<0>(it->second); if (st != store.end() && tokenlocal) - st->local_listeners.erase(tokenlocal); + (*st)->local_listeners.erase(tokenlocal); auto searches_cancel_listen = [&](std::map<InfoHash, std::shared_ptr<Search>> srs) { for (auto& sp : srs) { @@ -1511,7 +1511,7 @@ Dht::getLocal(const InfoHash& id, Value::Filter f) const { auto s = findStorage(id); if (s == store.end()) return {}; - return s->get(f); + return (*s)->get(f); } std::shared_ptr<Value> @@ -1519,7 +1519,7 @@ Dht::getLocalById(const InfoHash& id, Value::Id vid) const { auto s = findStorage(id); if (s != store.end()) - return s->getById(vid); + return (*s)->getById(vid); return {}; } @@ -1592,15 +1592,15 @@ Dht::cancelPut(const InfoHash& id, const Value::Id& vid) decltype(Dht::store)::iterator Dht::findStorage(const InfoHash& id) { - return std::find_if(store.begin(), store.end(), [&](const Storage& st) { - return st.id == id; + return std::find_if(store.begin(), store.end(), [&](const std::unique_ptr<Storage>& st) { + return st->id == id; }); } decltype(Dht::store)::const_iterator Dht::findStorage(const InfoHash& id) const { - return std::find_if(store.cbegin(), store.cend(), [&](const Storage& st) { - return st.id == id; + return std::find_if(store.cbegin(), store.cend(), [&](const std::unique_ptr<Storage>& st) { + return st->id == id; }); } @@ -1640,15 +1640,15 @@ Dht::storageStore(const InfoHash& id, const std::shared_ptr<Value>& value, time_ if (st == store.end()) { if (store.size() >= MAX_HASHES) return false; - store.emplace_back(id, now); + store.emplace_back(new Storage(id, now)); st = std::prev(store.end()); } - auto store = st->store(value, created, max_store_size - total_store_size); + auto store = (*st)->store(value, created, max_store_size - total_store_size); if (std::get<0>(store)) { total_store_size += std::get<1>(store); total_values += std::get<2>(store); - storageChanged(*st, *std::get<0>(store)); + storageChanged(*(*st), *std::get<0>(store)); } return std::get<0>(store); } @@ -1697,12 +1697,12 @@ Dht::storageAddListener(const InfoHash& id, const std::shared_ptr<Node>& node, s if (st == store.end()) { if (store.size() >= MAX_HASHES) return; - store.emplace_back(id, now); + store.emplace_back(new Storage(id, now)); st = std::prev(store.end()); } - auto l = st->listeners.find(node); - if (l == st->listeners.end()) { - const auto& stvalues = st->getValues(); + auto l = (*st)->listeners.find(node); + if (l == (*st)->listeners.end()) { + const auto& stvalues = (*st)->getValues(); if (not stvalues.empty()) { std::vector<std::shared_ptr<Value>> values(stvalues.size()); std::transform(stvalues.begin(), stvalues.end(), values.begin(), [=](const ValueStorage& vs) { return vs.data; }); @@ -1711,7 +1711,7 @@ Dht::storageAddListener(const InfoHash& id, const std::shared_ptr<Node>& node, s buckets.findClosestNodes(id, now, TARGET_NODES), buckets6.findClosestNodes(id, now, TARGET_NODES), values); } - st->listeners.emplace(node, Listener {rid, now}); + (*st)->listeners.emplace(node, Listener {rid, now}); } else l->second.refresh(rid, now); @@ -1723,21 +1723,21 @@ Dht::expireStorage() const auto& now = scheduler.time(); auto i = store.begin(); while (i != store.end()) { - for (auto l = i->listeners.cbegin(); l != i->listeners.cend();){ + for (auto l = (*i)->listeners.cbegin(); l != (*i)->listeners.cend();){ bool expired = l->second.time + Node::NODE_EXPIRE_TIME < now; if (expired) { DHT_LOG.DEBUG("Discarding expired listener %s", l->first->id.toString().c_str()); - i->listeners.erase(l++); + (*i)->listeners.erase(l++); } else ++l; } - auto stats = i->expire(types, now); + auto stats = (*i)->expire(types, now); total_store_size += stats.first; total_values += stats.second; - if (i->empty() && i->listeners.empty() && i->local_listeners.empty()) { - DHT_LOG.DEBUG("Discarding expired value %s", i->id.toString().c_str()); + if ((*i)->empty() && (*i)->listeners.empty() && (*i)->local_listeners.empty()) { + DHT_LOG.DEBUG("Discarding expired value %s", (*i)->id.toString().c_str()); i = store.erase(i); } else @@ -2018,10 +2018,10 @@ Dht::getStorageLog() const using namespace std::chrono; std::stringstream out; for (const auto& st : store) { - out << "Storage " << st.id << " " << st.listeners.size() << " list., " << st.valueCount() << " values (" << st.totalSize() << " bytes)" << std::endl; - if (not st.local_listeners.empty()) - out << " " << st.local_listeners.size() << " local listeners" << std::endl; - for (const auto& l : st.listeners) { + out << "Storage " << (*st).id << " " << (*st).listeners.size() << " list., " << (*st).valueCount() << " values (" << (*st).totalSize() << " bytes)" << std::endl; + if (not (*st).local_listeners.empty()) + out << " " << (*st).local_listeners.size() << " local listeners" << std::endl; + for (const auto& l : (*st).listeners) { out << " " << "Listener " << l.first->toString(); auto since = duration_cast<seconds>(now - l.second.time); auto expires = duration_cast<seconds>(l.second.time + Node::NODE_EXPIRE_TIME - now); @@ -2204,11 +2204,11 @@ Dht::dataPersistence() { const auto& now = scheduler.time(); auto storage_maintenance_time = time_point::max(); for (auto &str : store) { - if (now > str.maintenance_time) { - maintainStorage(str.id); - str.maintenance_time = now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME; + if (now > str->maintenance_time) { + maintainStorage(str->id); + str->maintenance_time = now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME; } - storage_maintenance_time = std::min(storage_maintenance_time, str.maintenance_time); + storage_maintenance_time = std::min(storage_maintenance_time, str->maintenance_time); } scheduler.add(storage_maintenance_time, std::bind(&Dht::dataPersistence, this)); } @@ -2225,7 +2225,7 @@ Dht::maintainStorage(InfoHash id, bool force, DoneCallback donecb) { auto nodes = buckets.findClosestNodes(id, now); if (!nodes.empty()) { if (force || id.xorCmp(nodes.back()->id, myid) < 0) { - for (auto &value : local_storage->getValues()) { + for (auto &value : (*local_storage)->getValues()) { const auto& vt = getType(value.data->type); if (force || value.time + vt.expiration > now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME) { // gotta put that value there @@ -2241,7 +2241,7 @@ Dht::maintainStorage(InfoHash id, bool force, DoneCallback donecb) { auto nodes6 = buckets6.findClosestNodes(id, now); if (!nodes6.empty()) { if (force || id.xorCmp(nodes6.back()->id, myid) < 0) { - for (auto &value : local_storage->getValues()) { + for (auto &value : (*local_storage)->getValues()) { const auto& vt = getType(value.data->type); if (force || value.time + vt.expiration > now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME) { // gotta put that value there @@ -2256,7 +2256,7 @@ Dht::maintainStorage(InfoHash id, bool force, DoneCallback donecb) { if (not want4 and not want6) { DHT_LOG.DEBUG("Discarding storage values %s", id.toString().c_str()); - local_storage->clear(); + (*local_storage)->clear(); } return announce_per_af; @@ -2349,12 +2349,12 @@ Dht::exportValues() const e.reserve(store.size()); for (const auto& h : store) { ValuesExport ve; - ve.first = h.id; + ve.first = h->id; msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); - pk.pack_array(h.getValues().size()); - for (const auto& v : h.getValues()) { + pk.pack_array(h->getValues().size()); + for (const auto& v : h->getValues()) { pk.pack_array(2); pk.pack(v.time.time_since_epoch().count()); v.data->msgpack_pack(pk); @@ -2517,8 +2517,8 @@ Dht::onGetValues(std::shared_ptr<Node> node, InfoHash& hash, want_t) answer.ntoken = makeToken((sockaddr*)&node->ss, false); answer.nodes4 = buckets.findClosestNodes(hash, now, TARGET_NODES); answer.nodes6 = buckets6.findClosestNodes(hash, now, TARGET_NODES); - if (st != store.end() && not st->empty()) { - auto values = st->getValues(); + if (st != store.end() && not (*st)->empty()) { + auto values = (*st)->getValues(); answer.values.resize(values.size()); std::transform(values.begin(), values.end(), answer.values.begin(), [](const ValueStorage& vs) { return vs.data;