diff --git a/include/opendht/dht_proxy_client.h b/include/opendht/dht_proxy_client.h index ea05f46f2e42caba52f310dcffe03dbe19e10be7..c6dbf8352590c9646698dc5efae72ecc65d40764 100644 --- a/include/opendht/dht_proxy_client.h +++ b/include/opendht/dht_proxy_client.h @@ -217,7 +217,7 @@ public: void insertNode(const NodeExport&) { } std::pair<size_t, size_t> getStoreSize() const { return {}; } virtual void registerType(const ValueType&) { } - const ValueType& getType(ValueType::Id) const { } + const ValueType& getType(ValueType::Id) const { return NO_VALUE; } std::vector<Sp<Value>> getLocal(const InfoHash&, Value::Filter) const { return {}; } Sp<Value> getLocalById(const InfoHash&, Value::Id) const { return {}; } std::vector<NodeExport> exportNodes() { return {}; } @@ -235,6 +235,7 @@ public: void connectivityChanged() { } private: + const ValueType NO_VALUE; /** * Get informations from the proxy node * @return the JSON returned by the proxy diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index 6f524b9924828c7d157aa5b41af0fdd4153222a9..cc24a3535729be27bc8349bdae23521fb07bfe39 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -87,7 +87,7 @@ DhtProxyClient::~DhtProxyClient() void DhtProxyClient::cancelAllOperations() { - lockOperations_.lock(); + std::lock_guard<std::mutex> lock(lockOperations_); auto operation = operations_.begin(); while (operation != operations_.end()) { if (operation->thread.joinable()) { @@ -99,14 +99,12 @@ DhtProxyClient::cancelAllOperations() ++operation; } } - lockOperations_.unlock(); - } void DhtProxyClient::cancelAllListeners() { - lockListener_.lock(); + std::lock_guard<std::mutex> lock(lockListener_); for (auto& listener: listeners_) { if (listener.thread.joinable()) { // Close connection to stop listener? @@ -115,7 +113,6 @@ DhtProxyClient::cancelAllListeners() listener.thread.join(); } } - lockListener_.unlock(); } void @@ -161,28 +158,28 @@ DhtProxyClient::periodic(const uint8_t*, size_t, const SockAddr&) // Exec all currently stored callbacks scheduler.syncTime(); if (!callbacks_.empty()) { - lockCallbacks.lock(); + std::lock_guard<std::mutex> lock(lockCallbacks); for (auto& callback : callbacks_) callback(); callbacks_.clear(); - lockCallbacks.unlock(); } // Remove finished operations - lockOperations_.lock(); - auto operation = operations_.begin(); - while (operation != operations_.end()) { - if (*(operation->finished)) { - if (operation->thread.joinable()) { - // Close connection to stop operation? - restbed::Http::close(operation->req); - operation->thread.join(); + { + std::lock_guard<std::mutex> lock(lockOperations_); + auto operation = operations_.begin(); + while (operation != operations_.end()) { + if (*(operation->finished)) { + if (operation->thread.joinable()) { + // Close connection to stop operation? + restbed::Http::close(operation->req); + operation->thread.join(); + } + operation = operations_.erase(operation); + } else { + ++operation; } - operation = operations_.erase(operation); - } else { - ++operation; } } - lockOperations_.unlock(); return scheduler.run(); } @@ -199,7 +196,7 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, Operation o; o.req = req; o.finished = finished; - o.thread = std::move(std::thread([=](){ + o.thread = std::thread([=](){ // Try to contact the proxy and set the status to connected when done. // will change the connectivity status auto ok = std::make_shared<bool>(true); @@ -221,11 +218,18 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, if (reader.parse(body, json)) { auto value = std::make_shared<Value>(json); if ((not filterChain or filterChain(*value)) && cb) { - lockCallbacks.lock(); - callbacks_.emplace_back([=](){ - cb({value}); - }); - lockCallbacks.unlock(); + auto okCb = std::make_shared<std::promise<bool>>(); + auto futureCb = okCb->get_future(); + { + std::lock_guard<std::mutex> lock(lockCallbacks); + callbacks_.emplace_back([cb, value, okCb](){ + okCb->set_value(cb({value})); + }); + } + futureCb.wait(); + if (!futureCb.get()) { + return; + } } } else { *ok = false; @@ -237,21 +241,21 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, } }).wait(); if (donecb) { - lockCallbacks.lock(); + std::lock_guard<std::mutex> lock(lockCallbacks); callbacks_.emplace_back([=](){ donecb(*ok, {}); }); - lockCallbacks.unlock(); } if (!ok) { // Connection failed, update connectivity getConnectivityStatus(); } *finished = true; - })); - lockOperations_.lock(); - operations_.emplace_back(std::move(o)); - lockOperations_.unlock(); + }); + { + std::lock_guard<std::mutex> lock(lockOperations_); + operations_.emplace_back(std::move(o)); + } } void @@ -272,7 +276,7 @@ DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_po Operation o; o.req = req; o.finished = finished; - o.thread = std::move(std::thread([=](){ + o.thread = std::thread([=](){ auto ok = std::make_shared<bool>(true); restbed::Http::async(req, [this, ok](const std::shared_ptr<restbed::Request>& /*req*/, @@ -298,21 +302,21 @@ DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_po } }).wait(); if (cb) { - lockCallbacks.lock(); + std::lock_guard<std::mutex> lock(lockCallbacks); callbacks_.emplace_back([=](){ cb(*ok, {}); }); - lockCallbacks.unlock(); } if (!ok) { // Connection failed, update connectivity getConnectivityStatus(); } *finished = true; - })); - lockOperations_.lock(); - operations_.emplace_back(std::move(o)); - lockOperations_.unlock(); + }); + { + std::lock_guard<std::mutex> lock(lockOperations_); + operations_.emplace_back(std::move(o)); + } } NodeStats @@ -395,9 +399,9 @@ DhtProxyClient::getPublicAddress(sa_family_t family) switch (family) { case AF_INET: - return DhtRunner::getAddrInfo(ipv4Address, port); + return SockAddr::resolve(ipv4Address, port); case AF_INET6: - return DhtRunner::getAddrInfo(ipv6Address, port); + return SockAddr::resolve(ipv6Address, port); default: return {}; } @@ -420,15 +424,16 @@ DhtProxyClient::listen(const InfoHash& key, GetCallback cb, Value::Filter&& filt l.req = req; l.cb = cb; l.filterChain = std::move(filterChain); - l.thread = std::move(std::thread([=]() + l.thread = std::thread([=]() { auto settings = std::make_shared<restbed::Settings>(); std::chrono::milliseconds timeout(std::numeric_limits<int>::max()); settings->set_connection_timeout(timeout); // Avoid the client to close the socket after 5 seconds. + auto ok = std::make_shared<bool>(true); restbed::Http::async(req, - [this, filterChain, cb](const std::shared_ptr<restbed::Request>& req, - const std::shared_ptr<restbed::Response>& reply) { + [this, filterChain, cb, ok](const std::shared_ptr<restbed::Request>& req, + const std::shared_ptr<restbed::Response>& reply) { auto code = reply->get_status_code(); if (code == 200) { @@ -444,36 +449,46 @@ DhtProxyClient::listen(const InfoHash& key, GetCallback cb, Value::Filter&& filt if (reader.parse(body, json)) { auto value = std::make_shared<Value>(json); if ((not filterChain or filterChain(*value)) && cb) { - lockCallbacks.lock(); - callbacks_.emplace_back([=](){ - cb({value}); - }); - lockCallbacks.unlock(); + auto okCb = std::make_shared<std::promise<bool>>(); + auto futureCb = okCb->get_future(); + { + std::lock_guard<std::mutex> lock(lockCallbacks); + callbacks_.emplace_back([cb, value, okCb](){ + okCb->set_value(cb({value})); + }); + } + futureCb.wait(); + if (!futureCb.get()) { + return; + } } + } else { + *ok = false; } } } catch (std::runtime_error&) { - // NOTE: Http::close() can occurs here. Ignore this. + *ok = false; } - } else { - this->statusIpv4_ = NodeStatus::Disconnected; - this->statusIpv6_ = NodeStatus::Disconnected; + *ok = false; } }, settings).get(); - getConnectivityStatus(); - }) + if (!ok) { + getConnectivityStatus(); + } + } ); - lockListener_.lock(); - listeners_.emplace_back(std::move(l)); - lockListener_.unlock(); + { + std::lock_guard<std::mutex> lock(lockListener_); + listeners_.emplace_back(std::move(l)); + } return listener_token_; } bool DhtProxyClient::cancelListen(const InfoHash&, size_t token) { - lockListener_.lock(); + std::lock_guard<std::mutex> lock(lockListener_); for (auto it = listeners_.begin(); it != listeners_.end(); ++it) { auto& listener = *it; if (listener.token == token) { @@ -483,12 +498,10 @@ DhtProxyClient::cancelListen(const InfoHash&, size_t token) restbed::Http::close(listener.req); listener.thread.join(); listeners_.erase(it); - lockListener_.unlock(); return true; } } } - lockListener_.unlock(); return false; } @@ -522,7 +535,7 @@ DhtProxyClient::getConnectivityStatus() void DhtProxyClient::restartListeners() { - lockListener_.lock(); + std::lock_guard<std::mutex> lock(lockListener_); for (auto& listener: listeners_) { if (listener.thread.joinable()) listener.thread.join(); @@ -557,11 +570,18 @@ DhtProxyClient::restartListeners() if (reader.parse(body, json)) { auto value = std::make_shared<Value>(json); if ((not filterChain or filterChain(*value)) && cb) { - lockCallbacks.lock(); - callbacks_.emplace_back([=](){ - cb({value}); - }); - lockCallbacks.unlock(); + auto okCb = std::make_shared<std::promise<bool>>(); + auto futureCb = okCb->get_future(); + { + std::lock_guard<std::mutex> lock(lockCallbacks); + callbacks_.emplace_back([cb, value, okCb](){ + okCb->set_value(cb({value})); + }); + } + futureCb.wait(); + if (!futureCb.get()) { + return; + } } } } @@ -578,7 +598,6 @@ DhtProxyClient::restartListeners() }) ); } - lockListener_.unlock(); } diff --git a/src/dht_proxy_server.cpp b/src/dht_proxy_server.cpp index 133e74f0aaf174a4eea11ee695b31e0b40013e88..f74b3de271762620bfd4c45ece18138cc317d594 100644 --- a/src/dht_proxy_server.cpp +++ b/src/dht_proxy_server.cpp @@ -229,7 +229,7 @@ DhtProxyServer::listen(const std::shared_ptr<restbed::Session>& session) const // cache the session to avoid an incrementation of the shared_ptr's counter // else, the session->close() will not close the socket. auto cacheSession = std::weak_ptr<restbed::Session>(s); - listener.token = std::move(dht_->listen(infoHash, [cacheSession](std::shared_ptr<Value> value) { + listener.token = dht_->listen(infoHash, [cacheSession](std::shared_ptr<Value> value) { auto s = cacheSession.lock(); if (!s) return false; // Send values as soon as we get them @@ -238,7 +238,7 @@ DhtProxyServer::listen(const std::shared_ptr<restbed::Session>& session) const s->yield(writer.write(value->toJson()), [](const std::shared_ptr<restbed::Session>){ }); } return !s->is_closed(); - })); + }); lockListener_.lock(); currentListeners_.emplace_back(std::move(listener)); lockListener_.unlock();