diff --git a/contrib/src/pjproject/keep_alive.patch b/contrib/src/pjproject/keep_alive.patch new file mode 100644 index 0000000000000000000000000000000000000000..05ca2152eab56abbb4e12b798b57dc6a3be056d9 --- /dev/null +++ b/contrib/src/pjproject/keep_alive.patch @@ -0,0 +1,149 @@ + pjlib/src/pj/sock_bsd.c | 31 +++++++++++++++++++++++++++++++ + pjnath/include/pjnath/ice_session.h | 4 ++++ + pjnath/include/pjnath/ice_strans.h | 7 +++++++ + pjnath/src/pjnath/ice_session.c | 6 ++++++ + pjnath/src/pjnath/ice_strans.c | 11 +++++++++++ + 5 files changed, 59 insertions(+) + +diff --git a/pjlib/src/pj/sock_bsd.c b/pjlib/src/pj/sock_bsd.c +index e416991d..9db16b93 100644 +--- a/pjlib/src/pj/sock_bsd.c ++++ b/pjlib/src/pj/sock_bsd.c +@@ -28,6 +28,15 @@ + + #define THIS_FILE "sock_bsd.c" + ++#if !defined(PJ_WIN32) && !defined(PJ_WIN64) ++# if !defined(SOL_TCP) && defined(IPPROTO_TCP) ++# define SOL_TCP IPPROTO_TCP ++# endif ++# if !defined(TCP_KEEPIDLE) && defined(TCP_KEEPALIVE) ++# define TCP_KEEPIDLE TCP_KEEPALIVE ++# endif ++#endif ++ + /* + * Address families conversion. + * The values here are indexed based on pj_addr_family. +@@ -517,6 +526,20 @@ PJ_DEF(pj_status_t) pj_sock_socket(int af, + if (rc==SOCKET_ERROR) { + // Ignored.. + } ++ } else if(type == pj_SOCK_STREAM()) { ++#ifndef SIO_KEEPALIVE_VALS ++# define SIO_KEEPALIVE_VALS _WSAIOW(IOC_VENDOR, 4) ++#endif ++ DWORD dwBytesReturned = 0; ++ struct tcp_keepalive { ++ ULONG onoff; ++ ULONG keepalivetime; ++ ULONG keepaliveinterval; ++ } vals = { TRUE, 30000, 30000 }; ++ WSAIoctl(*sock, SIO_KEEPALIVE_VALS, ++ &vals, sizeof(vals), ++ NULL, 0, &dwBytesReturned, ++ NULL, NULL); + } + #endif + +@@ -548,6 +571,14 @@ PJ_DEF(pj_status_t) pj_sock_socket(int af, + if (type == pj_SOCK_STREAM()) { + pj_sock_setsockopt(*sock, pj_SOL_SOCKET(), pj_SO_NOSIGPIPE(), + &val, sizeof(val)); ++ uint32_t val = 1; ++ setsockopt(*sock, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(uint32_t)); ++ uint32_t keepint = 30; // seconds ++ setsockopt(*sock, SOL_TCP, TCP_KEEPIDLE, &keepint, sizeof(uint32_t)); ++ keepint = 30; // seconds ++ setsockopt(*sock, SOL_TCP, TCP_KEEPINTVL, &keepint, sizeof(uint32_t)); ++ uint32_t cnt = 1; ++ setsockopt(*sock, SOL_TCP, TCP_KEEPCNT, &cnt, sizeof(uint32_t)); + } + #if defined(PJ_SOCK_HAS_IPV6_V6ONLY) && PJ_SOCK_HAS_IPV6_V6ONLY != 0 + if (af == PJ_AF_INET6) { +diff --git a/pjnath/include/pjnath/ice_session.h b/pjnath/include/pjnath/ice_session.h +index 77e1278d..141d5b3e 100644 +--- a/pjnath/include/pjnath/ice_session.h ++++ b/pjnath/include/pjnath/ice_session.h +@@ -630,6 +630,10 @@ typedef struct pj_ice_sess_cb + pj_status_t (*close_tcp_connection)(pj_ice_sess *ice, + pj_ice_sess_checklist *clist, + unsigned check_id); ++ /** ++ * If an internal TCP keep alive, this mount the error to the application ++ */ ++ void (*on_ice_destroy)(pj_ice_sess *ice); + + } pj_ice_sess_cb; + +diff --git a/pjnath/include/pjnath/ice_strans.h b/pjnath/include/pjnath/ice_strans.h +index afaddce2..de14be0b 100644 +--- a/pjnath/include/pjnath/ice_strans.h ++++ b/pjnath/include/pjnath/ice_strans.h +@@ -182,6 +182,13 @@ typedef struct pj_ice_strans_cb + void (*on_data_sent)(pj_ice_strans *ice_st, unsigned comp_id, + pj_ssize_t size); + ++ /** ++ * This callback is called if an internal operation fails ++ * ++ * @param ice_st The ICE stream transport. ++ */ ++ void (*on_destroy)(pj_ice_strans *ice_st); ++ + } pj_ice_strans_cb; + + +diff --git a/pjnath/src/pjnath/ice_session.c b/pjnath/src/pjnath/ice_session.c +index 7373cdf3..6e47ca9d 100644 +--- a/pjnath/src/pjnath/ice_session.c ++++ b/pjnath/src/pjnath/ice_session.c +@@ -1413,6 +1413,12 @@ static void ice_keep_alive(pj_ice_sess *ice, pj_bool_t send_now) + PJ_FALSE, PJ_FALSE, + &the_check->rcand->addr, + addr_len, tdata); ++ if (status == 120032) { ++ if (ice->cb.on_ice_destroy) { ++ ice->cb.on_ice_destroy(ice); ++ } ++ return; ++ } + + /* Restore FINGERPRINT usage */ + pj_stun_session_use_fingerprint(comp->stun_sess, saved); +diff --git a/pjnath/src/pjnath/ice_strans.c b/pjnath/src/pjnath/ice_strans.c +index e758ad84..5537c7a0 100644 +--- a/pjnath/src/pjnath/ice_strans.c ++++ b/pjnath/src/pjnath/ice_strans.c +@@ -90,6 +90,7 @@ static pj_uint8_t srflx_pref_table[PJ_ICE_CAND_TYPE_MAX] = + + + /* ICE callbacks */ ++static void on_ice_destroy(pj_ice_sess *ice); + static void on_ice_complete(pj_ice_sess *ice, pj_status_t status); + static pj_status_t ice_tx_pkt(pj_ice_sess *ice, + unsigned comp_id, +@@ -1174,6 +1175,7 @@ PJ_DEF(pj_status_t) pj_ice_strans_init_ice(pj_ice_strans *ice_st, + ice_cb.on_rx_data = &ice_rx_data; + ice_cb.on_tx_pkt = &ice_tx_pkt; + #if PJ_HAS_TCP ++ ice_cb.on_ice_destroy = &on_ice_destroy; + ice_cb.wait_tcp_connection = &ice_wait_tcp_connection; + ice_cb.select_turn_dataconn = &ice_select_turn_dataconn; + ice_cb.reconnect_tcp_connection = &ice_reconnect_tcp_connection; +@@ -1700,6 +1702,15 @@ pj_ice_strans_sendto2(pj_ice_strans *ice_st, unsigned comp_id, const void *data, + return PJ_EINVALIDOP; + } + ++static void on_ice_destroy(pj_ice_sess *ice) ++{ ++ pj_ice_strans *ice_st = (pj_ice_strans*)ice->user_data; ++ ++ if (ice_st->cb.on_destroy) { ++ (*ice_st->cb.on_destroy)(ice_st); ++ } ++} ++ + /* + * Callback called by ICE session when ICE processing is complete, either + * successfully or with failure. diff --git a/contrib/src/pjproject/package.json b/contrib/src/pjproject/package.json index c198db9b183e6800c8f2e4b0226ae0f8d2a89bdb..6d2a00c88f62ab3f7422e621fff6e4d104072e9a 100644 --- a/contrib/src/pjproject/package.json +++ b/contrib/src/pjproject/package.json @@ -15,7 +15,11 @@ "ice_config.patch", "fix_first_packet_turn_tcp.patch", "fix_ebusy_turn.patch", - "ignore_ipv6_on_transport_check.patch" + "ignore_ipv6_on_transport_check.patch", + "fix_turn_connection_failure.patch", + "disable_local_resolution.patch", + "fix_assert_on_connection_attempt.patch", + "keep_alive.patch" ], "win_patches": [ "win32_vs_gnutls.patch", diff --git a/contrib/src/pjproject/rules.mak b/contrib/src/pjproject/rules.mak index 1ebc2bd428d5043311cc0fc0a569ca61ae1ee942..8b432bee0533b7f1068ba9d9d775efbf7b7f20d2 100644 --- a/contrib/src/pjproject/rules.mak +++ b/contrib/src/pjproject/rules.mak @@ -71,6 +71,7 @@ endif $(APPLY) $(SRC)/pjproject/fix_turn_connection_failure.patch $(APPLY) $(SRC)/pjproject/disable_local_resolution.patch $(APPLY) $(SRC)/pjproject/fix_assert_on_connection_attempt.patch + $(APPLY) $(SRC)/pjproject/keep_alive.patch $(UPDATE_AUTOCONFIG) $(MOVE) diff --git a/src/ice_transport.cpp b/src/ice_transport.cpp index c0b8799b031c00bbc49eb06a4db42173a28de8cd..798dfa5c477081cdb20c723bd7ae391421f38ac8 100644 --- a/src/ice_transport.cpp +++ b/src/ice_transport.cpp @@ -187,6 +187,8 @@ public: // Wait data on components std::vector<pj_ssize_t> lastReadLen_; std::condition_variable waitDataCv_ = {}; + + onShutdownCb scb; }; //============================================================================== @@ -347,6 +349,15 @@ IceTransport::Impl::Impl(const char* name, int component_count, bool master, JAMI_WARN("null IceTransport"); }; + icecb.on_destroy = [](pj_ice_strans* ice_st) { + if (auto* tr = static_cast<Impl*>(pj_ice_strans_get_user_data(ice_st))) { + if (tr->scb) + tr->scb(); + } else { + JAMI_WARN("null IceTransport"); + } + }; + // Add STUN servers for (auto& server : options.stunServers) add_stun_server(*pool_, config_, server); @@ -1283,6 +1294,13 @@ IceTransport::setOnRecv(unsigned comp_id, IceRecvCb cb) } } +void +IceTransport::setOnShutdown(onShutdownCb&& cb) +{ + pimpl_->scb = cb; +} + + ssize_t IceTransport::send(int comp_id, const unsigned char* buf, size_t len) { @@ -1299,7 +1317,7 @@ IceTransport::send(int comp_id, const unsigned char* buf, size_t len) auto current_size = sent_size; // NOTE; because we are in TCP, the sent size will count the header (2 // bytes length). - while (static_cast<std::size_t>(comp_id) < pimpl_->lastReadLen_.size() && current_size < len) { + while (static_cast<std::size_t>(comp_id) < pimpl_->lastReadLen_.size() && current_size < static_cast<pj_ssize_t>(len)) { std::unique_lock<std::mutex> lk(pimpl_->iceMutex_); pimpl_->waitDataCv_.wait(lk); current_size = pimpl_->lastReadLen_[comp_id]; diff --git a/src/ice_transport.h b/src/ice_transport.h index cc50941ae282ca9c5c5cf75ceb3dbd66668ddc73..5a7a59ae0c908ac2cee0d28cbb67a0393a5e628f 100644 --- a/src/ice_transport.h +++ b/src/ice_transport.h @@ -44,6 +44,7 @@ using IceTransportCompleteCb = std::function<void(bool)>; using IceRecvInfo = std::function<void(void)>; using IceRecvCb = std::function<ssize_t(unsigned char *buf, size_t len)>; using IceCandidate = pj_ice_sess_cand; +using onShutdownCb = std::function<void(void)>; struct ICESDP { std::vector<IceCandidate> rem_candidates; @@ -191,6 +192,7 @@ public: // I/O methods void setOnRecv(unsigned comp_id, IceRecvCb cb); + void setOnShutdown(onShutdownCb&& cb); ssize_t recv(int comp_id, unsigned char* buf, size_t len, std::error_code& ec); ssize_t recvfrom(int comp_id, char *buf, size_t len, std::error_code& ec); diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index 17a40f5201298985df4fc39761d42d27b26eeab4..7d80aedc5a0a4fa7ce542806235494273632dd9c 100644 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -414,6 +414,13 @@ public: /*.cert_check = */ nullptr, }; tls = std::make_unique<tls::TlsSession>(std::move(ep), tls_param, tls_cbs); + + const IceSocketEndpoint* iceSocket = (const IceSocketEndpoint*)(ep_); + if (iceSocket) { + iceSocket->underlyingICE()->setOnShutdown([this]() { + tls.reset(); + }); + } } Impl(std::unique_ptr<AbstractSocketEndpoint>&& ep, @@ -438,6 +445,15 @@ public: /*.cert_check = */ nullptr, }; tls = std::make_unique<tls::TlsSession>(std::move(ep), tls_param, tls_cbs); + + const IceSocketEndpoint* iceSocket = (const IceSocketEndpoint*)(ep_); + if (iceSocket) { + iceSocket->underlyingICE()->setOnShutdown([this]() { + tls->shutdown(); + if (onStateChangeCb_) + onStateChangeCb_(tls::TlsSessionState::SHUTDOWN); + }); + } } ~Impl() { @@ -608,6 +624,18 @@ TlsSocketEndpoint::shutdown() pimpl_->tls->shutdown(); } +std::shared_ptr<IceTransport> +TlsSocketEndpoint::underlyingICE() const +{ + if (pimpl_->ep_) { + const IceSocketEndpoint* iceSocket = (const IceSocketEndpoint*)(pimpl_->ep_); + if (iceSocket) { + return iceSocket->underlyingICE(); + } + } + return {}; +} + //============================================================================== // following namespace prevents an ODR violation with definitions in p2p.cpp diff --git a/src/peer_connection.h b/src/peer_connection.h index ce81b3a51a6e4ecdefb41953f13f8a681574fda4..60d1728ebbb9aab857308b8f0f2ac3b73a0200d0 100644 --- a/src/peer_connection.h +++ b/src/peer_connection.h @@ -47,6 +47,7 @@ namespace jami { using OnStateChangeCb = std::function<void(tls::TlsSessionState state)>; using OnReadyCb = std::function<void(bool ok)>; +using onShutdownCb = std::function<void(void)>; class TurnTransport; class ConnectedTurnTransport; @@ -121,6 +122,8 @@ public: void setOnRecv(RecvCb &&) override { throw std::logic_error("AbstractSocketEndpoint::setOnRecv not implemented"); } + + virtual void setOnShutdown(onShutdownCb&&) {}; }; /// Implement system socket IO @@ -139,7 +142,12 @@ public: std::size_t write(const ValueType* buf, std::size_t len, std::error_code& ec) override; void connect(const std::chrono::milliseconds& timeout = {}) override; + void setOnShutdown(onShutdownCb&& cb) { + scb = cb; + } + private: + onShutdownCb scb; const IpAddr addr_; int sock_ {-1}; }; @@ -164,11 +172,13 @@ public: } void setOnRecv(RecvCb&& cb) override { - if (ice_) { + if (ice_) ice_->setOnRecv(compId_, cb); - } } + void setOnShutdown(onShutdownCb&& cb) { + ice_->setOnShutdown(std::move(cb)); + } private: std::shared_ptr<IceTransport> ice_ {nullptr}; std::atomic_bool iceStopped{false}; @@ -213,6 +223,8 @@ public: void setOnStateChange(OnStateChangeCb&& cb); void setOnReady(OnReadyCb&& cb); + std::shared_ptr<IceTransport> underlyingICE() const; + private: class Impl; std::unique_ptr<Impl> pimpl_;