From 11585ed297d77e11f0723e96d9e1a126fbfe395c Mon Sep 17 00:00:00 2001 From: philippegorley <philippe.gorley@savoirfairelinux.com> Date: Wed, 11 Jul 2018 15:27:09 -0400 Subject: [PATCH] recorder: refactor pipeline Moves the recorder up one level to the VideoInput, VideoReceiveThread and AudioReceiveThread, instead of the MediaDecoder (there's no equivalent to the VideoInput in the audio layer). Emits the RecordPlaybackStopped when the recording is stopped, so the client can sync its recording state with the daemon, in case the daemon stops recording by itself (rather than user intervention). Change-Id: I743b080cb354273ec074fec51caf2a4328fc1c58 --- src/media/audio/audio_rtp_session.cpp | 23 +++++++++++- src/media/media_decoder.cpp | 47 +++++------------------- src/media/media_decoder.h | 8 ++-- src/media/media_encoder.cpp | 8 +++- src/media/media_recorder.cpp | 4 +- src/media/video/video_input.cpp | 23 ++++++++++-- src/media/video/video_input.h | 3 ++ src/media/video/video_receive_thread.cpp | 23 ++++++++++-- src/media/video/video_receive_thread.h | 3 ++ 9 files changed, 89 insertions(+), 53 deletions(-) diff --git a/src/media/audio/audio_rtp_session.cpp b/src/media/audio/audio_rtp_session.cpp index fb98bb2da6..4539793540 100644 --- a/src/media/audio/audio_rtp_session.cpp +++ b/src/media/audio/audio_rtp_session.cpp @@ -233,6 +233,9 @@ class AudioReceiveThread void openDecoder(); bool decodeFrame(); + std::weak_ptr<MediaRecorder> recorder_; + bool recordingStarted_{false}; + /*-----------------------------------------------------------------*/ /* These variables should be used in thread (i.e. process()) only! */ /*-----------------------------------------------------------------*/ @@ -273,6 +276,8 @@ AudioReceiveThread::AudioReceiveThread(const std::string& id, AudioReceiveThread::~AudioReceiveThread() { + if (auto rec = recorder_.lock()) + rec->stopRecording(); loop_.join(); } @@ -309,6 +314,20 @@ AudioReceiveThread::process() switch (audioDecoder_->decode(decodedFrame)) { case MediaDecoder::Status::FrameFinished: + if (auto rec = recorder_.lock()) { + if (!recordingStarted_) { + if (rec->addStream(false, true, audioDecoder_->getStream()) >= 0) { + recordingStarted_ = true; + } else { + recorder_ = std::weak_ptr<MediaRecorder>(); + } + } + if (recordingStarted_) + rec->recordData(decodedFrame.pointer(), false, true); + } else { + recordingStarted_ = false; + recorder_ = std::weak_ptr<MediaRecorder>(); + } audioDecoder_->writeToRingBuffer(decodedFrame, *ringbuffer_, mainBuffFormat); // Refresh the remote audio codec in the callback SmartInfo @@ -384,8 +403,8 @@ AudioReceiveThread::startLoop() void AudioReceiveThread::initRecorder(std::shared_ptr<MediaRecorder>& rec) { - if (audioDecoder_) - audioDecoder_->initRecorder(rec); + rec->incrementStreams(1); + recorder_ = rec; } AudioRtpSession::AudioRtpSession(const std::string& id) diff --git a/src/media/media_decoder.cpp b/src/media/media_decoder.cpp index 373eff8289..96678dc519 100644 --- a/src/media/media_decoder.cpp +++ b/src/media/media_decoder.cpp @@ -28,7 +28,6 @@ #include "audio/resampler.h" #include "decoder_finder.h" #include "manager.h" -#include "media_recorder.h" #ifdef RING_ACCEL #include "video/accel.h" @@ -60,8 +59,6 @@ MediaDecoder::MediaDecoder() : MediaDecoder::~MediaDecoder() { - if (auto rec = recorder_.lock()) - rec->stopRecording(); #ifdef RING_ACCEL if (decoderCtx_ && decoderCtx_->hw_device_ctx) av_buffer_unref(&decoderCtx_->hw_device_ctx); @@ -293,20 +290,7 @@ MediaDecoder::decode(VideoFrame& result) frame->pts = av_rescale_q_rnd(av_gettime() - startTime_, {1, AV_TIME_BASE}, decoderCtx_->time_base, static_cast<AVRounding>(AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX)); - - if (auto rec = recorder_.lock()) { - bool fromPeer = (inputCtx_->iformat->name == std::string("sdp")); - if (!recordingStarted_) { - auto ms = MediaStream("", decoderCtx_, frame->pts); - ms.format = frame->format; // might not match avStream_ if accel is used - if (rec->addStream(true, fromPeer, ms) >= 0) - recordingStarted_ = true; - else - recorder_ = std::weak_ptr<MediaRecorder>(); - } - if (recordingStarted_) - rec->recordData(frame, true, fromPeer); - } + lastTimestamp_ = frame->pts; if (emulateRate_ and packetTimestamp != AV_NOPTS_VALUE) { auto frame_time = getTimeBase()*(packetTimestamp - avStream_->start_time); @@ -365,18 +349,7 @@ MediaDecoder::decode(const AudioFrame& decodedFrame) // NOTE don't use clock to rescale audio pts, it may create artifacts frame->pts = av_rescale_q_rnd(frame->pts, avStream_->time_base, decoderCtx_->time_base, static_cast<AVRounding>(AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX)); - - if (auto rec = recorder_.lock()) { - if (!recordingStarted_) { - auto ms = MediaStream("", decoderCtx_, frame->pts); - if (rec->addStream(false, true, ms) >= 0) - recordingStarted_ = true; - else - recorder_ = std::weak_ptr<MediaRecorder>(); - } - if (recordingStarted_) - rec->recordData(frame, false, true); - } + lastTimestamp_ = frame->pts; if (emulateRate_ and packetTimestamp != AV_NOPTS_VALUE) { auto frame_time = getTimeBase()*(packetTimestamp - avStream_->start_time); @@ -525,15 +498,15 @@ MediaDecoder::correctPixFmt(int input_pix_fmt) { return pix_fmt; } -void -MediaDecoder::initRecorder(std::shared_ptr<MediaRecorder>& rec) +MediaStream +MediaDecoder::getStream() const { - // recording will start once we can send an AVPacket to the recorder - recordingStarted_ = false; - recorder_ = rec; - if (auto r = recorder_.lock()) { - r->incrementStreams(1); - } + auto ms = MediaStream("", decoderCtx_, lastTimestamp_); +#ifdef RING_ACCEL + if (decoderCtx_->codec_type == AVMEDIA_TYPE_VIDEO && enableAccel_ && !accel_.name.empty()) + ms.format = AV_PIX_FMT_NV12; // TODO option me! +#endif + return ms; } } // namespace ring diff --git a/src/media/media_decoder.h b/src/media/media_decoder.h index 56ab9c3f4a..f1b4a10c4e 100644 --- a/src/media/media_decoder.h +++ b/src/media/media_decoder.h @@ -34,6 +34,7 @@ #include "audio/audiobuffer.h" +#include "media_stream.h" #include "rational.h" #include "noncopyable.h" @@ -57,7 +58,6 @@ class RingBuffer; class Resampler; class MediaIOHandle; struct DeviceParams; -class MediaRecorder; class MediaDecoder { public: @@ -100,7 +100,7 @@ class MediaDecoder { void enableAccel(bool enableAccel); #endif - void initRecorder(std::shared_ptr<MediaRecorder>& rec); + MediaStream getStream() const; private: NON_COPYABLE(MediaDecoder); @@ -115,6 +115,7 @@ class MediaDecoder { int streamIndex_ = -1; bool emulateRate_ = false; int64_t startTime_; + int64_t lastTimestamp_; AudioBuffer decBuff_; AudioBuffer resamplingBuff_; @@ -131,9 +132,6 @@ class MediaDecoder { unsigned short accelFailures_ = 0; #endif - std::weak_ptr<MediaRecorder> recorder_; - bool recordingStarted_ = false; - protected: AVDictionary *options_ = nullptr; }; diff --git a/src/media/media_encoder.cpp b/src/media/media_encoder.cpp index f90221288c..51b4e56680 100644 --- a/src/media/media_encoder.cpp +++ b/src/media/media_encoder.cpp @@ -446,13 +446,17 @@ int MediaEncoder::encode_audio(const AudioBuffer &buffer) if (auto rec = recorder_.lock()) { if (!recordingStarted_) { auto ms = MediaStream("", encoders_[currentStreamIdx_], frame->pts); - if (rec->addStream(false, false, ms) >= 0) + if (rec->addStream(false, false, ms) >= 0) { recordingStarted_ = true; - else + } else { recorder_ = std::weak_ptr<MediaRecorder>(); + } } if (recordingStarted_) rec->recordData(frame, false, false); + } else { + recordingStarted_ = false; + recorder_ = std::weak_ptr<MediaRecorder>(); } encode(frame, currentStreamIdx_); diff --git a/src/media/media_recorder.cpp b/src/media/media_recorder.cpp index 472555f895..f2d63abc11 100644 --- a/src/media/media_recorder.cpp +++ b/src/media/media_recorder.cpp @@ -19,6 +19,7 @@ */ #include "libav_deps.h" // MUST BE INCLUDED FIRST +#include "client/ring_signal.h" #include "fileutils.h" #include "logger.h" #include "media_io_handle.h" @@ -179,6 +180,7 @@ MediaRecorder::stopRecording() isRecording_ = false; loop_.join(); flush(); + emitSignal<DRing::CallSignal::RecordPlaybackStopped>(getPath()); } resetToDefaults(); } @@ -220,7 +222,7 @@ MediaRecorder::recordData(AVFrame* frame, bool isVideo, bool fromPeer) return 0; // save a copy of the frame, will be filtered/encoded in another thread - const MediaStream& ms = streams_[isVideo][fromPeer]; + MediaStream& ms = streams_[isVideo][fromPeer]; AVFrame* input = av_frame_clone(frame); input->pts = input->pts - ms.firstTimestamp; // stream has to start at 0 diff --git a/src/media/video/video_input.cpp b/src/media/video/video_input.cpp index b8570340c2..7542838d4e 100644 --- a/src/media/video/video_input.cpp +++ b/src/media/video/video_input.cpp @@ -63,6 +63,8 @@ VideoInput::VideoInput() VideoInput::~VideoInput() { + if (auto rec = recorder_.lock()) + rec->stopRecording(); #if defined(__ANDROID__) || defined(RING_UWP) || (defined(TARGET_OS_IOS) && TARGET_OS_IOS) /* we need to stop the loop and notify the condition variable * to unblock the process loop */ @@ -206,7 +208,8 @@ bool VideoInput::captureFrame() if (not decoder_) return false; - const auto ret = decoder_->decode(getNewFrame()); + auto& frame = getNewFrame(); + const auto ret = decoder_->decode(frame); switch (ret) { case MediaDecoder::Status::ReadError: return false; @@ -229,6 +232,20 @@ bool VideoInput::captureFrame() return static_cast<bool>(decoder_); case MediaDecoder::Status::FrameFinished: + if (auto rec = recorder_.lock()) { + if (!recordingStarted_) { + if (rec->addStream(true, false, decoder_->getStream()) >= 0) { + recordingStarted_ = true; + } else { + recorder_ = std::weak_ptr<MediaRecorder>(); + } + } + if (recordingStarted_) + rec->recordData(frame.pointer(), true, false); + } else { + recordingStarted_ = false; + recorder_ = std::weak_ptr<MediaRecorder>(); + } publishFrame(); return true; // continue decoding @@ -590,8 +607,8 @@ VideoInput::foundDecOpts(const DeviceParams& params) void VideoInput::initRecorder(std::shared_ptr<MediaRecorder>& rec) { - if (decoder_) - decoder_->initRecorder(rec); + rec->incrementStreams(1); + recorder_ = rec; } }} // namespace ring::video diff --git a/src/media/video/video_input.h b/src/media/video/video_input.h index 4561b4e83b..a5871cceda 100644 --- a/src/media/video/video_input.h +++ b/src/media/video/video_input.h @@ -143,6 +143,9 @@ private: void releaseBufferCb(uint8_t* ptr); std::array<struct VideoFrameBuffer, 8> buffers_; #endif + + std::weak_ptr<MediaRecorder> recorder_; + bool recordingStarted_{false}; }; }} // namespace ring::video diff --git a/src/media/video/video_receive_thread.cpp b/src/media/video/video_receive_thread.cpp index e7a83c4358..b03b106c7b 100644 --- a/src/media/video/video_receive_thread.cpp +++ b/src/media/video/video_receive_thread.cpp @@ -56,6 +56,8 @@ VideoReceiveThread::VideoReceiveThread(const std::string& id, VideoReceiveThread::~VideoReceiveThread() { + if (auto rec = recorder_.lock()) + rec->stopRecording(); loop_.join(); } @@ -166,10 +168,25 @@ void VideoReceiveThread::addIOContext(SocketPair& socketPair) bool VideoReceiveThread::decodeFrame() { - const auto ret = videoDecoder_->decode(getNewFrame()); + auto& frame = getNewFrame(); + const auto ret = videoDecoder_->decode(frame); switch (ret) { case MediaDecoder::Status::FrameFinished: + if (auto rec = recorder_.lock()) { + if (!recordingStarted_) { + if (rec->addStream(true, true, videoDecoder_->getStream()) >= 0) { + recordingStarted_ = true; + } else { + recorder_ = std::weak_ptr<MediaRecorder>(); + } + } + if (recordingStarted_) + rec->recordData(frame.pointer(), true, true); + } else { + recordingStarted_ = false; + recorder_ = std::weak_ptr<MediaRecorder>(); + } publishFrame(); return true; @@ -241,8 +258,8 @@ VideoReceiveThread::triggerKeyFrameRequest() void VideoReceiveThread::initRecorder(std::shared_ptr<ring::MediaRecorder>& rec) { - if (videoDecoder_) - videoDecoder_->initRecorder(rec); + rec->incrementStreams(1); + recorder_ = rec; } }} // namespace ring::video diff --git a/src/media/video/video_receive_thread.h b/src/media/video/video_receive_thread.h index 2c884d067b..bc13ccd687 100644 --- a/src/media/video/video_receive_thread.h +++ b/src/media/video/video_receive_thread.h @@ -89,6 +89,9 @@ private: static int interruptCb(void *ctx); static int readFunction(void *opaque, uint8_t *buf, int buf_size); + std::weak_ptr<MediaRecorder> recorder_; + bool recordingStarted_{false}; + ThreadLoop loop_; // used by ThreadLoop -- GitLab