Skip to content
Snippets Groups Projects
Commit 201982b1 authored by Adrien Béraud's avatar Adrien Béraud
Browse files

dhtrunner: don't count cancel as ops

parent 83f2b272
Branches
Tags
No related merge requests found
...@@ -231,7 +231,7 @@ DhtRunner::shutdown(ShutdownCallback cb) { ...@@ -231,7 +231,7 @@ DhtRunner::shutdown(ShutdownCallback cb) {
return; return;
} }
if (logger_) if (logger_)
logger_->d("[runner %p] state changed to Stopping", this); logger_->d("[runner %p] state changed to Stopping, %zu ongoing ops", this, ongoing_ops.load());
std::lock_guard<std::mutex> lck(storage_mtx); std::lock_guard<std::mutex> lck(storage_mtx);
ongoing_ops++; ongoing_ops++;
shutdownCallbacks_.emplace_back(std::move(cb)); shutdownCallbacks_.emplace_back(std::move(cb));
...@@ -314,6 +314,7 @@ DhtRunner::join() ...@@ -314,6 +314,7 @@ DhtRunner::join()
std::lock_guard<std::mutex> lck(storage_mtx); std::lock_guard<std::mutex> lck(storage_mtx);
pending_ops = decltype(pending_ops)(); pending_ops = decltype(pending_ops)();
pending_ops_prio = decltype(pending_ops_prio)(); pending_ops_prio = decltype(pending_ops_prio)();
ongoing_ops = 0;
} }
{ {
std::lock_guard<std::mutex> lck(dht_mtx); std::lock_guard<std::mutex> lck(dht_mtx);
...@@ -699,7 +700,6 @@ void ...@@ -699,7 +700,6 @@ void
DhtRunner::cancelListen(InfoHash h, size_t token) DhtRunner::cancelListen(InfoHash h, size_t token)
{ {
std::lock_guard<std::mutex> lck(storage_mtx); std::lock_guard<std::mutex> lck(storage_mtx);
ongoing_ops++;
#ifdef OPENDHT_PROXY_CLIENT #ifdef OPENDHT_PROXY_CLIENT
pending_ops.emplace([=](SecureDht&) { pending_ops.emplace([=](SecureDht&) {
auto it = listeners_.find(token); auto it = listeners_.find(token);
...@@ -709,12 +709,10 @@ DhtRunner::cancelListen(InfoHash h, size_t token) ...@@ -709,12 +709,10 @@ DhtRunner::cancelListen(InfoHash h, size_t token)
if (it->second.tokenProxyDht and dht_via_proxy_) if (it->second.tokenProxyDht and dht_via_proxy_)
dht_via_proxy_->cancelListen(h, it->second.tokenProxyDht); dht_via_proxy_->cancelListen(h, it->second.tokenProxyDht);
listeners_.erase(it); listeners_.erase(it);
opEnded();
}); });
#else #else
pending_ops.emplace([=](SecureDht& dht) { pending_ops.emplace([=](SecureDht& dht) {
dht.cancelListen(h, token); dht.cancelListen(h, token);
opEnded();
}); });
#endif // OPENDHT_PROXY_CLIENT #endif // OPENDHT_PROXY_CLIENT
cv.notify_all(); cv.notify_all();
...@@ -724,7 +722,6 @@ void ...@@ -724,7 +722,6 @@ void
DhtRunner::cancelListen(InfoHash h, std::shared_future<size_t> ftoken) DhtRunner::cancelListen(InfoHash h, std::shared_future<size_t> ftoken)
{ {
std::lock_guard<std::mutex> lck(storage_mtx); std::lock_guard<std::mutex> lck(storage_mtx);
ongoing_ops++;
#ifdef OPENDHT_PROXY_CLIENT #ifdef OPENDHT_PROXY_CLIENT
pending_ops.emplace([=](SecureDht&) { pending_ops.emplace([=](SecureDht&) {
auto it = listeners_.find(ftoken.get()); auto it = listeners_.find(ftoken.get());
...@@ -734,12 +731,10 @@ DhtRunner::cancelListen(InfoHash h, std::shared_future<size_t> ftoken) ...@@ -734,12 +731,10 @@ DhtRunner::cancelListen(InfoHash h, std::shared_future<size_t> ftoken)
if (it->second.tokenProxyDht and dht_via_proxy_) if (it->second.tokenProxyDht and dht_via_proxy_)
dht_via_proxy_->cancelListen(h, it->second.tokenProxyDht); dht_via_proxy_->cancelListen(h, it->second.tokenProxyDht);
listeners_.erase(it); listeners_.erase(it);
opEnded();
}); });
#else #else
pending_ops.emplace([=](SecureDht& dht) { pending_ops.emplace([=](SecureDht& dht) {
dht.cancelListen(h, ftoken.get()); dht.cancelListen(h, ftoken.get());
opEnded();
}); });
#endif // OPENDHT_PROXY_CLIENT #endif // OPENDHT_PROXY_CLIENT
cv.notify_all(); cv.notify_all();
...@@ -788,10 +783,8 @@ void ...@@ -788,10 +783,8 @@ void
DhtRunner::cancelPut(const InfoHash& h, Value::Id id) DhtRunner::cancelPut(const InfoHash& h, Value::Id id)
{ {
std::lock_guard<std::mutex> lck(storage_mtx); std::lock_guard<std::mutex> lck(storage_mtx);
ongoing_ops++;
pending_ops.emplace([=](SecureDht& dht) { pending_ops.emplace([=](SecureDht& dht) {
dht.cancelPut(h, id); dht.cancelPut(h, id);
opEnded();
}); });
cv.notify_all(); cv.notify_all();
} }
...@@ -800,10 +793,8 @@ void ...@@ -800,10 +793,8 @@ void
DhtRunner::cancelPut(const InfoHash& h, const std::shared_ptr<Value>& value) DhtRunner::cancelPut(const InfoHash& h, const std::shared_ptr<Value>& value)
{ {
std::lock_guard<std::mutex> lck(storage_mtx); std::lock_guard<std::mutex> lck(storage_mtx);
ongoing_ops++;
pending_ops.emplace([=](SecureDht& dht) { pending_ops.emplace([=](SecureDht& dht) {
dht.cancelPut(h, value->id); dht.cancelPut(h, value->id);
opEnded();
}); });
cv.notify_all(); cv.notify_all();
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment