diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index a32e224cb5603385befd52bf9eeed9164c5f5550..13e9f55949017a8b49f4b422ed67b485a20b8b8c 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -43,8 +43,7 @@ struct DhtProxyClient::ListenState { struct DhtProxyClient::Listener { - ValueCache cache; - Sp<Scheduler::Job> cacheExpirationJob {}; + OpValueCache cache; ValueCallback cb; Value::Filter filter; Sp<restbed::Request> req; @@ -52,9 +51,8 @@ struct DhtProxyClient::Listener unsigned callbackId; Sp<ListenState> state; Sp<Scheduler::Job> refreshJob; - Listener(ValueCache&& c, Sp<Scheduler::Job>&& j, const Sp<restbed::Request>& r, Value::Filter&& f) + Listener(OpValueCache&& c, const Sp<restbed::Request>& r, Value::Filter&& f) : cache(std::move(c)), - cacheExpirationJob(std::move(j)), filter(std::move(f)), req(r) {} @@ -64,11 +62,13 @@ struct PermanentPut { Sp<Value> value; Sp<Scheduler::Job> refreshJob; Sp<std::atomic_bool> ok; + PermanentPut(const Sp<Value>& v, Sp<Scheduler::Job>&& j, const Sp<std::atomic_bool>& o) + : value(v), refreshJob(std::move(j)), ok(o) {} }; struct DhtProxyClient::ProxySearch { SearchCache ops {}; - Sp<Scheduler::Job> opExpirationJob; + Sp<Scheduler::Job> opExpirationJob {}; std::map<size_t, Listener> listeners {}; std::map<Value::Id, PermanentPut> puts {}; }; @@ -336,24 +336,26 @@ DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_po if (permanent) { std::lock_guard<std::mutex> lock(searchLock_); auto id = val->id; - auto search = searches_.emplace(key, ProxySearch{}).first; + auto& search = searches_[key]; auto nextRefresh = scheduler.time() + proxy::OP_TIMEOUT - proxy::OP_MARGIN; auto ok = std::make_shared<std::atomic_bool>(false); - search->second.puts.erase(id); - search->second.puts.emplace(id, PermanentPut {val, scheduler.add(nextRefresh, [this, key, id, ok]{ - std::lock_guard<std::mutex> lock(searchLock_); - auto s = searches_.find(key); - if (s != searches_.end()) { - auto p = s->second.puts.find(id); - if (p != s->second.puts.end()) { - doPut(key, p->second.value, - [ok](bool result, const std::vector<std::shared_ptr<dht::Node> >&){ - *ok = result; - }, time_point::max(), true); - scheduler.edit(p->second.refreshJob, scheduler.time() + proxy::OP_TIMEOUT - proxy::OP_MARGIN); + search.puts.erase(id); + search.puts.emplace(std::piecewise_construct, + std::forward_as_tuple(id), + std::forward_as_tuple(val, scheduler.add(nextRefresh, [this, key, id, ok]{ + std::lock_guard<std::mutex> lock(searchLock_); + auto s = searches_.find(key); + if (s != searches_.end()) { + auto p = s->second.puts.find(id); + if (p != s->second.puts.end()) { + doPut(key, p->second.value, + [ok](bool result, const std::vector<std::shared_ptr<dht::Node> >&){ + *ok = result; + }, time_point::max(), true); + scheduler.edit(p->second.refreshJob, scheduler.time() + proxy::OP_TIMEOUT - proxy::OP_MARGIN); + } } - } - }), ok}); + }), ok)); } doPut(key, val, std::move(cb), created, permanent); } @@ -661,12 +663,9 @@ DhtProxyClient::getPublicAddress(sa_family_t family) size_t DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filter, Where where) { DHT_LOG.d(key, "[search %s]: listen", key.to_c_str()); - auto it = searches_.find(key); - if (it == searches_.end()) { - it = searches_.emplace(key, ProxySearch{}).first; - } + auto& search = searches_[key]; auto query = std::make_shared<Query>(Select{}, where); - auto token = it->second.ops.listen(cb, query, filter, [this, key, filter](Sp<Query> /*q*/, ValueCallback cb, SyncCallback scb) -> size_t { + auto token = search.ops.listen(cb, query, filter, [this, key, filter](Sp<Query> /*q*/, ValueCallback cb, SyncCallback /*scb*/) -> size_t { scheduler.syncTime(); restbed::Uri uri(serverHost_ + "/" + key.toString()); std::lock_guard<std::mutex> lock(searchLock_); @@ -682,21 +681,9 @@ DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filt auto l = search->second.listeners.find(token); if (l == search->second.listeners.end()) { auto f = filter; - l = search->second.listeners.emplace(token, Listener { - ValueCache(cb, std::move(scb)), scheduler.add(time_point::max(), [this, key, token]{ - std::lock_guard<std::mutex> lock(searchLock_); - auto s = searches_.find(key); - if (s == searches_.end()) { - return; - } - auto l = s->second.listeners.find(token); - if (l == s->second.listeners.end()) { - return; - } - auto next = l->second.cache.expireValues(scheduler.time()); - scheduler.edit(l->second.cacheExpirationJob, next); - }), req, std::move(f) - }).first; + l = search->second.listeners.emplace(std::piecewise_construct, + std::forward_as_tuple(token), + std::forward_as_tuple(OpValueCache(std::move(cb)), req, std::move(f))).first; } else { if (l->second.state) l->second.state->cancel = true; @@ -716,17 +703,7 @@ DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filt if (l == s->second.listeners.end()) { return false; } - const std::vector<Sp<Value>> new_values_empty; - std::vector<Value::Id> expired_ids; - if (expired) { - expired_ids.reserve(values.size()); - for (const auto& v : values) - expired_ids.emplace_back(v->id); - } - auto next = l->second.cache.onValues(expired ? new_values_empty : values, std::vector<Value::Id>{}, expired_ids, types, scheduler.time()); - scheduler.edit(l->second.cacheExpirationJob, next); - loopSignal_(); - return true; + return l->second.cache.onValue(values, expired); }; auto vcb = l->second.cb; @@ -749,8 +726,7 @@ DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filt } l->second.thread = std::thread([this, req, vcb, filter, state]() { sendListen(req, vcb, filter, state, - deviceKey_.empty() ? ListenMethod::LISTEN - : ListenMethod::SUBSCRIBE); + deviceKey_.empty() ? ListenMethod::LISTEN : ListenMethod::SUBSCRIBE); }); return token; }); @@ -1019,20 +995,45 @@ DhtProxyClient::pushNotificationReceived(const std::map<std::string, std::string resubscribe(key, list.second); } } else { - auto keyidx = notification.find("key"); - InfoHash key(keyidx->second); - - for (auto& search: searches_) { - for (auto& list : search.second.listeners) { - if (search.first != key || list.second.state->cancel) - continue; - DHT_LOG.d(search.first, "[search %s] handling push notification", search.first.to_c_str()); + auto key = InfoHash(notification.at("key")); + auto& search = searches_.at(key); + for (auto& list : search.listeners) { + if (list.second.state->cancel) + continue; + DHT_LOG.d(key, "[search %s] handling push notification", key.to_c_str()); + auto expired = notification.find("exp"); + auto token = list.first; + auto state = list.second.state; + if (expired == notification.end()) { auto cb = list.second.cb; auto filter = list.second.filter; - get(search.first, [cb](const std::vector<Sp<Value>>& vals) { - cb(vals, false); - return true; - }, DoneCallbackSimple{}, std::move(filter)); + auto oldValues = list.second.cache.getValues(); + get(key, [cb](const std::vector<Sp<Value>>& vals) { + return cb(vals, false); + }, [cb, oldValues](bool /*ok*/) { + // Decrement old values refcount to expire values not present in the new list + cb(oldValues, true); + }, std::move(filter)); + } else { + std::stringstream ss(expired->second); + std::vector<Value::Id> ids; + while(ss.good()){ + std::string substr; + getline(ss, substr, ','); + ids.emplace_back(std::stoull(substr)); + } + std::lock_guard<std::mutex> lock(lockCallbacks); + callbacks_.emplace_back([this, key, token, state, ids]() { + if (state->cancel) return; + std::lock_guard<std::mutex> lock(searchLock_); + auto s = searches_.find(key); + if (s == searches_.end()) return; + auto l = s->second.listeners.find(token); + if (l == s->second.listeners.end()) return; + if (not state->cancel and not l->second.cache.onValuesExpired(ids)) + state->cancel = true; + }); + loopSignal_(); } } } @@ -1092,9 +1093,9 @@ DhtProxyClient::fillBody(std::shared_ptr<restbed::Request> req, bool resubscribe // } Json::Value body; getPushRequest(body); - if (!resubscribe) { + if (resubscribe) { // This is the first listen, we want to retrieve previous values. - body["previous_values"] = true; + body["refresh"] = true; } Json::StreamWriterBuilder wbuilder; wbuilder["commentStyle"] = "None"; diff --git a/src/dht_proxy_server.cpp b/src/dht_proxy_server.cpp index 0cf163d4262951f2a92ea94960816dee8bd9f8e5..7053ea7571707693d135785558026d7ca8f6af77 100644 --- a/src/dht_proxy_server.cpp +++ b/src/dht_proxy_server.cpp @@ -433,8 +433,7 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session) scheduler_.edit(listener.expireNotifyJob, timeout - proxy::OP_MARGIN); s->yield(restbed::OK); - if (root.isMember("previous_values") && - root["previous_values"].asBool()) { + if (!root.isMember("refresh") or !root["refresh"].asBool()) { dht_->get( infoHash, [this, s](const Sp<Value> &value) { @@ -474,12 +473,20 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session) // The listener is not found, so add it. listener.internalToken = dht_->listen(infoHash, - [this, infoHash, pushToken, isAndroid, clientId](const std::vector<std::shared_ptr<Value>>& /*values*/) { - threadPool_->run([this, infoHash, pushToken, isAndroid, clientId](){ + [this, infoHash, pushToken, isAndroid, clientId](const std::vector<std::shared_ptr<Value>>& values, bool expired) { + threadPool_->run([this, infoHash, pushToken, isAndroid, clientId, values, expired]() { // Build message content Json::Value json; json["key"] = infoHash.toString(); json["to"] = clientId; + if (expired and values.size() < 3) { + std::stringstream ss; + for(size_t i = 0; i < values.size(); ++i) { + if(i != 0) ss << ","; + ss << values[i]->id; + } + json["exp"] = ss.str(); + } sendPushNotification(pushToken, std::move(json), isAndroid); }); return true; diff --git a/src/op_cache.cpp b/src/op_cache.cpp index b9aa9bcb84febf0cccab3ca7ff230a23278d40a6..899ef96394947fde106e3df49fee50a92687368e 100644 --- a/src/op_cache.cpp +++ b/src/op_cache.cpp @@ -34,6 +34,7 @@ OpValueCache::onValuesAdded(const std::vector<Sp<Value>>& vals) { } return newValues.empty() ? true : callback(newValues, false); } + bool OpValueCache::onValuesExpired(const std::vector<Sp<Value>>& vals) { std::vector<Sp<Value>> expiredValues; @@ -49,6 +50,24 @@ OpValueCache::onValuesExpired(const std::vector<Sp<Value>>& vals) { } return expiredValues.empty() ? true : callback(expiredValues, true); } + +bool +OpValueCache::onValuesExpired(const std::vector<Value::Id>& vids) +{ + std::vector<Sp<Value>> expiredValues; + for (const auto& vid : vids) { + auto vit = values.find(vid); + if (vit != values.end()) { + vit->second.refCount--; + if (not vit->second.refCount) { + expiredValues.emplace_back(std::move(vit->second.data)); + values.erase(vit); + } + } + } + return expiredValues.empty() ? true : callback(expiredValues, true); +} + std::vector<Sp<Value>> OpValueCache::get(const Value::Filter& filter) const { std::vector<Sp<Value>> ret; diff --git a/src/op_cache.h b/src/op_cache.h index 79cd8f140faac06fe1e21a976fbf02c7fa0b8f18..ec6b2d1324215b46a4341d337e1b11856951a7b4 100644 --- a/src/op_cache.h +++ b/src/op_cache.h @@ -56,6 +56,7 @@ public: bool onValuesAdded(const std::vector<Sp<Value>>& vals); bool onValuesExpired(const std::vector<Sp<Value>>& vals); + bool onValuesExpired(const std::vector<Value::Id>& vals); void onNodeChanged(ListenSyncStatus status) { switch (status) {