Skip to content
Snippets Groups Projects
Unverified Commit 01fbe0fd authored by Simon Désaulniers's avatar Simon Désaulniers
Browse files

dht: make main store a vector<unique_ptr<Storage>>

Since the elements inside are going to be moved constantly during the life of
the node, this design is better. Also, it works around a crash when the store is
resized (elements inside moved) at some point (always happenned the 3rd time)
for some reason. This bug was known when compiled with LLVM >7.3.0 (>clang-703.0.31).
parent 502776ac
Branches
Tags
No related merge requests found
...@@ -388,7 +388,7 @@ private: ...@@ -388,7 +388,7 @@ private:
RoutingTable buckets {}; RoutingTable buckets {};
RoutingTable buckets6 {}; RoutingTable buckets6 {};
std::vector<Storage> store; std::vector<std::unique_ptr<Storage>> store;
size_t total_values {0}; size_t total_values {0};
size_t total_store_size {0}; size_t total_store_size {0};
size_t max_store_size {DEFAULT_STORAGE_LIMIT}; size_t max_store_size {DEFAULT_STORAGE_LIMIT};
......
...@@ -362,7 +362,7 @@ Dht::shutdown(ShutdownCallback cb) { ...@@ -362,7 +362,7 @@ Dht::shutdown(ShutdownCallback cb) {
}; };
for (const auto& str : store) { for (const auto& str : store) {
*remaining += maintainStorage(str.id, true, str_donecb); *remaining += maintainStorage(str->id, true, str_donecb);
} }
DHT_LOG.WARN("Shuting down node: %u ops remaining.", *remaining); DHT_LOG.WARN("Shuting down node: %u ops remaining.", *remaining);
if (!*remaining && cb) { cb(); } if (!*remaining && cb) { cb(); }
...@@ -1336,12 +1336,12 @@ Dht::listen(const InfoHash& id, GetCallback cb, Value::Filter&& f) ...@@ -1336,12 +1336,12 @@ Dht::listen(const InfoHash& id, GetCallback cb, Value::Filter&& f)
auto st = findStorage(id); auto st = findStorage(id);
size_t tokenlocal = 0; size_t tokenlocal = 0;
if (st == store.end() && store.size() < MAX_HASHES) { if (st == store.end() && store.size() < MAX_HASHES) {
store.emplace_back(id, scheduler.time()); store.emplace_back(new Storage(id, scheduler.time()));
st = std::prev(store.end()); st = std::prev(store.end());
} }
if (st != store.end()) { if (st != store.end()) {
if (not st->empty()) { if (not (*st)->empty()) {
std::vector<std::shared_ptr<Value>> newvals = st->get(f); std::vector<std::shared_ptr<Value>> newvals = (*st)->get(f);
if (not newvals.empty()) { if (not newvals.empty()) {
if (!cb(newvals)) if (!cb(newvals))
return 0; return 0;
...@@ -1352,8 +1352,8 @@ Dht::listen(const InfoHash& id, GetCallback cb, Value::Filter&& f) ...@@ -1352,8 +1352,8 @@ Dht::listen(const InfoHash& id, GetCallback cb, Value::Filter&& f)
} }
} }
} }
tokenlocal = ++st->listener_token; tokenlocal = ++(*st)->listener_token;
st->local_listeners.emplace(tokenlocal, LocalListener{f, gcb}); (*st)->local_listeners.emplace(tokenlocal, LocalListener{f, gcb});
} }
auto token4 = Dht::listenTo(id, AF_INET, gcb, f); auto token4 = Dht::listenTo(id, AF_INET, gcb, f);
...@@ -1378,7 +1378,7 @@ Dht::cancelListen(const InfoHash& id, size_t token) ...@@ -1378,7 +1378,7 @@ Dht::cancelListen(const InfoHash& id, size_t token)
auto st = findStorage(id); auto st = findStorage(id);
auto tokenlocal = std::get<0>(it->second); auto tokenlocal = std::get<0>(it->second);
if (st != store.end() && tokenlocal) if (st != store.end() && tokenlocal)
st->local_listeners.erase(tokenlocal); (*st)->local_listeners.erase(tokenlocal);
auto searches_cancel_listen = [&](std::map<InfoHash, std::shared_ptr<Search>> srs) { auto searches_cancel_listen = [&](std::map<InfoHash, std::shared_ptr<Search>> srs) {
for (auto& sp : srs) { for (auto& sp : srs) {
...@@ -1511,7 +1511,7 @@ Dht::getLocal(const InfoHash& id, Value::Filter f) const ...@@ -1511,7 +1511,7 @@ Dht::getLocal(const InfoHash& id, Value::Filter f) const
{ {
auto s = findStorage(id); auto s = findStorage(id);
if (s == store.end()) return {}; if (s == store.end()) return {};
return s->get(f); return (*s)->get(f);
} }
std::shared_ptr<Value> std::shared_ptr<Value>
...@@ -1519,7 +1519,7 @@ Dht::getLocalById(const InfoHash& id, Value::Id vid) const ...@@ -1519,7 +1519,7 @@ Dht::getLocalById(const InfoHash& id, Value::Id vid) const
{ {
auto s = findStorage(id); auto s = findStorage(id);
if (s != store.end()) if (s != store.end())
return s->getById(vid); return (*s)->getById(vid);
return {}; return {};
} }
...@@ -1592,15 +1592,15 @@ Dht::cancelPut(const InfoHash& id, const Value::Id& vid) ...@@ -1592,15 +1592,15 @@ Dht::cancelPut(const InfoHash& id, const Value::Id& vid)
decltype(Dht::store)::iterator decltype(Dht::store)::iterator
Dht::findStorage(const InfoHash& id) Dht::findStorage(const InfoHash& id)
{ {
return std::find_if(store.begin(), store.end(), [&](const Storage& st) { return std::find_if(store.begin(), store.end(), [&](const std::unique_ptr<Storage>& st) {
return st.id == id; return st->id == id;
}); });
} }
decltype(Dht::store)::const_iterator decltype(Dht::store)::const_iterator
Dht::findStorage(const InfoHash& id) const Dht::findStorage(const InfoHash& id) const
{ {
return std::find_if(store.cbegin(), store.cend(), [&](const Storage& st) { return std::find_if(store.cbegin(), store.cend(), [&](const std::unique_ptr<Storage>& st) {
return st.id == id; return st->id == id;
}); });
} }
...@@ -1640,15 +1640,15 @@ Dht::storageStore(const InfoHash& id, const std::shared_ptr<Value>& value, time_ ...@@ -1640,15 +1640,15 @@ Dht::storageStore(const InfoHash& id, const std::shared_ptr<Value>& value, time_
if (st == store.end()) { if (st == store.end()) {
if (store.size() >= MAX_HASHES) if (store.size() >= MAX_HASHES)
return false; return false;
store.emplace_back(id, now); store.emplace_back(new Storage(id, now));
st = std::prev(store.end()); st = std::prev(store.end());
} }
auto store = st->store(value, created, max_store_size - total_store_size); auto store = (*st)->store(value, created, max_store_size - total_store_size);
if (std::get<0>(store)) { if (std::get<0>(store)) {
total_store_size += std::get<1>(store); total_store_size += std::get<1>(store);
total_values += std::get<2>(store); total_values += std::get<2>(store);
storageChanged(*st, *std::get<0>(store)); storageChanged(*(*st), *std::get<0>(store));
} }
return std::get<0>(store); return std::get<0>(store);
} }
...@@ -1697,12 +1697,12 @@ Dht::storageAddListener(const InfoHash& id, const std::shared_ptr<Node>& node, s ...@@ -1697,12 +1697,12 @@ Dht::storageAddListener(const InfoHash& id, const std::shared_ptr<Node>& node, s
if (st == store.end()) { if (st == store.end()) {
if (store.size() >= MAX_HASHES) if (store.size() >= MAX_HASHES)
return; return;
store.emplace_back(id, now); store.emplace_back(new Storage(id, now));
st = std::prev(store.end()); st = std::prev(store.end());
} }
auto l = st->listeners.find(node); auto l = (*st)->listeners.find(node);
if (l == st->listeners.end()) { if (l == (*st)->listeners.end()) {
const auto& stvalues = st->getValues(); const auto& stvalues = (*st)->getValues();
if (not stvalues.empty()) { if (not stvalues.empty()) {
std::vector<std::shared_ptr<Value>> values(stvalues.size()); std::vector<std::shared_ptr<Value>> values(stvalues.size());
std::transform(stvalues.begin(), stvalues.end(), values.begin(), [=](const ValueStorage& vs) { return vs.data; }); std::transform(stvalues.begin(), stvalues.end(), values.begin(), [=](const ValueStorage& vs) { return vs.data; });
...@@ -1711,7 +1711,7 @@ Dht::storageAddListener(const InfoHash& id, const std::shared_ptr<Node>& node, s ...@@ -1711,7 +1711,7 @@ Dht::storageAddListener(const InfoHash& id, const std::shared_ptr<Node>& node, s
buckets.findClosestNodes(id, now, TARGET_NODES), buckets6.findClosestNodes(id, now, TARGET_NODES), buckets.findClosestNodes(id, now, TARGET_NODES), buckets6.findClosestNodes(id, now, TARGET_NODES),
values); values);
} }
st->listeners.emplace(node, Listener {rid, now}); (*st)->listeners.emplace(node, Listener {rid, now});
} }
else else
l->second.refresh(rid, now); l->second.refresh(rid, now);
...@@ -1723,21 +1723,21 @@ Dht::expireStorage() ...@@ -1723,21 +1723,21 @@ Dht::expireStorage()
const auto& now = scheduler.time(); const auto& now = scheduler.time();
auto i = store.begin(); auto i = store.begin();
while (i != store.end()) { while (i != store.end()) {
for (auto l = i->listeners.cbegin(); l != i->listeners.cend();){ for (auto l = (*i)->listeners.cbegin(); l != (*i)->listeners.cend();){
bool expired = l->second.time + Node::NODE_EXPIRE_TIME < now; bool expired = l->second.time + Node::NODE_EXPIRE_TIME < now;
if (expired) { if (expired) {
DHT_LOG.DEBUG("Discarding expired listener %s", l->first->id.toString().c_str()); DHT_LOG.DEBUG("Discarding expired listener %s", l->first->id.toString().c_str());
i->listeners.erase(l++); (*i)->listeners.erase(l++);
} else } else
++l; ++l;
} }
auto stats = i->expire(types, now); auto stats = (*i)->expire(types, now);
total_store_size += stats.first; total_store_size += stats.first;
total_values += stats.second; total_values += stats.second;
if (i->empty() && i->listeners.empty() && i->local_listeners.empty()) { if ((*i)->empty() && (*i)->listeners.empty() && (*i)->local_listeners.empty()) {
DHT_LOG.DEBUG("Discarding expired value %s", i->id.toString().c_str()); DHT_LOG.DEBUG("Discarding expired value %s", (*i)->id.toString().c_str());
i = store.erase(i); i = store.erase(i);
} }
else else
...@@ -2018,10 +2018,10 @@ Dht::getStorageLog() const ...@@ -2018,10 +2018,10 @@ Dht::getStorageLog() const
using namespace std::chrono; using namespace std::chrono;
std::stringstream out; std::stringstream out;
for (const auto& st : store) { for (const auto& st : store) {
out << "Storage " << st.id << " " << st.listeners.size() << " list., " << st.valueCount() << " values (" << st.totalSize() << " bytes)" << std::endl; out << "Storage " << (*st).id << " " << (*st).listeners.size() << " list., " << (*st).valueCount() << " values (" << (*st).totalSize() << " bytes)" << std::endl;
if (not st.local_listeners.empty()) if (not (*st).local_listeners.empty())
out << " " << st.local_listeners.size() << " local listeners" << std::endl; out << " " << (*st).local_listeners.size() << " local listeners" << std::endl;
for (const auto& l : st.listeners) { for (const auto& l : (*st).listeners) {
out << " " << "Listener " << l.first->toString(); out << " " << "Listener " << l.first->toString();
auto since = duration_cast<seconds>(now - l.second.time); auto since = duration_cast<seconds>(now - l.second.time);
auto expires = duration_cast<seconds>(l.second.time + Node::NODE_EXPIRE_TIME - now); auto expires = duration_cast<seconds>(l.second.time + Node::NODE_EXPIRE_TIME - now);
...@@ -2204,11 +2204,11 @@ Dht::dataPersistence() { ...@@ -2204,11 +2204,11 @@ Dht::dataPersistence() {
const auto& now = scheduler.time(); const auto& now = scheduler.time();
auto storage_maintenance_time = time_point::max(); auto storage_maintenance_time = time_point::max();
for (auto &str : store) { for (auto &str : store) {
if (now > str.maintenance_time) { if (now > str->maintenance_time) {
maintainStorage(str.id); maintainStorage(str->id);
str.maintenance_time = now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME; str->maintenance_time = now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME;
} }
storage_maintenance_time = std::min(storage_maintenance_time, str.maintenance_time); storage_maintenance_time = std::min(storage_maintenance_time, str->maintenance_time);
} }
scheduler.add(storage_maintenance_time, std::bind(&Dht::dataPersistence, this)); scheduler.add(storage_maintenance_time, std::bind(&Dht::dataPersistence, this));
} }
...@@ -2225,7 +2225,7 @@ Dht::maintainStorage(InfoHash id, bool force, DoneCallback donecb) { ...@@ -2225,7 +2225,7 @@ Dht::maintainStorage(InfoHash id, bool force, DoneCallback donecb) {
auto nodes = buckets.findClosestNodes(id, now); auto nodes = buckets.findClosestNodes(id, now);
if (!nodes.empty()) { if (!nodes.empty()) {
if (force || id.xorCmp(nodes.back()->id, myid) < 0) { if (force || id.xorCmp(nodes.back()->id, myid) < 0) {
for (auto &value : local_storage->getValues()) { for (auto &value : (*local_storage)->getValues()) {
const auto& vt = getType(value.data->type); const auto& vt = getType(value.data->type);
if (force || value.time + vt.expiration > now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME) { if (force || value.time + vt.expiration > now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME) {
// gotta put that value there // gotta put that value there
...@@ -2241,7 +2241,7 @@ Dht::maintainStorage(InfoHash id, bool force, DoneCallback donecb) { ...@@ -2241,7 +2241,7 @@ Dht::maintainStorage(InfoHash id, bool force, DoneCallback donecb) {
auto nodes6 = buckets6.findClosestNodes(id, now); auto nodes6 = buckets6.findClosestNodes(id, now);
if (!nodes6.empty()) { if (!nodes6.empty()) {
if (force || id.xorCmp(nodes6.back()->id, myid) < 0) { if (force || id.xorCmp(nodes6.back()->id, myid) < 0) {
for (auto &value : local_storage->getValues()) { for (auto &value : (*local_storage)->getValues()) {
const auto& vt = getType(value.data->type); const auto& vt = getType(value.data->type);
if (force || value.time + vt.expiration > now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME) { if (force || value.time + vt.expiration > now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME) {
// gotta put that value there // gotta put that value there
...@@ -2256,7 +2256,7 @@ Dht::maintainStorage(InfoHash id, bool force, DoneCallback donecb) { ...@@ -2256,7 +2256,7 @@ Dht::maintainStorage(InfoHash id, bool force, DoneCallback donecb) {
if (not want4 and not want6) { if (not want4 and not want6) {
DHT_LOG.DEBUG("Discarding storage values %s", id.toString().c_str()); DHT_LOG.DEBUG("Discarding storage values %s", id.toString().c_str());
local_storage->clear(); (*local_storage)->clear();
} }
return announce_per_af; return announce_per_af;
...@@ -2349,12 +2349,12 @@ Dht::exportValues() const ...@@ -2349,12 +2349,12 @@ Dht::exportValues() const
e.reserve(store.size()); e.reserve(store.size());
for (const auto& h : store) { for (const auto& h : store) {
ValuesExport ve; ValuesExport ve;
ve.first = h.id; ve.first = h->id;
msgpack::sbuffer buffer; msgpack::sbuffer buffer;
msgpack::packer<msgpack::sbuffer> pk(&buffer); msgpack::packer<msgpack::sbuffer> pk(&buffer);
pk.pack_array(h.getValues().size()); pk.pack_array(h->getValues().size());
for (const auto& v : h.getValues()) { for (const auto& v : h->getValues()) {
pk.pack_array(2); pk.pack_array(2);
pk.pack(v.time.time_since_epoch().count()); pk.pack(v.time.time_since_epoch().count());
v.data->msgpack_pack(pk); v.data->msgpack_pack(pk);
...@@ -2517,8 +2517,8 @@ Dht::onGetValues(std::shared_ptr<Node> node, InfoHash& hash, want_t) ...@@ -2517,8 +2517,8 @@ Dht::onGetValues(std::shared_ptr<Node> node, InfoHash& hash, want_t)
answer.ntoken = makeToken((sockaddr*)&node->ss, false); answer.ntoken = makeToken((sockaddr*)&node->ss, false);
answer.nodes4 = buckets.findClosestNodes(hash, now, TARGET_NODES); answer.nodes4 = buckets.findClosestNodes(hash, now, TARGET_NODES);
answer.nodes6 = buckets6.findClosestNodes(hash, now, TARGET_NODES); answer.nodes6 = buckets6.findClosestNodes(hash, now, TARGET_NODES);
if (st != store.end() && not st->empty()) { if (st != store.end() && not (*st)->empty()) {
auto values = st->getValues(); auto values = (*st)->getValues();
answer.values.resize(values.size()); answer.values.resize(values.size());
std::transform(values.begin(), values.end(), answer.values.begin(), [](const ValueStorage& vs) { std::transform(values.begin(), values.end(), answer.values.begin(), [](const ValueStorage& vs) {
return vs.data; return vs.data;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment