From e7d8bc4275d80c04605e083462df9dec0feaf92b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Blin?= <sebastien.blin@savoirfairelinux.com> Date: Wed, 29 Nov 2017 10:56:57 -0500 Subject: [PATCH] proxyclient: erase finished operation threads --- include/opendht/dht_proxy_client.h | 4 ++- include/opendht/dht_proxy_server.h | 2 +- src/dht_proxy_client.cpp | 43 ++++++++++++++++++++++++++---- src/dht_proxy_server.cpp | 2 +- 4 files changed, 43 insertions(+), 8 deletions(-) diff --git a/include/opendht/dht_proxy_client.h b/include/opendht/dht_proxy_client.h index 119aecd5..ea05f46f 100644 --- a/include/opendht/dht_proxy_client.h +++ b/include/opendht/dht_proxy_client.h @@ -1,6 +1,6 @@ /* * Copyright (C) 2016 Savoir-faire Linux Inc. - * Author : Sébastien Blin <sebastien.blin@savoirfairelinux.com> + * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -281,8 +281,10 @@ private: { std::shared_ptr<restbed::Request> req; std::thread thread; + std::shared_ptr<bool> finished; }; std::vector<Operation> operations_; + std::mutex lockOperations_; /** * Callbacks should be executed in the main thread. */ diff --git a/include/opendht/dht_proxy_server.h b/include/opendht/dht_proxy_server.h index ade9b02a..5e197808 100644 --- a/include/opendht/dht_proxy_server.h +++ b/include/opendht/dht_proxy_server.h @@ -1,6 +1,6 @@ /* * Copyright (C) 2017 Savoir-faire Linux Inc. - * Author : Sébastien Blin <sebastien.blin@savoirfairelinux.com> + * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index 459fb49d..6f524b99 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -1,6 +1,6 @@ /* * Copyright (C) 2016 Savoir-faire Linux Inc. - * Author : Sébastien Blin <sebastien.blin@savoirfairelinux.com> + * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -87,13 +87,20 @@ DhtProxyClient::~DhtProxyClient() void DhtProxyClient::cancelAllOperations() { - for (auto& operation: operations_) { - if (operation.thread.joinable()) { + lockOperations_.lock(); + auto operation = operations_.begin(); + while (operation != operations_.end()) { + if (operation->thread.joinable()) { // Close connection to stop operation? - restbed::Http::close(operation.req); - operation.thread.join(); + restbed::Http::close(operation->req); + operation->thread.join(); + operation = operations_.erase(operation); + } else { + ++operation; } } + lockOperations_.unlock(); + } void @@ -160,6 +167,22 @@ DhtProxyClient::periodic(const uint8_t*, size_t, const SockAddr&) callbacks_.clear(); lockCallbacks.unlock(); } + // Remove finished operations + lockOperations_.lock(); + auto operation = operations_.begin(); + while (operation != operations_.end()) { + if (*(operation->finished)) { + if (operation->thread.joinable()) { + // Close connection to stop operation? + restbed::Http::close(operation->req); + operation->thread.join(); + } + operation = operations_.erase(operation); + } else { + ++operation; + } + } + lockOperations_.unlock(); return scheduler.run(); } @@ -172,8 +195,10 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, Query query {{}, where}; auto filterChain = filter.chain(query.where.getFilter()); + auto finished = std::make_shared<bool>(false); Operation o; o.req = req; + o.finished = finished; o.thread = std::move(std::thread([=](){ // Try to contact the proxy and set the status to connected when done. // will change the connectivity status @@ -222,8 +247,11 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, // Connection failed, update connectivity getConnectivityStatus(); } + *finished = true; })); + lockOperations_.lock(); operations_.emplace_back(std::move(o)); + lockOperations_.unlock(); } void @@ -240,8 +268,10 @@ DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_po req->set_body(body); req->set_header("Content-Length", std::to_string(body.size())); + auto finished = std::make_shared<bool>(false); Operation o; o.req = req; + o.finished = finished; o.thread = std::move(std::thread([=](){ auto ok = std::make_shared<bool>(true); restbed::Http::async(req, @@ -278,8 +308,11 @@ DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_po // Connection failed, update connectivity getConnectivityStatus(); } + *finished = true; })); + lockOperations_.lock(); operations_.emplace_back(std::move(o)); + lockOperations_.unlock(); } NodeStats diff --git a/src/dht_proxy_server.cpp b/src/dht_proxy_server.cpp index 580c8821..133e74f0 100644 --- a/src/dht_proxy_server.cpp +++ b/src/dht_proxy_server.cpp @@ -1,6 +1,6 @@ /* * Copyright (C) 2017 Savoir-faire Linux Inc. - * Author : Sébastien Blin <sebastien.blin@savoirfairelinux.com> + * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by -- GitLab