Skip to content
Snippets Groups Projects
Commit 1f3bf697 authored by Adrien Béraud's avatar Adrien Béraud
Browse files

search: use syncStatus for get

parent a1f8cc90
Branches
Tags
No related merge requests found
...@@ -108,20 +108,43 @@ OpCache::getExpiration() const { ...@@ -108,20 +108,43 @@ OpCache::getExpiration() const {
return lastRemoved + EXPIRATION; return lastRemoved + EXPIRATION;
} }
size_t SearchCache::OpMap::iterator
SearchCache::listen(ValueCallback get_cb, Sp<Query> q, Value::Filter filter, std::function<size_t(Sp<Query>, ValueCallback, SyncCallback)> onListen) SearchCache::getOp(const Sp<Query>& q)
{ {
// find exact match // find exact match
auto op = ops.find(q); auto op = ops.find(q);
if (op == ops.end()) { if (op != ops.end())
return op;
// find satisfying query
for (auto it = ops.begin(); it != ops.end(); it++) {
if (q->isSatisfiedBy(*it->first)) {
return it;
}
}
return ops.end();
}
SearchCache::OpMap::const_iterator
SearchCache::getOp(const Sp<Query>& q) const
{
// find exact match
auto op = ops.find(q);
if (op != ops.cend())
return op;
// find satisfying query // find satisfying query
for (auto it = ops.begin(); it != ops.end(); it++) { for (auto it = ops.begin(); it != ops.end(); it++) {
if (q->isSatisfiedBy(*it->first)) { if (q->isSatisfiedBy(*it->first)) {
op = it; return it;
break;
} }
} }
return ops.cend();
} }
size_t
SearchCache::listen(ValueCallback get_cb, Sp<Query> q, Value::Filter filter, OnListen onListen)
{
// find exact match
auto op = getOp(q);
if (op == ops.end()) { if (op == ops.end()) {
// New query // New query
op = ops.emplace(q, std::unique_ptr<OpCache>(new OpCache)).first; op = ops.emplace(q, std::unique_ptr<OpCache>(new OpCache)).first;
...@@ -178,6 +201,20 @@ SearchCache::expire(const time_point& now, std::function<void(size_t)> onCancel) ...@@ -178,6 +201,20 @@ SearchCache::expire(const time_point& now, std::function<void(size_t)> onCancel)
return ret; return ret;
} }
bool
SearchCache::get(const Value::Filter& f, const Sp<Query>& q, const GetCallback& gcb, const DoneCallback& dcb) const
{
auto op = getOp(q);
if (op != ops.end()) {
auto vals = op->second->get(f);
if ((not vals.empty() and not gcb(vals)) or op->second->isSynced()) {
dcb(true, {});
return true;
}
}
return false;
}
std::vector<Sp<Value>> std::vector<Sp<Value>>
SearchCache::get(const Value::Filter& filter) const { SearchCache::get(const Value::Filter& filter) const {
if (ops.size() == 1) if (ops.size() == 1)
......
...@@ -160,7 +160,9 @@ class SearchCache { ...@@ -160,7 +160,9 @@ class SearchCache {
public: public:
SearchCache() {} SearchCache() {}
SearchCache(SearchCache&&) = default; SearchCache(SearchCache&&) = default;
size_t listen(ValueCallback get_cb, Sp<Query> q, Value::Filter filter, std::function<size_t(Sp<Query>, ValueCallback, SyncCallback)> onListen);
using OnListen = std::function<size_t(Sp<Query>, ValueCallback, SyncCallback)>;
size_t listen(ValueCallback get_cb, Sp<Query> q, Value::Filter filter, OnListen onListen);
bool cancelListen(size_t gtoken, const time_point& now); bool cancelListen(size_t gtoken, const time_point& now);
void cancelAll(std::function<void(size_t)> onCancel); void cancelAll(std::function<void(size_t)> onCancel);
...@@ -170,6 +172,7 @@ public: ...@@ -170,6 +172,7 @@ public:
return nextExpiration_; return nextExpiration_;
} }
bool get(const Value::Filter& f, const Sp<Query>& q, const GetCallback& gcb, const DoneCallback& dcb) const;
std::vector<Sp<Value>> get(const Value::Filter& filter) const; std::vector<Sp<Value>> get(const Value::Filter& filter) const;
Sp<Value> get(Value::Id id) const; Sp<Value> get(Value::Id id) const;
...@@ -177,7 +180,11 @@ private: ...@@ -177,7 +180,11 @@ private:
SearchCache(const SearchCache&) = delete; SearchCache(const SearchCache&) = delete;
SearchCache& operator=(const SearchCache&) = delete; SearchCache& operator=(const SearchCache&) = delete;
std::map<Sp<Query>, std::unique_ptr<OpCache>> ops {}; using OpMap = std::map<Sp<Query>, std::unique_ptr<OpCache>>;
OpMap ops {};
OpMap::iterator getOp(const Sp<Query>& q);
OpMap::const_iterator getOp(const Sp<Query>& q) const;
size_t nextToken_ {1}; size_t nextToken_ {1};
time_point nextExpiration_ {time_point::max()}; time_point nextExpiration_ {time_point::max()};
}; };
......
...@@ -491,14 +491,13 @@ struct Dht::Search { ...@@ -491,14 +491,13 @@ struct Dht::Search {
void get(Value::Filter f, const Sp<Query>& q, const QueryCallback& qcb, const GetCallback& gcb, const DoneCallback& dcb, Scheduler& scheduler) { void get(Value::Filter f, const Sp<Query>& q, const QueryCallback& qcb, const GetCallback& gcb, const DoneCallback& dcb, Scheduler& scheduler) {
if (gcb or qcb) { if (gcb or qcb) {
if (not cache.get(f, q, gcb, dcb)) {
const auto& now = scheduler.time(); const auto& now = scheduler.time();
callbacks.emplace(now, Get { now, f, q, qcb, gcb, dcb }); callbacks.emplace(now, Get { now, f, q, qcb, gcb, dcb });
auto values = cache.get(f);
if (not values.empty())
gcb(values);
scheduler.edit(nextSearchStep, now); scheduler.edit(nextSearchStep, now);
} }
} }
}
size_t listen(ValueCallback cb, Value::Filter f, const Sp<Query>& q, Scheduler& scheduler) { size_t listen(ValueCallback cb, Value::Filter f, const Sp<Query>& q, Scheduler& scheduler) {
//DHT_LOG.e(id, "[search %s IPv%c] listen", id.toString().c_str(), (af == AF_INET) ? '4' : '6'); //DHT_LOG.e(id, "[search %s IPv%c] listen", id.toString().c_str(), (af == AF_INET) ? '4' : '6');
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment