From a67dd98043187d01eab614a741a26997032d3d41 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=A9bastien=20Blin?=
 <sebastien.blin@savoirfairelinux.com>
Date: Fri, 29 Sep 2023 03:13:35 -0400
Subject: [PATCH] media_decoder: refactor locking model

In SIPCall, requestKeyFrame & setVideoOrientation are using
pjsip on the main thread. To avoid this, use the io thread as
it's io operations that should not be done on main thread.

Moreover, some operations from recorder were called from pjsip
but not retriggered, causing jami->pjsip->jami operations
called with lock that could be called from pjsip nor jami causing
potential deadlocks.

Finally, audioDecoder_/receiveThread_ should be protected in
AudioRtpSession.

This fixes crashes for ut_conference

GitLab: #900
GitLab: #901
GitLab: #902
Change-Id: I055b297d25bc417e685f4c31d6105e4f6dab52e6
---
 src/media/audio/audio_receive_thread.cpp |  4 ++
 src/media/audio/audio_receive_thread.h   |  1 +
 src/media/audio/audio_rtp_session.cpp    | 68 +++++++++++++++++-------
 src/media/audio/audio_rtp_session.h      |  2 +-
 src/media/media_decoder.cpp              |  4 ++
 src/media/media_recorder.cpp             | 17 ++++--
 src/media/video/video_rtp_session.cpp    | 43 ++++++++++-----
 src/media/video/video_rtp_session.h      |  2 +-
 src/sip/sipcall.cpp                      |  4 +-
 9 files changed, 105 insertions(+), 40 deletions(-)

diff --git a/src/media/audio/audio_receive_thread.cpp b/src/media/audio/audio_receive_thread.cpp
index a808714b44..b5524c2e52 100644
--- a/src/media/audio/audio_receive_thread.cpp
+++ b/src/media/audio/audio_receive_thread.cpp
@@ -55,6 +55,7 @@ AudioReceiveThread::~AudioReceiveThread()
 bool
 AudioReceiveThread::setup()
 {
+    std::lock_guard<std::mutex> lk(mutex_);
     audioDecoder_.reset(new MediaDecoder([this](std::shared_ptr<MediaFrame>&& frame) mutable {
         notify(frame);
         ringbuffer_->put(std::static_pointer_cast<AudioFrame>(frame));
@@ -106,6 +107,7 @@ AudioReceiveThread::process()
 void
 AudioReceiveThread::cleanup()
 {
+    std::lock_guard<std::mutex> lk(mutex_);
     audioDecoder_.reset();
     demuxContext_.reset();
 }
@@ -149,6 +151,8 @@ AudioReceiveThread::setRecorderCallback(
 MediaStream
 AudioReceiveThread::getInfo() const
 {
+    if (!audioDecoder_)
+        return {};
     return audioDecoder_->getStream("a:remote");
 }
 
diff --git a/src/media/audio/audio_receive_thread.h b/src/media/audio/audio_receive_thread.h
index 4f1f3400e3..ce5688ba08 100644
--- a/src/media/audio/audio_receive_thread.h
+++ b/src/media/audio/audio_receive_thread.h
@@ -78,6 +78,7 @@ private:
     DeviceParams args_;
 
     std::istringstream stream_;
+    mutable std::mutex mutex_;
     std::unique_ptr<MediaDecoder> audioDecoder_;
     std::unique_ptr<MediaIOHandle> sdpContext_;
     std::unique_ptr<MediaIOHandle> demuxContext_;
diff --git a/src/media/audio/audio_rtp_session.cpp b/src/media/audio/audio_rtp_session.cpp
index 7a18ed3320..2cc92af671 100644
--- a/src/media/audio/audio_rtp_session.cpp
+++ b/src/media/audio/audio_rtp_session.cpp
@@ -42,6 +42,8 @@
 #include "client/videomanager.h"
 #include "manager.h"
 #include "observer.h"
+
+#include <asio/io_context.hpp>
 #include <sstream>
 
 namespace jami {
@@ -94,7 +96,13 @@ AudioRtpSession::startSender()
 
     // sender sets up input correctly, we just keep a reference in case startSender is called
     audioInput_ = jami::getAudioInput(callId_);
-    audioInput_->setRecorderCallback([this](const MediaStream& ms) { attachLocalRecorder(ms); });
+    audioInput_->setRecorderCallback(
+            [w=weak_from_this()](const MediaStream& ms) {
+                Manager::instance().ioContext()->post([w=std::move(w), ms]() {
+                    if (auto shared = w.lock())
+                        shared->attachLocalRecorder(ms);
+                });
+            });
     audioInput_->setMuted(muteState_);
     audioInput_->setSuccessfulSetupCb(onSuccessfulSetup_);
     auto newParams = audioInput_->switchInput(input_);
@@ -173,7 +181,12 @@ AudioRtpSession::startReceiver()
                                                 receive_.receiving_sdp,
                                                 mtu_));
 
-    receiveThread_->setRecorderCallback([this](const MediaStream& ms) { attachRemoteRecorder(ms); });
+    receiveThread_->setRecorderCallback([w=weak_from_this()](const MediaStream& ms) {
+        Manager::instance().ioContext()->post([w=std::move(w), ms]() {
+            if (auto shared = w.lock())
+                shared->attachRemoteRecorder(ms);
+        });
+    });
     receiveThread_->addIOContext(*socketPair_);
     receiveThread_->setSuccessfulSetupCb(onSuccessfulSetup_);
     receiveThread_->startReceiver();
@@ -252,26 +265,31 @@ AudioRtpSession::stop()
 void
 AudioRtpSession::setMuted(bool muted, Direction dir)
 {
-    std::lock_guard<std::recursive_mutex> lock(mutex_);
-    if (dir == Direction::SEND) {
-        muteState_ = muted;
-        if (audioInput_)
-            audioInput_->setMuted(muted);
-    } else {
-        if (receiveThread_) {
-            auto ms = receiveThread_->getInfo();
-            if (muted) {
-                if (auto ob = recorder_->getStream(ms.name)) {
-                    receiveThread_->detach(ob);
-                    recorder_->removeStream(ms);
+    Manager::instance().ioContext()->post([w=weak_from_this(), muted, dir]() {
+        if (auto shared = w.lock()) {
+            std::lock_guard<std::recursive_mutex> lock(shared->mutex_);
+            if (dir == Direction::SEND) {
+                shared->muteState_ = muted;
+                if (shared->audioInput_) {
+                    shared->audioInput_->setMuted(muted);
                 }
             } else {
-                if (auto ob = recorder_->addStream(ms)) {
-                    receiveThread_->attach(ob);
+                if (shared->receiveThread_) {
+                    auto ms = shared->receiveThread_->getInfo();
+                    if (muted) {
+                        if (auto ob = shared->recorder_->getStream(ms.name)) {
+                            shared->receiveThread_->detach(ob);
+                            shared->recorder_->removeStream(ms);
+                        }
+                    } else {
+                        if (auto ob = shared->recorder_->addStream(ms)) {
+                            shared->receiveThread_->attach(ob);
+                        }
+                    }
                 }
             }
         }
-    }
+    });
 }
 
 void
@@ -366,6 +384,7 @@ AudioRtpSession::processRtcpChecker()
 void
 AudioRtpSession::attachRemoteRecorder(const MediaStream& ms)
 {
+    std::lock_guard<std::recursive_mutex> lock(mutex_);
     if (!recorder_ || !receiveThread_)
         return;
     if (auto ob = recorder_->addStream(ms)) {
@@ -376,6 +395,7 @@ AudioRtpSession::attachRemoteRecorder(const MediaStream& ms)
 void
 AudioRtpSession::attachLocalRecorder(const MediaStream& ms)
 {
+    std::lock_guard<std::recursive_mutex> lock(mutex_);
     if (!recorder_ || !audioInput_)
         return;
     if (auto ob = recorder_->addStream(ms)) {
@@ -390,10 +410,20 @@ AudioRtpSession::initRecorder()
         return;
     if (receiveThread_)
         receiveThread_->setRecorderCallback(
-            [this](const MediaStream& ms) { attachRemoteRecorder(ms); });
+            [w=weak_from_this()](const MediaStream& ms) {
+                Manager::instance().ioContext()->post([w=std::move(w), ms]() {
+                    if (auto shared = w.lock())
+                        shared->attachRemoteRecorder(ms);
+                });
+        });
     if (audioInput_)
         audioInput_->setRecorderCallback(
-            [this](const MediaStream& ms) { attachLocalRecorder(ms); });
+            [w=weak_from_this()](const MediaStream& ms) {
+                Manager::instance().ioContext()->post([w=std::move(w), ms]() {
+                    if (auto shared = w.lock())
+                        shared->attachLocalRecorder(ms);
+                });
+        });
 }
 
 void
diff --git a/src/media/audio/audio_rtp_session.h b/src/media/audio/audio_rtp_session.h
index e517b9968e..47cc601eb7 100644
--- a/src/media/audio/audio_rtp_session.h
+++ b/src/media/audio/audio_rtp_session.h
@@ -47,7 +47,7 @@ struct RTCPInfo
     float latency;
 };
 
-class AudioRtpSession : public RtpSession
+class AudioRtpSession : public RtpSession, public std::enable_shared_from_this<AudioRtpSession>
 {
 public:
     AudioRtpSession(const std::string& callId,
diff --git a/src/media/media_decoder.cpp b/src/media/media_decoder.cpp
index 9f3f8d35de..e994f6ca33 100644
--- a/src/media/media_decoder.cpp
+++ b/src/media/media_decoder.cpp
@@ -832,6 +832,10 @@ MediaDecoder::correctPixFmt(int input_pix_fmt)
 MediaStream
 MediaDecoder::getStream(std::string name) const
 {
+    if (!decoderCtx_) {
+        JAMI_WARN("No decoder context");
+        return {};
+    }
     auto ms = MediaStream(name, decoderCtx_, lastTimestamp_);
 #ifdef RING_ACCEL
     // accel_ is null if not using accelerated codecs
diff --git a/src/media/media_recorder.cpp b/src/media/media_recorder.cpp
index d5a72b9c53..2a4c9101d3 100644
--- a/src/media/media_recorder.cpp
+++ b/src/media/media_recorder.cpp
@@ -725,10 +725,19 @@ MediaRecorder::reset()
         frameBuff_.clear();
     }
     videoIdx_ = audioIdx_ = -1;
-    videoFilter_.reset();
-    audioFilter_.reset();
-    outputAudioFilter_.reset();
-    outputVideoFilter_.reset();
+    {
+        std::lock_guard<std::mutex> lk(mutexStreamSetup_);
+        {
+            std::lock_guard<std::mutex> lk2(mutexFilterVideo_);
+            videoFilter_.reset();
+            outputVideoFilter_.reset();
+        }
+        {
+            std::lock_guard<std::mutex> lk2(mutexFilterAudio_);
+            audioFilter_.reset();
+            outputAudioFilter_.reset();
+        }
+    }
     encoder_.reset();
 }
 
diff --git a/src/media/video/video_rtp_session.cpp b/src/media/video/video_rtp_session.cpp
index abeea6672c..b016c4b37f 100644
--- a/src/media/video/video_rtp_session.cpp
+++ b/src/media/video/video_rtp_session.cpp
@@ -42,6 +42,7 @@
 
 #include <dhtnet/ice_socket.h>
 
+#include <asio/io_context.hpp>
 #include <sstream>
 #include <map>
 #include <string>
@@ -140,8 +141,11 @@ VideoRtpSession::startSender()
             videoLocal_ = input;
             if (input) {
                 videoLocal_->setRecorderCallback(
-                    [this](const MediaStream& ms) {
-                        attachLocalRecorder(ms);
+                    [w=weak_from_this()](const MediaStream& ms) {
+                        Manager::instance().ioContext()->post([w=std::move(w), ms]() {
+                            if (auto shared = w.lock())
+                                shared->attachLocalRecorder(ms);
+                        });
                     });
                 auto newParams = input->getParams();
                 try {
@@ -291,9 +295,12 @@ VideoRtpSession::startReceiver()
             if (activeStream)
                 videoMixer_->setActiveStream(streamId_);
         }
-        receiveThread_->setRecorderCallback(
-            [this](const MediaStream& ms) { attachRemoteRecorder(ms); });
-
+        receiveThread_->setRecorderCallback([w=weak_from_this()](const MediaStream& ms) {
+            Manager::instance().ioContext()->post([w=std::move(w), ms]() {
+                if (auto shared = w.lock())
+                    shared->attachRemoteRecorder(ms);
+            });
+        });
     } else {
         JAMI_DBG("[%p] Video receiver disabled", this);
         if (receiveThread_ and videoMixer_ and conference_) {
@@ -777,6 +784,7 @@ VideoRtpSession::processRtcpChecker()
 void
 VideoRtpSession::attachRemoteRecorder(const MediaStream& ms)
 {
+    std::lock_guard<std::recursive_mutex> lock(mutex_);
     if (!recorder_ || !receiveThread_)
         return;
     if (auto ob = recorder_->addStream(ms)) {
@@ -787,6 +795,7 @@ VideoRtpSession::attachRemoteRecorder(const MediaStream& ms)
 void
 VideoRtpSession::attachLocalRecorder(const MediaStream& ms)
 {
+    std::lock_guard<std::recursive_mutex> lock(mutex_);
     if (!recorder_ || !videoLocal_ || !Manager::instance().videoPreferences.getRecordPreview())
         return;
     if (auto ob = recorder_->addStream(ms)) {
@@ -797,23 +806,31 @@ VideoRtpSession::attachLocalRecorder(const MediaStream& ms)
 void
 VideoRtpSession::initRecorder()
 {
-	if (!recorder_)
-		return;
+    if (!recorder_)
+        return;
     if (receiveThread_) {
-        receiveThread_->setRecorderCallback(
-            [this](const MediaStream& ms) { attachRemoteRecorder(ms); });
+        receiveThread_->setRecorderCallback([w=weak_from_this()](const MediaStream& ms) {
+            Manager::instance().ioContext()->post([w=std::move(w), ms]() {
+                if (auto shared = w.lock())
+                    shared->attachRemoteRecorder(ms);
+            });
+        });
     }
     if (videoLocal_ && !send_.onHold) {
-        videoLocal_->setRecorderCallback(
-            [this](const MediaStream& ms) { attachLocalRecorder(ms); });
+        videoLocal_->setRecorderCallback([w=weak_from_this()](const MediaStream& ms) {
+            Manager::instance().ioContext()->post([w=std::move(w), ms]() {
+                if (auto shared = w.lock())
+                    shared->attachLocalRecorder(ms);
+            });
+        });
     }
 }
 
 void
 VideoRtpSession::deinitRecorder()
 {
-	if (!recorder_)
-		return;
+    if (!recorder_)
+        return;
     if (receiveThread_) {
         auto ms = receiveThread_->getInfo();
         if (auto ob = recorder_->getStream(ms.name)) {
diff --git a/src/media/video/video_rtp_session.h b/src/media/video/video_rtp_session.h
index 1937c6c752..c70ed39714 100644
--- a/src/media/video/video_rtp_session.h
+++ b/src/media/video/video_rtp_session.h
@@ -65,7 +65,7 @@ struct VideoBitrateInfo
     float packetLostThreshold;
 };
 
-class VideoRtpSession : public RtpSession
+class VideoRtpSession : public RtpSession, public std::enable_shared_from_this<VideoRtpSession>
 {
 public:
     using BaseType = RtpSession;
diff --git a/src/sip/sipcall.cpp b/src/sip/sipcall.cpp
index 8477eb3aee..655232b6bb 100644
--- a/src/sip/sipcall.cpp
+++ b/src/sip/sipcall.cpp
@@ -253,13 +253,13 @@ SIPCall::configureRtpSession(const std::shared_ptr<RtpSession>& rtpSession,
         assert(videoRtp && mediaAttr);
         auto streamIdx = findRtpStreamIndex(mediaAttr->label_);
         videoRtp->setRequestKeyFrameCallback([w = weak(), streamIdx] {
-            runOnMainThread([w = std::move(w), streamIdx] {
+            dht::ThreadPool::io().run([w = std::move(w), streamIdx] {
                 if (auto thisPtr = w.lock())
                     thisPtr->requestKeyframe(streamIdx);
             });
         });
         videoRtp->setChangeOrientationCallback([w = weak(), streamIdx](int angle) {
-            runOnMainThread([w, angle, streamIdx] {
+            dht::ThreadPool::io().run([w, angle, streamIdx] {
                 if (auto thisPtr = w.lock())
                     thisPtr->setVideoOrientation(streamIdx, angle);
             });
-- 
GitLab