Skip to content
Snippets Groups Projects
Commit 3fcd9b9e authored by Philippe Gorley's avatar Philippe Gorley Committed by Adrien Béraud
Browse files

recorder: remove internal thread loop

Move thread to ThreadPool, so the recorder can finish its job even after
hangup.

Change-Id: I7ff1898ce8035a00a488aad53bbb16a6b8a56bcc
Gitlab: #40
parent 4a0bf2e4
No related branches found
Tags 1.1.0
No related merge requests found
......@@ -28,6 +28,7 @@
#include "media_io_handle.h"
#include "media_recorder.h"
#include "system_codec_container.h"
#include "thread_pool.h"
#include "video/video_input.h"
#include "video/video_receive_thread.h"
......@@ -55,18 +56,10 @@ replaceAll(const std::string& str, const std::string& from, const std::string& t
}
MediaRecorder::MediaRecorder()
: loop_([]{ return true;},
[this]{ process(); },
[]{})
{}
MediaRecorder::~MediaRecorder()
{
if (loop_.isRunning())
loop_.join();
if (isRecording_)
flush();
}
{}
bool
MediaRecorder::isRecording() const
......@@ -122,17 +115,8 @@ MediaRecorder::stopRecording()
if (isRecording_) {
RING_DBG() << "Stop recording '" << getPath() << "'";
isRecording_ = false;
loop_.join();
flush();
emitSignal<DRing::CallSignal::RecordPlaybackStopped>(getPath());
}
streams_.clear();
videoIdx_ = audioIdx_ = -1;
isRecording_ = false;
audioOnly_ = false;
videoFilter_.reset();
audioFilter_.reset();
encoder_.reset();
}
int
......@@ -159,7 +143,7 @@ MediaRecorder::addStream(const MediaStream& ms)
void
MediaRecorder::update(Observable<std::shared_ptr<AudioFrame>>* ob, const std::shared_ptr<AudioFrame>& a)
{
if (!isRecording_ || !loop_.isRunning())
if (!isRecording_)
return;
std::string name;
if (dynamic_cast<AudioReceiveThread*>(ob))
......@@ -171,12 +155,11 @@ MediaRecorder::update(Observable<std::shared_ptr<AudioFrame>>* ob, const std::sh
clone.copyFrom(*a);
clone.pointer()->pts -= streams_[name].firstTimestamp;
audioFilter_->feedInput(clone.pointer(), name);
loop_.interrupt();
}
void MediaRecorder::update(Observable<std::shared_ptr<VideoFrame>>* ob, const std::shared_ptr<VideoFrame>& v)
{
if (!isRecording_ || !loop_.isRunning())
if (!isRecording_)
return;
std::string name;
if (dynamic_cast<video::VideoReceiveThread*>(ob))
......@@ -188,7 +171,6 @@ void MediaRecorder::update(Observable<std::shared_ptr<VideoFrame>>* ob, const st
clone.copyFrom(*v);
clone.pointer()->pts -= streams_[name].firstTimestamp;
videoFilter_->feedInput(clone.pointer(), name);
loop_.interrupt();
}
int
......@@ -273,7 +255,14 @@ MediaRecorder::initRecord()
}
RING_DBG() << "Recording initialized";
loop_.start();
ThreadPool::instance().run([rec = shared_from_this()] {
while (rec->isRecording()) {
rec->filterAndEncode(rec->videoFilter_.get(), rec->videoIdx_);
rec->filterAndEncode(rec->audioFilter_.get(), rec->audioIdx_);
}
rec->flush();
rec->reset(); // allows recorder to be reused in same call
});
return 0;
}
......@@ -436,35 +425,37 @@ MediaRecorder::buildAudioFilter(const std::vector<MediaStream>& peers, const Med
void
MediaRecorder::flush()
{
if (!isRecording_ || encoder_->getStreamCount() <= 0)
return;
std::lock_guard<std::mutex> lk(mutex_);
filterAndEncode(videoFilter_.get(), videoIdx_);
filterAndEncode(audioFilter_.get(), videoIdx_);
encoder_->flush();
}
void
MediaRecorder::process()
MediaRecorder::reset()
{
AVFrame* output;
if (videoIdx_ >= 0 && videoFilter_)
while ((output = videoFilter_->readOutput()))
sendToEncoder(output, videoIdx_);
if (audioIdx_ >= 0 && audioFilter_)
while ((output = audioFilter_->readOutput()))
sendToEncoder(output, audioIdx_);
streams_.clear();
videoIdx_ = audioIdx_ = -1;
audioOnly_ = false;
videoFilter_.reset();
audioFilter_.reset();
encoder_.reset();
}
void
MediaRecorder::sendToEncoder(AVFrame* frame, int streamIdx)
MediaRecorder::filterAndEncode(MediaFilter* filter, int streamIdx)
{
try {
std::lock_guard<std::mutex> lk(mutex_);
encoder_->encode(frame, streamIdx);
} catch (const MediaEncoderException& e) {
RING_ERR() << "MediaEncoderException: " << e.what();
if (filter && streamIdx >= 0) {
while (auto frame = filter->readOutput()) {
try {
std::lock_guard<std::mutex> lk(mutex_);
encoder_->encode(frame, streamIdx);
} catch (const MediaEncoderException& e) {
RING_ERR() << "Failed to record frame: " << e.what();
}
av_frame_free(&frame);
}
}
av_frame_free(&frame);
}
} // namespace ring
......@@ -27,7 +27,6 @@
#include "media_stream.h"
#include "noncopyable.h"
#include "observer.h"
#include "threadloop.h"
#ifdef RING_VIDEO
#include "video/video_base.h"
#endif
......@@ -46,6 +45,7 @@ class MediaRecorder : public Observer<std::shared_ptr<AudioFrame>>
#ifdef RING_VIDEO
, public video::VideoFramePassiveReader
#endif
, public std::enable_shared_from_this<MediaRecorder>
{
public:
MediaRecorder();
......@@ -111,6 +111,7 @@ private:
NON_COPYABLE(MediaRecorder);
void flush();
void reset();
int initRecord();
MediaStream setupVideoOutput();
......@@ -138,9 +139,7 @@ private:
bool isRecording_ = false;
bool audioOnly_ = false;
InterruptedThreadLoop loop_;
void process();
void sendToEncoder(AVFrame* frame, int streamIdx);
void filterAndEncode(MediaFilter* filter, int streamIdx);
};
}; // namespace ring
......@@ -113,6 +113,8 @@ Recordable::stopRecording()
recorder_->stopRecording();
recording_ = false;
// new recorder since this one may still be recording
recorder_ = std::make_shared<MediaRecorder>();
}
bool
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment