diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index 005c8f2cd6333ca0a955de179ec38679ddcddf6a..c504a197e40895872cd01da75e41de62fe078d11 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -24,6 +24,7 @@ #include "utils.h" #include <http_parser.h> +#include <deque> namespace dht { @@ -69,6 +70,29 @@ struct DhtProxyClient::ProxySearch { std::map<Value::Id, PermanentPut> puts {}; }; +struct LineSplit { + void append(const char* d, size_t l) { + buf_.insert(buf_.end(), d, d+l); + } + bool getLine(char c) { + auto it = buf_.begin(); + while (it != buf_.end()) { + if (*it == c) { + line_.clear(); + line_.insert(line_.end(), buf_.begin(), ++it); + buf_.erase(buf_.begin(), it); + return true; + } + it++; + } + return false; + } + const std::string& line() const { return line_; } +private: + std::deque<char> buf_ {}; + std::string line_ {}; +}; + DhtProxyClient::DhtProxyClient() {} DhtProxyClient::DhtProxyClient( @@ -286,18 +310,18 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, Va auto opstate = std::make_shared<OperationState>(); Value::Filter filter = w.empty() ? f : f.chain(w.getFilter()); - request->add_on_body_callback([this, key, opstate, filter, cb](const char* at, size_t length){ + auto rxBuf = std::make_shared<LineSplit>(); + request->add_on_body_callback([this, key, opstate, filter, rxBuf, cb](const char* at, size_t length){ try { - auto body = std::string(at, length); + auto& b = *rxBuf; + b.append(at, length); // one value per body line - std::string data_line; - std::stringstream body_stream(body); - while (std::getline(body_stream, data_line, '\n') and !(opstate->stop.load())){ + while (b.getLine('\n') and !opstate->stop){ std::string err; Json::Value json; - auto* char_data = static_cast<const char*>(&data_line[0]); + const auto& line = b.line(); auto reader = std::unique_ptr<Json::CharReader>(jsonReaderBuilder_.newCharReader()); - if (!reader->parse(char_data, char_data + data_line.size(), &json, &err)){ + if (!reader->parse(line.data(), line.data() + line.size(), &json, &err)){ opstate->ok.store(false); return; } @@ -941,19 +965,19 @@ DhtProxyClient::sendListen(const restinio::http_request_header_t header, body = fillBody(method == ListenMethod::RESUBSCRIBE); request->set_body(body); #endif - - request->add_on_body_callback([this, reqid, opstate, cb](const char* at, size_t length){ + auto rxBuf = std::make_shared<LineSplit>(); + request->add_on_body_callback([this, reqid, opstate, rxBuf, cb](const char* at, size_t length){ try { - auto body = std::string(at, length); + auto& b = *rxBuf; + b.append(at, length); + // one value per body line - std::string data_line; - std::stringstream body_stream(body); - while (std::getline(body_stream, data_line, '\n') and !opstate->stop){ + while (b.getLine('\n') and !opstate->stop){ std::string err; Json::Value json; - auto* char_data = static_cast<const char*>(&data_line[0]); + const auto& line = b.line(); auto reader = std::unique_ptr<Json::CharReader>(jsonReaderBuilder_.newCharReader()); - if (!reader->parse(char_data, char_data + data_line.size(), &json, &err)){ + if (!reader->parse(line.data(), line.data() + line.size(), &json, &err)){ opstate->ok.store(false); return; }