Skip to content
Snippets Groups Projects
Unverified Commit 4a83b36d authored by Sébastien Blin's avatar Sébastien Blin
Browse files

tools: add the ability to use a proxy for dhtchat

parent eb1f9d64
No related branches found
No related tags found
No related merge requests found
...@@ -317,7 +317,7 @@ private: ...@@ -317,7 +317,7 @@ private:
struct Operation struct Operation
{ {
std::shared_ptr<restbed::Request> req; std::shared_ptr<restbed::Request> req;
std::unique_ptr<std::thread> thread; std::thread thread;
}; };
std::vector<Operation> operations_; std::vector<Operation> operations_;
...@@ -336,6 +336,8 @@ private: ...@@ -336,6 +336,8 @@ private:
* Relaunch LISTEN requests if the client disconnect/reconnect. * Relaunch LISTEN requests if the client disconnect/reconnect.
*/ */
void restartListeners(); void restartListeners();
std::shared_ptr<Json::Value> currentProxyInfos_;
}; };
} }
......
...@@ -33,7 +33,7 @@ constexpr const char* const HTTP_PROTO {"http://"}; ...@@ -33,7 +33,7 @@ constexpr const char* const HTTP_PROTO {"http://"};
namespace dht { namespace dht {
DhtProxyClient::DhtProxyClient(const std::string& serverHost) DhtProxyClient::DhtProxyClient(const std::string& serverHost)
: serverHost_(serverHost), scheduler(DHT_LOG) : serverHost_(serverHost), scheduler(DHT_LOG), currentProxyInfos_(new Json::Value())
{ {
auto confirm_proxy_time = scheduler.time() + std::chrono::seconds(5); auto confirm_proxy_time = scheduler.time() + std::chrono::seconds(5);
nextProxyConfirmation = scheduler.add(confirm_proxy_time, std::bind(&DhtProxyClient::confirmProxy, this)); nextProxyConfirmation = scheduler.add(confirm_proxy_time, std::bind(&DhtProxyClient::confirmProxy, this));
...@@ -76,10 +76,10 @@ void ...@@ -76,10 +76,10 @@ void
DhtProxyClient::cancelAllOperations() DhtProxyClient::cancelAllOperations()
{ {
for (auto& operation: operations_) { for (auto& operation: operations_) {
if (operation.thread && operation.thread->joinable()) { if (operation.thread.joinable()) {
// Close connection to stop operation? // Close connection to stop operation?
restbed::Http::close(operation.req); restbed::Http::close(operation.req);
operation.thread->join(); operation.thread.join();
} }
} }
} }
...@@ -144,12 +144,11 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, ...@@ -144,12 +144,11 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb,
Operation o; Operation o;
o.req = req; o.req = req;
o.thread = std::move(std::unique_ptr<std::thread>( o.thread = std::move(std::thread([=](){
new std::thread([=](){
// Try to contact the proxy and set the status to connected when done. // Try to contact the proxy and set the status to connected when done.
// will change the connectivity status // will change the connectivity status
auto ok = std::make_shared<bool>(true); auto ok = std::make_shared<bool>(true);
auto future = restbed::Http::async(req, restbed::Http::async(req,
[=](const std::shared_ptr<restbed::Request>& req, [=](const std::shared_ptr<restbed::Request>& req,
const std::shared_ptr<restbed::Response>& reply) { const std::shared_ptr<restbed::Response>& reply) {
auto code = reply->get_status_code(); auto code = reply->get_status_code();
...@@ -176,14 +175,13 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, ...@@ -176,14 +175,13 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb,
} else { } else {
*ok = false; *ok = false;
} }
}); }).wait();
future.wait();
donecb(*ok, {}); donecb(*ok, {});
if (!ok) { if (!ok) {
// Connection failed, update connectivity // Connection failed, update connectivity
getConnectivityStatus(); getConnectivityStatus();
} }
}))); }));
operations_.emplace_back(std::move(o)); operations_.emplace_back(std::move(o));
} }
...@@ -203,10 +201,9 @@ DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_po ...@@ -203,10 +201,9 @@ DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_po
Operation o; Operation o;
o.req = req; o.req = req;
o.thread = std::move(std::unique_ptr<std::thread>( o.thread = std::move(std::thread([=](){
new std::thread([=](){
auto ok = std::make_shared<bool>(true); auto ok = std::make_shared<bool>(true);
auto future = restbed::Http::async(req, restbed::Http::async(req,
[this, val, ok](const std::shared_ptr<restbed::Request>& /*req*/, [this, val, ok](const std::shared_ptr<restbed::Request>& /*req*/,
const std::shared_ptr<restbed::Response>& reply) { const std::shared_ptr<restbed::Response>& reply) {
auto code = reply->get_status_code(); auto code = reply->get_status_code();
...@@ -227,21 +224,20 @@ DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_po ...@@ -227,21 +224,20 @@ DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_po
} else { } else {
*ok = false; *ok = false;
} }
}); }).wait();
future.wait();
cb(*ok, {}); cb(*ok, {});
if (!ok) { if (!ok) {
// Connection failed, update connectivity // Connection failed, update connectivity
getConnectivityStatus(); getConnectivityStatus();
} }
}))); }));
operations_.emplace_back(std::move(o)); operations_.emplace_back(std::move(o));
} }
NodeStats NodeStats
DhtProxyClient::getNodesStats(sa_family_t af) const DhtProxyClient::getNodesStats(sa_family_t af) const
{ {
auto proxyInfos = getProxyInfos(); auto proxyInfos = *currentProxyInfos_;
NodeStats stats {}; NodeStats stats {};
auto identifier = af == AF_INET6 ? "ipv6" : "ipv4"; auto identifier = af == AF_INET6 ? "ipv6" : "ipv4";
try { try {
...@@ -253,14 +249,13 @@ DhtProxyClient::getNodesStats(sa_family_t af) const ...@@ -253,14 +249,13 @@ DhtProxyClient::getNodesStats(sa_family_t af) const
Json::Value Json::Value
DhtProxyClient::getProxyInfos() const DhtProxyClient::getProxyInfos() const
{ {
auto result = std::make_shared<Json::Value>();
restbed::Uri uri(HTTP_PROTO + serverHost_ + "/"); restbed::Uri uri(HTTP_PROTO + serverHost_ + "/");
auto req = std::make_shared<restbed::Request>(uri); auto req = std::make_shared<restbed::Request>(uri);
// Try to contact the proxy and set the status to connected when done. // Try to contact the proxy and set the status to connected when done.
// will change the connectivity status // will change the connectivity status
auto future = restbed::Http::async(req, restbed::Http::async(req,
[this, result](const std::shared_ptr<restbed::Request>&, [this](const std::shared_ptr<restbed::Request>&,
const std::shared_ptr<restbed::Response>& reply) { const std::shared_ptr<restbed::Response>& reply) {
auto code = reply->get_status_code(); auto code = reply->get_status_code();
...@@ -270,25 +265,28 @@ DhtProxyClient::getProxyInfos() const ...@@ -270,25 +265,28 @@ DhtProxyClient::getProxyInfos() const
reply->get_body(body); reply->get_body(body);
Json::Reader reader; Json::Reader reader;
reader.parse(body, *result); try {
reader.parse(body, *currentProxyInfos_);
} catch (...) {
*currentProxyInfos_ = Json::Value();
} }
}); } else {
future.wait(); *currentProxyInfos_ = Json::Value();
return *result; }
}).wait();
return *currentProxyInfos_;
} }
std::vector<SockAddr> std::vector<SockAddr>
DhtProxyClient::getPublicAddress(sa_family_t family) DhtProxyClient::getPublicAddress(sa_family_t family)
{ {
auto proxyInfos = getProxyInfos(); auto proxyInfos = *currentProxyInfos_;
// json["public_ip"] contains [ipv6:ipv4]:port or ipv4:port // json["public_ip"] contains [ipv6:ipv4]:port or ipv4:port
if (!proxyInfos.isMember("public_ip")) { if (!proxyInfos.isMember("public_ip")) {
getConnectivityStatus();
return {}; return {};
} }
auto public_ip = proxyInfos["public_ip"].asString(); auto public_ip = proxyInfos["public_ip"].asString();
if (public_ip.length() < 2) { if (public_ip.length() < 2) {
getConnectivityStatus();
return {}; return {};
} }
std::string ipv4Address = ""; std::string ipv4Address = "";
......
...@@ -67,6 +67,13 @@ main(int argc, char **argv) ...@@ -67,6 +67,13 @@ main(int argc, char **argv)
if (not params.bootstrap.first.empty()) if (not params.bootstrap.first.empty())
dht.bootstrap(params.bootstrap.first.c_str(), params.bootstrap.second.c_str()); dht.bootstrap(params.bootstrap.first.c_str(), params.bootstrap.second.c_str());
#if OPENDHT_PROXY_CLIENT
if (!params.proxyclient.empty()) {
dht.setProxyServer(params.proxyclient);
dht.enableProxy(true);
}
#endif //OPENDHT_PROXY_CLIENT
print_node_info(dht, params); print_node_info(dht, params);
std::cout << " type 'c {hash}' to join a channel" << std::endl << std::endl; std::cout << " type 'c {hash}' to join a channel" << std::endl << std::endl;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment