Skip to content
Snippets Groups Projects
Commit 8243bc4c authored by Adrien Béraud's avatar Adrien Béraud
Browse files

proxy_client: don't block for callback

Use shared flags to avoid blocking for the callback
while respecting the callback return value.
parent b1a97428
No related branches found
No related tags found
No related merge requests found
......@@ -207,8 +207,10 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb,
if (code == 200) {
try {
while (restbed::Http::is_open(req)) {
while (restbed::Http::is_open(req) and not *finished) {
restbed::Http::fetch("\n", reply);
if (*finished)
break;
std::string body;
reply->get_body(body);
reply->set_body(""); // Reset the body for the next fetch
......@@ -218,19 +220,12 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb,
if (reader.parse(body, json)) {
auto value = std::make_shared<Value>(json);
if ((not filterChain or filterChain(*value)) && cb) {
auto okCb = std::make_shared<std::promise<bool>>();
auto futureCb = okCb->get_future();
{
std::lock_guard<std::mutex> lock(lockCallbacks);
callbacks_.emplace_back([cb, value, okCb](){
okCb->set_value(cb({value}));
callbacks_.emplace_back([cb, value, finished]() {
if (not *finished and not cb({value}))
*finished = true;
});
}
futureCb.wait();
if (!futureCb.get()) {
return;
}
}
} else {
*ok = false;
}
......@@ -430,16 +425,22 @@ DhtProxyClient::listen(const InfoHash& key, GetCallback cb, Value::Filter&& filt
std::chrono::milliseconds timeout(std::numeric_limits<int>::max());
settings->set_connection_timeout(timeout); // Avoid the client to close the socket after 5 seconds.
auto ok = std::make_shared<bool>(true);
struct State {
bool ok {true};
bool cancel {false};
};
auto state = std::make_shared<State>();
restbed::Http::async(req,
[this, filterChain, cb, ok](const std::shared_ptr<restbed::Request>& req,
[this, filterChain, cb, state](const std::shared_ptr<restbed::Request>& req,
const std::shared_ptr<restbed::Response>& reply) {
auto code = reply->get_status_code();
if (code == 200) {
try {
while (restbed::Http::is_open(req)) {
while (restbed::Http::is_open(req) and not state->cancel) {
restbed::Http::fetch("\n", reply);
if (state->cancel)
break;
std::string body;
reply->get_body(body);
reply->set_body(""); // Reset the body for the next fetch
......@@ -449,31 +450,25 @@ DhtProxyClient::listen(const InfoHash& key, GetCallback cb, Value::Filter&& filt
if (reader.parse(body, json)) {
auto value = std::make_shared<Value>(json);
if ((not filterChain or filterChain(*value)) && cb) {
auto okCb = std::make_shared<std::promise<bool>>();
auto futureCb = okCb->get_future();
{
std::lock_guard<std::mutex> lock(lockCallbacks);
callbacks_.emplace_back([cb, value, okCb](){
okCb->set_value(cb({value}));
});
}
futureCb.wait();
if (!futureCb.get()) {
return;
callbacks_.emplace_back([cb, value, state]() {
if (not state->cancel and not cb({value})) {
state->cancel = true;
}
});
}
} else {
*ok = false;
state->ok = false;
}
}
} catch (std::runtime_error&) {
*ok = false;
state->ok = false;
}
} else {
*ok = false;
state->ok = false;
}
}, settings).get();
if (!ok) {
if (not state->ok) {
getConnectivityStatus();
}
}
......@@ -546,7 +541,7 @@ DhtProxyClient::restartListeners()
auto req = std::make_shared<restbed::Request>(uri);
req->set_method("LISTEN");
listener.req = req;
listener.thread = std::move(std::thread([this, filterChain, cb, req]()
listener.thread = std::thread([this, filterChain, cb, req]()
{
auto settings = std::make_shared<restbed::Settings>();
std::chrono::milliseconds timeout(std::numeric_limits<int>::max());
......@@ -595,7 +590,7 @@ DhtProxyClient::restartListeners()
}
}, settings).get();
getConnectivityStatus();
})
}
);
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment