diff --git a/include/opendht/dht.h b/include/opendht/dht.h index c7bf07b0595ede07c94d10fd28e6b4944eac3dab..c96d40623d8753fda1cf2b40e9095d47024558f1 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -324,6 +324,14 @@ private: return /*pinged < 3 && replied &&*/ reply_time >= now - NODE_EXPIRE_TIME; } + bool isAnnounced(Value::Id vid, const ValueType& type, time_t now) const { + auto ack = acked.find(vid); + if (ack == acked.end()) { + + return false; + } + return ack->second.reply_time + type.expiration > now; + } time_t getAnnounceTime(AnnounceStatusMap::const_iterator ack, const ValueType& type) const { if (ack == acked.end()) return request_time + 5; @@ -410,6 +418,8 @@ private: time_t getUpdateTime(time_t now) const; + bool isAnnounced(Value::Id id, const ValueType& type, time_t now) const; + /** * ret = 0 : no announce required. * ret > 0 : (re-)announce required at time ret. diff --git a/src/dht.cpp b/src/dht.cpp index 2c622041c288c610522f299658fd7de232574c5a..8e60a927a100f7d48b5c3f785bc85c49240255f3 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -662,11 +662,22 @@ Dht::searchStep(Search& sr) if (now.tv_sec - sr.step_time >= SEARCH_TIMEOUT) { DHT_WARN("Search IPv%c %s timed out.", sr.af == AF_INET ? '4' : '6', sr.id.toString().c_str()); sr.step_time = now.tv_sec; - for (const auto& g : sr.callbacks) { - if (g.done_cb) - g.done_cb(false); + { + auto get_cbs = std::move(sr.callbacks); + for (const auto& g : get_cbs) { + if (g.done_cb) + g.done_cb(false); + } + } + { + std::vector<DoneCallback> a_cbs; + a_cbs.reserve(sr.announce.size()); + for (const auto& a : sr.announce) + if (a.callback) + a_cbs.emplace_back(std::move(a.callback)); + for (const auto& a : a_cbs) + a(false); } - sr.callbacks.clear(); if (sr.announce.empty() && sr.listeners.empty()) sr.done = true; } @@ -731,7 +742,6 @@ Dht::searchStep(Search& sr) DHT_ERROR("Trying to announce a null value !"); } unsigned i = 0; - bool all_acked = true; auto vid = a.value->id; const auto& type = getType(a.value->type); if (in) { @@ -744,7 +754,6 @@ Dht::searchStep(Search& sr) auto a_status = n.acked.find(vid); auto at = n.getAnnounceTime(a_status, type); if ( at <= now.tv_sec ) { - all_acked = false; { char hbuf[NI_MAXHOST]; char sbuf[NI_MAXSERV]; @@ -766,10 +775,6 @@ Dht::searchStep(Search& sr) if (++i == 8) break; } - if (all_acked && a.callback) { - a.callback(true); - a.callback = nullptr; - } } for (auto& n : sr.nodes) { if (n.pending) { @@ -780,13 +785,9 @@ Dht::searchStep(Search& sr) pinged(*node); } } - DHT_DEBUG("Search done."); - /*if (sr.done_callback) { - sr.done_callback(true); - sr.done_callback = nullptr; - }*/ if (sr.announce.empty() && sr.listeners.empty()) sr.done = true; + DHT_DEBUG("Searchstep done."); } } else { @@ -915,6 +916,23 @@ Dht::Search::getUpdateTime(time_t now) const return ut == STEP_INVALID ? (callbacks.empty() ? 0 : now) : ut; } +bool +Dht::Search::isAnnounced(Value::Id id, const ValueType& type, time_t now) const +{ + if (nodes.empty()) + return false; + unsigned i = 0; + for (const auto& n : nodes) { + if (n.pinged >= 3) + continue; + if (!n.isAnnounced(id, type, now)) + return false; + if (++i == 8) + break; + } + return i; +} + time_t Dht::Search::getAnnounceTime(const std::map<ValueType::Id, ValueType>& types) const { @@ -1101,13 +1119,13 @@ Dht::announce(const InfoHash& id, sa_family_t af, const std::shared_ptr<Value>& for (auto& n : sr->nodes) n.acked[value->id] = {0, 0}; } + if (a_sr->callback) + a_sr->callback(false); a_sr->callback = callback; } - if (not sr->nodes.empty()) { - time_t tm = sr->getNextStepTime(types, now.tv_sec); - if (tm != 0 && (search_time == 0 || search_time > tm)) - search_time = tm; - } + time_t tm = sr->getNextStepTime(types, now.tv_sec); + if (tm != 0 && (search_time == 0 || search_time > tm)) + search_time = tm; } size_t @@ -1126,10 +1144,8 @@ Dht::listenTo(const InfoHash& id, sa_family_t af, GetCallback cb, Value::Filter auto token = ++sr->listener_token; sr->listeners.insert({token, {f, cb}}); time_t tm = sr->getNextStepTime(types, now.tv_sec); - if (tm != 0 && (search_time == 0 || search_time > tm)) { - + if (tm != 0 && (search_time == 0 || search_time > tm)) search_time = tm; - } return token; } @@ -1205,13 +1221,13 @@ Dht::put(const InfoHash& id, Value&& value, DoneCallback callback) } }; announce(id, AF_INET, val, [=](bool ok4) { - DHT_DEBUG("search done IPv4 %d", ok4); + DHT_DEBUG("Announce done IPv4 %d", ok4); *done4 = true; *ok |= ok4; donecb(); }); announce(id, AF_INET6, val, [=](bool ok6) { - DHT_DEBUG("search done IPv6 %d", ok6); + DHT_DEBUG("Announce done IPv6 %d", ok6); *done6 = true; *ok |= ok6; donecb(); @@ -2054,6 +2070,16 @@ Dht::processMessage(const uint8_t *buf, size_t buflen, const sockaddr *from, soc } /* See comment for gp above. */ searchSendGetValues(*sr); + + // If the value was just successfully announced, call the callback + for (auto& a : sr->announce) { + if (!a.callback || !a.value || a.value->id != value_id) + continue; + if (sr->isAnnounced(value_id, getType(a.value->type), now.tv_sec)) { + a.callback(true); + a.callback = nullptr; + } + } } } else if (tid.matches(TransPrefix::LISTEN, &ttid)) { DHT_DEBUG("Got reply to listen.");