Commit 4325a016 authored by Adrien Béraud's avatar Adrien Béraud

waitForData: use std::chrono::milliseconds for timeout

Change-Id: I6350f39522dcdc6f844daca50d3b9f2ded3913cd
parent be8241d0
......@@ -24,6 +24,7 @@
#include <functional>
#include <vector>
#include <chrono>
#include <system_error>
#include <cstdint>
......@@ -72,7 +73,7 @@ public:
/// \note error code is not set in case of timeout, but set only in case of io error
/// (i.e. socket deconnection).
/// \todo make a std::chrono version for the timeout
virtual int waitForData(unsigned ms_timeout, std::error_code& ec) const = 0;
virtual int waitForData(std::chrono::milliseconds timeout, std::error_code& ec) const = 0;
/// Write a given amount of data.
/// \param buf data to write.
......
......@@ -47,7 +47,7 @@ class IceSocket
void close();
ssize_t recv(unsigned char* buf, size_t len);
ssize_t send(const unsigned char* buf, size_t len);
ssize_t waitForData(unsigned int timeout);
ssize_t waitForData(std::chrono::milliseconds timeout);
void setOnRecv(IceRecvCb cb);
uint16_t getTransportOverhead();
};
......@@ -82,7 +82,7 @@ public:
int maxPayload() const override;
int waitForData(unsigned ms_timeout, std::error_code& ec) const override;
int waitForData(std::chrono::milliseconds timeout, std::error_code& ec) const override;
std::size_t write(const ValueType* buf, std::size_t len, std::error_code& ec) override;
......
......@@ -1322,10 +1322,10 @@ IceTransport::send(int comp_id, const unsigned char* buf, size_t len)
}
int
IceTransport::waitForInitialization(unsigned timeout)
IceTransport::waitForInitialization(std::chrono::milliseconds timeout)
{
std::unique_lock<std::mutex> lk(pimpl_->iceMutex_);
if (!pimpl_->iceCV_.wait_for(lk, std::chrono::seconds(timeout),
if (!pimpl_->iceCV_.wait_for(lk, timeout,
[this]{ return pimpl_->_isInitialized() or pimpl_->_isFailed(); })) {
JAMI_WARN("[ice:%p] waitForInitialization: timeout", this);
return -1;
......@@ -1334,10 +1334,10 @@ IceTransport::waitForInitialization(unsigned timeout)
}
int
IceTransport::waitForNegotiation(unsigned timeout)
IceTransport::waitForNegotiation(std::chrono::milliseconds timeout)
{
std::unique_lock<std::mutex> lk(pimpl_->iceMutex_);
if (!pimpl_->iceCV_.wait_for(lk, std::chrono::seconds(timeout),
if (!pimpl_->iceCV_.wait_for(lk, timeout,
[this]{ return pimpl_->_isRunning() or pimpl_->_isFailed(); })) {
JAMI_WARN("[ice:%p] waitForIceNegotiation: timeout", this);
return -1;
......@@ -1352,9 +1352,9 @@ IceTransport::isDataAvailable(int comp_id)
}
ssize_t
IceTransport::waitForData(int comp_id, unsigned int timeout, std::error_code& ec)
IceTransport::waitForData(int comp_id, std::chrono::milliseconds timeout, std::error_code& ec)
{
return pimpl_->peerChannels_.at(comp_id).wait(std::chrono::milliseconds(timeout));
return pimpl_->peerChannels_.at(comp_id).wait(timeout);
}
std::vector<SDP>
......@@ -1478,10 +1478,10 @@ IceSocketTransport::maxPayload() const
}
int
IceSocketTransport::waitForData(unsigned ms_timeout, std::error_code& ec) const
IceSocketTransport::waitForData(std::chrono::milliseconds timeout, std::error_code& ec) const
{
if (!ice_->isRunning()) return -1;
return ice_->waitForData(compId_, ms_timeout, ec);
return ice_->waitForData(compId_, timeout, ec);
}
std::size_t
......@@ -1553,7 +1553,7 @@ IceSocket::send(const unsigned char* buf, size_t len)
}
ssize_t
IceSocket::waitForData(unsigned int timeout)
IceSocket::waitForData(std::chrono::milliseconds timeout)
{
if (!ice_transport_.get())
return -1;
......
......@@ -188,11 +188,11 @@ public:
ssize_t send(int comp_id, const unsigned char* buf, size_t len);
int waitForInitialization(unsigned timeout);
int waitForInitialization(std::chrono::milliseconds timeout);
int waitForNegotiation(unsigned timeout);
int waitForNegotiation(std::chrono::milliseconds timeout);
ssize_t waitForData(int comp_id, unsigned int timeout, std::error_code& ec);
ssize_t waitForData(int comp_id, std::chrono::milliseconds timeout, std::error_code& ec);
/**
* Return without waiting how many bytes are ready to read
......
......@@ -45,11 +45,12 @@
namespace jami {
static constexpr auto DHT_MSG_TIMEOUT = std::chrono::seconds(30);
static constexpr auto NET_CONNECTION_TIMEOUT = std::chrono::seconds(10);
static constexpr auto SOCK_TIMEOUT = std::chrono::seconds(3);
static constexpr auto ICE_READY_TIMEOUT = std::chrono::seconds(10);
static constexpr int ICE_INIT_TIMEOUT{10};
static constexpr std::chrono::seconds DHT_MSG_TIMEOUT{30};
static constexpr std::chrono::seconds NET_CONNECTION_TIMEOUT{10};
static constexpr std::chrono::seconds SOCK_TIMEOUT{3};
static constexpr std::chrono::seconds ICE_READY_TIMEOUT{10};
static constexpr std::chrono::seconds ICE_INIT_TIMEOUT{10};
static constexpr std::chrono::seconds ICE_NOGOTIATION_TIMEOUT{10};
using Clock = std::chrono::system_clock;
using ValueIdDist = std::uniform_int_distribution<dht::Value::Id>;
......@@ -432,7 +433,7 @@ private:
break;
}
parent_.ice_->waitForNegotiation(10);
parent_.ice_->waitForNegotiation(ICE_NOGOTIATION_TIMEOUT);
if (parent_.ice_->isRunning()) {
peer_ep = std::make_shared<IceSocketEndpoint>(parent_.ice_, true);
JAMI_DBG("[Account:%s] ICE negotiation succeed. Starting file transfer",
......@@ -743,7 +744,7 @@ DhtPeerConnector::Impl::answerToRequest(PeerConnectionMsg&& request,
}
if (!hasPubIp) {
ice_->waitForNegotiation(10);
ice_->waitForNegotiation(ICE_NOGOTIATION_TIMEOUT);
if (ice_->isRunning()) {
sendIce = true;
JAMI_DBG("[Account:%s] ICE negotiation succeed. Answering with local SDP", account.getAccountID().c_str());
......@@ -793,9 +794,8 @@ DhtPeerConnector::Impl::answerToRequest(PeerConnectionMsg&& request,
request.from, request.respond(addresses));
if (sendIce) {
if (hasPubIp) {
ice_->waitForNegotiation(10);
ice_->waitForNegotiation(ICE_NOGOTIATION_TIMEOUT);
if (ice_->isRunning()) {
JAMI_DBG("[Account:%s] ICE negotiation succeed. Answering with local SDP", account.getAccountID().c_str());
} else {
......
......@@ -729,7 +729,7 @@ SipsIceTransport::eventLoop()
{
while(!stopLoop_) {
std::error_code err;
if (tls_ && tls_->waitForData(100, err)) {
if (tls_ && tls_->waitForData(std::chrono::milliseconds(100), err)) {
std::vector<uint8_t> pkt;
pkt.resize(PJSIP_MAX_PKT_LEN);
auto read = tls_->read(pkt.data(), PJSIP_MAX_PKT_LEN, err);
......
......@@ -111,7 +111,7 @@ using ConferenceMap = std::map<std::string, std::shared_ptr<Conference>>;
/** To store uniquely a list of Call ids */
using CallIDSet = std::set<std::string>;
static constexpr int ICE_INIT_TIMEOUT {10};
static constexpr std::chrono::seconds ICE_INIT_TIMEOUT{10};
static constexpr const char* PACKAGE_OLD = "ring";
std::atomic_bool Manager::initialized = {false};
......
......@@ -135,7 +135,7 @@ TlsTurnEndpoint::Impl::verifyCertificate(gnutls_session_t session)
}
void
TlsTurnEndpoint::Impl::onTlsStateChange(tls::TlsSessionState state)
TlsTurnEndpoint::Impl::onTlsStateChange(tls::TlsSessionState)
{}
void
......@@ -221,9 +221,9 @@ TlsTurnEndpoint::peerCertificate() const
}
int
TlsTurnEndpoint::waitForData(unsigned ms_timeout, std::error_code& ec) const
TlsTurnEndpoint::waitForData(std::chrono::milliseconds timeout, std::error_code& ec) const
{
return pimpl_->tls->waitForData(ms_timeout, ec);
return pimpl_->tls->waitForData(timeout, ec);
}
//==============================================================================
......@@ -249,9 +249,9 @@ TcpSocketEndpoint::~TcpSocketEndpoint()
}
void
TcpSocketEndpoint::connect(const std::chrono::steady_clock::duration& timeout)
TcpSocketEndpoint::connect(const std::chrono::milliseconds& timeout)
{
int ms = std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count();
int ms = timeout.count();
setsockopt(sock_, SOL_SOCKET, SO_RCVTIMEO, (const char *)&ms, sizeof(ms));
setsockopt(sock_, SOL_SOCKET, SO_SNDTIMEO, (const char *)&ms, sizeof(ms));
......@@ -260,12 +260,12 @@ TcpSocketEndpoint::connect(const std::chrono::steady_clock::duration& timeout)
}
int
TcpSocketEndpoint::waitForData(unsigned ms_timeout, std::error_code& ec) const
TcpSocketEndpoint::waitForData(std::chrono::milliseconds timeout, std::error_code& ec) const
{
for (;;) {
struct timeval tv;
tv.tv_sec = ms_timeout / 1000;
tv.tv_usec = (ms_timeout % 1000) * 1000;
tv.tv_sec = timeout.count() / 1000;
tv.tv_usec = (timeout.count() % 1000) * 1000;
fd_set read_fds;
FD_ZERO(&read_fds);
......@@ -331,11 +331,11 @@ IceSocketEndpoint::shutdown() {
}
int
IceSocketEndpoint::waitForData(unsigned ms_timeout, std::error_code& ec) const
IceSocketEndpoint::waitForData(std::chrono::milliseconds timeout, std::error_code& ec) const
{
if (ice_) {
if (!ice_->isRunning()) return -1;
return iceIsSender ? ice_->isDataAvailable(compId_) : ice_->waitForData(compId_, ms_timeout, ec);
return iceIsSender ? ice_->isDataAvailable(compId_) : ice_->waitForData(compId_, timeout, ec);
}
return -1;
}
......@@ -531,15 +531,15 @@ TlsSocketEndpoint::write(const ValueType* buf, std::size_t len, std::error_code&
}
void
TlsSocketEndpoint::waitForReady(const std::chrono::steady_clock::duration& timeout)
TlsSocketEndpoint::waitForReady(const std::chrono::milliseconds& timeout)
{
pimpl_->tls->waitForReady(timeout);
}
int
TlsSocketEndpoint::waitForData(unsigned ms_timeout, std::error_code& ec) const
TlsSocketEndpoint::waitForData(std::chrono::milliseconds timeout, std::error_code& ec) const
{
return pimpl_->tls->waitForData(ms_timeout, ec);
return pimpl_->tls->waitForData(timeout, ec);
}
//==============================================================================
......@@ -670,7 +670,7 @@ PeerConnection::PeerConnectionImpl::eventLoop()
msg = ctrlChannel.receive();
} else {
std::error_code ec;
if (endpoint_->waitForData(100, ec) > 0) {
if (endpoint_->waitForData(std::chrono::milliseconds(100), ec) > 0) {
std::vector<uint8_t> buf(IO_BUFFER_SIZE);
JAMI_DBG("A good buffer arrived before any input or output attachment");
auto size = endpoint_->read(buf, ec);
......@@ -734,7 +734,7 @@ PeerConnection::PeerConnectionImpl::eventLoop()
if (!bufferPool_.empty()) {
stream->write(bufferPool_);
bufferPool_.clear();
} else if (endpoint_->waitForData(0, ec) > 0) {
} else if (endpoint_->waitForData(std::chrono::milliseconds(0), ec) > 0) {
buf.resize(IO_BUFFER_SIZE);
endpoint_->read(buf, ec);
if (ec)
......@@ -762,7 +762,7 @@ PeerConnection::PeerConnectionImpl::eventLoop()
if (!bufferPool_.empty()) {
stream->write(bufferPool_);
bufferPool_.clear();
} else if (endpoint_->waitForData(0, ec) > 0) {
} else if (endpoint_->waitForData(std::chrono::milliseconds(0), ec) > 0) {
buf.resize(IO_BUFFER_SIZE);
endpoint_->read(buf, ec);
if (ec)
......
......@@ -96,8 +96,7 @@ public:
void setOnRecv(RecvCb&&) override {
throw std::logic_error("TlsTurnEndpoint::setOnRecv not implemented");
}
int waitForData(unsigned, std::error_code&) const override;
int waitForData(std::chrono::milliseconds, std::error_code&) const override;
void waitForReady(const std::chrono::steady_clock::duration& timeout = {});
const dht::crypto::Certificate& peerCertificate() const;
......@@ -112,7 +111,7 @@ private:
class AbstractSocketEndpoint : public GenericSocket<uint8_t>
{
public:
virtual void connect(const std::chrono::steady_clock::duration &timeout = {}) {};
virtual void connect(const std::chrono::milliseconds& = {}) {};
void setOnRecv(RecvCb &&) override {
throw std::logic_error("AbstractSocketEndpoint::setOnRecv not implemented");
......@@ -130,10 +129,10 @@ public:
bool isReliable() const override { return true; }
bool isInitiator() const override { return true; }
int maxPayload() const override { return 1280; }
int waitForData(unsigned ms_timeout, std::error_code& ec) const override;
int waitForData(std::chrono::milliseconds timeout, std::error_code& ec) const override;
std::size_t read(ValueType* buf, std::size_t len, std::error_code& ec) override;
std::size_t write(const ValueType* buf, std::size_t len, std::error_code& ec) override;
void connect(const std::chrono::steady_clock::duration& timeout = {}) override;
void connect(const std::chrono::milliseconds& timeout = {}) override;
private:
const IpAddr addr_;
......@@ -151,7 +150,7 @@ public:
bool isReliable() const override { return ice_ ? ice_->isRunning() : false; }
bool isInitiator() const override { return ice_ ? ice_->isInitiator() : true; }
int maxPayload() const override { return 65536 /* The max for a RTP packet used to wrap data here */; }
int waitForData(unsigned ms_timeout, std::error_code& ec) const override;
int waitForData(std::chrono::milliseconds timeout, std::error_code& ec) const override;
std::size_t read(ValueType* buf, std::size_t len, std::error_code& ec) override;
std::size_t write(const ValueType* buf, std::size_t len, std::error_code& ec) override;
......@@ -198,9 +197,9 @@ public:
void setOnRecv(RecvCb&&) override {
throw std::logic_error("TlsSocketEndpoint::setOnRecv not implemented");
}
int waitForData(unsigned, std::error_code&) const override;
int waitForData(std::chrono::milliseconds timeout, std::error_code&) const override;
void waitForReady(const std::chrono::steady_clock::duration& timeout = {});
void waitForReady(const std::chrono::milliseconds& timeout = {});
private:
class Impl;
......
......@@ -217,7 +217,7 @@ public:
ssize_t sendRaw(const void*, size_t);
ssize_t sendRawVec(const giovec_t*, int);
ssize_t recvRaw(void*, size_t);
int waitForRawData(unsigned);
int waitForRawData(std::chrono::milliseconds);
bool initFromRecordState(int offset=0);
void handleDataPacket(std::vector<ValueType>&&, uint64_t);
......@@ -531,7 +531,7 @@ TlsSession::TlsSessionImpl::commonSessionInit()
gnutls_transport_set_pull_timeout_function(session_,
[](gnutls_transport_ptr_t t, unsigned ms) -> int {
auto this_ = reinterpret_cast<TlsSessionImpl*>(t);
return this_->waitForRawData(ms);
return this_->waitForRawData(std::chrono::milliseconds(ms));
});
return true;
......@@ -660,7 +660,7 @@ TlsSession::TlsSessionImpl::recvRaw(void* buf, size_t size)
// 'timeout' is in milliseconds.
// Should return 0 on timeout, a positive number if data are available for read, or -1 on error.
int
TlsSession::TlsSessionImpl::waitForRawData(unsigned timeout)
TlsSession::TlsSessionImpl::waitForRawData(std::chrono::milliseconds timeout)
{
if (transport_.isReliable()) {
std::error_code ec;
......@@ -677,13 +677,13 @@ TlsSession::TlsSessionImpl::waitForRawData(unsigned timeout)
// non-reliable uses callback installed with setOnRecv()
std::unique_lock<std::mutex> lk {rxMutex_};
rxCv_.wait_for(lk, std::chrono::milliseconds(timeout), [this]{ return !rxQueue_.empty() or state_ == TlsSessionState::SHUTDOWN; });
rxCv_.wait_for(lk, timeout, [this]{ return !rxQueue_.empty() or state_ == TlsSessionState::SHUTDOWN; });
if (state_ == TlsSessionState::SHUTDOWN) {
gnutls_transport_set_errno(session_, EINTR);
return -1;
}
if (rxQueue_.empty()) {
JAMI_ERR("[TLS] waitForRawData: timeout after %u ms", timeout);
JAMI_ERR("[TLS] waitForRawData: timeout after %u ms", timeout.count());
return 0;
}
return 1;
......@@ -1355,9 +1355,9 @@ TlsSession::waitForReady(const std::chrono::steady_clock::duration& timeout)
}
int
TlsSession::waitForData(unsigned ms_timeout, std::error_code& ec) const
TlsSession::waitForData(std::chrono::milliseconds timeout, std::error_code& ec) const
{
if (!pimpl_->transport_.waitForData(ms_timeout, ec))
if (!pimpl_->transport_.waitForData(timeout, ec))
return 0;
return 1;
}
......
......@@ -139,7 +139,7 @@ public:
/// Return a positive number for number of bytes read, or 0 and \a ec set in case of error.
std::size_t read(ValueType* data, std::size_t size, std::error_code& ec) override;
int waitForData(unsigned, std::error_code&) const override;
int waitForData(std::chrono::milliseconds, std::error_code&) const override;
private:
class TlsSessionImpl;
......
......@@ -66,8 +66,8 @@ getVideoSettings()
}
#endif
static constexpr int DEFAULT_ICE_INIT_TIMEOUT {35}; // seconds
static constexpr int DEFAULT_ICE_NEGO_TIMEOUT {60}; // seconds
static constexpr std::chrono::seconds DEFAULT_ICE_INIT_TIMEOUT {35}; // seconds
static constexpr std::chrono::seconds DEFAULT_ICE_NEGO_TIMEOUT {60}; // seconds
// SDP media Ids
static constexpr int SDP_AUDIO_MEDIA_ID {0};
......
......@@ -460,13 +460,13 @@ TurnTransport::peerAddresses() const
}
int
TurnTransport::waitForData(const IpAddr& peer, unsigned ms_timeout, std::error_code& ec) const
TurnTransport::waitForData(const IpAddr& peer, std::chrono::milliseconds timeout, std::error_code& ec) const
{
(void)ec; ///< \todo handle errors
MutexLock lk {pimpl_->apiMutex_};
auto& channel = pimpl_->peerChannels_.at(peer);
lk.unlock();
return channel.wait(std::chrono::milliseconds(ms_timeout));
return channel.wait(timeout);
}
//==============================================================================
......@@ -483,9 +483,9 @@ ConnectedTurnTransport::shutdown()
}
int
ConnectedTurnTransport::waitForData(unsigned ms_timeout, std::error_code& ec) const
ConnectedTurnTransport::waitForData(std::chrono::milliseconds timeout, std::error_code& ec) const
{
return turn_.waitForData(peer_, ms_timeout, ec);
return turn_.waitForData(peer_, timeout, ec);
}
std::size_t
......
......@@ -135,7 +135,7 @@ public:
///
bool sendto(const IpAddr& peer, const char* const buffer, std::size_t size);
int waitForData(const IpAddr& peer, unsigned ms_timeout, std::error_code& ec) const;
int waitForData(const IpAddr& peer, std::chrono::milliseconds timeout, std::error_code& ec) const;
public:
// Move semantic only, not copiable
......@@ -159,7 +159,7 @@ public:
bool isInitiator() const override { return turn_.isInitiator(); }
int maxPayload() const override { return 3000; }
int waitForData(unsigned ms_timeout, std::error_code& ec) const override;
int waitForData(std::chrono::milliseconds timeout, std::error_code& ec) const override;
std::size_t read(ValueType* buf, std::size_t length, std::error_code& ec) override;
std::size_t write(const ValueType* buf, std::size_t length, std::error_code& ec) override;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment