diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index cc24a3535729be27bc8349bdae23521fb07bfe39..d93f32bde61fa6d60ff13bea43ea3eb89160aa60 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -207,8 +207,10 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, if (code == 200) { try { - while (restbed::Http::is_open(req)) { + while (restbed::Http::is_open(req) and not *finished) { restbed::Http::fetch("\n", reply); + if (*finished) + break; std::string body; reply->get_body(body); reply->set_body(""); // Reset the body for the next fetch @@ -218,18 +220,11 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, if (reader.parse(body, json)) { auto value = std::make_shared<Value>(json); if ((not filterChain or filterChain(*value)) && cb) { - auto okCb = std::make_shared<std::promise<bool>>(); - auto futureCb = okCb->get_future(); - { - std::lock_guard<std::mutex> lock(lockCallbacks); - callbacks_.emplace_back([cb, value, okCb](){ - okCb->set_value(cb({value})); - }); - } - futureCb.wait(); - if (!futureCb.get()) { - return; - } + std::lock_guard<std::mutex> lock(lockCallbacks); + callbacks_.emplace_back([cb, value, finished]() { + if (not *finished and not cb({value})) + *finished = true; + }); } } else { *ok = false; @@ -430,16 +425,22 @@ DhtProxyClient::listen(const InfoHash& key, GetCallback cb, Value::Filter&& filt std::chrono::milliseconds timeout(std::numeric_limits<int>::max()); settings->set_connection_timeout(timeout); // Avoid the client to close the socket after 5 seconds. - auto ok = std::make_shared<bool>(true); + struct State { + bool ok {true}; + bool cancel {false}; + }; + auto state = std::make_shared<State>(); restbed::Http::async(req, - [this, filterChain, cb, ok](const std::shared_ptr<restbed::Request>& req, + [this, filterChain, cb, state](const std::shared_ptr<restbed::Request>& req, const std::shared_ptr<restbed::Response>& reply) { auto code = reply->get_status_code(); if (code == 200) { try { - while (restbed::Http::is_open(req)) { + while (restbed::Http::is_open(req) and not state->cancel) { restbed::Http::fetch("\n", reply); + if (state->cancel) + break; std::string body; reply->get_body(body); reply->set_body(""); // Reset the body for the next fetch @@ -449,31 +450,25 @@ DhtProxyClient::listen(const InfoHash& key, GetCallback cb, Value::Filter&& filt if (reader.parse(body, json)) { auto value = std::make_shared<Value>(json); if ((not filterChain or filterChain(*value)) && cb) { - auto okCb = std::make_shared<std::promise<bool>>(); - auto futureCb = okCb->get_future(); - { - std::lock_guard<std::mutex> lock(lockCallbacks); - callbacks_.emplace_back([cb, value, okCb](){ - okCb->set_value(cb({value})); - }); - } - futureCb.wait(); - if (!futureCb.get()) { - return; - } + std::lock_guard<std::mutex> lock(lockCallbacks); + callbacks_.emplace_back([cb, value, state]() { + if (not state->cancel and not cb({value})) { + state->cancel = true; + } + }); } } else { - *ok = false; + state->ok = false; } } } catch (std::runtime_error&) { - *ok = false; + state->ok = false; } } else { - *ok = false; + state->ok = false; } }, settings).get(); - if (!ok) { + if (not state->ok) { getConnectivityStatus(); } } @@ -546,7 +541,7 @@ DhtProxyClient::restartListeners() auto req = std::make_shared<restbed::Request>(uri); req->set_method("LISTEN"); listener.req = req; - listener.thread = std::move(std::thread([this, filterChain, cb, req]() + listener.thread = std::thread([this, filterChain, cb, req]() { auto settings = std::make_shared<restbed::Settings>(); std::chrono::milliseconds timeout(std::numeric_limits<int>::max()); @@ -595,7 +590,7 @@ DhtProxyClient::restartListeners() } }, settings).get(); getConnectivityStatus(); - }) + } ); } }