From 0a3320e2c873e47f4a11b8799bdda926f93704bd Mon Sep 17 00:00:00 2001
From: Seva <seva@binarytrails.net>
Date: Tue, 4 Jun 2019 16:14:37 -0400
Subject: [PATCH] dhtproxy: enhance http async api & finish client get

---
 include/opendht/dht_proxy_client.h |  7 +--
 include/opendht/http.h             | 82 +++++++++++++------------
 src/dht_proxy_client.cpp           | 66 +++++++++++---------
 src/dht_proxy_server.cpp           | 24 ++++----
 src/http.cpp                       | 96 ++++++++++++++++++------------
 5 files changed, 154 insertions(+), 121 deletions(-)

diff --git a/include/opendht/dht_proxy_client.h b/include/opendht/dht_proxy_client.h
index c6767005..09c794db 100644
--- a/include/opendht/dht_proxy_client.h
+++ b/include/opendht/dht_proxy_client.h
@@ -70,9 +70,9 @@ public:
     virtual ~DhtProxyClient();
 
     /**
-     * Get Asio I/O Context.
+     * Get Asio I/O Context from the http::Client instance.
      */
-    std::shared_ptr<asio::io_context> context();
+    asio::io_context& httpIOContext();
 
     /**
      * Get the ID of the node.
@@ -327,8 +327,7 @@ private:
     /*
      * ASIO I/O Context for sockets used in requests
      */
-    std::shared_ptr<restinio::asio_ns::io_context> ctx_;
-    http::Client httpClient_ {serverHostIp_, serverHostPort_};
+    http::Client httpClient_;
 
     mutable std::mutex lockCurrentProxyInfos_;
     NodeStatus statusIpv4_ {NodeStatus::Disconnected};
diff --git a/include/opendht/http.h b/include/opendht/http.h
index fb0c0696..00ba868c 100644
--- a/include/opendht/http.h
+++ b/include/opendht/http.h
@@ -27,51 +27,55 @@ namespace http {
 
 class Connection
 {
-    public:
-        Connection(const uint16_t id, asio::ip::tcp::socket socket);
-        ~Connection();
-
-        uint16_t id();
-        void start(asio::ip::tcp::resolver::iterator &r_iter);
-        bool is_open();
-        std::string read(std::error_code& ec);
-        void write(std::string request, std::error_code& ec);
-        void close();
-
-    private:
-        uint16_t id_;
-        asio::ip::tcp::socket socket_;
+public:
+    Connection(const uint16_t id, asio::ip::tcp::socket socket);
+    ~Connection();
+
+    uint16_t id();
+    void start(asio::ip::tcp::resolver::iterator &r_iter);
+    bool is_open();
+    asio::ip::tcp::socket& get_socket();
+    std::string read(std::error_code& ec);
+    void write(std::string request, std::error_code& ec);
+    void close();
+
+private:
+    uint16_t id_;
+    asio::ip::tcp::socket socket_;
 };
 
 using ResponseCallback = std::function<void(const std::string data)>;
 
 class Client
 {
-    public:
-        Client(std::string ip, std::uint16_t port);
-
-        asio::io_context& context();
-
-        asio::ip::tcp::resolver::query query();
-
-        std::string create_request(const restinio::http_request_header_t header,
-                                   const restinio::http_header_fields_t header_fields,
-                                   const restinio::http_connection_header_t connection,
-                                   const std::string body);
-
-
-        void post_request(std::string request, http_parser_settings parser_s,
-                          const ResponseCallback respcb = nullptr);
-
-    private:
-        void async_request(std::string request, http_parser_settings parser_s,
-                           const ResponseCallback respcb = nullptr);
-
-        std::uint16_t port_;
-        asio::ip::address addr_;
-        asio::io_context ctx_;
-        asio::ip::tcp::resolver resolver_;
-        uint16_t connId_ {1};
+public:
+    Client() = default;
+    Client(std::string ip, uint16_t port);
+
+    asio::io_context& io_context();
+
+    void set_query_address(const std::string ip, const uint16_t port);
+    asio::ip::tcp::resolver::query build_query();
+
+    std::string create_request(const restinio::http_request_header_t header,
+                               const restinio::http_header_fields_t header_fields,
+                               const restinio::http_connection_header_t connection,
+                               const std::string body);
+
+    void post_request(std::string request,
+                      std::shared_ptr<http_parser> parser = nullptr,
+                      std::shared_ptr<http_parser_settings> parser_s = nullptr);
+
+private:
+    void async_request(std::string request,
+                       std::shared_ptr<http_parser> parser = nullptr,
+                       std::shared_ptr<http_parser_settings> parser_s = nullptr);
+
+    uint16_t port_;
+    asio::ip::address addr_;
+    asio::io_context ctx_;
+    asio::ip::tcp::resolver resolver_ {ctx_};
+    uint16_t connId_ {1};
 };
 
 }
diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp
index 57482182..e0191895 100644
--- a/src/dht_proxy_client.cpp
+++ b/src/dht_proxy_client.cpp
@@ -67,35 +67,41 @@ struct DhtProxyClient::ProxySearch {
 DhtProxyClient::DhtProxyClient() {}
 
 DhtProxyClient::DhtProxyClient(std::function<void()> signal, const std::string& serverHost, const std::string& pushClientId, const Logger& l):
-    serverHost_(serverHost), pushClientId_(pushClientId), loopSignal_(signal), ctx_(new restinio::asio_ns::io_context())
+    serverHost_(serverHost), pushClientId_(pushClientId), loopSignal_(signal)
 {
     auto hostAndPort = splitPort(serverHost_);
-    this->serverHostIp_ = hostAndPort.first;
-    this->serverHostPort_ = std::atoi(hostAndPort.second.c_str());
+    serverHostIp_ = hostAndPort.first;
+    serverHostPort_ = std::atoi(hostAndPort.second.c_str());
 
     if (serverHost_.find("://") == std::string::npos)
         serverHost_ = proxy::HTTP_PROTO + serverHost_;
+
     if (!serverHost_.empty())
         startProxy();
 }
 
-std::shared_ptr<asio::io_context>
-DhtProxyClient::context(){
-    return ctx_;
+asio::io_context&
+DhtProxyClient::httpIOContext(){
+    return httpClient_.io_context();
 }
 
 void
 DhtProxyClient::confirmProxy()
 {
-    if (serverHost_.empty()) return;
+    if (serverHost_.empty())
+        return;
     getConnectivityStatus();
 }
 
 void
 DhtProxyClient::startProxy()
 {
-    if (serverHost_.empty()) return;
+    if (serverHost_.empty())
+        return;
+
     DHT_LOG.w("Staring proxy client to %s", serverHost_.c_str());
+    httpClient_.set_query_address(serverHostIp_, serverHostPort_);
+
     nextProxyConfirmation = scheduler.add(scheduler.time(), std::bind(&DhtProxyClient::confirmProxy, this));
     listenerRestart = std::make_shared<Scheduler::Job>(std::bind(&DhtProxyClient::restartListeners, this));
     loopSignal_();
@@ -133,8 +139,9 @@ DhtProxyClient::getLocalById(const InfoHash& k, Value::Id id) const {
 void
 DhtProxyClient::cancelAllOperations()
 {
-    // for graceful finish, call reset() before
-    this->ctx_->stop();
+    auto &io_context = this->httpIOContext();
+    if (io_context.stopped())
+        io_context.stop();
 }
 
 void
@@ -253,11 +260,13 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, Va
     context->filter = w.empty() ? f : f.chain(w.getFilter());
     context->cb = cb; context->donecb = donecb;
 
-    http_parser parser;
-    parser.data = static_cast<void*>(context.get());
-    http_parser_settings settings;
-    http_parser_settings_init(&settings);
-    settings.on_status = [](http_parser *parser, const char *at, size_t length) -> int {
+    auto parser = std::make_shared<http_parser>();
+    http_parser_init(parser.get(), HTTP_RESPONSE);
+    parser->data = static_cast<void*>(context.get());
+
+    auto parser_s = std::make_shared<http_parser_settings>();
+    http_parser_settings_init(parser_s.get());
+    parser_s->on_status = [](http_parser *parser, const char *at, size_t length) -> int {
         auto context = reinterpret_cast<GetContext*>(parser->data);
         if (parser->status_code != 200){
             std::cerr << "Error in get status_code=" << parser->status_code << std::endl;
@@ -265,7 +274,7 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, Va
         }
         return 0;
     };
-    settings.on_body = [](http_parser *parser, const char *at, size_t length) -> int {
+    parser_s->on_body = [](http_parser *parser, const char *at, size_t length) -> int {
         auto context = reinterpret_cast<GetContext*>(parser->data);
         try{
             Json::Value json;
@@ -289,7 +298,7 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, Va
         }
         return 0;
     };
-    settings.on_message_complete = [](http_parser * parser) -> int {
+    parser_s->on_message_complete = [](http_parser *parser) -> int {
         auto context = reinterpret_cast<GetContext*>(parser->data);
         try {
             if (context->donecb)
@@ -300,8 +309,7 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, Va
         }
         return 0;
     };
-    //httpClient_.do_request(request, serverHostIp_, serverHostPort_,
-    //                             parser, settings, this->ctx_);
+    httpClient_.post_request(request, parser, parser_s);
 }
 
 void
@@ -386,9 +394,9 @@ DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_
 
     http_parser parser;
     parser.data = static_cast<void*>(context.get());
-    http_parser_settings settings;
-    http_parser_settings_init(&settings);
-    settings.on_status = [](http_parser *parser, const char *at, size_t length) -> int {
+    http_parser_settings parser_s;
+    http_parser_settings_init(&parser_s);
+    parser_s.on_status = [](http_parser *parser, const char *at, size_t length) -> int {
         GetContext* context = reinterpret_cast<GetContext*>(parser->data);
         if (parser->status_code == 200){
             context->ok = true;
@@ -397,7 +405,7 @@ DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_
         }
         return 0;
     };
-    settings.on_message_complete = [](http_parser * parser) -> int {
+    parser_s.on_message_complete = [](http_parser * parser) -> int {
         auto context = reinterpret_cast<GetContext*>(parser->data);
         try {
             if (context->donecb)
@@ -409,7 +417,7 @@ DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_
         return 0;
     };
     //do_request(request, serverHostIp_, serverHostPort_,
-    //                             parser, settings, this->ctx_);
+    //                             parser, parser_s, this->ctx_);
 }
 
 /**
@@ -761,9 +769,9 @@ void DhtProxyClient::sendListen(const restinio::http_request_header_t header,
 
     http_parser parser;
     parser.data = static_cast<void*>(context.get());
-    http_parser_settings settings;
-    http_parser_settings_init(&settings);
-    settings.on_status = [](http_parser *parser, const char *at, size_t length) -> int {
+    http_parser_settings parser_s;
+    http_parser_settings_init(&parser_s);
+    parser_s.on_status = [](http_parser *parser, const char *at, size_t length) -> int {
         auto context = reinterpret_cast<ListenContext*>(parser->data);
         if (parser->status_code != 200){
             std::cerr << "Error in listen status_code=" << parser->status_code << std::endl;
@@ -771,7 +779,7 @@ void DhtProxyClient::sendListen(const restinio::http_request_header_t header,
         }
         return 0;
     };
-    settings.on_body = [](http_parser *parser, const char *at, size_t length) -> int {
+    parser_s.on_body = [](http_parser *parser, const char *at, size_t length) -> int {
         auto context = reinterpret_cast<ListenContext*>(parser->data);
         try {
             Json::Value json;
@@ -808,7 +816,7 @@ void DhtProxyClient::sendListen(const restinio::http_request_header_t header,
         return 0;
     };
     //do_request(request, serverHostIp_, serverHostPort_,
-    //                             parser, settings, this->ctx_);
+    //                             parser, parser_s, this->ctx_);
 }
 
 bool
diff --git a/src/dht_proxy_server.cpp b/src/dht_proxy_server.cpp
index 4b63d024..9e36e285 100644
--- a/src/dht_proxy_server.cpp
+++ b/src/dht_proxy_server.cpp
@@ -576,14 +576,6 @@ DhtProxyServer::sendPushNotification(const std::string& token, Json::Value&& jso
 {
     if (pushServer_.empty())
         return;
-    http_parser_settings parser_s;
-    http_parser_settings_init(&parser_s);
-    parser_s.on_status = []( http_parser * parser, const char * at, size_t length ) -> int {
-        if (parser->status_code == 200)
-            return 0;
-        std::cerr << "Error in SendPushNotification status_code=" << parser->status_code << std::endl;
-        return 1;
-    };
 
     restinio::http_request_header_t header;
     header.request_target("/api/push");
@@ -620,10 +612,22 @@ DhtProxyServer::sendPushNotification(const std::string& token, Json::Value&& jso
     uint16_t port = std::atoi(hostAndPort.second.c_str());
     http::Client httpClient {hostAndPort.first, port};
 
+    auto parser = std::make_shared<http_parser>();
+    http_parser_init(parser.get(), HTTP_RESPONSE);
+
+    auto parser_s = std::make_shared<http_parser_settings>();
+    http_parser_settings_init(parser_s.get());
+    parser_s->on_status = []( http_parser * parser, const char * at, size_t length ) -> int {
+        if (parser->status_code == 200)
+            return 0;
+        std::cerr << "Error in SendPushNotification status_code=" << parser->status_code << std::endl;
+        return 1;
+    };
+
     auto request = httpClient.create_request(header, header_fields,
         restinio::http_connection_header_t::close, body);
-    httpClient.post_request(request, parser_s);
-    httpClient.context().run();
+    httpClient.post_request(request, parser, parser_s);
+    httpClient.io_context().run();
 }
 
 #endif //OPENDHT_PUSH_NOTIFICATIONS
diff --git a/src/http.cpp b/src/http.cpp
index 87716671..bcc92720 100644
--- a/src/http.cpp
+++ b/src/http.cpp
@@ -45,6 +45,11 @@ Connection::is_open(){
     return socket_.is_open();
 }
 
+asio::ip::tcp::socket&
+Connection::get_socket(){
+    return socket_;
+}
+
 std::string
 Connection::read(std::error_code& ec){
     std::string response;
@@ -64,18 +69,24 @@ Connection::close(){
 
 // client
 
-Client::Client(std::string ip, std::uint16_t port): resolver_(ctx_){
-    addr_ = asio::ip::address::from_string(ip);
-    port_ = port;
+Client::Client(const std::string ip, const uint16_t port){
+    set_query_address(ip, port);
 }
 
 asio::io_context&
-Client::context(){
+Client::io_context(){
     return ctx_;
 }
 
+void
+Client::set_query_address(const std::string ip, const uint16_t port){
+    addr_ = asio::ip::address::from_string(ip);
+    port_ = port;
+}
+
 asio::ip::tcp::resolver::query
-Client::query(){
+Client::build_query(){
+    // support of ipv4 & ipv6
     return asio::ip::tcp::resolver::query {
         addr_.is_v4() ? asio::ip::tcp::v4() : asio::ip::tcp::v6(),
         addr_.to_string(), std::to_string(port_)};
@@ -87,38 +98,43 @@ Client::create_request(const restinio::http_request_header_t header,
                        const restinio::http_connection_header_t connection,
                        const std::string body){
     std::stringstream request;
+    // first header
     request << restinio::method_to_string(header.method()) << " " <<
                header.request_target() << " " <<
                "HTTP/" << header.http_major() << "." << header.http_minor() << "\r\n";
+    // other headers
     for (auto header_field: header_fields)
         request << header_field.name() << ": " << header_field.value() << "\r\n";
+    // last connection header
     std::string conn_str;
     switch (connection){
-        case restinio::http_connection_header_t::keep_alive:
-            conn_str = "keep-alive";
-            break;
-        case restinio::http_connection_header_t::close:
-            conn_str = "close";
-            break;
-        case restinio::http_connection_header_t::upgrade:
-            throw std::invalid_argument("upgrade");
-            break;
+    case restinio::http_connection_header_t::keep_alive:
+        conn_str = "keep-alive";
+        break;
+    case restinio::http_connection_header_t::close:
+        conn_str = "close";
+        break;
+    case restinio::http_connection_header_t::upgrade:
+        throw std::invalid_argument("upgrade");
+        break;
     }
     request << "Connection: " << conn_str << "\r\n";
+    // body & content-length
     if (!body.empty()){
         request << "Content-Length: " << body.size() << "\r\n\r\n";
         request << body;
     }
+    // last delim
     request << "\r\n";
     return request.str();
 }
 
 void
-Client::post_request(std::string request, http_parser_settings parser_s,
-                     const ResponseCallback respcb){
+Client::post_request(std::string request, std::shared_ptr<http_parser> parser,
+                     std::shared_ptr<http_parser_settings> parser_s){
     // invoke the given handler and return immediately
-    asio::post(ctx_, [this, request, parser_s, respcb](){
-        this->async_request(request, parser_s, respcb);
+    asio::post(ctx_, [this, request, parser, parser_s](){
+        this->async_request(request, parser, parser_s);
         // execute at most one handler, it ensures that same func call
         // with different callback gets the priority on the io_context
         ctx_.run_one();
@@ -126,8 +142,8 @@ Client::post_request(std::string request, http_parser_settings parser_s,
 }
 
 void
-Client::async_request(std::string request, http_parser_settings parser_s,
-                      const ResponseCallback respcb){
+Client::async_request(std::string request, std::shared_ptr<http_parser> parser,
+                      std::shared_ptr<http_parser_settings> parser_s){
     using namespace asio::ip;
 
     auto conn = std::make_shared<Connection>(connId_, std::move(tcp::socket{ctx_}));
@@ -135,7 +151,8 @@ Client::async_request(std::string request, http_parser_settings parser_s,
     connId_++;
 
     // resolve sometime in future
-    resolver_.async_resolve(query(), [=](std::error_code ec, tcp::resolver::results_type res){
+    resolver_.async_resolve(build_query(), [=](std::error_code ec,
+                                               tcp::resolver::results_type res){
         if (ec or res.empty()){
             printf("[connection:%i] error resolving\n", conn->id());
             conn->close();
@@ -160,26 +177,27 @@ Client::async_request(std::string request, http_parser_settings parser_s,
         }
         // read response
         printf("[connection:%i] response read\n", conn->id());
-        auto data = conn->read(ec);
-        if (ec and ec != asio::error::eof){
-            printf("[connection:%i] error: %s\n", conn->id(), ec.message().c_str());
-            return;
+        asio::streambuf resp_s;
+        auto& socket = conn->get_socket();
+        asio::read_until(socket, resp_s, "\r\n\r\n");
+
+        while(asio::read(socket, resp_s, asio::transfer_at_least(1), ec)){
+            std::ostringstream str_s;
+            str_s << &resp_s;
+            // parse the request
+            http_parser_execute(parser.get(), parser_s.get(),
+                                str_s.str().c_str(), str_s.str().size());
+            // detect parsing errors
+            if (HPE_OK != parser->http_errno && HPE_PAUSED != parser->http_errno){
+                auto err = HTTP_PARSER_ERRNO(parser.get());
+                printf("[connection:%i] error parsing: %s\n", conn->id(), http_errno_name(err));
+            }
         }
-        if (respcb)
-            respcb(data.c_str());
-
-        // parse the request
-        http_parser parser;
-
-        http_parser_settings_init(const_cast<http_parser_settings*>(&parser_s));
-        http_parser_init(&parser, HTTP_RESPONSE);
-
-        http_parser_execute(&parser, &parser_s, data.c_str(), data.size());
-
-        if (HPE_OK != parser.http_errno && HPE_PAUSED != parser.http_errno){
-            auto err = HTTP_PARSER_ERRNO(&parser);
-            printf("[connection:%i] error parsing: %s\n", conn->id(), http_errno_name(err));
+        if (ec != asio::error::eof){
+            throw std::runtime_error{fmt::format(
+                "[connection:{}] error parsing: {}\n", conn->id(), ec)};
         }
+        printf("[connection:%i] request finished\n", conn->id());
     });
 }
 
-- 
GitLab