Skip to content
Snippets Groups Projects
Commit 4a0bf2e4 authored by Philippe Gorley's avatar Philippe Gorley Committed by Adrien Béraud
Browse files

recorder: refactor and simplify

Removes need for frame queue, as there's already an internal queue in
libavfilter. Once a frame is received, feed it directly to the filter.

The recorder's thread now only needs to send filtered frames to the
encoder.

Change-Id: I37bfa6e61f492cec120989fe210e92752e63466f
parent c04704aa
Branches
Tags
No related merge requests found
......@@ -108,16 +108,6 @@ MediaRecorder::startRecording()
std::time_t t = std::time(nullptr);
startTime_ = *std::localtime(&t);
if (!frames_.empty()) {
RING_WARN() << "Frame queue not empty at beginning of recording, frames will be lost";
std::lock_guard<std::mutex> q(qLock_);
while (!frames_.empty()) {
auto f = frames_.front();
av_frame_unref(f.frame);
frames_.pop_front();
}
}
encoder_.reset(new MediaEncoder);
RING_DBG() << "Start recording '" << getPath() << "'";
......@@ -169,50 +159,36 @@ MediaRecorder::addStream(const MediaStream& ms)
void
MediaRecorder::update(Observable<std::shared_ptr<AudioFrame>>* ob, const std::shared_ptr<AudioFrame>& a)
{
if (!isRecording_ || !loop_.isRunning())
return;
std::string name;
if (dynamic_cast<AudioReceiveThread*>(ob))
name = "a:remote";
else // ob is of type AudioInput*
name = "a:local";
recordData(a->pointer(), streams_[name]);
// copy frame to not mess with the original frame's pts
AudioFrame clone;
clone.copyFrom(*a);
clone.pointer()->pts -= streams_[name].firstTimestamp;
audioFilter_->feedInput(clone.pointer(), name);
loop_.interrupt();
}
void MediaRecorder::update(Observable<std::shared_ptr<VideoFrame>>* ob, const std::shared_ptr<VideoFrame>& v)
{
if (!isRecording_ || !loop_.isRunning())
return;
std::string name;
if (dynamic_cast<video::VideoReceiveThread*>(ob))
name = "v:remote";
else // ob is of type VideoInput*
name = "v:local";
recordData(v->pointer(), streams_[name]);
}
int
MediaRecorder::recordData(AVFrame* frame, const MediaStream& ms)
{
// recorder may be recording, but not ready for the first frames
if (!isRecording_ || !loop_.isRunning())
return 0;
const auto& params = streams_.at(ms.name);
// save a copy of the frame, will be filtered/encoded in another thread
AVFrame* input = av_frame_clone(frame);
if (!input) {
RING_ERR() << "Could not record data (failed to copy frame)";
return -1;
}
input->pts = input->pts - params.firstTimestamp; // stream has to start at 0
bool fromPeer = params.name.find("remote") != std::string::npos;
{
std::lock_guard<std::mutex> q(qLock_);
frames_.emplace_back(input, params.isVideo, fromPeer);
}
// copy frame to not mess with the original frame's pts
VideoFrame clone;
clone.copyFrom(*v);
clone.pointer()->pts -= streams_[name].firstTimestamp;
videoFilter_->feedInput(clone.pointer(), name);
loop_.interrupt();
return 0;
}
int
......@@ -458,7 +434,17 @@ MediaRecorder::buildAudioFilter(const std::vector<MediaStream>& peers, const Med
}
void
MediaRecorder::emptyFilterGraph()
MediaRecorder::flush()
{
if (!isRecording_ || encoder_->getStreamCount() <= 0)
return;
std::lock_guard<std::mutex> lk(mutex_);
encoder_->flush();
}
void
MediaRecorder::process()
{
AVFrame* output;
if (videoIdx_ >= 0 && videoFilter_)
......@@ -469,7 +455,7 @@ MediaRecorder::emptyFilterGraph()
sendToEncoder(output, audioIdx_);
}
int
void
MediaRecorder::sendToEncoder(AVFrame* frame, int streamIdx)
{
try {
......@@ -477,82 +463,8 @@ MediaRecorder::sendToEncoder(AVFrame* frame, int streamIdx)
encoder_->encode(frame, streamIdx);
} catch (const MediaEncoderException& e) {
RING_ERR() << "MediaEncoderException: " << e.what();
av_frame_unref(frame);
return -1;
}
av_frame_unref(frame);
return 0;
}
int
MediaRecorder::flush()
{
if (!isRecording_ || encoder_->getStreamCount() <= 0)
return 0;
std::lock_guard<std::mutex> lk(mutex_);
encoder_->flush();
return 0;
}
void
MediaRecorder::process()
{
// wait until frames_ is not empty or until we are no longer recording
loop_.wait([this]{ return !frames_.empty(); });
// if we exited because we stopped recording, stop our thread
if (loop_.isStopping())
return;
// else encode a frame
RecordFrame recframe;
{
std::lock_guard<std::mutex> q(qLock_);
if (!frames_.empty()) {
recframe = frames_.front();
frames_.pop_front();
} else {
return;
}
}
AVFrame* input = recframe.frame;
int streamIdx = (recframe.isVideo ? videoIdx_ : audioIdx_);
auto filter = (recframe.isVideo ? videoFilter_.get() : audioFilter_.get());
if (streamIdx < 0) {
RING_ERR() << "Specified stream is invalid: "
<< (recframe.fromPeer ? "remote " : "local ")
<< (recframe.isVideo ? "video" : "audio");
av_frame_unref(input);
return;
}
auto it = std::find_if(streams_.cbegin(), streams_.cend(), [recframe](const auto& pair){
return pair.second.isVideo == recframe.isVideo &&
recframe.fromPeer == (pair.second.name.find("remote") != std::string::npos);
});
if (it == streams_.cend()) {
RING_ERR() << "Specified stream could not be found: "
<< (recframe.fromPeer ? "remote " : "local ")
<< (recframe.isVideo ? "video" : "audio");
av_frame_unref(input);
return;
}
auto ms = it->second;
// get filter input name if frame needs filtering
std::string inputName = ms.name;
emptyFilterGraph();
if (filter) {
filter->feedInput(input, inputName);
}
av_frame_free(&input);
av_frame_free(&frame);
}
} // namespace ring
......@@ -40,8 +40,6 @@
#include <string>
#include <utility>
struct AVFrame;
namespace ring {
class MediaRecorder : public Observer<std::shared_ptr<AudioFrame>>
......@@ -112,8 +110,7 @@ public:
private:
NON_COPYABLE(MediaRecorder);
int recordData(AVFrame* frame, const MediaStream& ms);
int flush();
void flush();
int initRecord();
MediaStream setupVideoOutput();
......@@ -141,23 +138,9 @@ private:
bool isRecording_ = false;
bool audioOnly_ = false;
struct RecordFrame {
AVFrame* frame;
bool isVideo;
bool fromPeer;
RecordFrame() {}
RecordFrame(AVFrame* f, bool v, bool p)
: frame(f)
, isVideo(v)
, fromPeer(p)
{}
};
InterruptedThreadLoop loop_;
void process();
void emptyFilterGraph();
int sendToEncoder(AVFrame* frame, int streamIdx);
std::mutex qLock_;
std::deque<RecordFrame> frames_;
void sendToEncoder(AVFrame* frame, int streamIdx);
};
}; // namespace ring
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment