From a1f8cc906ccdf387eb862210ce15341efc575f72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20B=C3=A9raud?= <adrien.beraud@savoirfairelinux.com> Date: Fri, 18 Jan 2019 02:09:33 -0500 Subject: [PATCH] proxy client: use SyncCallback --- include/opendht/dht_proxy_client.h | 3 - src/dht_proxy_client.cpp | 182 ++++++++++++++--------------- 2 files changed, 88 insertions(+), 97 deletions(-) diff --git a/include/opendht/dht_proxy_client.h b/include/opendht/dht_proxy_client.h index 59eba71e..972167ca 100644 --- a/include/opendht/dht_proxy_client.h +++ b/include/opendht/dht_proxy_client.h @@ -41,8 +41,6 @@ namespace Json { namespace dht { -class SearchCache; - class OPENDHT_PUBLIC DhtProxyClient final : public DhtInterface { public: @@ -276,7 +274,6 @@ private: void opFailed(); - size_t doListen(const InfoHash& key, ValueCallback, Value::Filter); bool doCancelListen(const InfoHash& key, size_t token); struct ListenState; diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index 2d112bb4..38eaf72a 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -668,8 +668,93 @@ DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filt it = searches_.emplace(key, ProxySearch{}).first; } auto query = std::make_shared<Query>(Select{}, where); - auto token = it->second.ops.listen(cb, query, filter, [&](Sp<Query> /*q*/, ValueCallback vcb, SyncCallback /*scb*/){ - return doListen(key, vcb, filter); + auto token = it->second.ops.listen(cb, query, filter, [this, key, filter](Sp<Query> /*q*/, ValueCallback cb, SyncCallback scb) -> size_t { + scheduler.syncTime(); + restbed::Uri uri(serverHost_ + "/" + key.toString()); + std::lock_guard<std::mutex> lock(searchLock_); + auto search = searches_.find(key); + if (search == searches_.end()) { + DHT_LOG.e(key, "[search %s] listen: search not found", key.to_c_str()); + return 0; + } + DHT_LOG.d(key, "[search %s] sending %s", key.to_c_str(), deviceKey_.empty() ? "listen" : "subscribe"); + + auto req = std::make_shared<restbed::Request>(uri); + auto token = ++listenerToken_; + auto l = search->second.listeners.find(token); + if (l == search->second.listeners.end()) { + auto f = filter; + l = search->second.listeners.emplace(token, Listener { + ValueCache(cb, std::move(scb)), scheduler.add(time_point::max(), [this, key, token]{ + std::lock_guard<std::mutex> lock(searchLock_); + auto s = searches_.find(key); + if (s == searches_.end()) { + return; + } + auto l = s->second.listeners.find(token); + if (l == s->second.listeners.end()) { + return; + } + auto next = l->second.cache.expireValues(scheduler.time()); + scheduler.edit(l->second.cacheExpirationJob, next); + }), req, std::move(f) + }).first; + } else { + if (l->second.state) + l->second.state->cancel = true; + } + + auto state = std::make_shared<ListenState>(); + l->second.state = state; + l->second.cb = [this,key,token,state](const std::vector<Sp<Value>>& values, bool expired) { + if (state->cancel) + return false; + std::lock_guard<std::mutex> lock(searchLock_); + auto s = searches_.find(key); + if (s == searches_.end()) { + return false; + } + auto l = s->second.listeners.find(token); + if (l == s->second.listeners.end()) { + return false; + } + const std::vector<Sp<Value>> new_values_empty; + std::vector<Value::Id> expired_ids; + if (expired) { + expired_ids.reserve(values.size()); + for (const auto& v : values) + expired_ids.emplace_back(v->id); + } + auto next = l->second.cache.onValues(expired ? new_values_empty : values, std::vector<Value::Id>{}, expired_ids, types, scheduler.time()); + scheduler.edit(l->second.cacheExpirationJob, next); + loopSignal_(); + return true; + }; + + auto vcb = l->second.cb; + l->second.req = req; + + if (not deviceKey_.empty()) { + // Relaunch push listeners even if a timeout is not received (if the proxy crash for any reason) + l->second.refreshJob = scheduler.add(scheduler.time() + proxy::OP_TIMEOUT - proxy::OP_MARGIN, [this, key, token, state] { + if (state->cancel) + return; + std::lock_guard<std::mutex> lock(searchLock_); + auto s = searches_.find(key); + if (s != searches_.end()) { + auto l = s->second.listeners.find(token); + if (l != s->second.listeners.end()) { + resubscribe(key, l->second); + } + } + }); + } + l->second.thread = std::thread([this, req, vcb, filter, state]() { + sendListen(req, vcb, filter, state, + deviceKey_.empty() ? ListenMethod::LISTEN + : ListenMethod::SUBSCRIBE); + }); + return token; }); return token; } @@ -745,7 +830,7 @@ void DhtProxyClient::sendListen(const std::shared_ptr<restbed::Request> &req, } auto expired = json.get("expired", Json::Value(false)).asBool(); auto value = std::make_shared<Value>(json); - if ((not filter or filter(*value)) and cb) { + if ((not filter or filter(*value)) and cb) { std::lock_guard<std::mutex> lock(lockCallbacks); callbacks_.emplace_back([cb, value, state, expired]() { if (not state->cancel and not cb({value}, expired)) @@ -773,97 +858,6 @@ void DhtProxyClient::sendListen(const std::shared_ptr<restbed::Request> &req, opFailed(); } -size_t -DhtProxyClient::doListen(const InfoHash& key, ValueCallback cb, Value::Filter filter/*, Where where*/) -{ - scheduler.syncTime(); - restbed::Uri uri(serverHost_ + "/" + key.toString()); - std::lock_guard<std::mutex> lock(searchLock_); - auto search = searches_.find(key); - if (search == searches_.end()) { - DHT_LOG.e(key, "[search %s] listen: search not found", key.to_c_str()); - return 0; - } - DHT_LOG.d(key, "[search %s] sending %s", key.to_c_str(), deviceKey_.empty() ? "listen" : "subscribe"); - - auto req = std::make_shared<restbed::Request>(uri); - auto token = ++listenerToken_; - auto l = search->second.listeners.find(token); - if (l == search->second.listeners.end()) { - auto f = filter; - l = search->second.listeners.emplace(token, Listener { - ValueCache(cb), scheduler.add(time_point::max(), [this, key, token]{ - std::lock_guard<std::mutex> lock(searchLock_); - auto s = searches_.find(key); - if (s == searches_.end()) { - return; - } - auto l = s->second.listeners.find(token); - if (l == s->second.listeners.end()) { - return; - } - auto next = l->second.cache.expireValues(scheduler.time()); - scheduler.edit(l->second.cacheExpirationJob, next); - }), req, std::move(f) - }).first; - } else { - if (l->second.state) - l->second.state->cancel = true; - } - - auto state = std::make_shared<ListenState>(); - l->second.state = state; - l->second.cb = [this,key,token,state](const std::vector<Sp<Value>>& values, bool expired) { - if (state->cancel) - return false; - std::lock_guard<std::mutex> lock(searchLock_); - auto s = searches_.find(key); - if (s == searches_.end()) { - return false; - } - auto l = s->second.listeners.find(token); - if (l == s->second.listeners.end()) { - return false; - } - const std::vector<Sp<Value>> new_values_empty; - std::vector<Value::Id> expired_ids; - if (expired) { - expired_ids.reserve(values.size()); - for (const auto& v : values) - expired_ids.emplace_back(v->id); - } - auto next = l->second.cache.onValues(expired ? new_values_empty : values, std::vector<Value::Id>{}, expired_ids, types, scheduler.time()); - scheduler.edit(l->second.cacheExpirationJob, next); - loopSignal_(); - return true; - }; - - auto vcb = l->second.cb; - l->second.req = req; - - if (not deviceKey_.empty()) { - // Relaunch push listeners even if a timeout is not received (if the proxy crash for any reason) - l->second.refreshJob = scheduler.add(scheduler.time() + proxy::OP_TIMEOUT - proxy::OP_MARGIN, [this, key, token, state] { - if (state->cancel) - return; - std::lock_guard<std::mutex> lock(searchLock_); - auto s = searches_.find(key); - if (s != searches_.end()) { - auto l = s->second.listeners.find(token); - if (l != s->second.listeners.end()) { - resubscribe(key, l->second); - } - } - }); - } - l->second.thread = std::thread([this, req, vcb, filter, state]() { - sendListen(req, vcb, filter, state, - deviceKey_.empty() ? ListenMethod::LISTEN - : ListenMethod::SUBSCRIBE); - }); - return token; -} - bool DhtProxyClient::doCancelListen(const InfoHash& key, size_t ltoken) { -- GitLab