From 5112a322de61d026d02874dc51398f2f711d791e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20B=C3=A9raud?= <adrien.beraud@savoirfairelinux.com> Date: Wed, 19 Sep 2018 01:43:42 -0400 Subject: [PATCH] dht: reschedule permanent put refresh after request --- src/dht.cpp | 165 ++++++++++++++++++++++++++++------------------------ 1 file changed, 88 insertions(+), 77 deletions(-) diff --git a/src/dht.cpp b/src/dht.cpp index 65ea1967..a2071c9d 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -383,6 +383,94 @@ void Dht::searchSendAnnounceValue(const Sp<Search>& sr) { unsigned i = 0; auto probe_query = std::make_shared<Query>(Select {}.field(Value::Field::Id).field(Value::Field::SeqNum)); std::weak_ptr<Search> ws = sr; + + auto onDone = [this,ws](const net::Request& req, net::RequestAnswer&& answer) + { /* when put done */ + if (auto sr = ws.lock()) { + onAnnounceDone(req.node, answer, sr); + searchStep(sr); + } + }; + + auto onExpired = [this,ws](const net::Request&, bool over) + { /* when put expired */ + if (over) + if (auto sr = ws.lock()) + scheduler.edit(sr->nextSearchStep, scheduler.time()); + }; + + auto onSelectDone = + [this,ws,onDone,onExpired](const net::Request& req, net::RequestAnswer&& answer) mutable + { /* on probing done */ + auto sr = ws.lock(); + if (not sr) return; + const auto& now = scheduler.time(); + sr->insertNode(req.node, scheduler.time(), answer.ntoken); + auto sn = sr->getNode(req.node); + if (not sn) return; + + if (not sn->isSynced(now)) { + /* Search is now unsynced. Let's call searchStep to sync again. */ + scheduler.edit(sr->nextSearchStep, now); + return; + } + for (auto& a : sr->announce) { + if (sn->getAnnounceTime(a.value->id) > now) + continue; + bool hasValue {false}; + uint16_t seq_no = 0; + try { + const auto& f = std::find_if(answer.fields.cbegin(), answer.fields.cend(), + [&a](const Sp<FieldValueIndex>& i){ + return i->index.at(Value::Field::Id).getInt() == a.value->id; + }); + if (f != answer.fields.cend() and *f) { + hasValue = true; + seq_no = static_cast<uint16_t>((*f)->index.at(Value::Field::SeqNum).getInt()); + } + } catch (std::out_of_range&) { } + + auto next_refresh_time = now + getType(a.value->type).expiration; + /* only put the value if the node doesn't already have it */ + if (not hasValue or seq_no < a.value->seq) { + DHT_LOG.w(sr->id, sn->node->id, "[search %s] [node %s] sending 'put' (vid: %d)", + sr->id.toString().c_str(), sn->node->toString().c_str(), a.value->id); + sn->acked[a.value->id] = std::make_pair(network_engine.sendAnnounceValue(sn->node, + sr->id, + a.value, + a.permanent ? time_point::max() : a.created, + sn->token, + onDone, + onExpired), next_refresh_time); + } else if (hasValue and a.permanent) { + DHT_LOG.w(sr->id, sn->node->id, "[search %s] [node %s] sending 'refresh' (vid: %d)", + sr->id.toString().c_str(), sn->node->toString().c_str(), a.value->id); + sn->acked[a.value->id] = std::make_pair(network_engine.sendRefreshValue(sn->node, + sr->id, + a.value->id, + sn->token, + onDone, + onExpired), next_refresh_time); + } else { + DHT_LOG.w(sr->id, sn->node->id, "[search %s] [node %s] already has value (vid: %d). Aborting.", + sr->id.toString().c_str(), sn->node->toString().c_str(), a.value->id); + auto ack_req = std::make_shared<net::Request>(net::Request::State::COMPLETED); + ack_req->reply_time = now; + sn->acked[a.value->id] = std::make_pair(std::move(ack_req), next_refresh_time); + + /* step to clear announces */ + scheduler.edit(sr->nextSearchStep, now); + } + if (a.permanent) { + scheduler.add(next_refresh_time - REANNOUNCE_MARGIN, [this,ws] { + if (auto sr = ws.lock()) { + searchStep(sr); + } + }); + } + } + }; + const auto& now = scheduler.time(); for (auto& n : sr->nodes) { if (not n.isSynced(now)) @@ -393,83 +481,6 @@ void Dht::searchSendAnnounceValue(const Sp<Search>& sr) { }) == sr->announce.cend()) continue; - auto onDone = [this,ws](const net::Request& req, net::RequestAnswer&& answer) - { /* when put done */ - if (auto sr = ws.lock()) { - onAnnounceDone(req.node, answer, sr); - searchStep(sr); - } - }; - auto onExpired = [this,ws](const net::Request&, bool over) - { /* when put expired */ - if (over) - if (auto sr = ws.lock()) - scheduler.edit(sr->nextSearchStep, scheduler.time()); - }; - auto onSelectDone = - [this,ws,onDone,onExpired](const net::Request& req, net::RequestAnswer&& answer) mutable - { /* on probing done */ - auto sr = ws.lock(); - if (not sr) return; - const auto& now = scheduler.time(); - sr->insertNode(req.node, scheduler.time(), answer.ntoken); - auto sn = sr->getNode(req.node); - if (not sn) return; - - if (not sn->isSynced(now)) { - /* Search is now unsynced. Let's call searchStep to sync again. */ - scheduler.edit(sr->nextSearchStep, now); - return; - } - for (auto& a : sr->announce) { - if (sn->getAnnounceTime(a.value->id) > now) - continue; - bool hasValue {false}; - uint16_t seq_no = 0; - try { - const auto& f = std::find_if(answer.fields.cbegin(), answer.fields.cend(), - [&a](const Sp<FieldValueIndex>& i){ - return i->index.at(Value::Field::Id).getInt() == a.value->id; - }); - if (f != answer.fields.cend() and *f) { - hasValue = true; - seq_no = static_cast<uint16_t>((*f)->index.at(Value::Field::SeqNum).getInt()); - } - } catch (std::out_of_range&) { } - - auto next_refresh_time = now + getType(a.value->type).expiration; - /* only put the value if the node doesn't already have it */ - if (not hasValue or seq_no < a.value->seq) { - DHT_LOG.w(sr->id, sn->node->id, "[search %s] [node %s] sending 'put' (vid: %d)", - sr->id.toString().c_str(), sn->node->toString().c_str(), a.value->id); - sn->acked[a.value->id] = std::make_pair(network_engine.sendAnnounceValue(sn->node, - sr->id, - a.value, - a.permanent ? time_point::max() : a.created, - sn->token, - onDone, - onExpired), next_refresh_time); - } else if (hasValue and a.permanent) { - DHT_LOG.w(sr->id, sn->node->id, "[search %s] [node %s] sending 'refresh' (vid: %d)", - sr->id.toString().c_str(), sn->node->toString().c_str(), a.value->id); - sn->acked[a.value->id] = std::make_pair(network_engine.sendRefreshValue(sn->node, - sr->id, - a.value->id, - sn->token, - onDone, - onExpired), next_refresh_time); - } else { - DHT_LOG.w(sr->id, sn->node->id, "[search %s] [node %s] already has value (vid: %d). Aborting.", - sr->id.toString().c_str(), sn->node->toString().c_str(), a.value->id); - auto ack_req = std::make_shared<net::Request>(net::Request::State::COMPLETED); - ack_req->reply_time = now; - sn->acked[a.value->id] = std::make_pair(std::move(ack_req), next_refresh_time); - - /* step to clear announces */ - scheduler.edit(sr->nextSearchStep, now); - } - } - }; DHT_LOG.w(sr->id, n.node->id, "[search %s] [node %s] sending %s", sr->id.toString().c_str(), n.node->toString().c_str(), probe_query->toString().c_str()); n.probe_query = probe_query; -- GitLab