diff --git a/src/dhtrunner.cpp b/src/dhtrunner.cpp index 809e91a15cbbf054dab93f0c7be5d9f8d468a901..d3ca82a07b73a72e3d22a1d33d4861483eeaa403 100644 --- a/src/dhtrunner.cpp +++ b/src/dhtrunner.cpp @@ -37,6 +37,7 @@ namespace dht { constexpr std::chrono::seconds DhtRunner::BOOTSTRAP_PERIOD; static constexpr size_t RX_QUEUE_MAX_SIZE = 1024 * 16; +static constexpr std::chrono::milliseconds RX_QUEUE_MAX_DELAY(500); static constexpr in_port_t PEER_DISCOVERY_PORT = 8888; static const std::string PEER_DISCOVERY_DHT_SERVICE = "dht"; @@ -455,20 +456,35 @@ DhtRunner::loop_() // move to stack received = std::move(rcv); } + + // Discard old packets + size_t dropped {0}; + if (not received.empty()) { + auto now = clock::now(); + while (not received.empty() and now - received.front().received > RX_QUEUE_MAX_DELAY) { + received.pop(); + dropped++; + } + } + + // Handle packets if (not received.empty()) { while (not received.empty()) { auto& pck = received.front(); - auto delay = clock::now() - pck.received; - if (delay > std::chrono::milliseconds(500)) - std::cerr << "Dropping packet with high delay: " << print_dt(delay) << std::endl; + if (clock::now() - pck.received > RX_QUEUE_MAX_DELAY) + dropped++; else wakeup = dht->periodic(pck.data.data(), pck.data.size()-1, pck.from); received.pop(); } } else { + // Or just run the scheduler wakeup = dht->periodic(nullptr, 0, nullptr, 0); } + if (dropped) + std::cerr << "Dropped %zu packets with high delay" << dropped << std::endl; + NodeStatus nstatus4 = dht->getStatus(AF_INET); NodeStatus nstatus6 = dht->getStatus(AF_INET6); if (nstatus4 != status4 || nstatus6 != status6) {