diff --git a/src/media/media_recorder.cpp b/src/media/media_recorder.cpp index b81518b2ee4e0e4cc9e9bac6b6c602ebe8c00275..7becf002a0265f3f0b283fe6c2a306ef40fa3a99 100644 --- a/src/media/media_recorder.cpp +++ b/src/media/media_recorder.cpp @@ -165,8 +165,26 @@ MediaRecorder::startRecording() // start thread after isRecording_ is set to true dht::ThreadPool::computation().run([rec = shared_from_this()] { while (rec->isRecording()) { - rec->filterAndEncode(rec->videoFilter_.get(), rec->videoIdx_); - rec->filterAndEncode(rec->audioFilter_.get(), rec->audioIdx_); + std::shared_ptr<MediaFrame> frame; + // get frame from queue + { + std::unique_lock<std::mutex> lk(rec->mutexFrameBuff_); + rec->cv_.wait(lk, [rec]{ return rec->interrupted_ or not rec->frameBuff_.empty();}); + if (rec->interrupted_) { + break; + } + frame = std::move(rec->frameBuff_.front()); + rec->frameBuff_.pop_front(); + } + try { + // encode frame + if (frame && frame->pointer()) { + bool isVideo = (frame->pointer()->width > 0 && frame->pointer()->height > 0); + rec->encoder_->encode(frame->pointer(), isVideo ? rec->videoIdx_ : rec->audioIdx_); + } + } catch (const MediaEncoderException& e) { + JAMI_ERR() << "Failed to record frame: " << e.what(); + } } rec->flush(); rec->reset(); // allows recorder to be reused in same call @@ -178,6 +196,8 @@ MediaRecorder::startRecording() void MediaRecorder::stopRecording() { + interrupted_ = true; + cv_.notify_all(); if (isRecording_) { JAMI_DBG() << "Stop recording '" << getPath() << "'"; isRecording_ = false; @@ -229,13 +249,15 @@ MediaRecorder::onFrame(const std::string& name, const std::shared_ptr<MediaFrame std::unique_ptr<MediaFrame> clone; const auto& ms = streams_[name]->info; if (ms.isVideo) { -#ifdef RING_ACCEL - clone = video::HardwareAccel::transferToMainMemory(*std::static_pointer_cast<VideoFrame>(frame), - static_cast<AVPixelFormat>(ms.format)); -#else - clone = std::make_unique<MediaFrame>(); - clone->copyFrom(*frame); -#endif + auto desc = av_pix_fmt_desc_get((AVPixelFormat)(std::static_pointer_cast<VideoFrame>(frame))->format()); + if (desc && (desc->flags & AV_PIX_FMT_FLAG_HWACCEL)) { + clone = video::HardwareAccel::transferToMainMemory(*std::static_pointer_cast<VideoFrame>(frame), + static_cast<AVPixelFormat>(ms.format)); + } + else { + clone = std::make_unique<MediaFrame>(); + clone->copyFrom(*frame); + } } else { clone = std::make_unique<MediaFrame>(); clone->copyFrom(*frame); @@ -247,10 +269,24 @@ MediaRecorder::onFrame(const std::string& name, const std::shared_ptr<MediaFrame #else clone->pointer()->pts -= ms.firstTimestamp; #endif - if (ms.isVideo) + + std::unique_ptr<MediaFrame> filteredFrame; + if (ms.isVideo) { + std::lock_guard<std::mutex> lk(mutexFilterVideo_); videoFilter_->feedInput(clone->pointer(), name); - else + filteredFrame = videoFilter_->readOutput(); + } + else { + std::lock_guard<std::mutex> lk(mutexFilterAudio_); audioFilter_->feedInput(clone->pointer(), name); + filteredFrame = audioFilter_->readOutput(); + } + + if (filteredFrame) { + std::lock_guard<std::mutex> lk(mutexFrameBuff_); + frameBuff_.emplace_back(std::move(filteredFrame)); + cv_.notify_one(); + } } int @@ -489,19 +525,25 @@ MediaRecorder::buildAudioFilter(const std::vector<MediaStream>& peers, const Med void MediaRecorder::flush() { - std::lock_guard<std::mutex> lk(mutex_); - if (videoFilter_) + if (videoFilter_) { + std::lock_guard<std::mutex> lk(mutexFilterVideo_); videoFilter_->flush(); - if (audioFilter_) + } + + if (audioFilter_) { + std::lock_guard<std::mutex> lk(mutexFilterAudio_); audioFilter_->flush(); - filterAndEncode(videoFilter_.get(), videoIdx_); - filterAndEncode(audioFilter_.get(), audioIdx_); + } encoder_->flush(); } void MediaRecorder::reset() { + { + std::lock_guard<std::mutex> lk(mutexFrameBuff_); + frameBuff_.clear(); + } streams_.clear(); videoIdx_ = audioIdx_ = -1; audioOnly_ = false; @@ -510,19 +552,4 @@ MediaRecorder::reset() encoder_.reset(); } -void -MediaRecorder::filterAndEncode(MediaFilter* filter, int streamIdx) -{ - if (filter && streamIdx >= 0) { - while (auto frame = filter->readOutput()) { - try { - std::lock_guard<std::mutex> lk(mutex_); - encoder_->encode(frame->pointer(), streamIdx); - } catch (const MediaEncoderException& e) { - JAMI_ERR() << "Failed to record frame: " << e.what(); - } - } - } -} - } // namespace jami diff --git a/src/media/media_recorder.h b/src/media/media_recorder.h index 8328635ebf6cc32c2e83ca49445e26ba939298ac..3cb941a0309fcaec070fd4d1e906d57f1a8e2ecb 100644 --- a/src/media/media_recorder.h +++ b/src/media/media_recorder.h @@ -35,6 +35,8 @@ #include <stdexcept> #include <string> #include <utility> +#include <condition_variable> +#include <atomic> namespace jami { @@ -129,7 +131,9 @@ private: MediaStream setupAudioOutput(); std::string buildAudioFilter(const std::vector<MediaStream>& peers, const MediaStream& local) const; - std::mutex mutex_; // protect against concurrent file writes + std::mutex mutexFrameBuff_; + std::mutex mutexFilterVideo_; + std::mutex mutexFilterAudio_; std::map<std::string, std::unique_ptr<StreamObserver>> streams_; @@ -150,7 +154,10 @@ private: bool isRecording_ = false; bool audioOnly_ = false; - void filterAndEncode(MediaFilter* filter, int streamIdx); + std::condition_variable cv_; + std::atomic_bool interrupted_ {false}; + + std::list<std::shared_ptr<MediaFrame>> frameBuff_; }; }; // namespace jami