Commit 26d1e199 authored by Guillaume Roguez's avatar Guillaume Roguez

#29564: video: fix non-conference pipeline setup

Note: VideoMixer is not yet ready in this commit.
parent 58f887c8
...@@ -943,6 +943,11 @@ ManagerImpl::addParticipant(const std::string& callId, const std::string& confer ...@@ -943,6 +943,11 @@ ManagerImpl::addParticipant(const std::string& callId, const std::string& confer
if (participants.empty()) if (participants.empty())
ERROR("Participant list is empty for this conference"); ERROR("Participant list is empty for this conference");
#ifdef SFL_VIDEO
// request participant video streams to be routed to video mixer
SIPVoIPLink::instance()->getSipCall(callId)->getVideoRtp().enterConference(conf);
#endif // SFL_VIDEO
// Connect stream // Connect stream
addStream(callId); addStream(callId);
return true; return true;
...@@ -1111,6 +1116,12 @@ ManagerImpl::joinParticipant(const std::string& callId1, const std::string& call ...@@ -1111,6 +1116,12 @@ ManagerImpl::joinParticipant(const std::string& callId1, const std::string& call
conf->setRecordingSmplRate(audiodriver_->getSampleRate()); conf->setRecordingSmplRate(audiodriver_->getSampleRate());
} }
#ifdef SFL_VIDEO
// request participant video streams to be routed to video mixer
SIPVoIPLink::instance()->getSipCall(callId1)->getVideoRtp().enterConference(conf);
SIPVoIPLink::instance()->getSipCall(callId2)->getVideoRtp().enterConference(conf);
#endif // SFL_VIDEO
getMainBuffer().dumpInfo(); getMainBuffer().dumpInfo();
return true; return true;
} }
......
...@@ -79,12 +79,14 @@ public: ...@@ -79,12 +79,14 @@ public:
void attach(Observer<T>* o) { void attach(Observer<T>* o) {
std::unique_lock<std::mutex> lk(mutex_); std::unique_lock<std::mutex> lk(mutex_);
observers_.insert(o); if (o)
observers_.insert(o);
} }
void detach(Observer<T>* o) { void detach(Observer<T>* o) {
std::unique_lock<std::mutex> lk(mutex_); std::unique_lock<std::mutex> lk(mutex_);
observers_.erase(o); if (o)
observers_.erase(o);
} }
void notify(T& data) { void notify(T& data) {
......
...@@ -45,9 +45,7 @@ VideoMixer::VideoMixer() : ...@@ -45,9 +45,7 @@ VideoMixer::VideoMixer() :
, height_(0) , height_(0)
, renderMutex_() , renderMutex_()
, renderCv_() , renderCv_()
{ { start(); }
start();
}
VideoMixer::~VideoMixer() VideoMixer::~VideoMixer()
{ {
......
...@@ -47,18 +47,17 @@ using std::string; ...@@ -47,18 +47,17 @@ using std::string;
const int SDP_BUFFER_SIZE = 8192; const int SDP_BUFFER_SIZE = 8192;
VideoReceiveThread::VideoReceiveThread(const std::string& id, VideoReceiveThread::VideoReceiveThread(const std::string& id,
const std::map<string, string>& args, const std::map<string, string>& args) :
const std::shared_ptr<SHMSink>& sink) :
VideoGenerator::VideoGenerator() VideoGenerator::VideoGenerator()
, args_(args) , args_(args)
, videoDecoder_() , videoDecoder_()
, sink_(sink)
, dstWidth_(0) , dstWidth_(0)
, dstHeight_(0) , dstHeight_(0)
, id_(id) , id_(id)
, stream_(args_["receiving_sdp"]) , stream_(args_["receiving_sdp"])
, sdpContext_(SDP_BUFFER_SIZE, false, &readFunction, 0, 0, this) , sdpContext_(SDP_BUFFER_SIZE, false, &readFunction, 0, 0, this)
, demuxContext_() , demuxContext_()
, sink_()
, requestKeyFrameCallback_(0) , requestKeyFrameCallback_(0)
{} {}
...@@ -140,32 +139,19 @@ bool VideoReceiveThread::setup() ...@@ -140,32 +139,19 @@ bool VideoReceiveThread::setup()
dstHeight_ = videoDecoder_->getHeight(); dstHeight_ = videoDecoder_->getHeight();
} }
// Sink startup auto conf = Manager::instance().getConferenceFromCallID(id_);
if (sink_) { if (!conf)
EXIT_IF_FAIL(sink_->start(), "RX: sink startup failed"); exitConference();
Manager::instance().getVideoControls()->startedDecoding(id_,
sink_->openedName(),
dstWidth_,
dstHeight_);
DEBUG("RX: shm sink <%s> started: size = %dx%d",
sink_->openedName().c_str(), dstWidth_, dstHeight_);
attach(sink_.get());
}
return true; return true;
} }
void VideoReceiveThread::process() void VideoReceiveThread::process()
{ { decodeFrame(); }
decodeFrame();
}
void VideoReceiveThread::cleanup() void VideoReceiveThread::cleanup()
{ {
if (sink_) { Manager::instance().getVideoControls()->stoppedDecoding(id_, sink_.openedName());
detach(sink_.get());
sink_->stop();
}
if (videoDecoder_) if (videoDecoder_)
delete videoDecoder_; delete videoDecoder_;
...@@ -218,12 +204,51 @@ bool VideoReceiveThread::decodeFrame() ...@@ -218,12 +204,51 @@ bool VideoReceiveThread::decodeFrame()
return false; return false;
} }
void VideoReceiveThread::setRequestKeyFrameCallback( void VideoReceiveThread::addReceivingDetails(
void (*cb)(const std::string &)) std::map<std::string, std::string> &details)
{
if (isRunning() and dstWidth_ > 0 and dstHeight_ > 0) {
details["VIDEO_SHM_PATH"] = sink_.openedName();
std::ostringstream os;
os << dstWidth_;
details["VIDEO_WIDTH"] = os.str();
os.str("");
os << dstHeight_;
details["VIDEO_HEIGHT"] = os.str();
}
}
void VideoReceiveThread::enterConference()
{
if (!isRunning())
return;
Manager::instance().getVideoControls()->stoppedDecoding(id_, sink_.openedName());
detach(&sink_);
sink_.stop();
}
void VideoReceiveThread::exitConference()
{ {
requestKeyFrameCallback_ = cb; if (!isRunning())
return;
// Sink startup
EXIT_IF_FAIL(sink_.start(), "RX: sink startup failed");
Manager::instance().getVideoControls()->startedDecoding(id_,
sink_.openedName(),
dstWidth_,
dstHeight_);
DEBUG("RX: shm sink <%s> started: size = %dx%d",
sink_.openedName().c_str(), dstWidth_, dstHeight_);
attach(&sink_);
} }
void VideoReceiveThread::setRequestKeyFrameCallback(
void (*cb)(const std::string &))
{ requestKeyFrameCallback_ = cb; }
int VideoReceiveThread::getWidth() const int VideoReceiveThread::getWidth() const
{ return dstWidth_; } { return dstWidth_; }
......
...@@ -49,12 +49,14 @@ class SocketPair; ...@@ -49,12 +49,14 @@ class SocketPair;
class VideoReceiveThread : public VideoGenerator, public SFLThread { class VideoReceiveThread : public VideoGenerator, public SFLThread {
public: public:
VideoReceiveThread(const std::string &id, VideoReceiveThread(const std::string &id,
const std::map<std::string, std::string> &args, const std::map<std::string, std::string> &args);
const std::shared_ptr<SHMSink>& sink);
~VideoReceiveThread(); ~VideoReceiveThread();
void addIOContext(SocketPair &socketPair); void addIOContext(SocketPair &socketPair);
void setRequestKeyFrameCallback(void (*)(const std::string &)); void setRequestKeyFrameCallback(void (*)(const std::string &));
void addReceivingDetails(std::map<std::string, std::string> &details);
void enterConference();
void exitConference();
// as VideoGenerator // as VideoGenerator
int getWidth() const; int getWidth() const;
...@@ -70,13 +72,13 @@ private: ...@@ -70,13 +72,13 @@ private:
/* These variables should be used in thread (i.e. run()) only! */ /* These variables should be used in thread (i.e. run()) only! */
/*-------------------------------------------------------------*/ /*-------------------------------------------------------------*/
VideoDecoder *videoDecoder_; VideoDecoder *videoDecoder_;
const std::shared_ptr<SHMSink> sink_;
int dstWidth_; int dstWidth_;
int dstHeight_; int dstHeight_;
const std::string id_; const std::string id_;
std::istringstream stream_; std::istringstream stream_;
VideoIOHandle sdpContext_; VideoIOHandle sdpContext_;
VideoIOHandle *demuxContext_; VideoIOHandle *demuxContext_;
SHMSink sink_;
void (*requestKeyFrameCallback_)(const std::string &); void (*requestKeyFrameCallback_)(const std::string &);
void openDecoder(); void openDecoder();
...@@ -84,6 +86,7 @@ private: ...@@ -84,6 +86,7 @@ private:
static int interruptCb(void *ctx); static int interruptCb(void *ctx);
static int readFunction(void *opaque, uint8_t *buf, int buf_size); static int readFunction(void *opaque, uint8_t *buf, int buf_size);
// as SFLThread // as SFLThread
bool setup(); bool setup();
void process(); void process();
......
...@@ -55,7 +55,7 @@ VideoRtpSession::VideoRtpSession(const string &callID, ...@@ -55,7 +55,7 @@ VideoRtpSession::VideoRtpSession(const string &callID,
const map<string, string> &txArgs) : const map<string, string> &txArgs) :
socketPair_(), sendThread_(), receiveThread_(), txArgs_(txArgs), socketPair_(), sendThread_(), receiveThread_(), txArgs_(txArgs),
rxArgs_(), sending_(false), receiving_(false), callID_(callID), rxArgs_(), sending_(false), receiving_(false), callID_(callID),
videoMixer_(), videoLocal_(), sink_(new SHMSink()) videoMixer_(), videoLocal_()
{} {}
VideoRtpSession::~VideoRtpSession() VideoRtpSession::~VideoRtpSession()
...@@ -131,11 +131,6 @@ void VideoRtpSession::updateDestination(const string &destination, ...@@ -131,11 +131,6 @@ void VideoRtpSession::updateDestination(const string &destination,
void VideoRtpSession::start(int localPort) void VideoRtpSession::start(int localPort)
{ {
std::string curcid = Manager::instance().getCurrentCallId();
videoMixer_ = nullptr;
videoLocal_ = nullptr;
if (not sending_ and not receiving_) if (not sending_ and not receiving_)
return; return;
...@@ -154,28 +149,11 @@ void VideoRtpSession::start(int localPort) ...@@ -154,28 +149,11 @@ void VideoRtpSession::start(int localPort)
std::this_thread::sleep_for(std::chrono::seconds(1)); std::this_thread::sleep_for(std::chrono::seconds(1));
} }
// Check for video conference mode
auto conf = Manager::instance().getConferenceFromCallID(callID_);
videoLocal_ = videoCtrl->getVideoPreview(); videoLocal_ = videoCtrl->getVideoPreview();
if (not conf and not videoLocal_) { if (sendThread_.get())
ERROR("Sending disabled, no local video"); WARN("Restarting video sender");
sending_ = false;
sendThread_.reset(); sendThread_.reset(new VideoSendThread(callID_, txArgs_, *socketPair_));
} else {
if (conf) {
// setup mixer pipeline
videoMixer_ = conf->getVideoMixer();
if (videoLocal_)
videoLocal_->attach(videoMixer_);
}
if (sendThread_.get())
WARN("Restarting video sender");
sendThread_.reset(new VideoSendThread(callID_, txArgs_,
*socketPair_, videoLocal_,
videoMixer_));
}
} else { } else {
DEBUG("Video sending disabled"); DEBUG("Video sending disabled");
sendThread_.reset(); sendThread_.reset();
...@@ -184,8 +162,7 @@ void VideoRtpSession::start(int localPort) ...@@ -184,8 +162,7 @@ void VideoRtpSession::start(int localPort)
if (receiving_) { if (receiving_) {
if (receiveThread_.get()) if (receiveThread_.get())
WARN("restarting video receiver"); WARN("restarting video receiver");
receiveThread_.reset(new VideoReceiveThread(callID_, rxArgs_, receiveThread_.reset(new VideoReceiveThread(callID_, rxArgs_));
!videoMixer_?sink_:nullptr));
receiveThread_->setRequestKeyFrameCallback(&SIPVoIPLink::enqueueKeyframeRequest); receiveThread_->setRequestKeyFrameCallback(&SIPVoIPLink::enqueueKeyframeRequest);
receiveThread_->addIOContext(*socketPair_); receiveThread_->addIOContext(*socketPair_);
receiveThread_->start(); receiveThread_->start();
...@@ -193,22 +170,35 @@ void VideoRtpSession::start(int localPort) ...@@ -193,22 +170,35 @@ void VideoRtpSession::start(int localPort)
DEBUG("Video receiving disabled"); DEBUG("Video receiving disabled");
receiveThread_.reset(); receiveThread_.reset();
} }
// Setup pipeline
if (videoMixer_) {
// We are in conference mode
if (sendThread_) {
videoLocal_->detach(sendThread_.get());
videoMixer_->attach(sendThread_.get());
}
if (receiveThread_) {
receiveThread_->enterConference();
receiveThread_->attach(videoMixer_);
}
} else {
if (sendThread_)
videoLocal_->attach(sendThread_.get());
}
} }
void VideoRtpSession::stop() void VideoRtpSession::stop()
{ {
Manager::instance().getVideoControls()->stoppedDecoding(callID_, if (videoLocal_)
sink_->openedName()); videoLocal_->detach(sendThread_.get());
if (videoLocal_ and videoMixer_) {
videoLocal_->detach(videoMixer_); if (videoMixer_) {
videoMixer_->detach(sink_.get()); videoMixer_->detach(sendThread_.get());
} else if (videoLocal_) receiveThread_->detach(videoMixer_);
videoLocal_->detach(sink_.get()); }
else if (videoMixer_)
videoMixer_->detach(sink_.get());
videoLocal_ = nullptr;
videoMixer_ = nullptr;
if (socketPair_.get()) if (socketPair_.get())
socketPair_->interrupt(); socketPair_->interrupt();
...@@ -226,15 +216,40 @@ void VideoRtpSession::forceKeyFrame() ...@@ -226,15 +216,40 @@ void VideoRtpSession::forceKeyFrame()
void VideoRtpSession::addReceivingDetails(std::map<std::string, std::string> &details) void VideoRtpSession::addReceivingDetails(std::map<std::string, std::string> &details)
{ {
if (receiveThread_.get()) { if (receiveThread_)
details["VIDEO_SHM_PATH"] = sink_->openedName(); receiveThread_->addReceivingDetails(details);
std::ostringstream os; }
os << receiveThread_->getWidth();
details["VIDEO_WIDTH"] = os.str(); void VideoRtpSession::enterConference(Conference *conf)
os.str(""); {
os << receiveThread_->getHeight(); auto mixer = conf->getVideoMixer();
details["VIDEO_HEIGHT"] = os.str(); if (!mixer) {
ERROR("No conference mixer!");
return;
} }
/* Detach from a possible previous conference */
exitConference();
videoMixer_ = mixer; // catched during start()
}
void VideoRtpSession::exitConference()
{
if (videoMixer_) {
if (sendThread_)
videoMixer_->detach(sendThread_.get());
if (receiveThread_) {
receiveThread_->detach(videoMixer_);
receiveThread_->exitConference();
}
videoMixer_ = nullptr;
}
if (videoLocal_)
videoLocal_->attach(sendThread_.get());
} }
} // end namespace sfl_video } // end namespace sfl_video
...@@ -34,7 +34,7 @@ ...@@ -34,7 +34,7 @@
#include "video_base.h" #include "video_base.h"
#include "video_mixer.h" #include "video_mixer.h"
#include "shm_sink.h" #include "conference.h"
#include "noncopyable.h" #include "noncopyable.h"
#include <string> #include <string>
...@@ -64,6 +64,8 @@ public: ...@@ -64,6 +64,8 @@ public:
void addReceivingDetails(std::map<std::string, std::string> &details); void addReceivingDetails(std::map<std::string, std::string> &details);
void bindMixer(VideoMixer* mixer); void bindMixer(VideoMixer* mixer);
void unbindMixer(); void unbindMixer();
void enterConference(Conference *conf);
void exitConference();
private: private:
NON_COPYABLE(VideoRtpSession); NON_COPYABLE(VideoRtpSession);
...@@ -78,7 +80,6 @@ private: ...@@ -78,7 +80,6 @@ private:
const std::string callID_; const std::string callID_;
VideoMixer* videoMixer_; VideoMixer* videoMixer_;
VideoFrameActiveWriter *videoLocal_; VideoFrameActiveWriter *videoLocal_;
std::shared_ptr<SHMSink> sink_;
}; };
} }
......
...@@ -46,35 +46,16 @@ using std::string; ...@@ -46,35 +46,16 @@ using std::string;
VideoSendThread::VideoSendThread(const std::string &id, VideoSendThread::VideoSendThread(const std::string &id,
const std::map<string, string> &args, const std::map<string, string> &args,
SocketPair& socketPair, SocketPair& socketPair) :
VideoFrameActiveWriter *local_video,
VideoFrameActiveWriter *mixer) :
args_(args) args_(args)
, id_(id) , id_(id)
, videoEncoder_() , videoEncoder_()
, videoSource_(local_video)
, forceKeyFrame_(0) , forceKeyFrame_(0)
, frameNumber_(0) , frameNumber_(0)
, muxContext_(socketPair.getIOContext()) , muxContext_(socketPair.getIOContext())
, sdp_() , sdp_()
{ {
if (setup()) { setup();
// do video pipeline
if (mixer) {
videoSource_ = mixer;
static_cast<VideoMixer*>(mixer)->setDimensions(
videoEncoder_->getWidth(),
videoEncoder_->getHeight());
}
if (videoSource_)
videoSource_->attach(this);
}
}
VideoSendThread::~VideoSendThread()
{
if (videoSource_)
videoSource_->detach(this);
} }
bool VideoSendThread::setup() bool VideoSendThread::setup()
...@@ -142,7 +123,9 @@ void VideoSendThread::encodeAndSendVideo(VideoFrame& input_frame) ...@@ -142,7 +123,9 @@ void VideoSendThread::encodeAndSendVideo(VideoFrame& input_frame)
} }
void VideoSendThread::update(Observable<VideoFrameSP>* obs, VideoFrameSP& frame_p) void VideoSendThread::update(Observable<VideoFrameSP>* obs, VideoFrameSP& frame_p)
{ encodeAndSendVideo(*frame_p); } {
encodeAndSendVideo(*frame_p);
}
void VideoSendThread::forceKeyFrame() void VideoSendThread::forceKeyFrame()
{ atomic_increment(&forceKeyFrame_); } { atomic_increment(&forceKeyFrame_); }
......
...@@ -49,10 +49,7 @@ class VideoSendThread : public VideoFramePassiveReader ...@@ -49,10 +49,7 @@ class VideoSendThread : public VideoFramePassiveReader
public: public:
VideoSendThread(const std::string &id, VideoSendThread(const std::string &id,
const std::map<std::string, std::string> &args, const std::map<std::string, std::string> &args,
SocketPair& socketPair, SocketPair& socketPair);
VideoFrameActiveWriter *local_video,
VideoFrameActiveWriter *mixer);
virtual ~VideoSendThread();
std::string getSDP() const { return sdp_; } std::string getSDP() const { return sdp_; }
void forceKeyFrame(); void forceKeyFrame();
...@@ -69,7 +66,6 @@ private: ...@@ -69,7 +66,6 @@ private:
const std::string &id_; const std::string &id_;
VideoEncoder *videoEncoder_; VideoEncoder *videoEncoder_;
VideoFrameActiveWriter *videoSource_;
int forceKeyFrame_; int forceKeyFrame_;
int frameNumber_; int frameNumber_;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!