diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index 94ccb86a62646d6fb102303cd21cdb41befe8498..f2a160579aeed13e1c9cb36be4914c861ba344a5 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -66,6 +66,7 @@ struct DhtProxyClient::Listener struct PermanentPut { Sp<Value> value; Sp<Scheduler::Job> refreshJob; + Sp<std::atomic_bool> ok; }; struct DhtProxyClient::ProxySearch { @@ -331,24 +332,28 @@ DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_po auto id = val->id; auto search = searches_.emplace(key, ProxySearch{}).first; auto nextRefresh = scheduler.time() + proxy::OP_TIMEOUT - proxy::OP_MARGIN; + auto ok = std::make_shared<std::atomic_bool>(false); search->second.puts.erase(id); - search->second.puts.emplace(id, PermanentPut {val, scheduler.add(nextRefresh, [this, key, id]{ + search->second.puts.emplace(id, PermanentPut {val, scheduler.add(nextRefresh, [this, key, id, ok]{ std::lock_guard<std::mutex> lock(searchLock_); auto s = searches_.find(key); if (s != searches_.end()) { auto p = s->second.puts.find(id); if (p != s->second.puts.end()) { - doPut(key, p->second.value, {}, time_point::max(), true); + doPut(key, p->second.value, + [ok](bool result, const std::vector<std::shared_ptr<dht::Node> >&){ + *ok = !result; + }, time_point::max(), true); scheduler.edit(p->second.refreshJob, scheduler.time() + proxy::OP_TIMEOUT - proxy::OP_MARGIN); } } - })}); + }), ok}); } doPut(key, val, std::move(cb), created, permanent); } void -DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_point created, bool permanent) +DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_point /*created*/, bool permanent) { DHT_LOG.d(key, "[search %s] performing put of %s", key.to_c_str(), val->toString().c_str()); restbed::Uri uri(proxy::HTTP_PROTO + serverHost_ + "/" + key.toString()); @@ -642,7 +647,7 @@ DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filt it = searches_.emplace(key, ProxySearch{}).first; } auto query = std::make_shared<Query>(Select{}, where); - auto token = it->second.ops.listen(cb, query, filter, [&](Sp<Query> q, ValueCallback vcb){ + auto token = it->second.ops.listen(cb, query, filter, [&](Sp<Query> /*q*/, ValueCallback vcb){ return doListen(key, vcb, filter); }); return token; @@ -943,7 +948,26 @@ DhtProxyClient::getConnectivityStatus() void DhtProxyClient::restartListeners() { + DHT_LOG.d("Refresh permanent puts"); + for (auto& search : searches_) { + for (auto& put : search.second.puts) { + if (!put.second.ok) { + auto ok = put.second.ok; + doPut(search.first, put.second.value, + [ok](bool result, const std::vector<std::shared_ptr<dht::Node> >&){ + *ok = !result; + }, time_point::max(), true); + scheduler.edit(put.second.refreshJob, scheduler.time() + proxy::OP_TIMEOUT - proxy::OP_MARGIN); + } + } + } if (not deviceKey_.empty()) { + DHT_LOG.d("resubscribe due to a connectivity change"); + // Connectivity changed, refresh all subscribe + for (auto& search : searches_) + for (auto& listener : search.second.listeners) + if (!listener.second.state->ok) + resubscribe(search.first, listener.second); return; } DHT_LOG.d("Restarting listeners"); diff --git a/src/dht_proxy_server.cpp b/src/dht_proxy_server.cpp index ad26fcb29e333fe13b2534386862695b69739228..3b8a199eb6c38abc1533983a23c275ab8e0ba6bb 100644 --- a/src/dht_proxy_server.cpp +++ b/src/dht_proxy_server.cpp @@ -138,6 +138,7 @@ DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port , if (stopListeners) return; if (service_->is_up()) std::cout << getStats().toString() << std::endl; + scheduler_.edit(printStatsJob_, scheduler_.time() + PRINT_STATS_PERIOD); // Refresh stats cache auto newInfo = dht_->getNodeInfo(); {