diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index 13e9f55949017a8b49f4b422ed67b485a20b8b8c..9a31f77c7ff50ab48985ab7209b22b9f30804b10 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -52,10 +52,7 @@ struct DhtProxyClient::Listener Sp<ListenState> state; Sp<Scheduler::Job> refreshJob; Listener(OpValueCache&& c, const Sp<restbed::Request>& r, Value::Filter&& f) - : cache(std::move(c)), - filter(std::move(f)), - req(r) - {} + : cache(std::move(c)), filter(std::move(f)),req(r) {} }; struct PermanentPut { @@ -682,8 +679,8 @@ DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filt if (l == search->second.listeners.end()) { auto f = filter; l = search->second.listeners.emplace(std::piecewise_construct, - std::forward_as_tuple(token), - std::forward_as_tuple(OpValueCache(std::move(cb)), req, std::move(f))).first; + std::forward_as_tuple(token), + std::forward_as_tuple(std::move(cb), req, std::move(f))).first; } else { if (l->second.state) l->second.state->cancel = true; @@ -765,23 +762,23 @@ void DhtProxyClient::sendListen(const std::shared_ptr<restbed::Request> &req, const Value::Filter &filter, const Sp<ListenState> &state, ListenMethod method) { - auto settings = std::make_shared<restbed::Settings>(); - if (method != ListenMethod::LISTEN) { - req->set_method("SUBSCRIBE"); - } else { - std::chrono::milliseconds timeout(std::numeric_limits<int>::max()); - settings->set_connection_timeout( - timeout); // Avoid the client to close the socket after 5 seconds. - req->set_method("LISTEN"); - } - try { + auto settings = std::make_shared<restbed::Settings>(); + if (method != ListenMethod::LISTEN) { + req->set_method("SUBSCRIBE"); + } else { + std::chrono::milliseconds timeout(std::numeric_limits<int>::max()); + settings->set_connection_timeout(timeout); // Avoid the client to close the socket after 5 seconds. + req->set_method("LISTEN"); + } + try { #ifdef OPENDHT_PUSH_NOTIFICATIONS - if (method != ListenMethod::LISTEN) - fillBody(req, method == ListenMethod::RESUBSCRIBE); -#endif + if (method != ListenMethod::LISTEN) + fillBody(req, method == ListenMethod::RESUBSCRIBE); + #endif restbed::Http::async(req, - [this, filter, cb, state](const std::shared_ptr<restbed::Request>& req, - const std::shared_ptr<restbed::Response>& reply) { + [this, filter, cb, state](const std::shared_ptr<restbed::Request>& req, + const std::shared_ptr<restbed::Response>& reply) + { auto code = reply->get_status_code(); if (code == 200) { try { @@ -1051,15 +1048,17 @@ DhtProxyClient::resubscribe(const InfoHash& key, Listener& listener) scheduler.syncTime(); DHT_LOG.d(key, "[search %s] resubscribe push listener", key.to_c_str()); // Subscribe - restbed::Uri uri(serverHost_ + "/" + key.toString()); - auto req = std::make_shared<restbed::Request>(uri); - req->set_method("SUBSCRIBE"); - auto state = listener.state; - if (listener.thread.joinable()) + if (listener.thread.joinable()) { + state->cancel = true; + if (listener.req) + restbed::Http::close(listener.req); listener.thread.join(); + } state->cancel = false; state->ok = true; + auto req = std::make_shared<restbed::Request>(restbed::Uri {serverHost_ + "/" + key.toString()}); + req->set_method("SUBSCRIBE"); listener.req = req; scheduler.edit(listener.refreshJob, scheduler.time() + proxy::OP_TIMEOUT - proxy::OP_MARGIN); auto vcb = listener.cb;