Skip to content
Snippets Groups Projects
Commit aa611bde authored by Adrien Béraud's avatar Adrien Béraud Committed by Cyrille Béraud
Browse files

network engine: cleanup

parent 7af85650
No related branches found
No related tags found
No related merge requests found
...@@ -205,34 +205,35 @@ NetworkEngine::connectivityChanged(sa_family_t af) ...@@ -205,34 +205,35 @@ NetworkEngine::connectivityChanged(sa_family_t af)
} }
void void
NetworkEngine::requestStep(std::shared_ptr<Request> req) NetworkEngine::requestStep(std::shared_ptr<Request> sreq)
{ {
if (not req->pending()) { auto& req = *sreq;
if (req->cancelled()) if (not req.pending()) {
requests.erase(req->tid); if (req.cancelled())
requests.erase(req.tid);
return; return;
} }
auto now = scheduler.time(); auto now = scheduler.time();
if (req->isExpired(now)) { auto& node = *req.node;
DHT_LOG.ERR("[node %s] expired !", req->node->toString().c_str()); if (req.isExpired(now)) {
req->node->setExpired(); DHT_LOG.ERR("[node %s] expired !", node.toString().c_str());
requests.erase(req->tid); node.setExpired();
requests.erase(req.tid);
return; return;
} else if (req->attempt_count == 1) { } else if (req.attempt_count == 1) {
req->on_expired(*req, false); req.on_expired(req, false);
} }
send((char*)req->msg.data(), req->msg.size(), send((char*)req.msg.data(), req.msg.size(),
(req->node->reply_time >= now - UDP_REPLY_TIME) ? 0 : MSG_CONFIRM, (node.reply_time >= now - UDP_REPLY_TIME) ? 0 : MSG_CONFIRM,
req->node->addr); node.addr);
++req->attempt_count; ++req.attempt_count;
req->last_try = now; req.last_try = now;
std::weak_ptr<Request> wreq = req; std::weak_ptr<Request> wreq = sreq;
scheduler.add(req->last_try + Node::MAX_RESPONSE_TIME, [this,wreq]() { scheduler.add(req.last_try + Node::MAX_RESPONSE_TIME, [this,wreq] {
if (auto req = wreq.lock()) { if (auto req = wreq.lock())
requestStep(req); requestStep(req);
}
}); });
} }
...@@ -269,13 +270,9 @@ NetworkEngine::rateLimit(const SockAddr& addr) ...@@ -269,13 +270,9 @@ NetworkEngine::rateLimit(const SockAddr& addr)
} }
} }
// invoke per IP rate limiter
auto it = address_rate_limiter.emplace(addr, IpLimiter{}); auto it = address_rate_limiter.emplace(addr, IpLimiter{});
if (not it.first->second.limit(now)) // invoke per IP, then global rate limiter
return false; return it.first->second.limit(now) and rate_limiter.limit(now);
// invoke global limiter
return rate_limiter.limit(now);
} }
bool bool
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment