From f760ff44800ea3b517dc2c99120e39c6bf89503c Mon Sep 17 00:00:00 2001 From: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com> Date: Fri, 13 Sep 2013 12:11:27 -0400 Subject: [PATCH] #29564, #29818: new video node based system. Fix bug #29818. Note: non-conference mode has been tested and works. Conference mode may works, but some issues have been detected. --- daemon/src/client/dbus/video_controls.cpp | 2 +- daemon/src/client/video_controls.h | 9 +- daemon/src/conference.cpp | 22 ++-- daemon/src/managerimpl.cpp | 19 +-- daemon/src/video/shm_sink.cpp | 55 ++++++--- daemon/src/video/shm_sink.h | 53 +++++---- daemon/src/video/socket_pair.cpp | 4 +- daemon/src/video/video_base.cpp | 80 +++---------- daemon/src/video/video_base.h | 99 ++++++++++++---- daemon/src/video/video_camera.cpp | 82 +++++-------- daemon/src/video/video_camera.h | 37 ++---- daemon/src/video/video_decoder.cpp | 44 +++---- daemon/src/video/video_decoder.h | 16 +-- daemon/src/video/video_mixer.cpp | 57 +++------ daemon/src/video/video_mixer.h | 30 ++--- daemon/src/video/video_receive_thread.cpp | 125 +++++++------------- daemon/src/video/video_receive_thread.h | 58 +++++----- daemon/src/video/video_rtp_session.cpp | 99 ++++++++++++---- daemon/src/video/video_rtp_session.h | 56 +++++---- daemon/src/video/video_send_thread.cpp | 135 ++++++++-------------- daemon/src/video/video_send_thread.h | 40 +++---- 21 files changed, 527 insertions(+), 595 deletions(-) diff --git a/daemon/src/client/dbus/video_controls.cpp b/daemon/src/client/dbus/video_controls.cpp index 22b473b666..bc2e5999d8 100644 --- a/daemon/src/client/dbus/video_controls.cpp +++ b/daemon/src/client/dbus/video_controls.cpp @@ -182,7 +182,7 @@ VideoControls::stopPreview() } } -sfl_video::VideoSource* VideoControls::getVideoPreview() +sfl_video::VideoFrameActiveWriter* VideoControls::getVideoPreview() { return videoPreview_.get(); } diff --git a/daemon/src/client/video_controls.h b/daemon/src/client/video_controls.h index a5fe850dda..b97551312b 100644 --- a/daemon/src/client/video_controls.h +++ b/daemon/src/client/video_controls.h @@ -57,16 +57,13 @@ #include <memory> // for shared_ptr #include "video/video_preferences.h" - -namespace sfl_video { - class VideoSource; -} +#include "video/video_base.h" class VideoControls : public org::sflphone::SFLphone::VideoControls_adaptor, public DBus::IntrospectableAdaptor, public DBus::ObjectAdaptor { private: - std::shared_ptr<sfl_video::VideoSource> videoPreview_; + std::unique_ptr<sfl_video::VideoFrameActiveWriter> videoPreview_; VideoPreference videoPreference_; public: @@ -126,7 +123,7 @@ class VideoControls : public org::sflphone::SFLphone::VideoControls_adaptor, void startPreview(); void stopPreview(); bool hasPreviewStarted(); - sfl_video::VideoSource* getVideoPreview(); + sfl_video::VideoFrameActiveWriter* getVideoPreview(); }; #endif // VIDEO_CONTROLS_H_ diff --git a/daemon/src/conference.cpp b/daemon/src/conference.cpp index c91ee009b2..49ceeb800b 100644 --- a/daemon/src/conference.cpp +++ b/daemon/src/conference.cpp @@ -41,6 +41,8 @@ #include "video/video_camera.h" #endif +#include "logger.h" + Conference::Conference() : id_(Manager::instance().getNewCallID()) @@ -51,15 +53,6 @@ Conference::Conference() #endif { Recordable::initRecFilename(id_); - -#ifdef SFL_VIDEO - sfl_video::VideoCamera *camera = static_cast<sfl_video::VideoCamera*>(Manager::instance().getVideoControls()->getVideoPreview()); - if (camera) { - videoMixer_.addSource(camera); - camera->setMixer(&videoMixer_); - } -#endif - } Conference::ConferenceState Conference::getState() const @@ -84,11 +77,16 @@ void Conference::remove(const std::string &participant_id) void Conference::bindParticipant(const std::string &participant_id) { - for (const auto &item : participants_) + auto mainBuffer = Manager::instance().getMainBuffer(); + + for (const auto &item : participants_) { if (participant_id != item) - Manager::instance().getMainBuffer().bindCallID(participant_id, item); + mainBuffer.bindCallID(participant_id, item); + mainBuffer.flush(item); + } - Manager::instance().getMainBuffer().bindCallID(participant_id, MainBuffer::DEFAULT_ID); + mainBuffer.bindCallID(participant_id, MainBuffer::DEFAULT_ID); + mainBuffer.flush(MainBuffer::DEFAULT_ID); } std::string Conference::getStateStr() const diff --git a/daemon/src/managerimpl.cpp b/daemon/src/managerimpl.cpp index d65fa7c05c..4b08ab34b1 100644 --- a/daemon/src/managerimpl.cpp +++ b/daemon/src/managerimpl.cpp @@ -930,12 +930,15 @@ ManagerImpl::addParticipant(const std::string& callId, const std::string& confer std::string callState(callDetails.find("CALL_STATE")->second); if (callState == "HOLD") { + ERROR("foo1: %s", callId.c_str()); conf->bindParticipant(callId); offHoldCall(callId); } else if (callState == "INCOMING") { + ERROR("foo2: %s", callId.c_str()); conf->bindParticipant(callId); answerCall(callId); } else if (callState == "CURRENT") + ERROR("foo3: %s", callId.c_str()); conf->bindParticipant(callId); ParticipantSet participants(conf->getParticipantList()); @@ -943,13 +946,6 @@ ManagerImpl::addParticipant(const std::string& callId, const std::string& confer if (participants.empty()) ERROR("Participant list is empty for this conference"); - // reset ring buffer for all conference participant - // flush conference participants only - for (const auto &p : participants) - getMainBuffer().flush(p); - - getMainBuffer().flush(MainBuffer::DEFAULT_ID); - // Connect stream addStream(callId); return true; @@ -1349,15 +1345,8 @@ void ManagerImpl::addStream(const std::string& call_id) if (iter != conferenceMap_.end() and iter->second) { Conference* conf = iter->second; + ERROR("bar: %s", call_id.c_str()); conf->bindParticipant(call_id); - - ParticipantSet participants(conf->getParticipantList()); - - // reset ring buffer for all conference participant - for (const auto &participant : participants) - getMainBuffer().flush(participant); - - getMainBuffer().flush(MainBuffer::DEFAULT_ID); } } else { diff --git a/daemon/src/video/shm_sink.cpp b/daemon/src/video/shm_sink.cpp index 09a8779e8e..bf9d79a31d 100644 --- a/daemon/src/video/shm_sink.cpp +++ b/daemon/src/video/shm_sink.cpp @@ -37,10 +37,12 @@ #include "config.h" #endif +#include "video_scaler.h" + #include "shm_sink.h" #include "shm_header.h" #include "logger.h" -#include "video_provider.h" + #include <sys/mman.h> #include <fcntl.h> #include <cstdio> @@ -49,13 +51,15 @@ #include <cerrno> #include <cstring> +namespace sfl_video { + SHMSink::SHMSink(const std::string &shm_name) : - shm_name_(shm_name), - fd_(-1), - shm_area_(static_cast<SHMHeader*>(MAP_FAILED)), - shm_area_len_(0), - opened_name_() - {} + shm_name_(shm_name) + , fd_(-1) + , shm_area_(static_cast<SHMHeader*>(MAP_FAILED)) + , shm_area_len_(0) + , opened_name_() +{} SHMSink::~SHMSink() { @@ -187,8 +191,14 @@ void SHMSink::render(const std::vector<unsigned char> &data) shm_unlock(); } -void SHMSink::render_callback(sfl_video::VideoProvider &provider, size_t bytes) +void SHMSink::render_frame(VideoFrame& src) { + VideoFrame dst; + VideoScaler scaler; + + dst.setGeometry(src.getWidth(), src.getHeight(), VIDEO_PIXFMT_BGRA); + size_t bytes = dst.getSize(); + shm_lock(); if (!resize_area(sizeof(SHMHeader) + bytes)) { @@ -196,19 +206,38 @@ void SHMSink::render_callback(sfl_video::VideoProvider &provider, size_t bytes) return; } - provider.fillBuffer(static_cast<void*>(shm_area_->data)); + dst.setDestination(shm_area_->data); + scaler.scale(src, dst); + shm_area_->buffer_size = bytes; shm_area_->buffer_gen++; sem_post(&shm_area_->notification); shm_unlock(); } -void SHMSink::shm_lock() +void SHMSink::render_callback(VideoProvider &provider, size_t bytes) { - sem_wait(&shm_area_->mutex); + shm_lock(); + + if (!resize_area(sizeof(SHMHeader) + bytes)) { + ERROR("Could not resize area"); + return; + } + + provider.fillBuffer(static_cast<void*>(shm_area_->data)); + shm_area_->buffer_size = bytes; + shm_area_->buffer_gen++; + sem_post(&shm_area_->notification); + shm_unlock(); } +void SHMSink::shm_lock() +{ sem_wait(&shm_area_->mutex); } + void SHMSink::shm_unlock() -{ - sem_post(&shm_area_->mutex); +{ sem_post(&shm_area_->mutex); } + +void SHMSink::update(Observable<VideoFrameSP>* obs, VideoFrameSP& frame_p) +{ render_frame(*frame_p); } + } diff --git a/daemon/src/video/shm_sink.h b/daemon/src/video/shm_sink.h index 11dfd0a44d..fd93d084da 100644 --- a/daemon/src/video/shm_sink.h +++ b/daemon/src/video/shm_sink.h @@ -36,39 +36,48 @@ #ifndef SHM_SINK_H_ #define SHM_SINK_H_ +#include "noncopyable.h" +#include "video_provider.h" +#include "video_base.h" + #include <string> #include <vector> -#include "noncopyable.h" class SHMHeader; + namespace sfl_video { - class VideoProvider; -} -class SHMSink { - public: - SHMSink(const std::string &shm_name = ""); - std::string openedName() const { return opened_name_; } - ~SHMSink(); +class SHMSink : public VideoFramePassiveReader +{ +public: + SHMSink(const std::string &shm_name = ""); + std::string openedName() const { return opened_name_; } + ~SHMSink(); + + bool start(); + bool stop(); - bool start(); - bool stop(); + bool resize_area(size_t desired_length); - bool resize_area(size_t desired_length); + void render(const std::vector<unsigned char> &data); + void render_frame(VideoFrame& src); + void render_callback(VideoProvider &provider, size_t bytes); - void render(const std::vector<unsigned char> &data); - void render_callback(sfl_video::VideoProvider &provider, size_t bytes); + // as VideoFramePassiveReader + void update(Observable<VideoFrameSP>*, VideoFrameSP&); - private: - NON_COPYABLE(SHMSink); +private: + NON_COPYABLE(SHMSink); - void shm_lock(); - void shm_unlock(); - std::string shm_name_; - int fd_; - SHMHeader *shm_area_; - size_t shm_area_len_; - std::string opened_name_; + void shm_lock(); + void shm_unlock(); + std::string shm_name_; + int fd_; + SHMHeader *shm_area_; + size_t shm_area_len_; + std::string opened_name_; }; +} + #endif // SHM_SINK_H_ diff --git a/daemon/src/video/socket_pair.cpp b/daemon/src/video/socket_pair.cpp index e919a0df3d..775496e330 100644 --- a/daemon/src/video/socket_pair.cpp +++ b/daemon/src/video/socket_pair.cpp @@ -222,8 +222,10 @@ int SocketPair::readCallback(void *opaque, uint8_t *buf, int buf_size) {context->rtcpHandle_, POLLIN, 0}}; for(;;) { - if (context->interrupted_) + if (context->interrupted_) { + ERROR("interrupted"); return -EINTR; + } /* build fdset to listen to RTP and RTCP packets */ n = poll(p, 2, 100); diff --git a/daemon/src/video/video_base.cpp b/daemon/src/video/video_base.cpp index 2fca184d0f..c3a31e7f37 100644 --- a/daemon/src/video/video_base.cpp +++ b/daemon/src/video/video_base.cpp @@ -82,12 +82,14 @@ VideoFrame::~VideoFrame() avcodec_free_frame(&frame_); } -int VideoFrame::getFormat() const -{ - return libav_utils::sfl_pixel_format(frame_->format); -} -int VideoFrame::getWidth() const { return frame_->width; } -int VideoFrame::getHeight() const { return frame_->height; } +int VideoFrame::getPixelFormat() const +{ return libav_utils::sfl_pixel_format(frame_->format); } + +int VideoFrame::getWidth() const +{ return frame_->width; } + +int VideoFrame::getHeight() const +{ return frame_->height; } bool VideoFrame::allocBuffer(int width, int height, int pix_fmt) { @@ -207,70 +209,24 @@ void VideoFrame::test() /*=== VideoGenerator =========================================================*/ -VideoGenerator::VideoGenerator() : - VideoSource::VideoSource() - , mutex_() - , condition_() - , writableFrame_() - , lastFrame_() -{ - pthread_mutex_init(&mutex_, NULL); - pthread_cond_init(&condition_, NULL); -} - -VideoGenerator::~VideoGenerator() +VideoFrame& VideoGenerator::getNewFrame() { - pthread_cond_destroy(&condition_); - pthread_mutex_destroy(&mutex_); + if (writableFrame_) + writableFrame_->setdefaults(); + else + writableFrame_.reset(new VideoFrame()); + return *writableFrame_.get(); } void VideoGenerator::publishFrame() { - pthread_mutex_lock(&mutex_); - { - lastFrame_ = std::move(writableFrame_); // we owns it now - pthread_cond_signal(&condition_); - } - pthread_mutex_unlock(&mutex_); + lastFrame_ = std::move(writableFrame_); + notify(std::ref(lastFrame_)); } -std::shared_ptr<VideoFrame> VideoGenerator::waitNewFrame() +VideoFrameSP VideoGenerator::obtainLastFrame() { - pthread_mutex_lock(&mutex_); - pthread_cond_wait(&condition_, &mutex_); - pthread_mutex_unlock(&mutex_); - - return obtainLastFrame(); -} - -std::shared_ptr<VideoFrame> VideoGenerator::obtainLastFrame() -{ - std::shared_ptr<VideoFrame> frame; - - pthread_mutex_lock(&mutex_); - frame = lastFrame_; - pthread_mutex_unlock(&mutex_); - - return frame; -} - -VideoFrame& VideoGenerator::getNewFrame() -{ - VideoFrame* frame; - - pthread_mutex_lock(&mutex_); - { - if (writableFrame_) { - frame = writableFrame_.get(); - frame->setdefaults(); - } else { - frame = new VideoFrame(); - writableFrame_.reset(frame); - } - } - pthread_mutex_unlock(&mutex_); - - return *frame; + return lastFrame_; } } diff --git a/daemon/src/video/video_base.h b/daemon/src/video/video_base.h index 70674eda7c..325c11b160 100644 --- a/daemon/src/video/video_base.h +++ b/daemon/src/video/video_base.h @@ -37,7 +37,9 @@ #include <cstdlib> #include <cstdint> #include <memory> -#include <forward_list> +#include <set> +#include <mutex> +#include <condition_variable> class AVFrame; @@ -55,10 +57,59 @@ enum VideoPixelFormat { namespace sfl_video { +template <typename T> class Observator; +template <typename T> class Observable; +class VideoFrame; + typedef int(*io_readcallback)(void *opaque, uint8_t *buf, int buf_size); typedef int(*io_writecallback)(void *opaque, uint8_t *buf, int buf_size); typedef int64_t(*io_seekcallback)(void *opaque, int64_t offset, int whence); +typedef std::shared_ptr<VideoFrame> VideoFrameUP; +typedef std::shared_ptr<VideoFrame> VideoFrameSP; + +/*=== Observable =============================================================*/ + +template <typename T> +class Observable +{ +public: + Observable() : observators_(), mutex_() {} + virtual ~Observable() {}; + + void attach(Observator<T>* o) { + std::unique_lock<std::mutex> lk(mutex_); + observators_.insert(o); + } + + void detach(Observator<T>* o) { + std::unique_lock<std::mutex> lk(mutex_); + observators_.erase(o); + } + + void notify(T& data) { + std::unique_lock<std::mutex> lk(mutex_); + for (auto observator : observators_) + observator->update(this, data); + } + +private: + NON_COPYABLE(Observable<T>); + + std::set<Observator<T>*> observators_; + std::mutex mutex_; // lock observators_ +}; + +/*=== Observator =============================================================*/ + +template <typename T> +class Observator +{ +public: + virtual ~Observator() {}; + virtual void update(Observable<T>*, T&) = 0; +}; + /*=== VideoPacket ===========================================================*/ class VideoPacket { @@ -115,7 +166,7 @@ public: ~VideoFrame(); AVFrame* get() { return frame_; }; - int getFormat() const; + int getPixelFormat() const; int getWidth() const; int getHeight() const; void setGeometry(int width, int height, int pix_fmt); @@ -134,36 +185,42 @@ private: bool allocated_; }; -/*=== VideoSource ============================================================*/ +/*=== VideoNode ============================================================*/ -class VideoSource { -public: - virtual ~VideoSource() {} - virtual std::shared_ptr<VideoFrame> waitNewFrame() = 0; - virtual std::shared_ptr<VideoFrame> obtainLastFrame() = 0; - virtual int getWidth() const = 0; - virtual int getHeight() const = 0; -}; +class VideoNode { public: virtual ~VideoNode() {}; }; +typedef VideoNode VideoSource; + +class VideoFrameActiveWriter : + public Observable<VideoFrameSP>, + public VideoNode +{}; + +class VideoFramePassiveReader : + public Observator<VideoFrameSP>, + public VideoNode +{}; /*=== VideoGenerator =========================================================*/ -class VideoGenerator : public VideoSource { +class VideoGenerator : public VideoFrameActiveWriter +{ public: - VideoGenerator(); - virtual ~VideoGenerator(); + VideoGenerator() : writableFrame_(), lastFrame_() {} + + virtual int getWidth() const = 0; + virtual int getHeight() const = 0; + virtual int getPixelFormat() const = 0; - std::shared_ptr<VideoFrame> waitNewFrame(); - std::shared_ptr<VideoFrame> obtainLastFrame(); + VideoFrameSP obtainLastFrame(); protected: - void publishFrame(); + // getNewFrame and publishFrame must be called by the same thread only VideoFrame& getNewFrame(); + void publishFrame(); private: - pthread_mutex_t mutex_; - pthread_cond_t condition_; - std::unique_ptr<VideoFrame> writableFrame_; - std::shared_ptr<VideoFrame> lastFrame_; + VideoFrameUP writableFrame_; + VideoFrameSP lastFrame_; }; } diff --git a/daemon/src/video/video_camera.cpp b/daemon/src/video/video_camera.cpp index 49edc59800..b5a74332ca 100644 --- a/daemon/src/video/video_camera.cpp +++ b/daemon/src/video/video_camera.cpp @@ -43,20 +43,14 @@ namespace sfl_video { using std::string; VideoCamera::VideoCamera(const std::map<std::string, std::string> &args) : - VideoSource::VideoSource() + VideoGenerator::VideoGenerator() , id_("local") , args_(args) , decoder_(0) , sink_() - , bufferSize_(0) - , previewWidth_(0) - , previewHeight_(0) - , scaler_() - , frame_() - , mixer_() -{ - start(); -} + , sinkWidth_(0) + , sinkHeight_(0) +{ start(); } VideoCamera::~VideoCamera() { @@ -96,41 +90,37 @@ bool VideoCamera::setup() /* Preview frame size? (defaults from decoder) */ if (!args_["width"].empty()) - previewWidth_ = atoi(args_["width"].c_str()); + sinkWidth_ = atoi(args_["width"].c_str()); else - previewWidth_ = decoder_->getWidth(); + sinkWidth_ = decoder_->getWidth(); if (!args_["height"].empty()) - previewHeight_ = atoi(args_["height"].c_str()); + sinkHeight_ = atoi(args_["height"].c_str()); else - previewHeight_ = decoder_->getHeight(); + sinkHeight_ = decoder_->getHeight(); - /* Previewing setup */ + /* Sink setup */ EXIT_IF_FAIL(sink_.start(), "Cannot start shared memory sink"); + Manager::instance().getVideoControls()->startedDecoding(id_, + sink_.openedName(), + sinkWidth_, + sinkHeight_); + DEBUG("TX: shm sink <%s> started: size = %dx%d", + sink_.openedName().c_str(), sinkWidth_, sinkHeight_); - frame_.setGeometry(previewWidth_, previewHeight_, VIDEO_PIXFMT_BGRA); - bufferSize_ = frame_.getSize(); - EXIT_IF_FAIL(bufferSize_ > 0, "Incorrect buffer size for decoding"); + attach(&sink_); - string name = sink_.openedName(); - Manager::instance().getVideoControls()->startedDecoding(id_, name, - previewWidth_, - previewHeight_); - DEBUG("TX: shm sink started with size %d, width %d and height %d", - bufferSize_, previewWidth_, previewHeight_); return true; } void VideoCamera::process() -{ - if (captureFrame() and isRunning()) - renderFrame(); -} +{ captureFrame(); } void VideoCamera::cleanup() { - delete decoder_; Manager::instance().getVideoControls()->stoppedDecoding(id_, sink_.openedName()); + detach(&sink_); + delete decoder_; } int VideoCamera::interruptCb(void *data) @@ -141,7 +131,7 @@ int VideoCamera::interruptCb(void *data) bool VideoCamera::captureFrame() { - int ret = decoder_->decode(); + int ret = decoder_->decode(getNewFrame()); if (ret <= 0) { if (ret < 0) @@ -149,35 +139,19 @@ bool VideoCamera::captureFrame() return false; } + publishFrame(); return true; } -void VideoCamera::renderFrame() -{ - // we want our rendering code to be called by the shm_sink, - // because it manages the shared memory synchronization - sink_.render_callback(*this, bufferSize_); +int VideoCamera::getWidth() const +{ return decoder_->getWidth(); } - if (mixer_) - mixer_->render(); -} - -// This function is called by sink -void VideoCamera::fillBuffer(void *data) -{ - frame_.setDestination(data); - decoder_->scale(scaler_, frame_); -} +int VideoCamera::getHeight() const +{ return decoder_->getHeight(); } -void VideoCamera::setMixer(VideoMixer* mixer) -{ - mixer_ = mixer; -} +int VideoCamera::getPixelFormat() const +{ return decoder_->getPixelFormat(); } -int VideoCamera::getWidth() const { return decoder_->getWidth(); } -int VideoCamera::getHeight() const { return decoder_->getHeight(); } -std::shared_ptr<VideoFrame> VideoCamera::waitNewFrame() { return decoder_->waitNewFrame(); } -std::shared_ptr<VideoFrame> VideoCamera::obtainLastFrame() { return decoder_->obtainLastFrame(); } -} // end namspace sfl_video +} // end namespace sfl_video diff --git a/daemon/src/video/video_camera.h b/daemon/src/video/video_camera.h index 0650ab48c4..9a1e52aa1f 100644 --- a/daemon/src/video/video_camera.h +++ b/daemon/src/video/video_camera.h @@ -35,10 +35,7 @@ #include "noncopyable.h" #include "shm_sink.h" -#include "video_provider.h" -#include "video_scaler.h" #include "video_decoder.h" -#include "video_mixer.h" #include "sflthread.h" #include <string> @@ -49,28 +46,17 @@ namespace sfl_video { using std::string; class VideoCamera : - public VideoProvider, - public VideoSource, + public VideoGenerator, public SFLThread { public: VideoCamera(const std::map<string, string> &args); ~VideoCamera(); + + // as VideoGenerator int getWidth() const; int getHeight() const; - VideoFrame *lockFrame(); - void unlockFrame(); - void waitFrame(); - void setMixer(VideoMixer* mixer); - - std::shared_ptr<VideoFrame> waitNewFrame(); - std::shared_ptr<VideoFrame> obtainLastFrame(); - -protected: - // threading - bool setup(); - void process(); - void cleanup(); + int getPixelFormat() const; private: NON_COPYABLE(VideoCamera); @@ -79,17 +65,16 @@ private: std::map<string, string> args_; VideoDecoder *decoder_; SHMSink sink_; - size_t bufferSize_; - int previewWidth_; - int previewHeight_; - VideoScaler scaler_; - VideoFrame frame_; - VideoMixer* mixer_; + int sinkWidth_; + int sinkHeight_; + + // as SFLThread + bool setup(); + void process(); + void cleanup(); static int interruptCb(void *ctx); - void fillBuffer(void *data); bool captureFrame(); - void renderFrame(); }; } diff --git a/daemon/src/video/video_decoder.cpp b/daemon/src/video/video_decoder.cpp index 39f71b1d9e..2cbe2786c5 100644 --- a/daemon/src/video/video_decoder.cpp +++ b/daemon/src/video/video_decoder.cpp @@ -41,14 +41,10 @@ namespace sfl_video { using std::string; VideoDecoder::VideoDecoder() : - VideoGenerator::VideoGenerator() - , inputDecoder_(0) + inputDecoder_(0) , decoderCtx_(0) , inputCtx_(avformat_alloc_context()) - , scaledPicture_() , streamIndex_(-1) - , dstWidth_(0) - , dstHeight_(0) { } @@ -96,9 +92,7 @@ void VideoDecoder::setInterruptCallback(int (*cb)(void*), void *opaque) } void VideoDecoder::setIOContext(VideoIOHandle *ioctx) -{ - inputCtx_->pb = ioctx->get(); -} +{ inputCtx_->pb = ioctx->get(); } int VideoDecoder::setupFromVideoData() { @@ -165,13 +159,10 @@ int VideoDecoder::setupFromVideoData() return -1; } - dstWidth_ = decoderCtx_->width; - dstHeight_ = decoderCtx_->height; - return 0; } -int VideoDecoder::decode() +int VideoDecoder::decode(VideoFrame& result) { // Guarantee that we free the packet every iteration VideoPacket video_packet; @@ -188,49 +179,44 @@ int VideoDecoder::decode() if (inpacket->stream_index != streamIndex_) return 0; - VideoFrame &frame = getNewFrame(); int frameFinished = 0; - int len = avcodec_decode_video2(decoderCtx_, frame.get(), + int len = avcodec_decode_video2(decoderCtx_, result.get(), &frameFinished, inpacket); if (len <= 0) return -2; - if (frameFinished) { - publishFrame(); + if (frameFinished) return 1; - } return 0; } -int VideoDecoder::flush() +int VideoDecoder::flush(VideoFrame& result) { AVPacket inpacket; av_init_packet(&inpacket); inpacket.data = NULL; inpacket.size = 0; - VideoFrame &frame = getNewFrame(); int frameFinished = 0; - int len = avcodec_decode_video2(decoderCtx_, frame.get(), + int len = avcodec_decode_video2(decoderCtx_, result.get(), &frameFinished, &inpacket); if (len <= 0) return -2; - if (frameFinished) { - publishFrame(); + if (frameFinished) return 1; - } return 0; } -void VideoDecoder::scale(VideoScaler &scaler, VideoFrame &output) -{ - VideoFrame* frame = obtainLastFrame().get(); +int VideoDecoder::getWidth() const +{ return decoderCtx_->width; } - if (frame) - scaler.scale(*frame, output); -} +int VideoDecoder::getHeight() const +{ return decoderCtx_->height; } + +int VideoDecoder::getPixelFormat() const +{ return libav_utils::sfl_pixel_format(decoderCtx_->pix_fmt); } } diff --git a/daemon/src/video/video_decoder.h b/daemon/src/video/video_decoder.h index 76a70f6a15..440090a02b 100644 --- a/daemon/src/video/video_decoder.h +++ b/daemon/src/video/video_decoder.h @@ -46,7 +46,7 @@ class AVCodec; namespace sfl_video { - class VideoDecoder : public VideoCodec, public VideoGenerator { + class VideoDecoder : public VideoCodec { public: VideoDecoder(); ~VideoDecoder(); @@ -56,12 +56,12 @@ namespace sfl_video { int openInput(const std::string &source_str, const std::string &format_str); int setupFromVideoData(); - int decode(); - int flush(); - void scale(VideoScaler &ctx, VideoFrame &output); + int decode(VideoFrame&); + int flush(VideoFrame&); - int getWidth() const { return dstWidth_; } - int getHeight() const { return dstHeight_; } + int getWidth() const; + int getHeight() const; + int getPixelFormat() const; private: NON_COPYABLE(VideoDecoder); @@ -69,11 +69,7 @@ namespace sfl_video { AVCodec *inputDecoder_; AVCodecContext *decoderCtx_; AVFormatContext *inputCtx_; - VideoFrame scaledPicture_; - int streamIndex_; - int dstWidth_; - int dstHeight_; }; } diff --git a/daemon/src/video/video_mixer.cpp b/daemon/src/video/video_mixer.cpp index 726e7bfbcf..f470c9da72 100644 --- a/daemon/src/video/video_mixer.cpp +++ b/daemon/src/video/video_mixer.cpp @@ -31,9 +31,6 @@ #include "libav_deps.h" #include "video_mixer.h" -#include "video_camera.h" -#include "client/video_controls.h" -#include "manager.h" #include "check.h" #include <cmath> @@ -42,16 +39,13 @@ namespace sfl_video { VideoMixer::VideoMixer() : VideoGenerator::VideoGenerator() - , updateMutex_() - , updateCondition_() , sourceScaler_() , scaledFrame_() - , sourceList_() , width_(0) , height_(0) + , renderMutex_() + , renderCv_() { - pthread_mutex_init(&updateMutex_, NULL); - pthread_cond_init(&updateCondition_, NULL); start(); } @@ -59,36 +53,29 @@ VideoMixer::~VideoMixer() { stop(); join(); - pthread_cond_destroy(&updateCondition_); - pthread_mutex_destroy(&updateMutex_); } -void VideoMixer::addSource(VideoSource *source) +void VideoMixer::process() { - sourceList_.push_back(source); + waitForUpdate(); + rendering(); } -void VideoMixer::removeSource(VideoSource *source) +void VideoMixer::waitForUpdate() { - sourceList_.remove(source); + std::unique_lock<std::mutex> lk(renderMutex_); + renderCv_.wait(lk); } -void VideoMixer::clearSources() -{ - sourceList_.clear(); -} - -void VideoMixer::process() -{ - waitForUpdate(); - rendering(); -} +void VideoMixer::update(Observable<VideoFrameSP>* ob, VideoFrameSP& frame_p) +{ renderCv_.notify_one(); } void VideoMixer::rendering() { if (!width_ or !height_) return; +#if 0 // For all sources: // - take source frame // - scale it down and layout it @@ -112,11 +99,11 @@ void VideoMixer::rendering() int lastInputWidth=0; int lastInputHeight=0; int i=0; - for (VideoSource* src : sourceList_) { + for (VideoNode* src : sourceList_) { int xoff = (i % zoom) * cell_width; int yoff = (i / zoom) * cell_height; - std::shared_ptr<VideoFrame> input=src->obtainLastFrame(); + VideoFrameSP input=src->obtainLastFrame(); if (input) { // scaling context allocation may be time consuming // so reset it only if needed @@ -134,20 +121,7 @@ void VideoMixer::rendering() i++; } publishFrame(); -} - -void VideoMixer::render() -{ - pthread_mutex_lock(&updateMutex_); - pthread_cond_signal(&updateCondition_); - pthread_mutex_unlock(&updateMutex_); -} - -void VideoMixer::waitForUpdate() -{ - pthread_mutex_lock(&updateMutex_); - pthread_cond_wait(&updateCondition_, &updateMutex_); - pthread_mutex_unlock(&updateMutex_); +#endif } void VideoMixer::setDimensions(int width, int height) @@ -159,5 +133,6 @@ void VideoMixer::setDimensions(int width, int height) int VideoMixer::getWidth() const { return width_; } int VideoMixer::getHeight() const { return height_; } +int VideoMixer::getPixelFormat() const { return VIDEO_PIXFMT_YUV420P; } -} // end namspace sfl_video +} // end namespace sfl_video diff --git a/daemon/src/video/video_mixer.h b/daemon/src/video/video_mixer.h index d1b8739d96..e00d00474f 100644 --- a/daemon/src/video/video_mixer.h +++ b/daemon/src/video/video_mixer.h @@ -33,48 +33,52 @@ #define __VIDEO_MIXER_H__ #include "noncopyable.h" +#include "video_base.h" #include "video_scaler.h" #include "sflthread.h" -#include <pthread.h> -#include <list> +#include <mutex> +#include <condition_variable> + namespace sfl_video { -using std::forward_list; -class VideoMixer : public VideoGenerator, public SFLThread +class VideoMixer : + public VideoGenerator, + public VideoFramePassiveReader, + public SFLThread { public: VideoMixer(); ~VideoMixer(); void setDimensions(int width, int height); - void addSource(VideoSource *source); - void removeSource(VideoSource *source); - void clearSources(); void render(); int getWidth() const; int getHeight() const; + int getPixelFormat() const; -private: - // threading - void process(); + // as VideoFramePassiveReader + void update(Observable<VideoFrameSP>*, VideoFrameSP&); +private: NON_COPYABLE(VideoMixer); + // as SFLThread + void process(); void waitForUpdate(); void encode(); void rendering(); - pthread_mutex_t updateMutex_; - pthread_cond_t updateCondition_; VideoScaler sourceScaler_; VideoFrame scaledFrame_; - std::list<VideoSource*> sourceList_; int width_; int height_; + + std::mutex renderMutex_; + std::condition_variable renderCv_; }; } diff --git a/daemon/src/video/video_receive_thread.cpp b/daemon/src/video/video_receive_thread.cpp index 0def0f9a48..5fad83fb80 100644 --- a/daemon/src/video/video_receive_thread.cpp +++ b/daemon/src/video/video_receive_thread.cpp @@ -31,45 +31,36 @@ */ #include "libav_deps.h" + #include "video_receive_thread.h" #include "socket_pair.h" +#include "manager.h" #include "client/video_controls.h" #include "check.h" -#include "video_decoder.h" #include <unistd.h> #include <map> -#include "manager.h" -#include "conference.h" - - namespace sfl_video { using std::string; const int SDP_BUFFER_SIZE = 8192; -VideoReceiveThread::VideoReceiveThread(const std::string &id, - const std::map<string, string> &args) : - args_(args) - , videoDecoder_(nullptr) - , mixer_(nullptr) +VideoReceiveThread::VideoReceiveThread(const std::string& id, + const std::map<string, string>& args, + const std::shared_ptr<SHMSink>& sink) : + VideoGenerator::VideoGenerator() + , args_(args) + , videoDecoder_() + , sink_(sink) , dstWidth_(0) , dstHeight_(0) - , sink_() - , bufferSize_(0) , id_(id) , stream_(args_["receiving_sdp"]) , sdpContext_(SDP_BUFFER_SIZE, false, &readFunction, 0, 0, this) - , demuxContext_(nullptr) - , scaler_() - , previewFrame_() + , demuxContext_() , requestKeyFrameCallback_(0) -{ - Conference *conf = Manager::instance().getConferenceFromCallID(id); - if (conf) - mixer_ = conf->getVideoMixer(); -} +{} VideoReceiveThread::~VideoReceiveThread() { @@ -149,22 +140,16 @@ bool VideoReceiveThread::setup() dstHeight_ = videoDecoder_->getHeight(); } - // allocate our preview frame and shared sink - previewFrame_.setGeometry(dstWidth_, dstHeight_, VIDEO_PIXFMT_BGRA); - bufferSize_ = previewFrame_.getSize(); - EXIT_IF_FAIL(bufferSize_ > 0, "Incorrect buffer size for decoding"); - - // Choose between direct rendering (sink) or mixing class. - if (mixer_) { - mixer_->addSource(videoDecoder_); - } else { - EXIT_IF_FAIL(sink_.start(), "Cannot start shared memory sink"); + // Sink startup + if (sink_) { + EXIT_IF_FAIL(sink_->start(), "RX: sink startup failed"); Manager::instance().getVideoControls()->startedDecoding(id_, - sink_.openedName(), + sink_->openedName(), dstWidth_, dstHeight_); - DEBUG("RX: shm sink started with size %d, width %d and height %d", - bufferSize_, dstWidth_, dstHeight_); + DEBUG("RX: shm sink <%s> started: size = %dx%d", + sink_->openedName().c_str(), dstWidth_, dstHeight_); + attach(sink_.get()); } return true; @@ -172,16 +157,15 @@ bool VideoReceiveThread::setup() void VideoReceiveThread::process() { - if (decodeFrame()) - renderFrame(); + decodeFrame(); } void VideoReceiveThread::cleanup() { - if (mixer_) - mixer_->removeSource(videoDecoder_); - else - Manager::instance().getVideoControls()->stoppedDecoding(id_, sink_.openedName()); + if (sink_) { + detach(sink_.get()); + sink_->stop(); + } if (videoDecoder_) delete videoDecoder_; @@ -212,50 +196,26 @@ void VideoReceiveThread::addIOContext(SocketPair &socketPair) #endif } -/// Copies and scales our rendered frame to the buffer pointed to by data -void VideoReceiveThread::fillBuffer(void *data) -{ - previewFrame_.setDestination(data); - videoDecoder_->scale(scaler_, previewFrame_); -} - bool VideoReceiveThread::decodeFrame() { - int ret = videoDecoder_->decode(); + int ret = videoDecoder_->decode(getNewFrame()); - // fatal error? - if (ret == -1) { - stop(); - return false; + if (ret > 0) { + publishFrame(); + return true; } // decoding error? if (ret == -2 and requestKeyFrameCallback_) { + WARN("VideoDecoder error, restarting it..."); videoDecoder_->setupFromVideoData(); requestKeyFrameCallback_(id_); + } else if (ret < 0) { + ERROR("VideoDecoder fatal error, stopping it..."); + stop(); } - return (ret<0?0:1); -} - -void VideoReceiveThread::renderFrame() -{ - // FIXME: current design doesn't permit to be acknowledged - // when the current call enter in conference. - // So we always check the conference status here. - Conference *conf = Manager::instance().getConferenceFromCallID(id_); - if (conf and !mixer_) { - mixer_ = conf->getVideoMixer(); - mixer_->addSource(videoDecoder_); - } else if (!conf and mixer_) { - mixer_->removeSource(videoDecoder_); - mixer_ = nullptr; - } - - if (mixer_) - mixer_->render(); - else - sink_.render_callback(*this, bufferSize_); + return false; } void VideoReceiveThread::setRequestKeyFrameCallback( @@ -264,18 +224,13 @@ void VideoReceiveThread::setRequestKeyFrameCallback( requestKeyFrameCallback_ = cb; } -void VideoReceiveThread::addDetails( - std::map<std::string, std::string> &details) -{ - if (isRunning() and dstWidth_ > 0 and dstHeight_ > 0) { - details["VIDEO_SHM_PATH"] = sink_.openedName(); - std::ostringstream os; - os << dstWidth_; - details["VIDEO_WIDTH"] = os.str(); - os.str(""); - os << dstHeight_; - details["VIDEO_HEIGHT"] = os.str(); - } -} +int VideoReceiveThread::getWidth() const +{ return dstWidth_; } + +int VideoReceiveThread::getHeight() const +{ return dstHeight_; } + +int VideoReceiveThread::getPixelFormat() const +{ return videoDecoder_->getPixelFormat(); } } // end namespace sfl_video diff --git a/daemon/src/video/video_receive_thread.h b/daemon/src/video/video_receive_thread.h index cea6317f05..b7882479f8 100644 --- a/daemon/src/video/video_receive_thread.h +++ b/daemon/src/video/video_receive_thread.h @@ -31,69 +31,63 @@ #ifndef _VIDEO_RECEIVE_THREAD_H_ #define _VIDEO_RECEIVE_THREAD_H_ +#include "video_decoder.h" +#include "shm_sink.h" +#include "sflthread.h" +#include "noncopyable.h" + #include <map> #include <string> #include <climits> #include <sstream> -#include <tr1/memory> -#include "shm_sink.h" -#include "noncopyable.h" -#include "video_provider.h" -#include "video_decoder.h" -#include "video_scaler.h" -#include "video_mixer.h" -#include "sflthread.h" - +#include <memory> namespace sfl_video { class SocketPair; -class VideoReceiveThread : public VideoProvider, public SFLThread { +class VideoReceiveThread : public VideoGenerator, public SFLThread { +public: + VideoReceiveThread(const std::string &id, + const std::map<std::string, std::string> &args, + const std::shared_ptr<SHMSink>& sink); + ~VideoReceiveThread(); + + void addIOContext(SocketPair &socketPair); + void setRequestKeyFrameCallback(void (*)(const std::string &)); + + // as VideoGenerator + int getWidth() const; + int getHeight() const; + int getPixelFormat() const; + private: NON_COPYABLE(VideoReceiveThread); + std::map<std::string, std::string> args_; /*-------------------------------------------------------------*/ /* These variables should be used in thread (i.e. run()) only! */ /*-------------------------------------------------------------*/ VideoDecoder *videoDecoder_; - VideoMixer *mixer_; - + const std::shared_ptr<SHMSink> sink_; int dstWidth_; int dstHeight_; - - SHMSink sink_; - size_t bufferSize_; const std::string id_; std::istringstream stream_; VideoIOHandle sdpContext_; VideoIOHandle *demuxContext_; - VideoScaler scaler_; - VideoFrame previewFrame_; - void (* requestKeyFrameCallback_)(const std::string &); + void (*requestKeyFrameCallback_)(const std::string &); void openDecoder(); - void fillBuffer(void *data); - static int interruptCb(void *ctx); - friend struct VideoRxContextHandle; - bool decodeFrame(); - void renderFrame(); + static int interruptCb(void *ctx); static int readFunction(void *opaque, uint8_t *buf, int buf_size); - // threading + // as SFLThread bool setup(); void process(); void cleanup(); - -public: - VideoReceiveThread(const std::string &id, - const std::map<std::string, std::string> &args); - void addIOContext(SocketPair &socketPair); - void addDetails(std::map<std::string, std::string> &details); - ~VideoReceiveThread(); - void setRequestKeyFrameCallback(void (*)(const std::string &)); }; } diff --git a/daemon/src/video/video_rtp_session.cpp b/daemon/src/video/video_rtp_session.cpp index 1063b6b18e..486e1dc20c 100644 --- a/daemon/src/video/video_rtp_session.cpp +++ b/daemon/src/video/video_rtp_session.cpp @@ -1,6 +1,7 @@ /* * Copyright (C) 2004-2013 Savoir-Faire Linux Inc. * Author: Tristan Matthews <tristan.matthews@savoirfairelinux.com> + * Author: Guillaume Roguez <Guillaume.Roguez@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -28,18 +29,23 @@ * as that of the covered work. */ +#include "client/video_controls.h" #include "video_rtp_session.h" -#include <sstream> -#include <map> -#include <string> #include "video_send_thread.h" #include "video_receive_thread.h" +#include "video_mixer.h" #include "socket_pair.h" #include "sip/sdp.h" #include "sip/sipvoiplink.h" #include "manager.h" #include "logger.h" +#include <sstream> +#include <map> +#include <string> +#include <thread> +#include <chrono> + namespace sfl_video { using std::map; @@ -48,13 +54,12 @@ using std::string; VideoRtpSession::VideoRtpSession(const string &callID, const map<string, string> &txArgs) : socketPair_(), sendThread_(), receiveThread_(), txArgs_(txArgs), - rxArgs_(), sending_(false), receiving_(false), callID_(callID) + rxArgs_(), sending_(false), receiving_(false), callID_(callID), + videoMixer_(), videoLocal_(), sink_(new SHMSink()) {} VideoRtpSession::~VideoRtpSession() -{ - stop(); -} +{ stop(); } void VideoRtpSession::updateSDP(const Sdp &sdp) { @@ -128,6 +133,9 @@ void VideoRtpSession::start(int localPort) { std::string curcid = Manager::instance().getCurrentCallId(); + videoMixer_ = nullptr; + videoLocal_ = nullptr; + if (not sending_ and not receiving_) return; @@ -138,10 +146,46 @@ void VideoRtpSession::start(int localPort) return; } + if (sending_) { + // Local video startup if needed + auto videoCtrl = Manager::instance().getVideoControls(); + if (!videoCtrl->hasPreviewStarted()) { + videoCtrl->startPreview(); + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + + // Check for video conference mode + auto conf = Manager::instance().getConferenceFromCallID(callID_); + videoLocal_ = videoCtrl->getVideoPreview(); + if (not conf and not videoLocal_) { + ERROR("Sending disabled, no local video"); + sending_ = false; + sendThread_.reset(); + } else { + if (conf) { + // setup mixer pipeline + videoMixer_ = conf->getVideoMixer(); + if (videoLocal_) + videoLocal_->attach(videoMixer_); + } + + if (sendThread_.get()) + WARN("Restarting video sender"); + + sendThread_.reset(new VideoSendThread(callID_, txArgs_, + *socketPair_, videoLocal_, + videoMixer_)); + } + } else { + DEBUG("Video sending disabled"); + sendThread_.reset(); + } + if (receiving_) { if (receiveThread_.get()) WARN("restarting video receiver"); - receiveThread_.reset(new VideoReceiveThread(callID_, rxArgs_)); + receiveThread_.reset(new VideoReceiveThread(callID_, rxArgs_, + !videoMixer_?sink_:nullptr)); receiveThread_->setRequestKeyFrameCallback(&SIPVoIPLink::enqueueKeyframeRequest); receiveThread_->addIOContext(*socketPair_); receiveThread_->start(); @@ -149,23 +193,26 @@ void VideoRtpSession::start(int localPort) DEBUG("Video receiving disabled"); receiveThread_.reset(); } - - if (sending_) { - if (sendThread_.get()) - WARN("Restarting video sender"); - sendThread_.reset(new VideoSendThread(callID_, txArgs_)); - sendThread_->addIOContext(*socketPair_); - sendThread_->start(); - } else { - DEBUG("Video sending disabled"); - sendThread_.reset(); - } } void VideoRtpSession::stop() { + Manager::instance().getVideoControls()->stoppedDecoding(callID_, + sink_->openedName()); + if (videoLocal_ and videoMixer_) { + videoLocal_->detach(videoMixer_); + videoMixer_->detach(sink_.get()); + } else if (videoLocal_) + videoLocal_->detach(sink_.get()); + else if (videoMixer_) + videoMixer_->detach(sink_.get()); + + videoLocal_ = nullptr; + videoMixer_ = nullptr; + if (socketPair_.get()) socketPair_->interrupt(); + receiveThread_.reset(); sendThread_.reset(); socketPair_.reset(); @@ -177,11 +224,17 @@ void VideoRtpSession::forceKeyFrame() sendThread_->forceKeyFrame(); } -void -VideoRtpSession::addReceivingDetails(std::map<std::string, std::string> &details) +void VideoRtpSession::addReceivingDetails(std::map<std::string, std::string> &details) { - if (receiveThread_.get()) - receiveThread_->addDetails(details); + if (receiveThread_.get()) { + details["VIDEO_SHM_PATH"] = sink_->openedName(); + std::ostringstream os; + os << receiveThread_->getWidth(); + details["VIDEO_WIDTH"] = os.str(); + os.str(""); + os << receiveThread_->getHeight(); + details["VIDEO_HEIGHT"] = os.str(); + } } } // end namespace sfl_video diff --git a/daemon/src/video/video_rtp_session.h b/daemon/src/video/video_rtp_session.h index 574902bf3c..abf7fb2d45 100644 --- a/daemon/src/video/video_rtp_session.h +++ b/daemon/src/video/video_rtp_session.h @@ -1,6 +1,7 @@ /* * Copyright (C) 2004-2013 Savoir-Faire Linux Inc. * Author: Tristan Matthews <tristan.matthews@savoirfairelinux.com> + * Author: Guillaume Roguez <Guillaume.Roguez@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -31,9 +32,14 @@ #ifndef __VIDEO_RTP_SESSION_H__ #define __VIDEO_RTP_SESSION_H__ +#include "video_base.h" +#include "video_mixer.h" +#include "shm_sink.h" +#include "noncopyable.h" + #include <string> #include <map> -#include <tr1/memory> +#include <memory> class Sdp; @@ -44,29 +50,37 @@ class VideoReceiveThread; class SocketPair; class VideoRtpSession { - public: - VideoRtpSession(const std::string &callID, - const std::map<std::string, std::string> &txArgs); - ~VideoRtpSession(); +public: + VideoRtpSession(const std::string &callID, + const std::map<std::string, std::string> &txArgs); + ~VideoRtpSession(); + + void start(int localPort); + void stop(); + void updateDestination(const std::string &destination, + unsigned int port); + void updateSDP(const Sdp &sdp); + void forceKeyFrame(); + void addReceivingDetails(std::map<std::string, std::string> &details); + void bindMixer(VideoMixer* mixer); + void unbindMixer(); - void start(int localPort); - void stop(); - void updateDestination(const std::string &destination, - unsigned int port); - void updateSDP(const Sdp &sdp); - void forceKeyFrame(); - void addReceivingDetails(std::map<std::string, std::string> &details); +private: + NON_COPYABLE(VideoRtpSession); - private: - std::tr1::shared_ptr<SocketPair> socketPair_; - std::tr1::shared_ptr<VideoSendThread> sendThread_; - std::tr1::shared_ptr<VideoReceiveThread> receiveThread_; - std::map<std::string, std::string> txArgs_; - std::map<std::string, std::string> rxArgs_; - bool sending_; - bool receiving_; - const std::string callID_; + std::shared_ptr<SocketPair> socketPair_; + std::shared_ptr<VideoSendThread> sendThread_; + std::shared_ptr<VideoReceiveThread> receiveThread_; + std::map<std::string, std::string> txArgs_; + std::map<std::string, std::string> rxArgs_; + bool sending_; + bool receiving_; + const std::string callID_; + VideoMixer* videoMixer_; + VideoFrameActiveWriter *videoLocal_; + std::shared_ptr<SHMSink> sink_; }; + } #endif // __VIDEO_RTP_SESSION_H__ diff --git a/daemon/src/video/video_send_thread.cpp b/daemon/src/video/video_send_thread.cpp index 7cc98ca9f1..2d58bc3d2c 100644 --- a/daemon/src/video/video_send_thread.cpp +++ b/daemon/src/video/video_send_thread.cpp @@ -1,6 +1,5 @@ /* * Copyright (C) 2004-2013 Savoir-Faire Linux Inc. - * * Author: Tristan Matthews <tristan.matthews@savoirfairelinux.com> * Author: Guillaume Roguez <Guillaume.Roguez@savoirfairelinux.com> * @@ -31,6 +30,7 @@ */ #include "video_send_thread.h" +#include "video_mixer.h" #include "socket_pair.h" #include "client/video_controls.h" #include "check.h" @@ -44,53 +44,61 @@ namespace sfl_video { using std::string; -VideoSendThread::VideoSendThread(const std::string &id, const std::map<string, string> &args) : +VideoSendThread::VideoSendThread(const std::string &id, + const std::map<string, string> &args, + SocketPair& socketPair, + VideoFrameActiveWriter *local_video, + VideoFrameActiveWriter *mixer) : args_(args) , id_(id) - , videoEncoder_(nullptr) - , videoSource_(nullptr) - , mixer_(nullptr) + , videoEncoder_() + , videoSource_(local_video) , forceKeyFrame_(0) , frameNumber_(0) - , muxContext_(nullptr) + , muxContext_(socketPair.getIOContext()) , sdp_() -{} +{ + if (setup()) { + // do video pipeline + if (mixer) { + videoSource_ = mixer; + static_cast<VideoMixer*>(mixer)->setDimensions( + videoEncoder_->getWidth(), + videoEncoder_->getHeight()); + } + if (videoSource_) + videoSource_->attach(this); + } +} VideoSendThread::~VideoSendThread() { - stop(); - join(); + if (videoSource_) + videoSource_->detach(this); } bool VideoSendThread::setup() { - // Use local camera as default video source for setup - if (!videoSource_) - WARN("No video source started"); - const char *enc_name = args_["codec"].c_str(); videoEncoder_ = new VideoEncoder(); - videoEncoder_->setInterruptCallback(interruptCb, this); /* Encoder setup */ if (!args_["width"].empty()) { const char *s = args_["width"].c_str(); videoEncoder_->setOption("width", s); - } else if (videoSource_) { - char buf[11]; - sprintf(buf, "%10d", videoSource_->getWidth()); - videoEncoder_->setOption("width", buf); - } + } else { + ERROR("width option not set"); + return false; + } if (!args_["height"].empty()) { const char *s = args_["height"].c_str(); videoEncoder_->setOption("height", s); - } else if (videoSource_) { - char buf[11]; - sprintf(buf, "%10d", videoSource_->getHeight()); - videoEncoder_->setOption("height", buf); - } + } else { + ERROR("height option not set"); + return false; + } videoEncoder_->setOption("bitrate", args_["bitrate"].c_str()); @@ -106,84 +114,37 @@ bool VideoSendThread::setup() videoEncoder_->setOption("payload_type", args_["payload_type"].c_str()); } - EXIT_IF_FAIL(!videoEncoder_->openOutput(enc_name, "rtp", - args_["destination"].c_str(), NULL), - "encoder openOutput() failed"); - videoEncoder_->setIOContext(muxContext_); - EXIT_IF_FAIL(!videoEncoder_->startIO(), "encoder start failed"); - - videoEncoder_->print_sdp(sdp_); - return true; -} - -void VideoSendThread::process() -{ - checkVideoSource(); - if (videoSource_) { - VideoFrame *frame = videoSource_->waitNewFrame().get(); - if (frame) - encodeAndSendVideo(frame); - } else { - WARN("No video source!"); + if (videoEncoder_->openOutput(enc_name, "rtp", args_["destination"].c_str(), + NULL)) { + ERROR("encoder openOutput() failed"); + return false; } -} -void VideoSendThread::cleanup() -{ - if (videoEncoder_) - delete videoEncoder_; - - if (muxContext_) - delete muxContext_; -} - -void VideoSendThread::checkVideoSource() -{ - Conference *conf = Manager::instance().getConferenceFromCallID(id_); - if (conf and !mixer_) { - videoSource_ = mixer_ = conf->getVideoMixer(); - mixer_->setDimensions(videoEncoder_->getWidth(), - videoEncoder_->getHeight()); - } else if (!conf and mixer_) { - mixer_ = nullptr; - videoSource_ = Manager::instance().getVideoControls()->getVideoPreview(); + videoEncoder_->setIOContext(muxContext_); + if (videoEncoder_->startIO()) { + ERROR("encoder start failed"); + return false; } - // if no video source, try to start local camera - if (!videoSource_) { - VideoControls *vctl = Manager::instance().getVideoControls(); - vctl->startPreview(); - sleep(1); // let the camera startup - videoSource_ = vctl->getVideoPreview(); - } + videoEncoder_->print_sdp(sdp_); + return true; } -void VideoSendThread::encodeAndSendVideo(VideoFrame *input_frame) +void VideoSendThread::encodeAndSendVideo(VideoFrame& input_frame) { bool is_keyframe = forceKeyFrame_ > 0; if (is_keyframe) atomic_decrement(&forceKeyFrame_); - EXIT_IF_FAIL(videoEncoder_->encode(*input_frame, is_keyframe, frameNumber_++) >= 0, - "encoding failed"); + if (videoEncoder_->encode(input_frame, is_keyframe, frameNumber_++) < 0) + ERROR("encoding failed"); } -// This callback is used by libav internally to break out of blocking calls -int VideoSendThread::interruptCb(void *data) -{ - VideoSendThread *context = static_cast<VideoSendThread*>(data); - return not context->isRunning(); -} - -void VideoSendThread::addIOContext(SocketPair &socketPair) -{ - muxContext_ = socketPair.getIOContext(); -} +void VideoSendThread::update(Observable<VideoFrameSP>* obs, VideoFrameSP& frame_p) +{ encodeAndSendVideo(*frame_p); } void VideoSendThread::forceKeyFrame() -{ - atomic_increment(&forceKeyFrame_); -} +{ atomic_increment(&forceKeyFrame_); } } // end namespace sfl_video diff --git a/daemon/src/video/video_send_thread.h b/daemon/src/video/video_send_thread.h index 463ba83385..e7d04b8fdb 100644 --- a/daemon/src/video/video_send_thread.h +++ b/daemon/src/video/video_send_thread.h @@ -1,6 +1,7 @@ /* * Copyright (C) 2011-2013 Savoir-Faire Linux Inc. * Author: Tristan Matthews <tristan.matthews@savoirfairelinux.com> + * Author: Guillaume Roguez <Guillaume.Roguez@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -31,47 +32,44 @@ #ifndef __VIDEO_SEND_THREAD_H__ #define __VIDEO_SEND_THREAD_H__ -#include <map> -#include <string> -#include <tr1/memory> #include "noncopyable.h" #include "video_encoder.h" #include "video_mixer.h" -#include "sflthread.h" + +#include <map> +#include <string> +#include <memory> namespace sfl_video { class SocketPair; -class VideoSendThread : public SFLThread { +class VideoSendThread : public VideoFramePassiveReader +{ public: VideoSendThread(const std::string &id, - const std::map<std::string, std::string> &args); - ~VideoSendThread(); - void addIOContext(SocketPair &sock); + const std::map<std::string, std::string> &args, + SocketPair& socketPair, + VideoFrameActiveWriter *local_video, + VideoFrameActiveWriter *mixer); + virtual ~VideoSendThread(); std::string getSDP() const { return sdp_; } void forceKeyFrame(); -private: - // threading - bool setup(); - void process(); - void cleanup(); + // as VideoFramePassiveReader + void update(Observable<VideoFrameSP>*, VideoFrameSP&); +private: NON_COPYABLE(VideoSendThread); - static int interruptCb(void *ctx); - void encodeAndSendVideo(VideoFrame *); - void checkVideoSource(); + bool setup(); + void encodeAndSendVideo(VideoFrame&); std::map<std::string, std::string> args_; const std::string &id_; - /*-------------------------------------------------------------*/ - /* These variables should be used in thread (i.e. run()) only! */ - /*-------------------------------------------------------------*/ + VideoEncoder *videoEncoder_; - VideoSource *videoSource_; - VideoMixer *mixer_; + VideoFrameActiveWriter *videoSource_; int forceKeyFrame_; int frameNumber_; -- GitLab