Skip to content
Snippets Groups Projects
Commit 405b8739 authored by Adrien Béraud's avatar Adrien Béraud
Browse files

dhtrunner: always call `opEnded` once per op

parent 7ed11a02
Branches
No related tags found
No related merge requests found
...@@ -297,16 +297,24 @@ DhtRunner::shutdown(ShutdownCallback cb, bool stop) { ...@@ -297,16 +297,24 @@ DhtRunner::shutdown(ShutdownCallback cb, bool stop) {
} }
if (logger_) if (logger_)
logger_->d("[runner %p] state changed to Stopping, %zu ongoing ops", this, ongoing_ops.load()); logger_->d("[runner %p] state changed to Stopping, %zu ongoing ops", this, ongoing_ops.load());
#ifdef OPENDHT_PROXY_CLIENT
ongoing_ops += 2;
#else
ongoing_ops++; ongoing_ops++;
#endif
shutdownCallbacks_.emplace_back(std::move(cb)); shutdownCallbacks_.emplace_back(std::move(cb));
pending_ops.emplace([=](SecureDht&) mutable { pending_ops.emplace([=](SecureDht&) mutable {
auto onShutdown = [this]{ opEnded(); }; auto onShutdown = [this]{ opEnded(); };
#ifdef OPENDHT_PROXY_CLIENT #ifdef OPENDHT_PROXY_CLIENT
if (dht_via_proxy_) if (dht_via_proxy_)
dht_via_proxy_->shutdown(onShutdown, stop); dht_via_proxy_->shutdown(onShutdown, stop);
else
opEnded();
#endif #endif
if (dht_) if (dht_)
dht_->shutdown(onShutdown, stop); dht_->shutdown(onShutdown, stop);
else
opEnded();
}); });
cv.notify_all(); cv.notify_all();
} }
...@@ -823,12 +831,16 @@ DhtRunner::cancelListen(InfoHash h, size_t token) ...@@ -823,12 +831,16 @@ DhtRunner::cancelListen(InfoHash h, size_t token)
#ifdef OPENDHT_PROXY_CLIENT #ifdef OPENDHT_PROXY_CLIENT
pending_ops.emplace([=](SecureDht&) { pending_ops.emplace([=](SecureDht&) {
auto it = listeners_.find(token); auto it = listeners_.find(token);
if (it == listeners_.end()) return; if (it != listeners_.end()) {
if (it->second.tokenClassicDht) if (it->second.tokenClassicDht)
dht_->cancelListen(h, it->second.tokenClassicDht); dht_->cancelListen(h, it->second.tokenClassicDht);
if (it->second.tokenProxyDht and dht_via_proxy_) if (it->second.tokenProxyDht and dht_via_proxy_)
dht_via_proxy_->cancelListen(h, it->second.tokenProxyDht); dht_via_proxy_->cancelListen(h, it->second.tokenProxyDht);
listeners_.erase(it); listeners_.erase(it);
} else {
if (logger_)
logger_->w("[runner %p] cancelListen: unknown token %zu.", this, token);
}
opEnded(); opEnded();
}); });
#else #else
...@@ -849,13 +861,18 @@ DhtRunner::cancelListen(InfoHash h, std::shared_future<size_t> ftoken) ...@@ -849,13 +861,18 @@ DhtRunner::cancelListen(InfoHash h, std::shared_future<size_t> ftoken)
ongoing_ops++; ongoing_ops++;
#ifdef OPENDHT_PROXY_CLIENT #ifdef OPENDHT_PROXY_CLIENT
pending_ops.emplace([this, h, ftoken = std::move(ftoken)](SecureDht&) { pending_ops.emplace([this, h, ftoken = std::move(ftoken)](SecureDht&) {
auto it = listeners_.find(ftoken.get()); auto token = ftoken.get();
if (it == listeners_.end()) return; auto it = listeners_.find(token);
if (it != listeners_.end()) {
if (it->second.tokenClassicDht) if (it->second.tokenClassicDht)
dht_->cancelListen(h, it->second.tokenClassicDht); dht_->cancelListen(h, it->second.tokenClassicDht);
if (it->second.tokenProxyDht and dht_via_proxy_) if (it->second.tokenProxyDht and dht_via_proxy_)
dht_via_proxy_->cancelListen(h, it->second.tokenProxyDht); dht_via_proxy_->cancelListen(h, it->second.tokenProxyDht);
listeners_.erase(it); listeners_.erase(it);
} else {
if (logger_)
logger_->w("[runner %p] cancelListen: unknown token %zu.", this, token);
}
opEnded(); opEnded();
}); });
#else #else
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment