Skip to content
Snippets Groups Projects
Commit 4a20814d authored by Adrien Béraud's avatar Adrien Béraud
Browse files

proxy client: let the server choose the value id

parent 707fb399
Branches
Tags
No related merge requests found
...@@ -68,6 +68,7 @@ struct DhtProxyClient::ProxySearch { ...@@ -68,6 +68,7 @@ struct DhtProxyClient::ProxySearch {
std::unique_ptr<asio::steady_timer> opExpirationTimer; std::unique_ptr<asio::steady_timer> opExpirationTimer;
std::map<size_t, Listener> listeners {}; std::map<size_t, Listener> listeners {};
std::map<Value::Id, PermanentPut> puts {}; std::map<Value::Id, PermanentPut> puts {};
std::set<Sp<Value>> pendingPuts {};
}; };
struct LineSplit { struct LineSplit {
...@@ -398,23 +399,22 @@ DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_po ...@@ -398,23 +399,22 @@ DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_po
if (logger_) if (logger_)
logger_->d("[proxy:client] [put] [search %s]", key.to_c_str()); logger_->d("[proxy:client] [put] [search %s]", key.to_c_str());
if (val->id == Value::INVALID_ID) {
crypto::random_device rdev;
val->id = std::uniform_int_distribution<Value::Id>{1}(rdev);
}
std::shared_ptr<std::atomic_bool> ok; std::shared_ptr<std::atomic_bool> ok;
if (permanent) { if (permanent) {
std::lock_guard<std::mutex> lock(searchLock_); std::lock_guard<std::mutex> lock(searchLock_);
auto id = val->id; ok = std::make_shared<std::atomic_bool>(true);
auto& search = searches_[key]; auto& search = searches_[key];
auto refreshPutTimer = std::make_unique<asio::steady_timer>(httpContext_, std::chrono::steady_clock::now() + if (val->id) {
proxy::OP_TIMEOUT - proxy::OP_MARGIN); auto id = val->id;
auto refreshPutTimer = std::make_unique<asio::steady_timer>(httpContext_, proxy::OP_TIMEOUT - proxy::OP_MARGIN);
refreshPutTimer->async_wait(std::bind(&DhtProxyClient::handleRefreshPut, this, std::placeholders::_1, key, id)); refreshPutTimer->async_wait(std::bind(&DhtProxyClient::handleRefreshPut, this, std::placeholders::_1, key, id));
search.puts.erase(id); search.puts.erase(id);
ok = std::make_shared<std::atomic_bool>(true);
search.puts.emplace(std::piecewise_construct, search.puts.emplace(std::piecewise_construct,
std::forward_as_tuple(id), std::forward_as_tuple(id),
std::forward_as_tuple(val, std::move(refreshPutTimer), ok)); std::forward_as_tuple(val, std::move(refreshPutTimer), ok));
} else {
search.pendingPuts.emplace(val);
}
} }
doPut(key, val, [this, cb, ok](bool result){ doPut(key, val, [this, cb, ok](bool result){
if (ok) if (ok)
...@@ -499,9 +499,35 @@ DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallbackSimple cb, ...@@ -499,9 +499,35 @@ DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallbackSimple cb,
} }
} }
request->set_body(Json::writeString(jsonBuilder_, json)); request->set_body(Json::writeString(jsonBuilder_, json));
request->add_on_done_callback([this, reqid, cb] (const http::Response& response){ request->add_on_done_callback([this, reqid, cb, val, key, permanent] (const http::Response& response){
bool ok = response.status_code == 200; bool ok = response.status_code == 200;
if (not ok) { if (ok) {
if (val->id == Value::INVALID_ID) {
std::string err;
Json::Value parsedValue;
if (jsonReader_->parse(response.body.data(), response.body.data() + response.body.size(), &parsedValue, &err)){
auto id = dht::Value(parsedValue).id;
val->id = id;
if (permanent) {
std::lock_guard<std::mutex> lock(searchLock_);
auto& search = searches_[key];
auto it = search.pendingPuts.find(val);
if (it != search.pendingPuts.end()) {
auto sok = std::make_shared<std::atomic_bool>(ok);
auto refreshPutTimer = std::make_unique<asio::steady_timer>(httpContext_, proxy::OP_TIMEOUT - proxy::OP_MARGIN);
refreshPutTimer->async_wait(std::bind(&DhtProxyClient::handleRefreshPut, this, std::placeholders::_1, key, id));
search.puts.emplace(std::piecewise_construct,
std::forward_as_tuple(id),
std::forward_as_tuple(val, std::move(refreshPutTimer), sok));
search.pendingPuts.erase(it);
}
}
} else {
if (logger_)
logger_->e("[proxy:client] [status] failed to parse value from server", response.status_code);
}
}
} else {
if (logger_) if (logger_)
logger_->e("[proxy:client] [status] failed with code=%i", response.status_code); logger_->e("[proxy:client] [status] failed with code=%i", response.status_code);
if (not response.aborted and response.status_code == 0) if (not response.aborted and response.status_code == 0)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment