Skip to content
Snippets Groups Projects
Commit cf6d9fd8 authored by Adrien Béraud's avatar Adrien Béraud
Browse files

proxy client: add lock for requests

parent 5df34bb7
No related branches found
No related tags found
No related merge requests found
......@@ -349,6 +349,8 @@ private:
*/
asio::io_context httpContext_;
std::shared_ptr<http::Resolver> resolver_;
mutable std::mutex requestLock_;
std::map<unsigned int /*id*/, std::shared_ptr<http::Request>> requests_;
/*
* Thread for executing the http io_context.run() blocking call
......
......@@ -334,12 +334,16 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, Va
}
loopSignal_();
}
std::lock_guard<std::mutex> l(requestLock_);
requests_.erase(reqid);
}
});
request->send();
{
std::lock_guard<std::mutex> l(requestLock_);
requests_[reqid] = request;
}
request->send();
}
catch (const std::exception &e){
if (logger_)
logger_->e("[proxy:client] [get %s] error: %s", key.to_c_str(), e.what());
......@@ -470,12 +474,16 @@ DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallbackSimple cb,
}
if (cb)
cb(ok);
std::lock_guard<std::mutex> l(requestLock_);
requests_.erase(reqid);
}
});
request->send();
{
std::lock_guard<std::mutex> l(requestLock_);
requests_[reqid] = request;
}
request->send();
}
catch (const std::exception &e){
if (logger_)
logger_->e("[proxy:client] [put %s] error: %s", key.to_c_str(), e.what());
......@@ -590,14 +598,17 @@ DhtProxyClient::queryProxyInfo(std::shared_ptr<InfoState> infoState, sa_family_t
if (not infoState->cancel)
onProxyInfos(proxyInfos, family);
}
std::lock_guard<std::mutex> l(requestLock_);
requests_.erase(reqid);
}
});
if (infoState->cancel.load())
return;
{
std::lock_guard<std::mutex> l(requestLock_);
requests_[reqid] = request;
}
request->send();
}
catch (const std::exception &e){
......@@ -686,10 +697,10 @@ DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filt
if (logger_)
logger_->d("[proxy:client] [listen] [search %s]", key.to_c_str());
std::lock_guard<std::mutex> lock(searchLock_);
auto& search = searches_[key];
auto query = std::make_shared<Query>(Select{}, std::move(where));
return search.ops.listen(cb, query, filter, [this, key](Sp<Query>, ValueCallback cb, SyncCallback) -> size_t {
std::lock_guard<std::mutex> lock(searchLock_);
// Find search
auto search = searches_.find(key);
if (search == searches_.end()) {
......@@ -863,13 +874,15 @@ DhtProxyClient::handleExpireListener(const asio::error_code &ec, const InfoHash&
requests_.erase(reqid);
}
});
request->send();
{
std::lock_guard<std::mutex> l(requestLock_);
requests_[reqid] = request;
}
request->send();
}
catch (const std::exception &e){
if (logger_)
logger_->e("[proxy:client] [unsubscribe %s] failed: %s", key.to_c_str(), e.what());
requests_.erase(reqid);
}
} else {
// stop the request
......@@ -962,12 +975,16 @@ DhtProxyClient::sendListen(const restinio::http_request_header_t header,
if (response.status_code == 0)
opFailed();
}
std::lock_guard<std::mutex> l(requestLock_);
requests_.erase(reqid);
}
});
request->send();
{
std::lock_guard<std::mutex> l(requestLock_);
requests_[reqid] = request;
}
request->send();
}
catch (const std::exception &e){
if (logger_)
logger_->e("[proxy:client] [listen] request failed: %s", e.what());
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment