Skip to content
Snippets Groups Projects
Commit 5b87090c authored by Adrien Béraud's avatar Adrien Béraud
Browse files

http: use http_parser parsing logic

parent fffee634
Branches
Tags
No related merge requests found
...@@ -99,7 +99,7 @@ public: ...@@ -99,7 +99,7 @@ public:
asio::streambuf& input(); asio::streambuf& input();
std::istream& data() { return istream_; } std::istream& data() { return istream_; }
std::string read_bytes(size_t bytes); std::string read_bytes(size_t bytes = 0);
std::string read_until(const char delim); std::string read_until(const char delim);
void async_connect(std::vector<asio::ip::tcp::endpoint>&& endpoints, ConnectHandlerCb); void async_connect(std::vector<asio::ip::tcp::endpoint>&& endpoints, ConnectHandlerCb);
...@@ -108,11 +108,20 @@ public: ...@@ -108,11 +108,20 @@ public:
void async_read_until(const char* delim, BytesHandlerCb cb); void async_read_until(const char* delim, BytesHandlerCb cb);
void async_read_until(char delim, BytesHandlerCb cb); void async_read_until(char delim, BytesHandlerCb cb);
void async_read(size_t bytes, BytesHandlerCb cb); void async_read(size_t bytes, BytesHandlerCb cb);
void async_read_some(size_t bytes, BytesHandlerCb cb);
void timeout(const std::chrono::seconds timeout, HandlerCb cb = {}); void timeout(const std::chrono::seconds timeout, HandlerCb cb = {});
void close(); void close();
private: private:
template<typename T>
T wrapCallabck(T cb) const {
return [t=shared_from_this(),cb=std::move(cb)](auto ...params) {
cb(params...);
};
}
unsigned int id_; unsigned int id_;
static std::atomic_uint ids_; static std::atomic_uint ids_;
...@@ -232,7 +241,7 @@ public: ...@@ -232,7 +241,7 @@ public:
~Request(); ~Request();
unsigned int id() const; inline unsigned int id() const { return id_; };
void set_connection(std::shared_ptr<Connection> connection); void set_connection(std::shared_ptr<Connection> connection);
std::shared_ptr<Connection> get_connection() const; std::shared_ptr<Connection> get_connection() const;
inline const Url& get_url() const { inline const Url& get_url() const {
...@@ -272,15 +281,10 @@ private: ...@@ -272,15 +281,10 @@ private:
using OnCompleteCb = std::function<void()>; using OnCompleteCb = std::function<void()>;
struct Callbacks { struct Callbacks {
Callbacks(){}
OnStatusCb on_status; OnStatusCb on_status;
OnDataCb on_header_field; OnDataCb on_header_field;
OnDataCb on_header_value; OnDataCb on_header_value;
OnDataCb on_body; OnDataCb on_body;
OnCompleteCb on_headers_complete;
OnCompleteCb on_message_complete;
OnStateChangeCb on_state_change; OnStateChangeCb on_state_change;
}; };
...@@ -301,17 +305,11 @@ private: ...@@ -301,17 +305,11 @@ private:
void post(); void post();
void handle_request(const asio::error_code& ec); void handle_request(const asio::error_code& ec);
void handle_response(const asio::error_code& ec, size_t bytes);
void handle_response_header(const asio::error_code& ec); void onHeadersComplete();
void onBody(const char* at, size_t length);
void handle_response_body(const asio::error_code& ec, size_t bytes); void onComplete();
/**
* Parse the request with http_parser.
* Return how many bytes were parsed.
* Note: we pass requerst.size()==0 to signal that EOF has been received.
*/
size_t parse_request(const std::string& request);
std::shared_ptr<dht::Logger> logger_; std::shared_ptr<dht::Logger> logger_;
...@@ -320,7 +318,6 @@ private: ...@@ -320,7 +318,6 @@ private:
restinio::http_connection_header_t connection_type_ {restinio::http_connection_header_t::close}; restinio::http_connection_header_t connection_type_ {restinio::http_connection_header_t::close};
std::string body_; std::string body_;
std::mutex cbs_mutex_;
Callbacks cbs_; Callbacks cbs_;
State state_; State state_;
...@@ -338,7 +335,6 @@ private: ...@@ -338,7 +335,6 @@ private:
Response response_ {}; Response response_ {};
std::string request_; std::string request_;
std::atomic<bool> message_complete_ {false};
std::atomic<bool> finishing_ {false}; std::atomic<bool> finishing_ {false};
std::unique_ptr<http_parser> parser_; std::unique_ptr<http_parser> parser_;
std::unique_ptr<http_parser_settings> parser_s_; std::unique_ptr<http_parser_settings> parser_s_;
......
...@@ -29,15 +29,8 @@ ...@@ -29,15 +29,8 @@
namespace dht { namespace dht {
namespace http { namespace http {
constexpr char HTTP_HEADER_CONNECTION[] = "Connection";
constexpr char HTTP_HEADER_CONNECTION_KEEP_ALIVE[] = "keep-alive";
constexpr char HTTP_HEADER_CONNECTION_CLOSE[] = "close";
constexpr char HTTP_HEADER_CONTENT_LENGTH[] = "Content-Length";
constexpr char HTTP_HEADER_CONTENT_TYPE_JSON[] = "application/json"; constexpr char HTTP_HEADER_CONTENT_TYPE_JSON[] = "application/json";
constexpr char HTTP_HEADER_TRANSFER_ENCODING[] = "Transfer-Encoding";
constexpr char HTTP_HEADER_TRANSFER_ENCODING_CHUNKED[] = "chunked";
constexpr char HTTP_HEADER_DELIM[] = "\r\n\r\n"; constexpr char HTTP_HEADER_DELIM[] = "\r\n\r\n";
constexpr char BODY_VALUE_DELIM = '\n';
Url::Url(const std::string& url): url(url) Url::Url(const std::string& url): url(url)
{ {
...@@ -130,8 +123,7 @@ Connection::Connection(asio::io_context& ctx, std::shared_ptr<dht::crypto::Certi ...@@ -130,8 +123,7 @@ Connection::Connection(asio::io_context& ctx, std::shared_ptr<dht::crypto::Certi
ssl_socket_ = std::make_unique<ssl_socket_t>(ctx_, ssl_ctx_); ssl_socket_ = std::make_unique<ssl_socket_t>(ctx_, ssl_ctx_);
} }
Connection::~Connection() Connection::~Connection() {
{
close(); close();
} }
...@@ -200,6 +192,8 @@ Connection::input() ...@@ -200,6 +192,8 @@ Connection::input()
std::string std::string
Connection::read_bytes(size_t bytes) Connection::read_bytes(size_t bytes)
{ {
if (bytes == 0)
bytes = read_buf_.in_avail();
std::string content; std::string content;
content.resize(bytes); content.resize(bytes);
auto rb = istream_.readsome(&content[0], bytes); auto rb = istream_.readsome(&content[0], bytes);
...@@ -219,9 +213,9 @@ void ...@@ -219,9 +213,9 @@ void
Connection::async_connect(std::vector<asio::ip::tcp::endpoint>&& endpoints, ConnectHandlerCb cb) Connection::async_connect(std::vector<asio::ip::tcp::endpoint>&& endpoints, ConnectHandlerCb cb)
{ {
if (ssl_socket_) if (ssl_socket_)
asio::async_connect(ssl_socket_->lowest_layer(), std::move(endpoints), cb); asio::async_connect(ssl_socket_->lowest_layer(), std::move(endpoints), wrapCallabck(std::move(cb)));
else if (socket_) else if (socket_)
asio::async_connect(*socket_, std::move(endpoints), cb); asio::async_connect(*socket_, std::move(endpoints), wrapCallabck(std::move(cb)));
else if (cb) else if (cb)
cb(asio::error::operation_aborted, {}); cb(asio::error::operation_aborted, {});
} }
...@@ -261,8 +255,8 @@ void ...@@ -261,8 +255,8 @@ void
Connection::async_write(BytesHandlerCb cb) Connection::async_write(BytesHandlerCb cb)
{ {
if (!is_open()) return; if (!is_open()) return;
if (ssl_socket_) asio::async_write(*ssl_socket_, write_buf_, cb); if (ssl_socket_) asio::async_write(*ssl_socket_, write_buf_, wrapCallabck(std::move(cb)));
else if (socket_) asio::async_write(*socket_, write_buf_, cb); else if (socket_) asio::async_write(*socket_, write_buf_, wrapCallabck(std::move(cb)));
else if (cb) cb(asio::error::operation_aborted, 0); else if (cb) cb(asio::error::operation_aborted, 0);
} }
...@@ -270,8 +264,8 @@ void ...@@ -270,8 +264,8 @@ void
Connection::async_read_until(const char* delim, BytesHandlerCb cb) Connection::async_read_until(const char* delim, BytesHandlerCb cb)
{ {
if (!is_open()) return; if (!is_open()) return;
if (ssl_socket_) asio::async_read_until(*ssl_socket_, read_buf_, delim, cb); if (ssl_socket_) asio::async_read_until(*ssl_socket_, read_buf_, delim, wrapCallabck(std::move(cb)));
else if (socket_) asio::async_read_until(*socket_, read_buf_, delim, cb); else if (socket_) asio::async_read_until(*socket_, read_buf_, delim, wrapCallabck(std::move(cb)));
else if (cb) cb(asio::error::operation_aborted, 0); else if (cb) cb(asio::error::operation_aborted, 0);
} }
...@@ -279,8 +273,8 @@ void ...@@ -279,8 +273,8 @@ void
Connection::async_read_until(char delim, BytesHandlerCb cb) Connection::async_read_until(char delim, BytesHandlerCb cb)
{ {
if (!is_open()) return; if (!is_open()) return;
if (ssl_socket_) asio::async_read_until(*ssl_socket_, read_buf_, delim, cb); if (ssl_socket_) asio::async_read_until(*ssl_socket_, read_buf_, delim, wrapCallabck(std::move(cb)));
else if (socket_) asio::async_read_until(*socket_, read_buf_, delim, cb); else if (socket_) asio::async_read_until(*socket_, read_buf_, delim, wrapCallabck(std::move(cb)));
else if (cb) cb(asio::error::operation_aborted, 0); else if (cb) cb(asio::error::operation_aborted, 0);
} }
...@@ -288,11 +282,27 @@ void ...@@ -288,11 +282,27 @@ void
Connection::async_read(size_t bytes, BytesHandlerCb cb) Connection::async_read(size_t bytes, BytesHandlerCb cb)
{ {
if (!is_open()) return; if (!is_open()) return;
if (ssl_socket_) asio::async_read(*ssl_socket_, read_buf_, asio::transfer_exactly(bytes), cb); if (ssl_socket_) asio::async_read(*ssl_socket_, read_buf_, asio::transfer_exactly(bytes), wrapCallabck(std::move(cb)));
else if (socket_) asio::async_read(*socket_, read_buf_, asio::transfer_exactly(bytes), cb); else if (socket_) asio::async_read(*socket_, read_buf_, asio::transfer_exactly(bytes), wrapCallabck(std::move(cb)));
else if (cb) cb(asio::error::operation_aborted, 0); else if (cb) cb(asio::error::operation_aborted, 0);
} }
void
Connection::async_read_some(size_t bytes, BytesHandlerCb cb)
{
if (!is_open()) {
if (cb) cb(asio::error::operation_aborted, 0);
return;
}
auto buf = read_buf_.prepare(bytes);
auto onEnd = [this_=shared_from_this(), cb=std::move(cb)](const asio::error_code& ec, size_t t){
this_->read_buf_.commit(t);
cb(ec, t);
};
if (ssl_socket_) ssl_socket_->async_read_some(buf, onEnd);
else socket_->async_read_some(buf, onEnd);
}
void void
Connection::timeout(const std::chrono::seconds timeout, HandlerCb cb) Connection::timeout(const std::chrono::seconds timeout, HandlerCb cb)
{ {
...@@ -438,6 +448,7 @@ Request::Request(asio::io_context& ctx, const std::string& url, const Json::Valu ...@@ -438,6 +448,7 @@ Request::Request(asio::io_context& ctx, const std::string& url, const Json::Valu
set_header_field(restinio::http_field_t::content_type, HTTP_HEADER_CONTENT_TYPE_JSON); set_header_field(restinio::http_field_t::content_type, HTTP_HEADER_CONTENT_TYPE_JSON);
set_header_field(restinio::http_field_t::accept, HTTP_HEADER_CONTENT_TYPE_JSON); set_header_field(restinio::http_field_t::accept, HTTP_HEADER_CONTENT_TYPE_JSON);
Json::StreamWriterBuilder wbuilder; Json::StreamWriterBuilder wbuilder;
set_method(restinio::http_method_post());
set_body(Json::writeString(wbuilder, json)); set_body(Json::writeString(wbuilder, json));
add_on_state_change_callback([this, jsoncb](State state, const Response& response){ add_on_state_change_callback([this, jsoncb](State state, const Response& response){
if (state != Request::State::DONE) if (state != Request::State::DONE)
...@@ -465,7 +476,8 @@ Request::Request(asio::io_context& ctx, const std::string& url, std::shared_ptr< ...@@ -465,7 +476,8 @@ Request::Request(asio::io_context& ctx, const std::string& url, std::shared_ptr<
Request::Request(asio::io_context& ctx, const std::string& url, OnDoneCb onDone, std::shared_ptr<dht::Logger> logger) Request::Request(asio::io_context& ctx, const std::string& url, OnDoneCb onDone, std::shared_ptr<dht::Logger> logger)
: logger_(logger), id_(Request::ids_++), ctx_(ctx), resolver_(std::make_shared<Resolver>(ctx, url, logger)) : logger_(logger), id_(Request::ids_++), ctx_(ctx), resolver_(std::make_shared<Resolver>(ctx, url, logger))
{ {
add_on_state_change_callback([this, onDone](State state, const Response& response){ init_default_headers();
add_on_state_change_callback([onDone](State state, const Response& response){
if (state == Request::State::DONE) if (state == Request::State::DONE)
onDone(response); onDone(response);
}); });
...@@ -510,7 +522,8 @@ void ...@@ -510,7 +522,8 @@ void
Request::init_default_headers() Request::init_default_headers()
{ {
const auto& url = resolver_->get_url(); const auto& url = resolver_->get_url();
set_header_field(restinio::http_field_t::host, url.host + ":" + url.service); set_header_field(restinio::http_field_t::user_agent, "Mozilla/5.0");
set_header_field(restinio::http_field_t::accept, "text/html");
set_target(url.target); set_target(url.target);
} }
...@@ -521,39 +534,28 @@ Request::cancel() ...@@ -521,39 +534,28 @@ Request::cancel()
c->close(); c->close();
} }
unsigned int
Request::id() const
{
return id_;
}
void void
Request::set_connection(std::shared_ptr<Connection> connection) Request::set_connection(std::shared_ptr<Connection> connection) {
{ conn_ = std::move(connection);
conn_ = connection;
} }
std::shared_ptr<Connection> std::shared_ptr<Connection>
Request::get_connection() const Request::get_connection() const {
{
return conn_; return conn_;
} }
void void
Request::set_certificate_authority(std::shared_ptr<dht::crypto::Certificate> certificate) Request::set_certificate_authority(std::shared_ptr<dht::crypto::Certificate> certificate) {
{
server_ca_ = certificate; server_ca_ = certificate;
} }
void void
Request::set_identity(const dht::crypto::Identity& identity) Request::set_identity(const dht::crypto::Identity& identity) {
{
client_identity_ = identity; client_identity_ = identity;
} }
void void
Request::set_logger(std::shared_ptr<dht::Logger> logger) Request::set_logger(std::shared_ptr<dht::Logger> logger) {
{
logger_ = logger; logger_ = logger;
} }
...@@ -564,32 +566,27 @@ Request::set_header(restinio::http_request_header_t header) ...@@ -564,32 +566,27 @@ Request::set_header(restinio::http_request_header_t header)
} }
void void
Request::set_method(restinio::http_method_id_t method) Request::set_method(restinio::http_method_id_t method) {
{
header_.method(method); header_.method(method);
} }
void void
Request::set_target(std::string target) Request::set_target(std::string target) {
{
header_.request_target(std::move(target)); header_.request_target(std::move(target));
} }
void void
Request::set_header_field(restinio::http_field_t field, std::string value) Request::set_header_field(restinio::http_field_t field, std::string value) {
{
headers_[field] = std::move(value); headers_[field] = std::move(value);
} }
void void
Request::set_connection_type(restinio::http_connection_header_t connection) Request::set_connection_type(restinio::http_connection_header_t connection) {
{
connection_type_ = connection; connection_type_ = connection;
} }
void void
Request::set_body(std::string body) Request::set_body(std::string body) {
{
body_ = std::move(body); body_ = std::move(body);
} }
...@@ -608,7 +605,7 @@ void ...@@ -608,7 +605,7 @@ void
Request::build() Request::build()
{ {
std::stringstream request; std::stringstream request;
bool append_body = true; bool append_body = !body_.empty();
// first header // first header
request << header_.method().c_str() << " " << header_.request_target() << " " << request << header_.method().c_str() << " " << header_.request_target() << " " <<
...@@ -624,47 +621,41 @@ Request::build() ...@@ -624,47 +621,41 @@ Request::build()
// last connection header // last connection header
const char* conn_str = nullptr; const char* conn_str = nullptr;
switch (connection_type_){ switch (connection_type_){
case restinio::http_connection_header_t::keep_alive:
conn_str = "keep-alive";
break;
case restinio::http_connection_header_t::upgrade: case restinio::http_connection_header_t::upgrade:
if (logger_) if (logger_)
logger_->e("Unsupported connection type 'upgrade', fallback to 'close'"); logger_->e("Unsupported connection type 'upgrade', fallback to 'close'");
// fallthrough // fallthrough
case restinio::http_connection_header_t::close: case restinio::http_connection_header_t::close:
conn_str = "close"; conn_str = "close"; // default
break;
case restinio::http_connection_header_t::keep_alive:
conn_str = "keep-alive";
break; break;
} }
if (conn_str) if (conn_str)
request << "Connection: " << conn_str << "\r\n"; request << "Connection: " << conn_str << "\r\n";
// body & content-length // body & content-length
if (!body_.empty()) if (append_body) {
request << "Content-Length: " << body_.size() << "\r\n\r\n"; request << "Content-Length: " << body_.size() << "\r\n\r\n"
// last delim << body_;
if (append_body) } else
request << body_ << "\r\n"; request << "\r\n";
request_ = request.str(); request_ = request.str();
} }
void void
Request::add_on_status_callback(OnStatusCb cb) Request::add_on_status_callback(OnStatusCb cb) {
{
std::lock_guard<std::mutex> lock(cbs_mutex_);
cbs_.on_status = std::move(cb); cbs_.on_status = std::move(cb);
} }
void void
Request::add_on_body_callback(OnDataCb cb) Request::add_on_body_callback(OnDataCb cb) {
{
std::lock_guard<std::mutex> lock(cbs_mutex_);
cbs_.on_body = std::move(cb); cbs_.on_body = std::move(cb);
} }
void void
Request::add_on_state_change_callback(OnStateChangeCb cb) Request::add_on_state_change_callback(OnStateChangeCb cb) {
{
std::lock_guard<std::mutex> lock(cbs_mutex_);
cbs_.on_state_change = std::move(cb); cbs_.on_state_change = std::move(cb);
} }
...@@ -687,38 +678,20 @@ Request::init_parser() ...@@ -687,38 +678,20 @@ Request::init_parser()
if (!parser_s_) if (!parser_s_)
parser_s_ = std::make_unique<http_parser_settings>(); parser_s_ = std::make_unique<http_parser_settings>();
http_parser_settings_init(parser_s_.get()); http_parser_settings_init(parser_s_.get());
{
// user registered callbacks wrappers to store its data in the response
std::lock_guard<std::mutex> lock(cbs_mutex_);
cbs_.on_status = [this, statusCb = std::move(cbs_.on_status)](unsigned int status_code){ cbs_.on_status = [this, statusCb = std::move(cbs_.on_status)](unsigned int status_code){
response_.status_code = status_code; response_.status_code = status_code;
if (statusCb) if (statusCb)
statusCb(status_code); statusCb(status_code);
}; };
auto header_field = std::make_shared<std::string>(); auto header_field = std::make_shared<std::string>();
cbs_.on_header_field = [header_field, headerFieldCb = std::move(cbs_.on_header_field)](const char* at, size_t length) { cbs_.on_header_field = [header_field](const char* at, size_t length) {
*header_field = std::string(at, length); *header_field = std::string(at, length);
if (headerFieldCb)
headerFieldCb(at, length);
}; };
cbs_.on_header_value = [this, header_field, headerValueCb = std::move(cbs_.on_header_value)](const char* at, size_t length) { cbs_.on_header_value = [this, header_field](const char* at, size_t length) {
response_.headers[*header_field] = std::string(at, length); response_.headers[*header_field] = std::string(at, length);
if (headerValueCb)
headerValueCb(at, length);
};
cbs_.on_headers_complete = [this](){
notify_state_change(State::HEADER_RECEIVED);
};
cbs_.on_body = [bodyCb = std::move(cbs_.on_body)](const char* at, size_t length) {
if (bodyCb)
bodyCb(at, length);
}; };
cbs_.on_message_complete = [this](){
if (logger_)
logger_->d("[http:request:%i] response: message complete", id_);
message_complete_.store(true);
};
}
// http_parser raw c callback (note: no context can be passed into them) // http_parser raw c callback (note: no context can be passed into them)
parser_s_->on_status = [](http_parser* parser, const char* /*at*/, size_t /*length*/) -> int { parser_s_->on_status = [](http_parser* parser, const char* /*at*/, size_t /*length*/) -> int {
static_cast<Request*>(parser->data)->cbs_.on_status(parser->status_code); static_cast<Request*>(parser->data)->cbs_.on_status(parser->status_code);
...@@ -733,15 +706,15 @@ Request::init_parser() ...@@ -733,15 +706,15 @@ Request::init_parser()
return 0; return 0;
}; };
parser_s_->on_body = [](http_parser* parser, const char* at, size_t length) -> int { parser_s_->on_body = [](http_parser* parser, const char* at, size_t length) -> int {
static_cast<Request*>(parser->data)->cbs_.on_body(at, length); static_cast<Request*>(parser->data)->onBody(at, length);
return 0; return 0;
}; };
parser_s_->on_headers_complete = [](http_parser* parser) -> int { parser_s_->on_headers_complete = [](http_parser* parser) -> int {
static_cast<Request*>(parser->data)->cbs_.on_headers_complete(); static_cast<Request*>(parser->data)->onHeadersComplete();
return 0; return 0;
}; };
parser_s_->on_message_complete = [](http_parser* parser) -> int { parser_s_->on_message_complete = [](http_parser* parser) -> int {
static_cast<Request*>(parser->data)->cbs_.on_message_complete(); static_cast<Request*>(parser->data)->onComplete();
return 0; return 0;
}; };
} }
...@@ -759,7 +732,7 @@ Request::connect(std::vector<asio::ip::tcp::endpoint>&& endpoints, HandlerCb cb) ...@@ -759,7 +732,7 @@ Request::connect(std::vector<asio::ip::tcp::endpoint>&& endpoints, HandlerCb cb)
if (logger_){ if (logger_){
std::string eps = ""; std::string eps = "";
for (const auto& endpoint : endpoints) for (const auto& endpoint : endpoints)
eps.append(endpoint.address().to_string() + " "); eps.append(endpoint.address().to_string() + ":" + std::to_string(endpoint.port()) + " ");
logger_->d("[http:request:%i] connect begin: %s", id_, eps.c_str()); logger_->d("[http:request:%i] connect begin: %s", id_, eps.c_str());
} }
if (get_url().protocol == "https"){ if (get_url().protocol == "https"){
...@@ -789,7 +762,15 @@ Request::connect(std::vector<asio::ip::tcp::endpoint>&& endpoints, HandlerCb cb) ...@@ -789,7 +762,15 @@ Request::connect(std::vector<asio::ip::tcp::endpoint>&& endpoints, HandlerCb cb)
if (this_.logger_) if (this_.logger_)
this_.logger_->d("[http:request:%i] connect success", this_.id_); this_.logger_->d("[http:request:%i] connect success", this_.id_);
if (this_.get_url().protocol == "https"){ const auto& url = this_.get_url();
auto port = endpoint.port();
if ((url.protocol == "http" && port == (in_port_t)80)
|| (url.protocol == "https" && port == (in_port_t)443))
this_.set_header_field(restinio::http_field_t::host, url.host);
else
this_.set_header_field(restinio::http_field_t::host, url.host + ":" + std::to_string(port));
if (url.protocol == "https") {
if (this_.server_ca_) if (this_.server_ca_)
this_.conn_->set_ssl_verification(endpoint, asio::ssl::verify_peer this_.conn_->set_ssl_verification(endpoint, asio::ssl::verify_peer
| asio::ssl::verify_fail_if_no_peer_cert); | asio::ssl::verify_fail_if_no_peer_cert);
...@@ -857,10 +838,9 @@ Request::post() ...@@ -857,10 +838,9 @@ Request::post()
build(); build();
init_parser(); init_parser();
if (logger_){ if (logger_)
std::string header; std::getline(std::istringstream(request_), header); logger_->d("[http:request:%i] sending %zu bytes", id_, request_.size());
logger_->d("[http:request:%i] send: %s", id_, header.c_str());
}
// write the request to buffer // write the request to buffer
std::ostream request_stream(&conn_->input()); std::ostream request_stream(&conn_->input());
request_stream << request_; request_stream << request_;
...@@ -881,20 +861,16 @@ Request::terminate(const asio::error_code& ec) ...@@ -881,20 +861,16 @@ Request::terminate(const asio::error_code& ec)
if (finishing_.exchange(true)) if (finishing_.exchange(true))
return; return;
if (ec != asio::error::eof and ec != asio::error::operation_aborted and logger_)
logger_->e("[http:request:%i] end with error: %s", id_, ec.message().c_str());
// set response outcome, ignore end of file and abort
if ((!ec or ec == asio::error::eof) and parser_)
response_.status_code = parser_->status_code;
else
response_.status_code = 0;
response_.aborted = ec == asio::error::operation_aborted; response_.aborted = ec == asio::error::operation_aborted;
if (logger_) if (logger_) {
if (ec and ec != asio::error::eof and ec != asio::error::operation_aborted)
logger_->e("[http:request:%i] end with error: %s", id_, ec.message().c_str());
else
logger_->d("[http:request:%i] done with status code %u", id_, response_.status_code); logger_->d("[http:request:%i] done with status code %u", id_, response_.status_code);
if (connection_type_ != restinio::http_connection_header_t::keep_alive) }
if (!parser_ or !http_should_keep_alive(parser_.get()))
if (auto c = conn_) if (auto c = conn_)
c->close(); c->close();
notify_state_change(State::DONE); notify_state_change(State::DONE);
...@@ -917,204 +893,76 @@ Request::handle_request(const asio::error_code& ec) ...@@ -917,204 +893,76 @@ Request::handle_request(const asio::error_code& ec)
notify_state_change(State::RECEIVING); notify_state_change(State::RECEIVING);
std::weak_ptr<Request> wthis = shared_from_this(); std::weak_ptr<Request> wthis = shared_from_this();
conn_->async_read_until(HTTP_HEADER_DELIM, [wthis](const asio::error_code& ec, size_t){ conn_->async_read_until(HTTP_HEADER_DELIM, [wthis](const asio::error_code& ec, size_t n_bytes){
if (auto sthis = wthis.lock()) if (auto sthis = wthis.lock())
sthis->handle_response_header(ec); sthis->handle_response(ec, n_bytes);
}); });
} }
void void
Request::handle_response_header(const asio::error_code& ec) Request::handle_response(const asio::error_code& ec, size_t n_bytes)
{ {
if (ec && ec != asio::error::eof){ if (ec && ec != asio::error::eof){
terminate(ec); terminate(ec);
return; return;
} }
if (!conn_->is_open()){ auto request = (ec == asio::error::eof) ? std::string{} : conn_->read_bytes();
terminate(asio::error::not_connected); size_t ret = http_parser_execute(parser_.get(), parser_s_.get(), request.c_str(), request.size());
return; if (ret != request.size()) {
}
if (logger_) if (logger_)
logger_->d("[http:request:%i] response headers received", id_); logger_->e("Error parsing HTTP: %zu %s %s", ret,
// read the header http_errno_name(HTTP_PARSER_ERRNO(parser_)),
std::string header; http_errno_description(HTTP_PARSER_ERRNO(parser_)));
std::string headers; terminate(asio::error::basic_errors::broken_pipe);
while (std::getline(conn_->data(), header) && header != "\r"){
headers.append(header + "\n");
}
headers.append("\n");
parse_request(headers);
std::weak_ptr<Request> wthis = shared_from_this();
auto expect_it = headers_.find(restinio::http_field_t::expect);
if (expect_it != headers_.end() and (expect_it->second == "100-continue") and response_.status_code != 200){
notify_state_change(State::SENDING);
request_.append(body_);
std::ostream request_stream(&conn_->input());
request_stream << body_ << "\r\n";
conn_->async_write([wthis](const asio::error_code& ec, size_t) {
if (auto sthis = wthis.lock())
sthis->handle_request(ec);
});
return; return;
} }
auto connection_it = response_.headers.find(HTTP_HEADER_CONNECTION); if (state_ != State::DONE and parser_ and not http_body_is_final(parser_.get())) {
auto content_length_it = response_.headers.find(HTTP_HEADER_CONTENT_LENGTH); if (auto toRead = parser_->content_length ? std::min<uint64_t>(parser_->content_length, 64 * 1024) : 64 * 1024) {
auto transfer_encoding_it = response_.headers.find(HTTP_HEADER_TRANSFER_ENCODING); if (logger_)
logger_->d("[http:request:%i] read more %llu", id_, toRead);
// has content-length std::weak_ptr<Request> wthis = shared_from_this();
if (content_length_it != response_.headers.end()) conn_->async_read_some(toRead, [wthis](const asio::error_code& ec, size_t bytes){
{
unsigned int content_length = atoi(content_length_it->second.c_str());
response_.body.append(conn_->read_bytes(content_length));
// full body already in the header
if (response_.body.size() + 1 == content_length) {
response_.body.append("\n");
parse_request(response_.body);
if (message_complete_.load())
terminate(asio::error::eof);
}
else { // more chunks to come (don't add the missing \n from std::getline)
conn_->async_read(content_length - response_.body.size(), [wthis](const asio::error_code& ec, size_t bytes){
if (auto sthis = wthis.lock())
sthis->handle_response_body(ec, bytes);
});
}
}
// server wants to keep sending or we have content-length defined
else if (connection_it != response_.headers.end() and connection_it->second == HTTP_HEADER_CONNECTION_KEEP_ALIVE)
{
conn_->async_read_until(BODY_VALUE_DELIM, [wthis](const asio::error_code& ec, size_t bytes){
if (auto sthis = wthis.lock()) if (auto sthis = wthis.lock())
sthis->handle_response_body(ec, bytes); sthis->handle_response(ec, bytes);
}); });
} }
// server wants to close the connection
else if (connection_it != response_.headers.end() and connection_it->second == HTTP_HEADER_CONNECTION_CLOSE)
{
terminate(asio::error::eof);
}
// client wants to close the connection
else if (connection_type_ == restinio::http_connection_header_t::close)
{
terminate(asio::error::eof);
}
else if (transfer_encoding_it != response_.headers.end() and
transfer_encoding_it->second == HTTP_HEADER_TRANSFER_ENCODING_CHUNKED)
{
std::string chunk_size;
std::getline(conn_->data(), chunk_size);
unsigned int content_length = std::stoul(chunk_size, nullptr, 16);
conn_->async_read(content_length, [wthis](const asio::error_code& ec, size_t bytes){
if (auto sthis = wthis.lock())
sthis->handle_response_body(ec, bytes);
});
} }
} }
void void
Request::handle_response_body(const asio::error_code& ec, size_t bytes) Request::onBody(const char* at, size_t length)
{ {
if (ec && ec != asio::error::eof){ if (cbs_.on_body)
terminate(ec); cbs_.on_body(at, length);
return; else
} response_.body.insert(response_.body.end(), at, at+length);
if (!conn_->is_open()){
terminate(asio::error::not_connected);
return;
} }
if (logger_)
logger_->d("[http:request:%i] response body: %i bytes received", id_, bytes);
if (bytes == 0){ void
Request::onComplete() {
terminate(asio::error::eof); terminate(asio::error::eof);
return;
} }
// avoid creating non-existant headers by accessing the headers map without the presence of key void
auto connection_it = response_.headers.find(HTTP_HEADER_CONNECTION); Request::onHeadersComplete() {
auto content_length_it = response_.headers.find(HTTP_HEADER_CONTENT_LENGTH); notify_state_change(State::HEADER_RECEIVED);
auto transfer_encoding_it = response_.headers.find(HTTP_HEADER_TRANSFER_ENCODING);
// read the content-length body
unsigned int content_length;
if (content_length_it != response_.headers.end() and !response_.body.empty()){
// extract the content-length
content_length = atoi(content_length_it->second.c_str());
response_.body.append(conn_->read_bytes(bytes));
// check if fully parsed
if (response_.body.size() == content_length)
parse_request(response_.body);
}
// read and parse the chunked encoding fragment
else {
auto body = conn_->read_until(BODY_VALUE_DELIM);
response_.body += body;
if (body == "0\r\n"){
parse_request(response_.body);
terminate(asio::error::eof);
return;
}
parse_request(body + '\n');
}
auto expect_it = headers_.find(restinio::http_field_t::expect);
if (expect_it != headers_.end() and (expect_it->second == "100-continue") and response_.status_code != 200){
notify_state_change(State::SENDING);
request_.append(body_);
std::ostream request_stream(&conn_->input());
request_stream << body_ << "\r\n";
std::weak_ptr<Request> wthis = shared_from_this(); std::weak_ptr<Request> wthis = shared_from_this();
conn_->async_write([wthis](const asio::error_code& ec, size_t) {
// should be executed after each parse_request who can trigger http_parser on_message_complete
if (message_complete_.load()){
terminate(asio::error::eof);
}
// has content-length
else if (content_length_it != response_.headers.end() and response_.body.size() != content_length)
{
conn_->async_read(content_length - response_.body.size(), [wthis](const asio::error_code& ec, size_t bytes){
if (auto sthis = wthis.lock())
sthis->handle_response_body(ec, bytes);
});
}
// server wants to keep sending
else if (connection_it != response_.headers.end() and connection_it->second == HTTP_HEADER_CONNECTION_KEEP_ALIVE)
{
conn_->async_read_until(BODY_VALUE_DELIM, [wthis](const asio::error_code& ec, size_t bytes){
if (auto sthis = wthis.lock())
sthis->handle_response_body(ec, bytes);
});
}
// server wants to close the connection
else if (connection_it != response_.headers.end() and connection_it->second == HTTP_HEADER_CONNECTION_CLOSE)
{
terminate(asio::error::eof);
}
// client wants to close the connection
else if (connection_type_ == restinio::http_connection_header_t::close)
{
terminate(asio::error::eof);
}
else if (transfer_encoding_it != response_.headers.end() and
transfer_encoding_it->second == HTTP_HEADER_TRANSFER_ENCODING_CHUNKED)
{
std::string chunk_size;
std::getline(conn_->data(), chunk_size);
if (chunk_size.size() == 0){
parse_request(response_.body);
terminate(asio::error::eof);
}
else
conn_->async_read_until(BODY_VALUE_DELIM, [wthis](const asio::error_code& ec, size_t bytes){
if (auto sthis = wthis.lock()) if (auto sthis = wthis.lock())
sthis->handle_response_body(ec, bytes); sthis->handle_request(ec);
}); });
return;
} }
} }
size_t
Request::parse_request(const std::string& request)
{
std::lock_guard<std::mutex> lock(cbs_mutex_);
return http_parser_execute(parser_.get(), parser_s_.get(), request.c_str(), request.size());
}
} // namespace http } // namespace http
} // namespace dht } // namespace dht
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment