diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index a70061367a11bcc74209788a0337ef091b62112d..e9611e8fe71f3ab07a9954c7b29c2d8961417ae8 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -350,7 +350,7 @@ DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_ o.thread = std::thread([=](){ auto ok = std::make_shared<std::atomic_bool>(true); restbed::Http::async(req, - [this, ok](const std::shared_ptr<restbed::Request>& /*req*/, + [ok](const std::shared_ptr<restbed::Request>& /*req*/, const std::shared_ptr<restbed::Response>& reply) { auto code = reply->get_status_code(); @@ -605,6 +605,12 @@ DhtProxyClient::doListen(const InfoHash& key, ValueCallback cb, Value::Filter fi return 0; } + struct State { + std::atomic_bool ok {true}; + std::atomic_bool cancel {false}; + }; + auto state = std::make_shared<State>(); + auto token = ++listener_token_; auto l = search->second.listeners.find(token); if (l == search->second.listeners.end()) { @@ -626,9 +632,18 @@ DhtProxyClient::doListen(const InfoHash& key, ValueCallback cb, Value::Filter fi }).first; } - ValueCache& cache = l->second.cache; - auto& job = l->second.cacheExpirationJob; - l->second.cb = [this,&cache,&job,key,token](const std::vector<Sp<Value>>& values, bool expired) { + l->second.cb = [this,key,token,state](const std::vector<Sp<Value>>& values, bool expired) { + if (state->cancel) + return false; + std::lock_guard<std::mutex> lock(searchLock_); + auto s = searches_.find(key); + if (s == searches_.end()) { + return false; + } + auto l = s->second.listeners.find(token); + if (l == s->second.listeners.end()) { + return false; + } const std::vector<Sp<Value>> new_values_empty; std::vector<Value::Id> expired_ids; if (expired) { @@ -636,8 +651,8 @@ DhtProxyClient::doListen(const InfoHash& key, ValueCallback cb, Value::Filter fi for (const auto& v : values) expired_ids.emplace_back(v->id); } - auto next = cache.onValues(expired ? new_values_empty : values, std::vector<Value::Id>{}, expired_ids, types, scheduler.time()); - scheduler.edit(job, next); + auto next = l->second.cache.onValues(expired ? new_values_empty : values, std::vector<Value::Id>{}, expired_ids, types, scheduler.time()); + scheduler.edit(l->second.cacheExpirationJob, next); return true; }; std::weak_ptr<bool> isCanceledViaClose(l->second.isCanceledViaClose); @@ -656,11 +671,6 @@ DhtProxyClient::doListen(const InfoHash& key, ValueCallback cb, Value::Filter fi fillBodyToGetToken(req); #endif - struct State { - std::atomic_bool ok {true}; - std::atomic_bool cancel {false}; - }; - auto state = std::make_shared<State>(); restbed::Http::async(req, [this, filter, vcb, pushNotifToken, state](const std::shared_ptr<restbed::Request>& req, const std::shared_ptr<restbed::Response>& reply) { @@ -928,7 +938,7 @@ DhtProxyClient::resubscribe(const InfoHash& key, Listener& listener) auto settings = std::make_shared<restbed::Settings>(); auto ok = std::make_shared<std::atomic_bool>(true); restbed::Http::async(req, - [this, pushNotifToken, ok](const std::shared_ptr<restbed::Request>&, + [pushNotifToken, ok](const std::shared_ptr<restbed::Request>&, const std::shared_ptr<restbed::Response>& reply) { auto code = reply->get_status_code(); if (code == 200) {