diff --git a/include/opendht/http.h b/include/opendht/http.h index 6656a42e31f970f216a2c2530e916024d497ff10..7c143ee3fb0b65f72826dda35d43550143ef3b21 100644 --- a/include/opendht/http.h +++ b/include/opendht/http.h @@ -269,6 +269,7 @@ public: void add_on_status_callback(OnStatusCb cb); void add_on_body_callback(OnDataCb cb); void add_on_state_change_callback(OnStateChangeCb cb); + void add_on_done_callback(OnDoneCb cb); void send(); @@ -288,7 +289,7 @@ private: OnStateChangeCb on_state_change; }; - void notify_state_change(const State state); + void notify_state_change(State state); void build(); diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index c504a197e40895872cd01da75e41de62fe078d11..278f95a1c2cc83ca5aff8e07a438aa025783d6cc 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -344,30 +344,27 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, Va opstate->ok.store(false); } }); - request->add_on_state_change_callback([this, reqid, opstate, donecb, key] - (http::Request::State state, const http::Response& response){ - if (state == http::Request::State::DONE){ - if (response.status_code != 200) { - if (logger_) - logger_->e("[proxy:client] [get %s] failed with code=%i", key.to_c_str(), response.status_code); - opstate->ok.store(false); - if (not response.aborted and response.status_code == 0) - opFailed(); - } - if (donecb) { - { - std::lock_guard<std::mutex> lock(lockCallbacks_); - callbacks_.emplace_back([donecb, opstate](){ - donecb(opstate->ok, {}); - opstate->stop.store(true); - }); - } - loopSignal_(); - } - if (not isDestroying_) { - std::lock_guard<std::mutex> l(requestLock_); - requests_.erase(reqid); + request->add_on_done_callback([this, reqid, opstate, donecb, key] (const http::Response& response){ + if (response.status_code != 200) { + if (logger_) + logger_->e("[proxy:client] [get %s] failed with code=%i", key.to_c_str(), response.status_code); + opstate->ok.store(false); + if (not response.aborted and response.status_code == 0) + opFailed(); + } + if (donecb) { + { + std::lock_guard<std::mutex> lock(lockCallbacks_); + callbacks_.emplace_back([donecb, opstate](){ + donecb(opstate->ok, {}); + opstate->stop.store(true); + }); } + loopSignal_(); + } + if (not isDestroying_) { + std::lock_guard<std::mutex> l(requestLock_); + requests_.erase(reqid); } }); { @@ -493,22 +490,19 @@ DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallbackSimple cb, } } request->set_body(Json::writeString(jsonBuilder_, json)); - request->add_on_state_change_callback([this, reqid, cb] - (http::Request::State state, const http::Response& response){ - if (state == http::Request::State::DONE){ - bool ok = response.status_code == 200; - if (not ok) { - if (logger_) - logger_->e("[proxy:client] [status] failed with code=%i", response.status_code); - if (not response.aborted and response.status_code == 0) - opFailed(); - } - if (cb) - cb(ok); - if (not isDestroying_) { - std::lock_guard<std::mutex> l(requestLock_); - requests_.erase(reqid); - } + request->add_on_done_callback([this, reqid, cb] (const http::Response& response){ + bool ok = response.status_code == 200; + if (not ok) { + if (logger_) + logger_->e("[proxy:client] [status] failed with code=%i", response.status_code); + if (not response.aborted and response.status_code == 0) + opFailed(); + } + if (cb) + cb(ok); + if (not isDestroying_) { + std::lock_guard<std::mutex> l(requestLock_); + requests_.erase(reqid); } }); { @@ -608,33 +602,30 @@ DhtProxyClient::queryProxyInfo(std::shared_ptr<InfoState> infoState, sa_family_t auto reqid = request->id(); request->set_method(restinio::http_method_get()); setHeaderFields(*request); - request->add_on_state_change_callback([this, reqid, family, infoState] - (http::Request::State state, const http::Response& response){ - if (state == http::Request::State::DONE) { - if (infoState->cancel.load()) + request->add_on_done_callback([this, reqid, family, infoState] (const http::Response& response){ + if (infoState->cancel.load()) + return; + if (response.status_code != 200) { + if (logger_) + logger_->e("[proxy:client] [status] ipv%i failed with code=%i", + family == AF_INET ? 4 : 6, response.status_code); + // pass along the failures + if ((family == AF_INET and infoState->ipv4 == 0) or (family == AF_INET6 and infoState->ipv6 == 0)) + onProxyInfos(Json::Value{}, family); + } else { + std::string err; + Json::Value proxyInfos; + auto reader = std::unique_ptr<Json::CharReader>(jsonReaderBuilder_.newCharReader()); + if (!reader->parse(response.body.data(), response.body.data() + response.body.size(), &proxyInfos, &err)){ + onProxyInfos(Json::Value{}, family); return; - if (response.status_code != 200) { - if (logger_) - logger_->e("[proxy:client] [status] ipv%i failed with code=%i", - family == AF_INET ? 4 : 6, response.status_code); - // pass along the failures - if ((family == AF_INET and infoState->ipv4 == 0) or (family == AF_INET6 and infoState->ipv6 == 0)) - onProxyInfos(Json::Value{}, family); - } else { - std::string err; - Json::Value proxyInfos; - auto reader = std::unique_ptr<Json::CharReader>(jsonReaderBuilder_.newCharReader()); - if (!reader->parse(response.body.data(), response.body.data() + response.body.size(), &proxyInfos, &err)){ - onProxyInfos(Json::Value{}, family); - return; - } - if (not infoState->cancel) - onProxyInfos(proxyInfos, family); - } - if (not isDestroying_) { - std::lock_guard<std::mutex> l(requestLock_); - requests_.erase(reqid); } + if (not infoState->cancel) + onProxyInfos(proxyInfos, family); + } + if (not isDestroying_) { + std::lock_guard<std::mutex> l(requestLock_); + requests_.erase(reqid); } }); @@ -898,20 +889,17 @@ DhtProxyClient::handleExpireListener(const asio::error_code &ec, const InfoHash& body["key"] = deviceKey_; body["client_id"] = pushClientId_; request->set_body(Json::writeString(jsonBuilder_, body)); - request->add_on_state_change_callback([this, reqid, key] - (http::Request::State state, const http::Response& response){ - if (state == http::Request::State::DONE){ - if (response.status_code != 200) { - if (logger_) - logger_->e("[proxy:client] [unsubscribe %s] failed with code=%i", - key.to_c_str(), response.status_code); - if (not response.aborted and response.status_code == 0) - opFailed(); - } - if (not isDestroying_) { - std::lock_guard<std::mutex> l(requestLock_); - requests_.erase(reqid); - } + request->add_on_done_callback([this, reqid, key] (const http::Response& response){ + if (response.status_code != 200) { + if (logger_) + logger_->e("[proxy:client] [unsubscribe %s] failed with code=%i", + key.to_c_str(), response.status_code); + if (not response.aborted and response.status_code == 0) + opFailed(); + } + if (not isDestroying_) { + std::lock_guard<std::mutex> l(requestLock_); + requests_.erase(reqid); } }); { @@ -1004,21 +992,18 @@ DhtProxyClient::sendListen(const restinio::http_request_header_t header, opstate->ok.store(false); } }); - request->add_on_state_change_callback([this, opstate, reqid] - (http::Request::State state, const http::Response& response){ - if (state == http::Request::State::DONE){ - if (response.status_code != 200) { - if (logger_) - logger_->e("[proxy:client] [listen] send request #%i failed with code=%i", - reqid, response.status_code); - opstate->ok.store(false); - if (not response.aborted and response.status_code == 0) - opFailed(); - } - if (not isDestroying_) { - std::lock_guard<std::mutex> l(requestLock_); - requests_.erase(reqid); - } + request->add_on_done_callback([this, opstate, reqid] (const http::Response& response) { + if (response.status_code != 200) { + if (logger_) + logger_->e("[proxy:client] [listen] send request #%i failed with code=%i", + reqid, response.status_code); + opstate->ok.store(false); + if (not response.aborted and response.status_code == 0) + opFailed(); + } + if (not isDestroying_) { + std::lock_guard<std::mutex> l(requestLock_); + requests_.erase(reqid); } }); { diff --git a/src/http.cpp b/src/http.cpp index 4f53f6152a462db0af850f6d281c98c30b730c84..2548fd95d8d48f60ce0813788b71bd2cd509b5c2 100644 --- a/src/http.cpp +++ b/src/http.cpp @@ -450,9 +450,7 @@ Request::Request(asio::io_context& ctx, const std::string& url, const Json::Valu Json::StreamWriterBuilder wbuilder; set_method(restinio::http_method_post()); set_body(Json::writeString(wbuilder, json)); - add_on_state_change_callback([this, jsoncb](State state, const Response& response){ - if (state != Request::State::DONE) - return; + add_on_done_callback([this, jsoncb](const Response& response){ Json::Value json; if (response.status_code != 0) { std::string err; @@ -462,7 +460,7 @@ Request::Request(asio::io_context& ctx, const std::string& url, const Json::Valu logger_->e("[http:request:%i] can't parse response to json", id_, err.c_str()); } if (jsoncb) - jsoncb(json, response.status_code); + jsoncb(std::move(json), response.status_code); }); } @@ -477,10 +475,7 @@ Request::Request(asio::io_context& ctx, const std::string& url, OnDoneCb onDone, : logger_(logger), id_(Request::ids_++), ctx_(ctx), resolver_(std::make_shared<Resolver>(ctx, url, logger)) { init_default_headers(); - add_on_state_change_callback([onDone](State state, const Response& response){ - if (state == Request::State::DONE) - onDone(response); - }); + add_on_done_callback(std::move(onDone)); } Request::Request(asio::io_context& ctx, const std::string& host, const std::string& service, @@ -660,8 +655,15 @@ Request::add_on_state_change_callback(OnStateChangeCb cb) { } void -Request::notify_state_change(const State state) -{ +Request::add_on_done_callback(OnDoneCb cb) { + add_on_state_change_callback([onDone=std::move(cb)](State state, const Response& response){ + if (state == Request::State::DONE) + onDone(response); + }); +} + +void +Request::notify_state_change(State state) { state_ = state; if (cbs_.on_state_change) cbs_.on_state_change(state, response_);