Skip to content
Snippets Groups Projects
Unverified Commit e7d8bc42 authored by Sébastien Blin's avatar Sébastien Blin
Browse files

proxyclient: erase finished operation threads

parent eb750f5b
Branches
Tags
No related merge requests found
......@@ -281,8 +281,10 @@ private:
{
std::shared_ptr<restbed::Request> req;
std::thread thread;
std::shared_ptr<bool> finished;
};
std::vector<Operation> operations_;
std::mutex lockOperations_;
/**
* Callbacks should be executed in the main thread.
*/
......
......@@ -87,13 +87,20 @@ DhtProxyClient::~DhtProxyClient()
void
DhtProxyClient::cancelAllOperations()
{
for (auto& operation: operations_) {
if (operation.thread.joinable()) {
lockOperations_.lock();
auto operation = operations_.begin();
while (operation != operations_.end()) {
if (operation->thread.joinable()) {
// Close connection to stop operation?
restbed::Http::close(operation.req);
operation.thread.join();
restbed::Http::close(operation->req);
operation->thread.join();
operation = operations_.erase(operation);
} else {
++operation;
}
}
lockOperations_.unlock();
}
void
......@@ -160,6 +167,22 @@ DhtProxyClient::periodic(const uint8_t*, size_t, const SockAddr&)
callbacks_.clear();
lockCallbacks.unlock();
}
// Remove finished operations
lockOperations_.lock();
auto operation = operations_.begin();
while (operation != operations_.end()) {
if (*(operation->finished)) {
if (operation->thread.joinable()) {
// Close connection to stop operation?
restbed::Http::close(operation->req);
operation->thread.join();
}
operation = operations_.erase(operation);
} else {
++operation;
}
}
lockOperations_.unlock();
return scheduler.run();
}
......@@ -172,8 +195,10 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb,
Query query {{}, where};
auto filterChain = filter.chain(query.where.getFilter());
auto finished = std::make_shared<bool>(false);
Operation o;
o.req = req;
o.finished = finished;
o.thread = std::move(std::thread([=](){
// Try to contact the proxy and set the status to connected when done.
// will change the connectivity status
......@@ -222,8 +247,11 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb,
// Connection failed, update connectivity
getConnectivityStatus();
}
*finished = true;
}));
lockOperations_.lock();
operations_.emplace_back(std::move(o));
lockOperations_.unlock();
}
void
......@@ -240,8 +268,10 @@ DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_po
req->set_body(body);
req->set_header("Content-Length", std::to_string(body.size()));
auto finished = std::make_shared<bool>(false);
Operation o;
o.req = req;
o.finished = finished;
o.thread = std::move(std::thread([=](){
auto ok = std::make_shared<bool>(true);
restbed::Http::async(req,
......@@ -278,8 +308,11 @@ DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_po
// Connection failed, update connectivity
getConnectivityStatus();
}
*finished = true;
}));
lockOperations_.lock();
operations_.emplace_back(std::move(o));
lockOperations_.unlock();
}
NodeStats
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment