diff --git a/src/dht.cpp b/src/dht.cpp index 2e30b093ae73ba13a76f30f2d3ceb21102f4a7ba..f60f4d619095d993b40b73fd933698aebc7611a1 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -552,14 +552,14 @@ struct Dht::Search { * @param types The sequence of existing types. * @param now The time reference to now. */ - void announced(Value::Id vid, const std::map<ValueType::Id, ValueType>& types, time_point now) { + void checkAnnounced(const std::map<ValueType::Id, ValueType>& types, time_point now, Value::Id vid = Value::INVALID_ID) { announce.erase(std::remove_if(announce.begin(), announce.end(), [this,&vid,&now,&types](Announce& a) { - if (!a.value || a.value->id != vid) + if (vid != Value::INVALID_ID and (!a.value || a.value->id != vid)) return false; const auto& type_it = types.find(a.value->type); const ValueType& type = (type_it == types.end()) ? ValueType::USER_DATA : type_it->second; - if (isAnnounced(vid, type, now)) { + if (isAnnounced(a.value->id, type, now)) { if (a.callback) { a.callback(true, getNodes()); a.callback = nullptr; @@ -613,8 +613,8 @@ Dht::shutdown(ShutdownCallback cb) { auto remaining = std::make_shared<int>(0); auto str_donecb = [=](bool, const std::vector<std::shared_ptr<Node>>&) { --*remaining; + DHT_LOG.WARN("Shuting down node: %u ops remaining.", *remaining); if (!*remaining && cb) { cb(); } - else DHT_LOG.WARN("Shuting down node: %u ops remaining.", *remaining); }; for (const auto& str : store) { @@ -1128,56 +1128,48 @@ void Dht::searchSendAnnounceValue(const std::shared_ptr<Search>& sr) { if (auto sn = sr->getNode(status.node)) { for (auto ait = sr->announce.begin(); ait != sr->announce.end();) { auto& a = *ait; - if (not (sn->isSynced(now) and sn->getAnnounceTime(a.value->id, getType(a.value->type)) <= now)) - continue; - if (!a.value) continue; - - auto hasValue {false}; - uint16_t seq_no; - try { - const auto& f = std::find_if(answer.fields.cbegin(), answer.fields.cend(), - [&a](const std::shared_ptr<FieldValueIndex>& i){ - return i->index.at(Value::Field::Id).getInt() == a.value->id; - }); - if (f != answer.fields.cend() and *f) { - hasValue = true; - seq_no = static_cast<uint16_t>((*f)->index.at(Value::Field::SeqNum).getInt()); - } - } catch (std::out_of_range&) { } - - /* only put the value if the node doesn't already have it */ - if (not hasValue or seq_no < a.value->seq) { - DHT_LOG.WARN("[search %s IPv%c] [node %s] sending 'put' (vid: %d)", - sr->id.toString().c_str(), - sr->af == AF_INET ? '4' : '6', - sn->node->toString().c_str(), - a.value->id); - sn->acked[a.value->id] = network_engine.sendAnnounceValue(sn->node, - sr->id, - *a.value, - a.created, - sn->token, - onDone, - onExpired); - } else { - DHT_LOG.WARN("[search %s IPv%c] [node %s] already has value (vid: %d). Aborting.", - sr->id.toString().c_str(), - sr->af == AF_INET ? '4' : '6', - sn->node->toString().c_str(), - a.value->id); - /* TODO: kind of a hack. Other solution? */ - auto ack_req = std::make_shared<Request>(); - ack_req->reply_time = now; - sn->acked[a.value->id] = std::move(ack_req); - if (sr->isAnnounced(a.value->id, getType(a.value->type), now)){ - if (a.callback) { - a.callback(true, sr->getNodes()); - a.callback = nullptr; - } - if (not a.permanent) { - ait = sr->announce.erase(ait); - continue; + const auto& type = getType(a.value->type); + if (sn->isSynced(now) and sn->getAnnounceTime(a.value->id, type) <= now) { + auto hasValue {false}; + uint16_t seq_no; + try { + const auto& f = std::find_if(answer.fields.cbegin(), answer.fields.cend(), + [&a](const std::shared_ptr<FieldValueIndex>& i){ + return i->index.at(Value::Field::Id).getInt() == a.value->id; + }); + if (f != answer.fields.cend() and *f) { + hasValue = true; + seq_no = static_cast<uint16_t>((*f)->index.at(Value::Field::SeqNum).getInt()); } + } catch (std::out_of_range&) { } + + /* only put the value if the node doesn't already have it */ + if (not hasValue or seq_no < a.value->seq) { + DHT_LOG.WARN("[search %s IPv%c] [node %s] sending 'put' (vid: %d)", + sr->id.toString().c_str(), + sr->af == AF_INET ? '4' : '6', + sn->node->toString().c_str(), + a.value->id); + sn->acked[a.value->id] = network_engine.sendAnnounceValue(sn->node, + sr->id, + *a.value, + a.created, + sn->token, + onDone, + onExpired); + } else { + DHT_LOG.WARN("[search %s IPv%c] [node %s] already has value (vid: %d). Aborting.", + sr->id.toString().c_str(), + sr->af == AF_INET ? '4' : '6', + sn->node->toString().c_str(), + a.value->id); + /* TODO: kind of a hack. Other solution? */ + auto ack_req = std::make_shared<Request>(); + ack_req->reply_time = now; + sn->acked[a.value->id] = std::move(ack_req); + + /* step to clear announces */ + scheduler.edit(sr->nextSearchStep, now); } } ++ait; @@ -1217,7 +1209,7 @@ Dht::searchStep(std::shared_ptr<Search> sr) /* Check if the first TARGET_NODES (8) live nodes have replied. */ if (sr->isSynced(now)) { - if (not sr->callbacks.empty()) { + if (not (sr->callbacks.empty() and sr->announce.empty())) { // search is synced but some (newer) get operations are not complete // Call callbacks when done for (auto b = sr->callbacks.begin(); b != sr->callbacks.end();) { @@ -1231,6 +1223,10 @@ Dht::searchStep(std::shared_ptr<Search> sr) else ++b; } + + /* clearing callbacks for announced values */ + sr->checkAnnounced(types, now); + if (sr->callbacks.empty() && sr->announce.empty() && sr->listeners.empty()) sr->done = true; } @@ -3242,7 +3238,7 @@ Dht::onAnnounceDone(const Request&, NetworkEngine::RequestAnswer& answer, std::s DHT_LOG.DEBUG("[search %s IPv%c] got reply to put!", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6'); searchSendGetValues(sr); - sr->announced(answer.vid, types, now); + sr->checkAnnounced(types, now, answer.vid); } }