diff --git a/include/opendht/dht.h b/include/opendht/dht.h index 3c0d139d262016c0b856aa074d725569b3fa1caf..72a194a215577ea758cacdfc7bddd3887a04ab80 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -242,6 +242,13 @@ public: int pingNode(const sockaddr*, socklen_t); + /** + * Maintains the store. For each storage, if values don't belong there + * anymore because this node is too far from the target, values are sent to + * the appropriate nodes. + */ + void maintainStore(bool force=false); + time_point periodic(const uint8_t *buf, size_t buflen, const sockaddr *from, socklen_t fromlen); /** @@ -285,16 +292,16 @@ public: * reannounced on a regular basis. * User can call #cancelPut(InfoHash, Value::Id) to cancel a put operation. */ - void put(const InfoHash& key, std::shared_ptr<Value>, DoneCallback cb=nullptr); - void put(const InfoHash& key, const std::shared_ptr<Value>& v, DoneCallbackSimple cb) { - put(key, v, bindDoneCb(cb)); + void put(const InfoHash& key, std::shared_ptr<Value>, DoneCallback cb=nullptr, time_point created=time_point::max()); + void put(const InfoHash& key, const std::shared_ptr<Value>& v, DoneCallbackSimple cb, time_point created=time_point::max()) { + put(key, v, bindDoneCb(cb), created); } - void put(const InfoHash& key, Value&& v, DoneCallback cb=nullptr) { - put(key, std::make_shared<Value>(std::move(v)), cb); + void put(const InfoHash& key, Value&& v, DoneCallback cb=nullptr, time_point created=time_point::max()) { + put(key, std::make_shared<Value>(std::move(v)), cb, created); } - void put(const InfoHash& key, Value&& v, DoneCallbackSimple cb) { - put(key, std::forward<Value>(v), bindDoneCb(cb)); + void put(const InfoHash& key, Value&& v, DoneCallbackSimple cb, time_point created=time_point::max()) { + put(key, std::forward<Value>(v), bindDoneCb(cb), created); } /** @@ -350,9 +357,13 @@ public: std::string getSearchesLog(sa_family_t) const; void dumpTables() const; - std::vector<unsigned> getNodeMessageStats(bool in = false) const { - return in ? std::vector<unsigned>{in_stats.ping, in_stats.find, in_stats.get, in_stats.listen, in_stats.put} + std::vector<unsigned> getNodeMessageStats(bool in = false) { + auto stats = in ? std::vector<unsigned>{in_stats.ping, in_stats.find, in_stats.get, in_stats.listen, in_stats.put} : std::vector<unsigned>{out_stats.ping, out_stats.find, out_stats.get, out_stats.listen, out_stats.put}; + if (in) { in_stats = {}; } + else { out_stats = {}; } + + return stats; } /* This must be provided by the user. */ @@ -388,6 +399,8 @@ private: a search in case of no answers. */ static constexpr std::chrono::seconds SEARCH_GET_STEP {3}; + static constexpr std::chrono::minutes MAX_STORAGE_MAINTENANCE_EXPIRE_TIME {10}; + /* The time after which we consider a search to be expirable. */ static constexpr std::chrono::minutes SEARCH_EXPIRE_TIME {62}; @@ -438,11 +451,13 @@ private: InfoHash middle(const RoutingTable::const_iterator&) const; + std::vector<std::shared_ptr<Node>> findClosestNodes(const InfoHash id) const; + RoutingTable::iterator findBucket(const InfoHash& id); RoutingTable::const_iterator findBucket(const InfoHash& id) const; /** - * Returns true if the id is in the bucket's range. + * Return true if the id is in the bucket's range. */ inline bool contains(const RoutingTable::const_iterator& bucket, const InfoHash& id) const { return InfoHash::cmp(bucket->first, id) <= 0 @@ -450,7 +465,14 @@ private: } /** - * Returns a random id in the bucket's range. + * Return true if the table has no bucket ore one empty buket. + */ + inline bool isEmpty() const { + return empty() || (size() == 1 && front().nodes.empty()); + } + + /** + * Return a random id in the bucket's range. */ InfoHash randomId(const RoutingTable::const_iterator& bucket) const; @@ -555,6 +577,7 @@ private: */ struct Announce { std::shared_ptr<Value> value; + time_point created; DoneCallback callback; }; @@ -592,8 +615,11 @@ private: std::map<size_t, LocalListener> listeners {}; size_t listener_token = 1; + /** + * @returns true if the node was not present and added to the search + */ bool insertNode(std::shared_ptr<Node> n, time_point now, const Blob& token={}); - void insertBucket(const Bucket&, time_point now); + unsigned insertBucket(const Bucket&, time_point now); /** * Can we use this search to announce ? @@ -628,6 +654,8 @@ private: bool removeExpiredNode(time_point now); + unsigned refill(const RoutingTable&, time_point now); + std::vector<std::shared_ptr<Node>> getNodes() const; }; @@ -663,13 +691,15 @@ private: struct Storage { InfoHash id; + bool want4 {true}, want6 {true}; + time_point maintenance_time {}; std::vector<ValueStorage> values {}; std::vector<Listener> listeners {}; std::map<size_t, LocalListener> local_listeners {}; size_t listener_token {1}; Storage() {} - Storage(InfoHash id) : id(id) {} + Storage(InfoHash id, time_point now) : id(id), maintenance_time(now+MAX_STORAGE_MAINTENANCE_EXPIRE_TIME) {} }; enum class MessageType { @@ -801,9 +831,9 @@ private: int sendListenConfirmation(const sockaddr*, socklen_t, TransId); - int sendAnnounceValue(const sockaddr*, socklen_t, TransId, - const InfoHash&, const Value&, - const Blob& token, int confirm); + int sendAnnounceValue(const sockaddr*, socklen_t, TransId, const InfoHash&, + const Value&, time_point created, const Blob& token, + int confirm); int sendValueAnnounced(const sockaddr*, socklen_t, TransId, Value::Id); @@ -819,6 +849,7 @@ private: TransId tid; Blob token; Value::Id value_id; + time_point created { time_point::max() }; Blob nodes4; Blob nodes6; std::vector<std::shared_ptr<Value>> values; @@ -843,10 +874,12 @@ private: } void storageAddListener(const InfoHash& id, const InfoHash& node, const sockaddr *from, socklen_t fromlen, uint16_t tid); - ValueStorage* storageStore(const InfoHash& id, const std::shared_ptr<Value>& value); + ValueStorage* storageStore(const InfoHash& id, const std::shared_ptr<Value>& value, time_point created=time_point::max()); void expireStorage(); void storageChanged(Storage& st, ValueStorage&); + size_t maintainStorage(InfoHash id, bool force=false, DoneCallback donecb=nullptr); + // Buckets Bucket* findBucket(const InfoHash& id, sa_family_t af) { RoutingTable::iterator b; @@ -892,7 +925,7 @@ private: * The values can be filtered by an arbitrary provided filter. */ Search* search(const InfoHash& id, sa_family_t af, GetCallback = nullptr, DoneCallback = nullptr, Value::Filter = Value::AllFilter()); - void announce(const InfoHash& id, sa_family_t af, std::shared_ptr<Value> value, DoneCallback callback); + void announce(const InfoHash& id, sa_family_t af, std::shared_ptr<Value> value, DoneCallback callback, time_point created=time_point::max()); size_t listenTo(const InfoHash& id, sa_family_t af, GetCallback cb, Value::Filter f = Value::AllFilter()); std::list<Search>::iterator newSearch(); diff --git a/include/opendht/infohash.h b/include/opendht/infohash.h index 0329d904bb9d60b0ce75c5c5fee6f00621041db1..953d51f89e018c89edf5f6dd1fb9fb843135dbc7 100644 --- a/include/opendht/infohash.h +++ b/include/opendht/infohash.h @@ -168,7 +168,7 @@ public: { double v = 0.; for (unsigned i = 0; i < std::min<size_t>(HASH_LEN, sizeof(unsigned)-1); i++) - v += *(cbegin()+i)/(double)(1<<(8*(i+1))); + v += *(cbegin()+i)/(double)(1<<(8*(i+1))); return v; } @@ -224,7 +224,7 @@ namespace std { typedef dht::InfoHash argument_type; typedef std::size_t result_type; - + result_type operator()(dht::InfoHash const& s) const { result_type r {}; diff --git a/include/opendht/value.h b/include/opendht/value.h index 2a9a39b074c5a66d17797d5cda4420a9e17e449c..c2f08513bae9e4ae6864557b5f4370391b9d3cb3 100644 --- a/include/opendht/value.h +++ b/include/opendht/value.h @@ -64,6 +64,9 @@ using clock = std::chrono::steady_clock; using time_point = clock::time_point; using duration = clock::duration; +time_point from_time_t(std::time_t t); +std::time_t to_time_t(time_point t); + static /*constexpr*/ const time_point TIME_INVALID = {time_point::min()}; static /*constexpr*/ const time_point TIME_MAX {time_point::max()}; diff --git a/src/dht.cpp b/src/dht.cpp index 23dddde14e0ebb3f5aff4e97811dfd6df1e09cd5..67c3a03795db1c9a1053a26194267483fe54f329 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -154,6 +154,7 @@ constexpr std::chrono::minutes Node::NODE_GOOD_TIME; constexpr std::chrono::seconds Node::MAX_RESPONSE_TIME; constexpr std::chrono::seconds Dht::SEARCH_GET_STEP; +constexpr std::chrono::minutes Dht::MAX_STORAGE_MAINTENANCE_EXPIRE_TIME; constexpr std::chrono::minutes Dht::SEARCH_EXPIRE_TIME; constexpr std::chrono::seconds Dht::LISTEN_EXPIRE_TIME; constexpr std::chrono::seconds Dht::REANNOUNCE_MARGIN; @@ -182,7 +183,20 @@ Dht::getStatus(sa_family_t af) const void Dht::shutdown(ShutdownCallback cb) { - if (cb) { cb(); } + /**************************** + * Last store maintenance * + ****************************/ + + auto remaining = std::make_shared<int>(0); + auto str_donecb = [=](bool, const std::vector<std::shared_ptr<Node>>&) { + --*remaining; + if (!*remaining && cb) { cb(); } + }; + + for (auto str : store) { + *remaining += maintainStorage(str.id, true, str_donecb); + } + if (!*remaining && cb) { cb(); } } bool @@ -286,6 +300,41 @@ Dht::RoutingTable::depth(const RoutingTable::const_iterator& it) const return std::max(bit1, bit2)+1; } +std::vector<std::shared_ptr<Node>> +Dht::RoutingTable::findClosestNodes(const InfoHash id) const { + std::vector<std::shared_ptr<Node>> nodes {}; + auto bucket = findBucket(id); + + if (bucket == end()) { return nodes; } + + auto sortedBucketInsert = [&](const Bucket &b) { + for (auto n : b.nodes) { + auto here = std::find_if(nodes.begin(), nodes.end(), [&id,&n](std::shared_ptr<Node> &node) { + return id.xorCmp(node->id, n->id) < 0; + }); + nodes.insert(here, n); + } + }; + + // Inserting very closest nodes + sortedBucketInsert(*bucket); + + // adjacent buckets contain remaining closest candidates + if (std::next(bucket) != this->end() && nodes.size() < TARGET_NODES) { + sortedBucketInsert(*std::next(bucket)); + } + if (std::prev(bucket) != this->end() && nodes.size() < TARGET_NODES) { + sortedBucketInsert(*std::prev(bucket)); + } + + // shrink to the TARGET_NODES closest nodes. + if (nodes.size() > TARGET_NODES) { + nodes.resize(TARGET_NODES); + } + + return nodes; +} + Dht::RoutingTable::iterator Dht::RoutingTable::findBucket(const InfoHash& id) { @@ -744,16 +793,10 @@ Dht::Search::insertNode(std::shared_ptr<Node> node, time_point now, const Blob& if (nodes.size() >= SEARCH_NODES && id.xorCmp(nid, nodes.back().node->id) > 0 && node->isExpired(now)) return false; - // Reset search timer if it was empty - if (nodes.empty()) { - step_time = TIME_INVALID; - get_step_time = TIME_INVALID; - } - bool found = false; unsigned num_candidates = 0; auto n = std::find_if(nodes.begin(), nodes.end(), [&](const SearchNode& sn) { - if (sn.candidate) + if (sn.candidate or sn.node->isExpired(now)) num_candidates++; if (sn.node == node) { found = true; @@ -761,20 +804,26 @@ Dht::Search::insertNode(std::shared_ptr<Node> node, time_point now, const Blob& } return id.xorCmp(nid, sn.node->id) < 0; }); + + bool new_search_node = false; if (!found) { - if (nodes.size()-num_candidates >= SEARCH_NODES or nodes.size() >= SEARCH_NODES+TARGET_NODES/2) { + if (nodes.size()-num_candidates >= SEARCH_NODES) { if (node->isExpired(now)) return false; - if (n == nodes.end()) { - // search is full, try to remove an expired node - if (not removeExpiredNode(now)) - return false; - n = nodes.end(); - } + if (n == nodes.end()) + return false; } + + // Reset search timer if the search is empty + if (nodes.empty()) { + step_time = TIME_INVALID; + get_step_time = TIME_INVALID; + } + //bool synced = isSynced(now); n = nodes.insert(n, SearchNode(node)); node->time = now; + new_search_node = true; /*if (synced) { n->candidate = true; //std::cout << "Adding candidate node " << node->id << " to IPv" << (af==AF_INET?'4':'6') << " synced search " << id << std::endl; @@ -789,15 +838,12 @@ Dht::Search::insertNode(std::shared_ptr<Node> node, time_point now, const Blob& if (not token.empty()) { n->getStatus.reply_time = now; n->getStatus.request_time = TIME_INVALID; - if (n->candidate) { - n->candidate = false; - //std::cout << "Confirm candidate node " << node->id << " to synced search " << id << std::endl; - } + n->candidate = false; if (token.size() <= 64) n->token = token; expired = false; } - return true; + return new_search_node; } std::vector<std::shared_ptr<Node>> @@ -932,8 +978,9 @@ Dht::searchStep(Search& sr) //std::cout << "Sending announce_value to " << n.node->id << " " << print_addr(n.node->ss, n.node->sslen) << std::endl; sendAnnounceValue((sockaddr*)&n.node->ss, n.node->sslen, - TransId {TransPrefix::ANNOUNCE_VALUES, sr.tid}, sr.id, *a.value, - n.token, n.node->reply_time >= now - UDP_REPLY_TIME); + TransId {TransPrefix::ANNOUNCE_VALUES, sr.tid}, + sr.id, *a.value, a.created, n.token, + n.node->reply_time >= now - UDP_REPLY_TIME); if (a_status == n.acked.end()) { n.acked[vid] = { now }; } else { @@ -973,28 +1020,33 @@ Dht::searchStep(Search& sr) return sn.candidate or sn.node->isExpired(now); }) == sr.nodes.size()) { - DHT_ERROR("[search %s IPv%c] expired", sr.id.toString().c_str(), sr.af == AF_INET ? '4' : '6'); - // no nodes or all expired nodes - sr.expired = true; - if (sr.announce.empty() && sr.listeners.empty()) { - // Listening or announcing requires keeping the cluster up to date. - sr.done = true; - } - { - auto get_cbs = std::move(sr.callbacks); - for (const auto& g : get_cbs) { - if (g.done_cb) - g.done_cb(false, {}); + unsigned added = sr.refill(sr.af == AF_INET ? buckets : buckets6, now); + if (added) { + DHT_WARN("[search %s IPv%c] refilled with %u nodes", sr.id.toString().c_str(), (sr.af == AF_INET) ? '4' : '6', added); + } else { + DHT_ERROR("[search %s IPv%c] expired", sr.id.toString().c_str(), sr.af == AF_INET ? '4' : '6'); + // no nodes or all expired nodes + sr.expired = true; + if (sr.announce.empty() && sr.listeners.empty()) { + // Listening or announcing requires keeping the cluster up to date. + sr.done = true; + } + { + auto get_cbs = std::move(sr.callbacks); + for (const auto& g : get_cbs) { + if (g.done_cb) + g.done_cb(false, {}); + } + } + { + std::vector<DoneCallback> a_cbs; + a_cbs.reserve(sr.announce.size()); + for (const auto& a : sr.announce) + if (a.callback) + a_cbs.emplace_back(std::move(a.callback)); + for (const auto& a : a_cbs) + a(false, {}); } - } - { - std::vector<DoneCallback> a_cbs; - a_cbs.reserve(sr.announce.size()); - for (const auto& a : sr.announce) - if (a.callback) - a_cbs.emplace_back(std::move(a.callback)); - for (const auto& a : a_cbs) - a(false, {}); } } } @@ -1035,13 +1087,15 @@ Dht::newSearch() } /* Insert the contents of a bucket into a search structure. */ -void +unsigned Dht::Search::insertBucket(const Bucket& b, time_point now) { + unsigned inserted = 0; for (auto& n : b.nodes) { - if (not n->isExpired(now)) - insertNode(n, now); + if (not n->isExpired(now) and insertNode(n, now)) + inserted++; } + return inserted; } bool @@ -1247,6 +1301,27 @@ Dht::bootstrapSearch(Dht::Search& sr) sr.insertBucket(*list.findBucket(myid), now); } +unsigned +Dht::Search::refill(const RoutingTable& r, time_point now) { + if (r.isEmpty() or r.front().af != af) + return 0; + unsigned added = 0; + auto b = r.findBucket(id); + auto n = b; + while (added < SEARCH_NODES && (std::next(n) != r.end() || b != r.begin())) { + if (std::next(n) != r.end()) { + added += insertBucket(*std::next(n), now); + n = std::next(n); + } + if (b != r.begin()) { + added += insertBucket(*std::prev(b), now); + b = std::prev(b); + } + } + //DHT_WARN("[search %s IPv%c] refilled with %u nodes", id.toString().c_str(), (af == AF_INET) ? '4' : '6', added); + return added; +} + /* Start a search. */ Dht::Search* Dht::search(const InfoHash& id, sa_family_t af, GetCallback callback, DoneCallback done_callback, Value::Filter filter) @@ -1291,7 +1366,7 @@ Dht::search(const InfoHash& id, sa_family_t af, GetCallback callback, DoneCallba } void -Dht::announce(const InfoHash& id, sa_family_t af, std::shared_ptr<Value> value, DoneCallback callback) +Dht::announce(const InfoHash& id, sa_family_t af, std::shared_ptr<Value> value, DoneCallback callback, time_point created) { if (!value) { if (callback) @@ -1312,16 +1387,26 @@ Dht::announce(const InfoHash& id, sa_family_t af, std::shared_ptr<Value> value, return a.value->id == value->id; }); if (a_sr == sr->announce.end()) - sr->announce.emplace_back(Announce {value, callback}); + sr->announce.emplace_back(Announce {value, created, callback}); else { if (a_sr->value != value) { a_sr->value = value; for (auto& n : sr->nodes) n.acked[value->id] = {}; } - if (a_sr->callback) - a_sr->callback(false, {}); - a_sr->callback = callback; + if (sr->isAnnounced(value->id, getType(value->type), now)) { + if (a_sr->callback) + a_sr->callback(true, {}); + a_sr->callback = {}; + if (callback) { + callback(true, {}); + } + return; + } else { + if (a_sr->callback) + a_sr->callback(false, {}); + a_sr->callback = callback; + } } auto tm = sr->getNextStepTime(types, now); if (tm < search_time) { @@ -1389,7 +1474,7 @@ Dht::listen(const InfoHash& id, GetCallback cb, Value::Filter f) Storage* st = findStorage(id); size_t tokenlocal = 0; if (!st && store.size() < MAX_HASHES) { - store.push_back(Storage {id}); + store.push_back(Storage {id, now}); st = &store.back(); } if (st) { @@ -1449,7 +1534,7 @@ Dht::cancelListen(const InfoHash& id, size_t token) } void -Dht::put(const InfoHash& id, std::shared_ptr<Value> val, DoneCallback callback) +Dht::put(const InfoHash& id, std::shared_ptr<Value> val, DoneCallback callback, time_point created) { now = clock::now(); @@ -1477,13 +1562,13 @@ Dht::put(const InfoHash& id, std::shared_ptr<Value> val, DoneCallback callback) *done4 = true; *ok |= ok4; donecb(nodes); - }); + }, created); announce(id, AF_INET6, val, [=](bool ok6, const std::vector<std::shared_ptr<Node>>& nodes) { DHT_DEBUG("Announce done IPv6 %d", ok6); *done6 = true; *ok |= ok6; donecb(nodes); - }); + }, created); } struct OpStatus { @@ -1659,13 +1744,14 @@ Dht::storageChanged(Storage& st, ValueStorage& v) } Dht::ValueStorage* -Dht::storageStore(const InfoHash& id, const std::shared_ptr<Value>& value) +Dht::storageStore(const InfoHash& id, const std::shared_ptr<Value>& value, time_point created) { + created = std::min(created, now); Storage *st = findStorage(id); if (!st) { if (store.size() >= MAX_HASHES) return nullptr; - store.push_back(Storage {id}); + store.push_back(Storage {id, now}); st = &store.back(); } @@ -1674,7 +1760,7 @@ Dht::storageStore(const InfoHash& id, const std::shared_ptr<Value>& value) }); if (it != st->values.end()) { /* Already there, only need to refresh */ - it->time = now; + it->time = created; if (it->data != value) { DHT_DEBUG("Updating %s -> %s", id.toString().c_str(), value->toString().c_str()); it->data = value; @@ -1685,7 +1771,7 @@ Dht::storageStore(const InfoHash& id, const std::shared_ptr<Value>& value) DHT_DEBUG("Storing %s -> %s", id.toString().c_str(), value->toString().c_str()); if (st->values.size() >= MAX_VALUES) return nullptr; - st->values.emplace_back(value, now); + st->values.emplace_back(value, created); storageChanged(*st, st->values.back()); return &st->values.back(); } @@ -1698,7 +1784,7 @@ Dht::storageAddListener(const InfoHash& id, const InfoHash& node, const sockaddr if (!st) { if (store.size() >= MAX_HASHES) return; - store.push_back(Storage {id}); + store.push_back(Storage {id, now}); st = &store.back(); } sa_family_t af = from->sa_family; @@ -1717,8 +1803,7 @@ void Dht::expireStorage() { auto i = store.begin(); - while (i != store.end()) - { + while (i != store.end()) { // put elements to remove at the end with std::partition, // and then remove them with std::vector::erase. i->listeners.erase( @@ -1746,7 +1831,7 @@ Dht::expireStorage() }), i->values.end()); - if (i->values.empty() && i->listeners.empty()) { + if ((i->values.empty() && i->listeners.empty()) || (!i->want4 && !i->want6)) { DHT_DEBUG("Discarding expired value %s", i->id.toString().c_str()); i = store.erase(i); } @@ -2181,6 +2266,50 @@ Dht::bucketMaintenance(RoutingTable& list) return false; } +size_t +Dht::maintainStorage(InfoHash id, bool force, DoneCallback donecb) { + int announce_per_af = 0; + auto *local_storage = findStorage(id); + if (!local_storage) { return 0; } + + auto nodes = buckets.findClosestNodes(id); + auto nodes6 = buckets6.findClosestNodes(id); + + if (!nodes.empty()) { + if (force || id.xorCmp(nodes.back()->id, myid) < 0) { + for (auto &local_value_storage : local_storage->values) { + const auto& vt = getType(local_value_storage.data->type); + if (force || local_value_storage.time + vt.expiration > now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME) { + // gotta put that value there + announce(id, AF_INET, local_value_storage.data, donecb, local_value_storage.time); + ++announce_per_af; + } + } + local_storage->want4 = false; + } + else { local_storage->want4 = true; } + } + else { local_storage->want4 = false; } + + if (!nodes6.empty()) { + if (force || id.xorCmp(nodes6.back()->id, myid) < 0) { + for (auto &local_value_storage : local_storage->values) { + const auto& vt = getType(local_value_storage.data->type); + if (force || local_value_storage.time + vt.expiration > now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME) { + // gotta put that value there + announce(id, AF_INET6, local_value_storage.data, donecb, local_value_storage.time); + ++announce_per_af; + } + } + local_storage->want6 = false; + } + else { local_storage->want6 = true; } + } + else { local_storage->want6 = false; } + + return announce_per_af; +} + void Dht::processMessage(const uint8_t *buf, size_t buflen, const sockaddr *from, socklen_t fromlen) { @@ -2473,7 +2602,7 @@ Dht::processMessage(const uint8_t *buf, size_t buflen, const sockaddr *from, soc const auto& type = getType(lv->type); if (type.editPolicy(msg.info_hash, lv, vc, msg.id, from, fromlen)) { DHT_DEBUG("Editing value of type %s belonging to %s at %s.", type.name.c_str(), v->owner.getId().toString().c_str(), msg.info_hash.toString().c_str()); - storageStore(msg.info_hash, vc); + storageStore(msg.info_hash, vc, msg.created); } else { DHT_WARN("Rejecting edition of type %s belonging to %s at %s because of storage policy.", type.name.c_str(), v->owner.getId().toString().c_str(), msg.info_hash.toString().c_str()); } @@ -2482,7 +2611,7 @@ Dht::processMessage(const uint8_t *buf, size_t buflen, const sockaddr *from, soc const auto& type = getType(vc->type); if (type.storePolicy(msg.info_hash, vc, msg.id, from, fromlen)) { DHT_DEBUG("Storing value of type %s belonging to %s at %s.", type.name.c_str(), v->owner.getId().toString().c_str(), msg.info_hash.toString().c_str()); - storageStore(msg.info_hash, vc); + storageStore(msg.info_hash, vc, msg.created); } else { DHT_WARN("Rejecting storage of type %s belonging to %s at %s because of storage policy.", type.name.c_str(), v->owner.getId().toString().c_str(), msg.info_hash.toString().c_str()); } @@ -2581,7 +2710,18 @@ Dht::periodic(const uint8_t *buf, size_t buflen, confirm_nodes_time = now + time_dis(rd); } - return std::min(confirm_nodes_time, search_time); + //data persistence + time_point storage_maintenance_time = time_point::max(); + for (auto &str : store) { + if (now > str.maintenance_time) { + maintainStorage(str.id); + str.maintenance_time = now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME; + + } + storage_maintenance_time = std::min(storage_maintenance_time, str.maintenance_time); + } + + return std::min(confirm_nodes_time, std::min(search_time, storage_maintenance_time)); } std::vector<Dht::ValuesExport> @@ -3036,18 +3176,22 @@ Dht::sendListenConfirmation(const sockaddr* sa, socklen_t salen, TransId tid) int Dht::sendAnnounceValue(const sockaddr *sa, socklen_t salen, TransId tid, - const InfoHash& infohash, const Value& value, + const InfoHash& infohash, const Value& value, time_point created, const Blob& token, int confirm) { msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); pk.pack_map(5); - pk.pack(std::string("a")); pk.pack_map(4); + pk.pack(std::string("a")); pk.pack_map((created < now ? 5 : 4)); pk.pack(std::string("id")); pk.pack(myid); pk.pack(std::string("h")); pk.pack(infohash); pk.pack(std::string("values")); pk.pack_array(1); pk.pack(value); - pk.pack(std::string("token")); packToken(pk, token); + if (created < now) { + pk.pack(std::string("c")); + pk.pack(to_time_t(created)); + } + pk.pack(std::string("token")); pk.pack(token); pk.pack(std::string("q")); pk.pack(std::string("put")); pk.pack(std::string("t")); pk.pack_bin(tid.size()); @@ -3162,6 +3306,9 @@ Dht::ParsedMessage::msgpack_unpack(msgpack::object msg) if (auto rnodes4 = findMapValue(req, "n4")) nodes4 = unpackBlob(*rnodes4); + if (auto rcreated = findMapValue(req, "c")) + created = from_time_t(rcreated->as<std::time_t>()); + if (auto rnodes6 = findMapValue(req, "n6")) nodes6 = unpackBlob(*rnodes6); diff --git a/src/value.cpp b/src/value.cpp index dbcb197f51e2161bf1dd4860d15669343c52d186..5d22f259b57c462be73949af17dfe3f0619f7e81 100644 --- a/src/value.cpp +++ b/src/value.cpp @@ -35,6 +35,16 @@ namespace dht { +time_point from_time_t(std::time_t t) { + return clock::now() + (std::chrono::system_clock::from_time_t(t) - std::chrono::system_clock::now()); +} + +std::time_t to_time_t(time_point t) { + return std::chrono::system_clock::to_time_t(std::chrono::system_clock::now() + + (t - clock::now())); +} + + std::ostream& operator<< (std::ostream& s, const Value& v) { s << "Value[id:" << std::hex << v.id << std::dec << " ";