diff --git a/src/dht.cpp b/src/dht.cpp index c441fcd0d26b1ac5bf00088ffd2a279cd24e13da..46ff08f3719e161363c332b0e1e1c5533493effa 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -331,8 +331,8 @@ Dht::searchSendGetValues(Sp<Search> sr, SearchNode* pn, bool update) n = pn; } else { for (auto& sn : sr->nodes) { - if (sn->canGet(now, up, query)) { - n = sn.get(); + if (sn.canGet(now, up, query)) { + n = &sn; break; } } @@ -385,11 +385,11 @@ void Dht::searchSendAnnounceValue(const Sp<Search>& sr) { std::weak_ptr<Search> ws = sr; const auto& now = scheduler.time(); for (auto& n : sr->nodes) { - if (not n->isSynced(now)) + if (not n.isSynced(now)) continue; if (std::find_if(sr->announce.cbegin(), sr->announce.cend(), [&now,&n](const Announce& a) { - return n->getAnnounceTime(a.value->id) <= now; + return n.getAnnounceTime(a.value->id) <= now; }) == sr->announce.cend()) continue; @@ -427,12 +427,13 @@ void Dht::searchSendAnnounceValue(const Sp<Search>& sr) { bool hasValue {false}; uint16_t seq_no = 0; try { - for (const auto& field : answer.fields) { - if (field->index.at(Value::Field::Id).getInt() == a.value->id) { - hasValue = true; - seq_no = static_cast<uint16_t>(field->index.at(Value::Field::SeqNum).getInt()); - break; - } + const auto& f = std::find_if(answer.fields.cbegin(), answer.fields.cend(), + [&a](const Sp<FieldValueIndex>& i){ + return i->index.at(Value::Field::Id).getInt() == a.value->id; + }); + if (f != answer.fields.cend() and *f) { + hasValue = true; + seq_no = static_cast<uint16_t>((*f)->index.at(Value::Field::SeqNum).getInt()); } } catch (std::out_of_range&) { } @@ -469,16 +470,16 @@ void Dht::searchSendAnnounceValue(const Sp<Search>& sr) { } } }; - DHT_LOG.w(sr->id, n->node->id, "[search %s] [node %s] sending %s", - sr->id.toString().c_str(), n->node->toString().c_str(), probe_query->toString().c_str()); - n->probe_query = probe_query; - n->getStatus[probe_query] = network_engine.sendGetValues(n->node, + DHT_LOG.w(sr->id, n.node->id, "[search %s] [node %s] sending %s", + sr->id.toString().c_str(), n.node->toString().c_str(), probe_query->toString().c_str()); + n.probe_query = probe_query; + n.getStatus[probe_query] = network_engine.sendGetValues(n.node, sr->id, *probe_query, -1, onSelectDone, std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, probe_query)); - if (not n->candidate and ++i == TARGET_NODES) + if (not n.candidate and ++i == TARGET_NODES) break; } } @@ -589,8 +590,8 @@ Dht::searchStep(Sp<Search> sr) // clear corresponding queries for (const auto& get : completed_gets) for (auto& sn : sr->nodes) { - sn->getStatus.erase(get.query); - sn->pagination_queries.erase(get.query); + sn.getStatus.erase(get.query); + sn.pagination_queries.erase(get.query); } /* clearing callbacks for announced values */ @@ -609,10 +610,10 @@ Dht::searchStep(Sp<Search> sr) if (not sr->listeners.empty()) { unsigned i = 0; for (auto& n : sr->nodes) { - if (not n->isSynced(now)) + if (not n.isSynced(now)) continue; - searchSynchedNodeListen(sr, *n); - if (not n->candidate and ++i == LISTEN_NODES) + searchSynchedNodeListen(sr, n); + if (not n.candidate and ++i == LISTEN_NODES) break; } } @@ -777,8 +778,8 @@ Dht::announce(const InfoHash& id, if (a_sr == sr->announce.end()) { sr->announce.emplace_back(Announce {permanent, value, created, callback}); for (auto& n : sr->nodes) { - n->probe_query.reset(); - n->acked[value->id].first.reset(); + n.probe_query.reset(); + n.acked[value->id].first.reset(); } } else { a_sr->permanent = permanent; @@ -786,8 +787,8 @@ Dht::announce(const InfoHash& id, if (a_sr->value != value) { a_sr->value = value; for (auto& n : sr->nodes) { - n->acked[value->id].first.reset(); - n->probe_query.reset(); + n.acked[value->id].first.reset(); + n.probe_query.reset(); } } if (sr->isAnnounced(value->id)) { @@ -1355,9 +1356,9 @@ Dht::connectivityChanged(sa_family_t af) network_engine.connectivityChanged(af); for (auto& sp : searches(af)) for (auto& sn : sp.second->nodes) { - for (auto& ls : sn->listenStatus) - sn->node->cancelRequest(ls.second.req); - sn->listenStatus.clear(); + for (auto& ls : sn.listenStatus) + sn.node->cancelRequest(ls.second.req); + sn.listenStatus.clear(); } reported_addr.erase(std::remove_if(reported_addr.begin(), reported_addr.end(), [&](const ReportedAddr& addr){ return addr.second.getFamily() == af; @@ -1504,42 +1505,42 @@ Dht::dumpSearch(const Search& sr, std::ostream& out) const auto last_get = sr.getLastGetTime(); for (const auto& n : sr.nodes) { i++; - out << std::setfill (' ') << std::setw(3) << InfoHash::commonBits(sr.id, n->node->id) << ' ' << n->node->id; - out << ' ' << (findNode(n->node->id, sr.af) ? '*' : ' '); + out << std::setfill (' ') << std::setw(3) << InfoHash::commonBits(sr.id, n.node->id) << ' ' << n.node->id; + out << ' ' << (findNode(n.node->id, sr.af) ? '*' : ' '); out << " ["; - if (auto pendingCount = n->node->getPendingMessageCount()) + if (auto pendingCount = n.node->getPendingMessageCount()) out << pendingCount; else out << ' '; - out << (n->node->isExpired() ? 'x' : ' ') << "]"; + out << (n.node->isExpired() ? 'x' : ' ') << "]"; // Get status { - char g_i = n->pending(n->getStatus) ? (n->candidate ? 'c' : 'f') : ' '; - char s_i = n->isSynced(now) ? (n->last_get_reply > last_get ? 'u' : 's') : '-'; + char g_i = n.pending(n.getStatus) ? (n.candidate ? 'c' : 'f') : ' '; + char s_i = n.isSynced(now) ? (n.last_get_reply > last_get ? 'u' : 's') : '-'; out << " [" << s_i << g_i << "] "; } // Listen status if (not sr.listeners.empty()) { - if (n->listenStatus.empty()) + if (n.listenStatus.empty()) out << " "; else out << "[" - << (n->isListening(now) ? 'l' : (n->pending(n->listenStatus) ? 'f' : ' ')) << "] "; + << (n.isListening(now) ? 'l' : (n.pending(n.listenStatus) ? 'f' : ' ')) << "] "; } // Announce status if (not sr.announce.empty()) { - if (n->acked.empty()) { + if (n.acked.empty()) { out << " "; for (size_t a=0; a < sr.announce.size(); a++) out << ' '; } else { out << "["; for (const auto& a : sr.announce) { - auto ack = n->acked.find(a.value->id); - if (ack == n->acked.end() or not ack->second.first) { + auto ack = n.acked.find(a.value->id); + if (ack == n.acked.end() or not ack->second.first) { out << ' '; } else { out << ack->second.first->getStateChar(); @@ -1548,7 +1549,7 @@ Dht::dumpSearch(const Search& sr, std::ostream& out) const out << "] "; } } - out << n->node->getAddrStr() << std::endl; + out << n.node->getAddrStr() << std::endl; } } @@ -2095,9 +2096,9 @@ Dht::onError(Sp<net::Request> req, net::DhtProtocolException e) { for (auto& srp : searches(node->getFamily())) { auto& sr = srp.second; for (auto& n : sr->nodes) { - if (n->node != node) continue; - n->token.clear(); - n->last_get_reply = time_point::min(); + if (n.node != node) continue; + n.token.clear(); + n.last_get_reply = time_point::min(); searchSendGetValues(sr); scheduler.edit(sr->nextSearchStep, scheduler.time()); break; diff --git a/src/search.h b/src/search.h index 099439eac1c2adf0e130add31e938b41c93e06b2..d6418618a6531b802f19bd9f7b2898916fefb271 100644 --- a/src/search.h +++ b/src/search.h @@ -91,9 +91,9 @@ struct Dht::SearchNode { SearchNode() : node() {} SearchNode(const SearchNode&) = delete; - SearchNode(SearchNode&&) = delete; + SearchNode(SearchNode&&) = default; SearchNode& operator=(const SearchNode&) = delete; - SearchNode& operator=(SearchNode&&) = delete; + SearchNode& operator=(SearchNode&&) = default; SearchNode(const Sp<Node>& node) : node(node) {} ~SearchNode() { @@ -369,7 +369,7 @@ struct Dht::Search { bool expired {false}; /* no node, or all nodes expired */ bool done {false}; /* search is over, cached for later */ - std::vector<std::unique_ptr<SearchNode>> nodes {}; + std::vector<SearchNode> nodes {}; /* pending puts */ std::vector<Announce> announce {}; @@ -404,17 +404,17 @@ struct Dht::Search { bool insertNode(const Sp<Node>& n, time_point now, const Blob& token={}); SearchNode* getNode(const Sp<Node>& n) { - for (auto& sn : nodes) - if (sn->node == n) - return sn.get(); - return nullptr; + auto srn = std::find_if(nodes.begin(), nodes.end(), [&](SearchNode& sn) { + return n == sn.node; + }); + return (srn == nodes.end()) ? nullptr : &(*srn); } /* number of concurrent sync requests */ unsigned currentlySolicitedNodeCount() const { unsigned count = 0; for (const auto& n : nodes) - if (not n->isBad() and n->pendingGet()) + if (not n.isBad() and n.pendingGet()) count++; return count; } @@ -448,12 +448,12 @@ struct Dht::Search { */ void setDone(const Get& get) { for (auto& n : nodes) { - auto pqs = n->pagination_queries.find(get.query); - if (pqs != n->pagination_queries.cend()) { + auto pqs = n.pagination_queries.find(get.query); + if (pqs != n.pagination_queries.cend()) { for (auto& pq : pqs->second) - n->getStatus.erase(pq); + n.getStatus.erase(pq); } - n->getStatus.erase(get.query); + n.getStatus.erase(get.query); } if (get.done_cb) get.done_cb(true, getNodes()); @@ -466,9 +466,9 @@ struct Dht::Search { */ void setDone() { for (auto& n : nodes) { - n->getStatus.clear(); - n->listenStatus.clear(); - n->acked.clear(); + n.getStatus.clear(); + n.listenStatus.clear(); + n.acked.clear(); } done = true; } @@ -500,9 +500,9 @@ struct Dht::Search { } for (auto& sn : nodes) { if (listeners.empty()) - sn->cancelListen(); + sn.cancelListen(); else if (query) - sn->cancelListen(query); + sn.cancelListen(query); } }); scheduler.edit(opExpirationJob, nextExpire); @@ -514,14 +514,14 @@ struct Dht::Search { * @return The number of non-good search nodes. */ unsigned getNumberOfBadNodes() const { - return std::count_if(nodes.begin(), nodes.end(), [](const std::unique_ptr<SearchNode>& sn) { - return sn->isBad(); + return std::count_if(nodes.begin(), nodes.end(), [](const SearchNode& sn) { + return sn.isBad(); }); } unsigned getNumberOfConsecutiveBadNodes() const { unsigned count = 0; - std::find_if(nodes.begin(), nodes.end(), [&count](const std::unique_ptr<SearchNode>& sn) { - if (not sn->isBad()) + std::find_if(nodes.begin(), nodes.end(), [&count](const SearchNode& sn) { + if (not sn.isBad()) return true; ++count; return false; @@ -540,7 +540,7 @@ struct Dht::Search { */ bool removeExpiredNode(const time_point& now) { for (auto e = nodes.cend(); e != nodes.cbegin();) { - const Node& n = *(*(--e))->node; + const Node& n = *(--e)->node; if (n.isRemovable(now)) { //std::cout << "Removing expired node " << n.id << " from IPv" << (af==AF_INET?'4':'6') << " search " << id << std::endl; nodes.erase(e); @@ -613,7 +613,7 @@ struct Dht::Search { // remove acked for cleared annouces for (auto it = announced; it != announce.end(); ++it) { for (auto& n : nodes) - n->acked.erase(it->value->id); + n.acked.erase(it->value->id); } announce.erase(announced, announce.end()); } @@ -646,13 +646,13 @@ Dht::Search::insertNode(const Sp<Node>& snode, time_point now, const Blob& token auto n = nodes.end(); while (n != nodes.begin()) { --n; - if ((*n)->node == snode) { + if (n->node == snode) { found = true; break; } /* Node not found. We could insert it after this one. */ - if (id.xorCmp(nid, (*n)->node->id) > 0) { + if (id.xorCmp(nid, n->node->id) > 0) { ++n; break; } @@ -676,7 +676,7 @@ Dht::Search::insertNode(const Sp<Node>& snode, time_point now, const Blob& token full = nodes.size() - bad >= SEARCH_NODES; while (std::distance(nodes.cbegin(), t) - bad > SEARCH_NODES) { --t; - if ((*t)->isBad()) + if (t->isBad()) bad--; } } @@ -692,7 +692,7 @@ Dht::Search::insertNode(const Sp<Node>& snode, time_point now, const Blob& token if (nodes.empty()) { step_time = time_point::min(); } - n = nodes.insert(n, std::unique_ptr<SearchNode>(new SearchNode(snode))); + n = nodes.insert(n, SearchNode(snode)); node.setTime(now); new_search_node = true; if (node.isExpired()) { @@ -704,16 +704,16 @@ Dht::Search::insertNode(const Sp<Node>& snode, time_point now, const Blob& token } while (nodes.size() - bad > SEARCH_NODES) { - if (not expired and nodes.back()->isBad()) + if (not expired and nodes.back().isBad()) bad--; nodes.pop_back(); } } if (not token.empty()) { - (*n)->candidate = false; - (*n)->last_get_reply = now; + n->candidate = false; + n->last_get_reply = now; if (token.size() <= 64) - (*n)->token = token; + n->token = token; expired = false; } if (new_search_node) @@ -727,7 +727,7 @@ Dht::Search::getNodes() const std::vector<Sp<Node>> ret {}; ret.reserve(nodes.size()); for (const auto& sn : nodes) - ret.emplace_back(sn->node); + ret.emplace_back(sn.node); return ret; } @@ -736,9 +736,9 @@ Dht::Search::isSynced(time_point now) const { unsigned i = 0; for (const auto& n : nodes) { - if (n->isBad()) + if (n.isBad()) continue; - if (not n->isSynced(now)) + if (not n.isSynced(now)) return false; if (++i == TARGET_NODES) break; @@ -769,9 +769,9 @@ Dht::Search::isDone(const Get& get) const { unsigned i = 0; for (const auto& sn : nodes) { - if (sn->isBad()) + if (sn.isBad()) continue; - if (not sn->isDone(get)) + if (not sn.isDone(get)) return false; if (++i == TARGET_NODES) break; @@ -786,9 +786,9 @@ Dht::Search::isAnnounced(Value::Id id) const return false; unsigned i = 0; for (const auto& n : nodes) { - if (n->isBad()) + if (n.isBad()) continue; - if (not n->isAnnounced(id)) + if (not n.isAnnounced(id)) return false; if (++i == TARGET_NODES) return true; @@ -803,14 +803,14 @@ Dht::Search::isListening(time_point now) const return false; unsigned i = 0; for (const auto& n : nodes) { - if (n->isBad()) + if (n.isBad()) continue; SearchNode::NodeListenerStatus::const_iterator ls {}; - for (ls = n->listenStatus.begin(); ls != n->listenStatus.end() ; ++ls) { - if (n->isListening(now, ls)) + for (ls = n.listenStatus.begin(); ls != n.listenStatus.end() ; ++ls) { + if (n.isListening(now, ls)) break; } - if (ls == n->listenStatus.end()) + if (ls == n.listenStatus.end()) return false; if (++i == LISTEN_NODES) break;