Skip to content
Snippets Groups Projects
Unverified Commit 333c44db authored by Sébastien Blin's avatar Sébastien Blin Committed by GitHub
Browse files

proxy_client: don't block for callback

parents b1a97428 8243bc4c
No related branches found
No related tags found
No related merge requests found
......@@ -207,8 +207,10 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb,
if (code == 200) {
try {
while (restbed::Http::is_open(req)) {
while (restbed::Http::is_open(req) and not *finished) {
restbed::Http::fetch("\n", reply);
if (*finished)
break;
std::string body;
reply->get_body(body);
reply->set_body(""); // Reset the body for the next fetch
......@@ -218,19 +220,12 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb,
if (reader.parse(body, json)) {
auto value = std::make_shared<Value>(json);
if ((not filterChain or filterChain(*value)) && cb) {
auto okCb = std::make_shared<std::promise<bool>>();
auto futureCb = okCb->get_future();
{
std::lock_guard<std::mutex> lock(lockCallbacks);
callbacks_.emplace_back([cb, value, okCb](){
okCb->set_value(cb({value}));
callbacks_.emplace_back([cb, value, finished]() {
if (not *finished and not cb({value}))
*finished = true;
});
}
futureCb.wait();
if (!futureCb.get()) {
return;
}
}
} else {
*ok = false;
}
......@@ -430,16 +425,22 @@ DhtProxyClient::listen(const InfoHash& key, GetCallback cb, Value::Filter&& filt
std::chrono::milliseconds timeout(std::numeric_limits<int>::max());
settings->set_connection_timeout(timeout); // Avoid the client to close the socket after 5 seconds.
auto ok = std::make_shared<bool>(true);
struct State {
bool ok {true};
bool cancel {false};
};
auto state = std::make_shared<State>();
restbed::Http::async(req,
[this, filterChain, cb, ok](const std::shared_ptr<restbed::Request>& req,
[this, filterChain, cb, state](const std::shared_ptr<restbed::Request>& req,
const std::shared_ptr<restbed::Response>& reply) {
auto code = reply->get_status_code();
if (code == 200) {
try {
while (restbed::Http::is_open(req)) {
while (restbed::Http::is_open(req) and not state->cancel) {
restbed::Http::fetch("\n", reply);
if (state->cancel)
break;
std::string body;
reply->get_body(body);
reply->set_body(""); // Reset the body for the next fetch
......@@ -449,31 +450,25 @@ DhtProxyClient::listen(const InfoHash& key, GetCallback cb, Value::Filter&& filt
if (reader.parse(body, json)) {
auto value = std::make_shared<Value>(json);
if ((not filterChain or filterChain(*value)) && cb) {
auto okCb = std::make_shared<std::promise<bool>>();
auto futureCb = okCb->get_future();
{
std::lock_guard<std::mutex> lock(lockCallbacks);
callbacks_.emplace_back([cb, value, okCb](){
okCb->set_value(cb({value}));
});
}
futureCb.wait();
if (!futureCb.get()) {
return;
callbacks_.emplace_back([cb, value, state]() {
if (not state->cancel and not cb({value})) {
state->cancel = true;
}
});
}
} else {
*ok = false;
state->ok = false;
}
}
} catch (std::runtime_error&) {
*ok = false;
state->ok = false;
}
} else {
*ok = false;
state->ok = false;
}
}, settings).get();
if (!ok) {
if (not state->ok) {
getConnectivityStatus();
}
}
......@@ -546,7 +541,7 @@ DhtProxyClient::restartListeners()
auto req = std::make_shared<restbed::Request>(uri);
req->set_method("LISTEN");
listener.req = req;
listener.thread = std::move(std::thread([this, filterChain, cb, req]()
listener.thread = std::thread([this, filterChain, cb, req]()
{
auto settings = std::make_shared<restbed::Settings>();
std::chrono::milliseconds timeout(std::numeric_limits<int>::max());
......@@ -595,7 +590,7 @@ DhtProxyClient::restartListeners()
}
}, settings).get();
getConnectivityStatus();
})
}
);
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment