diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index 6bd6297a991cb5ca642895bca518b78822a713f1..102761c5d0bf3b958697625bc12da5fc2e41f152 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -223,12 +223,15 @@ DhtProxyClient::periodic(const uint8_t*, size_t, const SockAddr&) { // Exec all currently stored callbacks scheduler.syncTime(); - if (!callbacks_.empty()) { + decltype(callbacks_) callbacks; + { std::lock_guard<std::mutex> lock(lockCallbacks); - for (auto& callback : callbacks_) - callback(); - callbacks_.clear(); + callbacks = std::move(callbacks_); } + for (auto& callback : callbacks) + callback(); + callbacks.clear(); + // Remove finished operations { std::lock_guard<std::mutex> lock(lockOperations_); @@ -297,11 +300,13 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, Va if (reader->parse(char_data, char_data + body.size(), &json, &err)) { auto value = std::make_shared<Value>(json); if ((not filter or filter(*value)) and cb) { - std::lock_guard<std::mutex> lock(lockCallbacks); - callbacks_.emplace_back([cb, value, state]() { - if (not state->stop and not cb({value})) - state->stop = true; - }); + { + std::lock_guard<std::mutex> lock(lockCallbacks); + callbacks_.emplace_back([cb, value, state]() { + if (not state->stop and not cb({value})) + state->stop = true; + }); + } loopSignal_(); } } else { @@ -317,11 +322,13 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, Va state->ok = false; } if (donecb) { - std::lock_guard<std::mutex> lock(lockCallbacks); - callbacks_.emplace_back([=](){ - donecb(state->ok, {}); - state->stop = true; - }); + { + std::lock_guard<std::mutex> lock(lockCallbacks); + callbacks_.emplace_back([=](){ + donecb(state->ok, {}); + state->stop = true; + }); + } loopSignal_(); } if (!state->ok) { @@ -443,10 +450,12 @@ DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_ *ok = false; } if (cb) { - std::lock_guard<std::mutex> lock(lockCallbacks); - callbacks_.emplace_back([=](){ - cb(*ok, {}); - }); + { + std::lock_guard<std::mutex> lock(lockCallbacks); + callbacks_.emplace_back([=](){ + cb(*ok, {}); + }); + } loopSignal_(); } if (!ok) { @@ -822,11 +831,13 @@ void DhtProxyClient::sendListen(const std::shared_ptr<restbed::Request> &req, auto expired = json.get("expired", Json::Value(false)).asBool(); auto value = std::make_shared<Value>(json); if ((not filter or filter(*value)) and cb) { - std::lock_guard<std::mutex> lock(lockCallbacks); - callbacks_.emplace_back([cb, value, state, expired]() { - if (not state->cancel and not cb({value}, expired)) - state->cancel = true; - }); + { + std::lock_guard<std::mutex> lock(lockCallbacks); + callbacks_.emplace_back([cb, value, state, expired]() { + if (not state->cancel and not cb({value}, expired)) + state->cancel = true; + }); + } loopSignal_(); } } @@ -1051,17 +1062,19 @@ DhtProxyClient::pushNotificationReceived(const std::map<std::string, std::string getline(ss, substr, ','); ids.emplace_back(std::stoull(substr)); } - std::lock_guard<std::mutex> lock(lockCallbacks); - callbacks_.emplace_back([this, key, token, state, ids]() { - if (state->cancel) return; - std::lock_guard<std::mutex> lock(searchLock_); - auto s = searches_.find(key); - if (s == searches_.end()) return; - auto l = s->second.listeners.find(token); - if (l == s->second.listeners.end()) return; - if (not state->cancel and not l->second.cache.onValuesExpired(ids)) - state->cancel = true; - }); + { + std::lock_guard<std::mutex> lock(lockCallbacks); + callbacks_.emplace_back([this, key, token, state, ids]() { + if (state->cancel) return; + std::lock_guard<std::mutex> lock(searchLock_); + auto s = searches_.find(key); + if (s == searches_.end()) return; + auto l = s->second.listeners.find(token); + if (l == s->second.listeners.end()) return; + if (not state->cancel and not l->second.cache.onValuesExpired(ids)) + state->cancel = true; + }); + } loopSignal_(); } }