Commit 2074c7fb authored by Guillaume Roguez's avatar Guillaume Roguez
Browse files

Ice: fix implementation

IceTransport::isXXX() API are not thread-safe, not clear in which
state the transport is and not easy to manipulate (order of state).

This patch tries to solve that by:
- Procted API by a mutex
- Remove uneeded isComplete() API
- Ensure that each API are ordered (one at true implies true
  on all lower states: INITIALIZED -> STARTED -> RUNNING).
  All return false on Fail state.
- Change usage accordingly

Tuleap: #107
Change-Id: I17211e54322d70bbfe18c28f06cf9967b9ef93d2
parent ef44e3dc
...@@ -102,7 +102,7 @@ IceTransport::IceTransport(const char* name, int component_count, bool master, ...@@ -102,7 +102,7 @@ IceTransport::IceTransport(const char* name, int component_count, bool master,
, on_negodone_cb_(options.onNegoDone) , on_negodone_cb_(options.onNegoDone)
, component_count_(component_count) , component_count_(component_count)
, compIO_(component_count) , compIO_(component_count)
, initiator_session_(master) , initiatorSession_(master)
, thread_() , thread_()
{ {
if (options.upnpEnable) if (options.upnpEnable)
...@@ -248,33 +248,27 @@ IceTransport::onComplete(pj_ice_strans* ice_st, pj_ice_strans_op op, ...@@ -248,33 +248,27 @@ IceTransport::onComplete(pj_ice_strans* ice_st, pj_ice_strans_op op,
pj_status_t status) pj_status_t status)
{ {
const char *opname = const char *opname =
op == PJ_ICE_STRANS_OP_INIT? "initialization" : op == PJ_ICE_STRANS_OP_INIT ? "initialization" :
op == PJ_ICE_STRANS_OP_NEGOTIATION ? "negotiation" : "unknown_op"; op == PJ_ICE_STRANS_OP_NEGOTIATION ? "negotiation" : "unknown_op";
if (!icest_.get())
icest_.reset(ice_st);
const bool done = status == PJ_SUCCESS; const bool done = status == PJ_SUCCESS;
RING_DBG("ICE %s with %s", opname, done?"success":"error"); if (done)
RING_DBG("ICE %s success", opname);
if (!done) else
RING_ERR("ICE %s failed: %s", opname, RING_ERR("ICE %s failed: %s", opname, sip_utils::sip_strerror(status).c_str());
sip_utils::sip_strerror(status).c_str());
{ {
std::lock_guard<std::mutex> lk(iceMutex_); std::lock_guard<std::mutex> lk(iceMutex_);
if (op == PJ_ICE_STRANS_OP_INIT) { if (!icest_.get())
iceTransportInitDone_ = done; icest_.reset(ice_st);
if (iceTransportInitDone_) { }
if (initiator_session_)
setInitiatorSession(); if (done and op == PJ_ICE_STRANS_OP_INIT) {
else if (initiatorSession_)
setSlaveSession(); setInitiatorSession();
selectUPnPIceCandidates(); else
} setSlaveSession();
} else if (op == PJ_ICE_STRANS_OP_NEGOTIATION) { selectUPnPIceCandidates();
iceTransportNegoDone_ = done;
}
} }
if (op == PJ_ICE_STRANS_OP_INIT and on_initdone_cb_) if (op == PJ_ICE_STRANS_OP_INIT and on_initdone_cb_)
...@@ -282,6 +276,7 @@ IceTransport::onComplete(pj_ice_strans* ice_st, pj_ice_strans_op op, ...@@ -282,6 +276,7 @@ IceTransport::onComplete(pj_ice_strans* ice_st, pj_ice_strans_op op,
else if (op == PJ_ICE_STRANS_OP_NEGOTIATION and on_negodone_cb_) else if (op == PJ_ICE_STRANS_OP_NEGOTIATION and on_negodone_cb_)
on_negodone_cb_(*this, done); on_negodone_cb_(*this, done);
// Unlock waitForXXX APIs
iceCV_.notify_all(); iceCV_.notify_all();
} }
...@@ -304,7 +299,7 @@ IceTransport::getDefaultCanditates() ...@@ -304,7 +299,7 @@ IceTransport::getDefaultCanditates()
bool bool
IceTransport::createIceSession(pj_ice_sess_role role) IceTransport::createIceSession(pj_ice_sess_role role)
{ {
if (pj_ice_strans_init_ice(icest_.get(), role, NULL, NULL) != PJ_SUCCESS) { if (pj_ice_strans_init_ice(icest_.get(), role, nullptr, nullptr) != PJ_SUCCESS) {
RING_ERR("pj_ice_strans_init_ice() failed"); RING_ERR("pj_ice_strans_init_ice() failed");
return false; return false;
} }
...@@ -320,7 +315,7 @@ bool ...@@ -320,7 +315,7 @@ bool
IceTransport::setInitiatorSession() IceTransport::setInitiatorSession()
{ {
RING_DBG("ICE as master"); RING_DBG("ICE as master");
initiator_session_ = true; initiatorSession_ = true;
if (isInitialized()) { if (isInitialized()) {
auto status = pj_ice_strans_change_role(icest_.get(), PJ_ICE_SESS_ROLE_CONTROLLING); auto status = pj_ice_strans_change_role(icest_.get(), PJ_ICE_SESS_ROLE_CONTROLLING);
if (status != PJ_SUCCESS) { if (status != PJ_SUCCESS) {
...@@ -336,7 +331,7 @@ bool ...@@ -336,7 +331,7 @@ bool
IceTransport::setSlaveSession() IceTransport::setSlaveSession()
{ {
RING_DBG("ICE as slave"); RING_DBG("ICE as slave");
initiator_session_ = false; initiatorSession_ = false;
if (isInitialized()) { if (isInitialized()) {
auto status = pj_ice_strans_change_role(icest_.get(), PJ_ICE_SESS_ROLE_CONTROLLED); auto status = pj_ice_strans_change_role(icest_.get(), PJ_ICE_SESS_ROLE_CONTROLLED);
if (status != PJ_SUCCESS) { if (status != PJ_SUCCESS) {
...@@ -353,7 +348,7 @@ IceTransport::isInitiator() const ...@@ -353,7 +348,7 @@ IceTransport::isInitiator() const
{ {
if (isInitialized()) if (isInitialized())
return pj_ice_strans_get_role(icest_.get()) == PJ_ICE_SESS_ROLE_CONTROLLING; return pj_ice_strans_get_role(icest_.get()) == PJ_ICE_SESS_ROLE_CONTROLLING;
return initiator_session_; return initiatorSession_;
} }
bool bool
...@@ -441,50 +436,52 @@ IceTransport::start(const std::vector<uint8_t>& rem_data) ...@@ -441,50 +436,52 @@ IceTransport::start(const std::vector<uint8_t>& rem_data)
bool bool
IceTransport::stop() IceTransport::stop()
{ {
if (not isInitialized()) { if (isStarted()) {
RING_ERR("Session not created yet"); auto status = pj_ice_strans_stop_ice(icest_.get());
return false; if (status != PJ_SUCCESS) {
} RING_ERR("ICE stop failed: %s", sip_utils::sip_strerror(status).c_str());
return false;
auto status = pj_ice_strans_stop_ice(icest_.get()); }
if (status != PJ_SUCCESS) {
RING_ERR("ICE start failed: %s", sip_utils::sip_strerror(status).c_str());
return false;
} }
return true; return true;
} }
bool bool
IceTransport::isInitialized() const IceTransport::_isInitialized() const
{ {
if (auto icest = icest_.get()) if (auto icest = icest_.get()) {
return pj_ice_strans_has_sess(icest); auto state = pj_ice_strans_get_state(icest);
return state >= PJ_ICE_STRANS_STATE_SESS_READY and state != PJ_ICE_STRANS_STATE_FAILED;
}
return false; return false;
} }
bool bool
IceTransport::isStarted() const IceTransport::_isStarted() const
{ {
return pj_ice_strans_sess_is_running(icest_.get()); if (auto icest = icest_.get()) {
} auto state = pj_ice_strans_get_state(icest);
return state >= PJ_ICE_STRANS_STATE_NEGO and state != PJ_ICE_STRANS_STATE_FAILED;
bool }
IceTransport::isCompleted() const return false;
{
return pj_ice_strans_sess_is_complete(icest_.get());
} }
bool bool
IceTransport::isRunning() const IceTransport::_isRunning() const
{ {
return isInitialized() and pj_ice_strans_get_state(icest_.get()) == PJ_ICE_STRANS_STATE_RUNNING; if (auto icest = icest_.get()) {
auto state = pj_ice_strans_get_state(icest);
return state >= PJ_ICE_STRANS_STATE_RUNNING and state != PJ_ICE_STRANS_STATE_FAILED;
}
return false;
} }
bool bool
IceTransport::isFailed() const IceTransport::_isFailed() const
{ {
return isInitialized() and pj_ice_strans_get_state(icest_.get()) == PJ_ICE_STRANS_STATE_FAILED; if (auto icest = icest_.get())
return pj_ice_strans_get_state(icest) == PJ_ICE_STRANS_STATE_FAILED;
return false;
} }
IpAddr IpAddr
...@@ -560,6 +557,9 @@ IceTransport::getLocalCandidatesAddr(unsigned comp_id) const ...@@ -560,6 +557,9 @@ IceTransport::getLocalCandidatesAddr(unsigned comp_id) const
bool bool
IceTransport::registerPublicIP(unsigned compId, const IpAddr& publicIP) IceTransport::registerPublicIP(unsigned compId, const IpAddr& publicIP)
{ {
if (not isInitialized())
return false;
// Register only if no NAT traversal methods exists // Register only if no NAT traversal methods exists
if (upnp_ or config_.stun.port > 0 or config_.turn.port > 0) if (upnp_ or config_.stun.port > 0 or config_.turn.port > 0)
return false; return false;
...@@ -663,17 +663,19 @@ IceTransport::selectUPnPIceCandidates() ...@@ -663,17 +663,19 @@ IceTransport::selectUPnPIceCandidates()
std::vector<uint8_t> std::vector<uint8_t>
IceTransport::getLocalAttributesAndCandidates() const IceTransport::getLocalAttributesAndCandidates() const
{ {
if (not isInitialized())
return {};
std::stringstream ss; std::stringstream ss;
ss << local_ufrag_ << NEW_LINE; ss << local_ufrag_ << NEW_LINE;
ss << local_pwd_ << NEW_LINE; ss << local_pwd_ << NEW_LINE;
for (unsigned i=0; i<component_count_; i++) { for (unsigned i=0; i<component_count_; i++) {
const auto& candidates = getLocalCandidates(i); const auto& candidates = getLocalCandidates(i);
for (const auto& c : candidates) for (const auto& c : candidates)
ss << c << NEW_LINE; ss << c << NEW_LINE;
} }
auto str(ss.str()); auto str(ss.str());
std::vector<uint8_t> ret(str.begin(), str.end()); return std::vector<uint8_t>(str.begin(), str.end());
return ret;
} }
void void
...@@ -786,11 +788,6 @@ IceTransport::setOnRecv(unsigned comp_id, IceRecvCb cb) ...@@ -786,11 +788,6 @@ IceTransport::setOnRecv(unsigned comp_id, IceRecvCb cb)
ssize_t ssize_t
IceTransport::send(int comp_id, const unsigned char* buf, size_t len) IceTransport::send(int comp_id, const unsigned char* buf, size_t len)
{ {
if (not isInitialized()) {
RING_ERR("ICE: not initialized transport");
return -1;
}
register_thread(); register_thread();
auto remote = getRemoteAddress(comp_id); auto remote = getRemoteAddress(comp_id);
if (!remote) { if (!remote) {
...@@ -828,12 +825,11 @@ IceTransport::waitForInitialization(unsigned timeout) ...@@ -828,12 +825,11 @@ IceTransport::waitForInitialization(unsigned timeout)
{ {
std::unique_lock<std::mutex> lk(iceMutex_); std::unique_lock<std::mutex> lk(iceMutex_);
if (!iceCV_.wait_for(lk, std::chrono::seconds(timeout), if (!iceCV_.wait_for(lk, std::chrono::seconds(timeout),
[this]{ return iceTransportInitDone_; })) { [this]{ return _isInitialized() or _isFailed(); })) {
RING_WARN("waitForInitialization: timeout"); RING_WARN("waitForInitialization: timeout");
return -1; return -1;
} }
RING_DBG("waitForInitialization: %u", iceTransportInitDone_); return not _isFailed();
return iceTransportInitDone_;
} }
int int
...@@ -841,12 +837,11 @@ IceTransport::waitForNegotiation(unsigned timeout) ...@@ -841,12 +837,11 @@ IceTransport::waitForNegotiation(unsigned timeout)
{ {
std::unique_lock<std::mutex> lk(iceMutex_); std::unique_lock<std::mutex> lk(iceMutex_);
if (!iceCV_.wait_for(lk, std::chrono::seconds(timeout), if (!iceCV_.wait_for(lk, std::chrono::seconds(timeout),
[this]{ return iceTransportNegoDone_; })) { [this]{ return _isRunning() or _isFailed(); })) {
RING_WARN("waitForIceNegotiation: timeout"); RING_WARN("waitForIceNegotiation: timeout");
return -1; return -1;
} }
RING_DBG("waitForNegotiation: %u", iceTransportNegoDone_); return not _isFailed();
return iceTransportNegoDone_;
} }
ssize_t ssize_t
...@@ -895,8 +890,7 @@ IceTransportFactory::createTransport(const char* name, int component_count, ...@@ -895,8 +890,7 @@ IceTransportFactory::createTransport(const char* name, int component_count,
const IceTransportOptions& options) const IceTransportOptions& options)
{ {
try { try {
return std::make_shared<IceTransport>(name, component_count, master, return std::make_shared<IceTransport>(name, component_count, master, options);
options);
} catch(const std::exception& e) { } catch(const std::exception& e) {
RING_ERR("%s",e.what()); RING_ERR("%s",e.what());
return nullptr; return nullptr;
......
...@@ -112,14 +112,41 @@ class IceTransport { ...@@ -112,14 +112,41 @@ class IceTransport {
*/ */
bool stop(); bool stop();
bool isInitialized() const; /**
* Returns true if ICE transport has been initialized
* [mutex protected]
*/
bool isInitialized() const {
std::lock_guard<std::mutex> lk(iceMutex_);
return _isInitialized();
}
bool isStarted() const; /**
* Returns true if ICE negotiation has been started
* [mutex protected]
*/
bool isStarted() const {
std::lock_guard<std::mutex> lk(iceMutex_);
return _isStarted();
}
bool isCompleted() const; /**
* Returns true if ICE negotiation has completed with success
* [mutex protected]
*/
bool isRunning() const {
std::lock_guard<std::mutex> lk(iceMutex_);
return _isRunning();
}
bool isRunning() const; /**
bool isFailed() const; * Returns true if ICE transport is in failure state
* [mutex protected]
*/
bool isFailed() const {
std::lock_guard<std::mutex> lk(iceMutex_);
return _isFailed();
}
IpAddr getLocalAddress(unsigned comp_id) const; IpAddr getLocalAddress(unsigned comp_id) const;
...@@ -203,11 +230,15 @@ class IceTransport { ...@@ -203,11 +230,15 @@ class IceTransport {
void getDefaultCanditates(); void getDefaultCanditates();
// Non-mutex protected of public versions
bool _isInitialized() const;
bool _isStarted() const;
bool _isRunning() const;
bool _isFailed() const;
std::unique_ptr<pj_pool_t, decltype(pj_pool_release)&> pool_; std::unique_ptr<pj_pool_t, decltype(pj_pool_release)&> pool_;
IceTransportCompleteCb on_initdone_cb_; IceTransportCompleteCb on_initdone_cb_;
IceTransportCompleteCb on_negodone_cb_; IceTransportCompleteCb on_negodone_cb_;
bool iceTransportInitDone_ {false};
bool iceTransportNegoDone_ {false};
std::unique_ptr<pj_ice_strans, IceSTransDeleter> icest_; std::unique_ptr<pj_ice_strans, IceSTransDeleter> icest_;
unsigned component_count_; unsigned component_count_;
pj_ice_sess_cand cand_[MAX_CANDIDATES] {}; pj_ice_sess_cand cand_[MAX_CANDIDATES] {};
...@@ -231,7 +262,7 @@ class IceTransport { ...@@ -231,7 +262,7 @@ class IceTransport {
}; };
std::vector<ComponentIO> compIO_; std::vector<ComponentIO> compIO_;
bool initiator_session_ {true}; std::atomic_bool initiatorSession_ {true};
/** /**
* Returns the IP of each candidate for a given component in the ICE session * Returns the IP of each candidate for a given component in the ICE session
......
...@@ -768,9 +768,9 @@ SIPCall::getAllRemoteCandidates() ...@@ -768,9 +768,9 @@ SIPCall::getAllRemoteCandidates()
bool bool
SIPCall::startIce() SIPCall::startIce()
{ {
if (not iceTransport_) if (not iceTransport_ or iceTransport_->isFailed())
return false; return false;
if (iceTransport_->isStarted() || iceTransport_->isCompleted()) { if (iceTransport_->isStarted()) {
RING_DBG("[call:%s] ICE already started", getCallId().c_str()); RING_DBG("[call:%s] ICE already started", getCallId().c_str());
return true; return true;
} }
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment