diff --git a/include/opendht/dht_proxy_client.h b/include/opendht/dht_proxy_client.h index 119aecd5c642487c96a48162b73530e01fe6f735..ea05f46f2e42caba52f310dcffe03dbe19e10be7 100644 --- a/include/opendht/dht_proxy_client.h +++ b/include/opendht/dht_proxy_client.h @@ -1,6 +1,6 @@ /* * Copyright (C) 2016 Savoir-faire Linux Inc. - * Author : Sébastien Blin <sebastien.blin@savoirfairelinux.com> + * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -281,8 +281,10 @@ private: { std::shared_ptr<restbed::Request> req; std::thread thread; + std::shared_ptr<bool> finished; }; std::vector<Operation> operations_; + std::mutex lockOperations_; /** * Callbacks should be executed in the main thread. */ diff --git a/include/opendht/dht_proxy_server.h b/include/opendht/dht_proxy_server.h index ade9b02a36824ed4b79461bdfca0ca50986a1212..5e197808e58443110b2332f3433cf6d3cacafbe6 100644 --- a/include/opendht/dht_proxy_server.h +++ b/include/opendht/dht_proxy_server.h @@ -1,6 +1,6 @@ /* * Copyright (C) 2017 Savoir-faire Linux Inc. - * Author : Sébastien Blin <sebastien.blin@savoirfairelinux.com> + * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index 459fb49d8a49941daf08f36130c2f1a865835a96..6f524b9924828c7d157aa5b41af0fdd4153222a9 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -1,6 +1,6 @@ /* * Copyright (C) 2016 Savoir-faire Linux Inc. - * Author : Sébastien Blin <sebastien.blin@savoirfairelinux.com> + * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -87,13 +87,20 @@ DhtProxyClient::~DhtProxyClient() void DhtProxyClient::cancelAllOperations() { - for (auto& operation: operations_) { - if (operation.thread.joinable()) { + lockOperations_.lock(); + auto operation = operations_.begin(); + while (operation != operations_.end()) { + if (operation->thread.joinable()) { // Close connection to stop operation? - restbed::Http::close(operation.req); - operation.thread.join(); + restbed::Http::close(operation->req); + operation->thread.join(); + operation = operations_.erase(operation); + } else { + ++operation; } } + lockOperations_.unlock(); + } void @@ -160,6 +167,22 @@ DhtProxyClient::periodic(const uint8_t*, size_t, const SockAddr&) callbacks_.clear(); lockCallbacks.unlock(); } + // Remove finished operations + lockOperations_.lock(); + auto operation = operations_.begin(); + while (operation != operations_.end()) { + if (*(operation->finished)) { + if (operation->thread.joinable()) { + // Close connection to stop operation? + restbed::Http::close(operation->req); + operation->thread.join(); + } + operation = operations_.erase(operation); + } else { + ++operation; + } + } + lockOperations_.unlock(); return scheduler.run(); } @@ -172,8 +195,10 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, Query query {{}, where}; auto filterChain = filter.chain(query.where.getFilter()); + auto finished = std::make_shared<bool>(false); Operation o; o.req = req; + o.finished = finished; o.thread = std::move(std::thread([=](){ // Try to contact the proxy and set the status to connected when done. // will change the connectivity status @@ -222,8 +247,11 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, // Connection failed, update connectivity getConnectivityStatus(); } + *finished = true; })); + lockOperations_.lock(); operations_.emplace_back(std::move(o)); + lockOperations_.unlock(); } void @@ -240,8 +268,10 @@ DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_po req->set_body(body); req->set_header("Content-Length", std::to_string(body.size())); + auto finished = std::make_shared<bool>(false); Operation o; o.req = req; + o.finished = finished; o.thread = std::move(std::thread([=](){ auto ok = std::make_shared<bool>(true); restbed::Http::async(req, @@ -278,8 +308,11 @@ DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_po // Connection failed, update connectivity getConnectivityStatus(); } + *finished = true; })); + lockOperations_.lock(); operations_.emplace_back(std::move(o)); + lockOperations_.unlock(); } NodeStats diff --git a/src/dht_proxy_server.cpp b/src/dht_proxy_server.cpp index 580c8821b7e5f8fcef659753a72b1b76e116056b..133e74f0aaf174a4eea11ee695b31e0b40013e88 100644 --- a/src/dht_proxy_server.cpp +++ b/src/dht_proxy_server.cpp @@ -1,6 +1,6 @@ /* * Copyright (C) 2017 Savoir-faire Linux Inc. - * Author : Sébastien Blin <sebastien.blin@savoirfairelinux.com> + * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by