From c17099ae7de5f5a49e2d494f86939aff0223430a Mon Sep 17 00:00:00 2001
From: philippegorley <philippe.gorley@savoirfairelinux.com>
Date: Thu, 28 Jun 2018 15:38:08 -0400
Subject: [PATCH] recorder: improve multithreading

Better scopes mutices, adds multithreading to MediaFilter, improves
multithreading for MediaRecorder, decreases lag between both video feeds
while recording.

Change-Id: I9f96c1e90d506a8dc4a261a7e07010d42e5bf3ec
---
 src/media/media_filter.cpp   |  3 ++
 src/media/media_recorder.cpp | 56 +++++++++++++++++++++++++-----------
 src/media/media_recorder.h   |  3 +-
 3 files changed, 44 insertions(+), 18 deletions(-)

diff --git a/src/media/media_filter.cpp b/src/media/media_filter.cpp
index 57c3b734a5..64e7b5e862 100644
--- a/src/media/media_filter.cpp
+++ b/src/media/media_filter.cpp
@@ -31,6 +31,7 @@ extern "C" {
 #include <functional>
 #include <memory>
 #include <sstream>
+#include <thread>
 
 namespace ring {
 
@@ -67,6 +68,8 @@ MediaFilter::initialize(const std::string& filterDesc, std::vector<MediaStream>
     if (!graph_)
         return fail("Failed to allocate filter graph", AVERROR(ENOMEM));
 
+    graph_->nb_threads = std::max(1u, std::min(8u, std::thread::hardware_concurrency()/2));
+
     AVFilterInOut* in;
     AVFilterInOut* out;
     if ((ret = avfilter_graph_parse2(graph_, desc_.c_str(), &in, &out)) < 0)
diff --git a/src/media/media_recorder.cpp b/src/media/media_recorder.cpp
index 5f89b1ce90..fa00536fc6 100644
--- a/src/media/media_recorder.cpp
+++ b/src/media/media_recorder.cpp
@@ -37,6 +37,8 @@ extern "C" {
 
 namespace ring {
 
+static constexpr auto FRAME_DEQUEUE_INTERVAL = std::chrono::milliseconds(200);
+
 MediaRecorder::MediaRecorder()
     : loop_([]{ return true;},
             [this]{ process(); },
@@ -139,13 +141,14 @@ MediaRecorder::startRecording()
 
     if (!frames_.empty()) {
         RING_WARN() << "Frame queue not empty at beginning of recording, frames will be lost";
-        while (!frames_.empty())
+        std::lock_guard<std::mutex> q(qLock_);
+        while (!frames_.empty()) {
+            auto f = frames_.front();
+            av_frame_unref(f.frame);
             frames_.pop();
+        }
     }
 
-    if (!loop_.isRunning())
-        loop_.start();
-
     encoder_.reset(new MediaEncoder);
 
     RING_DBG() << "Start recording '" << getPath() << "'";
@@ -161,6 +164,11 @@ MediaRecorder::stopRecording()
         flush();
     }
     isRecording_ = false;
+    if (loop_.isRunning())
+        loop_.join();
+    videoFilter_.reset();
+    audioFilter_.reset();
+    encoder_.reset();
 }
 
 int
@@ -200,8 +208,11 @@ MediaRecorder::recordData(AVFrame* frame, bool isVideo, bool fromPeer)
     AVFrame* input = av_frame_clone(frame);
     input->pts = input->pts - ms.firstTimestamp; // stream has to start at 0
 
-    std::lock_guard<std::mutex> q(qLock_);
-    frames_.emplace(input, isVideo, fromPeer);
+    {
+        std::lock_guard<std::mutex> q(qLock_);
+        frames_.emplace(input, isVideo, fromPeer);
+    }
+    loop_.interrupt();
     return 0;
 }
 
@@ -253,8 +264,6 @@ MediaRecorder::initRecord()
         encoderOptions["channels"] = std::to_string(audioStream.nbChannels);
     }
 
-    std::lock_guard<std::mutex> lk(mutex_); // lock as late as possible
-
     encoder_->openFileOutput(getPath(), encoderOptions);
 
     if (nbReceivedVideoStreams_ > 0) {
@@ -283,6 +292,11 @@ MediaRecorder::initRecord()
     isReady_ = audioIsReady && videoIsReady;
 
     if (isReady_) {
+        std::lock_guard<std::mutex> lk(mutex_); // lock as late as possible
+
+        if (!loop_.isRunning())
+            loop_.start();
+
         std::unique_ptr<MediaIOHandle> ioHandle;
         try {
             encoder_->setIOContext(ioHandle);
@@ -431,8 +445,8 @@ MediaRecorder::emptyFilterGraph()
 int
 MediaRecorder::sendToEncoder(AVFrame* frame, int streamIdx)
 {
-    std::lock_guard<std::mutex> lk(mutex_);
     try {
+        std::lock_guard<std::mutex> lk(mutex_);
         encoder_->encode(frame, streamIdx);
     } catch (const MediaEncoderException& e) {
         RING_ERR() << "MediaEncoderException: " << e.what();
@@ -449,8 +463,6 @@ MediaRecorder::flush()
     if (!isRecording_ || encoder_->getStreamCount() <= 0)
         return 0;
 
-    emptyFilterGraph();
-
     std::lock_guard<std::mutex> lk(mutex_);
     encoder_->flush();
 
@@ -460,21 +472,31 @@ MediaRecorder::flush()
 void
 MediaRecorder::process()
 {
-    std::lock_guard<std::mutex> q(qLock_);
-    if (!isRecording_ || !isReady_ || frames_.empty())
+    if (!loop_.wait_for(FRAME_DEQUEUE_INTERVAL, [this]{ return !frames_.empty(); }))
         return;
 
-    auto recframe = frames_.front();
-    frames_.pop();
-    AVFrame* input = recframe.frame;
+    if (loop_.isStopping() || !isRecording_ || !isReady_)
+        return;
 
+    RecordFrame recframe;
+    {
+        std::lock_guard<std::mutex> q(qLock_);
+        if (!frames_.empty()) {
+            recframe = frames_.front();
+            frames_.pop();
+        } else {
+            return;
+        }
+    }
+
+    AVFrame* input = recframe.frame;
     int streamIdx = (recframe.isVideo ? videoIdx_ : audioIdx_);
     auto filter = (recframe.isVideo ? videoFilter_.get() : audioFilter_.get());
     if (streamIdx < 0) {
         RING_ERR() << "Specified stream is invalid: "
             << (recframe.fromPeer ? "remote " : "local ")
             << (recframe.isVideo ? "video" : "audio");
-        av_frame_free(&input);
+        av_frame_unref(input);
         return;
     }
 
diff --git a/src/media/media_recorder.h b/src/media/media_recorder.h
index 4c4221523b..9a46fc409d 100644
--- a/src/media/media_recorder.h
+++ b/src/media/media_recorder.h
@@ -117,13 +117,14 @@ class MediaRecorder {
             AVFrame* frame;
             bool isVideo;
             bool fromPeer;
+            RecordFrame() {}
             RecordFrame(AVFrame* f, bool v, bool p)
                 : frame(f)
                 , isVideo(v)
                 , fromPeer(p)
             {}
         };
-        ThreadLoop loop_;
+        InterruptedThreadLoop loop_;
         void process();
         std::mutex qLock_;
         std::queue<RecordFrame> frames_;
-- 
GitLab