From 6266f88cd54bb82e46ff9afbeb88ff551c4354a4 Mon Sep 17 00:00:00 2001 From: Seva <seva@binarytrails.net> Date: Tue, 4 Jun 2019 17:21:44 -0400 Subject: [PATCH] dhtproxy: port doPut,sendListen logic new http client --- include/opendht/dht_proxy_client.h | 3 +- src/dht_proxy_client.cpp | 51 ++++++++++++++---------------- 2 files changed, 25 insertions(+), 29 deletions(-) diff --git a/include/opendht/dht_proxy_client.h b/include/opendht/dht_proxy_client.h index 09c794db..4d2b1d35 100644 --- a/include/opendht/dht_proxy_client.h +++ b/include/opendht/dht_proxy_client.h @@ -203,7 +203,6 @@ public: return periodic(buf, buflen, SockAddr(from, fromlen)); } - /** * Similar to Dht::get, but sends a Query to filter data remotely. * @param key the key for which to query data for. @@ -301,7 +300,7 @@ private: RESUBSCRIBE, }; void sendListen(const restinio::http_request_header_t header, - const ValueCallback cb, const Value::Filter &filter, + const ValueCallback &cb, const Value::Filter &filter, const Sp<ListenState> &state, ListenMethod method = ListenMethod::LISTEN); void doPut(const InfoHash&, Sp<Value>, DoneCallback, time_point created, bool permanent); diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index e0191895..854cd1b1 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -224,6 +224,7 @@ DhtProxyClient::periodic(const uint8_t*, size_t, const SockAddr&) for (auto& callback : callbacks) callback(); callbacks.clear(); + return scheduler.run(); } @@ -392,11 +393,13 @@ DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_ auto context = std::make_shared<GetContext>(); context->donecb = cb; - http_parser parser; - parser.data = static_cast<void*>(context.get()); - http_parser_settings parser_s; - http_parser_settings_init(&parser_s); - parser_s.on_status = [](http_parser *parser, const char *at, size_t length) -> int { + auto parser = std::make_shared<http_parser>(); + http_parser_init(parser.get(), HTTP_RESPONSE); + parser->data = static_cast<void*>(context.get()); + + auto parser_s = std::make_shared<http_parser_settings>(); + http_parser_settings_init(parser_s.get()); + parser_s->on_status = [](http_parser *parser, const char *at, size_t length) -> int { GetContext* context = reinterpret_cast<GetContext*>(parser->data); if (parser->status_code == 200){ context->ok = true; @@ -405,7 +408,7 @@ DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_ } return 0; }; - parser_s.on_message_complete = [](http_parser * parser) -> int { + parser_s->on_message_complete = [](http_parser * parser) -> int { auto context = reinterpret_cast<GetContext*>(parser->data); try { if (context->donecb) @@ -416,8 +419,7 @@ DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_ } return 0; }; - //do_request(request, serverHostIp_, serverHostPort_, - // parser, parser_s, this->ctx_); + httpClient_.post_request(request, parser, parser_s); } /** @@ -705,9 +707,7 @@ DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filt header.method(restinio::http_method_t::http_subscribe); header.request_target("/" + key.toString()); } - //l->second.thread = std::thread([this, header, vcb, filter, state, method]() { - sendListen(header, /*vcb*/ cb, filter, state, method); - //}); + sendListen(header, /*vcb*/ cb, filter, state, method); return token; }); return token; @@ -741,7 +741,7 @@ DhtProxyClient::cancelListen(const InfoHash& key, size_t gtoken) { } void DhtProxyClient::sendListen(const restinio::http_request_header_t header, - const ValueCallback cb, const Value::Filter &filter, + const ValueCallback &cb, const Value::Filter &filter, const Sp<ListenState> &state, ListenMethod method){ auto headers = this->initHeaderFields(); auto conn = restinio::http_connection_header_t::close; @@ -767,11 +767,13 @@ void DhtProxyClient::sendListen(const restinio::http_request_header_t header, context->state = state; context->filter = filter; - http_parser parser; - parser.data = static_cast<void*>(context.get()); - http_parser_settings parser_s; - http_parser_settings_init(&parser_s); - parser_s.on_status = [](http_parser *parser, const char *at, size_t length) -> int { + auto parser = std::make_shared<http_parser>(); + http_parser_init(parser.get(), HTTP_RESPONSE); + parser->data = static_cast<void*>(context.get()); + + auto parser_s = std::make_shared<http_parser_settings>(); + http_parser_settings_init(parser_s.get()); + parser_s->on_status = [](http_parser *parser, const char *at, size_t length) -> int { auto context = reinterpret_cast<ListenContext*>(parser->data); if (parser->status_code != 200){ std::cerr << "Error in listen status_code=" << parser->status_code << std::endl; @@ -779,7 +781,7 @@ void DhtProxyClient::sendListen(const restinio::http_request_header_t header, } return 0; }; - parser_s.on_body = [](http_parser *parser, const char *at, size_t length) -> int { + parser_s->on_body = [](http_parser *parser, const char *at, size_t length) -> int { auto context = reinterpret_cast<ListenContext*>(parser->data); try { Json::Value json; @@ -798,14 +800,8 @@ void DhtProxyClient::sendListen(const restinio::http_request_header_t header, auto value = std::make_shared<Value>(json); auto expired = json.get("expired", Json::Value(false)).asBool(); std::cout << json << " : expired=" << expired << std::endl; - context->cb({value}, expired); if ((not context->filter or context->filter(*value)) and context->cb){ - //std::lock_guard<std::mutex> lock(lockCallbacks); - //callbacks_.emplace_back([cb, value, state, expired]() { - if (not context->state->cancel and not context->cb({value}, expired)) - context->state->cancel = true; - //}); - //loopSignal_(); + context->cb({value}, expired); } } catch(const std::exception& e) { /*context->logger->d*/printf("Error in listen parsing: %s\n", e.what()); @@ -815,8 +811,7 @@ void DhtProxyClient::sendListen(const restinio::http_request_header_t header, } return 0; }; - //do_request(request, serverHostIp_, serverHostPort_, - // parser, parser_s, this->ctx_); + httpClient_.post_request(request, parser, parser_s); } bool @@ -1028,6 +1023,7 @@ DhtProxyClient::pushNotificationReceived(const std::map<std::string, std::string ids.emplace_back(std::stoull(substr)); } { + /* std::lock_guard<std::mutex> lock(lockCallbacks); callbacks_.emplace_back([this, key, token, state, ids]() { if (state->cancel) return; @@ -1039,6 +1035,7 @@ DhtProxyClient::pushNotificationReceived(const std::map<std::string, std::string if (not state->cancel and not l->second.cache.onValuesExpired(ids)) state->cancel = true; }); + */ } loopSignal_(); } -- GitLab