Commit c17099ae authored by Philippe Gorley's avatar Philippe Gorley

recorder: improve multithreading

Better scopes mutices, adds multithreading to MediaFilter, improves
multithreading for MediaRecorder, decreases lag between both video feeds
while recording.

Change-Id: I9f96c1e90d506a8dc4a261a7e07010d42e5bf3ec
parent 20c6d705
...@@ -31,6 +31,7 @@ extern "C" { ...@@ -31,6 +31,7 @@ extern "C" {
#include <functional> #include <functional>
#include <memory> #include <memory>
#include <sstream> #include <sstream>
#include <thread>
namespace ring { namespace ring {
...@@ -67,6 +68,8 @@ MediaFilter::initialize(const std::string& filterDesc, std::vector<MediaStream> ...@@ -67,6 +68,8 @@ MediaFilter::initialize(const std::string& filterDesc, std::vector<MediaStream>
if (!graph_) if (!graph_)
return fail("Failed to allocate filter graph", AVERROR(ENOMEM)); return fail("Failed to allocate filter graph", AVERROR(ENOMEM));
graph_->nb_threads = std::max(1u, std::min(8u, std::thread::hardware_concurrency()/2));
AVFilterInOut* in; AVFilterInOut* in;
AVFilterInOut* out; AVFilterInOut* out;
if ((ret = avfilter_graph_parse2(graph_, desc_.c_str(), &in, &out)) < 0) if ((ret = avfilter_graph_parse2(graph_, desc_.c_str(), &in, &out)) < 0)
......
...@@ -37,6 +37,8 @@ extern "C" { ...@@ -37,6 +37,8 @@ extern "C" {
namespace ring { namespace ring {
static constexpr auto FRAME_DEQUEUE_INTERVAL = std::chrono::milliseconds(200);
MediaRecorder::MediaRecorder() MediaRecorder::MediaRecorder()
: loop_([]{ return true;}, : loop_([]{ return true;},
[this]{ process(); }, [this]{ process(); },
...@@ -139,13 +141,14 @@ MediaRecorder::startRecording() ...@@ -139,13 +141,14 @@ MediaRecorder::startRecording()
if (!frames_.empty()) { if (!frames_.empty()) {
RING_WARN() << "Frame queue not empty at beginning of recording, frames will be lost"; RING_WARN() << "Frame queue not empty at beginning of recording, frames will be lost";
while (!frames_.empty()) std::lock_guard<std::mutex> q(qLock_);
while (!frames_.empty()) {
auto f = frames_.front();
av_frame_unref(f.frame);
frames_.pop(); frames_.pop();
}
} }
if (!loop_.isRunning())
loop_.start();
encoder_.reset(new MediaEncoder); encoder_.reset(new MediaEncoder);
RING_DBG() << "Start recording '" << getPath() << "'"; RING_DBG() << "Start recording '" << getPath() << "'";
...@@ -161,6 +164,11 @@ MediaRecorder::stopRecording() ...@@ -161,6 +164,11 @@ MediaRecorder::stopRecording()
flush(); flush();
} }
isRecording_ = false; isRecording_ = false;
if (loop_.isRunning())
loop_.join();
videoFilter_.reset();
audioFilter_.reset();
encoder_.reset();
} }
int int
...@@ -200,8 +208,11 @@ MediaRecorder::recordData(AVFrame* frame, bool isVideo, bool fromPeer) ...@@ -200,8 +208,11 @@ MediaRecorder::recordData(AVFrame* frame, bool isVideo, bool fromPeer)
AVFrame* input = av_frame_clone(frame); AVFrame* input = av_frame_clone(frame);
input->pts = input->pts - ms.firstTimestamp; // stream has to start at 0 input->pts = input->pts - ms.firstTimestamp; // stream has to start at 0
std::lock_guard<std::mutex> q(qLock_); {
frames_.emplace(input, isVideo, fromPeer); std::lock_guard<std::mutex> q(qLock_);
frames_.emplace(input, isVideo, fromPeer);
}
loop_.interrupt();
return 0; return 0;
} }
...@@ -253,8 +264,6 @@ MediaRecorder::initRecord() ...@@ -253,8 +264,6 @@ MediaRecorder::initRecord()
encoderOptions["channels"] = std::to_string(audioStream.nbChannels); encoderOptions["channels"] = std::to_string(audioStream.nbChannels);
} }
std::lock_guard<std::mutex> lk(mutex_); // lock as late as possible
encoder_->openFileOutput(getPath(), encoderOptions); encoder_->openFileOutput(getPath(), encoderOptions);
if (nbReceivedVideoStreams_ > 0) { if (nbReceivedVideoStreams_ > 0) {
...@@ -283,6 +292,11 @@ MediaRecorder::initRecord() ...@@ -283,6 +292,11 @@ MediaRecorder::initRecord()
isReady_ = audioIsReady && videoIsReady; isReady_ = audioIsReady && videoIsReady;
if (isReady_) { if (isReady_) {
std::lock_guard<std::mutex> lk(mutex_); // lock as late as possible
if (!loop_.isRunning())
loop_.start();
std::unique_ptr<MediaIOHandle> ioHandle; std::unique_ptr<MediaIOHandle> ioHandle;
try { try {
encoder_->setIOContext(ioHandle); encoder_->setIOContext(ioHandle);
...@@ -431,8 +445,8 @@ MediaRecorder::emptyFilterGraph() ...@@ -431,8 +445,8 @@ MediaRecorder::emptyFilterGraph()
int int
MediaRecorder::sendToEncoder(AVFrame* frame, int streamIdx) MediaRecorder::sendToEncoder(AVFrame* frame, int streamIdx)
{ {
std::lock_guard<std::mutex> lk(mutex_);
try { try {
std::lock_guard<std::mutex> lk(mutex_);
encoder_->encode(frame, streamIdx); encoder_->encode(frame, streamIdx);
} catch (const MediaEncoderException& e) { } catch (const MediaEncoderException& e) {
RING_ERR() << "MediaEncoderException: " << e.what(); RING_ERR() << "MediaEncoderException: " << e.what();
...@@ -449,8 +463,6 @@ MediaRecorder::flush() ...@@ -449,8 +463,6 @@ MediaRecorder::flush()
if (!isRecording_ || encoder_->getStreamCount() <= 0) if (!isRecording_ || encoder_->getStreamCount() <= 0)
return 0; return 0;
emptyFilterGraph();
std::lock_guard<std::mutex> lk(mutex_); std::lock_guard<std::mutex> lk(mutex_);
encoder_->flush(); encoder_->flush();
...@@ -460,21 +472,31 @@ MediaRecorder::flush() ...@@ -460,21 +472,31 @@ MediaRecorder::flush()
void void
MediaRecorder::process() MediaRecorder::process()
{ {
std::lock_guard<std::mutex> q(qLock_); if (!loop_.wait_for(FRAME_DEQUEUE_INTERVAL, [this]{ return !frames_.empty(); }))
if (!isRecording_ || !isReady_ || frames_.empty())
return; return;
auto recframe = frames_.front(); if (loop_.isStopping() || !isRecording_ || !isReady_)
frames_.pop(); return;
AVFrame* input = recframe.frame;
RecordFrame recframe;
{
std::lock_guard<std::mutex> q(qLock_);
if (!frames_.empty()) {
recframe = frames_.front();
frames_.pop();
} else {
return;
}
}
AVFrame* input = recframe.frame;
int streamIdx = (recframe.isVideo ? videoIdx_ : audioIdx_); int streamIdx = (recframe.isVideo ? videoIdx_ : audioIdx_);
auto filter = (recframe.isVideo ? videoFilter_.get() : audioFilter_.get()); auto filter = (recframe.isVideo ? videoFilter_.get() : audioFilter_.get());
if (streamIdx < 0) { if (streamIdx < 0) {
RING_ERR() << "Specified stream is invalid: " RING_ERR() << "Specified stream is invalid: "
<< (recframe.fromPeer ? "remote " : "local ") << (recframe.fromPeer ? "remote " : "local ")
<< (recframe.isVideo ? "video" : "audio"); << (recframe.isVideo ? "video" : "audio");
av_frame_free(&input); av_frame_unref(input);
return; return;
} }
......
...@@ -117,13 +117,14 @@ class MediaRecorder { ...@@ -117,13 +117,14 @@ class MediaRecorder {
AVFrame* frame; AVFrame* frame;
bool isVideo; bool isVideo;
bool fromPeer; bool fromPeer;
RecordFrame() {}
RecordFrame(AVFrame* f, bool v, bool p) RecordFrame(AVFrame* f, bool v, bool p)
: frame(f) : frame(f)
, isVideo(v) , isVideo(v)
, fromPeer(p) , fromPeer(p)
{} {}
}; };
ThreadLoop loop_; InterruptedThreadLoop loop_;
void process(); void process();
std::mutex qLock_; std::mutex qLock_;
std::queue<RecordFrame> frames_; std::queue<RecordFrame> frames_;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment