diff --git a/include/opendht/dht.h b/include/opendht/dht.h index 603b128d25d44513aba218f0528159d9737e3732..80289281b13ef7c4bc761dc97056e2aa82a91989 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -425,8 +425,8 @@ private: * node is too far from the target, values are sent to the appropriate * nodes. */ - void dataPersistence(); - size_t maintainStorage(InfoHash id, bool force=false, DoneCallback donecb=nullptr); + void dataPersistence(InfoHash id); + size_t maintainStorage(decltype(store)::value_type&, bool force=false, DoneCallback donecb=nullptr); // Buckets RoutingTable& buckets(sa_family_t af) { return af == AF_INET ? buckets4 : buckets6; } diff --git a/src/dht.cpp b/src/dht.cpp index dd2ff23d393a8661e7d0239df5467529451aa8cd..fb23baa5e3af8531222255b70037da34a571e42a 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -108,10 +108,10 @@ struct Dht::Listener { struct Dht::ValueStorage { std::shared_ptr<Value> data {}; - time_point time {}; + time_point created {}; ValueStorage() {} - ValueStorage(const std::shared_ptr<Value>& v, time_point t) : data(v), time(t) {} + ValueStorage(const std::shared_ptr<Value>& v, time_point t) : data(v), created(t) {} }; struct Dht::Storage { @@ -191,7 +191,7 @@ struct Dht::Storage { bool refresh(const time_point& now, const Value::Id& vid) { for (auto& vs : values) if (vs.data->id == vid) { - vs.time = now; + vs.created = now; return true; } return false; @@ -746,9 +746,8 @@ Dht::shutdown(ShutdownCallback cb) { if (!*remaining && cb) { cb(); } }; - for (const auto& str : store) { - *remaining += maintainStorage(str.first, true, str_donecb); - } + for (auto& str : store) + *remaining += maintainStorage(str, true, str_donecb); if (!*remaining) { DHT_LOG.w("shuting down node: %u ops remaining", *remaining); @@ -2214,6 +2213,7 @@ Dht::storageStore(const InfoHash& id, const std::shared_ptr<Value>& value, time_ if (store.size() >= MAX_HASHES) return false; st = store.emplace(id, Storage(now)).first; + scheduler.add(st->second.maintenance_time, std::bind(&Dht::dataPersistence, this, id)); } auto store = st->second.store(value, created, max_store_size - total_store_size); @@ -2223,11 +2223,6 @@ Dht::storageStore(const InfoHash& id, const std::shared_ptr<Value>& value, time_ storageChanged(id, st->second, *std::get<0>(store)); } - if (not nextStorageMaintenance) - /* activate storage maintenance for the first time */ - nextStorageMaintenance = scheduler.add(now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME, - std::bind(&Dht::dataPersistence, this)); - return std::get<0>(store); } @@ -2239,7 +2234,7 @@ Dht::Storage::store(const std::shared_ptr<Value>& value, time_point created, ssi }); if (it != values.end()) { /* Already there, only need to refresh */ - it->time = created; + it->created = created; ssize_t size_diff = value->size() - it->data->size(); if (size_diff <= size_left and it->data != value) { //DHT_LOG_DEBUG("Updating %s -> %s", id.toString().c_str(), value->toString().c_str()); @@ -2329,7 +2324,7 @@ Dht::Storage::expire(const std::map<ValueType::Id, ValueType>& types, time_point if (!v.data) return false; // should not happen auto type_it = types.find(v.data->type); const ValueType& type = (type_it == types.end()) ? ValueType::USER_DATA : type_it->second; - bool expired = v.time + type.expiration < now; + bool expired = v.created + type.expiration < now; //if (expired) // DHT_LOG_DEBUG("Discarding expired value %s", v.data->toString().c_str()); return !expired; @@ -2819,42 +2814,35 @@ Dht::bucketMaintenance(RoutingTable& list) } void -Dht::dataPersistence() { +Dht::dataPersistence(InfoHash id) +{ const auto& now = scheduler.time(); - auto storage_maintenance_time = time_point::max(); - for (auto &str : store) { - if (now > str.second.maintenance_time) { - DHT_LOG.w(str.first, "[storage %s] maintenance (%u values, %u bytes)", - str.first.toString().c_str(), str.second.valueCount(), str.second.totalSize()); - maintainStorage(str.first); - str.second.maintenance_time = now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME; - } - storage_maintenance_time = std::min(storage_maintenance_time, str.second.maintenance_time); + auto str = store.find(id); + if (str != store.end() and now > str->second.maintenance_time) { + DHT_LOG.d(id, "[storage %s] maintenance (%u values, %u bytes)", + id.toString().c_str(), str->second.valueCount(), str->second.totalSize()); + maintainStorage(*str); + str->second.maintenance_time = now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME; + scheduler.add(str->second.maintenance_time, std::bind(&Dht::dataPersistence, this, id)); } - DHT_LOG.w("[store] next maintenance in %u minutes", - std::chrono::duration_cast<std::chrono::minutes>(storage_maintenance_time-now)); - nextStorageMaintenance = storage_maintenance_time != time_point::max() ? - scheduler.add(storage_maintenance_time, std::bind(&Dht::dataPersistence, this)) : - nullptr; } size_t -Dht::maintainStorage(InfoHash id, bool force, DoneCallback donecb) { +Dht::maintainStorage(decltype(store)::value_type& storage, bool force, DoneCallback donecb) +{ const auto& now = scheduler.time(); size_t announce_per_af = 0; - auto local_storage = store.find(id); - if (local_storage == store.end()) { return 0; } bool want4 = true, want6 = true; - auto nodes = buckets4.findClosestNodes(id, now); + auto nodes = buckets4.findClosestNodes(storage.first, now); if (!nodes.empty()) { - if (force || id.xorCmp(nodes.back()->id, myid) < 0) { - for (auto &value : local_storage->second.getValues()) { + if (force || storage.first.xorCmp(nodes.back()->id, myid) < 0) { + for (auto &value : storage.second.getValues()) { const auto& vt = getType(value.data->type); - if (force || value.time + vt.expiration > now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME) { + if (force || value.created + vt.expiration > now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME) { // gotta put that value there - announce(id, AF_INET, value.data, donecb, value.time); + announce(storage.first, AF_INET, value.data, donecb, value.created); ++announce_per_af; } } @@ -2862,14 +2850,14 @@ Dht::maintainStorage(InfoHash id, bool force, DoneCallback donecb) { } } - auto nodes6 = buckets6.findClosestNodes(id, now); + auto nodes6 = buckets6.findClosestNodes(storage.first, now); if (!nodes6.empty()) { - if (force || id.xorCmp(nodes6.back()->id, myid) < 0) { - for (auto &value : local_storage->second.getValues()) { + if (force || storage.first.xorCmp(nodes6.back()->id, myid) < 0) { + for (auto &value : storage.second.getValues()) { const auto& vt = getType(value.data->type); - if (force || value.time + vt.expiration > now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME) { + if (force || value.created + vt.expiration > now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME) { // gotta put that value there - announce(id, AF_INET6, value.data, donecb, value.time); + announce(storage.first, AF_INET6, value.data, donecb, value.created); ++announce_per_af; } } @@ -2878,8 +2866,8 @@ Dht::maintainStorage(InfoHash id, bool force, DoneCallback donecb) { } if (not want4 and not want6) { - DHT_LOG.d(id, "Discarding storage values %s", id.toString().c_str()); - auto diff = local_storage->second.clear(); + DHT_LOG.d(storage.first, "Discarding storage values %s", storage.first.toString().c_str()); + auto diff = storage.second.clear(); total_store_size += diff.first; total_values += diff.second; } @@ -2985,7 +2973,7 @@ Dht::exportValues() const pk.pack_array(vals.size()); for (const auto& v : vals) { pk.pack_array(2); - pk.pack(v.time.time_since_epoch().count()); + pk.pack(v.created.time_since_epoch().count()); v.data->msgpack_pack(pk); } ve.second = {buffer.data(), buffer.data()+buffer.size()};