diff --git a/src/ice_transport.cpp b/src/ice_transport.cpp index c44be21aa363b21c67bd1dd5cbc89013f88c2b5d..cedb39f6b2e04b5107ea562e8ff2d282f35a65fc 100644 --- a/src/ice_transport.cpp +++ b/src/ice_transport.cpp @@ -82,21 +82,16 @@ class IceLock public: IceLock(pj_ice_strans* strans) - : lk_(pj_ice_strans_get_grp_lock(strans)) { + : lk_(pj_ice_strans_get_grp_lock(strans)) + { lock(); } - ~IceLock() { - unlock(); - } + ~IceLock() { unlock(); } - void lock() { - pj_grp_lock_acquire(lk_); - } + void lock() { pj_grp_lock_acquire(lk_); } - void unlock() { - pj_grp_lock_release(lk_); - } + void unlock() { pj_grp_lock_release(lk_); } }; class IceTransport::Impl @@ -381,6 +376,9 @@ IceTransport::Impl::~Impl() } JAMI_DBG("[ice:%p] done destroying", this); + + if (scb) + scb(); } void @@ -511,8 +509,6 @@ IceTransport::Impl::initIceInstance(const IceTransportOptions& options) std::lock_guard lk(tr->sendDataMutex_); tr->destroying_ = true; tr->waitDataCv_.notify_all(); - if (tr->scb) - tr->scb(); } else { JAMI_WARN("null IceTransport"); } @@ -698,9 +694,9 @@ IceTransport::Impl::checkEventQueue(int maxEventToPoll) void IceTransport::Impl::onComplete(pj_ice_strans*, pj_ice_strans_op op, pj_status_t status) { - const char* opname = op == PJ_ICE_STRANS_OP_INIT - ? "initialization" - : op == PJ_ICE_STRANS_OP_NEGOTIATION ? "negotiation" : "unknown_op"; + const char* opname = op == PJ_ICE_STRANS_OP_INIT ? "initialization" + : op == PJ_ICE_STRANS_OP_NEGOTIATION ? "negotiation" + : "unknown_op"; const bool done = status == PJ_SUCCESS; if (done) { @@ -748,7 +744,7 @@ IceTransport::Impl::link() const std::ostringstream out; for (unsigned strm = 1; strm <= streamsCount_ * compCountPerStream_; strm++) { auto absIdx = strm; - auto comp = (strm+1)/compCountPerStream_; + auto comp = (strm + 1) / compCountPerStream_; auto laddr = getLocalAddress(absIdx); auto raddr = getRemoteAddress(absIdx); @@ -770,7 +766,6 @@ IceTransport::Impl::setInitiatorSession() JAMI_DBG("[ice:%p] as master", this); initiatorSession_ = true; if (_isInitialized()) { - auto status = pj_ice_strans_change_role(icest_, PJ_ICE_SESS_ROLE_CONTROLLING); if (status != PJ_SUCCESS) { last_errmsg_ = sip_utils::sip_strerror(status); @@ -788,7 +783,6 @@ IceTransport::Impl::setSlaveSession() JAMI_DBG("[ice:%p] as slave", this); initiatorSession_ = false; if (_isInitialized()) { - auto status = pj_ice_strans_change_role(icest_, PJ_ICE_SESS_ROLE_CONTROLLED); if (status != PJ_SUCCESS) { last_errmsg_ = sip_utils::sip_strerror(status); @@ -1102,7 +1096,8 @@ IceTransport::Impl::onReceiveData(unsigned comp_id, void* pkt, pj_size_t size) jami_tracepoint_if_enabled(ice_transport_recv, reinterpret_cast<uint64_t>(this), - comp_id, size, + comp_id, + size, getRemoteAddress(comp_id).toString().c_str()); if (size == 0) return; @@ -1130,8 +1125,8 @@ IceTransport::Impl::_waitForInitialization(std::chrono::milliseconds timeout) IceLock lk(icest_); if (not iceCV_.wait_for(lk, timeout, [this] { - return threadTerminateFlags_ or _isInitialized() or _isFailed(); - })) { + return threadTerminateFlags_ or _isInitialized() or _isFailed(); + })) { JAMI_WARN("[ice:%p] waitForInitialization: timeout", this); return false; } @@ -1156,8 +1151,7 @@ IceTransport::initIceInstance(const IceTransportOptions& options) { pimpl_->initIceInstance(options); - jami_tracepoint(ice_transport_context, - reinterpret_cast<uint64_t>(this)); + jami_tracepoint(ice_transport_context, reinterpret_cast<uint64_t>(this)); } bool @@ -1692,7 +1686,9 @@ IceTransport::send(unsigned compId, const unsigned char* buf, size_t len) jami_tracepoint(ice_transport_send, reinterpret_cast<uint64_t>(this), - compId, len, remote.toString().c_str()); + compId, + len, + remote.toString().c_str()); auto status = pj_ice_strans_sendto2(pimpl_->icest_, compId, @@ -1707,8 +1703,7 @@ IceTransport::send(unsigned compId, const unsigned char* buf, size_t len) // NOTE; because we are in TCP, the sent size will count the header (2 // bytes length). pimpl_->waitDataCv_.wait(dlk, [&] { - return pimpl_->lastSentLen_ >= static_cast<pj_size_t>(len) - or pimpl_->destroying_; + return pimpl_->lastSentLen_ >= static_cast<pj_size_t>(len) or pimpl_->destroying_; }); pimpl_->lastSentLen_ = 0; } else if (status != PJ_SUCCESS && status != PJ_EPENDING) { diff --git a/src/jamidht/connectionmanager.cpp b/src/jamidht/connectionmanager.cpp index 2557ff9c80469c8e47092d7f630fb6e6bac2dfe6..472ace5c51af4ff64480b9f9ec3335da927dccab 100644 --- a/src/jamidht/connectionmanager.cpp +++ b/src/jamidht/connectionmanager.cpp @@ -528,8 +528,10 @@ ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certif connType, eraseInfo](auto&& ice_config) { auto sthis = w.lock(); - if (!sthis) + if (!sthis) { + runOnMainThread([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); return; + } ice_config.tcpEnable = true; ice_config.onInitDone = [w, deviceId = std::move(deviceId), @@ -540,9 +542,7 @@ ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certif connType, eraseInfo](bool ok) { auto sthis = w.lock(); - if (!sthis) - return; - if (!ok) { + if (!sthis || !ok) { JAMI_ERR("Cannot initialize ICE session."); runOnMainThread([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); return; @@ -553,9 +553,9 @@ ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certif vid = std::move(vid), eraseInfo, connType] { - if (auto sthis = w.lock()) - if (!sthis->connectDeviceStartIce(devicePk, vid, connType)) - runOnMainThread([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); + auto sthis = w.lock(); + if (!sthis || !sthis->connectDeviceStartIce(devicePk, vid, connType)) + runOnMainThread([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); }); }; ice_config.onNegoDone = [w, @@ -565,9 +565,7 @@ ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certif vid, eraseInfo](bool ok) { auto sthis = w.lock(); - if (!sthis) - return; - if (!ok) { + if (!sthis || !ok) { JAMI_ERR("ICE negotiation failed."); runOnMainThread([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); return; @@ -580,9 +578,7 @@ ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certif vid = std::move(vid), eraseInfo = std::move(eraseInfo)] { auto sthis = w.lock(); - if (!sthis) - return; - if (!sthis->connectDeviceOnNegoDone(deviceId, name, vid, cert)) + if (!sthis || !sthis->connectDeviceOnNegoDone(deviceId, name, vid, cert)) runOnMainThread([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); }); }; @@ -598,12 +594,17 @@ ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certif ice_config.compCountPerStream = JamiAccount::ICE_COMP_COUNT_PER_STREAM; info->ice_ = Manager::instance().getIceTransportFactory().createUTransport( sthis->account.getAccountID().c_str()); - info->ice_->initIceInstance(ice_config); - if (!info->ice_) { JAMI_ERR("Cannot initialize ICE session."); eraseInfo(); + return; } + // We need to detect any shutdown if the ice session is destroyed before going to the + // TLS session; + info->ice_->setOnShutdown([eraseInfo]() { + runOnMainThread([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); + }); + info->ice_->initIceInstance(ice_config); }); }); } @@ -901,6 +902,9 @@ ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req, // all stored structures. auto eraseInfo = [w, id = req.id, deviceId] { if (auto shared = w.lock()) { + // If no new socket is specified, we don't try to generate a new socket + for (const auto& pending : shared->extractPendingCallbacks(deviceId)) + pending.cb(nullptr, deviceId); if (shared->connReadyCb_) shared->connReadyCb_(deviceId, "", nullptr); std::lock_guard<std::mutex> lk(shared->infosMtx_); @@ -962,12 +966,16 @@ ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req, ice_config.master = true; info->ice_ = Manager::instance().getIceTransportFactory().createUTransport( shared->account.getAccountID().c_str()); - info->ice_->initIceInstance(ice_config); - if (not info->ice_) { JAMI_ERR("Cannot initialize ICE session."); eraseInfo(); + return; } + // We need to detect any shutdown if the ice session is destroyed before going to the TLS session; + info->ice_->setOnShutdown([eraseInfo]() { + runOnMainThread([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); + }); + info->ice_->initIceInstance(ice_config); }); } diff --git a/src/jamidht/conversation_module.cpp b/src/jamidht/conversation_module.cpp index 59b108a562821c15efa9f58b4d89f679436ebc8e..1c732e26096dd0ce24dc9f9bd29f4085eb4fe690 100644 --- a/src/jamidht/conversation_module.cpp +++ b/src/jamidht/conversation_module.cpp @@ -203,9 +203,8 @@ public: void stopFetch(const std::string& convId, const std::string& deviceId) { auto it = pendingConversationsFetch_.find(convId); - if (it == pendingConversationsFetch_.end()) { + if (it == pendingConversationsFetch_.end()) return; - } auto& pf = it->second; pf.connectingTo.erase(deviceId); if (pf.connectingTo.empty()) @@ -499,9 +498,8 @@ ConversationModule::Impl::handlePendingConversation(const std::string& conversat auto erasePending = [&] { std::lock_guard<std::mutex> lk(pendingConversationsFetchMtx_); auto oldFetch = pendingConversationsFetch_.find(conversationId); - if (oldFetch != pendingConversationsFetch_.end() && !oldFetch->second.removeId.empty()) { + if (oldFetch != pendingConversationsFetch_.end() && !oldFetch->second.removeId.empty()) removeConversation(oldFetch->second.removeId); - } pendingConversationsFetch_.erase(conversationId); }; try { @@ -933,6 +931,22 @@ ConversationModule::loadConversations() JAMI_INFO("[Account %s] Conversations loaded!", pimpl_->accountId_.c_str()); } +void +ConversationModule::clearPendingFetch() +{ + if (!pimpl_->pendingConversationsFetch_.empty()) { + // Note: This is a fallback. convModule() is kept if account is disabled/re-enabled. + // iOS uses setAccountActive() a lot, and if for some reason the previous pending fetch is + // not erased (callback not called), it will block the new messages as it will not sync. The + // best way to debug this is to get logs from the last ICE connection for syncing the + // conversation. It may have been killed in some un-expected way avoiding to call the + // callbacks. This should never happen, but if it's the case, this will allow new messages + // to be synced correctly. + JAMI_ERR("This is a bug, seems to still fetch to some device on initializing"); + pimpl_->pendingConversationsFetch_.clear(); + } +} + std::vector<std::string> ConversationModule::getConversations() const { diff --git a/src/jamidht/conversation_module.h b/src/jamidht/conversation_module.h index 89b9b7907a15ba75fb77a86c0a293a38f1568df0..ac5d947a5a42f45460796e7d9b99fc3e95f4de4b 100644 --- a/src/jamidht/conversation_module.h +++ b/src/jamidht/conversation_module.h @@ -59,6 +59,11 @@ public: */ void loadConversations(); + /** + * Clear not removed fetch + */ + void clearPendingFetch(); + /** * Return all conversation's id (including syncing ones) */ diff --git a/src/jamidht/jamiaccount.cpp b/src/jamidht/jamiaccount.cpp index 2cfe0455680da6de6decdc4ba9acfbfd60b6de9f..4e1c11ffc2ce5c155b386a165cca9f2d6f83c18e 100644 --- a/src/jamidht/jamiaccount.cpp +++ b/src/jamidht/jamiaccount.cpp @@ -1943,6 +1943,8 @@ JamiAccount::doRegister_() dht_->join(); } + convModule()->clearPendingFetch(); + #if HAVE_RINGNS // Look for registered name on the blockchain accountManager_->lookupAddress(