Skip to content
Snippets Groups Projects
Commit dc46c227 authored by Adrien Béraud's avatar Adrien Béraud
Browse files

net: exponential backoff, randomize packet expiration time

parent 052ed1f5
Branches
Tags
No related merge requests found
......@@ -385,6 +385,8 @@ private:
std::map<size_t, std::tuple<size_t, size_t, size_t>> listeners {};
size_t listener_token {1};
std::mt19937_64 rd {crypto::getSeededRandomEngine<std::mt19937_64>()};
// timing
Scheduler scheduler;
Sp<Scheduler::Job> nextNodesConfirmation {};
......@@ -397,8 +399,6 @@ private:
using ReportedAddr = std::pair<unsigned, SockAddr>;
std::vector<ReportedAddr> reported_addr;
std::mt19937_64 rd {crypto::getSeededRandomEngine<std::mt19937_64>()};
std::string persistPath;
// are we a bootstrap node ?
......
......@@ -212,12 +212,13 @@ public:
using RequestCb = std::function<void(const Request&, RequestAnswer&&)>;
using RequestExpiredCb = std::function<void(const Request&, bool)>;
NetworkEngine(const Sp<Logger>& log, Scheduler& scheduler, std::unique_ptr<DatagramSocket>&& sock);
NetworkEngine(const Sp<Logger>& log, std::mt19937_64& rd, Scheduler& scheduler, std::unique_ptr<DatagramSocket>&& sock);
NetworkEngine(
InfoHash& myid,
NetworkConfig config,
std::unique_ptr<DatagramSocket>&& sock,
const Sp<Logger>& log,
std::mt19937_64& rd,
Scheduler& scheduler,
decltype(NetworkEngine::onError)&& onError,
decltype(NetworkEngine::onNewNode)&& onNewNode,
......@@ -526,6 +527,7 @@ private:
const NetworkConfig config {};
const std::unique_ptr<DatagramSocket> dht_socket;
Sp<Logger> logger_;
std::mt19937_64& rd;
NodeCache cache {};
......
......@@ -1739,11 +1739,11 @@ fromDhtConfig(const Config& config)
return netConf;
}
Dht::Dht() : store(), network_engine(logger_, scheduler, {}) {}
Dht::Dht() : store(), network_engine(logger_, rd, scheduler, {}) {}
Dht::Dht(std::unique_ptr<net::DatagramSocket>&& sock, const Config& config, const Sp<Logger>& l)
: DhtInterface(l), myid(config.node_id ? config.node_id : InfoHash::getRandom()), store(), store_quota(),
network_engine(myid, fromDhtConfig(config), std::move(sock), logger_, scheduler,
network_engine(myid, fromDhtConfig(config), std::move(sock), logger_, rd, scheduler,
std::bind(&Dht::onError, this, _1, _2),
std::bind(&Dht::onNewNode, this, _1, _2),
std::bind(&Dht::onReportedAddr, this, _1, _2),
......
......@@ -27,6 +27,7 @@
namespace dht {
namespace net {
using namespace std::chrono_literals;
const std::string DhtProtocolException::GET_NO_INFOHASH {"Get_values with no info_hash"};
const std::string DhtProtocolException::LISTEN_NO_INFOHASH {"Listen with no info_hash"};
......@@ -98,11 +99,15 @@ RequestAnswer::RequestAnswer(ParsedMessage&& msg)
nodes6(std::move(msg.nodes6))
{}
NetworkEngine::NetworkEngine(const Sp<Logger>& log, Scheduler& scheduler, std::unique_ptr<DatagramSocket>&& sock)
: myid(zeroes), dht_socket(std::move(sock)), logger_(log), rate_limiter((size_t)-1), scheduler(scheduler)
NetworkEngine::NetworkEngine(const Sp<Logger>& log, std::mt19937_64& rand, Scheduler& scheduler, std::unique_ptr<DatagramSocket>&& sock)
: myid(zeroes), dht_socket(std::move(sock)), logger_(log), rd(rand), rate_limiter((size_t)-1), scheduler(scheduler)
{}
NetworkEngine::NetworkEngine(InfoHash& myid, NetworkConfig c, std::unique_ptr<DatagramSocket>&& sock, const Sp<Logger>& log, Scheduler& scheduler,
NetworkEngine::NetworkEngine(InfoHash& myid, NetworkConfig c,
std::unique_ptr<DatagramSocket>&& sock,
const Sp<Logger>& log,
std::mt19937_64& rand,
Scheduler& scheduler,
decltype(NetworkEngine::onError)&& onError,
decltype(NetworkEngine::onNewNode)&& onNewNode,
decltype(NetworkEngine::onReportedAddr)&& onReportedAddr,
......@@ -121,7 +126,7 @@ NetworkEngine::NetworkEngine(InfoHash& myid, NetworkConfig c, std::unique_ptr<Da
onListen(std::move(onListen)),
onAnnounce(std::move(onAnnounce)),
onRefresh(std::move(onRefresh)),
myid(myid), config(c), dht_socket(std::move(sock)), logger_(log),
myid(myid), config(c), dht_socket(std::move(sock)), logger_(log), rd(rand),
rate_limiter(config.max_req_per_sec),
scheduler(scheduler)
{}
......@@ -270,12 +275,14 @@ NetworkEngine::requestStep(Sp<Request> sreq)
if (not node.id)
requests.erase(req.tid);
} else {
req.last_try = now;
if (err != EAGAIN) {
++req.attempt_count;
req.attempt_duration +=
req.attempt_duration + uniform_duration_distribution<>(0ms, ((duration)Node::MAX_RESPONSE_TIME)/4)(rd);
}
req.last_try = now;
std::weak_ptr<Request> wreq = sreq;
scheduler.add(req.last_try + Node::MAX_RESPONSE_TIME, [this,wreq] {
scheduler.add(req.last_try + req.attempt_duration, [this,wreq] {
if (auto req = wreq.lock())
requestStep(req);
});
......
......@@ -108,7 +108,7 @@ private:
static const constexpr size_t MAX_ATTEMPT_COUNT {3};
bool isExpired(time_point now) const {
return pending() and now > last_try + Node::MAX_RESPONSE_TIME and attempt_count >= Request::MAX_ATTEMPT_COUNT;
return pending() and now > last_try + attempt_duration and attempt_count >= Request::MAX_ATTEMPT_COUNT;
}
void clear() {
......@@ -122,6 +122,7 @@ private:
State state_ {State::PENDING};
unsigned attempt_count {0}; /* number of attempt to process the request. */
duration attempt_duration {((duration)Node::MAX_RESPONSE_TIME)/2};
time_point start {time_point::min()}; /* time when the request is created. */
time_point last_try {time_point::min()}; /* time of the last attempt to process the request. */
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment