From 75738ac28b13a50ec28ab63510651d07ac9b9299 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20B=C3=A9raud?= <adrien.beraud@savoirfairelinux.com> Date: Sat, 20 Apr 2019 22:04:01 -0400 Subject: [PATCH] proxy client: don't hold lock while calling callback --- src/dht_proxy_client.cpp | 81 +++++++++++++++++++++++----------------- 1 file changed, 47 insertions(+), 34 deletions(-) diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index 6bd6297a..102761c5 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -223,12 +223,15 @@ DhtProxyClient::periodic(const uint8_t*, size_t, const SockAddr&) { // Exec all currently stored callbacks scheduler.syncTime(); - if (!callbacks_.empty()) { + decltype(callbacks_) callbacks; + { std::lock_guard<std::mutex> lock(lockCallbacks); - for (auto& callback : callbacks_) - callback(); - callbacks_.clear(); + callbacks = std::move(callbacks_); } + for (auto& callback : callbacks) + callback(); + callbacks.clear(); + // Remove finished operations { std::lock_guard<std::mutex> lock(lockOperations_); @@ -297,11 +300,13 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, Va if (reader->parse(char_data, char_data + body.size(), &json, &err)) { auto value = std::make_shared<Value>(json); if ((not filter or filter(*value)) and cb) { - std::lock_guard<std::mutex> lock(lockCallbacks); - callbacks_.emplace_back([cb, value, state]() { - if (not state->stop and not cb({value})) - state->stop = true; - }); + { + std::lock_guard<std::mutex> lock(lockCallbacks); + callbacks_.emplace_back([cb, value, state]() { + if (not state->stop and not cb({value})) + state->stop = true; + }); + } loopSignal_(); } } else { @@ -317,11 +322,13 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, Va state->ok = false; } if (donecb) { - std::lock_guard<std::mutex> lock(lockCallbacks); - callbacks_.emplace_back([=](){ - donecb(state->ok, {}); - state->stop = true; - }); + { + std::lock_guard<std::mutex> lock(lockCallbacks); + callbacks_.emplace_back([=](){ + donecb(state->ok, {}); + state->stop = true; + }); + } loopSignal_(); } if (!state->ok) { @@ -443,10 +450,12 @@ DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_ *ok = false; } if (cb) { - std::lock_guard<std::mutex> lock(lockCallbacks); - callbacks_.emplace_back([=](){ - cb(*ok, {}); - }); + { + std::lock_guard<std::mutex> lock(lockCallbacks); + callbacks_.emplace_back([=](){ + cb(*ok, {}); + }); + } loopSignal_(); } if (!ok) { @@ -822,11 +831,13 @@ void DhtProxyClient::sendListen(const std::shared_ptr<restbed::Request> &req, auto expired = json.get("expired", Json::Value(false)).asBool(); auto value = std::make_shared<Value>(json); if ((not filter or filter(*value)) and cb) { - std::lock_guard<std::mutex> lock(lockCallbacks); - callbacks_.emplace_back([cb, value, state, expired]() { - if (not state->cancel and not cb({value}, expired)) - state->cancel = true; - }); + { + std::lock_guard<std::mutex> lock(lockCallbacks); + callbacks_.emplace_back([cb, value, state, expired]() { + if (not state->cancel and not cb({value}, expired)) + state->cancel = true; + }); + } loopSignal_(); } } @@ -1051,17 +1062,19 @@ DhtProxyClient::pushNotificationReceived(const std::map<std::string, std::string getline(ss, substr, ','); ids.emplace_back(std::stoull(substr)); } - std::lock_guard<std::mutex> lock(lockCallbacks); - callbacks_.emplace_back([this, key, token, state, ids]() { - if (state->cancel) return; - std::lock_guard<std::mutex> lock(searchLock_); - auto s = searches_.find(key); - if (s == searches_.end()) return; - auto l = s->second.listeners.find(token); - if (l == s->second.listeners.end()) return; - if (not state->cancel and not l->second.cache.onValuesExpired(ids)) - state->cancel = true; - }); + { + std::lock_guard<std::mutex> lock(lockCallbacks); + callbacks_.emplace_back([this, key, token, state, ids]() { + if (state->cancel) return; + std::lock_guard<std::mutex> lock(searchLock_); + auto s = searches_.find(key); + if (s == searches_.end()) return; + auto l = s->second.listeners.find(token); + if (l == s->second.listeners.end()) return; + if (not state->cancel and not l->second.cache.onValuesExpired(ids)) + state->cancel = true; + }); + } loopSignal_(); } } -- GitLab