Skip to content
Snippets Groups Projects
Commit 567fa1c7 authored by Adrien Béraud's avatar Adrien Béraud
Browse files

proxy client: retrieve search from callback

parent 20ab71ab
Branches
Tags
No related merge requests found
...@@ -350,7 +350,7 @@ DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_ ...@@ -350,7 +350,7 @@ DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_
o.thread = std::thread([=](){ o.thread = std::thread([=](){
auto ok = std::make_shared<std::atomic_bool>(true); auto ok = std::make_shared<std::atomic_bool>(true);
restbed::Http::async(req, restbed::Http::async(req,
[this, ok](const std::shared_ptr<restbed::Request>& /*req*/, [ok](const std::shared_ptr<restbed::Request>& /*req*/,
const std::shared_ptr<restbed::Response>& reply) { const std::shared_ptr<restbed::Response>& reply) {
auto code = reply->get_status_code(); auto code = reply->get_status_code();
...@@ -605,6 +605,12 @@ DhtProxyClient::doListen(const InfoHash& key, ValueCallback cb, Value::Filter fi ...@@ -605,6 +605,12 @@ DhtProxyClient::doListen(const InfoHash& key, ValueCallback cb, Value::Filter fi
return 0; return 0;
} }
struct State {
std::atomic_bool ok {true};
std::atomic_bool cancel {false};
};
auto state = std::make_shared<State>();
auto token = ++listener_token_; auto token = ++listener_token_;
auto l = search->second.listeners.find(token); auto l = search->second.listeners.find(token);
if (l == search->second.listeners.end()) { if (l == search->second.listeners.end()) {
...@@ -626,9 +632,18 @@ DhtProxyClient::doListen(const InfoHash& key, ValueCallback cb, Value::Filter fi ...@@ -626,9 +632,18 @@ DhtProxyClient::doListen(const InfoHash& key, ValueCallback cb, Value::Filter fi
}).first; }).first;
} }
ValueCache& cache = l->second.cache; l->second.cb = [this,key,token,state](const std::vector<Sp<Value>>& values, bool expired) {
auto& job = l->second.cacheExpirationJob; if (state->cancel)
l->second.cb = [this,&cache,&job,key,token](const std::vector<Sp<Value>>& values, bool expired) { return false;
std::lock_guard<std::mutex> lock(searchLock_);
auto s = searches_.find(key);
if (s == searches_.end()) {
return false;
}
auto l = s->second.listeners.find(token);
if (l == s->second.listeners.end()) {
return false;
}
const std::vector<Sp<Value>> new_values_empty; const std::vector<Sp<Value>> new_values_empty;
std::vector<Value::Id> expired_ids; std::vector<Value::Id> expired_ids;
if (expired) { if (expired) {
...@@ -636,8 +651,8 @@ DhtProxyClient::doListen(const InfoHash& key, ValueCallback cb, Value::Filter fi ...@@ -636,8 +651,8 @@ DhtProxyClient::doListen(const InfoHash& key, ValueCallback cb, Value::Filter fi
for (const auto& v : values) for (const auto& v : values)
expired_ids.emplace_back(v->id); expired_ids.emplace_back(v->id);
} }
auto next = cache.onValues(expired ? new_values_empty : values, std::vector<Value::Id>{}, expired_ids, types, scheduler.time()); auto next = l->second.cache.onValues(expired ? new_values_empty : values, std::vector<Value::Id>{}, expired_ids, types, scheduler.time());
scheduler.edit(job, next); scheduler.edit(l->second.cacheExpirationJob, next);
return true; return true;
}; };
std::weak_ptr<bool> isCanceledViaClose(l->second.isCanceledViaClose); std::weak_ptr<bool> isCanceledViaClose(l->second.isCanceledViaClose);
...@@ -656,11 +671,6 @@ DhtProxyClient::doListen(const InfoHash& key, ValueCallback cb, Value::Filter fi ...@@ -656,11 +671,6 @@ DhtProxyClient::doListen(const InfoHash& key, ValueCallback cb, Value::Filter fi
fillBodyToGetToken(req); fillBodyToGetToken(req);
#endif #endif
struct State {
std::atomic_bool ok {true};
std::atomic_bool cancel {false};
};
auto state = std::make_shared<State>();
restbed::Http::async(req, restbed::Http::async(req,
[this, filter, vcb, pushNotifToken, state](const std::shared_ptr<restbed::Request>& req, [this, filter, vcb, pushNotifToken, state](const std::shared_ptr<restbed::Request>& req,
const std::shared_ptr<restbed::Response>& reply) { const std::shared_ptr<restbed::Response>& reply) {
...@@ -928,7 +938,7 @@ DhtProxyClient::resubscribe(const InfoHash& key, Listener& listener) ...@@ -928,7 +938,7 @@ DhtProxyClient::resubscribe(const InfoHash& key, Listener& listener)
auto settings = std::make_shared<restbed::Settings>(); auto settings = std::make_shared<restbed::Settings>();
auto ok = std::make_shared<std::atomic_bool>(true); auto ok = std::make_shared<std::atomic_bool>(true);
restbed::Http::async(req, restbed::Http::async(req,
[this, pushNotifToken, ok](const std::shared_ptr<restbed::Request>&, [pushNotifToken, ok](const std::shared_ptr<restbed::Request>&,
const std::shared_ptr<restbed::Response>& reply) { const std::shared_ptr<restbed::Response>& reply) {
auto code = reply->get_status_code(); auto code = reply->get_status_code();
if (code == 200) { if (code == 200) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment