Commit 475b8e52 authored by Guillaume Roguez

#29579: video mixing implementation and conference fixes.

- mixer rendering implementation
=> frame based (was per-sources-batch based)

- add backward signaling to Observer/Observable classes
=> this helps the mixer index its sources when laying them out (see the sketch after this message).

- mutex'ed frame publish (VideoGenerator).

- sink creation is now done in the right places.
=> one per mixer (new), one per camera, one per stream reception.

- VideoRtpSession is now fully responsible for handling the video pipeline
between RX/TX streams.
=> exposes enterConference/exitConference to be acknowledged by upper layers.

- VideoSendThread is no longer a «thread»; it is renamed VideoSender.

- videoMixer_ is now a shared_ptr in Conference objects.
=> getVideoMixer() now returns a shared_ptr as well.

- Conference is now responsible for triggering the video conference pipeline

- std::this_thread::sleep_for() is not usable before GCC 4.1
parent 523a831f
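
As a rough illustration of the backward signaling mentioned above, here is a minimal sketch built on the Observable/Observer API introduced in this commit; the SourceTracker class and the camera/tracker objects are hypothetical, only attach()/detach()/attached()/detached()/update() come from the diff below.

// Hypothetical observer that keeps an ordered list of its sources,
// in the spirit of what VideoMixer does in this commit.
class SourceTracker : public Observer<VideoFrameSP> {
public:
    void attached(Observable<VideoFrameSP>* ob) { sources_.push_back(ob); }
    void detached(Observable<VideoFrameSP>* ob) { sources_.remove(ob); }
    void update(Observable<VideoFrameSP>* /*ob*/, VideoFrameSP& /*frame*/) {
        // the position of the source in sources_ gives its slot in the layout
    }
private:
    std::list<Observable<VideoFrameSP>*> sources_;
};

// Observable side: attach()/detach() now report back to the observer.
SourceTracker tracker;
camera.attach(&tracker);   // calls tracker.attached(&camera) on first insertion
camera.detach(&tracker);   // calls tracker.detached(&camera) if it was attached
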
......@@ -37,6 +37,8 @@
#include "audio/mainbuffer.h"
#ifdef SFL_VIDEO
#include "sip/sipvoiplink.h"
#include "sip/sipcall.h"
#include "client/video_controls.h"
#include "video/video_camera.h"
#endif
......@@ -49,12 +51,18 @@ Conference::Conference()
, confState_(ACTIVE_ATTACHED)
, participants_()
#ifdef SFL_VIDEO
, videoMixer_()
, videoMixer_(new sfl_video::VideoMixer(id_))
#endif
{
Recordable::initRecFilename(id_);
}
Conference::~Conference()
{
for (auto participant_id : participants_)
remove(participant_id);
}
Conference::ConferenceState Conference::getState() const
{
return confState_;
......@@ -67,12 +75,20 @@ void Conference::setState(ConferenceState state)
void Conference::add(const std::string &participant_id)
{
participants_.insert(participant_id);
if (participants_.insert(participant_id).second) {
#ifdef SFL_VIDEO
SIPVoIPLink::instance()->getSipCall(participant_id)->getVideoRtp().enterConference(this);
#endif // SFL_VIDEO
}
}
void Conference::remove(const std::string &participant_id)
{
participants_.erase(participant_id);
if (participants_.erase(participant_id)) {
#ifdef SFL_VIDEO
SIPVoIPLink::instance()->getSipCall(participant_id)->getVideoRtp().exitConference();
#endif // SFL_VIDEO
}
}
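
A short sketch of the new control flow driven by Conference, assuming a call with id "7001" already exists on SIPVoIPLink's side (the id is only illustrative):

// Upper layers now only manipulate the participant set; the video
// pipeline follows through enterConference()/exitConference().
Conference conf;
conf.add("7001");     // first insertion -> getVideoRtp().enterConference(&conf)
conf.add("7001");     // duplicate id    -> no second enterConference() call
conf.remove("7001");  // erase succeeds  -> getVideoRtp().exitConference()
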
void Conference::bindParticipant(const std::string &participant_id)
......@@ -144,8 +160,8 @@ std::string Conference::getConfID() const {
}
#ifdef SFL_VIDEO
sfl_video::VideoMixer* Conference::getVideoMixer()
std::shared_ptr<sfl_video::VideoMixer> Conference::getVideoMixer()
{
return &videoMixer_;
return videoMixer_;
}
#endif
......@@ -36,6 +36,7 @@
#include <set>
#include <string>
#include <memory>
#include "audio/recordable.h"
......@@ -54,6 +55,11 @@ class Conference : public Recordable {
*/
Conference();
/**
* Destructor for this class, detaches any remaining participants
*/
~Conference();
/**
* Return the conference id
*/
......@@ -100,7 +106,7 @@ class Conference : public Recordable {
virtual bool toggleRecording();
#ifdef SFL_VIDEO
sfl_video::VideoMixer* getVideoMixer();
std::shared_ptr<sfl_video::VideoMixer> getVideoMixer();
#endif
private:
......@@ -109,7 +115,7 @@ class Conference : public Recordable {
ParticipantSet participants_;
#ifdef SFL_VIDEO
sfl_video::VideoMixer videoMixer_;
std::shared_ptr<sfl_video::VideoMixer> videoMixer_;
#endif
};
......
......@@ -943,11 +943,6 @@ ManagerImpl::addParticipant(const std::string& callId, const std::string& confer
if (participants.empty())
ERROR("Participant list is empty for this conference");
#ifdef SFL_VIDEO
// request participant video streams to be routed to video mixer
SIPVoIPLink::instance()->getSipCall(callId)->getVideoRtp().enterConference(conf);
#endif // SFL_VIDEO
// Connect stream
addStream(callId);
return true;
......@@ -1116,12 +1111,6 @@ ManagerImpl::joinParticipant(const std::string& callId1, const std::string& call
conf->setRecordingSmplRate(audiodriver_->getSampleRate());
}
#ifdef SFL_VIDEO
// request participant video streams to be routed to video mixer
SIPVoIPLink::instance()->getSipCall(callId1)->getVideoRtp().enterConference(conf);
SIPVoIPLink::instance()->getSipCall(callId2)->getVideoRtp().enterConference(conf);
#endif // SFL_VIDEO
getMainBuffer().dumpInfo();
return true;
}
......
......@@ -16,7 +16,7 @@ libvideo_la_SOURCES = \
shm_sink.cpp shm_sink.h \
video_camera.cpp video_camera.h \
video_receive_thread.cpp video_receive_thread.h \
video_send_thread.cpp video_send_thread.h \
video_sender.cpp video_sender.h \
video_rtp_session.cpp video_rtp_session.h \
check.h shm_header.h video_provider.h \
libav_utils.cpp libav_utils.h libav_deps.h
......
......@@ -40,9 +40,6 @@
namespace sfl_video {
class VideoSendThread;
class VideoReceiveThread;
class SocketPair {
public:
SocketPair(const char *uri, int localPort);
......
......@@ -211,6 +211,7 @@ void VideoFrame::test()
VideoFrame& VideoGenerator::getNewFrame()
{
std::unique_lock<std::mutex> lk(mutex_);
if (writableFrame_)
writableFrame_->setdefaults();
else
......@@ -220,12 +221,14 @@ VideoFrame& VideoGenerator::getNewFrame()
void VideoGenerator::publishFrame()
{
std::unique_lock<std::mutex> lk(mutex_);
lastFrame_ = std::move(writableFrame_);
notify(std::ref(lastFrame_));
notify(lastFrame_);
}
VideoFrameSP VideoGenerator::obtainLastFrame()
{
std::unique_lock<std::mutex> lk(mutex_);
return lastFrame_;
}
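
A minimal sketch of how these mutex-protected frame slots are meant to be used; the generator object and the fillFrame() helper are hypothetical, only getNewFrame()/publishFrame()/obtainLastFrame() come from this code.

// Producer side (e.g. a decoder or mixer loop):
VideoFrame& frame = generator.getNewFrame(); // takes mutex_, reuses or allocates writableFrame_
fillFrame(frame);                            // hypothetical: write pixels while the lock is released
generator.publishFrame();                    // takes mutex_, moves writableFrame_ into lastFrame_, notifies observers

// Consumer side, possibly on another thread:
VideoFrameSP last = generator.obtainLastFrame(); // takes mutex_, returns a shared_ptr snapshot
if (last) {
    // *last stays valid here even if the producer publishes a new frame meanwhile
}
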
......
......@@ -39,8 +39,16 @@
#include <memory>
#include <set>
#include <mutex>
#include <condition_variable>
// std::this_thread::sleep_for is supported by default since GCC 4.1
#if __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 1)
#include <chrono>
#include <thread>
#define MYSLEEP(x) std::this_thread::sleep_for(std::chrono::seconds(x))
#else
#include <unistd.h>
#define MYSLEEP(x) sleep(x)
#endif
class AVFrame;
class AVPacket;
......@@ -77,16 +85,22 @@ public:
Observable() : observers_(), mutex_() {}
virtual ~Observable() {};
void attach(Observer<T>* o) {
bool attach(Observer<T>* o) {
std::unique_lock<std::mutex> lk(mutex_);
if (o)
observers_.insert(o);
if (o and observers_.insert(o).second) {
o->attached(this);
return true;
}
return false;
}
void detach(Observer<T>* o) {
bool detach(Observer<T>* o) {
std::unique_lock<std::mutex> lk(mutex_);
if (o)
observers_.erase(o);
if (o and observers_.erase(o)) {
o->detached(this);
return true;
}
return false;
}
void notify(T& data) {
......@@ -95,6 +109,11 @@ public:
observer->update(this, data);
}
int getObserversCount() {
std::unique_lock<std::mutex> lk(mutex_);
return observers_.size();
}
private:
NON_COPYABLE(Observable<T>);
......@@ -110,6 +129,8 @@ class Observer
public:
virtual ~Observer() {};
virtual void update(Observable<T>*, T&) = 0;
virtual void attached(Observable<T>*) {};
virtual void detached(Observable<T>*) {};
};
/*=== VideoPacket ===========================================================*/
......@@ -207,7 +228,7 @@ class VideoFramePassiveReader :
class VideoGenerator : public VideoFrameActiveWriter
{
public:
VideoGenerator() : writableFrame_(), lastFrame_() {}
VideoGenerator() : writableFrame_(), lastFrame_(), mutex_() {}
virtual int getWidth() const = 0;
virtual int getHeight() const = 0;
......@@ -223,6 +244,7 @@ protected:
private:
VideoFrameUP writableFrame_;
VideoFrameSP lastFrame_;
std::mutex mutex_; // lock writableFrame_/lastFrame_ access
};
}
......
......@@ -38,6 +38,8 @@
#include <map>
#include <string>
#define SINK_ID "local"
namespace sfl_video {
using std::string;
......@@ -100,14 +102,11 @@ bool VideoCamera::setup()
/* Sink setup */
EXIT_IF_FAIL(sink_.start(), "Cannot start shared memory sink");
Manager::instance().getVideoControls()->startedDecoding(id_,
sink_.openedName(),
sinkWidth_,
sinkHeight_);
DEBUG("TX: shm sink <%s> started: size = %dx%d",
sink_.openedName().c_str(), sinkWidth_, sinkHeight_);
attach(&sink_);
if (attach(&sink_)) {
Manager::instance().getVideoControls()->startedDecoding(SINK_ID, sink_.openedName(), sinkWidth_, sinkHeight_);
DEBUG("LOCAL: shm sink <%s> started: size = %dx%d",
sink_.openedName().c_str(), sinkWidth_, sinkHeight_);
}
return true;
}
......@@ -117,9 +116,11 @@ void VideoCamera::process()
void VideoCamera::cleanup()
{
Manager::instance().getVideoControls()->stoppedDecoding(id_,
sink_.openedName());
detach(&sink_);
if (detach(&sink_)) {
Manager::instance().getVideoControls()->stoppedDecoding(SINK_ID, sink_.openedName());
sink_.stop();
}
delete decoder_;
}
......
......@@ -32,105 +32,141 @@
#include "libav_deps.h"
#include "video_mixer.h"
#include "check.h"
#include "client/video_controls.h"
#include "manager.h"
#include "logger.h"
#include <cmath>
namespace sfl_video {
VideoMixer::VideoMixer() :
VideoMixer::VideoMixer(const std::string id) :
VideoGenerator::VideoGenerator()
, sourceScaler_()
, scaledFrame_()
, id_(id)
, width_(0)
, height_(0)
, renderMutex_()
, renderCv_()
{ start(); }
, sources_()
, mutex_()
, sink_()
{
auto videoCtrl = Manager::instance().getVideoControls();
if (!videoCtrl->hasPreviewStarted()) {
videoCtrl->startPreview();
MYSLEEP(1);
}
// Local video camera is always attached
videoCtrl->getVideoPreview()->attach(this);
}
VideoMixer::~VideoMixer()
{
stop();
join();
stop_sink();
auto videoCtrl = Manager::instance().getVideoControls();
videoCtrl->getVideoPreview()->detach(this);
}
void VideoMixer::process()
void VideoMixer::attached(Observable<VideoFrameSP>* ob)
{
waitForUpdate();
rendering();
std::unique_lock<std::mutex> lk(mutex_);
sources_.push_back(ob);
}
void VideoMixer::waitForUpdate()
void VideoMixer::detached(Observable<VideoFrameSP>* ob)
{
std::unique_lock<std::mutex> lk(renderMutex_);
renderCv_.wait(lk);
std::unique_lock<std::mutex> lk(mutex_);
sources_.remove(ob);
}
void VideoMixer::update(Observable<VideoFrameSP>* ob, VideoFrameSP& frame_p)
{ renderCv_.notify_one(); }
{
std::unique_lock<std::mutex> lk(mutex_);
int i=0;
for (auto x : sources_) {
if (x == ob) break;
i++;
}
render_frame(*frame_p, i);
}
void VideoMixer::rendering()
void VideoMixer::render_frame(VideoFrame& input, const int index)
{
VideoScaler scaler;
VideoFrame scaled_input;
if (!width_ or !height_)
return;
#if 0
// For all sources:
// - take source frame
// - scale it down and layout it
// - publish the result frame
// Current layout is a squared distribution
VideoFrame &output = getNewFrame();
const int n=sourceList_.size();
if (!output.allocBuffer(width_, height_, VIDEO_PIXFMT_YUV420P)) {
ERROR("VideoFrame::allocBuffer() failed");
return;
}
VideoFrameSP previous_p=obtainLastFrame();
if (previous_p)
previous_p->copy(output);
previous_p.reset();
const int n=sources_.size();
const int zoom=ceil(sqrt(n));
const int cell_width=width_ / zoom;
const int cell_height=height_ / zoom;
VideoFrame &output = getNewFrame();
// Blit frame function supports only the YUV420P pixel format
if (!output.allocBuffer(width_, height_, VIDEO_PIXFMT_YUV420P))
WARN("VideoFrame::allocBuffer() failed");
if (!scaled_input.allocBuffer(cell_width, cell_height,
VIDEO_PIXFMT_YUV420P)) {
ERROR("VideoFrame::allocBuffer() failed");
return;
}
if (!scaledFrame_.allocBuffer(cell_width, cell_height, VIDEO_PIXFMT_YUV420P))
WARN("VideoFrame::allocBuffer() failed");
int xoff = (index % zoom) * cell_width;
int yoff = (index / zoom) * cell_height;
int lastInputWidth=0;
int lastInputHeight=0;
int i=0;
for (VideoNode* src : sourceList_) {
int xoff = (i % zoom) * cell_width;
int yoff = (i / zoom) * cell_height;
VideoFrameSP input=src->obtainLastFrame();
if (input) {
// scaling context allocation may be time consuming
// so reset it only if needed
if (input->getWidth() != lastInputWidth ||
input->getHeight() != lastInputHeight)
sourceScaler_.reset();
sourceScaler_.scale(*input, scaledFrame_);
output.blit(scaledFrame_, xoff, yoff);
lastInputWidth = input->getWidth();
lastInputHeight = input->getHeight();
}
scaler.scale(input, scaled_input);
output.blit(scaled_input, xoff, yoff);
i++;
}
publishFrame();
#endif
}
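
For reference, the squared-distribution layout above works out as follows for three sources; the 640x480 mixer size is only illustrative.

// n = 3 sources, width_ = 640, height_ = 480
// zoom        = ceil(sqrt(3)) = 2
// cell_width  = 640 / 2 = 320, cell_height = 480 / 2 = 240
// index 0 -> xoff = (0 % 2) * 320 = 0,   yoff = (0 / 2) * 240 = 0
// index 1 -> xoff = (1 % 2) * 320 = 320, yoff = (1 / 2) * 240 = 0
// index 2 -> xoff = (2 % 2) * 320 = 0,   yoff = (2 / 2) * 240 = 240
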
void VideoMixer::setDimensions(int width, int height)
{
// FIXME: unprotected write (see rendering())
std::unique_lock<std::mutex> lk(mutex_);
width_ = width;
height_ = height;
stop_sink();
start_sink();
}
int VideoMixer::getWidth() const { return width_; }
int VideoMixer::getHeight() const { return height_; }
int VideoMixer::getPixelFormat() const { return VIDEO_PIXFMT_YUV420P; }
void VideoMixer::start_sink()
{
if (sink_.start()) {
if (this->attach(&sink_)) {
Manager::instance().getVideoControls()->startedDecoding(id_+"_MX", sink_.openedName(), width_, height_);
DEBUG("MX: shm sink <%s> started: size = %dx%d",
sink_.openedName().c_str(), width_, height_);
}
} else
WARN("MX: sink startup failed");
}
void VideoMixer::stop_sink()
{
if (this->detach(&sink_)) {
Manager::instance().getVideoControls()->stoppedDecoding(id_+"_MX", sink_.openedName());
sink_.stop();
}
}
int VideoMixer::getWidth() const
{ return width_; }
int VideoMixer::getHeight() const
{ return height_; }
int VideoMixer::getPixelFormat() const
{ return VIDEO_PIXFMT_YUV420P; }
} // end namespace sfl_video
......@@ -35,25 +35,24 @@
#include "noncopyable.h"
#include "video_base.h"
#include "video_scaler.h"
#include "shm_sink.h"
#include "sflthread.h"
#include <mutex>
#include <condition_variable>
#include <list>
namespace sfl_video {
class VideoMixer :
public VideoGenerator,
public VideoFramePassiveReader,
public SFLThread
public VideoFramePassiveReader
{
public:
VideoMixer();
~VideoMixer();
VideoMixer(const std::string id_);
virtual ~VideoMixer();
void setDimensions(int width, int height);
void render();
int getWidth() const;
int getHeight() const;
......@@ -61,24 +60,22 @@ public:
// as VideoFramePassiveReader
void update(Observable<VideoFrameSP>*, VideoFrameSP&);
void attached(Observable<VideoFrameSP>* ob);
void detached(Observable<VideoFrameSP>* ob);
private:
NON_COPYABLE(VideoMixer);
// as SFLThread
void process();
void waitForUpdate();
void encode();
void rendering();
VideoScaler sourceScaler_;
VideoFrame scaledFrame_;
void render_frame(VideoFrame& input, const int index);
void start_sink();
void stop_sink();
const std::string id_;
int width_;
int height_;
std::mutex renderMutex_;
std::condition_variable renderCv_;
std::list<Observable<VideoFrameSP>*> sources_;
std::mutex mutex_;
SHMSink sink_;
};
}
......
......@@ -151,7 +151,8 @@ void VideoReceiveThread::process()
void VideoReceiveThread::cleanup()
{
Manager::instance().getVideoControls()->stoppedDecoding(id_, sink_.openedName());
if (detach(&sink_))
Manager::instance().getVideoControls()->stoppedDecoding(id_+"RX", sink_.openedName());
if (videoDecoder_)
delete videoDecoder_;
......@@ -223,9 +224,10 @@ void VideoReceiveThread::enterConference()
if (!isRunning())
return;
Manager::instance().getVideoControls()->stoppedDecoding(id_, sink_.openedName());
detach(&sink_);
sink_.stop();
if (detach(&sink_)) {
Manager::instance().getVideoControls()->stoppedDecoding(id_+"RX", sink_.openedName());
sink_.stop();
}
}
void VideoReceiveThread::exitConference()
......@@ -233,16 +235,12 @@ void VideoReceiveThread::exitConference()
if (!isRunning())
return;
// Sink startup
EXIT_IF_FAIL(sink_.start(), "RX: sink startup failed");
Manager::instance().getVideoControls()->startedDecoding(id_,
sink_.openedName(),
dstWidth_,
dstHeight_);
DEBUG("RX: shm sink <%s> started: size = %dx%d",
sink_.openedName().c_str(), dstWidth_, dstHeight_);
attach(&sink_);
if (attach(&sink_)) {
Manager::instance().getVideoControls()->startedDecoding(id_+"RX", sink_.openedName(), dstWidth_, dstHeight_);
DEBUG("RX: shm sink <%s> started: size = %dx%d",
sink_.openedName().c_str(), dstWidth_, dstHeight_);
}
}
void VideoReceiveThread::setRequestKeyFrameCallback(
......
......@@ -31,7 +31,7 @@
#include "client/video_controls.h"
#include "video_rtp_session.h"
#include "video_send_thread.h"
#include "video_sender.h"
#include "video_receive_thread.h"
#include "video_mixer.h"
#include "socket_pair.h"
......@@ -43,8 +43,6 @@
#include <sstream>
#include <map>
#include <string>
#include <thread>
#include <chrono>
namespace sfl_video {
......@@ -53,9 +51,9 @@ using std::string;
VideoRtpSession::VideoRtpSession(const string &callID,
const map<string, string> &txArgs) :
socketPair_(), sendThread_(), receiveThread_(), txArgs_(txArgs),
socketPair_(), sender_(), receiveThread_(), txArgs_(txArgs),
rxArgs_(), sending_(false), receiving_(false), callID_(callID),
videoMixer_(), videoLocal_()
videoMixerSP_(), videoLocal_()
{}
VideoRtpSession::~VideoRtpSession()
......@@ -115,7 +113,7 @@ void VideoRtpSession::updateDestination(const string &destination,
tmp << "rtp://" << destination << ":" << port;
// if destination has changed
if (tmp.str() != txArgs_["destination"]) {
if (sendThread_.get() != 0) {
if (sender_.get() != 0) {
ERROR("Video is already being sent");
return;
}
......@@ -146,17 +144,17 @@ void VideoRtpSession::start(int localPort)
auto videoCtrl = Manager::instance().getVideoControls();
if (!videoCtrl->hasPreviewStarted()) {
videoCtrl->startPreview();
std::this_thread::sleep_for(std::chrono::seconds(1));
MYSLEEP(1);
}
videoLocal_ = videoCtrl->getVideoPreview();
if (sendThread_.get())
if (sender_.get())
WARN("Restarting video sender");
sendThread_.reset(new VideoSendThread(callID_, txArgs_, *socketPair_));
sender_.reset(new VideoSender(callID_, txArgs_, *socketPair_));
} else {
DEBUG("Video sending disabled");
sendThread_.reset();
sender_.reset();
}
if (receiving_) {
......@@ -172,46 +170,37 @@ void VideoRtpSession::start(int localPort)
}
// Setup pipeline
if (videoMixer_) {
// We are in conference mode
if (sendThread_) {
videoLocal_->detach(sendThread_.get());
videoMixer_->attach(sendThread_.get());
}
if (receiveThread_) {
receiveThread_->enterConference();
receiveThread_->attach(videoMixer_);
}
if (videoMixerSP_) {
setupConferenceVideoPipeline();
} else {
if (sendThread_)
videoLocal_->attach(sendThread_.get());
if (sender_)
videoLocal_->attach(sender_.get());
}
}
void VideoRtpSession::stop()
{
if (videoLocal_)
videoLocal_->detach(sendThread_.get());
if (videoLocal_) {
videoLocal_->detach(sender_.get());
}
if (videoMixer_) {
videoMixer_->detach(sendThread_.get());
receiveThread_->detach(videoMixer_);
if (videoMixerSP_) {
videoMixerSP_->detach(sender_.get());
receiveThread_->detach(videoMixerSP_.get());
}
if (socketPair_.get())
socketPair_->interrupt();
receiveThread_.reset();
sendThread_.reset();
sender_.reset();
socketPair_.reset();
}
void VideoRtpSession::forceKeyFrame()
{
if (sendThread_.get())
sendThread_->forceKeyFrame();
if (sender_.get())
sender_->forceKeyFrame();
}
void VideoRtpSession::addReceivingDetails(std::map<std::string, std::string> &details)
......@@ -220,36 +209,46 @@ void VideoRtpSession::addReceivingDetails(std::map<std::string, std::string> &de
receiveThread_->addReceivingDetails(details);
}
void VideoRtpSession::enterConference(Conference *conf)
void VideoRtpSession::setupConferenceVideoPipeline()
{
auto mixer = conf->getVideoMixer();
if (!mixer) {