"src/categorizedcontactmodel.cpp" did not exist on "555b87cf917f741689413646500a8dd8ee80342c"
Select Git revision
proxy_benchmark.py
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
dht_proxy_client.cpp 20.67 KiB
/*
* Copyright (C) 2016 Savoir-faire Linux Inc.
* Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#if OPENDHT_PROXY_CLIENT
#include "dht_proxy_client.h"
#include <chrono>
#include <json/json.h>
#include <restbed>
#include <vector>
#include "dhtrunner.h"
#include <iostream>
constexpr const char* const HTTP_PROTO {"http://"};
namespace dht {
DhtProxyClient::DhtProxyClient(const std::string& serverHost)
: serverHost_(serverHost), lockCurrentProxyInfos_(new std::mutex()),
scheduler(DHT_LOG), currentProxyInfos_(new Json::Value())
{
if (!serverHost_.empty())
startProxy(serverHost_);
}
void
DhtProxyClient::confirmProxy()
{
if (serverHost_.empty()) return;
// Retrieve the connectivity each hours if connected, else every 5 seconds.
auto disconnected_old_status = statusIpv4_ == NodeStatus::Disconnected && statusIpv6_ == NodeStatus::Disconnected;
getConnectivityStatus();
auto disconnected_new_status = statusIpv4_ == NodeStatus::Disconnected && statusIpv6_ == NodeStatus::Disconnected;
auto time = disconnected_new_status ? std::chrono::seconds(5) : std::chrono::hours(1);
if (disconnected_old_status && !disconnected_new_status) {
restartListeners();
}
auto confirm_proxy_time = scheduler.time() + time;
scheduler.edit(nextProxyConfirmation, confirm_proxy_time);
}
void
DhtProxyClient::startProxy(const std::string& serverHost)
{
serverHost_ = serverHost;
if (serverHost_.empty()) return;
auto confirm_proxy_time = scheduler.time() + std::chrono::seconds(5);
nextProxyConfirmation = scheduler.add(confirm_proxy_time, std::bind(&DhtProxyClient::confirmProxy, this));
auto confirm_connectivity = scheduler.time() + std::chrono::seconds(5);
nextConnectivityConfirmation = scheduler.add(confirm_connectivity, std::bind(&DhtProxyClient::confirmConnectivity, this));
getConnectivityStatus();
}
void
DhtProxyClient::confirmConnectivity()
{
// The scheduler must get if the proxy is disconnected
auto confirm_connectivity = scheduler.time() + std::chrono::seconds(3);
scheduler.edit(nextConnectivityConfirmation, confirm_connectivity);
}
DhtProxyClient::~DhtProxyClient()
{
cancelAllOperations();
cancelAllListeners();
}
void
DhtProxyClient::cancelAllOperations()
{
std::lock_guard<std::mutex> lock(lockOperations_);
auto operation = operations_.begin();
while (operation != operations_.end()) {
if (operation->thread.joinable()) {
// Close connection to stop operation?
restbed::Http::close(operation->req);
operation->thread.join();
operation = operations_.erase(operation);
} else {
++operation;
}
}
}
void
DhtProxyClient::cancelAllListeners()
{
std::lock_guard<std::mutex> lock(lockListener_);
for (auto& listener: listeners_) {
if (listener.thread.joinable()) {
// Close connection to stop listener?
if (listener.req)
restbed::Http::close(listener.req);
listener.thread.join();
}
}
}
void
DhtProxyClient::shutdown(ShutdownCallback cb)
{
cancelAllOperations();
cancelAllListeners();
if (cb)
cb();
}
NodeStatus
DhtProxyClient::getStatus(sa_family_t af) const
{
switch (af)
{
case AF_INET:
return statusIpv4_;
case AF_INET6:
return statusIpv6_;
default:
return NodeStatus::Disconnected;
}
}
bool
DhtProxyClient::isRunning(sa_family_t af) const
{
switch (af)
{
case AF_INET:
return statusIpv4_ == NodeStatus::Connected;
case AF_INET6:
return statusIpv6_ == NodeStatus::Connected;
default:
return false;
}
}
time_point
DhtProxyClient::periodic(const uint8_t*, size_t, const SockAddr&)
{
// Exec all currently stored callbacks
scheduler.syncTime();
if (!callbacks_.empty()) {
std::lock_guard<std::mutex> lock(lockCallbacks);
for (auto& callback : callbacks_)
callback();
callbacks_.clear();
}
// Remove finished operations
{
std::lock_guard<std::mutex> lock(lockOperations_);
auto operation = operations_.begin();
while (operation != operations_.end()) {
if (*(operation->finished)) {
if (operation->thread.joinable()) {
// Close connection to stop operation?
restbed::Http::close(operation->req);
operation->thread.join();
}
operation = operations_.erase(operation);
} else {
++operation;
}
}
}
return scheduler.run();
}
void
DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb,
Value::Filter&& filter, Where&& where)
{
restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + key.toString());
auto req = std::make_shared<restbed::Request>(uri);
Query query {{}, where};
auto filterChain = filter.chain(query.where.getFilter());
auto finished = std::make_shared<bool>(false);
Operation o;
o.req = req;
o.finished = finished;
o.thread = std::thread([=](){
// Try to contact the proxy and set the status to connected when done.
// will change the connectivity status
auto ok = std::make_shared<bool>(true);
restbed::Http::async(req,
[=](const std::shared_ptr<restbed::Request>& req,
const std::shared_ptr<restbed::Response>& reply) {
auto code = reply->get_status_code();
if (code == 200) {
try {
while (restbed::Http::is_open(req) and not *finished) {
restbed::Http::fetch("\n", reply);
if (*finished)
break;
std::string body;
reply->get_body(body);
reply->set_body(""); // Reset the body for the next fetch
Json::Value json;
Json::Reader reader;
if (reader.parse(body, json)) {
auto value = std::make_shared<Value>(json);
if ((not filterChain or filterChain(*value)) && cb) {
std::lock_guard<std::mutex> lock(lockCallbacks);
callbacks_.emplace_back([cb, value, finished]() {
if (not *finished and not cb({value}))
*finished = true;
});
}
} else {
*ok = false;
}
}
} catch (std::runtime_error& e) { }
} else {
*ok = false;
}
}).wait();
if (donecb) {
std::lock_guard<std::mutex> lock(lockCallbacks);
callbacks_.emplace_back([=](){
donecb(*ok, {});
});
}
if (!ok) {
// Connection failed, update connectivity
getConnectivityStatus();
}
*finished = true;
});
{
std::lock_guard<std::mutex> lock(lockOperations_);
operations_.emplace_back(std::move(o));
}
}
void
DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_point, bool permanent)
{
restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + key.toString());
auto req = std::make_shared<restbed::Request>(uri);
req->set_method("POST");
Json::FastWriter writer;
auto json = val->toJson();
if (permanent)
json["permanent"] = true;
auto body = writer.write(json);
req->set_body(body);
req->set_header("Content-Length", std::to_string(body.size()));
auto finished = std::make_shared<bool>(false);
Operation o;
o.req = req;
o.finished = finished;
o.thread = std::thread([=](){
auto ok = std::make_shared<bool>(true);
restbed::Http::async(req,
[this, ok](const std::shared_ptr<restbed::Request>& /*req*/,
const std::shared_ptr<restbed::Response>& reply) {
auto code = reply->get_status_code();
if (code == 200) {
restbed::Http::fetch("\n", reply);
std::string body;
reply->get_body(body);
reply->set_body(""); // Reset the body for the next fetch
Json::Value json;
Json::Reader reader;
try {
if (!reader.parse(body, json))
*ok = false;
} catch (...) {
*ok = false;
}
} else {
*ok = false;
}
}).wait();
if (cb) {
std::lock_guard<std::mutex> lock(lockCallbacks);
callbacks_.emplace_back([=](){
cb(*ok, {});
});
}
if (!ok) {
// Connection failed, update connectivity
getConnectivityStatus();
}
*finished = true;
});
{
std::lock_guard<std::mutex> lock(lockOperations_);
operations_.emplace_back(std::move(o));
}
}
NodeStats
DhtProxyClient::getNodesStats(sa_family_t af) const
{
lockCurrentProxyInfos_->lock();
auto proxyInfos = *currentProxyInfos_;
lockCurrentProxyInfos_->unlock();
NodeStats stats {};
auto identifier = af == AF_INET6 ? "ipv6" : "ipv4";
try {
stats = NodeStats(proxyInfos[identifier]);
} catch (...) { }
return stats;
}
Json::Value
DhtProxyClient::getProxyInfos() const
{
restbed::Uri uri(HTTP_PROTO + serverHost_ + "/");
auto req = std::make_shared<restbed::Request>(uri);
// Try to contact the proxy and set the status to connected when done.
// will change the connectivity status
restbed::Http::async(req,
[this](const std::shared_ptr<restbed::Request>&,
const std::shared_ptr<restbed::Response>& reply) {
auto code = reply->get_status_code();
if (code == 200) {
restbed::Http::fetch("\n", reply);
std::string body;
reply->get_body(body);
Json::Reader reader;
lockCurrentProxyInfos_->lock();
try {
reader.parse(body, *currentProxyInfos_);
} catch (...) {
*currentProxyInfos_ = Json::Value();
}
lockCurrentProxyInfos_->unlock();
} else {
lockCurrentProxyInfos_->lock();
*currentProxyInfos_ = Json::Value();
lockCurrentProxyInfos_->unlock();
}
}).wait();
lockCurrentProxyInfos_->lock();
auto result = *currentProxyInfos_;
lockCurrentProxyInfos_->unlock();
return result;
}
std::vector<SockAddr>
DhtProxyClient::getPublicAddress(sa_family_t family)
{
lockCurrentProxyInfos_->lock();
auto proxyInfos = *currentProxyInfos_;
lockCurrentProxyInfos_->unlock();
// json["public_ip"] contains [ipv6:ipv4]:port or ipv4:port
auto public_ip = proxyInfos["public_ip"].asString();
if (!proxyInfos.isMember("public_ip") || (public_ip.length() < 2)) return {};
std::string ipv4Address = "", ipv6Address = "", port = "";
if (public_ip[0] == '[') {
// ipv6 complient
auto endIp = public_ip.find(']');
if (public_ip.length() > endIp + 2) {
port = public_ip.substr(endIp + 2);
auto ips = public_ip.substr(1, endIp - 1);
auto ipv4And6Separator = ips.find_last_of(':');
ipv4Address = ips.substr(ipv4And6Separator + 1);
ipv6Address = ips.substr(0, ipv4And6Separator - 1);
}
} else {
auto endIp = public_ip.find_last_of(':');
port = public_ip.substr(endIp + 1);
ipv4Address = public_ip.substr(0, endIp - 1);
}
switch (family)
{
case AF_INET:
return SockAddr::resolve(ipv4Address, port);
case AF_INET6:
return SockAddr::resolve(ipv6Address, port);
default:
return {};
}
}
size_t
DhtProxyClient::listen(const InfoHash& key, GetCallback cb, Value::Filter&& filter, Where&& where)
{
restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + key.toString());
auto req = std::make_shared<restbed::Request>(uri);
req->set_method("LISTEN");
Query query {{}, where};
auto filterChain = filter.chain(query.where.getFilter());
Listener l;
++listener_token_;
l.key = key.toString();
l.token = listener_token_;
l.req = req;
l.cb = cb;
l.filterChain = std::move(filterChain);
l.thread = std::thread([=]()
{
auto settings = std::make_shared<restbed::Settings>();
std::chrono::milliseconds timeout(std::numeric_limits<int>::max());
settings->set_connection_timeout(timeout); // Avoid the client to close the socket after 5 seconds.
struct State {
bool ok {true};
bool cancel {false};
};
auto state = std::make_shared<State>();
restbed::Http::async(req,
[this, filterChain, cb, state](const std::shared_ptr<restbed::Request>& req,
const std::shared_ptr<restbed::Response>& reply) {
auto code = reply->get_status_code();
if (code == 200) {
try {
while (restbed::Http::is_open(req) and not state->cancel) {
restbed::Http::fetch("\n", reply);
if (state->cancel)
break;
std::string body;
reply->get_body(body);
reply->set_body(""); // Reset the body for the next fetch
Json::Value json;
Json::Reader reader;
if (reader.parse(body, json)) {
auto value = std::make_shared<Value>(json);
if ((not filterChain or filterChain(*value)) && cb) {
std::lock_guard<std::mutex> lock(lockCallbacks);
callbacks_.emplace_back([cb, value, state]() {
if (not state->cancel and not cb({value})) {
state->cancel = true;
}
});
}
} else {
state->ok = false;
}
}
} catch (std::runtime_error&) {
state->ok = false;
}
} else {
state->ok = false;
}
}, settings).get();
if (not state->ok) {
getConnectivityStatus();
}
}
);
{
std::lock_guard<std::mutex> lock(lockListener_);
listeners_.emplace_back(std::move(l));
}
return listener_token_;
}
bool
DhtProxyClient::cancelListen(const InfoHash&, size_t token)
{
std::lock_guard<std::mutex> lock(lockListener_);
for (auto it = listeners_.begin(); it != listeners_.end(); ++it) {
auto& listener = *it;
if (listener.token == token) {
if (listener.thread.joinable()) {
// Close connection to stop listener?
if (listener.req)
restbed::Http::close(listener.req);
listener.thread.join();
listeners_.erase(it);
return true;
}
}
}
return false;
}
void
DhtProxyClient::getConnectivityStatus()
{
auto proxyInfos = getProxyInfos();
// NOTE: json["ipvX"] contains NodeStats::toJson()
try {
auto goodIpv4 = static_cast<long>(proxyInfos["ipv4"]["good"].asLargestUInt());
auto dubiousIpv4 = static_cast<long>(proxyInfos["ipv4"]["dubious"].asLargestUInt());
statusIpv4_ = (goodIpv4 + dubiousIpv4 > 0) ? NodeStatus::Connected : NodeStatus::Disconnected;
auto goodIpv6 = static_cast<long>(proxyInfos["ipv6"]["good"].asLargestUInt());
auto dubiousIpv6 = static_cast<long>(proxyInfos["ipv6"]["dubious"].asLargestUInt());
statusIpv6_ = (goodIpv6 + dubiousIpv6 > 0) ? NodeStatus::Connected : NodeStatus::Disconnected;
myid = InfoHash(proxyInfos["node_id"].asString());
if (statusIpv4_ == NodeStatus::Disconnected && statusIpv6_ == NodeStatus::Disconnected) {
const auto& now = scheduler.time();
scheduler.edit(nextProxyConfirmation, now);
}
} catch (...) {
statusIpv4_ = NodeStatus::Disconnected;
statusIpv6_ = NodeStatus::Disconnected;
const auto& now = scheduler.time();
scheduler.edit(nextProxyConfirmation, now);
}
}
void
DhtProxyClient::restartListeners()
{
std::lock_guard<std::mutex> lock(lockListener_);
for (auto& listener: listeners_) {
if (listener.thread.joinable())
listener.thread.join();
// Redo listen
auto filterChain = listener.filterChain;
auto cb = listener.cb;
restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + listener.key);
auto req = std::make_shared<restbed::Request>(uri);
req->set_method("LISTEN");
listener.req = req;
listener.thread = std::thread([this, filterChain, cb, req]()
{
auto settings = std::make_shared<restbed::Settings>();
std::chrono::milliseconds timeout(std::numeric_limits<int>::max());
settings->set_connection_timeout(timeout); // Avoid the client to close the socket after 5 seconds.
restbed::Http::async(req,
[this, filterChain, cb](const std::shared_ptr<restbed::Request>& req,
const std::shared_ptr<restbed::Response>& reply) {
auto code = reply->get_status_code();
if (code == 200) {
try {
while (restbed::Http::is_open(req)) {
restbed::Http::fetch("\n", reply);
std::string body;
reply->get_body(body);
reply->set_body(""); // Reset the body for the next fetch
Json::Value json;
Json::Reader reader;
if (reader.parse(body, json)) {
auto value = std::make_shared<Value>(json);
if ((not filterChain or filterChain(*value)) && cb) {
auto okCb = std::make_shared<std::promise<bool>>();
auto futureCb = okCb->get_future();
{
std::lock_guard<std::mutex> lock(lockCallbacks);
callbacks_.emplace_back([cb, value, okCb](){
okCb->set_value(cb({value}));
});
}
futureCb.wait();
if (!futureCb.get()) {
return;
}
}
}
}
} catch (std::runtime_error&) {
// NOTE: Http::close() can occurs here. Ignore this.
}
} else {
this->statusIpv4_ = NodeStatus::Disconnected;
this->statusIpv6_ = NodeStatus::Disconnected;
}
}, settings).get();
getConnectivityStatus();
}
);
}
}
} // namespace dht
#endif // OPENDHT_PROXY_CLIENT