diff --git a/include/opendht/dht_proxy_client.h b/include/opendht/dht_proxy_client.h index 363d287306fd0a26efaeaa4df763a517968282e7..2c0c3fb45c1a54581d4edb31d8c5920b185ff69c 100644 --- a/include/opendht/dht_proxy_client.h +++ b/include/opendht/dht_proxy_client.h @@ -308,9 +308,8 @@ private: void handleResubscribe(const asio::error_code& ec, const InfoHash& key, const size_t token, std::shared_ptr<OperationState> opstate); - void doPut(const InfoHash&, Sp<Value>, DoneCallback, time_point created, bool permanent); - void handleRefreshPut(const asio::error_code& ec, const InfoHash& key, const Value::Id id, - std::shared_ptr<std::atomic_bool> ok); + void doPut(const InfoHash&, Sp<Value>, DoneCallbackSimple, time_point created, bool permanent); + void handleRefreshPut(const asio::error_code& ec, InfoHash key, Value::Id id); /** * Initialize statusIpvX_ diff --git a/src/dht.cpp b/src/dht.cpp index 659435a5bb2e1aaa736b5b28960d50c3ab731d7d..1ac6ddd7702a9e4e3cf37a2f1d96c8e4a4e07512 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -921,8 +921,7 @@ Dht::put(const InfoHash& id, Sp<Value> val, DoneCallback callback, time_point cr } if (val->id == Value::INVALID_ID) { crypto::random_device rdev; - std::uniform_int_distribution<Value::Id> rand_id {}; - val->id = rand_id(rdev); + val->id = std::uniform_int_distribution<Value::Id>{1}(rdev); } scheduler.syncTime(); const auto& now = scheduler.time(); diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index d026619d5819919c0709356484597d3ca75a2f98..d1a82f599e217aa21505651e13b8a8b6dfb9a607 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -360,35 +360,43 @@ DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_po if (val->id == Value::INVALID_ID) { crypto::random_device rdev; - std::uniform_int_distribution<Value::Id> rand_id {}; - val->id = rand_id(rdev); + val->id = std::uniform_int_distribution<Value::Id>{1}(rdev); } + std::shared_ptr<std::atomic_bool> ok; if (permanent) { std::lock_guard<std::mutex> lock(searchLock_); auto id = val->id; auto& search = searches_[key]; auto refreshPutTimer = std::make_unique<asio::steady_timer>(httpContext_, std::chrono::steady_clock::now() + proxy::OP_TIMEOUT - proxy::OP_MARGIN); - auto ok = std::make_shared<std::atomic_bool>(); - ok->store(true); - refreshPutTimer->async_wait(std::bind(&DhtProxyClient::handleRefreshPut, this, - std::placeholders::_1, key, id, ok)); + refreshPutTimer->async_wait(std::bind(&DhtProxyClient::handleRefreshPut, this, std::placeholders::_1, key, id)); search.puts.erase(id); + ok = std::make_shared<std::atomic_bool>(true); search.puts.emplace(std::piecewise_construct, std::forward_as_tuple(id), std::forward_as_tuple(val, std::move(refreshPutTimer), ok)); } - doPut(key, val, std::move(cb), created, permanent); + doPut(key, val, [this, cb, ok](bool result){ + if (ok) + *ok = result; + { + std::lock_guard<std::mutex> lock(lockCallbacks_); + callbacks_.emplace_back([cb, result](){ + cb(result, {}); + }); + } + loopSignal_(); + }, created, permanent); } + void -DhtProxyClient::handleRefreshPut(const asio::error_code &ec, const InfoHash& key, const Value::Id id, - std::shared_ptr<std::atomic_bool> ok) +DhtProxyClient::handleRefreshPut(const asio::error_code &ec, InfoHash key, Value::Id id) { if (ec == asio::error::operation_aborted) return; else if (ec){ if (logger_) - logger_->e("[proxy:client] [listener] [refresh %s] %s", key.toString().c_str(), ec.message().c_str()); + logger_->e("[proxy:client] [put] [refresh %s] %s", key.toString().c_str(), ec.message().c_str()); return; } if (logger_) @@ -398,12 +406,11 @@ DhtProxyClient::handleRefreshPut(const asio::error_code &ec, const InfoHash& key if (search != searches_.end()) { auto p = search->second.puts.find(id); if (p != search->second.puts.end()){ - doPut(key, p->second.value, [ok](bool result, const std::vector<std::shared_ptr<dht::Node>>&){ + doPut(key, p->second.value, [ok = p->second.ok](bool result){ *ok = result; }, time_point::max(), true); - p->second.refreshPutTimer->expires_at(std::chrono::steady_clock::now() + - proxy::OP_TIMEOUT - proxy::OP_MARGIN); - p->second.refreshPutTimer->async_wait(std::bind(&DhtProxyClient::handleRefreshPut, this, std::placeholders::_1, key, id, ok)); + p->second.refreshPutTimer->expires_at(std::chrono::steady_clock::now() + proxy::OP_TIMEOUT - proxy::OP_MARGIN); + p->second.refreshPutTimer->async_wait(std::bind(&DhtProxyClient::handleRefreshPut, this, std::placeholders::_1, key, id)); } } } @@ -426,7 +433,7 @@ DhtProxyClient::buildRequest(const std::string& target) } void -DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_point /*created*/, bool permanent) +DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallbackSimple cb, time_point /*created*/, bool permanent) { if (logger_) logger_->d("[proxy:client] [put] [search %s] executing for %s", key.to_c_str(), val->toString().c_str()); @@ -452,28 +459,18 @@ DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_ } } request->set_body(Json::writeString(jsonBuilder_, json)); - - auto ok = std::make_shared<std::atomic_bool>(); - ok->store(true); - - request->add_on_state_change_callback([this, reqid, ok, cb, key] + request->add_on_state_change_callback([this, reqid, cb] (http::Request::State state, const http::Response& response){ if (state == http::Request::State::DONE){ - if (response.status_code != 200) { + bool ok = response.status_code == 200; + if (not ok) { if (logger_) logger_->e("[proxy:client] [status] failed with code=%i", response.status_code); - ok->store(false); if (response.status_code == 0) opFailed(); } if (cb){ - { - std::lock_guard<std::mutex> lock(lockCallbacks_); - callbacks_.emplace_back([cb, ok](){ - cb(ok->load(), {}); - }); - } - loopSignal_(); + cb(ok); } requests_.erase(reqid); } @@ -1018,17 +1015,15 @@ DhtProxyClient::restartListeners() auto key = search.first; for (auto& put : search.second.puts) { if (!*put.second.ok) { - auto ok = put.second.ok; - doPut(key, put.second.value, [ok](bool result, const std::vector<std::shared_ptr<dht::Node>>&){ + doPut(key, put.second.value, [ok = put.second.ok](bool result){ *ok = result; }, time_point::max(), true); if (!put.second.refreshPutTimer){ put.second.refreshPutTimer = std::make_unique<asio::steady_timer>(httpContext_); } - put.second.refreshPutTimer->expires_at(std::chrono::steady_clock::now() + - proxy::OP_TIMEOUT - proxy::OP_MARGIN); + put.second.refreshPutTimer->expires_at(std::chrono::steady_clock::now() + proxy::OP_TIMEOUT - proxy::OP_MARGIN); put.second.refreshPutTimer->async_wait(std::bind(&DhtProxyClient::handleRefreshPut, this, - std::placeholders::_1, key, put.first, put.second.ok)); + std::placeholders::_1, key, put.first)); } } } @@ -1100,7 +1095,7 @@ DhtProxyClient::pushNotificationReceived(const std::map<std::string, std::string put.refreshPutTimer = std::make_unique<asio::steady_timer>(httpContext_, std::chrono::steady_clock::now()); else put.refreshPutTimer->expires_at(std::chrono::steady_clock::now()); - put.refreshPutTimer->async_wait(std::bind(&DhtProxyClient::handleRefreshPut, this, std::placeholders::_1, key, vid, put.ok)); + put.refreshPutTimer->async_wait(std::bind(&DhtProxyClient::handleRefreshPut, this, std::placeholders::_1, key, vid)); } else { // Refresh listen for (auto& list : search.listeners) diff --git a/src/dht_proxy_server.cpp b/src/dht_proxy_server.cpp index e2b5bc08088df80641daee7fd4c37ed41c006bcd..9acaf7a1f09ffb9cee5029047d7ec755f3d9d619 100644 --- a/src/dht_proxy_server.cpp +++ b/src/dht_proxy_server.cpp @@ -951,14 +951,12 @@ DhtProxyServer::put(restinio::request_handle_t request, infoHash = dht::InfoHash::get(params["hash"].to_string()); if (!dht_){ - auto response = this->initHttpResponse( - request->create_response(restinio::status_service_unavailable())); + auto response = initHttpResponse(request->create_response(restinio::status_service_unavailable())); response.set_body(RESP_MSG_SERVICE_UNAVAILABLE); return response.done(); } else if (request->body().empty()){ - auto response = this->initHttpResponse( - request->create_response(restinio::status_bad_request())); + auto response = initHttpResponse(request->create_response(restinio::status_bad_request())); response.set_body(RESP_MSG_MISSING_PARAMS); return response.done(); }