Commit 9075330a authored by Guillaume Roguez's avatar Guillaume Roguez Committed by Andreas Traczyk

tls: add out-of-order packet handler

Add a OOO packet handler to prevent such event on DTLS session.
This is a simple handler, with a 1s timeout for any OOO packet.

Change-Id: I2cab097acca7148fc86acb6e206ab3289ac98999
Reviewed-by: Andreas Traczyk's avatarAndreas Traczyk <andreas.traczyk@savoirfairelinux.com>
parent 3e87cc4d
......@@ -29,6 +29,7 @@
#include "logger.h"
#include "noncopyable.h"
#include "compiler_intrinsics.h"
#include "manager.h"
#include <gnutls/gnutls.h>
#include <gnutls/dtls.h>
......@@ -54,6 +55,8 @@ static constexpr uint8_t UDP_HEADER_SIZE = 8; // Size in bytes of UDP packet h
static constexpr uint8_t HEARTBEAT_RETRIES = 1; // Number of tries at each heartbeat ping send (1 + 1 if error)
static constexpr auto HEARTBEAT_RETRANS_TIMEOUT = std::chrono::milliseconds(700); // gnutls heartbeat retransmission timeout for each ping (in milliseconds)
static constexpr auto HEARTBEAT_TOTAL_TIMEOUT = HEARTBEAT_RETRANS_TIMEOUT * HEARTBEAT_RETRIES; // gnutls heartbeat time limit for heartbeat procedure (in milliseconds)
static constexpr int MISS_ORDERING_LIMIT = 32; // maximal accepted distance of out-of-order packet (note: must be a signed type)
static constexpr auto RX_OOO_TIMEOUT = std::chrono::milliseconds(1000);
// mtus array to test, do not add mtu over the interface MTU, this will result in false result due to packet fragmentation.
// also do not set over 16000 this will result in a gnutls error (unexpected packet size)
......@@ -212,6 +215,8 @@ TlsSession::TlsSession(const std::shared_ptr<IceTransport>& ice, int ice_comp_id
return len;
});
Manager::instance().registerEventHandler((uintptr_t)this, [this]{ flushRxQueue(); });
// Run FSM into dedicated thread
thread_.start();
}
......@@ -222,6 +227,8 @@ TlsSession::~TlsSession()
thread_.join();
socket_->setOnRecv(nullptr);
Manager::instance().unregisterEventHandler((uintptr_t)this);
}
const char*
......@@ -855,6 +862,84 @@ TlsSession::pathMtuHeartbeat()
RING_WARN("[TLS] Heartbeat PMTUD : new mtu set to %d", *mtuProbe_);
}
void
TlsSession::handleDataPacket(std::vector<uint8_t>&& buf, const uint8_t* seq_bytes)
{
uint64_t pkt_seq;
for (int i=0; i < 8; ++i)
pkt_seq = (pkt_seq << 8) + seq_bytes[i];
// Init/offset sequence number trackers
if (baseSeq_) {
pkt_seq -= baseSeq_;
} else {
baseSeq_ = pkt_seq - 1;
pkt_seq = 1; // start at 1 to have a positive seq_delta on first packet
gapOffset_ = 1;
}
// Check for a valid seq. num. delta
int64_t seq_delta = pkt_seq - lastRxSeq_;
if (seq_delta > 0) {
lastRxSeq_ = pkt_seq;
} else {
// too old?
if (seq_delta <= -MISS_ORDERING_LIMIT) {
RING_WARN("[dtls] drop old pkt: %lu", pkt_seq);
return;
}
// No duplicate check as DTLS prevents that for us (replay protection)
// accept Out-Of-Order pkt - will be reordered by queue flush operation
RING_WARN("[dtls] OOO pkt: %lu", pkt_seq);
}
std::lock_guard<std::mutex> lk {reorderBufMutex_};
if (reorderBuffer_.empty())
lastReadTime_ = clock::now();
reorderBuffer_.emplace(pkt_seq, std::move(buf));
}
///
/// Reorder and push received packet to upper layer
///
/// \note This method must be called continously, faster than RX_OOO_TIMEOUT
///
void
TlsSession::flushRxQueue()
{
std::unique_lock<std::mutex> lk {reorderBufMutex_};
if (reorderBuffer_.empty())
return;
auto item = std::begin(reorderBuffer_);
auto next_offset = item->first;
// Wait for next continous packet until timeou
if ((lastReadTime_ - clock::now()) >= RX_OOO_TIMEOUT) {
// OOO packet timeout - consider waited packets as lost
} else if (next_offset != gapOffset_)
return;
// Loop on offset-ordered received packet until a discontinuity in sequence number
while (item != std::end(reorderBuffer_) and item->first <= next_offset) {
auto pkt_offset = item->first;
auto& pkt = item->second;
if (callbacks_.onRxData) {
lk.unlock();
callbacks_.onRxData(std::move(pkt));
lk.lock();
}
next_offset = pkt_offset + 1;
item = reorderBuffer_.erase(item);
}
gapOffset_ = std::max(gapOffset_, next_offset);
lastReadTime_ = clock::now();
}
TlsSessionState
TlsSession::handleStateEstablished(TlsSessionState state)
......@@ -869,15 +954,13 @@ TlsSession::handleStateEstablished(TlsSessionState state)
// Handle RX data from network
if (!rxQueue_.empty()) {
std::vector<uint8_t> buf(INPUT_BUFFER_SIZE);
unsigned char sequence[8];
uint8_t seq[8];
lk.unlock();
auto ret = gnutls_record_recv_seq(session_, buf.data(), buf.size(), sequence);
auto ret = gnutls_record_recv_seq(session_, buf.data(), buf.size(), seq);
if (ret > 0 && pmtudOver_) {
buf.resize(ret);
// TODO: handle sequence re-order
if (callbacks_.onRxData)
callbacks_.onRxData(std::move(buf));
handleDataPacket(std::move(buf), seq);
return state;
} else if (ret == GNUTLS_E_HEARTBEAT_PING_RECEIVED) {
......
......@@ -43,6 +43,7 @@
#include <iterator>
#include <array>
#include <stdexcept>
#include <bitset>
namespace ring {
class IceTransport;
......@@ -215,12 +216,22 @@ private:
std::condition_variable rxCv_ {};
std::list<std::vector<uint8_t>> rxQueue_ {};
std::mutex reorderBufMutex_;
uint64_t baseSeq_ {0}; // sequence number of first application data packet received
uint64_t lastRxSeq_ {0}; // last received and valid packet sequence number
uint64_t gapOffset_ {1}; // offset of first byte not received yet (start at 1)
clock::time_point lastReadTime_;
std::map<uint64_t, std::vector<uint8_t>> reorderBuffer_ {};
ssize_t send_(const uint8_t* tx_data, std::size_t tx_size);
ssize_t sendRaw(const void*, size_t);
ssize_t sendRawVec(const giovec_t*, int);
ssize_t recvRaw(void*, size_t);
int waitForRawData(unsigned);
void handleDataPacket(std::vector<uint8_t>&&, const uint8_t*);
void flushRxQueue();
// Statistics
std::atomic<std::size_t> stRxRawPacketCnt_ {0};
std::atomic<std::size_t> stRxRawBytesCnt_ {0};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment