Skip to content
Snippets Groups Projects
Commit 20315085 authored by Adrien Béraud's avatar Adrien Béraud
Browse files

OpCache: keep cache alive during callback processing

parent 9a5ef251
No related branches found
No related tags found
No related merge requests found
...@@ -102,9 +102,14 @@ public: ...@@ -102,9 +102,14 @@ public:
//std::cout << "onValuesAdded: " << viop.first->second.refCount << " refs for value " << v->id << std::endl; //std::cout << "onValuesAdded: " << viop.first->second.refCount << " refs for value " << v->id << std::endl;
} }
} }
auto list = listeners; if (not listeners.empty()) {
std::vector<LocalListener> list;
list.reserve(listeners.size());
for (const auto& l : listeners)
list.emplace_back(l.second);
for (auto& l : list) for (auto& l : list)
l.second.get_cb(l.second.filter.filter(newValues), false); l.get_cb(l.filter.filter(newValues), false);
}
} }
void onValuesExpired(const std::vector<Sp<Value>>& vals) { void onValuesExpired(const std::vector<Sp<Value>>& vals) {
std::vector<Sp<Value>> expiredValues; std::vector<Sp<Value>> expiredValues;
...@@ -117,9 +122,14 @@ public: ...@@ -117,9 +122,14 @@ public:
values.erase(vit); values.erase(vit);
} }
} }
auto list = listeners; if (not listeners.empty()) {
std::vector<LocalListener> list;
list.reserve(listeners.size());
for (const auto& l : listeners)
list.emplace_back(l.second);
for (auto& l : list) for (auto& l : list)
l.second.get_cb(l.second.filter.filter(expiredValues), true); l.get_cb(l.filter.filter(expiredValues), true);
}
} }
void addListener(size_t token, ValueCallback cb, Sp<Query> q, Value::Filter filter) { void addListener(size_t token, ValueCallback cb, Sp<Query> q, Value::Filter filter) {
...@@ -178,26 +188,26 @@ public: ...@@ -178,26 +188,26 @@ public:
} }
if (op == ops.end()) { if (op == ops.end()) {
// New query // New query
op = ops.emplace(q, OpCache{}).first; op = ops.emplace(q, std::unique_ptr<OpCache>(new OpCache)).first;
auto& cache = op->second; auto& cache = *op->second;
cache.searchToken = onListen(q, [&](const std::vector<Sp<Value>>& values, bool expired){ cache.searchToken = onListen(q, [&](const std::vector<Sp<Value>>& values, bool expired){
return cache.onValue(values, expired); return cache.onValue(values, expired);
}); });
} }
auto token = nextToken_++; auto token = nextToken_++;
if (token == 0) if (nextToken_ == 0)
token++; nextToken_++;
op->second.addListener(token, get_cb, q, filter); op->second->addListener(token, get_cb, q, filter);
return token; return token;
} }
bool cancelListen(size_t gtoken, std::function<void(size_t)> onCancel) { bool cancelListen(size_t gtoken, std::function<void(size_t)> onCancel) {
for (auto it = ops.begin(); it != ops.end(); it++) { for (auto it = ops.begin(); it != ops.end(); it++) {
if (it->second.removeListener(gtoken)) { if (it->second->removeListener(gtoken)) {
if (it->second.isDone()) { if (it->second->isDone()) {
auto ltoken = it->second.searchToken; auto cache = std::move(it->second);
ops.erase(it); ops.erase(it);
onCancel(ltoken); onCancel(cache->searchToken);
} }
return true; return true;
} }
...@@ -207,10 +217,10 @@ public: ...@@ -207,10 +217,10 @@ public:
std::vector<Sp<Value>> get(Value::Filter& filter) const { std::vector<Sp<Value>> get(Value::Filter& filter) const {
if (ops.size() == 1) if (ops.size() == 1)
return ops.begin()->second.get(filter); return ops.begin()->second->get(filter);
std::map<Value::Id, Sp<Value>> c; std::map<Value::Id, Sp<Value>> c;
for (const auto& op : ops) { for (const auto& op : ops) {
for (const auto& v : op.second.get(filter)) for (const auto& v : op.second->get(filter))
c.emplace(v->id, v); c.emplace(v->id, v);
} }
std::vector<Sp<Value>> ret; std::vector<Sp<Value>> ret;
...@@ -222,13 +232,13 @@ public: ...@@ -222,13 +232,13 @@ public:
Sp<Value> get(Value::Id id) const { Sp<Value> get(Value::Id id) const {
for (const auto& op : ops) for (const auto& op : ops)
if (auto v = op.second.get(id)) if (auto v = op.second->get(id))
return v; return v;
return {}; return {};
} }
private: private:
std::map<Sp<Query>, OpCache> ops; std::map<Sp<Query>, std::unique_ptr<OpCache>> ops {};
size_t nextToken_ {1}; size_t nextToken_ {1};
}; };
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment