diff --git a/src/call.cpp b/src/call.cpp index 020b99743569c6189a1d9e20fcdcb0e5ac3203b8..a54f2789f69ecfc36d034e7ec939bf3db923d7f5 100644 --- a/src/call.cpp +++ b/src/call.cpp @@ -37,8 +37,47 @@ #include "errno.h" +#include <stdexcept> +#include <system_error> +#include <algorithm> +#include <functional> +#include <utility> + namespace ring { +/// Hangup many calls with same error code, filtered by a predicate +/// +/// For each call pointer given by iterating on given \a callptr_list +/// calls the unary predicate \a pred with this call pointer and hangup the call with given error +/// code \a errcode when the predicate return true. +/// The predicate should have <code>bool(Call*) signature</code>. +inline void +hangupCallsIf(Call::SubcallSet callptr_list, int errcode, const std::function<bool(Call*)>& pred) +{ + std::for_each(std::begin(callptr_list), std::end(callptr_list), + [&](const std::shared_ptr<Call>& call_ptr) { + if (pred(call_ptr.get())) { + try { + call_ptr->hangup(errcode); + } catch (const std::exception& e) { + RING_ERR("[call:%s] hangup failed: %s", + call_ptr->getCallId().c_str(), e.what()); + } + } + }); +} + +/// Hangup many calls with same error code. +/// +/// Works as hangupCallsIf() with a predicate that always return true. +inline void +hangupCalls(Call::SubcallSet callptr_list, int errcode) +{ + hangupCallsIf(callptr_list, errcode, [](Call*){ return true; }); +} + +//============================================================================== + Call::Call(Account& account, const std::string& id, Call::CallType type) : id_(id) , type_(type) @@ -49,6 +88,10 @@ Call::Call(Account& account, const std::string& id, Call::CallType type) UNUSED int code) { checkPendingIM(); checkAudio(); + + // kill pending subcalls at disconnect + if (call_state == CallState::OVER) + hangupCalls(safePopSubcalls(), 0); }); time(×tamp_start_); @@ -170,7 +213,7 @@ Call::setState(CallState call_state, ConnectionState cnx_state, signed code) l(callState_, connectionState_, code); if (old_client_state != new_client_state) { - if (not quiet) { + if (not parent_.load()) { RING_DBG("[call:%s] emit client call state change %s, code %d", id_.c_str(), new_client_state.c_str(), code); emitSignal<DRing::CallSignal::StateChange>(id_, new_client_state, code); @@ -336,7 +379,7 @@ Call::newIceSocket(unsigned compId) void Call::onTextMessage(std::map<std::string, std::string>&& messages) { - if (quiet) + if (parent_.load()) pendingInMessages_.emplace_back(std::move(messages), ""); else Manager::instance().incomingMessage(getCallId(), getPeerNumber(), messages); @@ -352,133 +395,159 @@ Call::peerHungup() } void -Call::addSubCall(const std::shared_ptr<Call>& call) +Call::addSubCall(Call& subcall) { + std::lock_guard<std::recursive_mutex> lk {callMutex_}; + + // XXX: following check seems wobbly - need comment if (connectionState_ == ConnectionState::CONNECTED - || callState_ == CallState::ACTIVE - || callState_ == CallState::OVER) { - call->removeCall(); - } else { - std::lock_guard<std::recursive_mutex> lk (callMutex_); - if (not subcalls.emplace(call).second) - return; - call->quiet = true; - - for (auto& pmsg : pendingOutMessages_) - call->sendTextMessage(pmsg.first, pmsg.second); - - std::weak_ptr<Call> wthis = shared_from_this(); - std::weak_ptr<Call> wcall = call; - call->addStateListener([wcall, wthis](Call::CallState new_state, - Call::ConnectionState new_cstate, - UNUSED int code) { - if (auto call = wcall.lock()) { - if (auto sthis = wthis.lock()) { - auto& this_ = *sthis; - auto sit = this_.subcalls.find(call); - if (sit == this_.subcalls.end()) - return; - RING_WARN("[call:%s] DeviceCall call %s state changed %d %d", - this_.getCallId().c_str(), call->getCallId().c_str(), - static_cast<int>(new_state), static_cast<int>(new_cstate)); - if (new_state == CallState::OVER) { - std::lock_guard<std::recursive_mutex> lk (this_.callMutex_); - this_.subcalls.erase(call); - } else if (new_state == CallState::ACTIVE && this_.callState_ == CallState::INACTIVE) { - this_.setState(new_state); - } - if ((unsigned)this_.connectionState_ < (unsigned)new_cstate && (unsigned)new_cstate <= (unsigned)ConnectionState::RINGING) { - this_.setState(new_cstate); - } else if (new_cstate == ConnectionState::DISCONNECTED && new_state == CallState::ACTIVE) { - std::lock_guard<std::recursive_mutex> lk (this_.callMutex_); - RING_WARN("[call:%s] peer hangup", this_.getCallId().c_str()); - auto subcalls = std::move(this_.subcalls); - for (auto& sub : subcalls) { - if (sub != call) - try { - sub->hangup(0); - } catch(const std::exception& e) { - RING_WARN("[call:%s] error hanging up: %s", - this_.getCallId().c_str(), e.what()); - } - } - this_.peerHungup(); - } - if (new_state == CallState::ACTIVE && new_cstate == ConnectionState::CONNECTED) { - std::lock_guard<std::recursive_mutex> lk (this_.callMutex_); - RING_WARN("[call:%s] peer answer", this_.getCallId().c_str()); - auto subcalls = std::move(this_.subcalls); - for (auto& sub : subcalls) { - if (sub != call) - sub->hangup(0); - } - this_.merge(call); - Manager::instance().peerAnsweredCall(this_); - } - RING_WARN("[call:%s] Remaining %zu subcalls", this_.getCallId().c_str(), - this_.subcalls.size()); - if (this_.subcalls.empty()) - this_.pendingOutMessages_.clear(); - } else { - RING_WARN("DeviceCall IGNORED call %s state changed %d %d", - call->getCallId().c_str(), static_cast<int>(new_state), - static_cast<int>(new_cstate)); - } - } + || callState_ == CallState::ACTIVE + || callState_ == CallState::OVER) { + subcall.removeCall(); + return; + } + + if (not subcalls_.emplace(getPtr(subcall)).second) { + RING_ERR("[call:%s] add twice subcall %s", getCallId().c_str(), subcall.getCallId().c_str()); + return; + } + + RING_DBG("[call:%s] add subcall %s", getCallId().c_str(), subcall.getCallId().c_str()); + subcall.parent_ = this; + + for (const auto& msg : pendingOutMessages_) + subcall.sendTextMessage(msg.first, msg.second); + + subcall.addStateListener( + [&subcall](Call::CallState new_state, + Call::ConnectionState new_cstate, + UNUSED int code) { + auto ptr = getPtr(subcall); // keep the subcall valid (subcallStateChanged may remove it) + auto parent = subcall.parent_.load(); + assert(parent != nullptr); + parent->subcallStateChanged(subcall, new_state, new_cstate); }); - setState(ConnectionState::TRYING); +} + +/// Called by a subcall when its states change (multidevice) +/// +/// Its purpose is to manage per device call and try to found the first responding. +/// Parent call states are managed by these subcalls. +/// \note this method may decrease the given \a subcall ref count. +void +Call::subcallStateChanged(Call& subcall, + Call::CallState new_state, + Call::ConnectionState new_cstate) +{ + { + // This condition happens when a subcall hangups/fails after removed from parent's list. + // This is normal to keep parent_ != nullptr on the subcall, as it's the way to flag it + // as an subcall and not a master call. + // XXX: having a way to unsubscribe the state listener could be better than such test + std::lock_guard<std::recursive_mutex> lk {callMutex_}; + auto sit = subcalls_.find(getPtr(subcall)); + if (sit == subcalls_.end()) + return; + } + + // We found a responding device: hangup all other subcalls and merge + if (new_state == CallState::ACTIVE and new_cstate == ConnectionState::CONNECTED) { + RING_DBG("[call:%s] subcall %s answered by peer", getCallId().c_str(), + subcall.getCallId().c_str()); + + hangupCallsIf(safePopSubcalls(), 0, [&](const Call* call){ return call != &subcall; }); + merge(subcall); + Manager::instance().peerAnsweredCall(*this); + return; + } + + // Hangup the call if any device hangup + // XXX: not sure it's what we really want + if (new_state == CallState::ACTIVE and new_cstate == ConnectionState::DISCONNECTED) { + RING_WARN("[call:%s] subcall %s hangup by peer", getCallId().c_str(), + subcall.getCallId().c_str()); + + hangupCalls(safePopSubcalls(), 0); + Manager::instance().peerHungupCall(*this); + return; + } + + // Subcall is busy or failed + if (new_state >= CallState::BUSY) { + RING_WARN("[call:%s] subcall %s failed", getCallId().c_str(), subcall.getCallId().c_str()); + std::lock_guard<std::recursive_mutex> lk {callMutex_}; + subcalls_.erase(getPtr(subcall)); + + // Parent call fails if all subcalls have failed + if (subcalls_.empty()) + // XXX: first idea was to use std::errc::host_unreachable, but it's not available on some platforms + // like mingw. + setState(CallState::MERROR, ConnectionState::DISCONNECTED, static_cast<int>(std::errc::io_error)); + else + RING_DBG("[call:%s] remains %zu subcall(s)", getCallId().c_str(), subcalls_.size()); + return; + } + + // Copy call/cnx states (forward only) + if (new_state == CallState::ACTIVE && callState_ == CallState::INACTIVE) { + setState(new_state); + } + if (static_cast<unsigned>(connectionState_) < static_cast<unsigned>(new_cstate) + and static_cast<unsigned>(new_cstate) <= static_cast<unsigned>(ConnectionState::RINGING)) { + setState(new_cstate); } } +/// Replace current call data with ones from the given \a subcall. +/// void -Call::merge(const std::shared_ptr<Call>& scall) +Call::merge(Call& subcall) { - RING_WARN("[call:%s] merge to -> [call:%s]", scall->getCallId().c_str(), getCallId().c_str()); - auto& call = *scall; - std::lock(callMutex_, call.callMutex_); - std::lock_guard<std::recursive_mutex> lk1 (callMutex_, std::adopt_lock); - std::lock_guard<std::recursive_mutex> lk2 (call.callMutex_, std::adopt_lock); - auto pendingInMessages = std::move(call.pendingInMessages_); - iceTransport_ = std::move(call.iceTransport_); - if (peerNumber_.empty()) - peerNumber_ = std::move(call.peerNumber_); - peerDisplayName_ = std::move(call.peerDisplayName_); - localAudioPort_ = call.localAudioPort_; - localVideoPort_ = call.localVideoPort_; - setState(call.getState()); - setState(call.getConnectionState()); - for (const auto& msg : pendingInMessages) - Manager::instance().incomingMessage(getCallId(), getPeerNumber(), msg.first); - scall->removeCall(); + RING_DBG("[call:%s] merge subcall %s", getCallId().c_str(), subcall.getCallId().c_str()); + + // Merge data + { + std::lock(callMutex_, subcall.callMutex_); + std::lock_guard<std::recursive_mutex> lk1 {callMutex_, std::adopt_lock}; + std::lock_guard<std::recursive_mutex> lk2 {subcall.callMutex_, std::adopt_lock}; + pendingInMessages_ = std::move(subcall.pendingInMessages_); + iceTransport_ = std::move(subcall.iceTransport_); + if (peerNumber_.empty()) + peerNumber_ = std::move(subcall.peerNumber_); + peerDisplayName_ = std::move(subcall.peerDisplayName_); + localAudioPort_ = subcall.localAudioPort_; + localVideoPort_ = subcall.localVideoPort_; + setState(subcall.getState(), subcall.getConnectionState()); + } + + subcall.removeCall(); } +/// Handle pending IM message +/// +/// Used in multi-device context to send pending IM when the master call is connected. void Call::checkPendingIM() { - using namespace DRing::Call; + std::lock_guard<std::recursive_mutex> lk {callMutex_}; auto state = getStateStr(); - if (state == StateEvent::OVER) { - // Hangup device's call when parent is over - if (not subcalls.empty()) { - auto subs = std::move(subcalls); - for (auto c : subs) - c->hangup(0); + // Let parent call handles IM after the merge + if (not parent_.load()) { + if (state == DRing::Call::StateEvent::CURRENT) { + for (const auto& msg : pendingInMessages_) + Manager::instance().incomingMessage(getCallId(), getPeerNumber(), msg.first); pendingInMessages_.clear(); + + for (const auto& msg : pendingOutMessages_) + sendTextMessage(msg.first, msg.second); pendingOutMessages_.clear(); } - } else if (state == StateEvent::CURRENT) { - // Peer connected, time to send pending IM - // TODO not thread safe - for (const auto& msg : pendingOutMessages_) - sendTextMessage(msg.first, msg.second); - pendingOutMessages_.clear(); } } -/** - * Handle tones for RINGING and BUSY calls - */ +/// Handle tones for RINGING and BUSY calls +/// void Call::checkAudio() { @@ -492,4 +561,15 @@ Call::checkAudio() } } +// Helper to safely pop subcalls list +Call::SubcallSet +Call::safePopSubcalls() +{ + std::lock_guard<std::recursive_mutex> lk {callMutex_}; + // std::exchange is C++14 + auto old_value = std::move(subcalls_); + subcalls_.clear(); + return old_value; +} + } // namespace ring diff --git a/src/call.h b/src/call.h index 421f9c5353cd46f072502f145e1d0da84ba6f36c..b44843a049e9034508ef7c96b94ce91ccaae5468 100644 --- a/src/call.h +++ b/src/call.h @@ -41,6 +41,7 @@ #include <condition_variable> #include <set> #include <list> +#include <atomic> namespace ring { @@ -57,6 +58,8 @@ template <class T> using CallMap = std::map<std::string, std::shared_ptr<T> >; class Call : public Recordable, public std::enable_shared_from_this<Call> { public: + using SubcallSet = std::set<std::shared_ptr<Call>, std::owner_less<std::shared_ptr<Call>>>; + static const char * const DEFAULT_ID; /** @@ -323,11 +326,16 @@ class Call : public Recordable, public std::enable_shared_from_this<Call> { void addStateListener(T&& list) { stateChangedListeners_.emplace_back(std::forward<T>(list)); } - void addSubCall(const std::shared_ptr<Call>& call); - virtual void merge(const std::shared_ptr<Call>& scall); + /** + * Attach subcall to this instance. + * If this subcall is answered, this subcall and this instance will be merged using merge(). + */ + void addSubCall(Call& call); protected: + virtual void merge(Call& scall); + /** * Constructor of a call * @param id Unique identifier of the call @@ -341,12 +349,20 @@ class Call : public Recordable, public std::enable_shared_from_this<Call> { bool isAudioMuted_{false}; bool isVideoMuted_{false}; - bool quiet {false}; - std::set<std::shared_ptr<Call>> subcalls {}; - std::list<std::pair<std::map<std::string, std::string>, std::string>> pendingInMessages_ {}; - std::list<std::pair<std::map<std::string, std::string>, std::string>> pendingOutMessages_ {}; + + ///< MultiDevice: parent call, nullptr otherwise. + std::atomic<Call*> parent_ {nullptr}; + + ///< MultiDevice: list of attached subcall + SubcallSet subcalls_; + + using MsgList = std::list<std::pair<std::map<std::string, std::string>, std::string>>; + + ///< MultiDevice: message waiting to be sent (need a valid subcall) + MsgList pendingOutMessages_; private: + friend void hangupCallsIf(Call::SubcallSet, int, const std::function<bool(Call*)>&); bool validStateTransition(CallState newState); @@ -354,6 +370,10 @@ class Call : public Recordable, public std::enable_shared_from_this<Call> { void checkAudio(); + void subcallStateChanged(Call&, Call::CallState, Call::ConnectionState); + + SubcallSet safePopSubcalls(); + /** Protect every attribute that can be changed by two threads */ mutable std::recursive_mutex callMutex_ {}; @@ -395,6 +415,19 @@ class Call : public Recordable, public std::enable_shared_from_this<Call> { std::string peerDisplayName_ {}; time_t timestamp_start_ {0}; + + ///< MultiDevice: message received by subcall to merged yet + MsgList pendingInMessages_; }; +// Helpers + +/** + * Obtain a shared smart pointer of instance + */ +inline std::shared_ptr<Call> getPtr(Call& call) +{ + return call.shared_from_this(); +} + } // namespace ring diff --git a/src/ringdht/ringaccount.cpp b/src/ringdht/ringaccount.cpp index d8c98156d8662a13729a90980c36223ba6569914..c2968e86fd5f73651d2ba713888c50c414b0b6fb 100644 --- a/src/ringdht/ringaccount.cpp +++ b/src/ringdht/ringaccount.cpp @@ -453,7 +453,7 @@ RingAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std:: return; } - call->addSubCall(dev_call); + call->addSubCall(*dev_call); manager.addTask([sthis, weak_dev_call, ice, dev] { auto call = weak_dev_call.lock(); diff --git a/src/sip/sipcall.cpp b/src/sip/sipcall.cpp index f476af3ad3836c7cb51e3d0e1f1537176f5910c5..287bbd6e9edac52eae0a4e432efa1100eabc24c2 100644 --- a/src/sip/sipcall.cpp +++ b/src/sip/sipcall.cpp @@ -663,9 +663,9 @@ SIPCall::sendTextMessage(const std::map<std::string, std::string>& messages, //TODO: for now we ignore the "from" (the previous implementation for sending this info was // buggy and verbose), another way to send the original message sender will be implemented // in the future - if (not subcalls.empty()) { + if (not subcalls_.empty()) { pendingOutMessages_.emplace_back(messages, from); - for (auto& c : subcalls) + for (auto& c : subcalls_) c->sendTextMessage(messages, from); } else { if (inv) { @@ -710,7 +710,7 @@ SIPCall::onAnswered() RING_WARN("[call:%s] onAnswered()", getCallId().c_str()); if (getConnectionState() != ConnectionState::CONNECTED) { setState(CallState::ACTIVE, ConnectionState::CONNECTED); - if (not quiet) + if (not parent_.load()) Manager::instance().peerAnsweredCall(*this); } } @@ -888,7 +888,7 @@ SIPCall::startAllMedia() } } - if (not quiet and peerHolding_ != peer_holding) { + if (not parent_.load() and peerHolding_ != peer_holding) { peerHolding_ = peer_holding; emitSignal<DRing::CallSignal::PeerHold>(getCallId(), peerHolding_); } @@ -934,7 +934,7 @@ SIPCall::muteMedia(const std::string& mediaType, bool mute) isVideoMuted_ = mute; videoInput_ = isVideoMuted_ ? "" : Manager::instance().getVideoManager().videoDeviceMonitor.getMRLForDefaultDevice(); DRing::switchInput(getCallId(), videoInput_); - if (not quiet) + if (not parent_.load()) emitSignal<DRing::CallSignal::VideoMuted>(getCallId(), isVideoMuted_); #endif } else if (mediaType.compare(DRing::Media::Details::MEDIA_TYPE_AUDIO) == 0) { @@ -942,7 +942,7 @@ SIPCall::muteMedia(const std::string& mediaType, bool mute) RING_WARN("[call:%s] audio muting %s", getCallId().c_str(), bool_to_str(mute)); isAudioMuted_ = mute; avformatrtp_->setMuted(isAudioMuted_); - if (not quiet) + if (not parent_.load()) emitSignal<DRing::CallSignal::AudioMuted>(getCallId(), isAudioMuted_); } } @@ -954,7 +954,7 @@ SIPCall::onMediaUpdate() stopAllMedia(); openPortsUPnP(); if (startIce()) { - if (not quiet) + if (not parent_.load()) waitForIceAndStartMedia(); } else { RING_WARN("[call:%s] ICE not used for media", getCallId().c_str()); @@ -1113,18 +1113,24 @@ SIPCall::initIceTransport(bool master, unsigned channel_num) } void -SIPCall::merge(const std::shared_ptr<SIPCall>& scall) +SIPCall::merge(Call& call) { - RING_WARN("SIPCall::merge %s -> %s", scall->getCallId().c_str(), getCallId().c_str()); - inv = std::move(scall->inv); + RING_DBG("[sipcall:%s] merge subcall %s", getCallId().c_str(), call.getCallId().c_str()); + + // This static cast is safe as this method is private and overload Call::merge + auto& subcall = static_cast<SIPCall&>(call); + + inv = std::move(subcall.inv); inv->mod_data[getSIPVoIPLink()->getModId()] = this; - setTransport(scall->transport_); - sdp_ = std::move(scall->sdp_); - peerHolding_ = scall->peerHolding_; - upnp_ = std::move(scall->upnp_); - std::copy_n(scall->contactBuffer_, PJSIP_MAX_URL_SIZE, contactBuffer_); - pj_strcpy(&contactHeader_, &scall->contactHeader_); - Call::merge(scall); + setTransport(subcall.transport_); + sdp_ = std::move(subcall.sdp_); + peerHolding_ = subcall.peerHolding_; + upnp_ = std::move(subcall.upnp_); + std::copy_n(subcall.contactBuffer_, PJSIP_MAX_URL_SIZE, contactBuffer_); + pj_strcpy(&contactHeader_, &subcall.contactHeader_); + + Call::merge(subcall); + if (iceTransport_->isStarted()) waitForIceAndStartMedia(); } diff --git a/src/sip/sipcall.h b/src/sip/sipcall.h index d5bc13a97a030208bb029275d0822409d368b347..34e169d127af66aa8ff0a0836e8b0d236e4a594f 100644 --- a/src/sip/sipcall.h +++ b/src/sip/sipcall.h @@ -215,11 +215,6 @@ class SIPCall : public Call void terminateSipSession(int status); - virtual void merge(const std::shared_ptr<Call>& scall) { - merge(std::dynamic_pointer_cast<SIPCall>(scall)); - } - virtual void merge(const std::shared_ptr<SIPCall>& scall); - void setPeerRegistredName(const std::string& name) { peerRegistredName_ = name; } @@ -246,6 +241,8 @@ class SIPCall : public Call int SIPSessionReinvite(); + void merge(Call& call) override; // only called by Call + std::vector<IceCandidate> getAllRemoteCandidates(); std::unique_ptr<AudioRtpSession> avformatrtp_; @@ -282,6 +279,16 @@ class SIPCall : public Call std::unique_ptr<ring::upnp::Controller> upnp_; }; +// Helpers + +/** + * Obtain a shared smart pointer of instance + */ +inline std::shared_ptr<SIPCall> getPtr(SIPCall& call) +{ + return std::static_pointer_cast<SIPCall>(call.shared_from_this()); +} + } // namespace ring #endif // __SIPCALL_H__