From 966fd020e324d7fb6ea2ad60f0adb73fea94d644 Mon Sep 17 00:00:00 2001
From: philippegorley <philippe.gorley@savoirfairelinux.com>
Date: Thu, 14 Jun 2018 09:51:00 -0400
Subject: [PATCH] recorder: parallelize filtering and encoding

Change-Id: Ia2de94e79912ac27da0084daf0d59bd0bcf0220a
Reviewed-by: Sebastien Blin <sebastien.blin@savoirfairelinux.com>
---
 src/media/media_recorder.cpp | 99 +++++++++++++++++++++++++-----------
 src/media/media_recorder.h   | 16 ++++++
 2 files changed, 84 insertions(+), 31 deletions(-)

diff --git a/src/media/media_recorder.cpp b/src/media/media_recorder.cpp
index fbb481a337..7a3c5ee9fa 100644
--- a/src/media/media_recorder.cpp
+++ b/src/media/media_recorder.cpp
@@ -38,12 +38,17 @@ extern "C" {
 namespace ring {
 
 MediaRecorder::MediaRecorder()
+    : loop_([]{ return true;},
+            [this]{ process(); },
+            []{})
 {}
 
 MediaRecorder::~MediaRecorder()
 {
     if (isRecording_)
         flush();
+    if (loop_.isRunning())
+        loop_.join();
 }
 
 std::string
@@ -112,6 +117,15 @@ MediaRecorder::startRecording()
     ss << std::put_time(&startTime_, "%Y%m%d-%H%M%S");
     filename_ = ss.str();
 
+    if (!frames_.empty()) {
+        RING_WARN() << "Frame queue not empty at beginning of recording, frames will be lost";
+        while (!frames_.empty())
+            frames_.pop();
+    }
+
+    if (!loop_.isRunning())
+        loop_.start();
+
     encoder_.reset(new MediaEncoder);
 
     RING_DBG() << "Start recording '" << getFilename() << "'";
@@ -158,41 +172,13 @@ MediaRecorder::addStream(bool isVideo, bool fromPeer, MediaStream ms)
 int
 MediaRecorder::recordData(AVFrame* frame, bool isVideo, bool fromPeer)
 {
-    std::lock_guard<std::mutex> lk(mutex_);
     if (!isRecording_ || !isReady_)
         return 0;
 
-    int streamIdx = (isVideo ? videoIdx_ : audioIdx_);
-    auto filter = (isVideo ? videoFilter_.get() : audioFilter_.get());
-    if (streamIdx < 0 || !filter) {
-        RING_ERR() << "Specified stream is invalid: "
-            << (fromPeer ? "remote " : "local ") << (isVideo ? "video" : "audio");
-        return -1;
-    }
-
-    // get filter input name if frame needs filtering
-    std::string inputName;
-    if (isVideo && nbReceivedVideoStreams_ == 2)
-        inputName = (fromPeer ? "v:main" : "v:overlay");
-    if (!isVideo && nbReceivedAudioStreams_ == 2)
-        inputName = (fromPeer ? "a:1" : "a:2");
-
-    // new reference because we are changing the timestamp
+    // save a copy of the frame, will be filtered/encoded in another thread
     AVFrame* input = av_frame_clone(frame);
-    const MediaStream& ms = streams_[isVideo][fromPeer];
-    // stream has to start at 0
-    input->pts = input->pts - ms.firstTimestamp;
-
-    if (inputName.empty()) // #nofilters
-        return sendToEncoder(input, streamIdx);
-
-    // empty filter graph output before sending more frames
-    emptyFilterGraph();
-
-    int err = filter->feedInput(input, inputName);
-    av_frame_unref(input);
-
-    return err;
+    frames_.emplace(input, isVideo, fromPeer);
+    return 0;
 }
 
 int
@@ -437,4 +423,55 @@ MediaRecorder::flush()
     return 0;
 }
 
+void
+MediaRecorder::process()
+{
+    std::lock_guard<std::mutex> lk(mutex_);
+    if (!isRecording_ || !isReady_)
+        return;
+
+    while (!frames_.empty()) {
+        auto f = frames_.front();
+        bool isVideo = f.isVideo;
+        bool fromPeer = f.fromPeer;
+        AVFrame* input = f.frame;
+        frames_.pop();
+
+        int streamIdx = (isVideo ? videoIdx_ : audioIdx_);
+        auto filter = (isVideo ? videoFilter_.get() : audioFilter_.get());
+        if (streamIdx < 0 || !filter) {
+            RING_ERR() << "Specified stream is invalid: "
+                << (fromPeer ? "remote " : "local ") << (isVideo ? "video" : "audio");
+            av_frame_free(&input);
+            continue;
+        }
+
+        // get filter input name if frame needs filtering
+        std::string inputName;
+        if (isVideo && nbReceivedVideoStreams_ == 2)
+            inputName = (fromPeer ? "v:main" : "v:overlay");
+        if (!isVideo && nbReceivedAudioStreams_ == 2)
+            inputName = (fromPeer ? "a:1" : "a:2");
+
+        // new reference because we are changing the timestamp
+        const MediaStream& ms = streams_[isVideo][fromPeer];
+        // stream has to start at 0
+        input->pts = input->pts - ms.firstTimestamp;
+
+        if (inputName.empty()) { // #nofilters
+            if (sendToEncoder(input, streamIdx) < 0) {
+                RING_ERR() << "Filed to encode frame";
+                av_frame_free(&input);
+                continue;
+            }
+        }
+
+        // empty filter graph output before sending more frames
+        emptyFilterGraph();
+
+        filter->feedInput(input, inputName);
+        av_frame_free(&input);
+    }
+}
+
 } // namespace ring
diff --git a/src/media/media_recorder.h b/src/media/media_recorder.h
index 23462ca853..b60cea6f05 100644
--- a/src/media/media_recorder.h
+++ b/src/media/media_recorder.h
@@ -25,10 +25,12 @@
 #include "media_filter.h"
 #include "media_stream.h"
 #include "noncopyable.h"
+#include "threadloop.h"
 
 #include <map>
 #include <memory>
 #include <mutex>
+#include <queue>
 #include <stdexcept>
 #include <string>
 #include <utility>
@@ -102,6 +104,20 @@ class MediaRecorder {
         bool isRecording_ = false;
         bool isReady_ = false;
         bool audioOnly_ = false;
+
+        struct RecordFrame {
+            AVFrame* frame;
+            bool isVideo;
+            bool fromPeer;
+            RecordFrame(AVFrame* f, bool v, bool p)
+                : frame(f)
+                , isVideo(v)
+                , fromPeer(p)
+            {}
+        };
+        ThreadLoop loop_;
+        void process();
+        std::queue<RecordFrame> frames_;
 };
 
 }; // namespace ring
-- 
GitLab