diff --git a/include/opendht/dht_proxy_client.h b/include/opendht/dht_proxy_client.h index c6ac30935c5f2e4b7bf034eff6edc0cae5f54d32..27531a4825279f2f7514a87c7861150ea304e340 100644 --- a/include/opendht/dht_proxy_client.h +++ b/include/opendht/dht_proxy_client.h @@ -317,7 +317,7 @@ private: struct Operation { std::shared_ptr<restbed::Request> req; - std::unique_ptr<std::thread> thread; + std::thread thread; }; std::vector<Operation> operations_; @@ -336,6 +336,8 @@ private: * Relaunch LISTEN requests if the client disconnect/reconnect. */ void restartListeners(); + + std::shared_ptr<Json::Value> currentProxyInfos_; }; } diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index fca754e411c58cdfc1f52d9525047596d631aa47..fd5b31246a7dc3741b5b94c41922c27dad2bdd4e 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -33,7 +33,7 @@ constexpr const char* const HTTP_PROTO {"http://"}; namespace dht { DhtProxyClient::DhtProxyClient(const std::string& serverHost) -: serverHost_(serverHost), scheduler(DHT_LOG) +: serverHost_(serverHost), scheduler(DHT_LOG), currentProxyInfos_(new Json::Value()) { auto confirm_proxy_time = scheduler.time() + std::chrono::seconds(5); nextProxyConfirmation = scheduler.add(confirm_proxy_time, std::bind(&DhtProxyClient::confirmProxy, this)); @@ -76,10 +76,10 @@ void DhtProxyClient::cancelAllOperations() { for (auto& operation: operations_) { - if (operation.thread && operation.thread->joinable()) { + if (operation.thread.joinable()) { // Close connection to stop operation? restbed::Http::close(operation.req); - operation.thread->join(); + operation.thread.join(); } } } @@ -144,12 +144,11 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, Operation o; o.req = req; - o.thread = std::move(std::unique_ptr<std::thread>( - new std::thread([=](){ + o.thread = std::move(std::thread([=](){ // Try to contact the proxy and set the status to connected when done. // will change the connectivity status auto ok = std::make_shared<bool>(true); - auto future = restbed::Http::async(req, + restbed::Http::async(req, [=](const std::shared_ptr<restbed::Request>& req, const std::shared_ptr<restbed::Response>& reply) { auto code = reply->get_status_code(); @@ -176,14 +175,13 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, } else { *ok = false; } - }); - future.wait(); + }).wait(); donecb(*ok, {}); if (!ok) { // Connection failed, update connectivity getConnectivityStatus(); } - }))); + })); operations_.emplace_back(std::move(o)); } @@ -203,10 +201,9 @@ DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_po Operation o; o.req = req; - o.thread = std::move(std::unique_ptr<std::thread>( - new std::thread([=](){ + o.thread = std::move(std::thread([=](){ auto ok = std::make_shared<bool>(true); - auto future = restbed::Http::async(req, + restbed::Http::async(req, [this, val, ok](const std::shared_ptr<restbed::Request>& /*req*/, const std::shared_ptr<restbed::Response>& reply) { auto code = reply->get_status_code(); @@ -227,21 +224,20 @@ DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_po } else { *ok = false; } - }); - future.wait(); + }).wait(); cb(*ok, {}); if (!ok) { // Connection failed, update connectivity getConnectivityStatus(); } - }))); + })); operations_.emplace_back(std::move(o)); } NodeStats DhtProxyClient::getNodesStats(sa_family_t af) const { - auto proxyInfos = getProxyInfos(); + auto proxyInfos = *currentProxyInfos_; NodeStats stats {}; auto identifier = af == AF_INET6 ? "ipv6" : "ipv4"; try { @@ -253,14 +249,13 @@ DhtProxyClient::getNodesStats(sa_family_t af) const Json::Value DhtProxyClient::getProxyInfos() const { - auto result = std::make_shared<Json::Value>(); restbed::Uri uri(HTTP_PROTO + serverHost_ + "/"); auto req = std::make_shared<restbed::Request>(uri); // Try to contact the proxy and set the status to connected when done. // will change the connectivity status - auto future = restbed::Http::async(req, - [this, result](const std::shared_ptr<restbed::Request>&, + restbed::Http::async(req, + [this](const std::shared_ptr<restbed::Request>&, const std::shared_ptr<restbed::Response>& reply) { auto code = reply->get_status_code(); @@ -270,25 +265,28 @@ DhtProxyClient::getProxyInfos() const reply->get_body(body); Json::Reader reader; - reader.parse(body, *result); + try { + reader.parse(body, *currentProxyInfos_); + } catch (...) { + *currentProxyInfos_ = Json::Value(); + } + } else { + *currentProxyInfos_ = Json::Value(); } - }); - future.wait(); - return *result; + }).wait(); + return *currentProxyInfos_; } std::vector<SockAddr> DhtProxyClient::getPublicAddress(sa_family_t family) { - auto proxyInfos = getProxyInfos(); + auto proxyInfos = *currentProxyInfos_; // json["public_ip"] contains [ipv6:ipv4]:port or ipv4:port if (!proxyInfos.isMember("public_ip")) { - getConnectivityStatus(); return {}; } auto public_ip = proxyInfos["public_ip"].asString(); if (public_ip.length() < 2) { - getConnectivityStatus(); return {}; } std::string ipv4Address = ""; diff --git a/tools/dhtchat.cpp b/tools/dhtchat.cpp index 34ae65e937e6d39902d4f9a93ecf1c7b1e737313..8e5654e02aa6258fdc0ea15dd95812aef7869833 100644 --- a/tools/dhtchat.cpp +++ b/tools/dhtchat.cpp @@ -67,6 +67,13 @@ main(int argc, char **argv) if (not params.bootstrap.first.empty()) dht.bootstrap(params.bootstrap.first.c_str(), params.bootstrap.second.c_str()); +#if OPENDHT_PROXY_CLIENT + if (!params.proxyclient.empty()) { + dht.setProxyServer(params.proxyclient); + dht.enableProxy(true); + } +#endif //OPENDHT_PROXY_CLIENT + print_node_info(dht, params); std::cout << " type 'c {hash}' to join a channel" << std::endl << std::endl;