diff --git a/include/opendht/dht_proxy_client.h b/include/opendht/dht_proxy_client.h index 59eba71e56e93f40d92aace6e2e726e6138bdabb..972167ca4769d59f9dd2f39faaaf597b6a5db204 100644 --- a/include/opendht/dht_proxy_client.h +++ b/include/opendht/dht_proxy_client.h @@ -41,8 +41,6 @@ namespace Json { namespace dht { -class SearchCache; - class OPENDHT_PUBLIC DhtProxyClient final : public DhtInterface { public: @@ -276,7 +274,6 @@ private: void opFailed(); - size_t doListen(const InfoHash& key, ValueCallback, Value::Filter); bool doCancelListen(const InfoHash& key, size_t token); struct ListenState; diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index 2d112bb445e49d68cd0978aa27a67416bd1d3e91..38eaf72a97d62393e33a9d3b615096e452c42fc0 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -668,8 +668,93 @@ DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filt it = searches_.emplace(key, ProxySearch{}).first; } auto query = std::make_shared<Query>(Select{}, where); - auto token = it->second.ops.listen(cb, query, filter, [&](Sp<Query> /*q*/, ValueCallback vcb, SyncCallback /*scb*/){ - return doListen(key, vcb, filter); + auto token = it->second.ops.listen(cb, query, filter, [this, key, filter](Sp<Query> /*q*/, ValueCallback cb, SyncCallback scb) -> size_t { + scheduler.syncTime(); + restbed::Uri uri(serverHost_ + "/" + key.toString()); + std::lock_guard<std::mutex> lock(searchLock_); + auto search = searches_.find(key); + if (search == searches_.end()) { + DHT_LOG.e(key, "[search %s] listen: search not found", key.to_c_str()); + return 0; + } + DHT_LOG.d(key, "[search %s] sending %s", key.to_c_str(), deviceKey_.empty() ? "listen" : "subscribe"); + + auto req = std::make_shared<restbed::Request>(uri); + auto token = ++listenerToken_; + auto l = search->second.listeners.find(token); + if (l == search->second.listeners.end()) { + auto f = filter; + l = search->second.listeners.emplace(token, Listener { + ValueCache(cb, std::move(scb)), scheduler.add(time_point::max(), [this, key, token]{ + std::lock_guard<std::mutex> lock(searchLock_); + auto s = searches_.find(key); + if (s == searches_.end()) { + return; + } + auto l = s->second.listeners.find(token); + if (l == s->second.listeners.end()) { + return; + } + auto next = l->second.cache.expireValues(scheduler.time()); + scheduler.edit(l->second.cacheExpirationJob, next); + }), req, std::move(f) + }).first; + } else { + if (l->second.state) + l->second.state->cancel = true; + } + + auto state = std::make_shared<ListenState>(); + l->second.state = state; + l->second.cb = [this,key,token,state](const std::vector<Sp<Value>>& values, bool expired) { + if (state->cancel) + return false; + std::lock_guard<std::mutex> lock(searchLock_); + auto s = searches_.find(key); + if (s == searches_.end()) { + return false; + } + auto l = s->second.listeners.find(token); + if (l == s->second.listeners.end()) { + return false; + } + const std::vector<Sp<Value>> new_values_empty; + std::vector<Value::Id> expired_ids; + if (expired) { + expired_ids.reserve(values.size()); + for (const auto& v : values) + expired_ids.emplace_back(v->id); + } + auto next = l->second.cache.onValues(expired ? new_values_empty : values, std::vector<Value::Id>{}, expired_ids, types, scheduler.time()); + scheduler.edit(l->second.cacheExpirationJob, next); + loopSignal_(); + return true; + }; + + auto vcb = l->second.cb; + l->second.req = req; + + if (not deviceKey_.empty()) { + // Relaunch push listeners even if a timeout is not received (if the proxy crash for any reason) + l->second.refreshJob = scheduler.add(scheduler.time() + proxy::OP_TIMEOUT - proxy::OP_MARGIN, [this, key, token, state] { + if (state->cancel) + return; + std::lock_guard<std::mutex> lock(searchLock_); + auto s = searches_.find(key); + if (s != searches_.end()) { + auto l = s->second.listeners.find(token); + if (l != s->second.listeners.end()) { + resubscribe(key, l->second); + } + } + }); + } + l->second.thread = std::thread([this, req, vcb, filter, state]() { + sendListen(req, vcb, filter, state, + deviceKey_.empty() ? ListenMethod::LISTEN + : ListenMethod::SUBSCRIBE); + }); + return token; }); return token; } @@ -745,7 +830,7 @@ void DhtProxyClient::sendListen(const std::shared_ptr<restbed::Request> &req, } auto expired = json.get("expired", Json::Value(false)).asBool(); auto value = std::make_shared<Value>(json); - if ((not filter or filter(*value)) and cb) { + if ((not filter or filter(*value)) and cb) { std::lock_guard<std::mutex> lock(lockCallbacks); callbacks_.emplace_back([cb, value, state, expired]() { if (not state->cancel and not cb({value}, expired)) @@ -773,97 +858,6 @@ void DhtProxyClient::sendListen(const std::shared_ptr<restbed::Request> &req, opFailed(); } -size_t -DhtProxyClient::doListen(const InfoHash& key, ValueCallback cb, Value::Filter filter/*, Where where*/) -{ - scheduler.syncTime(); - restbed::Uri uri(serverHost_ + "/" + key.toString()); - std::lock_guard<std::mutex> lock(searchLock_); - auto search = searches_.find(key); - if (search == searches_.end()) { - DHT_LOG.e(key, "[search %s] listen: search not found", key.to_c_str()); - return 0; - } - DHT_LOG.d(key, "[search %s] sending %s", key.to_c_str(), deviceKey_.empty() ? "listen" : "subscribe"); - - auto req = std::make_shared<restbed::Request>(uri); - auto token = ++listenerToken_; - auto l = search->second.listeners.find(token); - if (l == search->second.listeners.end()) { - auto f = filter; - l = search->second.listeners.emplace(token, Listener { - ValueCache(cb), scheduler.add(time_point::max(), [this, key, token]{ - std::lock_guard<std::mutex> lock(searchLock_); - auto s = searches_.find(key); - if (s == searches_.end()) { - return; - } - auto l = s->second.listeners.find(token); - if (l == s->second.listeners.end()) { - return; - } - auto next = l->second.cache.expireValues(scheduler.time()); - scheduler.edit(l->second.cacheExpirationJob, next); - }), req, std::move(f) - }).first; - } else { - if (l->second.state) - l->second.state->cancel = true; - } - - auto state = std::make_shared<ListenState>(); - l->second.state = state; - l->second.cb = [this,key,token,state](const std::vector<Sp<Value>>& values, bool expired) { - if (state->cancel) - return false; - std::lock_guard<std::mutex> lock(searchLock_); - auto s = searches_.find(key); - if (s == searches_.end()) { - return false; - } - auto l = s->second.listeners.find(token); - if (l == s->second.listeners.end()) { - return false; - } - const std::vector<Sp<Value>> new_values_empty; - std::vector<Value::Id> expired_ids; - if (expired) { - expired_ids.reserve(values.size()); - for (const auto& v : values) - expired_ids.emplace_back(v->id); - } - auto next = l->second.cache.onValues(expired ? new_values_empty : values, std::vector<Value::Id>{}, expired_ids, types, scheduler.time()); - scheduler.edit(l->second.cacheExpirationJob, next); - loopSignal_(); - return true; - }; - - auto vcb = l->second.cb; - l->second.req = req; - - if (not deviceKey_.empty()) { - // Relaunch push listeners even if a timeout is not received (if the proxy crash for any reason) - l->second.refreshJob = scheduler.add(scheduler.time() + proxy::OP_TIMEOUT - proxy::OP_MARGIN, [this, key, token, state] { - if (state->cancel) - return; - std::lock_guard<std::mutex> lock(searchLock_); - auto s = searches_.find(key); - if (s != searches_.end()) { - auto l = s->second.listeners.find(token); - if (l != s->second.listeners.end()) { - resubscribe(key, l->second); - } - } - }); - } - l->second.thread = std::thread([this, req, vcb, filter, state]() { - sendListen(req, vcb, filter, state, - deviceKey_.empty() ? ListenMethod::LISTEN - : ListenMethod::SUBSCRIBE); - }); - return token; -} - bool DhtProxyClient::doCancelListen(const InfoHash& key, size_t ltoken) {