From 475b8e5256a901bb2ffaecfe58d0299e76a73053 Mon Sep 17 00:00:00 2001 From: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com> Date: Tue, 17 Sep 2013 10:42:10 -0400 Subject: [PATCH] #29579: video mixing implementation and conference fixes. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - mixer rendering implemention => frame based (was per sources batch based) - add backward signaling to Observer/Obsevable classes => This help mixer to index sources for layout them. - mutex'ed frame publish (VideoGenerator). - sinks creation are now done at right places. => one per mixer (new), one per camera, one per stream reception. - VideoRTPSession is fully responsible to handle video pipeline, between RX/TS streams. => exhibit enterConference/exitConference to be aknowledged by upper layers. - VideoSendThread is not longer a «thread», renamed as VideoSender. - videoMixer_ is now a shared ptr in Conference objects. => getVideoMixer() return a rew shared_ptr also. - Conference is now responsible to trig video conference pipeline - std::this_thread::sleep_for() is not usable before GCC 4.1 --- daemon/src/conference.cpp | 26 ++- daemon/src/conference.h | 10 +- daemon/src/managerimpl.cpp | 11 -- daemon/src/video/Makefile.am | 2 +- daemon/src/video/socket_pair.h | 3 - daemon/src/video/video_base.cpp | 5 +- daemon/src/video/video_base.h | 38 ++++- daemon/src/video/video_camera.cpp | 23 +-- daemon/src/video/video_mixer.cpp | 152 +++++++++++------- daemon/src/video/video_mixer.h | 31 ++-- daemon/src/video/video_receive_thread.cpp | 24 ++- daemon/src/video/video_rtp_session.cpp | 87 +++++----- daemon/src/video/video_rtp_session.h | 7 +- ...video_send_thread.cpp => video_sender.cpp} | 20 +-- .../{video_send_thread.h => video_sender.h} | 17 +- 15 files changed, 259 insertions(+), 197 deletions(-) rename daemon/src/video/{video_send_thread.cpp => video_sender.cpp} (91%) rename daemon/src/video/{video_send_thread.h => video_sender.h} (86%) diff --git a/daemon/src/conference.cpp b/daemon/src/conference.cpp index 49ceeb800b..4d4745ca7d 100644 --- a/daemon/src/conference.cpp +++ b/daemon/src/conference.cpp @@ -37,6 +37,8 @@ #include "audio/mainbuffer.h" #ifdef SFL_VIDEO +#include "sip/sipvoiplink.h" +#include "sip/sipcall.h" #include "client/video_controls.h" #include "video/video_camera.h" #endif @@ -49,12 +51,18 @@ Conference::Conference() , confState_(ACTIVE_ATTACHED) , participants_() #ifdef SFL_VIDEO - , videoMixer_() + , videoMixer_(new sfl_video::VideoMixer(id_)) #endif { Recordable::initRecFilename(id_); } +Conference::~Conference() +{ + for (auto participant_id : participants_) + remove(participant_id); +} + Conference::ConferenceState Conference::getState() const { return confState_; @@ -67,12 +75,20 @@ void Conference::setState(ConferenceState state) void Conference::add(const std::string &participant_id) { - participants_.insert(participant_id); + if (participants_.insert(participant_id).second) { +#ifdef SFL_VIDEO + SIPVoIPLink::instance()->getSipCall(participant_id)->getVideoRtp().enterConference(this); +#endif // SFL_VIDEO + } } void Conference::remove(const std::string &participant_id) { - participants_.erase(participant_id); + if (participants_.erase(participant_id)) { +#ifdef SFL_VIDEO + SIPVoIPLink::instance()->getSipCall(participant_id)->getVideoRtp().exitConference(); +#endif // SFL_VIDEO + } } void Conference::bindParticipant(const std::string &participant_id) @@ -144,8 +160,8 @@ std::string Conference::getConfID() const { } #ifdef SFL_VIDEO -sfl_video::VideoMixer* Conference::getVideoMixer() +std::shared_ptr<sfl_video::VideoMixer> Conference::getVideoMixer() { - return &videoMixer_; + return videoMixer_; } #endif diff --git a/daemon/src/conference.h b/daemon/src/conference.h index f682e5b5e3..52856c60fb 100644 --- a/daemon/src/conference.h +++ b/daemon/src/conference.h @@ -36,6 +36,7 @@ #include <set> #include <string> +#include <memory> #include "audio/recordable.h" @@ -54,6 +55,11 @@ class Conference : public Recordable { */ Conference(); + /** + * Destructor for this class, decrement static counter + */ + ~Conference(); + /** * Return the conference id */ @@ -100,7 +106,7 @@ class Conference : public Recordable { virtual bool toggleRecording(); #ifdef SFL_VIDEO - sfl_video::VideoMixer* getVideoMixer(); + std::shared_ptr<sfl_video::VideoMixer> getVideoMixer(); #endif private: @@ -109,7 +115,7 @@ class Conference : public Recordable { ParticipantSet participants_; #ifdef SFL_VIDEO - sfl_video::VideoMixer videoMixer_; + std::shared_ptr<sfl_video::VideoMixer> videoMixer_; #endif }; diff --git a/daemon/src/managerimpl.cpp b/daemon/src/managerimpl.cpp index 755b6448fd..8261b5d85c 100644 --- a/daemon/src/managerimpl.cpp +++ b/daemon/src/managerimpl.cpp @@ -943,11 +943,6 @@ ManagerImpl::addParticipant(const std::string& callId, const std::string& confer if (participants.empty()) ERROR("Participant list is empty for this conference"); -#ifdef SFL_VIDEO - // request participant video streams to be routed to video mixer - SIPVoIPLink::instance()->getSipCall(callId)->getVideoRtp().enterConference(conf); -#endif // SFL_VIDEO - // Connect stream addStream(callId); return true; @@ -1116,12 +1111,6 @@ ManagerImpl::joinParticipant(const std::string& callId1, const std::string& call conf->setRecordingSmplRate(audiodriver_->getSampleRate()); } -#ifdef SFL_VIDEO - // request participant video streams to be routed to video mixer - SIPVoIPLink::instance()->getSipCall(callId1)->getVideoRtp().enterConference(conf); - SIPVoIPLink::instance()->getSipCall(callId2)->getVideoRtp().enterConference(conf); -#endif // SFL_VIDEO - getMainBuffer().dumpInfo(); return true; } diff --git a/daemon/src/video/Makefile.am b/daemon/src/video/Makefile.am index c17f203554..2187c8e177 100644 --- a/daemon/src/video/Makefile.am +++ b/daemon/src/video/Makefile.am @@ -16,7 +16,7 @@ libvideo_la_SOURCES = \ shm_sink.cpp shm_sink.h \ video_camera.cpp video_camera.h \ video_receive_thread.cpp video_receive_thread.h \ - video_send_thread.cpp video_send_thread.h \ + video_sender.cpp video_sender.h \ video_rtp_session.cpp video_rtp_session.h \ check.h shm_header.h video_provider.h \ libav_utils.cpp libav_utils.h libav_deps.h diff --git a/daemon/src/video/socket_pair.h b/daemon/src/video/socket_pair.h index 3a7dfe3a4b..7329aa13bd 100644 --- a/daemon/src/video/socket_pair.h +++ b/daemon/src/video/socket_pair.h @@ -40,9 +40,6 @@ namespace sfl_video { - class VideoSendThread; - class VideoReceiveThread; - class SocketPair { public: SocketPair(const char *uri, int localPort); diff --git a/daemon/src/video/video_base.cpp b/daemon/src/video/video_base.cpp index c3a31e7f37..81cb094954 100644 --- a/daemon/src/video/video_base.cpp +++ b/daemon/src/video/video_base.cpp @@ -211,6 +211,7 @@ void VideoFrame::test() VideoFrame& VideoGenerator::getNewFrame() { + std::unique_lock<std::mutex> lk(mutex_); if (writableFrame_) writableFrame_->setdefaults(); else @@ -220,12 +221,14 @@ VideoFrame& VideoGenerator::getNewFrame() void VideoGenerator::publishFrame() { + std::unique_lock<std::mutex> lk(mutex_); lastFrame_ = std::move(writableFrame_); - notify(std::ref(lastFrame_)); + notify(lastFrame_); } VideoFrameSP VideoGenerator::obtainLastFrame() { + std::unique_lock<std::mutex> lk(mutex_); return lastFrame_; } diff --git a/daemon/src/video/video_base.h b/daemon/src/video/video_base.h index 7fd1eb894f..8722e1cb9c 100644 --- a/daemon/src/video/video_base.h +++ b/daemon/src/video/video_base.h @@ -39,8 +39,16 @@ #include <memory> #include <set> #include <mutex> -#include <condition_variable> +// std::this_thread::sleep_for is by default supported since 4.1 +#if __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 1) +#include <chrono> +#include <thread> +#define MYSLEEP(x) std::this_thread::sleep_for(std::chrono::seconds(x)) +#else +#include <unistd.h> +#define MYSLEEP(x) sleep(x) +#endif class AVFrame; class AVPacket; @@ -77,16 +85,22 @@ public: Observable() : observers_(), mutex_() {} virtual ~Observable() {}; - void attach(Observer<T>* o) { + bool attach(Observer<T>* o) { std::unique_lock<std::mutex> lk(mutex_); - if (o) - observers_.insert(o); + if (o and observers_.insert(o).second) { + o->attached(this); + return true; + } + return false; } - void detach(Observer<T>* o) { + bool detach(Observer<T>* o) { std::unique_lock<std::mutex> lk(mutex_); - if (o) - observers_.erase(o); + if (o and observers_.erase(o)) { + o->detached(this); + return true; + } + return false; } void notify(T& data) { @@ -95,6 +109,11 @@ public: observer->update(this, data); } + int getObserversCount() { + std::unique_lock<std::mutex> lk(mutex_); + return observers_.size(); + } + private: NON_COPYABLE(Observable<T>); @@ -110,6 +129,8 @@ class Observer public: virtual ~Observer() {}; virtual void update(Observable<T>*, T&) = 0; + virtual void attached(Observable<T>*) {}; + virtual void detached(Observable<T>*) {}; }; /*=== VideoPacket ===========================================================*/ @@ -207,7 +228,7 @@ class VideoFramePassiveReader : class VideoGenerator : public VideoFrameActiveWriter { public: - VideoGenerator() : writableFrame_(), lastFrame_() {} + VideoGenerator() : writableFrame_(), lastFrame_(), mutex_() {} virtual int getWidth() const = 0; virtual int getHeight() const = 0; @@ -223,6 +244,7 @@ protected: private: VideoFrameUP writableFrame_; VideoFrameSP lastFrame_; + std::mutex mutex_; // lock writableFrame_/lastFrame_ access }; } diff --git a/daemon/src/video/video_camera.cpp b/daemon/src/video/video_camera.cpp index b5a74332ca..5e917065ab 100644 --- a/daemon/src/video/video_camera.cpp +++ b/daemon/src/video/video_camera.cpp @@ -38,6 +38,8 @@ #include <map> #include <string> +#define SINK_ID "local" + namespace sfl_video { using std::string; @@ -100,14 +102,11 @@ bool VideoCamera::setup() /* Sink setup */ EXIT_IF_FAIL(sink_.start(), "Cannot start shared memory sink"); - Manager::instance().getVideoControls()->startedDecoding(id_, - sink_.openedName(), - sinkWidth_, - sinkHeight_); - DEBUG("TX: shm sink <%s> started: size = %dx%d", - sink_.openedName().c_str(), sinkWidth_, sinkHeight_); - - attach(&sink_); + if (attach(&sink_)) { + Manager::instance().getVideoControls()->startedDecoding(SINK_ID, sink_.openedName(), sinkWidth_, sinkHeight_); + DEBUG("LOCAL: shm sink <%s> started: size = %dx%d", + sink_.openedName().c_str(), sinkWidth_, sinkHeight_); + } return true; } @@ -117,9 +116,11 @@ void VideoCamera::process() void VideoCamera::cleanup() { - Manager::instance().getVideoControls()->stoppedDecoding(id_, - sink_.openedName()); - detach(&sink_); + if (detach(&sink_)) { + Manager::instance().getVideoControls()->stoppedDecoding(SINK_ID, sink_.openedName()); + sink_.stop(); + } + delete decoder_; } diff --git a/daemon/src/video/video_mixer.cpp b/daemon/src/video/video_mixer.cpp index f551790fe4..6c11d4ae4c 100644 --- a/daemon/src/video/video_mixer.cpp +++ b/daemon/src/video/video_mixer.cpp @@ -32,105 +32,141 @@ #include "libav_deps.h" #include "video_mixer.h" #include "check.h" +#include "client/video_controls.h" +#include "manager.h" +#include "logger.h" #include <cmath> namespace sfl_video { -VideoMixer::VideoMixer() : +VideoMixer::VideoMixer(const std::string id) : VideoGenerator::VideoGenerator() - , sourceScaler_() - , scaledFrame_() + , id_(id) , width_(0) , height_(0) - , renderMutex_() - , renderCv_() -{ start(); } + , sources_() + , mutex_() + , sink_() +{ + auto videoCtrl = Manager::instance().getVideoControls(); + if (!videoCtrl->hasPreviewStarted()) { + videoCtrl->startPreview(); + MYSLEEP(1); + } + + // Local video camera is always attached + videoCtrl->getVideoPreview()->attach(this); +} VideoMixer::~VideoMixer() { - stop(); - join(); + stop_sink(); + + auto videoCtrl = Manager::instance().getVideoControls(); + videoCtrl->getVideoPreview()->detach(this); } -void VideoMixer::process() +void VideoMixer::attached(Observable<VideoFrameSP>* ob) { - waitForUpdate(); - rendering(); + std::unique_lock<std::mutex> lk(mutex_); + sources_.push_back(ob); } -void VideoMixer::waitForUpdate() +void VideoMixer::detached(Observable<VideoFrameSP>* ob) { - std::unique_lock<std::mutex> lk(renderMutex_); - renderCv_.wait(lk); + std::unique_lock<std::mutex> lk(mutex_); + sources_.remove(ob); } void VideoMixer::update(Observable<VideoFrameSP>* ob, VideoFrameSP& frame_p) -{ renderCv_.notify_one(); } +{ + std::unique_lock<std::mutex> lk(mutex_); + int i=0; + for (auto x : sources_) { + if (x == ob) break; + i++; + } + render_frame(*frame_p, i); +} -void VideoMixer::rendering() +void VideoMixer::render_frame(VideoFrame& input, const int index) { + VideoScaler scaler; + VideoFrame scaled_input; + if (!width_ or !height_) return; -#if 0 - // For all sources: - // - take source frame - // - scale it down and layout it - // - publish the result frame - // Current layout is a squared distribution + VideoFrame &output = getNewFrame(); - const int n=sourceList_.size(); + if (!output.allocBuffer(width_, height_, VIDEO_PIXFMT_YUV420P)) { + ERROR("VideoFrame::allocBuffer() failed"); + return; + } + + VideoFrameSP previous_p=obtainLastFrame(); + if (previous_p) + previous_p->copy(output); + previous_p.reset(); + + const int n=sources_.size(); const int zoom=ceil(sqrt(n)); const int cell_width=width_ / zoom; const int cell_height=height_ / zoom; - VideoFrame &output = getNewFrame(); - - // Blit frame function support only YUV420P pixel format - if (!output.allocBuffer(width_, height_, VIDEO_PIXFMT_YUV420P)) - WARN("VideoFrame::allocBuffer() failed"); + if (!scaled_input.allocBuffer(cell_width, cell_height, + VIDEO_PIXFMT_YUV420P)) { + ERROR("VideoFrame::allocBuffer() failed"); + return; + } - if (!scaledFrame_.allocBuffer(cell_width, cell_height, VIDEO_PIXFMT_YUV420P)) - WARN("VideoFrame::allocBuffer() failed"); + int xoff = (index % zoom) * cell_width; + int yoff = (index / zoom) * cell_height; - int lastInputWidth=0; - int lastInputHeight=0; - int i=0; - for (VideoNode* src : sourceList_) { - int xoff = (i % zoom) * cell_width; - int yoff = (i / zoom) * cell_height; - - VideoFrameSP input=src->obtainLastFrame(); - if (input) { - // scaling context allocation may be time consuming - // so reset it only if needed - if (input->getWidth() != lastInputWidth || - input->getHeight() != lastInputHeight) - sourceScaler_.reset(); - - sourceScaler_.scale(*input, scaledFrame_); - output.blit(scaledFrame_, xoff, yoff); - - lastInputWidth = input->getWidth(); - lastInputHeight = input->getHeight(); - } + scaler.scale(input, scaled_input); + output.blit(scaled_input, xoff, yoff); - i++; - } publishFrame(); -#endif } void VideoMixer::setDimensions(int width, int height) { - // FIXME: unprotected write (see rendering()) + std::unique_lock<std::mutex> lk(mutex_); width_ = width; height_ = height; + + stop_sink(); + start_sink(); } -int VideoMixer::getWidth() const { return width_; } -int VideoMixer::getHeight() const { return height_; } -int VideoMixer::getPixelFormat() const { return VIDEO_PIXFMT_YUV420P; } +void VideoMixer::start_sink() +{ + if (sink_.start()) { + if (this->attach(&sink_)) { + Manager::instance().getVideoControls()->startedDecoding(id_+"_MX", sink_.openedName(), width_, height_); + DEBUG("MX: shm sink <%s> started: size = %dx%d", + sink_.openedName().c_str(), width_, height_); + } + } else + WARN("MX: sink startup failed"); +} + +void VideoMixer::stop_sink() +{ + if (this->detach(&sink_)) { + Manager::instance().getVideoControls()->stoppedDecoding(id_+"_MX", sink_.openedName()); + sink_.stop(); + } +} + +int VideoMixer::getWidth() const +{ return width_; } + +int VideoMixer::getHeight() const +{ return height_; } + +int VideoMixer::getPixelFormat() const +{ return VIDEO_PIXFMT_YUV420P; } } // end namespace sfl_video diff --git a/daemon/src/video/video_mixer.h b/daemon/src/video/video_mixer.h index e00d00474f..cefc76de07 100644 --- a/daemon/src/video/video_mixer.h +++ b/daemon/src/video/video_mixer.h @@ -35,25 +35,24 @@ #include "noncopyable.h" #include "video_base.h" #include "video_scaler.h" +#include "shm_sink.h" #include "sflthread.h" #include <mutex> -#include <condition_variable> +#include <list> namespace sfl_video { class VideoMixer : public VideoGenerator, - public VideoFramePassiveReader, - public SFLThread + public VideoFramePassiveReader { public: - VideoMixer(); - ~VideoMixer(); + VideoMixer(const std::string id_); + virtual ~VideoMixer(); void setDimensions(int width, int height); - void render(); int getWidth() const; int getHeight() const; @@ -61,24 +60,22 @@ public: // as VideoFramePassiveReader void update(Observable<VideoFrameSP>*, VideoFrameSP&); + void attached(Observable<VideoFrameSP>* ob); + void detached(Observable<VideoFrameSP>* ob); private: NON_COPYABLE(VideoMixer); - // as SFLThread - void process(); - void waitForUpdate(); - void encode(); - void rendering(); - - VideoScaler sourceScaler_; - VideoFrame scaledFrame_; + void render_frame(VideoFrame& input, const int index); + void start_sink(); + void stop_sink(); + const std::string id_; int width_; int height_; - - std::mutex renderMutex_; - std::condition_variable renderCv_; + std::list<Observable<VideoFrameSP>*> sources_; + std::mutex mutex_; + SHMSink sink_; }; } diff --git a/daemon/src/video/video_receive_thread.cpp b/daemon/src/video/video_receive_thread.cpp index bc57341f2c..43d9663bbf 100644 --- a/daemon/src/video/video_receive_thread.cpp +++ b/daemon/src/video/video_receive_thread.cpp @@ -151,7 +151,8 @@ void VideoReceiveThread::process() void VideoReceiveThread::cleanup() { - Manager::instance().getVideoControls()->stoppedDecoding(id_, sink_.openedName()); + if (detach(&sink_)) + Manager::instance().getVideoControls()->stoppedDecoding(id_+"RX", sink_.openedName()); if (videoDecoder_) delete videoDecoder_; @@ -223,9 +224,10 @@ void VideoReceiveThread::enterConference() if (!isRunning()) return; - Manager::instance().getVideoControls()->stoppedDecoding(id_, sink_.openedName()); - detach(&sink_); - sink_.stop(); + if (detach(&sink_)) { + Manager::instance().getVideoControls()->stoppedDecoding(id_+"RX", sink_.openedName()); + sink_.stop(); + } } void VideoReceiveThread::exitConference() @@ -233,16 +235,12 @@ void VideoReceiveThread::exitConference() if (!isRunning()) return; - // Sink startup EXIT_IF_FAIL(sink_.start(), "RX: sink startup failed"); - Manager::instance().getVideoControls()->startedDecoding(id_, - sink_.openedName(), - dstWidth_, - dstHeight_); - DEBUG("RX: shm sink <%s> started: size = %dx%d", - sink_.openedName().c_str(), dstWidth_, dstHeight_); - - attach(&sink_); + if (attach(&sink_)) { + Manager::instance().getVideoControls()->startedDecoding(id_+"RX", sink_.openedName(), dstWidth_, dstHeight_); + DEBUG("RX: shm sink <%s> started: size = %dx%d", + sink_.openedName().c_str(), dstWidth_, dstHeight_); + } } void VideoReceiveThread::setRequestKeyFrameCallback( diff --git a/daemon/src/video/video_rtp_session.cpp b/daemon/src/video/video_rtp_session.cpp index d8dd05bfeb..e080963e96 100644 --- a/daemon/src/video/video_rtp_session.cpp +++ b/daemon/src/video/video_rtp_session.cpp @@ -31,7 +31,7 @@ #include "client/video_controls.h" #include "video_rtp_session.h" -#include "video_send_thread.h" +#include "video_sender.h" #include "video_receive_thread.h" #include "video_mixer.h" #include "socket_pair.h" @@ -43,8 +43,6 @@ #include <sstream> #include <map> #include <string> -#include <thread> -#include <chrono> namespace sfl_video { @@ -53,9 +51,9 @@ using std::string; VideoRtpSession::VideoRtpSession(const string &callID, const map<string, string> &txArgs) : - socketPair_(), sendThread_(), receiveThread_(), txArgs_(txArgs), + socketPair_(), sender_(), receiveThread_(), txArgs_(txArgs), rxArgs_(), sending_(false), receiving_(false), callID_(callID), - videoMixer_(), videoLocal_() + videoMixerSP_(), videoLocal_() {} VideoRtpSession::~VideoRtpSession() @@ -115,7 +113,7 @@ void VideoRtpSession::updateDestination(const string &destination, tmp << "rtp://" << destination << ":" << port; // if destination has changed if (tmp.str() != txArgs_["destination"]) { - if (sendThread_.get() != 0) { + if (sender_.get() != 0) { ERROR("Video is already being sent"); return; } @@ -146,17 +144,17 @@ void VideoRtpSession::start(int localPort) auto videoCtrl = Manager::instance().getVideoControls(); if (!videoCtrl->hasPreviewStarted()) { videoCtrl->startPreview(); - std::this_thread::sleep_for(std::chrono::seconds(1)); + MYSLEEP(1); } videoLocal_ = videoCtrl->getVideoPreview(); - if (sendThread_.get()) + if (sender_.get()) WARN("Restarting video sender"); - sendThread_.reset(new VideoSendThread(callID_, txArgs_, *socketPair_)); + sender_.reset(new VideoSender(callID_, txArgs_, *socketPair_)); } else { DEBUG("Video sending disabled"); - sendThread_.reset(); + sender_.reset(); } if (receiving_) { @@ -172,46 +170,37 @@ void VideoRtpSession::start(int localPort) } // Setup pipeline - if (videoMixer_) { - // We are in conference mode - - if (sendThread_) { - videoLocal_->detach(sendThread_.get()); - videoMixer_->attach(sendThread_.get()); - } - - if (receiveThread_) { - receiveThread_->enterConference(); - receiveThread_->attach(videoMixer_); - } + if (videoMixerSP_) { + setupConferenceVideoPipeline(); } else { - if (sendThread_) - videoLocal_->attach(sendThread_.get()); + if (sender_) + videoLocal_->attach(sender_.get()); } } void VideoRtpSession::stop() { - if (videoLocal_) - videoLocal_->detach(sendThread_.get()); + if (videoLocal_) { + videoLocal_->detach(sender_.get()); + } - if (videoMixer_) { - videoMixer_->detach(sendThread_.get()); - receiveThread_->detach(videoMixer_); + if (videoMixerSP_) { + videoMixerSP_->detach(sender_.get()); + receiveThread_->detach(videoMixerSP_.get()); } if (socketPair_.get()) socketPair_->interrupt(); receiveThread_.reset(); - sendThread_.reset(); + sender_.reset(); socketPair_.reset(); } void VideoRtpSession::forceKeyFrame() { - if (sendThread_.get()) - sendThread_->forceKeyFrame(); + if (sender_.get()) + sender_->forceKeyFrame(); } void VideoRtpSession::addReceivingDetails(std::map<std::string, std::string> &details) @@ -220,36 +209,46 @@ void VideoRtpSession::addReceivingDetails(std::map<std::string, std::string> &de receiveThread_->addReceivingDetails(details); } -void VideoRtpSession::enterConference(Conference *conf) +void VideoRtpSession::setupConferenceVideoPipeline() { - auto mixer = conf->getVideoMixer(); - if (!mixer) { - ERROR("No conference mixer!"); - return; + if (sender_) { + videoMixerSP_->setDimensions(atol(txArgs_["width"].c_str()), + atol(txArgs_["height"].c_str())); + videoLocal_->detach(sender_.get()); + videoMixerSP_->attach(sender_.get()); + + if (receiveThread_) { + receiveThread_->enterConference(); + receiveThread_->attach(videoMixerSP_.get()); + } } +} +void VideoRtpSession::enterConference(Conference *conf) +{ /* Detach from a possible previous conference */ exitConference(); + videoMixerSP_ = std::move(conf->getVideoMixer()); - videoMixer_ = mixer; // catched during start() + setupConferenceVideoPipeline(); } void VideoRtpSession::exitConference() { - if (videoMixer_) { - if (sendThread_) - videoMixer_->detach(sendThread_.get()); + if (videoMixerSP_) { + if (sender_) + videoMixerSP_->detach(sender_.get()); if (receiveThread_) { - receiveThread_->detach(videoMixer_); + receiveThread_->detach(videoMixerSP_.get()); receiveThread_->exitConference(); } - videoMixer_ = nullptr; + videoMixerSP_.reset(); } if (videoLocal_) - videoLocal_->attach(sendThread_.get()); + videoLocal_->attach(sender_.get()); } } // end namespace sfl_video diff --git a/daemon/src/video/video_rtp_session.h b/daemon/src/video/video_rtp_session.h index 9fecef713a..a6ff2a101c 100644 --- a/daemon/src/video/video_rtp_session.h +++ b/daemon/src/video/video_rtp_session.h @@ -45,7 +45,7 @@ class Sdp; namespace sfl_video { -class VideoSendThread; +class VideoSender; class VideoReceiveThread; class SocketPair; @@ -64,6 +64,7 @@ public: void addReceivingDetails(std::map<std::string, std::string> &details); void bindMixer(VideoMixer* mixer); void unbindMixer(); + void setupConferenceVideoPipeline(); void enterConference(Conference *conf); void exitConference(); @@ -71,14 +72,14 @@ private: NON_COPYABLE(VideoRtpSession); std::shared_ptr<SocketPair> socketPair_; - std::shared_ptr<VideoSendThread> sendThread_; + std::shared_ptr<VideoSender> sender_; std::shared_ptr<VideoReceiveThread> receiveThread_; std::map<std::string, std::string> txArgs_; std::map<std::string, std::string> rxArgs_; bool sending_; bool receiving_; const std::string callID_; - VideoMixer* videoMixer_; + std::shared_ptr<VideoMixer> videoMixerSP_; VideoFrameActiveWriter *videoLocal_; }; diff --git a/daemon/src/video/video_send_thread.cpp b/daemon/src/video/video_sender.cpp similarity index 91% rename from daemon/src/video/video_send_thread.cpp rename to daemon/src/video/video_sender.cpp index 3b4d318e6c..6604ceb1ec 100644 --- a/daemon/src/video/video_send_thread.cpp +++ b/daemon/src/video/video_sender.cpp @@ -29,7 +29,7 @@ * as that of the covered work. */ -#include "video_send_thread.h" +#include "video_sender.h" #include "video_mixer.h" #include "socket_pair.h" #include "client/video_controls.h" @@ -44,7 +44,7 @@ namespace sfl_video { using std::string; -VideoSendThread::VideoSendThread(const std::string &id, +VideoSender::VideoSender(const std::string &id, const std::map<string, string> &args, SocketPair& socketPair) : args_(args) @@ -54,11 +54,9 @@ VideoSendThread::VideoSendThread(const std::string &id, , frameNumber_(0) , muxContext_(socketPair.getIOContext()) , sdp_() -{ - setup(); -} +{ setup(); } -bool VideoSendThread::setup() +bool VideoSender::setup() { const char *enc_name = args_["codec"].c_str(); @@ -111,7 +109,7 @@ bool VideoSendThread::setup() return true; } -void VideoSendThread::encodeAndSendVideo(VideoFrame& input_frame) +void VideoSender::encodeAndSendVideo(VideoFrame& input_frame) { bool is_keyframe = forceKeyFrame_ > 0; @@ -122,12 +120,10 @@ void VideoSendThread::encodeAndSendVideo(VideoFrame& input_frame) ERROR("encoding failed"); } -void VideoSendThread::update(Observable<VideoFrameSP>* obs, VideoFrameSP& frame_p) -{ - encodeAndSendVideo(*frame_p); -} +void VideoSender::update(Observable<VideoFrameSP>* obs, VideoFrameSP& frame_p) +{ encodeAndSendVideo(*frame_p); } -void VideoSendThread::forceKeyFrame() +void VideoSender::forceKeyFrame() { atomic_increment(&forceKeyFrame_); } } // end namespace sfl_video diff --git a/daemon/src/video/video_send_thread.h b/daemon/src/video/video_sender.h similarity index 86% rename from daemon/src/video/video_send_thread.h rename to daemon/src/video/video_sender.h index 7d768c55a1..c64fb108a0 100644 --- a/daemon/src/video/video_send_thread.h +++ b/daemon/src/video/video_sender.h @@ -29,8 +29,8 @@ * as that of the covered work. */ -#ifndef __VIDEO_SEND_THREAD_H__ -#define __VIDEO_SEND_THREAD_H__ +#ifndef __VIDEO_SENDER_H__ +#define __VIDEO_SENDER_H__ #include "noncopyable.h" #include "video_encoder.h" @@ -44,12 +44,13 @@ namespace sfl_video { class SocketPair; -class VideoSendThread : public VideoFramePassiveReader +class VideoSender : public VideoFramePassiveReader { public: - VideoSendThread(const std::string &id, - const std::map<std::string, std::string> &args, - SocketPair& socketPair); + VideoSender(const std::string &id, + const std::map<std::string, std::string> &args, + SocketPair& socketPair); + std::string getSDP() const { return sdp_; } void forceKeyFrame(); @@ -57,7 +58,7 @@ public: void update(Observable<VideoFrameSP>*, VideoFrameSP&); private: - NON_COPYABLE(VideoSendThread); + NON_COPYABLE(VideoSender); bool setup(); void encodeAndSendVideo(VideoFrame&); @@ -75,4 +76,4 @@ private: } -#endif // __VIDEO_SEND_THREAD_H__ +#endif // __VIDEO_SENDER_H__ -- GitLab