Commit 6d4630bf authored by Mohamed Chibani's avatar Mohamed Chibani
Browse files

ICE: flush timer heap when destroying ICE instance

When requesting destruction of ice session and stream
transport instances from PJNATH, some operation will be
done asynchronously through scheduled timers. If not
properly handled, the timer heap will still contain
timer entries when the timer heap and the IO queueu are
destroyed.
Now, the timer heap is given more time to flush the
remaining timer entries before the IO queue and the
timer heap are destructed.
The timer heap may still contain timer entries if
PJNATH does not process all the timers within the
givent max time (currently set to 3s).

Gitlab: #637

Change-Id: I5ef2fe9d824e8b57191a51fac8f9e53e0e626fcd
parent 43136e05
......@@ -29,7 +29,7 @@ on behalf of Savoir-faire Linux.
pjnath/src/pjnath-test/concur_test.c | 5 +-
pjnath/src/pjnath-test/sess_auth.c | 14 +-
pjnath/src/pjnath-test/stun_sock_test.c | 7 +-
pjnath/src/pjnath/ice_session.c | 629 +++++++++++--
pjnath/src/pjnath/ice_session.c | 632 +++++++++++--
pjnath/src/pjnath/ice_strans.c | 749 +++++++++++++---
pjnath/src/pjnath/nat_detect.c | 7 +-
pjnath/src/pjnath/stun_session.c | 19 +-
......@@ -42,7 +42,7 @@ on behalf of Savoir-faire Linux.
pjnath/src/pjturn-srv/server.c | 2 +-
pjsip-apps/src/samples/icedemo.c | 116 ++-
pjsip/src/pjsua-lib/pjsua_core.c | 2 +-
22 files changed, 2603 insertions(+), 464 deletions(-)
22 files changed, 2606 insertions(+), 464 deletions(-)
diff --git a/pjnath/include/pjnath/config.h b/pjnath/include/pjnath/config.h
index 8a656c225..3e7c6ae3a 100644
......@@ -762,7 +762,7 @@ index 18a1bbd13..577659c87 100644
if (status != PJ_SUCCESS && status != PJ_EPENDING) {
app_perror(" error: server sending data", status);
diff --git a/pjnath/src/pjnath/ice_session.c b/pjnath/src/pjnath/ice_session.c
index 3fecc3def..f35cfba5c 100644
index 3fecc3def..8917f647c 100644
--- a/pjnath/src/pjnath/ice_session.c
+++ b/pjnath/src/pjnath/ice_session.c
@@ -18,6 +18,7 @@
......@@ -844,7 +844,17 @@ index 3fecc3def..f35cfba5c 100644
pj_ansi_snprintf(ice->obj_name, sizeof(ice->obj_name),
name, ice);
@@ -733,7 +745,8 @@ PJ_DEF(pj_status_t) pj_ice_sess_add_cand(pj_ice_sess *ice,
@@ -508,6 +520,9 @@ static void destroy_ice(pj_ice_sess *ice,
pj_timer_heap_cancel_if_active(ice->stun_cfg.timer_heap,
&ice->timer, PJ_FALSE);
+ pj_timer_heap_cancel_if_active(ice->stun_cfg.timer_heap,
+ &ice->timer_connect, TIMER_NONE);
+
for (i=0; i<ice->comp_cnt; ++i) {
if (ice->comp[i].stun_sess) {
pj_stun_session_destroy(ice->comp[i].stun_sess);
@@ -733,7 +748,8 @@ PJ_DEF(pj_status_t) pj_ice_sess_add_cand(pj_ice_sess *ice,
const pj_sockaddr_t *base_addr,
const pj_sockaddr_t *rel_addr,
int addr_len,
......@@ -854,7 +864,7 @@ index 3fecc3def..f35cfba5c 100644
{
pj_ice_sess_cand *lcand;
pj_status_t status = PJ_SUCCESS;
@@ -779,6 +792,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_add_cand(pj_ice_sess *ice,
@@ -779,6 +795,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_add_cand(pj_ice_sess *ice,
lcand->comp_id = (pj_uint8_t)comp_id;
lcand->transport_id = (pj_uint8_t)transport_id;
lcand->type = type;
......@@ -862,7 +872,7 @@ index 3fecc3def..f35cfba5c 100644
pj_strdup(ice->pool, &lcand->foundation, foundation);
lcand->local_pref = local_pref;
lcand->prio = CALC_CAND_PRIO(ice, type, local_pref, lcand->comp_id);
@@ -1020,6 +1034,9 @@ static void check_set_state(pj_ice_sess *ice, pj_ice_sess_check *check,
@@ -1020,6 +1037,9 @@ static void check_set_state(pj_ice_sess *ice, pj_ice_sess_check *check,
pj_ice_sess_check_state st,
pj_status_t err_code)
{
......@@ -872,7 +882,7 @@ index 3fecc3def..f35cfba5c 100644
LOG5((ice->obj_name, "Check %s: state changed from %s to %s",
dump_check(ice->tmp.txt, sizeof(ice->tmp.txt), &ice->clist, check),
check_state_name[check->state],
@@ -1171,6 +1188,15 @@ static pj_status_t prune_checklist(pj_ice_sess *ice,
@@ -1171,6 +1191,15 @@ static pj_status_t prune_checklist(pj_ice_sess *ice,
return PJNATH_EICENOHOSTCAND;
}
}
......@@ -888,7 +898,7 @@ index 3fecc3def..f35cfba5c 100644
}
/* Next remove a pair if its local and remote candidates are identical
@@ -1240,10 +1266,14 @@ static void on_timer(pj_timer_heap_t *th, pj_timer_entry *te)
@@ -1240,10 +1269,14 @@ static void on_timer(pj_timer_heap_t *th, pj_timer_entry *te)
}
switch (type) {
......@@ -905,7 +915,7 @@ index 3fecc3def..f35cfba5c 100644
on_ice_complete(ice, PJNATH_EICENOMTIMEOUT);
break;
case TIMER_COMPLETION_CALLBACK:
@@ -1273,6 +1303,9 @@ static void on_timer(pj_timer_heap_t *th, pj_timer_entry *te)
@@ -1273,6 +1306,9 @@ static void on_timer(pj_timer_heap_t *th, pj_timer_entry *te)
case TIMER_KEEP_ALIVE:
ice_keep_alive(ice, PJ_TRUE);
break;
......@@ -915,7 +925,7 @@ index 3fecc3def..f35cfba5c 100644
case TIMER_NONE:
/* Nothing to do, just to get rid of gcc warning */
break;
@@ -1481,15 +1514,27 @@ static pj_bool_t check_ice_complete(pj_ice_sess *ice)
@@ -1481,15 +1517,27 @@ static pj_bool_t check_ice_complete(pj_ice_sess *ice)
* See if all checks in the checklist have completed. If we do,
* then mark ICE processing as failed.
*/
......@@ -950,7 +960,7 @@ index 3fecc3def..f35cfba5c 100644
if (no_pending_check) {
/* All checks have completed, but we don't have nominated pair.
@@ -1506,8 +1551,8 @@ static pj_bool_t check_ice_complete(pj_ice_sess *ice)
@@ -1506,8 +1554,8 @@ static pj_bool_t check_ice_complete(pj_ice_sess *ice)
if (i < ice->comp_cnt) {
/* This component ID doesn't have valid pair.
......@@ -961,7 +971,7 @@ index 3fecc3def..f35cfba5c 100644
on_ice_complete(ice, PJNATH_EICEFAILED);
return PJ_TRUE;
} else {
@@ -1541,11 +1586,48 @@ static pj_bool_t check_ice_complete(pj_ice_sess *ice)
@@ -1541,11 +1589,48 @@ static pj_bool_t check_ice_complete(pj_ice_sess *ice)
/* Unreached */
} else if (ice->is_nominating) {
......@@ -1015,7 +1025,7 @@ index 3fecc3def..f35cfba5c 100644
return PJ_TRUE;
} else {
@@ -1737,6 +1819,44 @@ static pj_bool_t on_check_complete(pj_ice_sess *ice,
@@ -1737,6 +1822,44 @@ static pj_bool_t on_check_complete(pj_ice_sess *ice,
return check_ice_complete(ice);
}
......@@ -1060,7 +1070,7 @@ index 3fecc3def..f35cfba5c 100644
/* Get foundation index of a check pair. This function can also be used for
* adding a new foundation (combination of local & remote cands foundations)
@@ -1905,6 +2025,29 @@ static pj_status_t add_rcand_and_update_checklist(
@@ -1905,6 +2028,29 @@ static pj_status_t add_rcand_and_update_checklist(
continue;
}
......@@ -1090,7 +1100,7 @@ index 3fecc3def..f35cfba5c 100644
#if 0
/* Trickle ICE:
* Make sure that pair has not been added to checklist
@@ -1925,7 +2068,6 @@ static pj_status_t add_rcand_and_update_checklist(
@@ -1925,7 +2071,6 @@ static pj_status_t add_rcand_and_update_checklist(
}
#endif
......@@ -1098,7 +1108,7 @@ index 3fecc3def..f35cfba5c 100644
/* Add the pair */
chk = &clist->checks[clist->count];
chk->lcand = lcand;
@@ -2121,6 +2263,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_create_check_list(
@@ -2121,6 +2266,7 @@ PJ_DEF(pj_status_t) pj_ice_sess_create_check_list(
td = PJ_POOL_ZALLOC_T(ice->pool, timer_data);
td->ice = ice;
td->clist = clist;
......@@ -1106,7 +1116,7 @@ index 3fecc3def..f35cfba5c 100644
clist->timer.user_data = (void*)td;
clist->timer.cb = &periodic_timer;
@@ -2209,6 +2352,36 @@ PJ_DEF(pj_status_t) pj_ice_sess_update_check_list(
@@ -2209,6 +2355,36 @@ PJ_DEF(pj_status_t) pj_ice_sess_update_check_list(
return status;
}
......@@ -1143,7 +1153,7 @@ index 3fecc3def..f35cfba5c 100644
/* Perform check on the specified candidate pair. */
static pj_status_t perform_check(pj_ice_sess *ice,
pj_ice_sess_checklist *clist,
@@ -2219,19 +2392,17 @@ static pj_status_t perform_check(pj_ice_sess *ice,
@@ -2219,19 +2395,17 @@ static pj_status_t perform_check(pj_ice_sess *ice,
pj_ice_msg_data *msg_data;
pj_ice_sess_check *check;
const pj_ice_sess_cand *lcand;
......@@ -1164,7 +1174,7 @@ index 3fecc3def..f35cfba5c 100644
/* Create request */
status = pj_stun_session_create_req(comp->stun_sess,
@@ -2282,32 +2453,71 @@ static pj_status_t perform_check(pj_ice_sess *ice,
@@ -2282,32 +2456,71 @@ static pj_status_t perform_check(pj_ice_sess *ice,
&ice->tie_breaker);
} else {
......@@ -1253,7 +1263,7 @@ index 3fecc3def..f35cfba5c 100644
}
@@ -2344,55 +2554,108 @@ static pj_status_t start_periodic_check(pj_timer_heap_t *th,
@@ -2344,55 +2557,108 @@ static pj_status_t start_periodic_check(pj_timer_heap_t *th,
pj_log_push_indent();
/* Send STUN Binding request for check with highest priority on
......@@ -1383,7 +1393,7 @@ index 3fecc3def..f35cfba5c 100644
}
pj_grp_lock_release(ice->grp_lock);
@@ -2636,6 +2899,222 @@ static pj_status_t on_stun_send_msg(pj_stun_session *sess,
@@ -2636,6 +2902,222 @@ static pj_status_t on_stun_send_msg(pj_stun_session *sess,
return status;
}
......@@ -1606,7 +1616,7 @@ index 3fecc3def..f35cfba5c 100644
/* This callback is called when outgoing STUN request completed */
static void on_stun_request_complete(pj_stun_session *stun_sess,
@@ -2941,7 +3420,9 @@ static void on_stun_request_complete(pj_stun_session *stun_sess,
@@ -2941,7 +3423,9 @@ static void on_stun_request_complete(pj_stun_session *stun_sess,
&check->lcand->base_addr,
&check->lcand->base_addr,
pj_sockaddr_get_len(&xaddr->sockaddr),
......@@ -1617,7 +1627,7 @@ index 3fecc3def..f35cfba5c 100644
if (status != PJ_SUCCESS) {
check_set_state(ice, check, PJ_ICE_SESS_CHECK_STATE_FAILED,
status);
@@ -3003,11 +3484,7 @@ static void on_stun_request_complete(pj_stun_session *stun_sess,
@@ -3003,11 +3487,7 @@ static void on_stun_request_complete(pj_stun_session *stun_sess,
/* Perform 7.1.2.2.2. Updating Pair States.
* This may terminate ICE processing.
*/
......@@ -1630,7 +1640,7 @@ index 3fecc3def..f35cfba5c 100644
pj_grp_lock_release(ice->grp_lock);
}
@@ -3202,9 +3679,13 @@ static pj_status_t on_stun_rx_request(pj_stun_session *sess,
@@ -3202,9 +3682,13 @@ static pj_status_t on_stun_rx_request(pj_stun_session *sess,
msg_data->has_req_data = PJ_FALSE;
/* Send the response */
......@@ -1646,7 +1656,7 @@ index 3fecc3def..f35cfba5c 100644
/*
* Handling early check.
@@ -3323,12 +3804,12 @@ static void handle_incoming_check(pj_ice_sess *ice,
@@ -3323,12 +3807,12 @@ static void handle_incoming_check(pj_ice_sess *ice,
/* Just get candidate with the highest priority and same transport ID
* for the specified component ID in the checklist.
*/
......
......@@ -27,7 +27,7 @@ index f35cfba5c..cc7e7d564 100644
/* String names for candidate types */
static const char *cand_type_names[] =
{
@@ -731,6 +746,144 @@ static pj_uint32_t CALC_CAND_PRIO(pj_ice_sess *ice,
@@ -734,6 +749,144 @@ static pj_uint32_t CALC_CAND_PRIO(pj_ice_sess *ice,
#endif
}
......@@ -172,7 +172,7 @@ index f35cfba5c..cc7e7d564 100644
/*
* Add ICE candidate
@@ -748,6 +901,29 @@ PJ_DEF(pj_status_t) pj_ice_sess_add_cand(pj_ice_sess *ice,
@@ -751,6 +904,29 @@ PJ_DEF(pj_status_t) pj_ice_sess_add_cand(pj_ice_sess *ice,
unsigned *p_cand_id,
pj_ice_cand_transport transport)
{
......
......@@ -283,7 +283,7 @@ diff --git a/pjnath/src/pjnath/ice_session.c b/pjnath/src/pjnath/ice_session.c
index cc7e7d564..0b1c5fd56 100644
--- a/pjnath/src/pjnath/ice_session.c
+++ b/pjnath/src/pjnath/ice_session.c
@@ -1531,6 +1531,12 @@ static void ice_keep_alive(pj_ice_sess *ice, pj_bool_t send_now)
@@ -1534,6 +1534,12 @@ static void ice_keep_alive(pj_ice_sess *ice, pj_bool_t send_now)
PJ_FALSE, PJ_FALSE,
&the_check->rcand->addr,
addr_len, tdata);
......
......@@ -24,4 +24,4 @@ index d5bd8fdda..e168f14bc 100644
TRACE_((THIS_FILE, "Transport type in tpsel not matched"));
return PJSIP_ETPNOTSUITABLE;
--
2.30.2
\ No newline at end of file
2.30.2
From f448e54047b49938218f120e72d34563a94c3078 Mon Sep 17 00:00:00 2001
From: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com>
Date: Thu, 26 Aug 2021 10:41:39 -0400
Subject: [PATCH 1/2] pjproject/0016-use-larger-Ta-interval
---
pjnath/include/pjnath/config.h | 2 +-
pjnath/src/pjnath/ice_session.c | 31 ++++++++++++++++++++-----------
2 files changed, 21 insertions(+), 12 deletions(-)
......@@ -25,7 +19,7 @@ diff --git a/pjnath/src/pjnath/ice_session.c b/pjnath/src/pjnath/ice_session.c
index 0b1c5fd56..503cbfd91 100644
--- a/pjnath/src/pjnath/ice_session.c
+++ b/pjnath/src/pjnath/ice_session.c
@@ -2702,9 +2702,8 @@ static pj_status_t perform_check(pj_ice_sess *ice,
@@ -2705,9 +2705,8 @@ static pj_status_t perform_check(pj_ice_sess *ice,
return status;
}
......@@ -36,7 +30,7 @@ index 0b1c5fd56..503cbfd91 100644
*/
static pj_status_t start_periodic_check(pj_timer_heap_t *th,
pj_timer_entry *te)
@@ -2732,8 +2731,7 @@ static pj_status_t start_periodic_check(pj_timer_heap_t *th,
@@ -2735,8 +2734,7 @@ static pj_status_t start_periodic_check(pj_timer_heap_t *th,
/* Set checklist state to Running */
clist_set_state(ice, clist, PJ_ICE_SESS_CHECKLIST_ST_RUNNING);
......@@ -46,7 +40,7 @@ index 0b1c5fd56..503cbfd91 100644
/* Send STUN Binding request for check with highest priority on
* Retry state.
@@ -2741,9 +2739,11 @@ static pj_status_t start_periodic_check(pj_timer_heap_t *th,
@@ -2744,9 +2742,11 @@ static pj_status_t start_periodic_check(pj_timer_heap_t *th,
if (start_count == 0) {
for (i = 0; i < clist->count; ++i) {
......@@ -59,7 +53,7 @@ index 0b1c5fd56..503cbfd91 100644
status = perform_check(ice, clist, i, ice->is_nominating);
if (status != PJ_SUCCESS && status != PJ_EPENDING) {
check_set_state(ice, check, PJ_ICE_SESS_CHECK_STATE_FAILED,
@@ -2760,13 +2760,15 @@ static pj_status_t start_periodic_check(pj_timer_heap_t *th,
@@ -2763,13 +2763,15 @@ static pj_status_t start_periodic_check(pj_timer_heap_t *th,
// TODO (sblin) remove - https://github.com/coturn/coturn/issues/408
pj_bool_t inc_counter = PJ_TRUE;
for (i = 0; i < clist->count; ++i) {
......@@ -76,7 +70,7 @@ index 0b1c5fd56..503cbfd91 100644
status = perform_check(ice, clist, i, ice->is_nominating);
if (status != PJ_SUCCESS && status != PJ_EPENDING) {
check_set_state(ice, check, PJ_ICE_SESS_CHECK_STATE_FAILED,
@@ -2786,9 +2788,12 @@ static pj_status_t start_periodic_check(pj_timer_heap_t *th,
@@ -2789,9 +2791,12 @@ static pj_status_t start_periodic_check(pj_timer_heap_t *th,
if (start_count == 0) {
for (i = 0; i < clist->count; ++i) {
......@@ -90,7 +84,7 @@ index 0b1c5fd56..503cbfd91 100644
status = perform_check(ice, clist, i, ice->is_nominating);
if (status != PJ_SUCCESS && status != PJ_EPENDING) {
check_set_state(ice, check, PJ_ICE_SESS_CHECK_STATE_FAILED,
@@ -2806,9 +2811,11 @@ static pj_status_t start_periodic_check(pj_timer_heap_t *th,
@@ -2809,9 +2814,11 @@ static pj_status_t start_periodic_check(pj_timer_heap_t *th,
*/
if (start_count == 0) {
for (i = 0; i < clist->count; ++i) {
......@@ -103,7 +97,7 @@ index 0b1c5fd56..503cbfd91 100644
status = perform_check(ice, clist, i, ice->is_nominating);
if (status != PJ_SUCCESS && status != PJ_EPENDING) {
check_set_state(ice, check, PJ_ICE_SESS_CHECK_STATE_FAILED, status);
@@ -2823,7 +2830,7 @@ static pj_status_t start_periodic_check(pj_timer_heap_t *th,
@@ -2826,7 +2833,7 @@ static pj_status_t start_periodic_check(pj_timer_heap_t *th,
if (start_count == 0) {
// If all sockets are pending, do nothing
for (i = 0; i < clist->count; ++i) {
......@@ -112,7 +106,7 @@ index 0b1c5fd56..503cbfd91 100644
if (check->state == PJ_ICE_SESS_CHECK_STATE_PENDING) {
++start_count;
break;
@@ -2831,8 +2838,11 @@ static pj_status_t start_periodic_check(pj_timer_heap_t *th,
@@ -2834,8 +2841,11 @@ static pj_status_t start_periodic_check(pj_timer_heap_t *th,
}
}
......@@ -125,7 +119,7 @@ index 0b1c5fd56..503cbfd91 100644
pj_time_val timeout = {0, PJ_ICE_TA_VAL};
pj_time_val_normalize(&timeout);
@@ -2845,7 +2855,6 @@ static pj_status_t start_periodic_check(pj_timer_heap_t *th,
@@ -2848,7 +2858,6 @@ static pj_status_t start_periodic_check(pj_timer_heap_t *th,
return PJ_SUCCESS;
}
......
From 4608050fb0be665fe731e8112a26d1382ed78552 Mon Sep 17 00:00:00 2001
From: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com>
Date: Wed, 1 Sep 2021 15:13:07 -0400
Subject: [PATCH 19/21] resort-check-list-after-adding-prflx
---
pjnath/src/pjnath/ice_session.c | 4 ++++
1 file changed, 4 insertions(+)
......@@ -11,7 +5,7 @@ diff --git a/pjnath/src/pjnath/ice_session.c b/pjnath/src/pjnath/ice_session.c
index 503cbfd91..f060aaa9c 100644
--- a/pjnath/src/pjnath/ice_session.c
+++ b/pjnath/src/pjnath/ice_session.c
@@ -4135,7 +4135,11 @@ static void handle_incoming_check(pj_ice_sess *ice,
@@ -4138,7 +4138,11 @@ static void handle_incoming_check(pj_ice_sess *ice,
LOG4((ice->obj_name, "New triggered check added: %d",
ice->clist.count));
pj_log_push_indent();
......
......@@ -5,7 +5,7 @@ diff --git a/pjnath/src/pjnath/ice_session.c b/pjnath/src/pjnath/ice_session.c
index f060aaa9c..f04b7314e 100644
--- a/pjnath/src/pjnath/ice_session.c
+++ b/pjnath/src/pjnath/ice_session.c
@@ -4054,14 +4054,22 @@ static void handle_incoming_check(pj_ice_sess *ice,
@@ -4057,14 +4057,22 @@ static void handle_incoming_check(pj_ice_sess *ice,
if (c->state == PJ_ICE_SESS_CHECK_STATE_FROZEN ||
c->state == PJ_ICE_SESS_CHECK_STATE_WAITING)
{
......@@ -35,7 +35,7 @@ index f060aaa9c..f04b7314e 100644
} else if (c->state == PJ_ICE_SESS_CHECK_STATE_IN_PROGRESS) {
/* Should retransmit immediately
*/
@@ -4121,7 +4129,7 @@ static void handle_incoming_check(pj_ice_sess *ice,
@@ -4124,7 +4132,7 @@ static void handle_incoming_check(pj_ice_sess *ice,
else if (ice->clist.count < PJ_ICE_MAX_CHECKS) {
pj_ice_sess_check *c = &ice->clist.checks[ice->clist.count];
......@@ -44,7 +44,7 @@ index f060aaa9c..f04b7314e 100644
c->lcand = lcand;
c->rcand = rcand;
@@ -4129,19 +4137,38 @@ static void handle_incoming_check(pj_ice_sess *ice,
@@ -4132,19 +4140,38 @@ static void handle_incoming_check(pj_ice_sess *ice,
c->state = PJ_ICE_SESS_CHECK_STATE_WAITING;
c->nominated = rcheck->use_candidate;
c->err_code = PJ_SUCCESS;
......
......@@ -64,9 +64,8 @@ static constexpr unsigned STUN_MAX_PACKET_SIZE {8192};
static constexpr uint16_t IPV6_HEADER_SIZE = 40; ///< Size in bytes of IPV6 packet header
static constexpr uint16_t IPV4_HEADER_SIZE = 20; ///< Size in bytes of IPV4 packet header
static constexpr int MAX_CANDIDATES {32};
static constexpr int MAX_DESTRUCTION_TIMEOUT {3};
static constexpr int MAX_DESTRUCTION_TIMEOUT {3000};
static constexpr int HANDLE_EVENT_DURATION {500};
static constexpr int HANDLE_EVENT_TENTATIVE {6};
//==============================================================================
......@@ -130,6 +129,8 @@ public:
void setDefaultRemoteAddress(unsigned comp_id, const IpAddr& addr);
const IpAddr getDefaultRemoteAddress(unsigned comp_id) const;
bool handleEvents(unsigned max_msec);
int flushTimerHeapAndIoQueue();
int checkEventQueue(int maxEventToPoll);
std::unique_ptr<pj_pool_t, std::function<void(pj_pool_t*)>> pool_ {};
IceTransportCompleteCb on_initdone_cb_ {};
......@@ -204,8 +205,6 @@ public:
std::atomic_bool destroying_ {false};
onShutdownCb scb {};
std::shared_ptr<pj_caching_pool> cp_;
};
//==============================================================================
......@@ -318,7 +317,7 @@ IceTransport::Impl::Impl(const char* name, const IceTransportOptions& options)
upnp_.reset(new upnp::Controller());
auto& iceTransportFactory = Manager::instance().getIceTransportFactory();
cp_ = iceTransportFactory.getPoolCaching();
config_ = iceTransportFactory.getIceCfg(); // config copy
if (options.tcpEnable) {
config_.protocol = PJ_ICE_TP_TCP;
......@@ -453,7 +452,7 @@ IceTransport::Impl::Impl(const char* name, const IceTransportOptions& options)
while (not threadTerminateFlags_) {
// NOTE: handleEvents can return false in this case
// but here we don't care if there is event or not.
handleEvents(500); // limit polling to 500ms
handleEvents(HANDLE_EVENT_DURATION);
}
});
......@@ -484,18 +483,23 @@ IceTransport::Impl::~Impl()
pj_ice_strans_stop_ice(strans);
pj_ice_strans_destroy(strans);
// NOTE: This last handleEvents is necessary to close TURN socket.
// NOTE: This last timer heap and IO queue polling is necessary to close
// TURN socket.
// Because when destroying the TURN session pjproject creates a pj_timer
// to postpone the TURN destruction. This timer is only called if we poll
// the event queue.
auto tentative = 0;
while (tentative < HANDLE_EVENT_TENTATIVE && handleEvents(HANDLE_EVENT_DURATION)) {
tentative++;
int ret = flushTimerHeapAndIoQueue();
if (ret < 0) {
JAMI_ERR("[ice:%p] IO queue polling failed", this);
} else if (ret > 0) {
JAMI_ERR("[ice:%p] Unexpected left timer in timer heap. Please report the bug", this);
}
if (checkEventQueue(1) > 0) {
JAMI_WARN("[ice:%p] Unexpected left events in IO queue", this);
}
if (tentative == HANDLE_EVENT_TENTATIVE)
JAMI_ERR(
"handle events didn't finish after %d tentatives. This is a bug. Please report it.",
HANDLE_EVENT_TENTATIVE);
if (config_.stun_cfg.ioqueue)
pj_ioqueue_destroy(config_.stun_cfg.ioqueue);
......@@ -550,28 +554,27 @@ IceTransport::Impl::handleEvents(unsigned max_msec)
// By tests, never seen more than two events per 500ms
static constexpr auto MAX_NET_EVENTS = 2;
pj_time_val max_timeout = {0, 0};
pj_time_val max_timeout = {0, static_cast<long>(max_msec)};
pj_time_val timeout = {0, 0};
unsigned net_event_count = 0;
max_timeout.msec = max_msec;
timeout.sec = timeout.msec = 0;
pj_timer_heap_poll(config_.stun_cfg.timer_heap, &timeout);
auto ret = timeout.msec != PJ_MAXINT32;
auto hasActiveTimer = timeout.sec != PJ_MAXINT32 || timeout.msec != PJ_MAXINT32;
// timeout limitation
if (timeout.msec >= 1000)
timeout.msec = 999;
if (PJ_TIME_VAL_GT(timeout, max_timeout))
if (hasActiveTimer)
pj_time_val_normalize(&timeout);
if (PJ_TIME_VAL_GT(timeout, max_timeout)) {
timeout = max_timeout;
}
do {
auto n_events = pj_ioqueue_poll(config_.stun_cfg.ioqueue, &timeout);
// timeout
if (not n_events)
return ret;
return hasActiveTimer;
// error
if (n_events < 0) {
......@@ -580,13 +583,72 @@ IceTransport::Impl::handleEvents(unsigned max_msec)
last_errmsg_ = sip_utils::sip_strerror(err);
JAMI_DBG("[ice:%p] ioqueue error %d: %s", this, err, last_errmsg_.c_str());
std::this_thread::sleep_for(std::chrono::milliseconds(PJ_TIME_VAL_MSEC(timeout)));
return ret;
return hasActiveTimer;
}
net_event_count += n_events;
timeout.sec = timeout.msec = 0;
} while (net_event_count < MAX_NET_EVENTS);
return ret;
return hasActiveTimer;
}
int
IceTransport::Impl::flushTimerHeapAndIoQueue()
{
pj_time_val timerTimeout = {0, 0};
pj_time_val defaultWaitTime = {0, HANDLE_EVENT_DURATION};
bool hasActiveTimer = false;
std::chrono::milliseconds totalWaitTime {0};
auto const start = std::chrono::steady_clock::now();
// We try to process pending events as fast as possible to
// speed-up the release.
int maxEventToProcess = 10;
do {
if (checkEventQueue(maxEventToProcess) < 0)
return -1;
pj_timer_heap_poll(config_.stun_cfg.timer_heap, &timerTimeout);
hasActiveTimer = !(timerTimeout.sec == PJ_MAXINT32 && timerTimeout.msec == PJ_MAXINT32);
if (hasActiveTimer) {
pj_time_val_normalize(&timerTimeout);
auto waitTime = std::chrono::milliseconds(
std::min(PJ_TIME_VAL_MSEC(timerTimeout), PJ_TIME_VAL_MSEC(defaultWaitTime)));
std::this_thread::sleep_for(waitTime);
totalWaitTime += waitTime;
}
} while (hasActiveTimer && totalWaitTime < std::chrono::milliseconds(MAX_DESTRUCTION_TIMEOUT));
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - start);
JAMI_DBG("[ice:%p] Timer heap flushed after %ld ms", this, duration.count());
return static_cast<int>(pj_timer_heap_count(config_.stun_cfg.timer_heap));
}
int
IceTransport::Impl::checkEventQueue(int maxEventToPoll)
{
pj_time_val timeout = {0, 0};
int eventCount = 0;
int events = 0;
do {
events = pj_ioqueue_poll(config_.stun_cfg.ioqueue, &timeout);
if (events < 0) {
const auto err = pj_get_os_error();
last_errmsg_ = sip_utils::sip_strerror(err);
JAMI_ERR("[ice:%p] ioqueue error %d: %s", this, err, last_errmsg_.c_str());
return events;
}
eventCount += events;
} while (events > 0 && eventCount < maxEventToPoll);
return eventCount;
}
void
......
......@@ -133,7 +133,6 @@ using ConferenceMap = std::map<std::string, std::shared_ptr<Conference>>;
/** To store uniquely a list of Call ids */
using CallIDSet = std::set<std::string>;
static constexpr std::chrono::seconds ICE_INIT_TIMEOUT {10};
static constexpr const char* PACKAGE_OLD = "ring";
std::atomic_bool Manager::initialized = {false};
......
......@@ -56,10 +56,7 @@ static constexpr int POOL_INITIAL_SIZE = 16384;
static constexpr int POOL_INCREMENT_SIZE = POOL_INITIAL_SIZE;
Sdp::Sdp(const std::string& id)
: memPool_(nullptr,
[](pj_pool_t* pool) {
pj_pool_release(pool);
})
: memPool_(nullptr, [](pj_pool_t* pool) { pj_pool_release(pool); })
, publishedIpAddr_()
, publishedIpAddrType_()
, telephoneEventPayload_(101) // same as asterisk
......@@ -133,12 +130,14 @@ randomFill(std::vector<uint8_t>& dest)
void
Sdp::setActiveLocalSdpSession(const pjmedia_sdp_session* sdp)
{
JAMI_DBG("Set active local session to [%p]. Was [%p]", sdp, activeLocalSession_);
activeLocalSession_ = sdp;
}
void
Sdp::setActiveRemoteSdpSession(const pjmedia_sdp_session* sdp)
{
JAMI_DBG("Set active remote session to [%p]. Was [%p]", sdp, activeRemoteSession_);
activeRemoteSession_ = sdp;
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment