Skip to content
Snippets Groups Projects
Commit f6d26757 authored by Adrien Béraud's avatar Adrien Béraud
Browse files

dhtrunner: wait for current ops to complete on shutdown

parent 476d0bca
Branches
Tags
No related merge requests found
...@@ -340,7 +340,7 @@ public: ...@@ -340,7 +340,7 @@ public:
void importValues(const std::vector<ValuesExport>& values); void importValues(const std::vector<ValuesExport>& values);
bool isRunning() const { bool isRunning() const {
return running; return running != State::Idle;
} }
NodeStats getNodesStats(sa_family_t af) const; NodeStats getNodesStats(sa_family_t af) const;
...@@ -358,7 +358,7 @@ public: ...@@ -358,7 +358,7 @@ public:
// securedht methods // securedht methods
void findCertificate(InfoHash hash, std::function<void(const std::shared_ptr<crypto::Certificate>)>); void findCertificate(InfoHash hash, std::function<void(const std::shared_ptr<crypto::Certificate>&)>);
void registerCertificate(std::shared_ptr<crypto::Certificate> cert); void registerCertificate(std::shared_ptr<crypto::Certificate> cert);
void setLocalCertificateStore(CertificateStoreQuery&& query_method); void setLocalCertificateStore(CertificateStoreQuery&& query_method);
...@@ -411,7 +411,7 @@ public: ...@@ -411,7 +411,7 @@ public:
/** /**
* Gracefuly disconnect from network. * Gracefuly disconnect from network.
*/ */
void shutdown(ShutdownCallback cb); void shutdown(ShutdownCallback cb = {});
/** /**
* Quit and wait for all threads to terminate. * Quit and wait for all threads to terminate.
...@@ -449,6 +449,12 @@ public: ...@@ -449,6 +449,12 @@ public:
private: private:
static constexpr std::chrono::seconds BOOTSTRAP_PERIOD {10}; static constexpr std::chrono::seconds BOOTSTRAP_PERIOD {10};
enum class State {
Idle,
Running,
Stopping
};
/** /**
* Will try to resolve the list of hostnames `bootstrap_nodes` on seperate * Will try to resolve the list of hostnames `bootstrap_nodes` on seperate
* thread and then queue ping requests. This list should contain reliable * thread and then queue ping requests. This list should contain reliable
...@@ -463,6 +469,11 @@ private: ...@@ -463,6 +469,11 @@ private:
return std::max(status4, status6); return std::max(status4, status6);
} }
bool checkShutdown();
void opEnded();
DoneCallback bindOpDoneCallback(DoneCallback&& cb);
DoneCallbackSimple bindOpDoneCallback(DoneCallbackSimple&& cb);
/** Local DHT instance */ /** Local DHT instance */
std::unique_ptr<SecureDht> dht_; std::unique_ptr<SecureDht> dht_;
...@@ -510,7 +521,9 @@ private: ...@@ -510,7 +521,9 @@ private:
std::queue<std::function<void(SecureDht&)>> pending_ops {}; std::queue<std::function<void(SecureDht&)>> pending_ops {};
std::mutex storage_mtx {}; std::mutex storage_mtx {};
std::atomic_bool running {false}; std::atomic<State> running {State::Idle};
std::atomic_uint ongoing_ops {0};
std::vector<ShutdownCallback> shutdownCallbacks_;
NodeStatus status4 {NodeStatus::Disconnected}, NodeStatus status4 {NodeStatus::Disconnected},
status6 {NodeStatus::Disconnected}; status6 {NodeStatus::Disconnected};
......
...@@ -100,7 +100,7 @@ DhtRunner::run(const char* ip4, const char* ip6, const char* service, const Conf ...@@ -100,7 +100,7 @@ DhtRunner::run(const char* ip4, const char* ip6, const char* service, const Conf
void void
DhtRunner::run(const SockAddr& local4, const SockAddr& local6, const Config& config, Context&& context) DhtRunner::run(const SockAddr& local4, const SockAddr& local6, const Config& config, Context&& context)
{ {
if (not running) { if (running == State::Idle) {
if (not context.sock) if (not context.sock)
context.sock.reset(new net::UdpSocket(local4, local6, context.logger ? *context.logger : Logger{})); context.sock.reset(new net::UdpSocket(local4, local6, context.logger ? *context.logger : Logger{}));
run(config, std::move(context)); run(config, std::move(context));
...@@ -111,11 +111,14 @@ void ...@@ -111,11 +111,14 @@ void
DhtRunner::run(const Config& config, Context&& context) DhtRunner::run(const Config& config, Context&& context)
{ {
std::lock_guard<std::mutex> lck(dht_mtx); std::lock_guard<std::mutex> lck(dht_mtx);
if (running) auto expected = State::Idle;
if (not running.compare_exchange_strong(expected, State::Running))
return; return;
if (context.logger) if (context.logger) {
logger_ = context.logger; logger_ = context.logger;
logger_->d("[runner %p] state changed to Running", this);
}
context.sock->setOnReceive([&] (std::unique_ptr<net::ReceivedPacket>&& pkt) { context.sock->setOnReceive([&] (std::unique_ptr<net::ReceivedPacket>&& pkt) {
{ {
...@@ -148,16 +151,15 @@ DhtRunner::run(const Config& config, Context&& context) ...@@ -148,16 +151,15 @@ DhtRunner::run(const Config& config, Context&& context)
dht_via_proxy_->setLocalCertificateStore(std::move(context.certificateStore)); dht_via_proxy_->setLocalCertificateStore(std::move(context.certificateStore));
} }
running = true;
if (not config.threaded) if (not config.threaded)
return; return;
dht_thread = std::thread([this]() { dht_thread = std::thread([this]() {
while (running) { while (running != State::Idle) {
std::unique_lock<std::mutex> lk(dht_mtx); std::unique_lock<std::mutex> lk(dht_mtx);
time_point wakeup = loop_(); time_point wakeup = loop_();
auto hasJobToDo = [this]() { auto hasJobToDo = [this]() {
if (not running) if (running == State::Idle)
return true; return true;
{ {
std::lock_guard<std::mutex> lck(sock_mtx); std::lock_guard<std::mutex> lck(sock_mtx);
...@@ -219,36 +221,84 @@ DhtRunner::run(const Config& config, Context&& context) ...@@ -219,36 +221,84 @@ DhtRunner::run(const Config& config, Context&& context)
void void
DhtRunner::shutdown(ShutdownCallback cb) { DhtRunner::shutdown(ShutdownCallback cb) {
if (not running) { auto expected = State::Running;
cb(); if (not running.compare_exchange_strong(expected, State::Stopping)) {
if (expected == State::Stopping and ongoing_ops) {
std::lock_guard<std::mutex> lck(storage_mtx);
shutdownCallbacks_.emplace_back(std::move(cb));
}
else if (cb) cb();
return; return;
} }
if (logger_)
logger_->d("[runner %p] state changed to Stopping", this);
std::lock_guard<std::mutex> lck(storage_mtx); std::lock_guard<std::mutex> lck(storage_mtx);
ongoing_ops++;
shutdownCallbacks_.emplace_back(std::move(cb));
pending_ops_prio.emplace([=](SecureDht&) mutable { pending_ops_prio.emplace([=](SecureDht&) mutable {
auto onShutdown = [this]{ opEnded(); };
#ifdef OPENDHT_PROXY_CLIENT #ifdef OPENDHT_PROXY_CLIENT
if (dht_via_proxy_) if (dht_via_proxy_)
dht_via_proxy_->shutdown(cb); dht_via_proxy_->shutdown(onShutdown);
#endif #endif
if (dht_) if (dht_)
dht_->shutdown(cb); dht_->shutdown(onShutdown);
}); });
cv.notify_all(); cv.notify_all();
} }
void void
DhtRunner::join() DhtRunner::opEnded() {
if (--ongoing_ops == 0)
checkShutdown();
}
DoneCallback
DhtRunner::bindOpDoneCallback(DoneCallback&& cb) {
return [this, cb = std::move(cb)](bool ok, const std::vector<std::shared_ptr<Node>>& nodes){
if (cb) cb(ok, nodes);
opEnded();
};
}
DoneCallbackSimple
DhtRunner::bindOpDoneCallback(DoneCallbackSimple&& cb) {
return [this, cb = std::move(cb)](bool ok){
if (cb) cb(ok);
opEnded();
};
}
bool
DhtRunner::checkShutdown() {
if (running != State::Stopping or ongoing_ops)
return false;
decltype(shutdownCallbacks_) cbs;
{ {
if (peerDiscovery_) std::lock_guard<std::mutex> lck(storage_mtx);
peerDiscovery_->stop(); cbs = std::move(shutdownCallbacks_);
}
for (auto& cb : cbs)
if (cb) cb();
return true;
}
void
DhtRunner::join()
{
{ {
std::lock_guard<std::mutex> lck(dht_mtx); std::lock_guard<std::mutex> lck(dht_mtx);
running = false; if (running.exchange(State::Idle) == State::Idle)
return;
cv.notify_all(); cv.notify_all();
bootstrap_cv.notify_all(); bootstrap_cv.notify_all();
if (peerDiscovery_)
peerDiscovery_->stop();
if (dht_) if (dht_)
if (auto sock = dht_->getSocket()) if (auto sock = dht_->getSocket())
sock->stop(); sock->stop();
if (logger_)
logger_->d("[runner %p] state changed to Idle", this);
} }
if (dht_thread.joinable()) if (dht_thread.joinable())
...@@ -257,9 +307,8 @@ DhtRunner::join() ...@@ -257,9 +307,8 @@ DhtRunner::join()
if (bootstrap_thread.joinable()) if (bootstrap_thread.joinable())
bootstrap_thread.join(); bootstrap_thread.join();
if (peerDiscovery_) { if (peerDiscovery_)
peerDiscovery_->join(); peerDiscovery_->join();
}
{ {
std::lock_guard<std::mutex> lck(storage_mtx); std::lock_guard<std::mutex> lck(storage_mtx);
...@@ -572,12 +621,15 @@ DhtRunner::loop_() ...@@ -572,12 +621,15 @@ DhtRunner::loop_()
void void
DhtRunner::get(InfoHash hash, GetCallback vcb, DoneCallback dcb, Value::Filter f, Where w) DhtRunner::get(InfoHash hash, GetCallback vcb, DoneCallback dcb, Value::Filter f, Where w)
{ {
{ if (running != State::Running) {
if (dcb) dcb(false, {});
return;
}
std::lock_guard<std::mutex> lck(storage_mtx); std::lock_guard<std::mutex> lck(storage_mtx);
ongoing_ops++;
pending_ops.emplace([=](SecureDht& dht) mutable { pending_ops.emplace([=](SecureDht& dht) mutable {
dht.get(hash, std::move(vcb), std::move(dcb), std::move(f), std::move(w)); dht.get(hash, std::move(vcb), bindOpDoneCallback(std::move(dcb)), std::move(f), std::move(w));
}); });
}
cv.notify_all(); cv.notify_all();
} }
...@@ -588,12 +640,15 @@ DhtRunner::get(const std::string& key, GetCallback vcb, DoneCallbackSimple dcb, ...@@ -588,12 +640,15 @@ DhtRunner::get(const std::string& key, GetCallback vcb, DoneCallbackSimple dcb,
} }
void void
DhtRunner::query(const InfoHash& hash, QueryCallback cb, DoneCallback done_cb, Query q) { DhtRunner::query(const InfoHash& hash, QueryCallback cb, DoneCallback done_cb, Query q) {
{ if (running != State::Running) {
if (done_cb) done_cb(false, {});
return;
}
std::lock_guard<std::mutex> lck(storage_mtx); std::lock_guard<std::mutex> lck(storage_mtx);
ongoing_ops++;
pending_ops.emplace([=](SecureDht& dht) mutable { pending_ops.emplace([=](SecureDht& dht) mutable {
dht.query(hash, std::move(cb), std::move(done_cb), std::move(q)); dht.query(hash, std::move(cb), bindOpDoneCallback(std::move(done_cb)), std::move(q));
}); });
}
cv.notify_all(); cv.notify_all();
} }
...@@ -601,7 +656,10 @@ std::future<size_t> ...@@ -601,7 +656,10 @@ std::future<size_t>
DhtRunner::listen(InfoHash hash, ValueCallback vcb, Value::Filter f, Where w) DhtRunner::listen(InfoHash hash, ValueCallback vcb, Value::Filter f, Where w)
{ {
auto ret_token = std::make_shared<std::promise<size_t>>(); auto ret_token = std::make_shared<std::promise<size_t>>();
{ if (running != State::Running) {
ret_token->set_value(0);
return ret_token->get_future();
}
std::lock_guard<std::mutex> lck(storage_mtx); std::lock_guard<std::mutex> lck(storage_mtx);
pending_ops.emplace([=](SecureDht& dht) mutable { pending_ops.emplace([=](SecureDht& dht) mutable {
#ifdef OPENDHT_PROXY_CLIENT #ifdef OPENDHT_PROXY_CLIENT
...@@ -626,7 +684,6 @@ DhtRunner::listen(InfoHash hash, ValueCallback vcb, Value::Filter f, Where w) ...@@ -626,7 +684,6 @@ DhtRunner::listen(InfoHash hash, ValueCallback vcb, Value::Filter f, Where w)
ret_token->set_value(dht.listen(hash, std::move(vcb), std::move(f), std::move(w))); ret_token->set_value(dht.listen(hash, std::move(vcb), std::move(f), std::move(w)));
#endif #endif
}); });
}
cv.notify_all(); cv.notify_all();
return ret_token->get_future(); return ret_token->get_future();
} }
...@@ -639,9 +696,9 @@ DhtRunner::listen(const std::string& key, GetCallback vcb, Value::Filter f, Wher ...@@ -639,9 +696,9 @@ DhtRunner::listen(const std::string& key, GetCallback vcb, Value::Filter f, Wher
void void
DhtRunner::cancelListen(InfoHash h, size_t token) DhtRunner::cancelListen(InfoHash h, size_t token)
{
{ {
std::lock_guard<std::mutex> lck(storage_mtx); std::lock_guard<std::mutex> lck(storage_mtx);
ongoing_ops++;
#ifdef OPENDHT_PROXY_CLIENT #ifdef OPENDHT_PROXY_CLIENT
pending_ops.emplace([=](SecureDht&) { pending_ops.emplace([=](SecureDht&) {
auto it = listeners_.find(token); auto it = listeners_.find(token);
...@@ -651,21 +708,22 @@ DhtRunner::cancelListen(InfoHash h, size_t token) ...@@ -651,21 +708,22 @@ DhtRunner::cancelListen(InfoHash h, size_t token)
if (it->second.tokenProxyDht and dht_via_proxy_) if (it->second.tokenProxyDht and dht_via_proxy_)
dht_via_proxy_->cancelListen(h, it->second.tokenProxyDht); dht_via_proxy_->cancelListen(h, it->second.tokenProxyDht);
listeners_.erase(it); listeners_.erase(it);
opEnded();
}); });
#else #else
pending_ops.emplace([=](SecureDht& dht) { pending_ops.emplace([=](SecureDht& dht) {
dht.cancelListen(h, token); dht.cancelListen(h, token);
opEnded();
}); });
#endif // OPENDHT_PROXY_CLIENT #endif // OPENDHT_PROXY_CLIENT
}
cv.notify_all(); cv.notify_all();
} }
void void
DhtRunner::cancelListen(InfoHash h, std::shared_future<size_t> ftoken) DhtRunner::cancelListen(InfoHash h, std::shared_future<size_t> ftoken)
{
{ {
std::lock_guard<std::mutex> lck(storage_mtx); std::lock_guard<std::mutex> lck(storage_mtx);
ongoing_ops++;
#ifdef OPENDHT_PROXY_CLIENT #ifdef OPENDHT_PROXY_CLIENT
pending_ops.emplace([=](SecureDht&) { pending_ops.emplace([=](SecureDht&) {
auto it = listeners_.find(ftoken.get()); auto it = listeners_.find(ftoken.get());
...@@ -675,38 +733,47 @@ DhtRunner::cancelListen(InfoHash h, std::shared_future<size_t> ftoken) ...@@ -675,38 +733,47 @@ DhtRunner::cancelListen(InfoHash h, std::shared_future<size_t> ftoken)
if (it->second.tokenProxyDht and dht_via_proxy_) if (it->second.tokenProxyDht and dht_via_proxy_)
dht_via_proxy_->cancelListen(h, it->second.tokenProxyDht); dht_via_proxy_->cancelListen(h, it->second.tokenProxyDht);
listeners_.erase(it); listeners_.erase(it);
opEnded();
}); });
#else #else
pending_ops.emplace([=](SecureDht& dht) { pending_ops.emplace([=](SecureDht& dht) {
dht.cancelListen(h, ftoken.get()); dht.cancelListen(h, ftoken.get());
opEnded();
}); });
#endif // OPENDHT_PROXY_CLIENT #endif // OPENDHT_PROXY_CLIENT
}
cv.notify_all(); cv.notify_all();
} }
void void
DhtRunner::put(InfoHash hash, Value&& value, DoneCallback cb, time_point created, bool permanent) DhtRunner::put(InfoHash hash, Value&& value, DoneCallback cb, time_point created, bool permanent)
{ {
{ if (running != State::Running) {
if (cb) cb(false, {});
return;
}
std::lock_guard<std::mutex> lck(storage_mtx); std::lock_guard<std::mutex> lck(storage_mtx);
auto sv = std::make_shared<Value>(std::move(value)); ongoing_ops++;
pending_ops.emplace([=](SecureDht& dht) { pending_ops.emplace([=,
dht.put(hash, sv, cb, created, permanent); cb = std::move(cb),
sv = std::make_shared<Value>(std::move(value))
] (SecureDht& dht) mutable {
dht.put(hash, sv, bindOpDoneCallback(std::move(cb)), created, permanent);
}); });
}
cv.notify_all(); cv.notify_all();
} }
void void
DhtRunner::put(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb, time_point created, bool permanent) DhtRunner::put(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb, time_point created, bool permanent)
{ {
{ if (running != State::Running) {
if (cb) cb(false, {});
return;
}
std::lock_guard<std::mutex> lck(storage_mtx); std::lock_guard<std::mutex> lck(storage_mtx);
pending_ops.emplace([=](SecureDht& dht) { ongoing_ops++;
dht.put(hash, value, cb, created, permanent); pending_ops.emplace([=, cb = std::move(cb)](SecureDht& dht) mutable {
dht.put(hash, value, bindOpDoneCallback(std::move(cb)), created, permanent);
}); });
}
cv.notify_all(); cv.notify_all();
} }
...@@ -718,37 +785,43 @@ DhtRunner::put(const std::string& key, Value&& value, DoneCallbackSimple cb, tim ...@@ -718,37 +785,43 @@ DhtRunner::put(const std::string& key, Value&& value, DoneCallbackSimple cb, tim
void void
DhtRunner::cancelPut(const InfoHash& h, Value::Id id) DhtRunner::cancelPut(const InfoHash& h, Value::Id id)
{
{ {
std::lock_guard<std::mutex> lck(storage_mtx); std::lock_guard<std::mutex> lck(storage_mtx);
ongoing_ops++;
pending_ops.emplace([=](SecureDht& dht) { pending_ops.emplace([=](SecureDht& dht) {
dht.cancelPut(h, id); dht.cancelPut(h, id);
opEnded();
}); });
}
cv.notify_all(); cv.notify_all();
} }
void void
DhtRunner::cancelPut(const InfoHash& h, const std::shared_ptr<Value>& value) DhtRunner::cancelPut(const InfoHash& h, const std::shared_ptr<Value>& value)
{
{ {
std::lock_guard<std::mutex> lck(storage_mtx); std::lock_guard<std::mutex> lck(storage_mtx);
ongoing_ops++;
pending_ops.emplace([=](SecureDht& dht) { pending_ops.emplace([=](SecureDht& dht) {
dht.cancelPut(h, value->id); dht.cancelPut(h, value->id);
opEnded();
}); });
}
cv.notify_all(); cv.notify_all();
} }
void void
DhtRunner::putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb, bool permanent) DhtRunner::putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb, bool permanent)
{ {
{ if (running != State::Running) {
if (cb) cb(false, {});
return;
}
std::lock_guard<std::mutex> lck(storage_mtx); std::lock_guard<std::mutex> lck(storage_mtx);
pending_ops.emplace([=](SecureDht& dht) { ongoing_ops++;
dht.putSigned(hash, value, cb, permanent); pending_ops.emplace([=,
cb = std::move(cb),
value = std::move(value)
](SecureDht& dht) mutable {
dht.putSigned(hash, value, bindOpDoneCallback(std::move(cb)), permanent);
}); });
}
cv.notify_all(); cv.notify_all();
} }
...@@ -767,12 +840,18 @@ DhtRunner::putSigned(const std::string& key, Value&& value, DoneCallbackSimple c ...@@ -767,12 +840,18 @@ DhtRunner::putSigned(const std::string& key, Value&& value, DoneCallbackSimple c
void void
DhtRunner::putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallback cb, bool permanent) DhtRunner::putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallback cb, bool permanent)
{ {
{ if (running != State::Running) {
if (cb) cb(false, {});
return;
}
std::lock_guard<std::mutex> lck(storage_mtx); std::lock_guard<std::mutex> lck(storage_mtx);
pending_ops.emplace([=](SecureDht& dht) { ongoing_ops++;
dht.putEncrypted(hash, to, value, cb, permanent); pending_ops.emplace([=,
cb = std::move(cb),
value = std::move(value)
] (SecureDht& dht) mutable {
dht.putEncrypted(hash, to, value, bindOpDoneCallback(std::move(cb)), permanent);
}); });
}
cv.notify_all(); cv.notify_all();
} }
...@@ -817,7 +896,7 @@ DhtRunner::tryBootstrapContinuously() ...@@ -817,7 +896,7 @@ DhtRunner::tryBootstrapContinuously()
++ping_count; ++ping_count;
try { try {
bootstrap(SockAddr::resolve(it->first, it->second), [&](bool) { bootstrap(SockAddr::resolve(it->first, it->second), [&](bool) {
if (not running) if (running != State::Running)
return; return;
{ {
std::unique_lock<std::mutex> blck(mtx); std::unique_lock<std::mutex> blck(mtx);
...@@ -831,15 +910,15 @@ DhtRunner::tryBootstrapContinuously() ...@@ -831,15 +910,15 @@ DhtRunner::tryBootstrapContinuously()
} }
} }
// wait at least until the next BOOTSTRAP_PERIOD // wait at least until the next BOOTSTRAP_PERIOD
bootstrap_cv.wait_until(blck, next, [&]() { return not running; }); bootstrap_cv.wait_until(blck, next, [&]() { return running != State::Running; });
// wait for bootstrap requests to end. // wait for bootstrap requests to end.
if (running) if (running != State::Running)
bootstrap_cv.wait(blck, [&]() { return not running or ping_count == 0; }); bootstrap_cv.wait(blck, [&]() { return running != State::Running or ping_count == 0; });
} }
// update state // update state
{ {
std::lock_guard<std::mutex> lck(dht_mtx); std::lock_guard<std::mutex> lck(dht_mtx);
bootstraping = running and bootstraping = running == State::Running and
status4 == NodeStatus::Disconnected and status4 == NodeStatus::Disconnected and
status6 == NodeStatus::Disconnected; status6 == NodeStatus::Disconnected;
} }
...@@ -876,19 +955,28 @@ DhtRunner::clearBootstrap() ...@@ -876,19 +955,28 @@ DhtRunner::clearBootstrap()
void void
DhtRunner::bootstrap(std::vector<SockAddr> nodes, DoneCallbackSimple&& cb) DhtRunner::bootstrap(std::vector<SockAddr> nodes, DoneCallbackSimple&& cb)
{ {
if (running != State::Running) {
cb(false);
return;
}
std::lock_guard<std::mutex> lck(storage_mtx); std::lock_guard<std::mutex> lck(storage_mtx);
pending_ops_prio.emplace([=](SecureDht& dht) mutable { ongoing_ops++;
pending_ops_prio.emplace([
cb = bindOpDoneCallback(std::move(cb)),
nodes = std::move(nodes)
] (SecureDht& dht) mutable {
auto rem = cb ? std::make_shared<std::pair<size_t, bool>>(nodes.size(), false) : nullptr; auto rem = cb ? std::make_shared<std::pair<size_t, bool>>(nodes.size(), false) : nullptr;
for (auto& node : nodes) { for (auto& node : nodes) {
if (node.getPort() == 0) if (node.getPort() == 0)
node.setPort(net::DHT_DEFAULT_PORT); node.setPort(net::DHT_DEFAULT_PORT);
dht.pingNode(std::move(node), cb ? [rem,cb](bool ok) { dht.pingNode(std::move(node), [rem,cb](bool ok) {
auto& r = *rem; auto& r = *rem;
r.first--; r.first--;
r.second |= ok; r.second |= ok;
if (not r.first) if (r.first == 0) {
cb(r.second); cb(r.second);
} : DoneCallbackSimple{}); }
});
} }
}); });
cv.notify_all(); cv.notify_all();
...@@ -897,8 +985,13 @@ DhtRunner::bootstrap(std::vector<SockAddr> nodes, DoneCallbackSimple&& cb) ...@@ -897,8 +985,13 @@ DhtRunner::bootstrap(std::vector<SockAddr> nodes, DoneCallbackSimple&& cb)
void void
DhtRunner::bootstrap(const SockAddr& addr, DoneCallbackSimple&& cb) DhtRunner::bootstrap(const SockAddr& addr, DoneCallbackSimple&& cb)
{ {
if (running != State::Running) {
if (cb) cb(false);
return;
}
std::lock_guard<std::mutex> lck(storage_mtx); std::lock_guard<std::mutex> lck(storage_mtx);
pending_ops_prio.emplace([addr, cb](SecureDht& dht) mutable { ongoing_ops++;
pending_ops_prio.emplace([addr, cb = bindOpDoneCallback(std::move(cb))](SecureDht& dht) mutable {
dht.pingNode(std::move(addr), std::move(cb)); dht.pingNode(std::move(addr), std::move(cb));
}); });
cv.notify_all(); cv.notify_all();
...@@ -907,48 +1000,52 @@ DhtRunner::bootstrap(const SockAddr& addr, DoneCallbackSimple&& cb) ...@@ -907,48 +1000,52 @@ DhtRunner::bootstrap(const SockAddr& addr, DoneCallbackSimple&& cb)
void void
DhtRunner::bootstrap(const InfoHash& id, const SockAddr& address) DhtRunner::bootstrap(const InfoHash& id, const SockAddr& address)
{ {
{ if (running != State::Running)
return;
std::unique_lock<std::mutex> lck(storage_mtx); std::unique_lock<std::mutex> lck(storage_mtx);
pending_ops_prio.emplace([id, address](SecureDht& dht) mutable { pending_ops_prio.emplace([id, address](SecureDht& dht) mutable {
dht.insertNode(id, address); dht.insertNode(id, address);
}); });
}
cv.notify_all(); cv.notify_all();
} }
void void
DhtRunner::bootstrap(const std::vector<NodeExport>& nodes) DhtRunner::bootstrap(const std::vector<NodeExport>& nodes)
{ {
{ if (running != State::Running)
return;
std::lock_guard<std::mutex> lck(storage_mtx); std::lock_guard<std::mutex> lck(storage_mtx);
pending_ops_prio.emplace([=](SecureDht& dht) { pending_ops_prio.emplace([=](SecureDht& dht) {
for (auto& node : nodes) for (auto& node : nodes)
dht.insertNode(node); dht.insertNode(node);
}); });
}
cv.notify_all(); cv.notify_all();
} }
void void
DhtRunner::connectivityChanged() DhtRunner::connectivityChanged()
{
{ {
std::lock_guard<std::mutex> lck(storage_mtx); std::lock_guard<std::mutex> lck(storage_mtx);
pending_ops_prio.emplace([=](SecureDht& dht) { pending_ops_prio.emplace([=](SecureDht& dht) {
dht.connectivityChanged(); dht.connectivityChanged();
}); });
}
cv.notify_all(); cv.notify_all();
} }
void void
DhtRunner::findCertificate(InfoHash hash, std::function<void(const std::shared_ptr<crypto::Certificate>)> cb) { DhtRunner::findCertificate(InfoHash hash, std::function<void(const Sp<crypto::Certificate>&)> cb) {
{ if (running != State::Running) {
cb({});
return;
}
std::lock_guard<std::mutex> lck(storage_mtx); std::lock_guard<std::mutex> lck(storage_mtx);
pending_ops.emplace([=](SecureDht& dht) { ongoing_ops++;
dht.findCertificate(hash, cb); pending_ops.emplace([this, hash, cb = std::move(cb)] (SecureDht& dht) {
dht.findCertificate(hash, [this, cb = std::move(cb)](const Sp<crypto::Certificate>& crt){
cb(crt);
opEnded();
});
}); });
}
cv.notify_all(); cv.notify_all();
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment