diff --git a/include/opendht/http.h b/include/opendht/http.h index 380802264440fa12eb5cdc5de34bd575c7ea8424..6656a42e31f970f216a2c2530e916024d497ff10 100644 --- a/include/opendht/http.h +++ b/include/opendht/http.h @@ -99,7 +99,7 @@ public: asio::streambuf& input(); std::istream& data() { return istream_; } - std::string read_bytes(size_t bytes); + std::string read_bytes(size_t bytes = 0); std::string read_until(const char delim); void async_connect(std::vector<asio::ip::tcp::endpoint>&& endpoints, ConnectHandlerCb); @@ -108,11 +108,20 @@ public: void async_read_until(const char* delim, BytesHandlerCb cb); void async_read_until(char delim, BytesHandlerCb cb); void async_read(size_t bytes, BytesHandlerCb cb); + void async_read_some(size_t bytes, BytesHandlerCb cb); void timeout(const std::chrono::seconds timeout, HandlerCb cb = {}); void close(); private: + + template<typename T> + T wrapCallabck(T cb) const { + return [t=shared_from_this(),cb=std::move(cb)](auto ...params) { + cb(params...); + }; + } + unsigned int id_; static std::atomic_uint ids_; @@ -232,7 +241,7 @@ public: ~Request(); - unsigned int id() const; + inline unsigned int id() const { return id_; }; void set_connection(std::shared_ptr<Connection> connection); std::shared_ptr<Connection> get_connection() const; inline const Url& get_url() const { @@ -272,15 +281,10 @@ private: using OnCompleteCb = std::function<void()>; struct Callbacks { - Callbacks(){} - OnStatusCb on_status; OnDataCb on_header_field; OnDataCb on_header_value; OnDataCb on_body; - OnCompleteCb on_headers_complete; - OnCompleteCb on_message_complete; - OnStateChangeCb on_state_change; }; @@ -301,17 +305,11 @@ private: void post(); void handle_request(const asio::error_code& ec); + void handle_response(const asio::error_code& ec, size_t bytes); - void handle_response_header(const asio::error_code& ec); - - void handle_response_body(const asio::error_code& ec, size_t bytes); - - /** - * Parse the request with http_parser. - * Return how many bytes were parsed. - * Note: we pass requerst.size()==0 to signal that EOF has been received. - */ - size_t parse_request(const std::string& request); + void onHeadersComplete(); + void onBody(const char* at, size_t length); + void onComplete(); std::shared_ptr<dht::Logger> logger_; @@ -320,7 +318,6 @@ private: restinio::http_connection_header_t connection_type_ {restinio::http_connection_header_t::close}; std::string body_; - std::mutex cbs_mutex_; Callbacks cbs_; State state_; @@ -338,7 +335,6 @@ private: Response response_ {}; std::string request_; - std::atomic<bool> message_complete_ {false}; std::atomic<bool> finishing_ {false}; std::unique_ptr<http_parser> parser_; std::unique_ptr<http_parser_settings> parser_s_; diff --git a/src/http.cpp b/src/http.cpp index d34da523813d1bd0b79e72d22653052948de2a70..4f53f6152a462db0af850f6d281c98c30b730c84 100644 --- a/src/http.cpp +++ b/src/http.cpp @@ -29,15 +29,8 @@ namespace dht { namespace http { -constexpr char HTTP_HEADER_CONNECTION[] = "Connection"; -constexpr char HTTP_HEADER_CONNECTION_KEEP_ALIVE[] = "keep-alive"; -constexpr char HTTP_HEADER_CONNECTION_CLOSE[] = "close"; -constexpr char HTTP_HEADER_CONTENT_LENGTH[] = "Content-Length"; constexpr char HTTP_HEADER_CONTENT_TYPE_JSON[] = "application/json"; -constexpr char HTTP_HEADER_TRANSFER_ENCODING[] = "Transfer-Encoding"; -constexpr char HTTP_HEADER_TRANSFER_ENCODING_CHUNKED[] = "chunked"; constexpr char HTTP_HEADER_DELIM[] = "\r\n\r\n"; -constexpr char BODY_VALUE_DELIM = '\n'; Url::Url(const std::string& url): url(url) { @@ -130,8 +123,7 @@ Connection::Connection(asio::io_context& ctx, std::shared_ptr<dht::crypto::Certi ssl_socket_ = std::make_unique<ssl_socket_t>(ctx_, ssl_ctx_); } -Connection::~Connection() -{ +Connection::~Connection() { close(); } @@ -200,6 +192,8 @@ Connection::input() std::string Connection::read_bytes(size_t bytes) { + if (bytes == 0) + bytes = read_buf_.in_avail(); std::string content; content.resize(bytes); auto rb = istream_.readsome(&content[0], bytes); @@ -219,9 +213,9 @@ void Connection::async_connect(std::vector<asio::ip::tcp::endpoint>&& endpoints, ConnectHandlerCb cb) { if (ssl_socket_) - asio::async_connect(ssl_socket_->lowest_layer(), std::move(endpoints), cb); + asio::async_connect(ssl_socket_->lowest_layer(), std::move(endpoints), wrapCallabck(std::move(cb))); else if (socket_) - asio::async_connect(*socket_, std::move(endpoints), cb); + asio::async_connect(*socket_, std::move(endpoints), wrapCallabck(std::move(cb))); else if (cb) cb(asio::error::operation_aborted, {}); } @@ -261,8 +255,8 @@ void Connection::async_write(BytesHandlerCb cb) { if (!is_open()) return; - if (ssl_socket_) asio::async_write(*ssl_socket_, write_buf_, cb); - else if (socket_) asio::async_write(*socket_, write_buf_, cb); + if (ssl_socket_) asio::async_write(*ssl_socket_, write_buf_, wrapCallabck(std::move(cb))); + else if (socket_) asio::async_write(*socket_, write_buf_, wrapCallabck(std::move(cb))); else if (cb) cb(asio::error::operation_aborted, 0); } @@ -270,8 +264,8 @@ void Connection::async_read_until(const char* delim, BytesHandlerCb cb) { if (!is_open()) return; - if (ssl_socket_) asio::async_read_until(*ssl_socket_, read_buf_, delim, cb); - else if (socket_) asio::async_read_until(*socket_, read_buf_, delim, cb); + if (ssl_socket_) asio::async_read_until(*ssl_socket_, read_buf_, delim, wrapCallabck(std::move(cb))); + else if (socket_) asio::async_read_until(*socket_, read_buf_, delim, wrapCallabck(std::move(cb))); else if (cb) cb(asio::error::operation_aborted, 0); } @@ -279,8 +273,8 @@ void Connection::async_read_until(char delim, BytesHandlerCb cb) { if (!is_open()) return; - if (ssl_socket_) asio::async_read_until(*ssl_socket_, read_buf_, delim, cb); - else if (socket_) asio::async_read_until(*socket_, read_buf_, delim, cb); + if (ssl_socket_) asio::async_read_until(*ssl_socket_, read_buf_, delim, wrapCallabck(std::move(cb))); + else if (socket_) asio::async_read_until(*socket_, read_buf_, delim, wrapCallabck(std::move(cb))); else if (cb) cb(asio::error::operation_aborted, 0); } @@ -288,11 +282,27 @@ void Connection::async_read(size_t bytes, BytesHandlerCb cb) { if (!is_open()) return; - if (ssl_socket_) asio::async_read(*ssl_socket_, read_buf_, asio::transfer_exactly(bytes), cb); - else if (socket_) asio::async_read(*socket_, read_buf_, asio::transfer_exactly(bytes), cb); + if (ssl_socket_) asio::async_read(*ssl_socket_, read_buf_, asio::transfer_exactly(bytes), wrapCallabck(std::move(cb))); + else if (socket_) asio::async_read(*socket_, read_buf_, asio::transfer_exactly(bytes), wrapCallabck(std::move(cb))); else if (cb) cb(asio::error::operation_aborted, 0); } +void +Connection::async_read_some(size_t bytes, BytesHandlerCb cb) +{ + if (!is_open()) { + if (cb) cb(asio::error::operation_aborted, 0); + return; + } + auto buf = read_buf_.prepare(bytes); + auto onEnd = [this_=shared_from_this(), cb=std::move(cb)](const asio::error_code& ec, size_t t){ + this_->read_buf_.commit(t); + cb(ec, t); + }; + if (ssl_socket_) ssl_socket_->async_read_some(buf, onEnd); + else socket_->async_read_some(buf, onEnd); +} + void Connection::timeout(const std::chrono::seconds timeout, HandlerCb cb) { @@ -438,6 +448,7 @@ Request::Request(asio::io_context& ctx, const std::string& url, const Json::Valu set_header_field(restinio::http_field_t::content_type, HTTP_HEADER_CONTENT_TYPE_JSON); set_header_field(restinio::http_field_t::accept, HTTP_HEADER_CONTENT_TYPE_JSON); Json::StreamWriterBuilder wbuilder; + set_method(restinio::http_method_post()); set_body(Json::writeString(wbuilder, json)); add_on_state_change_callback([this, jsoncb](State state, const Response& response){ if (state != Request::State::DONE) @@ -465,7 +476,8 @@ Request::Request(asio::io_context& ctx, const std::string& url, std::shared_ptr< Request::Request(asio::io_context& ctx, const std::string& url, OnDoneCb onDone, std::shared_ptr<dht::Logger> logger) : logger_(logger), id_(Request::ids_++), ctx_(ctx), resolver_(std::make_shared<Resolver>(ctx, url, logger)) { - add_on_state_change_callback([this, onDone](State state, const Response& response){ + init_default_headers(); + add_on_state_change_callback([onDone](State state, const Response& response){ if (state == Request::State::DONE) onDone(response); }); @@ -510,7 +522,8 @@ void Request::init_default_headers() { const auto& url = resolver_->get_url(); - set_header_field(restinio::http_field_t::host, url.host + ":" + url.service); + set_header_field(restinio::http_field_t::user_agent, "Mozilla/5.0"); + set_header_field(restinio::http_field_t::accept, "text/html"); set_target(url.target); } @@ -521,39 +534,28 @@ Request::cancel() c->close(); } -unsigned int -Request::id() const -{ - return id_; -} - void -Request::set_connection(std::shared_ptr<Connection> connection) -{ - conn_ = connection; +Request::set_connection(std::shared_ptr<Connection> connection) { + conn_ = std::move(connection); } std::shared_ptr<Connection> -Request::get_connection() const -{ +Request::get_connection() const { return conn_; } void -Request::set_certificate_authority(std::shared_ptr<dht::crypto::Certificate> certificate) -{ +Request::set_certificate_authority(std::shared_ptr<dht::crypto::Certificate> certificate) { server_ca_ = certificate; } void -Request::set_identity(const dht::crypto::Identity& identity) -{ +Request::set_identity(const dht::crypto::Identity& identity) { client_identity_ = identity; } void -Request::set_logger(std::shared_ptr<dht::Logger> logger) -{ +Request::set_logger(std::shared_ptr<dht::Logger> logger) { logger_ = logger; } @@ -564,32 +566,27 @@ Request::set_header(restinio::http_request_header_t header) } void -Request::set_method(restinio::http_method_id_t method) -{ +Request::set_method(restinio::http_method_id_t method) { header_.method(method); } void -Request::set_target(std::string target) -{ +Request::set_target(std::string target) { header_.request_target(std::move(target)); } void -Request::set_header_field(restinio::http_field_t field, std::string value) -{ +Request::set_header_field(restinio::http_field_t field, std::string value) { headers_[field] = std::move(value); } void -Request::set_connection_type(restinio::http_connection_header_t connection) -{ +Request::set_connection_type(restinio::http_connection_header_t connection) { connection_type_ = connection; } void -Request::set_body(std::string body) -{ +Request::set_body(std::string body) { body_ = std::move(body); } @@ -608,7 +605,7 @@ void Request::build() { std::stringstream request; - bool append_body = true; + bool append_body = !body_.empty(); // first header request << header_.method().c_str() << " " << header_.request_target() << " " << @@ -624,47 +621,41 @@ Request::build() // last connection header const char* conn_str = nullptr; switch (connection_type_){ + case restinio::http_connection_header_t::keep_alive: + conn_str = "keep-alive"; + break; case restinio::http_connection_header_t::upgrade: if (logger_) logger_->e("Unsupported connection type 'upgrade', fallback to 'close'"); // fallthrough case restinio::http_connection_header_t::close: - conn_str = "close"; - break; - case restinio::http_connection_header_t::keep_alive: - conn_str = "keep-alive"; + conn_str = "close"; // default break; } if (conn_str) request << "Connection: " << conn_str << "\r\n"; // body & content-length - if (!body_.empty()) - request << "Content-Length: " << body_.size() << "\r\n\r\n"; - // last delim - if (append_body) - request << body_ << "\r\n"; + if (append_body) { + request << "Content-Length: " << body_.size() << "\r\n\r\n" + << body_; + } else + request << "\r\n"; request_ = request.str(); } void -Request::add_on_status_callback(OnStatusCb cb) -{ - std::lock_guard<std::mutex> lock(cbs_mutex_); +Request::add_on_status_callback(OnStatusCb cb) { cbs_.on_status = std::move(cb); } void -Request::add_on_body_callback(OnDataCb cb) -{ - std::lock_guard<std::mutex> lock(cbs_mutex_); +Request::add_on_body_callback(OnDataCb cb) { cbs_.on_body = std::move(cb); } void -Request::add_on_state_change_callback(OnStateChangeCb cb) -{ - std::lock_guard<std::mutex> lock(cbs_mutex_); +Request::add_on_state_change_callback(OnStateChangeCb cb) { cbs_.on_state_change = std::move(cb); } @@ -687,38 +678,20 @@ Request::init_parser() if (!parser_s_) parser_s_ = std::make_unique<http_parser_settings>(); http_parser_settings_init(parser_s_.get()); - { - // user registered callbacks wrappers to store its data in the response - std::lock_guard<std::mutex> lock(cbs_mutex_); - cbs_.on_status = [this, statusCb = std::move(cbs_.on_status)](unsigned int status_code){ - response_.status_code = status_code; - if (statusCb) - statusCb(status_code); - }; - auto header_field = std::make_shared<std::string>(); - cbs_.on_header_field = [header_field, headerFieldCb = std::move(cbs_.on_header_field)](const char* at, size_t length) { - *header_field = std::string(at, length); - if (headerFieldCb) - headerFieldCb(at, length); - }; - cbs_.on_header_value = [this, header_field, headerValueCb = std::move(cbs_.on_header_value)](const char* at, size_t length) { - response_.headers[*header_field] = std::string(at, length); - if (headerValueCb) - headerValueCb(at, length); - }; - cbs_.on_headers_complete = [this](){ - notify_state_change(State::HEADER_RECEIVED); - }; - cbs_.on_body = [bodyCb = std::move(cbs_.on_body)](const char* at, size_t length) { - if (bodyCb) - bodyCb(at, length); - }; - cbs_.on_message_complete = [this](){ - if (logger_) - logger_->d("[http:request:%i] response: message complete", id_); - message_complete_.store(true); - }; - } + + cbs_.on_status = [this, statusCb = std::move(cbs_.on_status)](unsigned int status_code){ + response_.status_code = status_code; + if (statusCb) + statusCb(status_code); + }; + auto header_field = std::make_shared<std::string>(); + cbs_.on_header_field = [header_field](const char* at, size_t length) { + *header_field = std::string(at, length); + }; + cbs_.on_header_value = [this, header_field](const char* at, size_t length) { + response_.headers[*header_field] = std::string(at, length); + }; + // http_parser raw c callback (note: no context can be passed into them) parser_s_->on_status = [](http_parser* parser, const char* /*at*/, size_t /*length*/) -> int { static_cast<Request*>(parser->data)->cbs_.on_status(parser->status_code); @@ -733,15 +706,15 @@ Request::init_parser() return 0; }; parser_s_->on_body = [](http_parser* parser, const char* at, size_t length) -> int { - static_cast<Request*>(parser->data)->cbs_.on_body(at, length); + static_cast<Request*>(parser->data)->onBody(at, length); return 0; }; parser_s_->on_headers_complete = [](http_parser* parser) -> int { - static_cast<Request*>(parser->data)->cbs_.on_headers_complete(); + static_cast<Request*>(parser->data)->onHeadersComplete(); return 0; }; parser_s_->on_message_complete = [](http_parser* parser) -> int { - static_cast<Request*>(parser->data)->cbs_.on_message_complete(); + static_cast<Request*>(parser->data)->onComplete(); return 0; }; } @@ -759,7 +732,7 @@ Request::connect(std::vector<asio::ip::tcp::endpoint>&& endpoints, HandlerCb cb) if (logger_){ std::string eps = ""; for (const auto& endpoint : endpoints) - eps.append(endpoint.address().to_string() + " "); + eps.append(endpoint.address().to_string() + ":" + std::to_string(endpoint.port()) + " "); logger_->d("[http:request:%i] connect begin: %s", id_, eps.c_str()); } if (get_url().protocol == "https"){ @@ -789,7 +762,15 @@ Request::connect(std::vector<asio::ip::tcp::endpoint>&& endpoints, HandlerCb cb) if (this_.logger_) this_.logger_->d("[http:request:%i] connect success", this_.id_); - if (this_.get_url().protocol == "https"){ + const auto& url = this_.get_url(); + auto port = endpoint.port(); + if ((url.protocol == "http" && port == (in_port_t)80) + || (url.protocol == "https" && port == (in_port_t)443)) + this_.set_header_field(restinio::http_field_t::host, url.host); + else + this_.set_header_field(restinio::http_field_t::host, url.host + ":" + std::to_string(port)); + + if (url.protocol == "https") { if (this_.server_ca_) this_.conn_->set_ssl_verification(endpoint, asio::ssl::verify_peer | asio::ssl::verify_fail_if_no_peer_cert); @@ -831,7 +812,7 @@ Request::send() this_.logger_->e("[http:request:%i] resolve error: %s", this_.id_, ec.message().c_str()); this_.terminate(asio::error::connection_aborted); } - else if (!this_.conn_ or !this_.conn_->is_open()){ + else if (!this_.conn_ or !this_.conn_->is_open()) { this_.connect(std::move(endpoints), [wthis](const asio::error_code &ec) { if (auto sthis = wthis.lock()) { if (ec) @@ -857,10 +838,9 @@ Request::post() build(); init_parser(); - if (logger_){ - std::string header; std::getline(std::istringstream(request_), header); - logger_->d("[http:request:%i] send: %s", id_, header.c_str()); - } + if (logger_) + logger_->d("[http:request:%i] sending %zu bytes", id_, request_.size()); + // write the request to buffer std::ostream request_stream(&conn_->input()); request_stream << request_; @@ -881,20 +861,16 @@ Request::terminate(const asio::error_code& ec) if (finishing_.exchange(true)) return; - if (ec != asio::error::eof and ec != asio::error::operation_aborted and logger_) - logger_->e("[http:request:%i] end with error: %s", id_, ec.message().c_str()); - - // set response outcome, ignore end of file and abort - if ((!ec or ec == asio::error::eof) and parser_) - response_.status_code = parser_->status_code; - else - response_.status_code = 0; - response_.aborted = ec == asio::error::operation_aborted; - if (logger_) - logger_->d("[http:request:%i] done with status code %u", id_, response_.status_code); - if (connection_type_ != restinio::http_connection_header_t::keep_alive) + if (logger_) { + if (ec and ec != asio::error::eof and ec != asio::error::operation_aborted) + logger_->e("[http:request:%i] end with error: %s", id_, ec.message().c_str()); + else + logger_->d("[http:request:%i] done with status code %u", id_, response_.status_code); + } + + if (!parser_ or !http_should_keep_alive(parser_.get())) if (auto c = conn_) c->close(); notify_state_change(State::DONE); @@ -917,204 +893,76 @@ Request::handle_request(const asio::error_code& ec) notify_state_change(State::RECEIVING); std::weak_ptr<Request> wthis = shared_from_this(); - conn_->async_read_until(HTTP_HEADER_DELIM, [wthis](const asio::error_code& ec, size_t){ + conn_->async_read_until(HTTP_HEADER_DELIM, [wthis](const asio::error_code& ec, size_t n_bytes){ if (auto sthis = wthis.lock()) - sthis->handle_response_header(ec); + sthis->handle_response(ec, n_bytes); }); } void -Request::handle_response_header(const asio::error_code& ec) +Request::handle_response(const asio::error_code& ec, size_t n_bytes) { if (ec && ec != asio::error::eof){ terminate(ec); return; } - if (!conn_->is_open()){ - terminate(asio::error::not_connected); - return; - } - if (logger_) - logger_->d("[http:request:%i] response headers received", id_); - // read the header - std::string header; - std::string headers; - while (std::getline(conn_->data(), header) && header != "\r"){ - headers.append(header + "\n"); - } - headers.append("\n"); - parse_request(headers); - - std::weak_ptr<Request> wthis = shared_from_this(); - - auto expect_it = headers_.find(restinio::http_field_t::expect); - if (expect_it != headers_.end() and (expect_it->second == "100-continue") and response_.status_code != 200){ - notify_state_change(State::SENDING); - request_.append(body_); - std::ostream request_stream(&conn_->input()); - request_stream << body_ << "\r\n"; - conn_->async_write([wthis](const asio::error_code& ec, size_t) { - if (auto sthis = wthis.lock()) - sthis->handle_request(ec); - }); + auto request = (ec == asio::error::eof) ? std::string{} : conn_->read_bytes(); + size_t ret = http_parser_execute(parser_.get(), parser_s_.get(), request.c_str(), request.size()); + if (ret != request.size()) { + if (logger_) + logger_->e("Error parsing HTTP: %zu %s %s", ret, + http_errno_name(HTTP_PARSER_ERRNO(parser_)), + http_errno_description(HTTP_PARSER_ERRNO(parser_))); + terminate(asio::error::basic_errors::broken_pipe); return; } - auto connection_it = response_.headers.find(HTTP_HEADER_CONNECTION); - auto content_length_it = response_.headers.find(HTTP_HEADER_CONTENT_LENGTH); - auto transfer_encoding_it = response_.headers.find(HTTP_HEADER_TRANSFER_ENCODING); + if (state_ != State::DONE and parser_ and not http_body_is_final(parser_.get())) { + if (auto toRead = parser_->content_length ? std::min<uint64_t>(parser_->content_length, 64 * 1024) : 64 * 1024) { + if (logger_) + logger_->d("[http:request:%i] read more %llu", id_, toRead); - // has content-length - if (content_length_it != response_.headers.end()) - { - unsigned int content_length = atoi(content_length_it->second.c_str()); - response_.body.append(conn_->read_bytes(content_length)); - // full body already in the header - if (response_.body.size() + 1 == content_length) { - response_.body.append("\n"); - parse_request(response_.body); - if (message_complete_.load()) - terminate(asio::error::eof); - } - else { // more chunks to come (don't add the missing \n from std::getline) - conn_->async_read(content_length - response_.body.size(), [wthis](const asio::error_code& ec, size_t bytes){ + std::weak_ptr<Request> wthis = shared_from_this(); + conn_->async_read_some(toRead, [wthis](const asio::error_code& ec, size_t bytes){ if (auto sthis = wthis.lock()) - sthis->handle_response_body(ec, bytes); + sthis->handle_response(ec, bytes); }); } } - // server wants to keep sending or we have content-length defined - else if (connection_it != response_.headers.end() and connection_it->second == HTTP_HEADER_CONNECTION_KEEP_ALIVE) - { - conn_->async_read_until(BODY_VALUE_DELIM, [wthis](const asio::error_code& ec, size_t bytes){ - if (auto sthis = wthis.lock()) - sthis->handle_response_body(ec, bytes); - }); - } - // server wants to close the connection - else if (connection_it != response_.headers.end() and connection_it->second == HTTP_HEADER_CONNECTION_CLOSE) - { - terminate(asio::error::eof); - } - // client wants to close the connection - else if (connection_type_ == restinio::http_connection_header_t::close) - { - terminate(asio::error::eof); - } - else if (transfer_encoding_it != response_.headers.end() and - transfer_encoding_it->second == HTTP_HEADER_TRANSFER_ENCODING_CHUNKED) - { - std::string chunk_size; - std::getline(conn_->data(), chunk_size); - unsigned int content_length = std::stoul(chunk_size, nullptr, 16); - conn_->async_read(content_length, [wthis](const asio::error_code& ec, size_t bytes){ - if (auto sthis = wthis.lock()) - sthis->handle_response_body(ec, bytes); - }); - } } void -Request::handle_response_body(const asio::error_code& ec, size_t bytes) +Request::onBody(const char* at, size_t length) { - if (ec && ec != asio::error::eof){ - terminate(ec); - return; - } - if (!conn_->is_open()){ - terminate(asio::error::not_connected); - return; - } - if (logger_) - logger_->d("[http:request:%i] response body: %i bytes received", id_, bytes); - - if (bytes == 0){ - terminate(asio::error::eof); - return; - } + if (cbs_.on_body) + cbs_.on_body(at, length); + else + response_.body.insert(response_.body.end(), at, at+length); +} - // avoid creating non-existant headers by accessing the headers map without the presence of key - auto connection_it = response_.headers.find(HTTP_HEADER_CONNECTION); - auto content_length_it = response_.headers.find(HTTP_HEADER_CONTENT_LENGTH); - auto transfer_encoding_it = response_.headers.find(HTTP_HEADER_TRANSFER_ENCODING); - - // read the content-length body - unsigned int content_length; - if (content_length_it != response_.headers.end() and !response_.body.empty()){ - // extract the content-length - content_length = atoi(content_length_it->second.c_str()); - response_.body.append(conn_->read_bytes(bytes)); - // check if fully parsed - if (response_.body.size() == content_length) - parse_request(response_.body); - } - // read and parse the chunked encoding fragment - else { - auto body = conn_->read_until(BODY_VALUE_DELIM); - response_.body += body; - if (body == "0\r\n"){ - parse_request(response_.body); - terminate(asio::error::eof); - return; - } - parse_request(body + '\n'); - } +void +Request::onComplete() { + terminate(asio::error::eof); +} - std::weak_ptr<Request> wthis = shared_from_this(); +void +Request::onHeadersComplete() { + notify_state_change(State::HEADER_RECEIVED); - // should be executed after each parse_request who can trigger http_parser on_message_complete - if (message_complete_.load()){ - terminate(asio::error::eof); - } - // has content-length - else if (content_length_it != response_.headers.end() and response_.body.size() != content_length) - { - conn_->async_read(content_length - response_.body.size(), [wthis](const asio::error_code& ec, size_t bytes){ - if (auto sthis = wthis.lock()) - sthis->handle_response_body(ec, bytes); - }); - } - // server wants to keep sending - else if (connection_it != response_.headers.end() and connection_it->second == HTTP_HEADER_CONNECTION_KEEP_ALIVE) - { - conn_->async_read_until(BODY_VALUE_DELIM, [wthis](const asio::error_code& ec, size_t bytes){ - if (auto sthis = wthis.lock()) - sthis->handle_response_body(ec, bytes); - }); - } - // server wants to close the connection - else if (connection_it != response_.headers.end() and connection_it->second == HTTP_HEADER_CONNECTION_CLOSE) - { - terminate(asio::error::eof); - } - // client wants to close the connection - else if (connection_type_ == restinio::http_connection_header_t::close) - { - terminate(asio::error::eof); - } - else if (transfer_encoding_it != response_.headers.end() and - transfer_encoding_it->second == HTTP_HEADER_TRANSFER_ENCODING_CHUNKED) - { - std::string chunk_size; - std::getline(conn_->data(), chunk_size); - if (chunk_size.size() == 0){ - parse_request(response_.body); - terminate(asio::error::eof); - } - else - conn_->async_read_until(BODY_VALUE_DELIM, [wthis](const asio::error_code& ec, size_t bytes){ + auto expect_it = headers_.find(restinio::http_field_t::expect); + if (expect_it != headers_.end() and (expect_it->second == "100-continue") and response_.status_code != 200){ + notify_state_change(State::SENDING); + request_.append(body_); + std::ostream request_stream(&conn_->input()); + request_stream << body_ << "\r\n"; + std::weak_ptr<Request> wthis = shared_from_this(); + conn_->async_write([wthis](const asio::error_code& ec, size_t) { if (auto sthis = wthis.lock()) - sthis->handle_response_body(ec, bytes); + sthis->handle_request(ec); }); + return; } } -size_t -Request::parse_request(const std::string& request) -{ - std::lock_guard<std::mutex> lock(cbs_mutex_); - return http_parser_execute(parser_.get(), parser_s_.get(), request.c_str(), request.size()); -} - } // namespace http } // namespace dht