diff --git a/include/opendht/dht.h b/include/opendht/dht.h index 795370a67ddb12fee6299983cd0345b79c74347b..4251aba96d05b2769c594483f79f8f07dee4005d 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -625,20 +625,13 @@ private: * Foreign nodes asking for updates about an InfoHash. */ struct Listener { - InfoHash id {}; - sockaddr_storage ss; - socklen_t sslen {}; size_t rid {}; time_point time {}; - /*constexpr*/ Listener() : ss() {} - Listener(const InfoHash& id, const sockaddr *from, socklen_t fromlen, uint16_t rid, time_point t) : id(id), ss(), sslen(fromlen), rid(rid), time(t) { - memcpy(&ss, from, fromlen); - } - void refresh(const sockaddr *from, socklen_t fromlen, size_t rid, time_point t) { - memcpy(&ss, from, fromlen); - sslen = fromlen; - this->rid = rid; + constexpr Listener(size_t rid, time_point t) : rid(rid), time(t) {} + + void refresh(size_t tid, time_point t) { + rid = tid; time = t; } }; @@ -646,7 +639,7 @@ private: struct Storage { InfoHash id; time_point maintenance_time {}; - std::vector<Listener> listeners {}; + std::map<std::shared_ptr<Node>, Listener> listeners {}; std::map<size_t, LocalListener> local_listeners {}; size_t listener_token {1}; @@ -794,7 +787,7 @@ private: }); } - void storageAddListener(const InfoHash& id, const InfoHash& node, const sockaddr *from, socklen_t fromlen, size_t tid); + void storageAddListener(const InfoHash& id, const std::shared_ptr<Node>& node, size_t tid); bool storageStore(const InfoHash& id, const std::shared_ptr<Value>& value, time_point created); void expireStorage(); void storageChanged(Storage& st, ValueStorage&); diff --git a/include/opendht/network_engine.h b/include/opendht/network_engine.h index 066c7d8aaff44031cb2433741aa2575e49c24434..eae68348f2a6e828e00691fbfffe728c84d07149 100644 --- a/include/opendht/network_engine.h +++ b/include/opendht/network_engine.h @@ -385,7 +385,7 @@ public: * @param nodes6 The ipv6 closest nodes. * @param values The values to send. */ - void tellListener(const sockaddr* sa, socklen_t salen, uint16_t rid, InfoHash hash, want_t want, Blob ntoken, + void tellListener(std::shared_ptr<Node> n, uint16_t rid, InfoHash hash, want_t want, Blob ntoken, std::vector<std::shared_ptr<Node>> nodes, std::vector<std::shared_ptr<Node>> nodes6, std::vector<std::shared_ptr<Value>> values); diff --git a/src/dht.cpp b/src/dht.cpp index aa2bf8e0602a0f6c95e4faf795759d20c7a6a3a5..60fe95440f91da4239c76c6b69e55063b83bdfcf 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -1659,6 +1659,7 @@ Dht::storageChanged(Storage& st, ValueStorage& v) const auto& now = scheduler.time(); { std::vector<std::pair<GetCallback, std::vector<std::shared_ptr<Value>>>> cbs; + DHT_LOG.DEBUG("Storage changed. Sending update to %lu local listeners.", st.local_listeners.size()); for (const auto& l : st.local_listeners) { std::vector<std::shared_ptr<Value>> vals; if (not l.second.filter or l.second.filter(*v.data)) @@ -1672,12 +1673,12 @@ Dht::storageChanged(Storage& st, ValueStorage& v) } for (const auto& l : st.listeners) { - DHT_LOG.WARN("Storage changed. Sending update to %s %s.", - l.id.toString().c_str(), print_addr((sockaddr*)&l.ss, l.sslen).c_str()); + DHT_LOG.DEBUG("Storage changed. Sending update to %s %s.", + l.first->id.toString().c_str(), print_addr((sockaddr*)&l.first->ss, l.first->sslen).c_str()); std::vector<std::shared_ptr<Value>> vals; vals.push_back(v.data); - Blob ntoken = makeToken((const sockaddr*)&l.ss, false); - network_engine.tellListener((const sockaddr*)&l.ss, l.sslen, l.rid, st.id, WANT4 | WANT6, ntoken, + Blob ntoken = makeToken((const sockaddr*)&l.first->ss, false); + network_engine.tellListener(l.first, l.second.rid, st.id, WANT4 | WANT6, ntoken, buckets.findClosestNodes(st.id, now, TARGET_NODES), buckets6.findClosestNodes(st.id, now, TARGET_NODES), vals); } @@ -1742,7 +1743,7 @@ Dht::Storage::clear() } void -Dht::storageAddListener(const InfoHash& id, const InfoHash& node, const sockaddr *from, socklen_t fromlen, size_t rid) +Dht::storageAddListener(const InfoHash& id, const std::shared_ptr<Node>& node, size_t rid) { const auto& now = scheduler.time(); auto st = findStorage(id); @@ -1752,22 +1753,21 @@ Dht::storageAddListener(const InfoHash& id, const InfoHash& node, const sockaddr store.emplace_back(id, now); st = std::prev(store.end()); } - sa_family_t af = from->sa_family; - auto l = std::find_if(st->listeners.begin(), st->listeners.end(), [&](const Listener& l){ - return l.ss.ss_family == af && l.id == node; - }); + auto l = st->listeners.find(node); if (l == st->listeners.end()) { - auto stvalues = st->getValues(); - std::vector<std::shared_ptr<Value>> values(stvalues.size()); - std::transform(stvalues.begin(), stvalues.end(), values.begin(), [=](ValueStorage& vs) { return vs.data; }); - - network_engine.tellListener(from, fromlen, rid, id, WANT4 | WANT6, makeToken(from, false), - buckets.findClosestNodes(id, now, TARGET_NODES), buckets6.findClosestNodes(id, now, TARGET_NODES), - values); - st->listeners.emplace_back(node, from, fromlen, rid, now); + const auto& stvalues = st->getValues(); + if (not stvalues.empty()) { + std::vector<std::shared_ptr<Value>> values(stvalues.size()); + std::transform(stvalues.begin(), stvalues.end(), values.begin(), [=](const ValueStorage& vs) { return vs.data; }); + + network_engine.tellListener(node, rid, id, WANT4 | WANT6, makeToken((sockaddr*)&node->ss, false), + buckets.findClosestNodes(id, now, TARGET_NODES), buckets6.findClosestNodes(id, now, TARGET_NODES), + values); + } + st->listeners.emplace(node, Listener {rid, now}); } else - l->refresh(from, fromlen, rid, now); + l->second.refresh(rid, now); } void @@ -1776,19 +1776,14 @@ Dht::expireStorage() const auto& now = scheduler.time(); auto i = store.begin(); while (i != store.end()) { - // put elements to remove at the end with std::partition, - // and then remove them with std::vector::erase. - i->listeners.erase( - std::partition(i->listeners.begin(), i->listeners.end(), - [&](const Listener& l) - { - bool expired = l.time + Node::NODE_EXPIRE_TIME < now; - if (expired) - DHT_LOG.DEBUG("Discarding expired listener %s", l.id.toString().c_str()); - // return false if the element should be removed - return !expired; - }), - i->listeners.end()); + for (auto l = i->listeners.cbegin(); l != i->listeners.cend();){ + bool expired = l->second.time + Node::NODE_EXPIRE_TIME < now; + if (expired) { + DHT_LOG.DEBUG("Discarding expired listener %s", l->first->id.toString().c_str()); + i->listeners.erase(l++); + } else + ++l; + } auto stats = i->expire(types, now); total_store_size += stats.first; @@ -2090,9 +2085,9 @@ Dht::getStorageLog() const for (const auto& st : store) { out << "Storage " << st.id << " " << st.listeners.size() << " list., " << st.valueCount() << " values (" << st.totalSize() << " bytes)" << std::endl; for (const auto& l : st.listeners) { - out << " " << "Listener " << l.id << " " << print_addr((sockaddr*)&l.ss, l.sslen); - auto since = duration_cast<seconds>(now - l.time); - auto expires = duration_cast<seconds>(l.time + Node::NODE_EXPIRE_TIME - now); + out << " " << "Listener " << l.first->id << " " << print_addr((sockaddr*)&l.first->ss, l.first->sslen); + auto since = duration_cast<seconds>(now - l.second.time); + auto expires = duration_cast<seconds>(l.second.time + Node::NODE_EXPIRE_TIME - now); out << " (since " << since.count() << "s, exp in " << expires.count() << "s)" << std::endl; } } @@ -2686,7 +2681,7 @@ Dht::onListen(std::shared_ptr<Node> node, InfoHash& hash, Blob& token, size_t ri hash.toString().c_str(), to_hex(token.data(), token.size()).c_str()); throw DhtProtocolException {DhtProtocolException::UNAUTHORIZED, DhtProtocolException::LISTEN_WRONG_TOKEN}; } - storageAddListener(hash, node->id, (sockaddr*)&node->ss, node->sslen, rid); + storageAddListener(hash, node, rid); return {}; } diff --git a/src/network_engine.cpp b/src/network_engine.cpp index b3e24cb3ba26e02b7b906e201c0adf3b08bfdc72..d509c2be6cca6fe7aadd29016bfab867e0110d59 100644 --- a/src/network_engine.cpp +++ b/src/network_engine.cpp @@ -55,13 +55,13 @@ NetworkEngine::pinged(Node& n) } void -NetworkEngine::tellListener(const sockaddr *sa, socklen_t salen, uint16_t rid, InfoHash hash, want_t want, +NetworkEngine::tellListener(std::shared_ptr<Node> node, uint16_t rid, InfoHash hash, want_t want, Blob ntoken, std::vector<std::shared_ptr<Node>> nodes, std::vector<std::shared_ptr<Node>> nodes6, std::vector<std::shared_ptr<Value>> values) { - auto nnodes = bufferNodes(sa->sa_family, hash, want, nodes, nodes6); + auto nnodes = bufferNodes(node->getFamily(), hash, want, nodes, nodes6); try { - sendNodesValues(sa, salen, TransId {TransPrefix::GET_VALUES, (uint16_t)rid}, nnodes.first, nnodes.second, + sendNodesValues((const sockaddr*)&node->ss, node->sslen, TransId {TransPrefix::GET_VALUES, (uint16_t)rid}, nnodes.first, nnodes.second, values, ntoken); } catch (const std::overflow_error& e) { DHT_LOG.ERROR("Can't send value: buffer not large enough !");