diff --git a/daemon/src/managerimpl.cpp b/daemon/src/managerimpl.cpp index 8261b5d85cf68cb7a3f39375d8ae9b8e11c9c984..755b6448fd84a4afa1135301efc79bd814669693 100644 --- a/daemon/src/managerimpl.cpp +++ b/daemon/src/managerimpl.cpp @@ -943,6 +943,11 @@ ManagerImpl::addParticipant(const std::string& callId, const std::string& confer if (participants.empty()) ERROR("Participant list is empty for this conference"); +#ifdef SFL_VIDEO + // request participant video streams to be routed to video mixer + SIPVoIPLink::instance()->getSipCall(callId)->getVideoRtp().enterConference(conf); +#endif // SFL_VIDEO + // Connect stream addStream(callId); return true; @@ -1111,6 +1116,12 @@ ManagerImpl::joinParticipant(const std::string& callId1, const std::string& call conf->setRecordingSmplRate(audiodriver_->getSampleRate()); } +#ifdef SFL_VIDEO + // request participant video streams to be routed to video mixer + SIPVoIPLink::instance()->getSipCall(callId1)->getVideoRtp().enterConference(conf); + SIPVoIPLink::instance()->getSipCall(callId2)->getVideoRtp().enterConference(conf); +#endif // SFL_VIDEO + getMainBuffer().dumpInfo(); return true; } diff --git a/daemon/src/video/video_base.h b/daemon/src/video/video_base.h index 2eafbd670b078676cca47758640c31c0c348e32a..7fd1eb894f85bbd38b2136d82e607760b904d6ba 100644 --- a/daemon/src/video/video_base.h +++ b/daemon/src/video/video_base.h @@ -79,12 +79,14 @@ public: void attach(Observer<T>* o) { std::unique_lock<std::mutex> lk(mutex_); - observers_.insert(o); + if (o) + observers_.insert(o); } void detach(Observer<T>* o) { std::unique_lock<std::mutex> lk(mutex_); - observers_.erase(o); + if (o) + observers_.erase(o); } void notify(T& data) { diff --git a/daemon/src/video/video_mixer.cpp b/daemon/src/video/video_mixer.cpp index f470c9da7297d761aa110fe2d2251b226c59dfda..f551790fe4746fa30953e85420eb7bdb9879378c 100644 --- a/daemon/src/video/video_mixer.cpp +++ b/daemon/src/video/video_mixer.cpp @@ -45,9 +45,7 @@ VideoMixer::VideoMixer() : , height_(0) , renderMutex_() , renderCv_() -{ - start(); -} +{ start(); } VideoMixer::~VideoMixer() { diff --git a/daemon/src/video/video_receive_thread.cpp b/daemon/src/video/video_receive_thread.cpp index 4610529e9d75608d47266cb964acc809756189b3..bc57341f2cdac3f014c52be6359d69ee0046ca63 100644 --- a/daemon/src/video/video_receive_thread.cpp +++ b/daemon/src/video/video_receive_thread.cpp @@ -47,18 +47,17 @@ using std::string; const int SDP_BUFFER_SIZE = 8192; VideoReceiveThread::VideoReceiveThread(const std::string& id, - const std::map<string, string>& args, - const std::shared_ptr<SHMSink>& sink) : + const std::map<string, string>& args) : VideoGenerator::VideoGenerator() , args_(args) , videoDecoder_() - , sink_(sink) , dstWidth_(0) , dstHeight_(0) , id_(id) , stream_(args_["receiving_sdp"]) , sdpContext_(SDP_BUFFER_SIZE, false, &readFunction, 0, 0, this) , demuxContext_() + , sink_() , requestKeyFrameCallback_(0) {} @@ -140,32 +139,19 @@ bool VideoReceiveThread::setup() dstHeight_ = videoDecoder_->getHeight(); } - // Sink startup - if (sink_) { - EXIT_IF_FAIL(sink_->start(), "RX: sink startup failed"); - Manager::instance().getVideoControls()->startedDecoding(id_, - sink_->openedName(), - dstWidth_, - dstHeight_); - DEBUG("RX: shm sink <%s> started: size = %dx%d", - sink_->openedName().c_str(), dstWidth_, dstHeight_); - attach(sink_.get()); - } + auto conf = Manager::instance().getConferenceFromCallID(id_); + if (!conf) + exitConference(); return true; } void VideoReceiveThread::process() -{ - decodeFrame(); -} +{ decodeFrame(); } void VideoReceiveThread::cleanup() { - if (sink_) { - detach(sink_.get()); - sink_->stop(); - } + Manager::instance().getVideoControls()->stoppedDecoding(id_, sink_.openedName()); if (videoDecoder_) delete videoDecoder_; @@ -218,12 +204,51 @@ bool VideoReceiveThread::decodeFrame() return false; } -void VideoReceiveThread::setRequestKeyFrameCallback( - void (*cb)(const std::string &)) +void VideoReceiveThread::addReceivingDetails( + std::map<std::string, std::string> &details) +{ + if (isRunning() and dstWidth_ > 0 and dstHeight_ > 0) { + details["VIDEO_SHM_PATH"] = sink_.openedName(); + std::ostringstream os; + os << dstWidth_; + details["VIDEO_WIDTH"] = os.str(); + os.str(""); + os << dstHeight_; + details["VIDEO_HEIGHT"] = os.str(); + } +} + +void VideoReceiveThread::enterConference() +{ + if (!isRunning()) + return; + + Manager::instance().getVideoControls()->stoppedDecoding(id_, sink_.openedName()); + detach(&sink_); + sink_.stop(); +} + +void VideoReceiveThread::exitConference() { - requestKeyFrameCallback_ = cb; + if (!isRunning()) + return; + + // Sink startup + EXIT_IF_FAIL(sink_.start(), "RX: sink startup failed"); + Manager::instance().getVideoControls()->startedDecoding(id_, + sink_.openedName(), + dstWidth_, + dstHeight_); + DEBUG("RX: shm sink <%s> started: size = %dx%d", + sink_.openedName().c_str(), dstWidth_, dstHeight_); + + attach(&sink_); } +void VideoReceiveThread::setRequestKeyFrameCallback( + void (*cb)(const std::string &)) +{ requestKeyFrameCallback_ = cb; } + int VideoReceiveThread::getWidth() const { return dstWidth_; } diff --git a/daemon/src/video/video_receive_thread.h b/daemon/src/video/video_receive_thread.h index b7882479f83c161d53c74c52a8a4305a57c87581..851d9329aba7117de85fe5ca320d22416b8d5165 100644 --- a/daemon/src/video/video_receive_thread.h +++ b/daemon/src/video/video_receive_thread.h @@ -49,12 +49,14 @@ class SocketPair; class VideoReceiveThread : public VideoGenerator, public SFLThread { public: VideoReceiveThread(const std::string &id, - const std::map<std::string, std::string> &args, - const std::shared_ptr<SHMSink>& sink); + const std::map<std::string, std::string> &args); ~VideoReceiveThread(); void addIOContext(SocketPair &socketPair); void setRequestKeyFrameCallback(void (*)(const std::string &)); + void addReceivingDetails(std::map<std::string, std::string> &details); + void enterConference(); + void exitConference(); // as VideoGenerator int getWidth() const; @@ -70,13 +72,13 @@ private: /* These variables should be used in thread (i.e. run()) only! */ /*-------------------------------------------------------------*/ VideoDecoder *videoDecoder_; - const std::shared_ptr<SHMSink> sink_; int dstWidth_; int dstHeight_; const std::string id_; std::istringstream stream_; VideoIOHandle sdpContext_; VideoIOHandle *demuxContext_; + SHMSink sink_; void (*requestKeyFrameCallback_)(const std::string &); void openDecoder(); @@ -84,6 +86,7 @@ private: static int interruptCb(void *ctx); static int readFunction(void *opaque, uint8_t *buf, int buf_size); + // as SFLThread bool setup(); void process(); diff --git a/daemon/src/video/video_rtp_session.cpp b/daemon/src/video/video_rtp_session.cpp index 486e1dc20c3fa245249854dc3ca3d95a5e264d92..d8dd05bfebef7292ce45b84c874166a39477df60 100644 --- a/daemon/src/video/video_rtp_session.cpp +++ b/daemon/src/video/video_rtp_session.cpp @@ -55,7 +55,7 @@ VideoRtpSession::VideoRtpSession(const string &callID, const map<string, string> &txArgs) : socketPair_(), sendThread_(), receiveThread_(), txArgs_(txArgs), rxArgs_(), sending_(false), receiving_(false), callID_(callID), - videoMixer_(), videoLocal_(), sink_(new SHMSink()) + videoMixer_(), videoLocal_() {} VideoRtpSession::~VideoRtpSession() @@ -131,11 +131,6 @@ void VideoRtpSession::updateDestination(const string &destination, void VideoRtpSession::start(int localPort) { - std::string curcid = Manager::instance().getCurrentCallId(); - - videoMixer_ = nullptr; - videoLocal_ = nullptr; - if (not sending_ and not receiving_) return; @@ -154,28 +149,11 @@ void VideoRtpSession::start(int localPort) std::this_thread::sleep_for(std::chrono::seconds(1)); } - // Check for video conference mode - auto conf = Manager::instance().getConferenceFromCallID(callID_); videoLocal_ = videoCtrl->getVideoPreview(); - if (not conf and not videoLocal_) { - ERROR("Sending disabled, no local video"); - sending_ = false; - sendThread_.reset(); - } else { - if (conf) { - // setup mixer pipeline - videoMixer_ = conf->getVideoMixer(); - if (videoLocal_) - videoLocal_->attach(videoMixer_); - } - - if (sendThread_.get()) - WARN("Restarting video sender"); - - sendThread_.reset(new VideoSendThread(callID_, txArgs_, - *socketPair_, videoLocal_, - videoMixer_)); - } + if (sendThread_.get()) + WARN("Restarting video sender"); + + sendThread_.reset(new VideoSendThread(callID_, txArgs_, *socketPair_)); } else { DEBUG("Video sending disabled"); sendThread_.reset(); @@ -184,8 +162,7 @@ void VideoRtpSession::start(int localPort) if (receiving_) { if (receiveThread_.get()) WARN("restarting video receiver"); - receiveThread_.reset(new VideoReceiveThread(callID_, rxArgs_, - !videoMixer_?sink_:nullptr)); + receiveThread_.reset(new VideoReceiveThread(callID_, rxArgs_)); receiveThread_->setRequestKeyFrameCallback(&SIPVoIPLink::enqueueKeyframeRequest); receiveThread_->addIOContext(*socketPair_); receiveThread_->start(); @@ -193,22 +170,35 @@ void VideoRtpSession::start(int localPort) DEBUG("Video receiving disabled"); receiveThread_.reset(); } + + // Setup pipeline + if (videoMixer_) { + // We are in conference mode + + if (sendThread_) { + videoLocal_->detach(sendThread_.get()); + videoMixer_->attach(sendThread_.get()); + } + + if (receiveThread_) { + receiveThread_->enterConference(); + receiveThread_->attach(videoMixer_); + } + } else { + if (sendThread_) + videoLocal_->attach(sendThread_.get()); + } } void VideoRtpSession::stop() { - Manager::instance().getVideoControls()->stoppedDecoding(callID_, - sink_->openedName()); - if (videoLocal_ and videoMixer_) { - videoLocal_->detach(videoMixer_); - videoMixer_->detach(sink_.get()); - } else if (videoLocal_) - videoLocal_->detach(sink_.get()); - else if (videoMixer_) - videoMixer_->detach(sink_.get()); - - videoLocal_ = nullptr; - videoMixer_ = nullptr; + if (videoLocal_) + videoLocal_->detach(sendThread_.get()); + + if (videoMixer_) { + videoMixer_->detach(sendThread_.get()); + receiveThread_->detach(videoMixer_); + } if (socketPair_.get()) socketPair_->interrupt(); @@ -226,15 +216,40 @@ void VideoRtpSession::forceKeyFrame() void VideoRtpSession::addReceivingDetails(std::map<std::string, std::string> &details) { - if (receiveThread_.get()) { - details["VIDEO_SHM_PATH"] = sink_->openedName(); - std::ostringstream os; - os << receiveThread_->getWidth(); - details["VIDEO_WIDTH"] = os.str(); - os.str(""); - os << receiveThread_->getHeight(); - details["VIDEO_HEIGHT"] = os.str(); + if (receiveThread_) + receiveThread_->addReceivingDetails(details); +} + +void VideoRtpSession::enterConference(Conference *conf) +{ + auto mixer = conf->getVideoMixer(); + if (!mixer) { + ERROR("No conference mixer!"); + return; } + + /* Detach from a possible previous conference */ + exitConference(); + + videoMixer_ = mixer; // catched during start() +} + +void VideoRtpSession::exitConference() +{ + if (videoMixer_) { + if (sendThread_) + videoMixer_->detach(sendThread_.get()); + + if (receiveThread_) { + receiveThread_->detach(videoMixer_); + receiveThread_->exitConference(); + } + + videoMixer_ = nullptr; + } + + if (videoLocal_) + videoLocal_->attach(sendThread_.get()); } } // end namespace sfl_video diff --git a/daemon/src/video/video_rtp_session.h b/daemon/src/video/video_rtp_session.h index abf7fb2d45929c24d39d2bc809fd591892e71ad2..9fecef713a1ab8bcefed18fcb4fc37e31ca817cf 100644 --- a/daemon/src/video/video_rtp_session.h +++ b/daemon/src/video/video_rtp_session.h @@ -34,7 +34,7 @@ #include "video_base.h" #include "video_mixer.h" -#include "shm_sink.h" +#include "conference.h" #include "noncopyable.h" #include <string> @@ -64,6 +64,8 @@ public: void addReceivingDetails(std::map<std::string, std::string> &details); void bindMixer(VideoMixer* mixer); void unbindMixer(); + void enterConference(Conference *conf); + void exitConference(); private: NON_COPYABLE(VideoRtpSession); @@ -78,7 +80,6 @@ private: const std::string callID_; VideoMixer* videoMixer_; VideoFrameActiveWriter *videoLocal_; - std::shared_ptr<SHMSink> sink_; }; } diff --git a/daemon/src/video/video_send_thread.cpp b/daemon/src/video/video_send_thread.cpp index 2d58bc3d2c280948b2172bf92901fdd16c85a22a..3b4d318e6cb075c38c71f1f8745df2da54005a3d 100644 --- a/daemon/src/video/video_send_thread.cpp +++ b/daemon/src/video/video_send_thread.cpp @@ -46,35 +46,16 @@ using std::string; VideoSendThread::VideoSendThread(const std::string &id, const std::map<string, string> &args, - SocketPair& socketPair, - VideoFrameActiveWriter *local_video, - VideoFrameActiveWriter *mixer) : + SocketPair& socketPair) : args_(args) , id_(id) , videoEncoder_() - , videoSource_(local_video) , forceKeyFrame_(0) , frameNumber_(0) , muxContext_(socketPair.getIOContext()) , sdp_() { - if (setup()) { - // do video pipeline - if (mixer) { - videoSource_ = mixer; - static_cast<VideoMixer*>(mixer)->setDimensions( - videoEncoder_->getWidth(), - videoEncoder_->getHeight()); - } - if (videoSource_) - videoSource_->attach(this); - } -} - -VideoSendThread::~VideoSendThread() -{ - if (videoSource_) - videoSource_->detach(this); + setup(); } bool VideoSendThread::setup() @@ -142,7 +123,9 @@ void VideoSendThread::encodeAndSendVideo(VideoFrame& input_frame) } void VideoSendThread::update(Observable<VideoFrameSP>* obs, VideoFrameSP& frame_p) -{ encodeAndSendVideo(*frame_p); } +{ + encodeAndSendVideo(*frame_p); +} void VideoSendThread::forceKeyFrame() { atomic_increment(&forceKeyFrame_); } diff --git a/daemon/src/video/video_send_thread.h b/daemon/src/video/video_send_thread.h index e7d04b8fdba22c5242daa8efb254f6b9fb3d59f7..7d768c55a1e00631890fefbf9d694a13d8bf87dd 100644 --- a/daemon/src/video/video_send_thread.h +++ b/daemon/src/video/video_send_thread.h @@ -49,10 +49,7 @@ class VideoSendThread : public VideoFramePassiveReader public: VideoSendThread(const std::string &id, const std::map<std::string, std::string> &args, - SocketPair& socketPair, - VideoFrameActiveWriter *local_video, - VideoFrameActiveWriter *mixer); - virtual ~VideoSendThread(); + SocketPair& socketPair); std::string getSDP() const { return sdp_; } void forceKeyFrame(); @@ -69,7 +66,6 @@ private: const std::string &id_; VideoEncoder *videoEncoder_; - VideoFrameActiveWriter *videoSource_; int forceKeyFrame_; int frameNumber_;