diff --git a/include/opendht/dht.h b/include/opendht/dht.h
index a79619e830489923353275f158ec2c6223959560..529cc049d0d50e8e38b8f6577cd20b802c8cd106 100644
--- a/include/opendht/dht.h
+++ b/include/opendht/dht.h
@@ -573,7 +573,8 @@ private:
      *
      * @param sr The search to execute its operations.
      */
-    void searchStep(Sp<Search>);
+    void searchStep(std::weak_ptr<Search> ws);
+    void searchSynchedNodeListen(const Sp<Search>&, SearchNode&);
 
     void dumpSearch(const Search& sr, std::ostream& out) const;
 
diff --git a/src/dht.cpp b/src/dht.cpp
index 74c563b2c6867ec11d392dfe1eb4856a896772f0..7c08225409f180a126b516a6f3e236fd3864dd13 100644
--- a/src/dht.cpp
+++ b/src/dht.cpp
@@ -288,7 +288,7 @@ Dht::searchNodeGetDone(const net::Request& req,
             if (srn->syncJob)
                 scheduler.edit(srn->syncJob, syncTime);
             else
-                srn->syncJob = scheduler.add(syncTime, std::bind(&Dht::searchStep, this, sr));
+                srn->syncJob = scheduler.add(syncTime, std::bind(&Dht::searchStep, this, ws));
         }
         onGetValuesDone(req.node, answer, sr, query);
     }
@@ -490,13 +490,18 @@ void Dht::searchSendAnnounceValue(const Sp<Search>& sr) {
                 } catch (std::out_of_range&) { }
                 auto next_refresh_time = now + getType(a.value->type).expiration;
+                auto& acked = sn->acked[a.value->id];
+                if (acked.refresh) {
+                    acked.refresh->cancel();
+                    acked.refresh.reset();
+                }
                 /* only put the value if the node doesn't already have it */
                 if (not hasValue or seq_no < a.value->seq) {
                     if (logger_)
                         logger_->d(sr->id, sn->node->id, "[search %s] [node %s] sending 'put' (vid: %016" PRIx64 ")",
                                 sr->id.toString().c_str(), sn->node->toString().c_str(), a.value->id);
                     auto created = a.permanent ? time_point::max() : a.created;
-                    sn->acked[a.value->id] = {
+                    acked = {
                         network_engine.sendAnnounceValue(sn->node, sr->id, a.value, created, sn->token, onDone, onExpired),
                         next_refresh_time
                     };
@@ -504,7 +509,7 @@ void Dht::searchSendAnnounceValue(const Sp<Search>& sr) {
                     if (logger_)
                         logger_->w(sr->id, sn->node->id, "[search %s] [node %s] sending 'refresh' (vid: %016" PRIx64 ")",
                                 sr->id.toString().c_str(), sn->node->toString().c_str(), a.value->id);
-                    sn->acked[a.value->id] = {
+                    acked = {
                         network_engine.sendRefreshValue(sn->node, sr->id, a.value->id, sn->token, onDone,
                             [this, ws, node=sn->node, v=a.value, onDone,
@@ -536,17 +541,13 @@ void Dht::searchSendAnnounceValue(const Sp<Search>& sr) {
                                 sr->id.toString().c_str(), sn->node->toString().c_str(), a.value->id);
                     auto ack_req = std::make_shared<net::Request>(net::Request::State::COMPLETED);
                     ack_req->reply_time = now;
-                    sn->acked[a.value->id] = std::make_pair(std::move(ack_req), next_refresh_time);
+                    acked = {std::move(ack_req), next_refresh_time};
                     /* step to clear announces */
                     scheduler.edit(sr->nextSearchStep, now);
                 }
                 if (a.permanent) {
-                    scheduler.add(next_refresh_time - REANNOUNCE_MARGIN, [this,ws] {
-                        if (auto sr = ws.lock()) {
-                            searchStep(sr);
-                        }
-                    });
+                    acked.refresh = scheduler.add(next_refresh_time - REANNOUNCE_MARGIN, std::bind(&Dht::searchStep, this, ws));
                 }
             }
         };
@@ -644,8 +645,8 @@ Dht::searchSynchedNodeListen(const Sp<Search>& sr, SearchNode& n)
             if (auto sr = ws.lock()) {
                 scheduler.edit(sr->nextSearchStep, scheduler.time());
                 if (auto sn = sr->getNode(req.node)) {
-                    scheduler.add(sn->getListenTime(query, getListenExpiration()), std::bind(&Dht::searchStep, this, sr));
-                    sn->onListenSynced(query);
+                    auto job = scheduler.add(sn->getListenTime(query, getListenExpiration()), std::bind(&Dht::searchStep, this, ws));
+                    sn->onListenSynced(query, true, std::move(job));
                 }
                 onListenDone(req.node, answer, sr);
             }
@@ -671,8 +672,9 @@ Dht::searchSynchedNodeListen(const Sp<Search>& sr, SearchNode& n)
 
 /* When a search is in progress, we periodically call search_step to send further requests. */
 void
-Dht::searchStep(Sp<Search> sr)
+Dht::searchStep(std::weak_ptr<Search> ws)
 {
+    auto sr = ws.lock();
     if (not sr or sr->expired or sr->done)
         return;
     const auto& now = scheduler.time();
@@ -825,7 +827,7 @@ Dht::search(const InfoHash& id, sa_family_t af, GetCallback gcb, QueryCallback q
         sr->expired = false;
         sr->nodes.clear();
         sr->nodes.reserve(SEARCH_NODES+1);
-        sr->nextSearchStep = scheduler.add(time_point::max(), std::bind(&Dht::searchStep, this, sr));
+        sr->nextSearchStep = scheduler.add(time_point::max(), std::bind(&Dht::searchStep, this, std::weak_ptr<Search>(sr)));
         if (logger_)
             logger_->w(id, "[search %s IPv%c] new search", id.toString().c_str(), (af == AF_INET) ? '4' : '6');
         if (search_id == 0)
@@ -1278,8 +1280,10 @@ Dht::storageStore(const InfoHash& id, const Sp<Value>& value, time_point created
     if (auto vs = store.first) {
         total_store_size += store.second.size_diff;
         total_values += store.second.values_diff;
+        if (vs->expiration_job)
+            vs->expiration_job->cancel();
         if (not permanent) {
-            scheduler.add(expiration, std::bind(&Dht::expireStorage, this, id));
+            vs->expiration_job = scheduler.add(expiration, std::bind(&Dht::expireStorage, this, id));
         }
         if (total_store_size > max_store_size) {
             expireStore();
@@ -1621,10 +1625,10 @@ Dht::dumpSearch(const Search& sr, std::ostream& out) const
         out << "[";
         for (const auto& a : sr.announce) {
             auto ack = n.acked.find(a.value->id);
-            if (ack == n.acked.end() or not ack->second.first) {
+            if (ack == n.acked.end() or not ack->second.req) {
                 out << ' ';
             } else {
-                out << ack->second.first->getStateChar();
+                out << ack->second.req->getStateChar();
             }
         }
         out << "] ";
@@ -2193,25 +2197,25 @@ Dht::exportNodes() const
     if (b4 != dht4.buckets.end()) {
         for (auto& n : b4->nodes)
             if (n->isGood(now))
-                nodes.push_back(n->exportNode());
+                nodes.emplace_back(n->exportNode());
     }
     const auto b6 = dht6.buckets.findBucket(myid);
     if (b6 != dht6.buckets.end()) {
         for (auto& n : b6->nodes)
             if (n->isGood(now))
-                nodes.push_back(n->exportNode());
+                nodes.emplace_back(n->exportNode());
     }
     for (auto b = dht4.buckets.begin(); b != dht4.buckets.end(); ++b) {
         if (b == b4) continue;
         for (auto& n : b->nodes)
             if (n->isGood(now))
-                nodes.push_back(n->exportNode());
+                nodes.emplace_back(n->exportNode());
     }
     for (auto b = dht6.buckets.begin(); b != dht6.buckets.end(); ++b) {
         if (b == b6) continue;
         for (auto& n : b->nodes)
             if (n->isGood(now))
-                nodes.push_back(n->exportNode());
+                nodes.emplace_back(n->exportNode());
     }
     return nodes;
 }
@@ -2484,7 +2488,7 @@ Dht::onAnnounce(Sp<Node> n,
             if (*lv == *vc) {
                 storageRefresh(hash, v->id);
                 if (logger_)
-                    logger_->d(hash, node.id, "[store %s] [node %s] refreshed value %s", hash.toString().c_str(), node.toString().c_str(), std::to_string(v->id).c_str());
+                    logger_->d(hash, node.id, "[store %s] [node %s] refreshed value %016" PRIx64, hash.toString().c_str(), node.toString().c_str(), v->id);
             } else {
                 const auto& type = getType(lv->type);
                 if (type.editPolicy(hash, lv, vc, node.id, node.getAddr())) {
@@ -2527,7 +2531,7 @@ Dht::onRefresh(Sp<Node> node, const InfoHash& hash, const Blob& token, const Val
     }
     if (storageRefresh(hash, vid)) {
         if (logger_)
-            logger_->d(hash, node->id, "[store %s] [node %s] refreshed value %s", hash.toString().c_str(), node->toString().c_str(), std::to_string(vid).c_str());
+            logger_->d(hash, node->id, "[store %s] [node %s] refreshed value %016" PRIx64, hash.toString().c_str(), node->toString().c_str(), vid);
     } else {
         if (logger_)
             logger_->d(hash, node->id, "[store %s] [node %s] got refresh for unknown value",
@@ -2563,8 +2567,15 @@ Dht::storageRefresh(const InfoHash& id, Value::Id vid)
         }
 
         auto expiration = s->second.refresh(now, vid, types);
-        if (expiration != time_point::max())
-            scheduler.add(expiration, std::bind(&Dht::expireStorage, this, id));
+        if (expiration.first) {
+            if (expiration.first->expiration_job) {
+                expiration.first->expiration_job->cancel();
+                expiration.first->expiration_job.reset();
+            }
+            if (expiration.second != time_point::max()) {
+                expiration.first->expiration_job = scheduler.add(expiration.second, std::bind(&Dht::expireStorage, this, id));
+            }
+        }
         return true;
     }
     return false;
diff --git a/src/search.h b/src/search.h
index 3212bb2cdddb55cbdce1a719fc15aabe1876d913..cd89d6a88e101512e3ec1f4be4c609db9cb29597 100644
--- a/src/search.h
+++ b/src/search.h
@@ -49,12 +49,21 @@ struct Dht::Announce {
 };
 
 struct Dht::SearchNode {
+
+    struct AnnounceStatus {
+        Sp<net::Request> req {};
+        Sp<Scheduler::Job> refresh {};
+        time_point refresh_time;
+        AnnounceStatus(){};
+        AnnounceStatus(Sp<net::Request> r, time_point t): req(std::move(r)), refresh_time(t)
+        {}
+    };
     /**
      * Foreach value id, we keep track of a pair (net::Request, time_point) where the
     * request is the request returned by the network engine and the time_point
     * is the next time at which the value must be refreshed.
     */
-    using AnnounceStatus = std::map<Value::Id, std::pair<Sp<net::Request>, time_point>>;
+    using AnnounceStatusMap = std::map<Value::Id, AnnounceStatus>;
     /**
     * Foreach Query, we keep track of the request returned by the network
     * engine when we sent the "get".
     */
@@ -63,6 +72,7 @@ struct Dht::SearchNode {
 
     struct CachedListenStatus {
         ValueCache cache;
+        Sp<Scheduler::Job> refresh {};
         Sp<Scheduler::Job> cacheExpirationJob {};
         Sp<net::Request> req {};
         Tid socketId {0};
@@ -87,7 +97,7 @@ struct Dht::SearchNode {
     SyncStatus getStatus {};             /* get/sync status */
     NodeListenerStatus listenStatus {};  /* listen status */
-    AnnounceStatus acked {};             /* announcement status for a given value id */
+    AnnounceStatusMap acked {};          /* announcement status for a given value id */
 
     Blob token {};                                 /* last token the node sent to us after a get request */
     time_point last_get_reply {time_point::min()}; /* last time received valid token */
@@ -239,9 +249,12 @@ struct Dht::SearchNode {
         }
     }
 
-    void onListenSynced(const Sp<Query>& q, bool synced = true) {
+    void onListenSynced(const Sp<Query>& q, bool synced = true, Sp<Scheduler::Job> refreshJob = {}) {
         auto l = listenStatus.find(q);
         if (l != listenStatus.end()) {
+            if (l->second.refresh)
+                l->second.refresh->cancel();
+            l->second.refresh = std::move(refreshJob);
             l->second.cache.onSynced(synced);
         }
     }
@@ -292,16 +305,18 @@ struct Dht::SearchNode {
 
     bool isAnnounced(Value::Id vid) const {
         auto ack = acked.find(vid);
-        if (ack == acked.end() or not ack->second.first)
+        if (ack == acked.end() or not ack->second.req)
             return false;
-        return ack->second.first->completed();
+        return ack->second.req->completed();
     }
     void cancelAnnounce() {
         for (const auto& status : acked) {
-            const auto& req = status.second.first;
+            const auto& req = status.second.req;
             if (req and req->pending()) {
                 node->cancelRequest(req);
             }
+            if (status.second.refresh)
+                status.second.refresh->cancel();
         }
         acked.clear();
     }
@@ -328,14 +343,21 @@ struct Dht::SearchNode {
         return listen_status->second.req->reply_time + listen_expire > now;
     }
     void cancelListen() {
-        for (const auto& status : listenStatus)
+        for (const auto& status : listenStatus) {
             node->cancelRequest(status.second.req);
-        listenStatus.clear();
+            if (status.second.refresh)
+                status.second.refresh->cancel();
+            if (status.second.cacheExpirationJob)
+                status.second.cacheExpirationJob->cancel();
+        }
+        listenStatus.clear();
     }
     void cancelListen(const Sp<Query>& query) {
         auto it = listenStatus.find(query);
         if (it != listenStatus.end()) {
             node->cancelRequest(it->second.req);
+            if (it->second.refresh)
+                it->second.refresh->cancel();
             listenStatus.erase(it);
         }
     }
@@ -345,13 +367,13 @@ struct Dht::SearchNode {
      */
     time_point getAnnounceTime(Value::Id vid) const {
         const auto& ack = acked.find(vid);
-        if (ack == acked.cend() or not ack->second.first) {
+        if (ack == acked.cend() or not ack->second.req) {
             return time_point::min();
         }
-        if (ack->second.first->completed()) {
-            return ack->second.second - REANNOUNCE_MARGIN;
+        if (ack->second.req->completed()) {
+            return ack->second.refresh_time - REANNOUNCE_MARGIN;
         }
-        return ack->second.first->pending() ? time_point::max() : time_point::min();
+        return ack->second.req->pending() ? time_point::max() : time_point::min();
     }
 
     /**
@@ -541,17 +563,15 @@ struct Dht::Search {
         if (not opExpirationJob)
             opExpirationJob = scheduler.add(time_point::max(), [this,&scheduler]{
                 auto nextExpire = cache.expire(scheduler.time(), [&](size_t t){
-                    Sp<Query> query;
                     const auto& ll = listeners.find(t);
                     if (ll != listeners.cend()) {
-                        query = ll->second.query;
+                        auto query = ll->second.query;
                         listeners.erase(ll);
-                    }
-                    for (auto& sn : nodes) {
-                        if (listeners.empty())
-                            sn->cancelListen();
-                        else if (query)
-                            sn->cancelListen(query);
+                        if (listeners.empty()) {
+                            for (auto& sn : nodes) sn->cancelListen();
+                        } else if (query) {
+                            for (auto& sn : nodes) sn->cancelListen(query);
+                        }
                     }
                 });
                 scheduler.edit(opExpirationJob, nextExpire);
@@ -588,8 +608,10 @@ struct Dht::Search {
         for (auto& n : nodes) {
             auto ackIt = n->acked.find(vid);
             if (ackIt != n->acked.end()) {
-                if (ackIt->second.first)
-                    ackIt->second.first->cancel();
+                if (ackIt->second.req)
+                    ackIt->second.req->cancel();
+                if (ackIt->second.refresh)
+                    ackIt->second.refresh->cancel();
                 n->acked.erase(ackIt);
             }
         }
@@ -606,7 +628,7 @@ struct Dht::Search {
             announce.emplace_back(Announce {permanent, value, created, callback});
             for (auto& n : nodes) {
                 n->probe_query.reset();
-                n->acked[value->id].first.reset();
+                n->acked[value->id].req.reset();
             }
         } else {
             a_sr->permanent = permanent;
@@ -614,7 +636,7 @@ struct Dht::Search {
             if (a_sr->value != value) {
                 a_sr->value = value;
                 for (auto& n : nodes) {
-                    n->acked[value->id].first.reset();
+                    n->acked[value->id].req.reset();
                     n->probe_query.reset();
                 }
             }
@@ -738,8 +760,8 @@ struct Dht::Search {
             for (auto& n : nodes) {
                 auto ackIt = n->acked.find(it->value->id);
                 if (ackIt != n->acked.end()) {
-                    if (ackIt->second.first)
-                        ackIt->second.first->cancel();
+                    if (ackIt->second.req)
+                        ackIt->second.req->cancel();
                     n->acked.erase(ackIt);
                 }
             }
diff --git a/src/storage.h b/src/storage.h
index 9c9f9db94abbf4b2fc12c38511155a35cd4a4234..0dc958000c78dc13655356a69f3917dba94e7b29 100644
--- a/src/storage.h
+++ b/src/storage.h
@@ -58,6 +58,7 @@ struct ValueStorage {
     Sp<Value> data {};
     time_point created {};
     time_point expiration {};
+    Sp<Scheduler::Job> expiration_job {};
     StorageBucket* store_bucket {nullptr};
 
     ValueStorage() {}
@@ -142,14 +143,15 @@ struct Storage {
     * @param vid The value id
     * @return time of the next expiration, time_point::max() if no expiration
     */
-    time_point refresh(const time_point& now, const Value::Id& vid, const TypeStore& types) {
+    std::pair<ValueStorage*, time_point>
+    refresh(const time_point& now, const Value::Id& vid, const TypeStore& types) {
        for (auto& vs : values)
            if (vs.data->id == vid) {
                vs.created = now;
                vs.expiration = std::max(vs.expiration, now + types.getType(vs.data->type).expiration);
-                return vs.expiration;
+                return {&vs, vs.expiration};
            }
-        return time_point::max();
+        return {nullptr, time_point::max()};
     }
 
     size_t listen(ValueCallback& cb, Value::Filter& f, const Sp<Query>& q);
@@ -197,9 +199,9 @@ Storage::store(const InfoHash& id, const Sp<Value>& value, time_point created, t
     if (it != values.end()) {
         /* Already there, only need to refresh */
         it->created = created;
-        size_t size_old = it->data->size();
-        ssize_t size_diff = size_new - (ssize_t)size_old;
         if (it->data != value) {
+            size_t size_old = it->data->size();
+            ssize_t size_diff = size_new - (ssize_t)size_old;
             //DHT_LOG.DEBUG("Updating %s -> %s", id.toString().c_str(), value->toString().c_str());
             // clear quota for previous value
             if (it->store_bucket)
@@ -213,7 +215,6 @@ Storage::store(const InfoHash& id, const Sp<Value>& value, time_point created, t
             total_size += size_diff;
             return std::make_pair(&(*it), StoreDiff{size_diff, 0, 0});
         }
-        return std::make_pair(nullptr, StoreDiff{});
     } else {
         //DHT_LOG.DEBUG("Storing %s -> %s", id.toString().c_str(), value->toString().c_str());
         if (values.size() < MAX_VALUES) {
@@ -224,8 +225,8 @@ Storage::store(const InfoHash& id, const Sp<Value>& value, time_point created, t
                 sb->insert(id, *value, expiration);
             return std::make_pair(&values.back(), StoreDiff{size_new, 1, 0});
         }
-        return std::make_pair(nullptr, StoreDiff{});
     }
+    return std::make_pair(nullptr, StoreDiff{});
 }
 
 Storage::StoreDiff
@@ -239,6 +240,8 @@ Storage::remove(const InfoHash& id, Value::Id vid)
     ssize_t size = it->data->size();
     if (it->store_bucket)
         it->store_bucket->erase(id, *it->data, it->expiration);
+    if (it->expiration_job)
+        it->expiration_job->cancel();
     total_size -= size;
     values.erase(it);
     return {-size, -1, 0};
@@ -287,6 +290,8 @@ Storage::expire(const InfoHash& id, time_point now)
         size_diff -= v.data->size();
         if (v.store_bucket)
             v.store_bucket->erase(id, *v.data, v.expiration);
+        if (v.expiration_job)
+            v.expiration_job->cancel();
         ret.emplace_back(std::move(v.data));
     });
     total_size += size_diff;
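The recurring idea in this patch is to keep the scheduled refresh or expiration job next to the state it belongs to (the per-value `AnnounceStatus`, the per-query `CachedListenStatus`, the `ValueStorage` entry) and to cancel the previous job before rescheduling or when that state is dropped, so stale jobs never fire. Below is a minimal, self-contained sketch of that cancel-before-reschedule pattern; the `Job`/`Scheduler` types and the `schedule_refresh()` helper are simplified stand-ins for illustration only, not OpenDHT's real Scheduler API.

```cpp
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <functional>
#include <map>
#include <memory>

using Clock = std::chrono::steady_clock;
using time_point = Clock::time_point;

struct Job {
    std::function<void()> run;   // cleared once the job is cancelled
    void cancel() { run = {}; }
};

struct Scheduler {
    std::multimap<time_point, std::shared_ptr<Job>> jobs;
    std::shared_ptr<Job> add(time_point t, std::function<void()> f) {
        auto job = std::make_shared<Job>(Job{std::move(f)});
        jobs.emplace(t, job);
        return job;
    }
    void run_until(time_point now) {
        auto end = jobs.upper_bound(now);
        for (auto it = jobs.begin(); it != end; ++it)
            if (it->second->run)     // cancelled jobs are simply skipped
                it->second->run();
        jobs.erase(jobs.begin(), end);
    }
};

// Per-value state, mirroring the shape of SearchNode::AnnounceStatus above
// (the pending network request is omitted to keep the sketch short).
struct AnnounceState {
    std::shared_ptr<Job> refresh;
    time_point refresh_time;
};

int main() {
    Scheduler scheduler;
    std::map<uint64_t, AnnounceState> acked;   // keyed by value id

    auto schedule_refresh = [&](uint64_t vid, time_point when) {
        auto& st = acked[vid];
        if (st.refresh)
            st.refresh->cancel();              // drop the stale job instead of letting it fire
        st.refresh_time = when;
        st.refresh = scheduler.add(when, [vid] {
            std::printf("re-announcing value %016llx\n", (unsigned long long)vid);
        });
    };

    auto now = Clock::now();
    schedule_refresh(1, now + std::chrono::minutes(10));
    schedule_refresh(1, now + std::chrono::minutes(5));  // reschedule: the first job is cancelled
    scheduler.run_until(now + std::chrono::minutes(10)); // prints exactly once
}
```

Storing the job handle alongside the state (rather than scheduling anonymous jobs, as the old code did for permanent announces and storage expiration) is what makes the cancellation possible and keeps the scheduler free of jobs for values that were already refreshed, re-announced, or removed.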