Skip to content
Snippets Groups Projects
Commit a1f8cc90 authored by Adrien Béraud's avatar Adrien Béraud
Browse files

proxy client: use SyncCallback

parent 35c6a575
No related branches found
No related tags found
No related merge requests found
......@@ -41,8 +41,6 @@ namespace Json {
namespace dht {
class SearchCache;
class OPENDHT_PUBLIC DhtProxyClient final : public DhtInterface {
public:
......@@ -276,7 +274,6 @@ private:
void opFailed();
size_t doListen(const InfoHash& key, ValueCallback, Value::Filter);
bool doCancelListen(const InfoHash& key, size_t token);
struct ListenState;
......
......@@ -668,8 +668,93 @@ DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filt
it = searches_.emplace(key, ProxySearch{}).first;
}
auto query = std::make_shared<Query>(Select{}, where);
auto token = it->second.ops.listen(cb, query, filter, [&](Sp<Query> /*q*/, ValueCallback vcb, SyncCallback /*scb*/){
return doListen(key, vcb, filter);
auto token = it->second.ops.listen(cb, query, filter, [this, key, filter](Sp<Query> /*q*/, ValueCallback cb, SyncCallback scb) -> size_t {
scheduler.syncTime();
restbed::Uri uri(serverHost_ + "/" + key.toString());
std::lock_guard<std::mutex> lock(searchLock_);
auto search = searches_.find(key);
if (search == searches_.end()) {
DHT_LOG.e(key, "[search %s] listen: search not found", key.to_c_str());
return 0;
}
DHT_LOG.d(key, "[search %s] sending %s", key.to_c_str(), deviceKey_.empty() ? "listen" : "subscribe");
auto req = std::make_shared<restbed::Request>(uri);
auto token = ++listenerToken_;
auto l = search->second.listeners.find(token);
if (l == search->second.listeners.end()) {
auto f = filter;
l = search->second.listeners.emplace(token, Listener {
ValueCache(cb, std::move(scb)), scheduler.add(time_point::max(), [this, key, token]{
std::lock_guard<std::mutex> lock(searchLock_);
auto s = searches_.find(key);
if (s == searches_.end()) {
return;
}
auto l = s->second.listeners.find(token);
if (l == s->second.listeners.end()) {
return;
}
auto next = l->second.cache.expireValues(scheduler.time());
scheduler.edit(l->second.cacheExpirationJob, next);
}), req, std::move(f)
}).first;
} else {
if (l->second.state)
l->second.state->cancel = true;
}
auto state = std::make_shared<ListenState>();
l->second.state = state;
l->second.cb = [this,key,token,state](const std::vector<Sp<Value>>& values, bool expired) {
if (state->cancel)
return false;
std::lock_guard<std::mutex> lock(searchLock_);
auto s = searches_.find(key);
if (s == searches_.end()) {
return false;
}
auto l = s->second.listeners.find(token);
if (l == s->second.listeners.end()) {
return false;
}
const std::vector<Sp<Value>> new_values_empty;
std::vector<Value::Id> expired_ids;
if (expired) {
expired_ids.reserve(values.size());
for (const auto& v : values)
expired_ids.emplace_back(v->id);
}
auto next = l->second.cache.onValues(expired ? new_values_empty : values, std::vector<Value::Id>{}, expired_ids, types, scheduler.time());
scheduler.edit(l->second.cacheExpirationJob, next);
loopSignal_();
return true;
};
auto vcb = l->second.cb;
l->second.req = req;
if (not deviceKey_.empty()) {
// Relaunch push listeners even if a timeout is not received (if the proxy crash for any reason)
l->second.refreshJob = scheduler.add(scheduler.time() + proxy::OP_TIMEOUT - proxy::OP_MARGIN, [this, key, token, state] {
if (state->cancel)
return;
std::lock_guard<std::mutex> lock(searchLock_);
auto s = searches_.find(key);
if (s != searches_.end()) {
auto l = s->second.listeners.find(token);
if (l != s->second.listeners.end()) {
resubscribe(key, l->second);
}
}
});
}
l->second.thread = std::thread([this, req, vcb, filter, state]() {
sendListen(req, vcb, filter, state,
deviceKey_.empty() ? ListenMethod::LISTEN
: ListenMethod::SUBSCRIBE);
});
return token;
});
return token;
}
......@@ -773,97 +858,6 @@ void DhtProxyClient::sendListen(const std::shared_ptr<restbed::Request> &req,
opFailed();
}
size_t
DhtProxyClient::doListen(const InfoHash& key, ValueCallback cb, Value::Filter filter/*, Where where*/)
{
scheduler.syncTime();
restbed::Uri uri(serverHost_ + "/" + key.toString());
std::lock_guard<std::mutex> lock(searchLock_);
auto search = searches_.find(key);
if (search == searches_.end()) {
DHT_LOG.e(key, "[search %s] listen: search not found", key.to_c_str());
return 0;
}
DHT_LOG.d(key, "[search %s] sending %s", key.to_c_str(), deviceKey_.empty() ? "listen" : "subscribe");
auto req = std::make_shared<restbed::Request>(uri);
auto token = ++listenerToken_;
auto l = search->second.listeners.find(token);
if (l == search->second.listeners.end()) {
auto f = filter;
l = search->second.listeners.emplace(token, Listener {
ValueCache(cb), scheduler.add(time_point::max(), [this, key, token]{
std::lock_guard<std::mutex> lock(searchLock_);
auto s = searches_.find(key);
if (s == searches_.end()) {
return;
}
auto l = s->second.listeners.find(token);
if (l == s->second.listeners.end()) {
return;
}
auto next = l->second.cache.expireValues(scheduler.time());
scheduler.edit(l->second.cacheExpirationJob, next);
}), req, std::move(f)
}).first;
} else {
if (l->second.state)
l->second.state->cancel = true;
}
auto state = std::make_shared<ListenState>();
l->second.state = state;
l->second.cb = [this,key,token,state](const std::vector<Sp<Value>>& values, bool expired) {
if (state->cancel)
return false;
std::lock_guard<std::mutex> lock(searchLock_);
auto s = searches_.find(key);
if (s == searches_.end()) {
return false;
}
auto l = s->second.listeners.find(token);
if (l == s->second.listeners.end()) {
return false;
}
const std::vector<Sp<Value>> new_values_empty;
std::vector<Value::Id> expired_ids;
if (expired) {
expired_ids.reserve(values.size());
for (const auto& v : values)
expired_ids.emplace_back(v->id);
}
auto next = l->second.cache.onValues(expired ? new_values_empty : values, std::vector<Value::Id>{}, expired_ids, types, scheduler.time());
scheduler.edit(l->second.cacheExpirationJob, next);
loopSignal_();
return true;
};
auto vcb = l->second.cb;
l->second.req = req;
if (not deviceKey_.empty()) {
// Relaunch push listeners even if a timeout is not received (if the proxy crash for any reason)
l->second.refreshJob = scheduler.add(scheduler.time() + proxy::OP_TIMEOUT - proxy::OP_MARGIN, [this, key, token, state] {
if (state->cancel)
return;
std::lock_guard<std::mutex> lock(searchLock_);
auto s = searches_.find(key);
if (s != searches_.end()) {
auto l = s->second.listeners.find(token);
if (l != s->second.listeners.end()) {
resubscribe(key, l->second);
}
}
});
}
l->second.thread = std::thread([this, req, vcb, filter, state]() {
sendListen(req, vcb, filter, state,
deviceKey_.empty() ? ListenMethod::LISTEN
: ListenMethod::SUBSCRIBE);
});
return token;
}
bool
DhtProxyClient::doCancelListen(const InfoHash& key, size_t ltoken)
{
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment