diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index e68bfdb4d7b717ddbe9d3a670bad4adb55513fcb..24319cdfdc21102bf85777501de461e6a961e8bc 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -68,6 +68,7 @@ struct DhtProxyClient::ProxySearch { std::unique_ptr<asio::steady_timer> opExpirationTimer; std::map<size_t, Listener> listeners {}; std::map<Value::Id, PermanentPut> puts {}; + std::set<Sp<Value>> pendingPuts {}; }; struct LineSplit { @@ -398,23 +399,22 @@ DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_po if (logger_) logger_->d("[proxy:client] [put] [search %s]", key.to_c_str()); - if (val->id == Value::INVALID_ID) { - crypto::random_device rdev; - val->id = std::uniform_int_distribution<Value::Id>{1}(rdev); - } std::shared_ptr<std::atomic_bool> ok; if (permanent) { std::lock_guard<std::mutex> lock(searchLock_); - auto id = val->id; - auto& search = searches_[key]; - auto refreshPutTimer = std::make_unique<asio::steady_timer>(httpContext_, std::chrono::steady_clock::now() + - proxy::OP_TIMEOUT - proxy::OP_MARGIN); - refreshPutTimer->async_wait(std::bind(&DhtProxyClient::handleRefreshPut, this, std::placeholders::_1, key, id)); - search.puts.erase(id); ok = std::make_shared<std::atomic_bool>(true); - search.puts.emplace(std::piecewise_construct, - std::forward_as_tuple(id), - std::forward_as_tuple(val, std::move(refreshPutTimer), ok)); + auto& search = searches_[key]; + if (val->id) { + auto id = val->id; + auto refreshPutTimer = std::make_unique<asio::steady_timer>(httpContext_, proxy::OP_TIMEOUT - proxy::OP_MARGIN); + refreshPutTimer->async_wait(std::bind(&DhtProxyClient::handleRefreshPut, this, std::placeholders::_1, key, id)); + search.puts.erase(id); + search.puts.emplace(std::piecewise_construct, + std::forward_as_tuple(id), + std::forward_as_tuple(val, std::move(refreshPutTimer), ok)); + } else { + search.pendingPuts.emplace(val); + } } doPut(key, val, [this, cb, ok](bool result){ if (ok) @@ -499,9 +499,35 @@ DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallbackSimple cb, } } request->set_body(Json::writeString(jsonBuilder_, json)); - request->add_on_done_callback([this, reqid, cb] (const http::Response& response){ + request->add_on_done_callback([this, reqid, cb, val, key, permanent] (const http::Response& response){ bool ok = response.status_code == 200; - if (not ok) { + if (ok) { + if (val->id == Value::INVALID_ID) { + std::string err; + Json::Value parsedValue; + if (jsonReader_->parse(response.body.data(), response.body.data() + response.body.size(), &parsedValue, &err)){ + auto id = dht::Value(parsedValue).id; + val->id = id; + if (permanent) { + std::lock_guard<std::mutex> lock(searchLock_); + auto& search = searches_[key]; + auto it = search.pendingPuts.find(val); + if (it != search.pendingPuts.end()) { + auto sok = std::make_shared<std::atomic_bool>(ok); + auto refreshPutTimer = std::make_unique<asio::steady_timer>(httpContext_, proxy::OP_TIMEOUT - proxy::OP_MARGIN); + refreshPutTimer->async_wait(std::bind(&DhtProxyClient::handleRefreshPut, this, std::placeholders::_1, key, id)); + search.puts.emplace(std::piecewise_construct, + std::forward_as_tuple(id), + std::forward_as_tuple(val, std::move(refreshPutTimer), sok)); + search.pendingPuts.erase(it); + } + } + } else { + if (logger_) + logger_->e("[proxy:client] [status] failed to parse value from server", response.status_code); + } + } + } else { if (logger_) logger_->e("[proxy:client] [status] failed with code=%i", response.status_code); if (not response.aborted and response.status_code == 0)