Skip to content
Snippets Groups Projects
Commit 2df2f02d authored by Sébastien Blin's avatar Sébastien Blin
Browse files

pjproject: send keep alive on socket and add callback

Change-Id: I8d99d7f229e25f970a2a12cd7d2a2bee69017a5a
parent eb0fb2bd
Branches
No related tags found
No related merge requests found
pjlib/src/pj/sock_bsd.c | 31 +++++++++++++++++++++++++++++++
pjnath/include/pjnath/ice_session.h | 4 ++++
pjnath/include/pjnath/ice_strans.h | 7 +++++++
pjnath/src/pjnath/ice_session.c | 6 ++++++
pjnath/src/pjnath/ice_strans.c | 11 +++++++++++
5 files changed, 59 insertions(+)
diff --git a/pjlib/src/pj/sock_bsd.c b/pjlib/src/pj/sock_bsd.c
index e416991d..9db16b93 100644
--- a/pjlib/src/pj/sock_bsd.c
+++ b/pjlib/src/pj/sock_bsd.c
@@ -28,6 +28,15 @@
#define THIS_FILE "sock_bsd.c"
+#if !defined(PJ_WIN32) && !defined(PJ_WIN64)
+# if !defined(SOL_TCP) && defined(IPPROTO_TCP)
+# define SOL_TCP IPPROTO_TCP
+# endif
+# if !defined(TCP_KEEPIDLE) && defined(TCP_KEEPALIVE)
+# define TCP_KEEPIDLE TCP_KEEPALIVE
+# endif
+#endif
+
/*
* Address families conversion.
* The values here are indexed based on pj_addr_family.
@@ -517,6 +526,20 @@ PJ_DEF(pj_status_t) pj_sock_socket(int af,
if (rc==SOCKET_ERROR) {
// Ignored..
}
+ } else if(type == pj_SOCK_STREAM()) {
+#ifndef SIO_KEEPALIVE_VALS
+# define SIO_KEEPALIVE_VALS _WSAIOW(IOC_VENDOR, 4)
+#endif
+ DWORD dwBytesReturned = 0;
+ struct tcp_keepalive {
+ ULONG onoff;
+ ULONG keepalivetime;
+ ULONG keepaliveinterval;
+ } vals = { TRUE, 30000, 30000 };
+ WSAIoctl(*sock, SIO_KEEPALIVE_VALS,
+ &vals, sizeof(vals),
+ NULL, 0, &dwBytesReturned,
+ NULL, NULL);
}
#endif
@@ -548,6 +571,14 @@ PJ_DEF(pj_status_t) pj_sock_socket(int af,
if (type == pj_SOCK_STREAM()) {
pj_sock_setsockopt(*sock, pj_SOL_SOCKET(), pj_SO_NOSIGPIPE(),
&val, sizeof(val));
+ uint32_t val = 1;
+ setsockopt(*sock, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(uint32_t));
+ uint32_t keepint = 30; // seconds
+ setsockopt(*sock, SOL_TCP, TCP_KEEPIDLE, &keepint, sizeof(uint32_t));
+ keepint = 30; // seconds
+ setsockopt(*sock, SOL_TCP, TCP_KEEPINTVL, &keepint, sizeof(uint32_t));
+ uint32_t cnt = 1;
+ setsockopt(*sock, SOL_TCP, TCP_KEEPCNT, &cnt, sizeof(uint32_t));
}
#if defined(PJ_SOCK_HAS_IPV6_V6ONLY) && PJ_SOCK_HAS_IPV6_V6ONLY != 0
if (af == PJ_AF_INET6) {
diff --git a/pjnath/include/pjnath/ice_session.h b/pjnath/include/pjnath/ice_session.h
index 77e1278d..141d5b3e 100644
--- a/pjnath/include/pjnath/ice_session.h
+++ b/pjnath/include/pjnath/ice_session.h
@@ -630,6 +630,10 @@ typedef struct pj_ice_sess_cb
pj_status_t (*close_tcp_connection)(pj_ice_sess *ice,
pj_ice_sess_checklist *clist,
unsigned check_id);
+ /**
+ * If an internal TCP keep alive, this mount the error to the application
+ */
+ void (*on_ice_destroy)(pj_ice_sess *ice);
} pj_ice_sess_cb;
diff --git a/pjnath/include/pjnath/ice_strans.h b/pjnath/include/pjnath/ice_strans.h
index afaddce2..de14be0b 100644
--- a/pjnath/include/pjnath/ice_strans.h
+++ b/pjnath/include/pjnath/ice_strans.h
@@ -182,6 +182,13 @@ typedef struct pj_ice_strans_cb
void (*on_data_sent)(pj_ice_strans *ice_st, unsigned comp_id,
pj_ssize_t size);
+ /**
+ * This callback is called if an internal operation fails
+ *
+ * @param ice_st The ICE stream transport.
+ */
+ void (*on_destroy)(pj_ice_strans *ice_st);
+
} pj_ice_strans_cb;
diff --git a/pjnath/src/pjnath/ice_session.c b/pjnath/src/pjnath/ice_session.c
index 7373cdf3..6e47ca9d 100644
--- a/pjnath/src/pjnath/ice_session.c
+++ b/pjnath/src/pjnath/ice_session.c
@@ -1413,6 +1413,12 @@ static void ice_keep_alive(pj_ice_sess *ice, pj_bool_t send_now)
PJ_FALSE, PJ_FALSE,
&the_check->rcand->addr,
addr_len, tdata);
+ if (status == 120032) {
+ if (ice->cb.on_ice_destroy) {
+ ice->cb.on_ice_destroy(ice);
+ }
+ return;
+ }
/* Restore FINGERPRINT usage */
pj_stun_session_use_fingerprint(comp->stun_sess, saved);
diff --git a/pjnath/src/pjnath/ice_strans.c b/pjnath/src/pjnath/ice_strans.c
index e758ad84..5537c7a0 100644
--- a/pjnath/src/pjnath/ice_strans.c
+++ b/pjnath/src/pjnath/ice_strans.c
@@ -90,6 +90,7 @@ static pj_uint8_t srflx_pref_table[PJ_ICE_CAND_TYPE_MAX] =
/* ICE callbacks */
+static void on_ice_destroy(pj_ice_sess *ice);
static void on_ice_complete(pj_ice_sess *ice, pj_status_t status);
static pj_status_t ice_tx_pkt(pj_ice_sess *ice,
unsigned comp_id,
@@ -1174,6 +1175,7 @@ PJ_DEF(pj_status_t) pj_ice_strans_init_ice(pj_ice_strans *ice_st,
ice_cb.on_rx_data = &ice_rx_data;
ice_cb.on_tx_pkt = &ice_tx_pkt;
#if PJ_HAS_TCP
+ ice_cb.on_ice_destroy = &on_ice_destroy;
ice_cb.wait_tcp_connection = &ice_wait_tcp_connection;
ice_cb.select_turn_dataconn = &ice_select_turn_dataconn;
ice_cb.reconnect_tcp_connection = &ice_reconnect_tcp_connection;
@@ -1700,6 +1702,15 @@ pj_ice_strans_sendto2(pj_ice_strans *ice_st, unsigned comp_id, const void *data,
return PJ_EINVALIDOP;
}
+static void on_ice_destroy(pj_ice_sess *ice)
+{
+ pj_ice_strans *ice_st = (pj_ice_strans*)ice->user_data;
+
+ if (ice_st->cb.on_destroy) {
+ (*ice_st->cb.on_destroy)(ice_st);
+ }
+}
+
/*
* Callback called by ICE session when ICE processing is complete, either
* successfully or with failure.
......@@ -15,7 +15,11 @@
"ice_config.patch",
"fix_first_packet_turn_tcp.patch",
"fix_ebusy_turn.patch",
"ignore_ipv6_on_transport_check.patch"
"ignore_ipv6_on_transport_check.patch",
"fix_turn_connection_failure.patch",
"disable_local_resolution.patch",
"fix_assert_on_connection_attempt.patch",
"keep_alive.patch"
],
"win_patches": [
"win32_vs_gnutls.patch",
......
......@@ -71,6 +71,7 @@ endif
$(APPLY) $(SRC)/pjproject/fix_turn_connection_failure.patch
$(APPLY) $(SRC)/pjproject/disable_local_resolution.patch
$(APPLY) $(SRC)/pjproject/fix_assert_on_connection_attempt.patch
$(APPLY) $(SRC)/pjproject/keep_alive.patch
$(UPDATE_AUTOCONFIG)
$(MOVE)
......
......@@ -187,6 +187,8 @@ public:
// Wait data on components
std::vector<pj_ssize_t> lastReadLen_;
std::condition_variable waitDataCv_ = {};
onShutdownCb scb;
};
//==============================================================================
......@@ -347,6 +349,15 @@ IceTransport::Impl::Impl(const char* name, int component_count, bool master,
JAMI_WARN("null IceTransport");
};
icecb.on_destroy = [](pj_ice_strans* ice_st) {
if (auto* tr = static_cast<Impl*>(pj_ice_strans_get_user_data(ice_st))) {
if (tr->scb)
tr->scb();
} else {
JAMI_WARN("null IceTransport");
}
};
// Add STUN servers
for (auto& server : options.stunServers)
add_stun_server(*pool_, config_, server);
......@@ -1283,6 +1294,13 @@ IceTransport::setOnRecv(unsigned comp_id, IceRecvCb cb)
}
}
void
IceTransport::setOnShutdown(onShutdownCb&& cb)
{
pimpl_->scb = cb;
}
ssize_t
IceTransport::send(int comp_id, const unsigned char* buf, size_t len)
{
......@@ -1299,7 +1317,7 @@ IceTransport::send(int comp_id, const unsigned char* buf, size_t len)
auto current_size = sent_size;
// NOTE; because we are in TCP, the sent size will count the header (2
// bytes length).
while (static_cast<std::size_t>(comp_id) < pimpl_->lastReadLen_.size() && current_size < len) {
while (static_cast<std::size_t>(comp_id) < pimpl_->lastReadLen_.size() && current_size < static_cast<pj_ssize_t>(len)) {
std::unique_lock<std::mutex> lk(pimpl_->iceMutex_);
pimpl_->waitDataCv_.wait(lk);
current_size = pimpl_->lastReadLen_[comp_id];
......
......@@ -44,6 +44,7 @@ using IceTransportCompleteCb = std::function<void(bool)>;
using IceRecvInfo = std::function<void(void)>;
using IceRecvCb = std::function<ssize_t(unsigned char *buf, size_t len)>;
using IceCandidate = pj_ice_sess_cand;
using onShutdownCb = std::function<void(void)>;
struct ICESDP {
std::vector<IceCandidate> rem_candidates;
......@@ -191,6 +192,7 @@ public:
// I/O methods
void setOnRecv(unsigned comp_id, IceRecvCb cb);
void setOnShutdown(onShutdownCb&& cb);
ssize_t recv(int comp_id, unsigned char* buf, size_t len, std::error_code& ec);
ssize_t recvfrom(int comp_id, char *buf, size_t len, std::error_code& ec);
......
......@@ -414,6 +414,13 @@ public:
/*.cert_check = */ nullptr,
};
tls = std::make_unique<tls::TlsSession>(std::move(ep), tls_param, tls_cbs);
const IceSocketEndpoint* iceSocket = (const IceSocketEndpoint*)(ep_);
if (iceSocket) {
iceSocket->underlyingICE()->setOnShutdown([this]() {
tls.reset();
});
}
}
Impl(std::unique_ptr<AbstractSocketEndpoint>&& ep,
......@@ -438,6 +445,15 @@ public:
/*.cert_check = */ nullptr,
};
tls = std::make_unique<tls::TlsSession>(std::move(ep), tls_param, tls_cbs);
const IceSocketEndpoint* iceSocket = (const IceSocketEndpoint*)(ep_);
if (iceSocket) {
iceSocket->underlyingICE()->setOnShutdown([this]() {
tls->shutdown();
if (onStateChangeCb_)
onStateChangeCb_(tls::TlsSessionState::SHUTDOWN);
});
}
}
~Impl() {
......@@ -608,6 +624,18 @@ TlsSocketEndpoint::shutdown()
pimpl_->tls->shutdown();
}
std::shared_ptr<IceTransport>
TlsSocketEndpoint::underlyingICE() const
{
if (pimpl_->ep_) {
const IceSocketEndpoint* iceSocket = (const IceSocketEndpoint*)(pimpl_->ep_);
if (iceSocket) {
return iceSocket->underlyingICE();
}
}
return {};
}
//==============================================================================
// following namespace prevents an ODR violation with definitions in p2p.cpp
......
......@@ -47,6 +47,7 @@ namespace jami {
using OnStateChangeCb = std::function<void(tls::TlsSessionState state)>;
using OnReadyCb = std::function<void(bool ok)>;
using onShutdownCb = std::function<void(void)>;
class TurnTransport;
class ConnectedTurnTransport;
......@@ -121,6 +122,8 @@ public:
void setOnRecv(RecvCb &&) override {
throw std::logic_error("AbstractSocketEndpoint::setOnRecv not implemented");
}
virtual void setOnShutdown(onShutdownCb&&) {};
};
/// Implement system socket IO
......@@ -139,7 +142,12 @@ public:
std::size_t write(const ValueType* buf, std::size_t len, std::error_code& ec) override;
void connect(const std::chrono::milliseconds& timeout = {}) override;
void setOnShutdown(onShutdownCb&& cb) {
scb = cb;
}
private:
onShutdownCb scb;
const IpAddr addr_;
int sock_ {-1};
};
......@@ -164,11 +172,13 @@ public:
}
void setOnRecv(RecvCb&& cb) override {
if (ice_) {
if (ice_)
ice_->setOnRecv(compId_, cb);
}
}
void setOnShutdown(onShutdownCb&& cb) {
ice_->setOnShutdown(std::move(cb));
}
private:
std::shared_ptr<IceTransport> ice_ {nullptr};
std::atomic_bool iceStopped{false};
......@@ -213,6 +223,8 @@ public:
void setOnStateChange(OnStateChangeCb&& cb);
void setOnReady(OnReadyCb&& cb);
std::shared_ptr<IceTransport> underlyingICE() const;
private:
class Impl;
std::unique_ptr<Impl> pimpl_;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment