diff --git a/include/opendht/http.h b/include/opendht/http.h index 380802264440fa12eb5cdc5de34bd575c7ea8424..774bff399c0d39648659274b45bed9177e1ade2a 100644 --- a/include/opendht/http.h +++ b/include/opendht/http.h @@ -302,10 +302,12 @@ private: void handle_request(const asio::error_code& ec); - void handle_response_header(const asio::error_code& ec); + void handle_response_header(const asio::error_code& ec, size_t bytes); void handle_response_body(const asio::error_code& ec, size_t bytes); + void onHeadersComplete(); + /** * Parse the request with http_parser. * Return how many bytes were parsed. diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index 005c8f2cd6333ca0a955de179ec38679ddcddf6a..bf38b032e4e00f855b3f0ca860ee03e564a0ac07 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -279,7 +279,7 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, Va try { auto request = buildRequest("/" + key.toString()); auto reqid = request->id(); - request->set_connection_type(restinio::http_connection_header_t::keep_alive); + //request->set_connection_type(restinio::http_connection_header_t::keep_alive); request->set_method(restinio::http_method_get()); setHeaderFields(*request); diff --git a/src/dht_proxy_server.cpp b/src/dht_proxy_server.cpp index 018db9e8ea4a52d9a5b0bab1976d260145ac7e2d..6e4bc577a200fbe6096260ef8e8f2d392ba28dfe 100644 --- a/src/dht_proxy_server.cpp +++ b/src/dht_proxy_server.cpp @@ -386,7 +386,6 @@ HttpResponse DhtProxyServer::initHttpResponse(HttpResponse response) const response.append_header("Server", "RESTinio"); response.append_header(restinio::http_field::content_type, "application/json"); response.append_header(restinio::http_field::access_control_allow_origin, "*"); - response.connection_keep_alive(); return response; } diff --git a/src/http.cpp b/src/http.cpp index d34da523813d1bd0b79e72d22653052948de2a70..be8b7396c9c249b1eac2b169b7891f942a0816c4 100644 --- a/src/http.cpp +++ b/src/http.cpp @@ -465,6 +465,7 @@ Request::Request(asio::io_context& ctx, const std::string& url, std::shared_ptr< Request::Request(asio::io_context& ctx, const std::string& url, OnDoneCb onDone, std::shared_ptr<dht::Logger> logger) : logger_(logger), id_(Request::ids_++), ctx_(ctx), resolver_(std::make_shared<Resolver>(ctx, url, logger)) { + init_default_headers(); add_on_state_change_callback([this, onDone](State state, const Response& response){ if (state == Request::State::DONE) onDone(response); @@ -510,7 +511,8 @@ void Request::init_default_headers() { const auto& url = resolver_->get_url(); - set_header_field(restinio::http_field_t::host, url.host + ":" + url.service); + set_header_field(restinio::http_field_t::user_agent, "Mozilla/5.0"); + set_header_field(restinio::http_field_t::accept, "text/html"); set_target(url.target); } @@ -608,7 +610,7 @@ void Request::build() { std::stringstream request; - bool append_body = true; + bool append_body = !body_.empty(); // first header request << header_.method().c_str() << " " << header_.request_target() << " " << @@ -624,26 +626,26 @@ Request::build() // last connection header const char* conn_str = nullptr; switch (connection_type_){ + case restinio::http_connection_header_t::keep_alive: + conn_str = "keep-alive"; + break; case restinio::http_connection_header_t::upgrade: if (logger_) logger_->e("Unsupported connection type 'upgrade', fallback to 'close'"); // fallthrough case restinio::http_connection_header_t::close: - conn_str = "close"; - break; - case restinio::http_connection_header_t::keep_alive: - conn_str = "keep-alive"; + conn_str = "close"; // default break; } if (conn_str) request << "Connection: " << conn_str << "\r\n"; // body & content-length - if (!body_.empty()) - request << "Content-Length: " << body_.size() << "\r\n\r\n"; - // last delim - if (append_body) - request << body_ << "\r\n"; + if (append_body) { + request << "Content-Length: " << body_.size() << "\r\n\r\n" + << body_; + } else + request << "\r\n"; request_ = request.str(); } @@ -691,29 +693,35 @@ Request::init_parser() // user registered callbacks wrappers to store its data in the response std::lock_guard<std::mutex> lock(cbs_mutex_); cbs_.on_status = [this, statusCb = std::move(cbs_.on_status)](unsigned int status_code){ + std::cout << "on_status " << status_code << std::endl; response_.status_code = status_code; if (statusCb) statusCb(status_code); }; auto header_field = std::make_shared<std::string>(); cbs_.on_header_field = [header_field, headerFieldCb = std::move(cbs_.on_header_field)](const char* at, size_t length) { + std::cout << "on_header_field " << std::string(at, length) << std::endl; *header_field = std::string(at, length); if (headerFieldCb) headerFieldCb(at, length); }; cbs_.on_header_value = [this, header_field, headerValueCb = std::move(cbs_.on_header_value)](const char* at, size_t length) { + std::cout << "on_header_value " << std::string(at, length) << std::endl; response_.headers[*header_field] = std::string(at, length); if (headerValueCb) headerValueCb(at, length); }; cbs_.on_headers_complete = [this](){ - notify_state_change(State::HEADER_RECEIVED); + std::cout << "on_headers_complete " << std::endl; + onHeadersComplete(); }; cbs_.on_body = [bodyCb = std::move(cbs_.on_body)](const char* at, size_t length) { + std::cout << "on_body " << length << std::endl; if (bodyCb) bodyCb(at, length); }; cbs_.on_message_complete = [this](){ + std::cout << "on_message_complete " << std::endl; if (logger_) logger_->d("[http:request:%i] response: message complete", id_); message_complete_.store(true); @@ -759,7 +767,7 @@ Request::connect(std::vector<asio::ip::tcp::endpoint>&& endpoints, HandlerCb cb) if (logger_){ std::string eps = ""; for (const auto& endpoint : endpoints) - eps.append(endpoint.address().to_string() + " "); + eps.append(endpoint.address().to_string() + ":" + std::to_string(endpoint.port()) + " "); logger_->d("[http:request:%i] connect begin: %s", id_, eps.c_str()); } if (get_url().protocol == "https"){ @@ -789,7 +797,15 @@ Request::connect(std::vector<asio::ip::tcp::endpoint>&& endpoints, HandlerCb cb) if (this_.logger_) this_.logger_->d("[http:request:%i] connect success", this_.id_); - if (this_.get_url().protocol == "https"){ + const auto& url = this_.get_url(); + auto port = endpoint.port(); + if ((url.protocol == "http" && port == (in_port_t)80) + || (url.protocol == "https" && port == (in_port_t)443)) + this_.set_header_field(restinio::http_field_t::host, url.host); + else + this_.set_header_field(restinio::http_field_t::host, url.host + ":" + std::to_string(port)); + + if (url.protocol == "https") { if (this_.server_ca_) this_.conn_->set_ssl_verification(endpoint, asio::ssl::verify_peer | asio::ssl::verify_fail_if_no_peer_cert); @@ -831,7 +847,7 @@ Request::send() this_.logger_->e("[http:request:%i] resolve error: %s", this_.id_, ec.message().c_str()); this_.terminate(asio::error::connection_aborted); } - else if (!this_.conn_ or !this_.conn_->is_open()){ + else if (!this_.conn_ or !this_.conn_->is_open()) { this_.connect(std::move(endpoints), [wthis](const asio::error_code &ec) { if (auto sthis = wthis.lock()) { if (ec) @@ -857,10 +873,9 @@ Request::post() build(); init_parser(); - if (logger_){ - std::string header; std::getline(std::istringstream(request_), header); - logger_->d("[http:request:%i] send: %s", id_, header.c_str()); - } + if (logger_) + logger_->d("[http:request:%i] send:\n%s", id_, request_.c_str()); + // write the request to buffer std::ostream request_stream(&conn_->input()); request_stream << request_; @@ -917,14 +932,14 @@ Request::handle_request(const asio::error_code& ec) notify_state_change(State::RECEIVING); std::weak_ptr<Request> wthis = shared_from_this(); - conn_->async_read_until(HTTP_HEADER_DELIM, [wthis](const asio::error_code& ec, size_t){ + conn_->async_read_until(HTTP_HEADER_DELIM, [wthis](const asio::error_code& ec, size_t n_bytes){ if (auto sthis = wthis.lock()) - sthis->handle_response_header(ec); + sthis->handle_response_header(ec, n_bytes); }); } void -Request::handle_response_header(const asio::error_code& ec) +Request::handle_response_header(const asio::error_code& ec, size_t n_bytes) { if (ec && ec != asio::error::eof){ terminate(ec); @@ -934,16 +949,41 @@ Request::handle_response_header(const asio::error_code& ec) terminate(asio::error::not_connected); return; } + const auto& dat = conn_->read_bytes(n_bytes); if (logger_) - logger_->d("[http:request:%i] response headers received", id_); + logger_->d("[http:request:%i] read %zu bytes:\n\"%.*s\"", id_, n_bytes, dat.size(), dat.data()); + + std::cout << "\n " << std::hex + << std::setfill('0') << std::setw(2) << (unsigned)dat[dat.size() - 4] + << std::setfill('0') << std::setw(2) << (unsigned)dat[dat.size() - 3] + << std::setfill('0') << std::setw(2) << (unsigned)dat[dat.size() - 2] + << std::setfill('0') << std::setw(2) << (unsigned)dat[dat.size() - 1] + << std::endl; + // read the header - std::string header; + /*std::string header; std::string headers; while (std::getline(conn_->data(), header) && header != "\r"){ headers.append(header + "\n"); } headers.append("\n"); - parse_request(headers); + parse_request(headers);*/ + /*auto& dat = conn_->data(); + std::vector<uint8_t> indat;*/ + auto ret = http_parser_execute(parser_.get(), parser_s_.get(), dat.data(), dat.size()); + if (ret != dat.size()) { + if (logger_) + logger_->e("Error parsing HTTP: %zu %s %s", ret, + http_errno_name(HTTP_PARSER_ERRNO(parser_)), + http_errno_description(HTTP_PARSER_ERRNO(parser_))); + terminate(asio::error::invalid_argument); + } + logger_->d("[http:request:%i] read END", id_); +} + +void +Request::onHeadersComplete() { + notify_state_change(State::HEADER_RECEIVED); std::weak_ptr<Request> wthis = shared_from_this(); @@ -965,9 +1005,28 @@ Request::handle_response_header(const asio::error_code& ec) auto transfer_encoding_it = response_.headers.find(HTTP_HEADER_TRANSFER_ENCODING); // has content-length - if (content_length_it != response_.headers.end()) - { - unsigned int content_length = atoi(content_length_it->second.c_str()); + if (content_length_it != response_.headers.end()) { + size_t content_length = atoi(content_length_it->second.c_str()); + /*auto dat = conn_->read_bytes(content_length); + if (logger_) + logger_->d("[http:request:%i] response body: %zu/%zu bytes received:\n\"%.*s\"", id_, dat.size(), content_length, dat.size(), dat.data()); + if (not dat.empty()) { + if (dat.size() == content_length) + dat += HTTP_HEADER_DELIM; + http_parser_execute(parser_.get(), parser_s_.get(), dat.data(), dat.size()); + }*/ + //if (dat.size == ) + conn_->async_read(content_length, [wthis](const asio::error_code& ec, size_t bytes){ + if (auto sthis = wthis.lock()) + sthis->handle_response_header(ec, bytes); + }); + /*auto r = conn_->read_bytes(content_length); + std::cout << "Has content length " << content_length << std::endl; + std::cout << "Read content " << r.size() << std::endl; + if (r.size() == content_length) { + parse_request(r); + }*/ + /* response_.body.append(conn_->read_bytes(content_length)); // full body already in the header if (response_.body.size() + 1 == content_length) { @@ -979,16 +1038,16 @@ Request::handle_response_header(const asio::error_code& ec) else { // more chunks to come (don't add the missing \n from std::getline) conn_->async_read(content_length - response_.body.size(), [wthis](const asio::error_code& ec, size_t bytes){ if (auto sthis = wthis.lock()) - sthis->handle_response_body(ec, bytes); + sthis->handle_response_header(ec, bytes); }); - } + }*/ } // server wants to keep sending or we have content-length defined else if (connection_it != response_.headers.end() and connection_it->second == HTTP_HEADER_CONNECTION_KEEP_ALIVE) { conn_->async_read_until(BODY_VALUE_DELIM, [wthis](const asio::error_code& ec, size_t bytes){ if (auto sthis = wthis.lock()) - sthis->handle_response_body(ec, bytes); + sthis->handle_response_header(ec, bytes); }); } // server wants to close the connection @@ -1009,7 +1068,7 @@ Request::handle_response_header(const asio::error_code& ec) unsigned int content_length = std::stoul(chunk_size, nullptr, 16); conn_->async_read(content_length, [wthis](const asio::error_code& ec, size_t bytes){ if (auto sthis = wthis.lock()) - sthis->handle_response_body(ec, bytes); + sthis->handle_response_header(ec, bytes); }); } } diff --git a/tests/httptester.cpp b/tests/httptester.cpp index 3aef94695944890d1804a9acaa6578fbdc67a501..69f9195cc3cb5abf75c360d783ce252881599558 100644 --- a/tests/httptester.cpp +++ b/tests/httptester.cpp @@ -258,8 +258,7 @@ HttpTester::test_send_json() { CPPUNIT_ASSERT(resp_val["data"] == json["data"]); done = false; - url = "https://google.ca"; - request = std::make_shared<dht::http::Request>(serverProxy->io_context(), url, + request = std::make_shared<dht::http::Request>(serverProxy->io_context(), "http://google.ca", [&](const dht::http::Response& response){ logger->w("got answer: %.*s", response.body.size(), response.body.data()); status = response.status_code; @@ -267,9 +266,31 @@ HttpTester::test_send_json() { cv.notify_all(); }, logger); request->send(); + CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(10), [&]{ return done; })); + //CPPUNIT_ASSERT(status == 200); + done = false; + request = std::make_shared<dht::http::Request>(serverProxy->io_context(), "https://google.ca", + [&](const dht::http::Response& response){ + logger->w("got answer: %.*s", response.body.size(), response.body.data()); + status = response.status_code; + done = true; + cv.notify_all(); + }, logger); + request->send(); + CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(10), [&]{ return done; })); + //CPPUNIT_ASSERT(status == 200); + + done = false; + request = std::make_shared<dht::http::Request>(serverProxy->io_context(), "https://google.ca/sdbjklwGBIP", + [&](const dht::http::Response& response){ + logger->w("got answer: %.*s", response.body.size(), response.body.data()); + status = response.status_code; + done = true; + cv.notify_all(); + }, logger); + request->send(); CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(10), [&]{ return done; })); - CPPUNIT_ASSERT(status == 200); } } // namespace test