diff --git a/include/opendht/dht_proxy_client.h b/include/opendht/dht_proxy_client.h index 4f84cc7b069a3b58ea77240fff79479182f0ca9c..f506495f9a692ddaa5dc865dec225f722efcbc6f 100644 --- a/include/opendht/dht_proxy_client.h +++ b/include/opendht/dht_proxy_client.h @@ -286,8 +286,7 @@ private: struct InfoState; void getProxyInfos(); void handleProxyStatus(const asio::error_code &ec, std::shared_ptr<InfoState> infoState); - void queryProxyInfo(std::shared_ptr<InfoState> infoState, const sa_family_t family, - std::vector<asio::ip::tcp::endpoint>&& endpoints); + void queryProxyInfo(std::shared_ptr<InfoState> infoState, sa_family_t family, std::shared_ptr<http::Resolver> resolver); void onProxyInfos(const Json::Value& val, const sa_family_t family); SockAddr parsePublicAddress(const Json::Value& val); @@ -327,9 +326,10 @@ private: */ void cancelAllOperations(); + std::string proxyUrl_; dht::crypto::Identity clientIdentity_; std::shared_ptr<dht::crypto::Certificate> serverCertificate_; - std::pair<std::string, std::string> serverHostService_; + //std::pair<std::string, std::string> serverHostService_; std::string pushClientId_; /* @@ -409,7 +409,10 @@ private: std::atomic_bool isDestroying_ {false}; + Json::StreamWriterBuilder jsonBuilder_; std::shared_ptr<dht::Logger> logger_; + + std::shared_ptr<http::Request> buildRequest(const std::string& target = {}); }; } diff --git a/include/opendht/http.h b/include/opendht/http.h index 61a47b2240719963df3934b27dcd46dcc5d65f17..947414a22947c33747bf7c6d2614799b681e3568 100644 --- a/include/opendht/http.h +++ b/include/opendht/http.h @@ -147,7 +147,7 @@ class OPENDHT_PUBLIC Resolver { public: using ResolverCb = std::function<void(const asio::error_code& ec, - std::vector<asio::ip::tcp::endpoint> endpoints)>; + const std::vector<asio::ip::tcp::endpoint>& endpoints)>; Resolver(asio::io_context& ctx, const std::string& url, std::shared_ptr<dht::Logger> logger = {}); Resolver(asio::io_context& ctx, const std::string& host, const std::string& service, @@ -156,14 +156,20 @@ public: // use already resolved endpoints with classes using this resolver Resolver(asio::io_context& ctx, std::vector<asio::ip::tcp::endpoint> endpoints, const bool ssl = false, std::shared_ptr<dht::Logger> logger = {}); + Resolver(asio::io_context& ctx, const std::string& url, std::vector<asio::ip::tcp::endpoint> endpoints, + std::shared_ptr<dht::Logger> logger = {}); ~Resolver(); inline const Url& get_url() const { return url_; - }; + } - void add_callback(ResolverCb cb); + void add_callback(ResolverCb cb, sa_family_t family = AF_UNSPEC); + + std::shared_ptr<Logger> getLogger() const { + return logger_; + } private: void resolve(const std::string& host, const std::string& service); @@ -212,7 +218,8 @@ public: const bool ssl = false, std::shared_ptr<dht::Logger> logger = {}); // user defined resolver - Request(asio::io_context& ctx, std::shared_ptr<Resolver> resolver, std::shared_ptr<dht::Logger> logger = {}); + Request(asio::io_context& ctx, std::shared_ptr<Resolver> resolver, sa_family_t family = AF_UNSPEC); + Request(asio::io_context& ctx, std::shared_ptr<Resolver> resolver, const std::string& target, sa_family_t family = AF_UNSPEC); // user defined resolved endpoints Request(asio::io_context& ctx, std::vector<asio::ip::tcp::endpoint>&& endpoints, @@ -307,7 +314,7 @@ private: std::string body_; std::mutex cbs_mutex_; - std::unique_ptr<Callbacks> cbs_; + Callbacks cbs_; State state_; dht::crypto::Identity client_identity_; @@ -316,8 +323,9 @@ private: std::string host_; unsigned int id_; - static unsigned int ids_; asio::io_context& ctx_; + sa_family_t family_ = AF_UNSPEC; + static unsigned int ids_; std::shared_ptr<Connection> conn_; std::shared_ptr<Resolver> resolver_; diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index e7ffad3dc4450e33bcbce6caa19431770c0b37ef..dceee268406682d669e5630b666b73c9c3e9fd7b 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -80,9 +80,10 @@ DhtProxyClient::DhtProxyClient( pushClientId_(pushClientId), loopSignal_(signal), logger_(logger) { // build http client - serverHostService_ = splitPort(serverHost); - serverHostService_.second = serverHostService_.second.empty() ? "80" : - serverHostService_.second; + proxyUrl_ = serverHost; + + jsonBuilder_["commentStyle"] = "None"; + jsonBuilder_["indentation"] = ""; if (logger_){ if (serverCertificate_) logger_->d("[proxy:client] using ca certificate for ssl:\n%s", @@ -91,8 +92,6 @@ DhtProxyClient::DhtProxyClient( logger_->d("[proxy:client] using client certificate for ssl:\n%s", clientIdentity_.second->toString(false/*chain*/).c_str()); } - // resolve once - resolver_ = std::make_shared<http::Resolver>(httpContext_, serverHost, logger_); // run http client httpClientThread_ = std::thread([this](){ try { @@ -109,18 +108,18 @@ DhtProxyClient::DhtProxyClient( logger_->e("[proxy:client] run error: %s", ex.what()); } }); - if (!serverHostService_.first.empty()) + if (!proxyUrl_.empty()) startProxy(); } void DhtProxyClient::startProxy() { - if (serverHostService_.first.empty()) + if (proxyUrl_.empty()) return; if (logger_) - logger_->d("[proxy:client] start proxy with %s", serverHostService_.first.c_str()); + logger_->d("[proxy:client] start proxy with %s", proxyUrl_.c_str()); nextProxyConfirmationTimer_ = std::make_shared<asio::steady_timer>(httpContext_, std::chrono::steady_clock::now()); nextProxyConfirmationTimer_->async_wait(std::bind(&DhtProxyClient::handleProxyConfirm, this, std::placeholders::_1)); @@ -140,7 +139,7 @@ DhtProxyClient::handleProxyConfirm(const asio::error_code &ec) logger_->e("[proxy:client] confirm error: %s", ec.message().c_str()); return; } - if (serverHostService_.first.empty()) + if (proxyUrl_.empty()) return; getConnectivityStatus(); } @@ -263,9 +262,6 @@ DhtProxyClient::periodic(const uint8_t*, size_t, SockAddr) void DhtProxyClient::setHeaderFields(http::Request& request){ - request.set_header_field(restinio::http_field_t::host, - serverHostService_.first + ":" + serverHostService_.second); - request.set_header_field(restinio::http_field_t::user_agent, "RESTinio client"); request.set_header_field(restinio::http_field_t::accept, "*/*"); request.set_header_field(restinio::http_field_t::content_type, "application/json"); } @@ -276,24 +272,16 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, Va if (logger_) logger_->d("[proxy:client] [get] [search %s]", key.to_c_str()); - auto request = std::make_shared<http::Request>(httpContext_, resolver_, logger_); + auto request = buildRequest("/" + key.toString()); auto reqid = request->id(); try { request->set_connection_type(restinio::http_connection_header_t::keep_alive); - request->set_target("/" + key.toString()); request->set_method(restinio::http_method_get()); setHeaderFields(*request); auto opstate = std::make_shared<OperationState>(); Value::Filter filter = w.empty() ? f : f.chain(w.getFilter()); - request->add_on_status_callback([this, key, opstate](unsigned int status_code){ - if (status_code != 200){ - if (logger_) - logger_->e("[proxy:client] [get %s] status error: %i", key.to_c_str(), status_code); - opstate->ok.store(false); - } - }); request->add_on_body_callback([this, key, opstate, filter, cb](const char* at, size_t length){ try { Json::CharReaderBuilder rbuilder; @@ -336,6 +324,8 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, Va if (logger_) logger_->e("[proxy:client] [get %s] failed with code=%i", key.to_c_str(), response.status_code); opstate->ok.store(false); + if (response.status_code == 0) + opFailed(); } if (donecb){ { @@ -350,10 +340,6 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, Va requests_.erase(reqid); } }); - if (serverCertificate_) - request->set_certificate_authority(serverCertificate_); - if (clientIdentity_.first and clientIdentity_.second) - request->set_identity(clientIdentity_); request->send(); requests_[reqid] = request; } @@ -425,16 +411,29 @@ DhtProxyClient::handleRefreshPut(const asio::error_code &ec, const InfoHash& key } } +std::shared_ptr<http::Request> +DhtProxyClient::buildRequest(const std::string& target) +{ + auto request = target.empty() + ? std::make_shared<http::Request>(httpContext_, resolver_) + : std::make_shared<http::Request>(httpContext_, resolver_, target); + if (serverCertificate_) + request->set_certificate_authority(serverCertificate_); + if (clientIdentity_.first and clientIdentity_.second) + request->set_identity(clientIdentity_); + request->set_header_field(restinio::http_field_t::user_agent, "RESTinio client"); + return request; +} + void DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_point /*created*/, bool permanent) { if (logger_) logger_->d("[proxy:client] [put] [search %s] executing for %s", key.to_c_str(), val->toString().c_str()); - auto request = std::make_shared<http::Request>(httpContext_, resolver_, logger_); + auto request = buildRequest("/" + key.toString()); auto reqid = request->id(); try { - request->set_target("/" + key.toString()); request->set_method(restinio::http_method_post()); setHeaderFields(*request); @@ -452,29 +451,21 @@ DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_ #endif } } - Json::StreamWriterBuilder wbuilder; - wbuilder["commentStyle"] = "None"; - wbuilder["indentation"] = ""; - auto body = Json::writeString(wbuilder, json); - request->set_body(body); + request->set_body(Json::writeString(jsonBuilder_, json)); auto ok = std::make_shared<std::atomic_bool>(); ok->store(true); - request->add_on_status_callback([this, key, ok](unsigned int status_code){ - if (status_code != 200){ - if (logger_) - logger_->e("[proxy:client] [put %s] status error: %i", key.to_c_str(), status_code); - ok->store(false); - } - return 0; - }); request->add_on_state_change_callback([this, reqid, ok, cb, key] (http::Request::State state, const http::Response& response){ if (state == http::Request::State::DONE){ - if (response.status_code != 200) + if (response.status_code != 200) { if (logger_) logger_->e("[proxy:client] [status] failed with code=%i", response.status_code); + ok->store(false); + if (response.status_code == 0) + opFailed(); + } if (cb){ { std::lock_guard<std::mutex> lock(lockCallbacks_); @@ -487,10 +478,6 @@ DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_ requests_.erase(reqid); } }); - if (serverCertificate_) - request->set_certificate_authority(serverCertificate_); - if (clientIdentity_.first and clientIdentity_.second) - request->set_identity(clientIdentity_); request->send(); requests_[reqid] = request; } @@ -591,92 +578,47 @@ DhtProxyClient::handleProxyStatus(const asio::error_code& ec, std::shared_ptr<In if (logger_) logger_->d("[proxy:client] [status] sending request"); - resolver_->add_callback([this, infoState](const asio::error_code& ec, - std::vector<asio::ip::tcp::endpoint> endpoints) - { - if (ec){ - if (logger_) - logger_->e("[proxy:client] [status] resolve error: %s", ec.message().c_str()); - return; - } - std::vector<asio::ip::tcp::endpoint> endpointsIpv4; - std::vector<asio::ip::tcp::endpoint> endpointsIpv6; - - for (auto& endpoint: endpoints){ - try { - if (endpoint.address().is_v6()) - endpointsIpv6.push_back(endpoint); - if (endpoint.address().is_v4()) - endpointsIpv4.push_back(endpoint); - } catch (const std::exception&){}; - } - // avoiding nested add_callback call - if (!endpointsIpv4.empty()) - queryProxyInfo(infoState, AF_INET, std::move(endpointsIpv4)); - if (!endpointsIpv6.empty()) - queryProxyInfo(infoState, AF_INET6, std::move(endpointsIpv6)); - }); + auto resolver = std::make_shared<http::Resolver>(httpContext_, proxyUrl_, logger_); + queryProxyInfo(infoState, AF_INET, resolver); + queryProxyInfo(infoState, AF_INET6, resolver); + resolver_ = resolver; } void -DhtProxyClient::queryProxyInfo(std::shared_ptr<InfoState> infoState, const sa_family_t family, - std::vector<asio::ip::tcp::endpoint>&& endpoints) +DhtProxyClient::queryProxyInfo(std::shared_ptr<InfoState> infoState, sa_family_t family, std::shared_ptr<http::Resolver> resolver) { if (logger_) logger_->d("[proxy:client] [status] query ipv%i info", family == AF_INET ? 4 : 6); - auto request = std::make_shared<http::Request>(httpContext_, std::move(endpoints), - resolver_->get_url().protocol == "https" ? /*ssl*/ true : false, logger_); + auto request = std::make_shared<http::Request>(httpContext_, resolver, family); auto reqid = request->id(); try { - request->set_connection_type(restinio::http_connection_header_t::keep_alive); - request->set_target("/"); - request->set_method(restinio::http_method_get()); + request->set_method(restinio::http_method_get()); setHeaderFields(*request); - - auto ok = std::make_shared<std::atomic_bool>(); - ok->store(true); - - request->add_on_status_callback([this, ok, family](unsigned int status_code){ - if (status_code != 200){ - if (logger_) - logger_->e("[proxy:client] [status] ipv%i status code error: %i", family, status_code); - ok->store(false); - } - }); - request->add_on_body_callback([this, ok, infoState, family](const char* at, size_t length){ - try{ - std::string err; - Json::Value proxyInfos; - Json::CharReaderBuilder rbuilder; - auto body = std::string(at, length); - auto* char_data = static_cast<const char*>(&body[0]); - auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader()); - if (!reader->parse(char_data, char_data + body.size(), &proxyInfos, &err)){ - ok->store(false); - return; - } - if (not infoState->cancel) - onProxyInfos(proxyInfos, family); - } - catch (const std::exception& e) { - if (logger_) - logger_->e("[proxy:client] [status] body error: %s", e.what()); - ok->store(false); - } - }); request->add_on_state_change_callback([this, reqid, family, infoState] (http::Request::State state, const http::Response& response){ - if (state == http::Request::State::DONE){ - if (response.status_code != 200) + if (state == http::Request::State::DONE) { + if (infoState->cancel.load()) + return; + //ok->store(false); + if (response.status_code != 200) { if (logger_) logger_->e("[proxy:client] [status] ipv%i failed with code=%i", family == AF_INET ? 4 : 6, response.status_code); - if (infoState->cancel.load()){ // pass along the failures - if (family == AF_INET and infoState->ipv4 == 0) - onProxyInfos(Json::Value{}, AF_INET); - if (family == AF_INET6 and infoState->ipv6 == 0) - onProxyInfos(Json::Value{}, AF_INET6); + if ((family == AF_INET and infoState->ipv4 == 0) or (family == AF_INET6 and infoState->ipv6 == 0)) + onProxyInfos(Json::Value{}, family); + } else { + std::string err; + Json::Value proxyInfos; + Json::CharReaderBuilder rbuilder; + auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader()); + if (!reader->parse(response.body.data(), response.body.data() + response.body.size(), &proxyInfos, &err)){ + //ok->store(false); + onProxyInfos(Json::Value{}, family); + return; + } + if (not infoState->cancel) + onProxyInfos(proxyInfos, family); } requests_.erase(reqid); } @@ -685,12 +627,8 @@ DhtProxyClient::queryProxyInfo(std::shared_ptr<InfoState> infoState, const sa_fa if (infoState->cancel.load()) return; - if (serverCertificate_) - request->set_certificate_authority(serverCertificate_); - if (clientIdentity_.first and clientIdentity_.second) - request->set_identity(clientIdentity_); - request->send(); requests_[reqid] = request; + request->send(); } catch (const std::exception &e){ if (logger_) @@ -933,43 +871,29 @@ DhtProxyClient::handleExpireListener(const asio::error_code &ec, const InfoHash& if (not deviceKey_.empty()) { // UNSUBSCRIBE - auto request = std::make_shared<http::Request>(httpContext_, resolver_, logger_); + auto request = buildRequest("/" + key.toString()); auto reqid = request->id(); try { - request->set_target("/" + key.toString()); request->set_method(restinio::http_method_unsubscribe()); setHeaderFields(*request); Json::Value body; body["key"] = deviceKey_; body["client_id"] = pushClientId_; - Json::StreamWriterBuilder wbuilder; - wbuilder["commentStyle"] = "None"; - wbuilder["indentation"] = ""; - auto content = Json::writeString(wbuilder, body) + "\n"; - std::replace(content.begin(), content.end(), '\n', ' '); - request->set_body(content); - - request->add_on_status_callback([this, key](unsigned int status_code){ - if (status_code != 200) - if (logger_) - logger_->e("[proxy:client] [unsubscribe %s] failed with code=%i", - key.to_c_str(), status_code); - }); + request->set_body(Json::writeString(jsonBuilder_, body)); request->add_on_state_change_callback([this, reqid, key] (http::Request::State state, const http::Response& response){ if (state == http::Request::State::DONE){ - if (response.status_code != 200) + if (response.status_code != 200) { if (logger_) logger_->e("[proxy:client] [unsubscribe %s] failed with code=%i", key.to_c_str(), response.status_code); + if (response.status_code == 0) + opFailed(); + } requests_.erase(reqid); } }); - if (serverCertificate_) - request->set_certificate_authority(serverCertificate_); - if (clientIdentity_.first and clientIdentity_.second) - request->set_identity(clientIdentity_); request->send(); requests_[reqid] = request; } @@ -1003,7 +927,9 @@ DhtProxyClient::sendListen(const restinio::http_request_header_t header, const Sp<OperationState>& opstate, Listener& listener, ListenMethod method) { - auto request = std::make_shared<http::Request>(httpContext_, resolver_, logger_); + if (logger_) + logger_->e("[proxy:client] [listen] sendListen: %d", (int)method); + auto request = buildRequest(); listener.request = request; auto reqid = request->id(); try { @@ -1011,25 +937,17 @@ DhtProxyClient::sendListen(const restinio::http_request_header_t header, setHeaderFields(*request); if (method == ListenMethod::LISTEN) request->set_connection_type(restinio::http_connection_header_t::keep_alive); - std::string body; #ifdef OPENDHT_PUSH_NOTIFICATIONS + std::string body; if (method != ListenMethod::LISTEN) body = fillBody(method == ListenMethod::RESUBSCRIBE); -#endif request->set_body(body); +#endif - request->add_on_status_callback([this, reqid, opstate](unsigned int status_code){ - if (status_code != 200){ - if (logger_) - logger_->e("[proxy:client] [listen] send request #%i status error: %i", reqid, status_code); - opstate->ok.store(false); - } - }); request->add_on_body_callback([this, reqid, opstate, cb](const char* at, size_t length){ try { Json::CharReaderBuilder rbuilder; auto body = std::string(at, length); - std::vector<dht::Sp<dht::Value>> values; // one value per body line std::string data_line; std::stringstream body_stream(body); @@ -1065,20 +983,20 @@ DhtProxyClient::sendListen(const restinio::http_request_header_t header, opstate->ok.store(false); } }); - request->add_on_state_change_callback([this, reqid] + request->add_on_state_change_callback([this, opstate, reqid] (http::Request::State state, const http::Response& response){ if (state == http::Request::State::DONE){ - if (response.status_code != 200) + if (response.status_code != 200) { if (logger_) logger_->e("[proxy:client] [listen] send request #%i failed with code=%i", reqid, response.status_code); + opstate->ok.store(false); + if (response.status_code == 0) + opFailed(); + } requests_.erase(reqid); } }); - if (serverCertificate_) - request->set_certificate_authority(serverCertificate_); - if (clientIdentity_.first and clientIdentity_.second) - request->set_identity(clientIdentity_); request->send(); requests_[reqid] = request; } @@ -1333,10 +1251,7 @@ DhtProxyClient::fillBody(bool resubscribe) // This is the first listen, we want to retrieve previous values. body["refresh"] = true; } - Json::StreamWriterBuilder wbuilder; - wbuilder["commentStyle"] = "None"; - wbuilder["indentation"] = ""; - auto content = Json::writeString(wbuilder, body) + "\n"; + auto content = Json::writeString(jsonBuilder_, body) + "\n"; std::replace(content.begin(), content.end(), '\n', ' '); return content; } diff --git a/src/http.cpp b/src/http.cpp index 0872308cacf1dc7bd0616296248ff0944b9100ed..1ce2dc0bfde09174e735d326ee0fa5f13d6eae42 100644 --- a/src/http.cpp +++ b/src/http.cpp @@ -373,14 +373,35 @@ Resolver::~Resolver() *destroyed_ = true; } +inline +std::vector<asio::ip::tcp::endpoint> +filter(const std::vector<asio::ip::tcp::endpoint>& epts, sa_family_t family) +{ + if (family == AF_UNSPEC) + return epts; + std::vector<asio::ip::tcp::endpoint> ret; + for (const auto& ep : epts) { + if (family == AF_INET && ep.address().is_v4()) + ret.emplace_back(ep); + else if (family == AF_INET6 && ep.address().is_v6()) + ret.emplace_back(ep); + } + return ret; +} + void -Resolver::add_callback(ResolverCb cb) +Resolver::add_callback(ResolverCb cb, sa_family_t family) { std::lock_guard<std::mutex> lock(mutex_); if (!completed_) - cbs_.emplace(std::move(cb)); + cbs_.emplace(family == AF_UNSPEC ? std::move(cb) : [cb, family](const asio::error_code& ec, const std::vector<asio::ip::tcp::endpoint>& endpoints){ + if (ec) + cb(ec, endpoints); + else + cb(ec, filter(endpoints, family)); + }); else - cb(ec_, endpoints_); + cb(ec_, family == AF_UNSPEC ? endpoints_ : filter(endpoints_, family)); } void @@ -397,14 +418,6 @@ Resolver::resolve(const std::string& host, const std::string& service) if (ec) logger_->e("[http:client] [resolver] error for %s:%s: %s", host.c_str(), service.c_str(), ec.message().c_str()); - else { - for (auto it = endpoints.begin(); it != endpoints.end(); ++it){ - asio::ip::tcp::endpoint endpoint = *it; - logger_->d("[http:client] [resolver] %s:%s endpoint (ipv%i): %s", - host.c_str(), service.c_str(), endpoint.address().is_v6() ? 6 : 4, - endpoint.address().to_string().c_str()); - } - } } decltype(cbs_) cbs; { @@ -430,7 +443,7 @@ unsigned int Request::ids_ = 1; Request::Request(asio::io_context& ctx, const std::string& url, const Json::Value& json, OnJsonCb jsoncb, std::shared_ptr<dht::Logger> logger) - : cbs_(std::make_unique<Callbacks>()), id_(Request::ids_++), ctx_(ctx), + : id_(Request::ids_++), ctx_(ctx), resolver_(std::make_shared<Resolver>(ctx, url, logger)), logger_(logger) { init_default_headers(); @@ -453,7 +466,7 @@ Request::Request(asio::io_context& ctx, const std::string& url, const Json::Valu } Request::Request(asio::io_context& ctx, const std::string& url, std::shared_ptr<dht::Logger> logger) - : cbs_(std::make_unique<Callbacks>()), id_(Request::ids_++), ctx_(ctx), + : id_(Request::ids_++), ctx_(ctx), resolver_(std::make_shared<Resolver>(ctx, url, logger)), logger_(logger) { init_default_headers(); @@ -461,26 +474,33 @@ Request::Request(asio::io_context& ctx, const std::string& url, std::shared_ptr< Request::Request(asio::io_context& ctx, const std::string& host, const std::string& service, const bool ssl, std::shared_ptr<dht::Logger> logger) - : cbs_(std::make_unique<Callbacks>()), id_(Request::ids_++), ctx_(ctx), + : id_(Request::ids_++), ctx_(ctx), resolver_(std::make_shared<Resolver>(ctx, host, service, ssl, logger)), logger_(logger) { init_default_headers(); } -Request::Request(asio::io_context& ctx, std::shared_ptr<Resolver> resolver, std::shared_ptr<dht::Logger> logger) - : cbs_(std::make_unique<Callbacks>()), id_(Request::ids_++), ctx_(ctx), resolver_(resolver), logger_(logger) +Request::Request(asio::io_context& ctx, std::shared_ptr<Resolver> resolver, sa_family_t family) + : id_(Request::ids_++), ctx_(ctx), family_(family), resolver_(resolver), logger_(resolver->getLogger()) { init_default_headers(); } Request::Request(asio::io_context& ctx, std::vector<asio::ip::tcp::endpoint>&& endpoints, const bool ssl, std::shared_ptr<dht::Logger> logger) - : cbs_(std::make_unique<Callbacks>()), id_(Request::ids_++), ctx_(ctx), + : id_(Request::ids_++), ctx_(ctx), resolver_(std::make_shared<Resolver>(ctx, std::move(endpoints), ssl, logger)), logger_(logger) { init_default_headers(); } +Request::Request(asio::io_context& ctx, std::shared_ptr<Resolver> resolver, const std::string& target, sa_family_t family) + : id_(Request::ids_++), ctx_(ctx), family_(family), resolver_(resolver), logger_(resolver->getLogger()) +{ + set_header_field(restinio::http_field_t::host, get_url().host + ":" + get_url().service); + set_target(target); +} + Request::~Request() { } @@ -488,8 +508,9 @@ Request::~Request() void Request::init_default_headers() { - set_header_field(restinio::http_field_t::host, get_url().host + ":" + get_url().service); - set_target(resolver_->get_url().target); + const auto& url = resolver_->get_url(); + set_header_field(restinio::http_field_t::host, url.host + ":" + url.service); + set_target(url.target); } void @@ -627,29 +648,29 @@ void Request::add_on_status_callback(OnStatusCb cb) { std::lock_guard<std::mutex> lock(cbs_mutex_); - cbs_->on_status = std::move(cb); + cbs_.on_status = std::move(cb); } void Request::add_on_body_callback(OnDataCb cb) { std::lock_guard<std::mutex> lock(cbs_mutex_); - cbs_->on_body = std::move(cb); + cbs_.on_body = std::move(cb); } void Request::add_on_state_change_callback(OnStateChangeCb cb) { std::lock_guard<std::mutex> lock(cbs_mutex_); - cbs_->on_state_change = std::move(cb); + cbs_.on_state_change = std::move(cb); } void Request::notify_state_change(const State state) { state_ = state; - if (cbs_ and cbs_->on_state_change) - cbs_->on_state_change(state, response_); + if (cbs_.on_state_change) + cbs_.on_state_change(state, response_); } void @@ -658,7 +679,7 @@ Request::init_parser() if (!parser_) parser_ = std::make_unique<http_parser>(); http_parser_init(parser_.get(), HTTP_RESPONSE); - parser_->data = static_cast<void*>(cbs_.get()); + parser_->data = static_cast<void*>(&cbs_); if (!parser_s_) parser_s_ = std::make_unique<http_parser_settings>(); @@ -666,36 +687,36 @@ Request::init_parser() { // user registered callbacks wrappers to store its data in the response std::lock_guard<std::mutex> lock(cbs_mutex_); - auto on_status_cb = cbs_->on_status; - cbs_->on_status = [this, on_status_cb](unsigned int status_code){ + auto on_status_cb = cbs_.on_status; + cbs_.on_status = [this, on_status_cb](unsigned int status_code){ response_.status_code = status_code; if (on_status_cb) on_status_cb(status_code); }; auto header_field = std::make_shared<std::string>(""); - auto on_header_field_cb = cbs_->on_header_field; - cbs_->on_header_field = [header_field, on_header_field_cb](const char* at, size_t length) { + auto on_header_field_cb = cbs_.on_header_field; + cbs_.on_header_field = [header_field, on_header_field_cb](const char* at, size_t length) { header_field->erase(); auto field = std::string(at, length); header_field->append(field); if (on_header_field_cb) on_header_field_cb(at, length); }; - auto on_header_value_cb = cbs_->on_header_value; - cbs_->on_header_value = [this, header_field, on_header_value_cb](const char* at, size_t length) { + auto on_header_value_cb = cbs_.on_header_value; + cbs_.on_header_value = [this, header_field, on_header_value_cb](const char* at, size_t length) { response_.headers[*header_field] = std::string(at, length); if (on_header_value_cb) on_header_value_cb(at, length); }; - cbs_->on_headers_complete = [this](){ + cbs_.on_headers_complete = [this](){ notify_state_change(State::HEADER_RECEIVED); }; - auto on_body_cb = cbs_->on_body; - cbs_->on_body = [on_body_cb](const char* at, size_t length) { + auto on_body_cb = cbs_.on_body; + cbs_.on_body = [on_body_cb](const char* at, size_t length) { if (on_body_cb) on_body_cb(at, length); }; - cbs_->on_message_complete = [this](){ + cbs_.on_message_complete = [this](){ if (logger_) logger_->d("[http:client] [request:%i] response: message complete", id_); message_complete_.store(true); @@ -827,7 +848,7 @@ Request::send() } else post(); - }); + }, family_); } void