Skip to content
Snippets Groups Projects
Commit b1a97428 authored by Sébastien Blin's avatar Sébastien Blin
Browse files

dhtproxyclient: add lock_guards for mutex and clean code

parent 5adc5b4c
No related branches found
No related tags found
No related merge requests found
......@@ -217,7 +217,7 @@ public:
void insertNode(const NodeExport&) { }
std::pair<size_t, size_t> getStoreSize() const { return {}; }
virtual void registerType(const ValueType&) { }
const ValueType& getType(ValueType::Id) const { }
const ValueType& getType(ValueType::Id) const { return NO_VALUE; }
std::vector<Sp<Value>> getLocal(const InfoHash&, Value::Filter) const { return {}; }
Sp<Value> getLocalById(const InfoHash&, Value::Id) const { return {}; }
std::vector<NodeExport> exportNodes() { return {}; }
......@@ -235,6 +235,7 @@ public:
void connectivityChanged() { }
private:
const ValueType NO_VALUE;
/**
* Get informations from the proxy node
* @return the JSON returned by the proxy
......
......@@ -87,7 +87,7 @@ DhtProxyClient::~DhtProxyClient()
void
DhtProxyClient::cancelAllOperations()
{
lockOperations_.lock();
std::lock_guard<std::mutex> lock(lockOperations_);
auto operation = operations_.begin();
while (operation != operations_.end()) {
if (operation->thread.joinable()) {
......@@ -99,14 +99,12 @@ DhtProxyClient::cancelAllOperations()
++operation;
}
}
lockOperations_.unlock();
}
void
DhtProxyClient::cancelAllListeners()
{
lockListener_.lock();
std::lock_guard<std::mutex> lock(lockListener_);
for (auto& listener: listeners_) {
if (listener.thread.joinable()) {
// Close connection to stop listener?
......@@ -115,7 +113,6 @@ DhtProxyClient::cancelAllListeners()
listener.thread.join();
}
}
lockListener_.unlock();
}
void
......@@ -161,14 +158,14 @@ DhtProxyClient::periodic(const uint8_t*, size_t, const SockAddr&)
// Exec all currently stored callbacks
scheduler.syncTime();
if (!callbacks_.empty()) {
lockCallbacks.lock();
std::lock_guard<std::mutex> lock(lockCallbacks);
for (auto& callback : callbacks_)
callback();
callbacks_.clear();
lockCallbacks.unlock();
}
// Remove finished operations
lockOperations_.lock();
{
std::lock_guard<std::mutex> lock(lockOperations_);
auto operation = operations_.begin();
while (operation != operations_.end()) {
if (*(operation->finished)) {
......@@ -182,7 +179,7 @@ DhtProxyClient::periodic(const uint8_t*, size_t, const SockAddr&)
++operation;
}
}
lockOperations_.unlock();
}
return scheduler.run();
}
......@@ -199,7 +196,7 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb,
Operation o;
o.req = req;
o.finished = finished;
o.thread = std::move(std::thread([=](){
o.thread = std::thread([=](){
// Try to contact the proxy and set the status to connected when done.
// will change the connectivity status
auto ok = std::make_shared<bool>(true);
......@@ -221,11 +218,18 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb,
if (reader.parse(body, json)) {
auto value = std::make_shared<Value>(json);
if ((not filterChain or filterChain(*value)) && cb) {
lockCallbacks.lock();
callbacks_.emplace_back([=](){
cb({value});
auto okCb = std::make_shared<std::promise<bool>>();
auto futureCb = okCb->get_future();
{
std::lock_guard<std::mutex> lock(lockCallbacks);
callbacks_.emplace_back([cb, value, okCb](){
okCb->set_value(cb({value}));
});
lockCallbacks.unlock();
}
futureCb.wait();
if (!futureCb.get()) {
return;
}
}
} else {
*ok = false;
......@@ -237,21 +241,21 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb,
}
}).wait();
if (donecb) {
lockCallbacks.lock();
std::lock_guard<std::mutex> lock(lockCallbacks);
callbacks_.emplace_back([=](){
donecb(*ok, {});
});
lockCallbacks.unlock();
}
if (!ok) {
// Connection failed, update connectivity
getConnectivityStatus();
}
*finished = true;
}));
lockOperations_.lock();
});
{
std::lock_guard<std::mutex> lock(lockOperations_);
operations_.emplace_back(std::move(o));
lockOperations_.unlock();
}
}
void
......@@ -272,7 +276,7 @@ DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_po
Operation o;
o.req = req;
o.finished = finished;
o.thread = std::move(std::thread([=](){
o.thread = std::thread([=](){
auto ok = std::make_shared<bool>(true);
restbed::Http::async(req,
[this, ok](const std::shared_ptr<restbed::Request>& /*req*/,
......@@ -298,21 +302,21 @@ DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_po
}
}).wait();
if (cb) {
lockCallbacks.lock();
std::lock_guard<std::mutex> lock(lockCallbacks);
callbacks_.emplace_back([=](){
cb(*ok, {});
});
lockCallbacks.unlock();
}
if (!ok) {
// Connection failed, update connectivity
getConnectivityStatus();
}
*finished = true;
}));
lockOperations_.lock();
});
{
std::lock_guard<std::mutex> lock(lockOperations_);
operations_.emplace_back(std::move(o));
lockOperations_.unlock();
}
}
NodeStats
......@@ -395,9 +399,9 @@ DhtProxyClient::getPublicAddress(sa_family_t family)
switch (family)
{
case AF_INET:
return DhtRunner::getAddrInfo(ipv4Address, port);
return SockAddr::resolve(ipv4Address, port);
case AF_INET6:
return DhtRunner::getAddrInfo(ipv6Address, port);
return SockAddr::resolve(ipv6Address, port);
default:
return {};
}
......@@ -420,14 +424,15 @@ DhtProxyClient::listen(const InfoHash& key, GetCallback cb, Value::Filter&& filt
l.req = req;
l.cb = cb;
l.filterChain = std::move(filterChain);
l.thread = std::move(std::thread([=]()
l.thread = std::thread([=]()
{
auto settings = std::make_shared<restbed::Settings>();
std::chrono::milliseconds timeout(std::numeric_limits<int>::max());
settings->set_connection_timeout(timeout); // Avoid the client to close the socket after 5 seconds.
auto ok = std::make_shared<bool>(true);
restbed::Http::async(req,
[this, filterChain, cb](const std::shared_ptr<restbed::Request>& req,
[this, filterChain, cb, ok](const std::shared_ptr<restbed::Request>& req,
const std::shared_ptr<restbed::Response>& reply) {
auto code = reply->get_status_code();
......@@ -444,36 +449,46 @@ DhtProxyClient::listen(const InfoHash& key, GetCallback cb, Value::Filter&& filt
if (reader.parse(body, json)) {
auto value = std::make_shared<Value>(json);
if ((not filterChain or filterChain(*value)) && cb) {
lockCallbacks.lock();
callbacks_.emplace_back([=](){
cb({value});
auto okCb = std::make_shared<std::promise<bool>>();
auto futureCb = okCb->get_future();
{
std::lock_guard<std::mutex> lock(lockCallbacks);
callbacks_.emplace_back([cb, value, okCb](){
okCb->set_value(cb({value}));
});
lockCallbacks.unlock();
}
futureCb.wait();
if (!futureCb.get()) {
return;
}
}
} else {
*ok = false;
}
}
} catch (std::runtime_error&) {
// NOTE: Http::close() can occurs here. Ignore this.
*ok = false;
}
} else {
this->statusIpv4_ = NodeStatus::Disconnected;
this->statusIpv6_ = NodeStatus::Disconnected;
*ok = false;
}
}, settings).get();
if (!ok) {
getConnectivityStatus();
})
}
}
);
lockListener_.lock();
{
std::lock_guard<std::mutex> lock(lockListener_);
listeners_.emplace_back(std::move(l));
lockListener_.unlock();
}
return listener_token_;
}
bool
DhtProxyClient::cancelListen(const InfoHash&, size_t token)
{
lockListener_.lock();
std::lock_guard<std::mutex> lock(lockListener_);
for (auto it = listeners_.begin(); it != listeners_.end(); ++it) {
auto& listener = *it;
if (listener.token == token) {
......@@ -483,12 +498,10 @@ DhtProxyClient::cancelListen(const InfoHash&, size_t token)
restbed::Http::close(listener.req);
listener.thread.join();
listeners_.erase(it);
lockListener_.unlock();
return true;
}
}
}
lockListener_.unlock();
return false;
}
......@@ -522,7 +535,7 @@ DhtProxyClient::getConnectivityStatus()
void
DhtProxyClient::restartListeners()
{
lockListener_.lock();
std::lock_guard<std::mutex> lock(lockListener_);
for (auto& listener: listeners_) {
if (listener.thread.joinable())
listener.thread.join();
......@@ -557,11 +570,18 @@ DhtProxyClient::restartListeners()
if (reader.parse(body, json)) {
auto value = std::make_shared<Value>(json);
if ((not filterChain or filterChain(*value)) && cb) {
lockCallbacks.lock();
callbacks_.emplace_back([=](){
cb({value});
auto okCb = std::make_shared<std::promise<bool>>();
auto futureCb = okCb->get_future();
{
std::lock_guard<std::mutex> lock(lockCallbacks);
callbacks_.emplace_back([cb, value, okCb](){
okCb->set_value(cb({value}));
});
lockCallbacks.unlock();
}
futureCb.wait();
if (!futureCb.get()) {
return;
}
}
}
}
......@@ -578,7 +598,6 @@ DhtProxyClient::restartListeners()
})
);
}
lockListener_.unlock();
}
......
......@@ -229,7 +229,7 @@ DhtProxyServer::listen(const std::shared_ptr<restbed::Session>& session) const
// cache the session to avoid an incrementation of the shared_ptr's counter
// else, the session->close() will not close the socket.
auto cacheSession = std::weak_ptr<restbed::Session>(s);
listener.token = std::move(dht_->listen(infoHash, [cacheSession](std::shared_ptr<Value> value) {
listener.token = dht_->listen(infoHash, [cacheSession](std::shared_ptr<Value> value) {
auto s = cacheSession.lock();
if (!s) return false;
// Send values as soon as we get them
......@@ -238,7 +238,7 @@ DhtProxyServer::listen(const std::shared_ptr<restbed::Session>& session) const
s->yield(writer.write(value->toJson()), [](const std::shared_ptr<restbed::Session>){ });
}
return !s->is_closed();
}));
});
lockListener_.lock();
currentListeners_.emplace_back(std::move(listener));
lockListener_.unlock();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment