From 3517c2e2e38641370d1a29885cc054af294540ff Mon Sep 17 00:00:00 2001 From: Adrien Beraud <adrien.beraud@savoirfairelinux.com> Date: Fri, 31 Mar 2017 16:31:57 +0200 Subject: [PATCH] wip --- include/opendht/callbacks.h | 2 +- include/opendht/dht.h | 2 +- include/opendht/dhtrunner.h | 7 +++++-- python/opendht.pyx | 2 +- python/opendht_cpp.pxd | 2 +- src/dht.cpp | 40 ++++++++++++++++++------------------- src/dhtrunner.cpp | 25 ++++++++++++++--------- 7 files changed, 45 insertions(+), 35 deletions(-) diff --git a/include/opendht/callbacks.h b/include/opendht/callbacks.h index f96b2a65..2c2a7d65 100644 --- a/include/opendht/callbacks.h +++ b/include/opendht/callbacks.h @@ -104,7 +104,7 @@ typedef bool (*GetCallbackRaw)(std::shared_ptr<Value>, void *user_data); OPENDHT_PUBLIC GetCallbackSimple bindGetCb(GetCallbackRaw raw_cb, void* user_data); OPENDHT_PUBLIC GetCallback bindGetCb(GetCallbackSimple cb); -using DoneCallback = std::function<void(bool success, const std::vector<std::shared_ptr<Node>>& nodes)>; +using DoneCallback = std::function<void(bool success, std::vector<std::shared_ptr<Node>>&& nodes)>; typedef void (*DoneCallbackRaw)(bool, std::vector<std::shared_ptr<Node>>*, void *user_data); typedef void (*ShutdownCallbackRaw)(void *user_data); typedef void (*DoneCallbackSimpleRaw)(bool, void *user_data); diff --git a/include/opendht/dht.h b/include/opendht/dht.h index 6fbdcf1b..b1ed3cd0 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -141,7 +141,7 @@ public: bootstrap_nodes.clear(); } - void pingNode(const SockAddr&, DoneCallbackSimple&& cb={}); + void pingNode(const SockAddr&, DoneCallback&& cb={}); /** * Get a value by searching on all available protocols (IPv4, IPv6), diff --git a/include/opendht/dhtrunner.h b/include/opendht/dhtrunner.h index dc8d7ee9..97f717ea 100644 --- a/include/opendht/dhtrunner.h +++ b/include/opendht/dhtrunner.h @@ -201,8 +201,11 @@ public: * Insert known nodes to the routing table, without necessarly ping them. * Usefull to restart a node and get things running fast without putting load on the network. */ - void bootstrap(const std::vector<std::pair<sockaddr_storage, socklen_t>>& nodes, DoneCallbackSimple&& cb={}); - void bootstrap(const SockAddr& addr, DoneCallbackSimple&& cb={}); + void ping(const std::vector<std::pair<sockaddr_storage, socklen_t>>& nodes, DoneCallback&& cb={}); + void ping(const SockAddr& addr, DoneCallback&& cb={}); + void ping(const SockAddr& addr, DoneCallbackSimple&& cb={}) { + ping(addr, bindDoneCb(cb)); + } /** * Insert known nodes to the routing table, without necessarly ping them. diff --git a/python/opendht.pyx b/python/opendht.pyx index 329bac23..1cc72592 100644 --- a/python/opendht.pyx +++ b/python/opendht.pyx @@ -466,7 +466,7 @@ cdef class DhtRunner(_WithID): if done_cb: cb_obj = {'done':done_cb} ref.Py_INCREF(cb_obj) - self.thisptr.get().bootstrap(addr._addr, cpp.bindDoneCbSimple(done_callback_simple, <void*>cb_obj)) + self.thisptr.get().ping(addr._addr, cpp.bindDoneCbSimple(done_callback_simple, <void*>cb_obj)) else: lock = threading.Condition() pending = 0 diff --git a/python/opendht_cpp.pxd b/python/opendht_cpp.pxd index 7d17c419..bd4e2e7e 100644 --- a/python/opendht_cpp.pxd +++ b/python/opendht_cpp.pxd @@ -222,7 +222,7 @@ cdef extern from "opendht/dhtrunner.h" namespace "dht": InfoHash getId() const InfoHash getNodeId() const void bootstrap(const_char*, const_char*) - void bootstrap(const SockAddr&, DoneCallbackSimple done_cb) + void ping(const SockAddr&, DoneCallbackSimple done_cb) void run(Config config) void join() void shutdown(ShutdownCallback) diff --git a/src/dht.cpp b/src/dht.cpp index 97561766..80c7a621 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -994,24 +994,24 @@ Dht::put(const InfoHash& id, Sp<Value> val, DoneCallback callback, time_point cr auto done = std::make_shared<bool>(false); auto done4 = std::make_shared<bool>(false); auto done6 = std::make_shared<bool>(false); - auto donecb = [=](const std::vector<Sp<Node>>& nodes) { + auto donecb = [=](std::vector<Sp<Node>>&& nodes) { // Callback as soon as the value is announced on one of the available networks if (callback && !*done && (*done4 && *done6)) { - callback(*ok, nodes); + callback(*ok, std::move(nodes)); *done = true; } }; - announce(id, AF_INET, val, [=](bool ok4, const std::vector<Sp<Node>>& nodes) { + announce(id, AF_INET, val, [=](bool ok4, std::vector<Sp<Node>>&& nodes) { DHT_LOG.d(id, "Announce done IPv4 %d", ok4); *done4 = true; *ok |= ok4; - donecb(nodes); + donecb(std::move(nodes)); }, created, permanent); - announce(id, AF_INET6, val, [=](bool ok6, const std::vector<Sp<Node>>& nodes) { + announce(id, AF_INET6, val, [=](bool ok6, std::vector<Sp<Node>>&& nodes) { DHT_LOG.d(id, "Announce done IPv6 %d", ok6); *done6 = true; *ok |= ok6; - donecb(nodes); + donecb(std::move(nodes)); }, created, permanent); } @@ -1029,7 +1029,7 @@ struct OpStatus { }; template <typename T> -void doneCallbackWrapper(DoneCallback dcb, const std::vector<Sp<Node>>& nodes, Sp<OpStatus<T>> op) { +void doneCallbackWrapper(DoneCallback dcb, std::vector<Sp<Node>>&& nodes, Sp<OpStatus<T>> op) { if (op->status.done) return; op->nodes.insert(op->nodes.end(), nodes.begin(), nodes.end()); @@ -1037,7 +1037,7 @@ void doneCallbackWrapper(DoneCallback dcb, const std::vector<Sp<Node>>& nodes, S bool ok = op->status.ok || op->status4.ok || op->status6.ok; op->status.done = true; if (dcb) - dcb(ok, op->nodes); + dcb(ok, std::move(op->nodes)); } } @@ -1100,17 +1100,17 @@ Dht::get(const InfoHash& id, GetCallback getcb, DoneCallback donecb, Value::Filt /* Try to answer this search locally. */ gcb(getLocal(id, f)); - Dht::search(id, AF_INET, gcb, {}, [=](bool ok, const std::vector<Sp<Node>>& nodes) { + Dht::search(id, AF_INET, gcb, {}, [=](bool ok, std::vector<Sp<Node>>&& nodes) { //DHT_LOG_WARN("DHT done IPv4"); op->status4.done = true; op->status4.ok = ok; - doneCallbackWrapper(donecb, nodes, op); + doneCallbackWrapper(donecb, std::move(nodes), op); }, f, q); - Dht::search(id, AF_INET6, gcb, {}, [=](bool ok, const std::vector<Sp<Node>>& nodes) { + Dht::search(id, AF_INET6, gcb, {}, [=](bool ok, std::vector<Sp<Node>>&& nodes) { //DHT_LOG_WARN("DHT done IPv6"); op->status6.done = true; op->status6.ok = ok; - doneCallbackWrapper(donecb, nodes, op); + doneCallbackWrapper(donecb, std::move(nodes), op); }, f, q); } @@ -1149,17 +1149,17 @@ void Dht::query(const InfoHash& id, QueryCallback cb, DoneCallback done_cb, Quer /* Try to answer this search locally. */ qcb(local_fields); - Dht::search(id, AF_INET, {}, qcb, [=](bool ok, const std::vector<Sp<Node>>& nodes) { + Dht::search(id, AF_INET, {}, qcb, [=](bool ok, std::vector<Sp<Node>>&& nodes) { //DHT_LOG_WARN("DHT done IPv4"); op->status4.done = true; op->status4.ok = ok; - doneCallbackWrapper(done_cb, nodes, op); + doneCallbackWrapper(done_cb, std::move(nodes), op); }, f, q); - Dht::search(id, AF_INET6, {}, qcb, [=](bool ok, const std::vector<Sp<Node>>& nodes) { + Dht::search(id, AF_INET6, {}, qcb, [=](bool ok, std::vector<Sp<Node>>&& nodes) { //DHT_LOG_WARN("DHT done IPv6"); op->status6.done = true; op->status6.ok = ok; - doneCallbackWrapper(done_cb, nodes, op); + doneCallbackWrapper(done_cb, std::move(nodes), op); }, f, q); } @@ -2143,21 +2143,21 @@ Dht::insertNode(const InfoHash& id, const SockAddr& addr) } void -Dht::pingNode(const SockAddr& sa, DoneCallbackSimple&& cb) +Dht::pingNode(const SockAddr& sa, DoneCallback&& cb) { scheduler.syncTime(); DHT_LOG.d("Sending ping to %s", sa.toString().c_str()); auto& count = sa.getFamily() == AF_INET ? pending_pings4 : pending_pings6; count++; - network_engine.sendPing(sa, [&count,cb](const net::Request&, net::NetworkEngine::RequestAnswer&&) { + network_engine.sendPing(sa, [&count,cb](const net::Request& req, net::NetworkEngine::RequestAnswer&&) { count--; if (cb) - cb(true); + cb(true, {req.node}); }, [&count,cb](const net::Request&, bool last){ if (last) { count--; if (cb) - cb(false); + cb(false, {}); } }); } diff --git a/src/dhtrunner.cpp b/src/dhtrunner.cpp index aa5cb9fe..4104ff8f 100644 --- a/src/dhtrunner.cpp +++ b/src/dhtrunner.cpp @@ -580,25 +580,32 @@ DhtRunner::clearBootstrap() } void -DhtRunner::bootstrap(const std::vector<std::pair<sockaddr_storage, socklen_t>>& nodes, DoneCallbackSimple&& cb) +DhtRunner::ping(const std::vector<std::pair<sockaddr_storage, socklen_t>>& nodes, DoneCallback&& cb) { std::lock_guard<std::mutex> lck(storage_mtx); pending_ops_prio.emplace([=](SecureDht& dht) mutable { - auto rem = cb ? std::make_shared<std::pair<size_t, bool>>(nodes.size(), false) : nullptr; + struct GroupPing { + size_t remaining {0}; + bool ok {false}; + std::vector<Sp<Node>> nodes; + GroupPing(size_t s) : remaining(s) {} + }; + Sp<GroupPing> rem = cb ? std::make_shared<GroupPing>(nodes.size()) : nullptr; for (auto& node : nodes) - dht.pingNode(SockAddr((sockaddr*)&node.first, node.second), cb ? [rem,cb](bool ok) { + dht.pingNode(SockAddr((sockaddr*)&node.first, node.second), cb ? [rem,cb](bool ok, std::vector<Sp<Node>>&& nodes) { auto& r = *rem; - r.first--; - r.second |= ok; - if (not r.first) - cb(r.second); - } : DoneCallbackSimple{}); + r.remaining--; + r.ok |= ok; + r.nodes.insert(r.nodes.end(), nodes.begin(), nodes.end()); + if (r.remaining == 0) + cb(r.ok, std::move(r.nodes)); + } : DoneCallback{}); }); uv_async_send(&uv_async_); } void -DhtRunner::bootstrap(const SockAddr& addr, DoneCallbackSimple&& cb) +DhtRunner::ping(const SockAddr& addr, DoneCallback&& cb) { std::lock_guard<std::mutex> lck(storage_mtx); pending_ops_prio.emplace([addr,cb=std::move(cb)](SecureDht& dht) mutable { -- GitLab