Skip to content
Snippets Groups Projects
Unverified Commit c48e2440 authored by Adrien Béraud's avatar Adrien Béraud Committed by GitHub
Browse files

Merge pull request #296 from AmarOk1412/resub

proxy: refresh puts and subscriptions when connectivity changes
parents e6b247be 3cf2c092
No related branches found
No related tags found
No related merge requests found
...@@ -66,6 +66,7 @@ struct DhtProxyClient::Listener ...@@ -66,6 +66,7 @@ struct DhtProxyClient::Listener
struct PermanentPut { struct PermanentPut {
Sp<Value> value; Sp<Value> value;
Sp<Scheduler::Job> refreshJob; Sp<Scheduler::Job> refreshJob;
Sp<std::atomic_bool> ok;
}; };
struct DhtProxyClient::ProxySearch { struct DhtProxyClient::ProxySearch {
...@@ -331,24 +332,28 @@ DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_po ...@@ -331,24 +332,28 @@ DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_po
auto id = val->id; auto id = val->id;
auto search = searches_.emplace(key, ProxySearch{}).first; auto search = searches_.emplace(key, ProxySearch{}).first;
auto nextRefresh = scheduler.time() + proxy::OP_TIMEOUT - proxy::OP_MARGIN; auto nextRefresh = scheduler.time() + proxy::OP_TIMEOUT - proxy::OP_MARGIN;
auto ok = std::make_shared<std::atomic_bool>(false);
search->second.puts.erase(id); search->second.puts.erase(id);
search->second.puts.emplace(id, PermanentPut {val, scheduler.add(nextRefresh, [this, key, id]{ search->second.puts.emplace(id, PermanentPut {val, scheduler.add(nextRefresh, [this, key, id, ok]{
std::lock_guard<std::mutex> lock(searchLock_); std::lock_guard<std::mutex> lock(searchLock_);
auto s = searches_.find(key); auto s = searches_.find(key);
if (s != searches_.end()) { if (s != searches_.end()) {
auto p = s->second.puts.find(id); auto p = s->second.puts.find(id);
if (p != s->second.puts.end()) { if (p != s->second.puts.end()) {
doPut(key, p->second.value, {}, time_point::max(), true); doPut(key, p->second.value,
[ok](bool result, const std::vector<std::shared_ptr<dht::Node> >&){
*ok = !result;
}, time_point::max(), true);
scheduler.edit(p->second.refreshJob, scheduler.time() + proxy::OP_TIMEOUT - proxy::OP_MARGIN); scheduler.edit(p->second.refreshJob, scheduler.time() + proxy::OP_TIMEOUT - proxy::OP_MARGIN);
} }
} }
})}); }), ok});
} }
doPut(key, val, std::move(cb), created, permanent); doPut(key, val, std::move(cb), created, permanent);
} }
void void
DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_point created, bool permanent) DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_point /*created*/, bool permanent)
{ {
DHT_LOG.d(key, "[search %s] performing put of %s", key.to_c_str(), val->toString().c_str()); DHT_LOG.d(key, "[search %s] performing put of %s", key.to_c_str(), val->toString().c_str());
restbed::Uri uri(proxy::HTTP_PROTO + serverHost_ + "/" + key.toString()); restbed::Uri uri(proxy::HTTP_PROTO + serverHost_ + "/" + key.toString());
...@@ -642,7 +647,7 @@ DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filt ...@@ -642,7 +647,7 @@ DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filt
it = searches_.emplace(key, ProxySearch{}).first; it = searches_.emplace(key, ProxySearch{}).first;
} }
auto query = std::make_shared<Query>(Select{}, where); auto query = std::make_shared<Query>(Select{}, where);
auto token = it->second.ops.listen(cb, query, filter, [&](Sp<Query> q, ValueCallback vcb){ auto token = it->second.ops.listen(cb, query, filter, [&](Sp<Query> /*q*/, ValueCallback vcb){
return doListen(key, vcb, filter); return doListen(key, vcb, filter);
}); });
return token; return token;
...@@ -943,7 +948,26 @@ DhtProxyClient::getConnectivityStatus() ...@@ -943,7 +948,26 @@ DhtProxyClient::getConnectivityStatus()
void void
DhtProxyClient::restartListeners() DhtProxyClient::restartListeners()
{ {
DHT_LOG.d("Refresh permanent puts");
for (auto& search : searches_) {
for (auto& put : search.second.puts) {
if (!put.second.ok) {
auto ok = put.second.ok;
doPut(search.first, put.second.value,
[ok](bool result, const std::vector<std::shared_ptr<dht::Node> >&){
*ok = !result;
}, time_point::max(), true);
scheduler.edit(put.second.refreshJob, scheduler.time() + proxy::OP_TIMEOUT - proxy::OP_MARGIN);
}
}
}
if (not deviceKey_.empty()) { if (not deviceKey_.empty()) {
DHT_LOG.d("resubscribe due to a connectivity change");
// Connectivity changed, refresh all subscribe
for (auto& search : searches_)
for (auto& listener : search.second.listeners)
if (!listener.second.state->ok)
resubscribe(search.first, listener.second);
return; return;
} }
DHT_LOG.d("Restarting listeners"); DHT_LOG.d("Restarting listeners");
......
...@@ -138,6 +138,7 @@ DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port , ...@@ -138,6 +138,7 @@ DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port ,
if (stopListeners) return; if (stopListeners) return;
if (service_->is_up()) if (service_->is_up())
std::cout << getStats().toString() << std::endl; std::cout << getStats().toString() << std::endl;
scheduler_.edit(printStatsJob_, scheduler_.time() + PRINT_STATS_PERIOD);
// Refresh stats cache // Refresh stats cache
auto newInfo = dht_->getNodeInfo(); auto newInfo = dht_->getNodeInfo();
{ {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment