Skip to content
Snippets Groups Projects
Commit 1f387a0e authored by Adrien Béraud's avatar Adrien Béraud
Browse files

proxy client: resolve again on connectivity change

parent 142aee12
Branches
Tags
No related merge requests found
...@@ -286,8 +286,7 @@ private: ...@@ -286,8 +286,7 @@ private:
struct InfoState; struct InfoState;
void getProxyInfos(); void getProxyInfos();
void handleProxyStatus(const asio::error_code &ec, std::shared_ptr<InfoState> infoState); void handleProxyStatus(const asio::error_code &ec, std::shared_ptr<InfoState> infoState);
void queryProxyInfo(std::shared_ptr<InfoState> infoState, const sa_family_t family, void queryProxyInfo(std::shared_ptr<InfoState> infoState, sa_family_t family, std::shared_ptr<http::Resolver> resolver);
std::vector<asio::ip::tcp::endpoint>&& endpoints);
void onProxyInfos(const Json::Value& val, const sa_family_t family); void onProxyInfos(const Json::Value& val, const sa_family_t family);
SockAddr parsePublicAddress(const Json::Value& val); SockAddr parsePublicAddress(const Json::Value& val);
...@@ -327,9 +326,10 @@ private: ...@@ -327,9 +326,10 @@ private:
*/ */
void cancelAllOperations(); void cancelAllOperations();
std::string proxyUrl_;
dht::crypto::Identity clientIdentity_; dht::crypto::Identity clientIdentity_;
std::shared_ptr<dht::crypto::Certificate> serverCertificate_; std::shared_ptr<dht::crypto::Certificate> serverCertificate_;
std::pair<std::string, std::string> serverHostService_; //std::pair<std::string, std::string> serverHostService_;
std::string pushClientId_; std::string pushClientId_;
/* /*
...@@ -409,7 +409,10 @@ private: ...@@ -409,7 +409,10 @@ private:
std::atomic_bool isDestroying_ {false}; std::atomic_bool isDestroying_ {false};
Json::StreamWriterBuilder jsonBuilder_;
std::shared_ptr<dht::Logger> logger_; std::shared_ptr<dht::Logger> logger_;
std::shared_ptr<http::Request> buildRequest(const std::string& target = {});
}; };
} }
...@@ -147,7 +147,7 @@ class OPENDHT_PUBLIC Resolver ...@@ -147,7 +147,7 @@ class OPENDHT_PUBLIC Resolver
{ {
public: public:
using ResolverCb = std::function<void(const asio::error_code& ec, using ResolverCb = std::function<void(const asio::error_code& ec,
std::vector<asio::ip::tcp::endpoint> endpoints)>; const std::vector<asio::ip::tcp::endpoint>& endpoints)>;
Resolver(asio::io_context& ctx, const std::string& url, std::shared_ptr<dht::Logger> logger = {}); Resolver(asio::io_context& ctx, const std::string& url, std::shared_ptr<dht::Logger> logger = {});
Resolver(asio::io_context& ctx, const std::string& host, const std::string& service, Resolver(asio::io_context& ctx, const std::string& host, const std::string& service,
...@@ -156,14 +156,20 @@ public: ...@@ -156,14 +156,20 @@ public:
// use already resolved endpoints with classes using this resolver // use already resolved endpoints with classes using this resolver
Resolver(asio::io_context& ctx, std::vector<asio::ip::tcp::endpoint> endpoints, Resolver(asio::io_context& ctx, std::vector<asio::ip::tcp::endpoint> endpoints,
const bool ssl = false, std::shared_ptr<dht::Logger> logger = {}); const bool ssl = false, std::shared_ptr<dht::Logger> logger = {});
Resolver(asio::io_context& ctx, const std::string& url, std::vector<asio::ip::tcp::endpoint> endpoints,
std::shared_ptr<dht::Logger> logger = {});
~Resolver(); ~Resolver();
inline const Url& get_url() const { inline const Url& get_url() const {
return url_; return url_;
}; }
void add_callback(ResolverCb cb); void add_callback(ResolverCb cb, sa_family_t family = AF_UNSPEC);
std::shared_ptr<Logger> getLogger() const {
return logger_;
}
private: private:
void resolve(const std::string& host, const std::string& service); void resolve(const std::string& host, const std::string& service);
...@@ -212,7 +218,8 @@ public: ...@@ -212,7 +218,8 @@ public:
const bool ssl = false, std::shared_ptr<dht::Logger> logger = {}); const bool ssl = false, std::shared_ptr<dht::Logger> logger = {});
// user defined resolver // user defined resolver
Request(asio::io_context& ctx, std::shared_ptr<Resolver> resolver, std::shared_ptr<dht::Logger> logger = {}); Request(asio::io_context& ctx, std::shared_ptr<Resolver> resolver, sa_family_t family = AF_UNSPEC);
Request(asio::io_context& ctx, std::shared_ptr<Resolver> resolver, const std::string& target, sa_family_t family = AF_UNSPEC);
// user defined resolved endpoints // user defined resolved endpoints
Request(asio::io_context& ctx, std::vector<asio::ip::tcp::endpoint>&& endpoints, Request(asio::io_context& ctx, std::vector<asio::ip::tcp::endpoint>&& endpoints,
...@@ -307,7 +314,7 @@ private: ...@@ -307,7 +314,7 @@ private:
std::string body_; std::string body_;
std::mutex cbs_mutex_; std::mutex cbs_mutex_;
std::unique_ptr<Callbacks> cbs_; Callbacks cbs_;
State state_; State state_;
dht::crypto::Identity client_identity_; dht::crypto::Identity client_identity_;
...@@ -316,8 +323,9 @@ private: ...@@ -316,8 +323,9 @@ private:
std::string host_; std::string host_;
unsigned int id_; unsigned int id_;
static unsigned int ids_;
asio::io_context& ctx_; asio::io_context& ctx_;
sa_family_t family_ = AF_UNSPEC;
static unsigned int ids_;
std::shared_ptr<Connection> conn_; std::shared_ptr<Connection> conn_;
std::shared_ptr<Resolver> resolver_; std::shared_ptr<Resolver> resolver_;
......
This diff is collapsed.
...@@ -373,14 +373,35 @@ Resolver::~Resolver() ...@@ -373,14 +373,35 @@ Resolver::~Resolver()
*destroyed_ = true; *destroyed_ = true;
} }
inline
std::vector<asio::ip::tcp::endpoint>
filter(const std::vector<asio::ip::tcp::endpoint>& epts, sa_family_t family)
{
if (family == AF_UNSPEC)
return epts;
std::vector<asio::ip::tcp::endpoint> ret;
for (const auto& ep : epts) {
if (family == AF_INET && ep.address().is_v4())
ret.emplace_back(ep);
else if (family == AF_INET6 && ep.address().is_v6())
ret.emplace_back(ep);
}
return ret;
}
void void
Resolver::add_callback(ResolverCb cb) Resolver::add_callback(ResolverCb cb, sa_family_t family)
{ {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
if (!completed_) if (!completed_)
cbs_.emplace(std::move(cb)); cbs_.emplace(family == AF_UNSPEC ? std::move(cb) : [cb, family](const asio::error_code& ec, const std::vector<asio::ip::tcp::endpoint>& endpoints){
if (ec)
cb(ec, endpoints);
else
cb(ec, filter(endpoints, family));
});
else else
cb(ec_, endpoints_); cb(ec_, family == AF_UNSPEC ? endpoints_ : filter(endpoints_, family));
} }
void void
...@@ -397,14 +418,6 @@ Resolver::resolve(const std::string& host, const std::string& service) ...@@ -397,14 +418,6 @@ Resolver::resolve(const std::string& host, const std::string& service)
if (ec) if (ec)
logger_->e("[http:client] [resolver] error for %s:%s: %s", logger_->e("[http:client] [resolver] error for %s:%s: %s",
host.c_str(), service.c_str(), ec.message().c_str()); host.c_str(), service.c_str(), ec.message().c_str());
else {
for (auto it = endpoints.begin(); it != endpoints.end(); ++it){
asio::ip::tcp::endpoint endpoint = *it;
logger_->d("[http:client] [resolver] %s:%s endpoint (ipv%i): %s",
host.c_str(), service.c_str(), endpoint.address().is_v6() ? 6 : 4,
endpoint.address().to_string().c_str());
}
}
} }
decltype(cbs_) cbs; decltype(cbs_) cbs;
{ {
...@@ -430,7 +443,7 @@ unsigned int Request::ids_ = 1; ...@@ -430,7 +443,7 @@ unsigned int Request::ids_ = 1;
Request::Request(asio::io_context& ctx, const std::string& url, const Json::Value& json, OnJsonCb jsoncb, Request::Request(asio::io_context& ctx, const std::string& url, const Json::Value& json, OnJsonCb jsoncb,
std::shared_ptr<dht::Logger> logger) std::shared_ptr<dht::Logger> logger)
: cbs_(std::make_unique<Callbacks>()), id_(Request::ids_++), ctx_(ctx), : id_(Request::ids_++), ctx_(ctx),
resolver_(std::make_shared<Resolver>(ctx, url, logger)), logger_(logger) resolver_(std::make_shared<Resolver>(ctx, url, logger)), logger_(logger)
{ {
init_default_headers(); init_default_headers();
...@@ -453,7 +466,7 @@ Request::Request(asio::io_context& ctx, const std::string& url, const Json::Valu ...@@ -453,7 +466,7 @@ Request::Request(asio::io_context& ctx, const std::string& url, const Json::Valu
} }
Request::Request(asio::io_context& ctx, const std::string& url, std::shared_ptr<dht::Logger> logger) Request::Request(asio::io_context& ctx, const std::string& url, std::shared_ptr<dht::Logger> logger)
: cbs_(std::make_unique<Callbacks>()), id_(Request::ids_++), ctx_(ctx), : id_(Request::ids_++), ctx_(ctx),
resolver_(std::make_shared<Resolver>(ctx, url, logger)), logger_(logger) resolver_(std::make_shared<Resolver>(ctx, url, logger)), logger_(logger)
{ {
init_default_headers(); init_default_headers();
...@@ -461,26 +474,33 @@ Request::Request(asio::io_context& ctx, const std::string& url, std::shared_ptr< ...@@ -461,26 +474,33 @@ Request::Request(asio::io_context& ctx, const std::string& url, std::shared_ptr<
Request::Request(asio::io_context& ctx, const std::string& host, const std::string& service, Request::Request(asio::io_context& ctx, const std::string& host, const std::string& service,
const bool ssl, std::shared_ptr<dht::Logger> logger) const bool ssl, std::shared_ptr<dht::Logger> logger)
: cbs_(std::make_unique<Callbacks>()), id_(Request::ids_++), ctx_(ctx), : id_(Request::ids_++), ctx_(ctx),
resolver_(std::make_shared<Resolver>(ctx, host, service, ssl, logger)), logger_(logger) resolver_(std::make_shared<Resolver>(ctx, host, service, ssl, logger)), logger_(logger)
{ {
init_default_headers(); init_default_headers();
} }
Request::Request(asio::io_context& ctx, std::shared_ptr<Resolver> resolver, std::shared_ptr<dht::Logger> logger) Request::Request(asio::io_context& ctx, std::shared_ptr<Resolver> resolver, sa_family_t family)
: cbs_(std::make_unique<Callbacks>()), id_(Request::ids_++), ctx_(ctx), resolver_(resolver), logger_(logger) : id_(Request::ids_++), ctx_(ctx), family_(family), resolver_(resolver), logger_(resolver->getLogger())
{ {
init_default_headers(); init_default_headers();
} }
Request::Request(asio::io_context& ctx, std::vector<asio::ip::tcp::endpoint>&& endpoints, const bool ssl, Request::Request(asio::io_context& ctx, std::vector<asio::ip::tcp::endpoint>&& endpoints, const bool ssl,
std::shared_ptr<dht::Logger> logger) std::shared_ptr<dht::Logger> logger)
: cbs_(std::make_unique<Callbacks>()), id_(Request::ids_++), ctx_(ctx), : id_(Request::ids_++), ctx_(ctx),
resolver_(std::make_shared<Resolver>(ctx, std::move(endpoints), ssl, logger)), logger_(logger) resolver_(std::make_shared<Resolver>(ctx, std::move(endpoints), ssl, logger)), logger_(logger)
{ {
init_default_headers(); init_default_headers();
} }
Request::Request(asio::io_context& ctx, std::shared_ptr<Resolver> resolver, const std::string& target, sa_family_t family)
: id_(Request::ids_++), ctx_(ctx), family_(family), resolver_(resolver), logger_(resolver->getLogger())
{
set_header_field(restinio::http_field_t::host, get_url().host + ":" + get_url().service);
set_target(target);
}
Request::~Request() Request::~Request()
{ {
} }
...@@ -488,8 +508,9 @@ Request::~Request() ...@@ -488,8 +508,9 @@ Request::~Request()
void void
Request::init_default_headers() Request::init_default_headers()
{ {
set_header_field(restinio::http_field_t::host, get_url().host + ":" + get_url().service); const auto& url = resolver_->get_url();
set_target(resolver_->get_url().target); set_header_field(restinio::http_field_t::host, url.host + ":" + url.service);
set_target(url.target);
} }
void void
...@@ -627,29 +648,29 @@ void ...@@ -627,29 +648,29 @@ void
Request::add_on_status_callback(OnStatusCb cb) Request::add_on_status_callback(OnStatusCb cb)
{ {
std::lock_guard<std::mutex> lock(cbs_mutex_); std::lock_guard<std::mutex> lock(cbs_mutex_);
cbs_->on_status = std::move(cb); cbs_.on_status = std::move(cb);
} }
void void
Request::add_on_body_callback(OnDataCb cb) Request::add_on_body_callback(OnDataCb cb)
{ {
std::lock_guard<std::mutex> lock(cbs_mutex_); std::lock_guard<std::mutex> lock(cbs_mutex_);
cbs_->on_body = std::move(cb); cbs_.on_body = std::move(cb);
} }
void void
Request::add_on_state_change_callback(OnStateChangeCb cb) Request::add_on_state_change_callback(OnStateChangeCb cb)
{ {
std::lock_guard<std::mutex> lock(cbs_mutex_); std::lock_guard<std::mutex> lock(cbs_mutex_);
cbs_->on_state_change = std::move(cb); cbs_.on_state_change = std::move(cb);
} }
void void
Request::notify_state_change(const State state) Request::notify_state_change(const State state)
{ {
state_ = state; state_ = state;
if (cbs_ and cbs_->on_state_change) if (cbs_.on_state_change)
cbs_->on_state_change(state, response_); cbs_.on_state_change(state, response_);
} }
void void
...@@ -658,7 +679,7 @@ Request::init_parser() ...@@ -658,7 +679,7 @@ Request::init_parser()
if (!parser_) if (!parser_)
parser_ = std::make_unique<http_parser>(); parser_ = std::make_unique<http_parser>();
http_parser_init(parser_.get(), HTTP_RESPONSE); http_parser_init(parser_.get(), HTTP_RESPONSE);
parser_->data = static_cast<void*>(cbs_.get()); parser_->data = static_cast<void*>(&cbs_);
if (!parser_s_) if (!parser_s_)
parser_s_ = std::make_unique<http_parser_settings>(); parser_s_ = std::make_unique<http_parser_settings>();
...@@ -666,36 +687,36 @@ Request::init_parser() ...@@ -666,36 +687,36 @@ Request::init_parser()
{ {
// user registered callbacks wrappers to store its data in the response // user registered callbacks wrappers to store its data in the response
std::lock_guard<std::mutex> lock(cbs_mutex_); std::lock_guard<std::mutex> lock(cbs_mutex_);
auto on_status_cb = cbs_->on_status; auto on_status_cb = cbs_.on_status;
cbs_->on_status = [this, on_status_cb](unsigned int status_code){ cbs_.on_status = [this, on_status_cb](unsigned int status_code){
response_.status_code = status_code; response_.status_code = status_code;
if (on_status_cb) if (on_status_cb)
on_status_cb(status_code); on_status_cb(status_code);
}; };
auto header_field = std::make_shared<std::string>(""); auto header_field = std::make_shared<std::string>("");
auto on_header_field_cb = cbs_->on_header_field; auto on_header_field_cb = cbs_.on_header_field;
cbs_->on_header_field = [header_field, on_header_field_cb](const char* at, size_t length) { cbs_.on_header_field = [header_field, on_header_field_cb](const char* at, size_t length) {
header_field->erase(); header_field->erase();
auto field = std::string(at, length); auto field = std::string(at, length);
header_field->append(field); header_field->append(field);
if (on_header_field_cb) if (on_header_field_cb)
on_header_field_cb(at, length); on_header_field_cb(at, length);
}; };
auto on_header_value_cb = cbs_->on_header_value; auto on_header_value_cb = cbs_.on_header_value;
cbs_->on_header_value = [this, header_field, on_header_value_cb](const char* at, size_t length) { cbs_.on_header_value = [this, header_field, on_header_value_cb](const char* at, size_t length) {
response_.headers[*header_field] = std::string(at, length); response_.headers[*header_field] = std::string(at, length);
if (on_header_value_cb) if (on_header_value_cb)
on_header_value_cb(at, length); on_header_value_cb(at, length);
}; };
cbs_->on_headers_complete = [this](){ cbs_.on_headers_complete = [this](){
notify_state_change(State::HEADER_RECEIVED); notify_state_change(State::HEADER_RECEIVED);
}; };
auto on_body_cb = cbs_->on_body; auto on_body_cb = cbs_.on_body;
cbs_->on_body = [on_body_cb](const char* at, size_t length) { cbs_.on_body = [on_body_cb](const char* at, size_t length) {
if (on_body_cb) if (on_body_cb)
on_body_cb(at, length); on_body_cb(at, length);
}; };
cbs_->on_message_complete = [this](){ cbs_.on_message_complete = [this](){
if (logger_) if (logger_)
logger_->d("[http:client] [request:%i] response: message complete", id_); logger_->d("[http:client] [request:%i] response: message complete", id_);
message_complete_.store(true); message_complete_.store(true);
...@@ -827,7 +848,7 @@ Request::send() ...@@ -827,7 +848,7 @@ Request::send()
} }
else else
post(); post();
}); }, family_);
} }
void void
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment