Commit 351856e7 authored by Philippe Gorley, committed by Adrien Béraud

recorder: subscribe to audio/video sources

Makes the MediaRecorder an observer of AudioInput, AudioReceiveThread,
VideoInput and VideoReceiveThread, so they no longer need a reference
to the recorder.

Makes sure the framerate is set for the recorder by adding a way to get
the stream parameters from the video receiver and input, since AVFrame
does not carry framerate and time base information.

Adds a way to detach the recorder from the media sources so it can be
re-attached for subsequent recordings (see the usage sketch below).

Change-Id: I71bac0825541f6445861f4f949288e1c62416f05
parent af39787f
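In rough usage terms: each frame source (AudioInput, AudioReceiveThread, VideoInput, VideoReceiveThread) is an Observable, and MediaRecorder is now the Observer that receives frames through update(). The sketch below is not part of the patch; it only illustrates the attach/record/detach flow using names that appear in the diff (MediaRecorder, ring::getAudioInput, attach, detach, startRecording, stopRecording). The call ID, the path and the surrounding scaffolding are illustrative assumptions; includes and error handling are omitted.

    // Sketch only (not part of this commit): observer-based recording of an
    // audio source. Video sources work the same way and additionally expose
    // getStream() so the recorder can recover frame rate and time base.
    void recordAudioExample(const std::string& callId)   // callId is hypothetical
    {
        auto rec = std::make_shared<ring::MediaRecorder>();
        rec->setPath("/tmp/example_recording");          // hypothetical path

        if (auto input = ring::getAudioInput(callId)) {  // Observable<std::shared_ptr<AudioFrame>>
            input->attach(rec.get());                    // recorder now receives frames via update()
            rec->startRecording();
            // ... frames flow through MediaRecorder::update() while recording ...
            rec->stopRecording();
            input->detach(rec.get());                    // can be re-attached for a later recording
        }
    }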
@@ -47,8 +47,6 @@ AudioInput::AudioInput(const std::string& id) :
AudioInput::~AudioInput()
{
if (auto rec = recorder_.lock())
rec->stopRecording();
loop_.join();
}
@@ -71,13 +69,6 @@ AudioInput::process()
frame->pointer()->pts = sent_samples;
sent_samples += frame->pointer()->nb_samples;
{
auto rec = recorder_.lock();
if (rec && rec->isRecording()) {
rec->recordData(frame->pointer(), ms);
}
}
notify(frame);
}
@@ -197,11 +188,4 @@ AudioInput::setMuted(bool isMuted)
muteState_ = isMuted;
}
void
AudioInput::initRecorder(const std::shared_ptr<MediaRecorder>& rec)
{
rec->incrementExpectedStreams(1);
recorder_ = rec;
}
} // namespace ring
@@ -33,7 +33,6 @@
namespace ring {
class MediaRecorder;
class Resampler;
class AudioInput : public Observable<std::shared_ptr<AudioFrame>>
@@ -47,7 +46,6 @@ public:
bool isCapturing() const { return loop_.isRunning(); }
void setFormat(const AudioFormat& fmt);
void setMuted(bool isMuted);
void initRecorder(const std::shared_ptr<MediaRecorder>& rec);
private:
bool nextFromDevice(AudioFrame& frame);
@@ -61,7 +59,6 @@ private:
AudioFormat format_;
std::unique_ptr<Resampler> resampler_;
std::weak_ptr<MediaRecorder> recorder_;
std::string currentResource_;
std::atomic_bool switchPending_ {false};
@@ -49,8 +49,6 @@ AudioReceiveThread::AudioReceiveThread(const std::string& id,
AudioReceiveThread::~AudioReceiveThread()
{
if (auto rec = recorder_.lock())
rec->stopRecording();
loop_.join();
}
@@ -96,11 +94,6 @@ AudioReceiveThread::process()
auto decodedFrame = std::make_shared<AudioFrame>();
switch (audioDecoder_->decode(*decodedFrame)) {
case MediaDecoder::Status::FrameFinished:
{
auto rec = recorder_.lock();
if (rec && rec->isRecording())
rec->recordData(decodedFrame->pointer(), audioDecoder_->getStream("a:remote"));
}
audioDecoder_->writeToRingBuffer(*decodedFrame, *ringbuffer_,
mainBuffFormat);
notify(decodedFrame);
@@ -163,11 +156,4 @@ AudioReceiveThread::startLoop()
loop_.start();
}
void
AudioReceiveThread::initRecorder(std::shared_ptr<MediaRecorder>& rec)
{
recorder_ = rec;
rec->incrementExpectedStreams(1);
}
}; // namespace ring
@@ -33,10 +33,9 @@ namespace ring {
class MediaDecoder;
class MediaIOHandle;
class MediaRecorder;
class RingBuffer;
class AudioReceiveThread : Observable<std::shared_ptr<AudioFrame>>
class AudioReceiveThread : public Observable<std::shared_ptr<AudioFrame>>
{
public:
AudioReceiveThread(const std::string &id,
@@ -47,8 +46,6 @@ public:
void addIOContext(SocketPair &socketPair);
void startLoop();
void initRecorder(std::shared_ptr<MediaRecorder>& rec);
private:
NON_COPYABLE(AudioReceiveThread);
@@ -60,8 +57,6 @@ private:
void openDecoder();
bool decodeFrame();
std::weak_ptr<MediaRecorder> recorder_;
/*-----------------------------------------------------------------*/
/* These variables should be used in thread (i.e. process()) only! */
/*-----------------------------------------------------------------*/
@@ -197,9 +197,18 @@ void
AudioRtpSession::initRecorder(std::shared_ptr<MediaRecorder>& rec)
{
if (receiveThread_)
receiveThread_->initRecorder(rec);
if (sender_)
sender_->initRecorder(rec);
receiveThread_->attach(rec.get());
if (auto input = ring::getAudioInput(callID_))
input->attach(rec.get());
}
void
AudioRtpSession::deinitRecorder(std::shared_ptr<MediaRecorder>& rec)
{
if (receiveThread_)
receiveThread_->detach(rec.get());
if (auto input = ring::getAudioInput(callID_))
input->detach(rec.get());
}
} // namespace ring
@@ -53,6 +53,7 @@ class AudioRtpSession : public RtpSession {
void switchInput(const std::string& resource) { input_ = resource; }
void initRecorder(std::shared_ptr<MediaRecorder>& rec) override;
void deinitRecorder(std::shared_ptr<MediaRecorder>& rec) override;
private:
void startSender();
@@ -26,7 +26,6 @@
#include "logger.h"
#include "media_encoder.h"
#include "media_io_handle.h"
#include "media_recorder.h"
#include "media_stream.h"
#include "resampler.h"
#include "smartools.h"
@@ -52,8 +51,6 @@ AudioSender::AudioSender(const std::string& id,
AudioSender::~AudioSender()
{
if (auto rec = recorder_.lock())
rec->stopRecording();
audioInput_->detach(this);
audioInput_.reset();
audioEncoder_.reset();
@@ -106,12 +103,6 @@ AudioSender::update(Observable<std::shared_ptr<ring::AudioFrame>>* /*obs*/, cons
ms.firstTimestamp = frame->pts;
sent_samples += frame->nb_samples;
{
auto rec = recorder_.lock();
if (rec && rec->isRecording())
rec->recordData(frame, ms);
}
if (audioEncoder_->encodeAudio(*framePtr) < 0)
RING_ERR("encoding failed");
}
@@ -129,11 +120,4 @@ AudioSender::getLastSeqValue()
return audioEncoder_->getLastSeqValue();
}
void
AudioSender::initRecorder(std::shared_ptr<MediaRecorder>& rec)
{
recorder_ = rec;
rec->incrementExpectedStreams(1);
}
} // namespace ring
@@ -31,7 +31,6 @@ namespace ring {
class AudioInput;
class MediaEncoder;
class MediaIOHandle;
class MediaRecorder;
class Resampler;
class AudioSender : public Observer<std::shared_ptr<AudioFrame>> {
@@ -51,8 +50,6 @@ public:
void update(Observable<std::shared_ptr<ring::AudioFrame>>*,
const std::shared_ptr<ring::AudioFrame>&) override;
void initRecorder(std::shared_ptr<MediaRecorder>& rec);
private:
NON_COPYABLE(AudioSender);
@@ -65,7 +62,6 @@ private:
std::unique_ptr<MediaIOHandle> muxContext_;
std::unique_ptr<Resampler> resampler_;
std::shared_ptr<AudioInput> audioInput_;
std::weak_ptr<MediaRecorder> recorder_;
uint64_t sent_samples = 0;
@@ -73,14 +73,14 @@ LocalRecorder::startRecording()
audioInput_ = ring::getAudioInput(path_);
audioInput_->setFormat(AudioFormat::STEREO());
audioInput_->initRecorder(recorder_);
audioInput_->attach(recorder_.get());
#ifdef RING_VIDEO
// video recording
if (!isAudioOnly_) {
videoInput_ = std::static_pointer_cast<video::VideoInput>(ring::getVideoCamera());
if (videoInput_) {
videoInput_->initRecorder(recorder_);
videoInput_->attach(recorder_.get());
} else {
RING_ERR() << "Unable to record video (no video input)";
return false;
@@ -97,8 +97,6 @@ LocalRecorder::stopRecording()
Recordable::stopRecording();
Manager::instance().getRingBufferPool().unBindHalfDuplexOut(path_, RingBufferPool::DEFAULT_ID);
audioInput_.reset();
if (videoInput_)
videoInput_->initRecorder(nullptr); // workaround for deiniting recorder
videoInput_.reset();
}
@@ -19,16 +19,17 @@
*/
#include "libav_deps.h" // MUST BE INCLUDED FIRST
#include "audio/audio_input.h"
#include "audio/audio_receive_thread.h"
#include "audio/audio_sender.h"
#include "client/ring_signal.h"
#include "fileutils.h"
#include "logger.h"
#include "media_io_handle.h"
#include "media_recorder.h"
#include "system_codec_container.h"
extern "C" {
#include <libavutil/frame.h>
}
#include "video/video_input.h"
#include "video/video_receive_thread.h"
#include <algorithm>
#include <iomanip>
@@ -38,8 +39,6 @@ extern "C" {
namespace ring {
static constexpr auto FRAME_DEQUEUE_INTERVAL = std::chrono::milliseconds(200);
static std::string
replaceAll(const std::string& str, const std::string& from, const std::string& to)
{
@@ -119,12 +118,6 @@ MediaRecorder::setPath(const std::string& path)
RING_DBG() << "Recording will be saved as '" << getPath() << "'";
}
void
MediaRecorder::incrementExpectedStreams(int n)
{
nbExpectedStreams_ += n;
}
bool
MediaRecorder::isRecording() const
{
@@ -185,6 +178,42 @@ MediaRecorder::stopRecording()
resetToDefaults();
}
void
MediaRecorder::update(Observable<std::shared_ptr<AudioFrame>>* ob, const std::shared_ptr<AudioFrame>& a)
{
MediaStream ms;
if (dynamic_cast<AudioReceiveThread*>(ob))
ms.name = "a:remote";
else // if (dynamic_cast<AudioSender*>(ob) || dynamic_cast<AudioInput*>(ob))
ms.name = "a:local";
ms.isVideo = false;
ms.update(a->pointer());
ms.firstTimestamp = a->pointer()->pts;
recordData(a->pointer(), ms);
}
void MediaRecorder::attached(Observable<std::shared_ptr<AudioFrame>>* /*ob*/)
{
++nbExpectedStreams_;
}
void MediaRecorder::update(Observable<std::shared_ptr<VideoFrame>>* ob, const std::shared_ptr<VideoFrame>& v)
{
MediaStream ms;
if (auto receiver = dynamic_cast<video::VideoReceiveThread*>(ob)) {
ms = receiver->getStream();
} else if (auto input = dynamic_cast<video::VideoInput*>(ob)) {
ms = input->getStream();
}
ms.firstTimestamp = v->pointer()->pts;
recordData(v->pointer(), ms);
}
void MediaRecorder::attached(Observable<std::shared_ptr<VideoFrame>>* /*ob*/)
{
++nbExpectedStreams_;
}
int
MediaRecorder::addStream(const MediaStream& ms)
{
@@ -402,7 +431,7 @@ MediaRecorder::buildVideoFilter(const std::vector<MediaStream>& peers, const Med
switch (peers.size()) {
case 0:
v << "[" << local.name << "] format=pix_fmts=yuv420p";
v << "[" << local.name << "] fps=30, format=pix_fmts=yuv420p";
break;
case 1:
{
@@ -21,11 +21,16 @@
#pragma once
#include "config.h"
#include "media_buffer.h"
#include "media_encoder.h"
#include "media_filter.h"
#include "media_stream.h"
#include "noncopyable.h"
#include "observer.h"
#include "threadloop.h"
#ifdef RING_VIDEO
#include "video/video_base.h"
#endif
#include <map>
#include <memory>
@@ -39,95 +44,102 @@ struct AVFrame;
namespace ring {
class MediaRecorder {
public:
MediaRecorder();
~MediaRecorder();
std::string getPath() const;
void setPath(const std::string& path);
void audioOnly(bool audioOnly);
// replaces %TIMESTAMP with time at start of recording
// default title: "Conversation at %Y-%m-%d %H:%M:%S"
// default description: "Recorded with Jami https://jami.net"
void setMetadata(const std::string& title, const std::string& desc);
[[deprecated("use setPath to set full recording path")]]
void setRecordingPath(const std::string& dir);
// adjust nb of streams before recording
// used to know when all streams are set up
void incrementExpectedStreams(int n);
bool isRecording() const;
bool toggleRecording();
int startRecording();
void stopRecording();
int recordData(AVFrame* frame, const MediaStream& ms);
private:
NON_COPYABLE(MediaRecorder);
int addStream(const MediaStream& ms);
int initRecord();
MediaStream setupVideoOutput();
std::string buildVideoFilter(const std::vector<MediaStream>& peers, const MediaStream& local) const;
MediaStream setupAudioOutput();
std::string buildAudioFilter(const std::vector<MediaStream>& peers, const MediaStream& local) const;
void emptyFilterGraph();
int sendToEncoder(AVFrame* frame, int streamIdx);
int flush();
void resetToDefaults(); // clear saved data for next recording
std::unique_ptr<MediaEncoder> encoder_;
std::unique_ptr<MediaFilter> videoFilter_;
std::unique_ptr<MediaFilter> audioFilter_;
std::mutex mutex_; // protect against concurrent file writes
std::map<std::string, const MediaStream> streams_;
std::tm startTime_;
std::string title_;
std::string description_;
std::string path_;
// NOTE do not use dir_ or filename_, use path_ instead
std::string dir_;
std::string filename_;
unsigned nbExpectedStreams_ = 0;
unsigned nbReceivedVideoStreams_ = 0;
unsigned nbReceivedAudioStreams_ = 0;
int videoIdx_ = -1;
int audioIdx_ = -1;
bool isRecording_ = false;
bool isReady_ = false;
bool audioOnly_ = false;
struct RecordFrame {
AVFrame* frame;
bool isVideo;
bool fromPeer;
RecordFrame() {}
RecordFrame(AVFrame* f, bool v, bool p)
: frame(f)
, isVideo(v)
, fromPeer(p)
{}
};
InterruptedThreadLoop loop_;
void process();
std::mutex qLock_;
std::deque<RecordFrame> frames_;
class MediaRecorder : public Observer<std::shared_ptr<AudioFrame>>
#ifdef RING_VIDEO
, public video::VideoFramePassiveReader
#endif
{
public:
MediaRecorder();
~MediaRecorder();
std::string getPath() const;
void setPath(const std::string& path);
void audioOnly(bool audioOnly);
// replaces %TIMESTAMP with time at start of recording
// default title: "Conversation at %Y-%m-%d %H:%M:%S"
// default description: "Recorded with Jami https://jami.net"
void setMetadata(const std::string& title, const std::string& desc);
[[deprecated("use setPath to set full recording path")]]
void setRecordingPath(const std::string& dir);
bool isRecording() const;
bool toggleRecording();
int startRecording();
void stopRecording();
/* Observer methods*/
void update(Observable<std::shared_ptr<AudioFrame>>* ob, const std::shared_ptr<AudioFrame>& a) override;
void attached(Observable<std::shared_ptr<AudioFrame>>* ob) override;
void update(Observable<std::shared_ptr<VideoFrame>>* ob, const std::shared_ptr<VideoFrame>& v) override;
void attached(Observable<std::shared_ptr<VideoFrame>>* ob) override;
private:
NON_COPYABLE(MediaRecorder);
int recordData(AVFrame* frame, const MediaStream& ms);
int addStream(const MediaStream& ms);
int initRecord();
MediaStream setupVideoOutput();
std::string buildVideoFilter(const std::vector<MediaStream>& peers, const MediaStream& local) const;
MediaStream setupAudioOutput();
std::string buildAudioFilter(const std::vector<MediaStream>& peers, const MediaStream& local) const;
void emptyFilterGraph();
int sendToEncoder(AVFrame* frame, int streamIdx);
int flush();
void resetToDefaults(); // clear saved data for next recording
std::unique_ptr<MediaEncoder> encoder_;
std::unique_ptr<MediaFilter> videoFilter_;
std::unique_ptr<MediaFilter> audioFilter_;
std::mutex mutex_; // protect against concurrent file writes
std::map<std::string, const MediaStream> streams_;
std::tm startTime_;
std::string title_;
std::string description_;
std::string path_;
// NOTE do not use dir_ or filename_, use path_ instead
std::string dir_;
std::string filename_;
unsigned nbExpectedStreams_ = 0;
unsigned nbReceivedVideoStreams_ = 0;
unsigned nbReceivedAudioStreams_ = 0;
int videoIdx_ = -1;
int audioIdx_ = -1;
bool isRecording_ = false;
bool isReady_ = false;
bool audioOnly_ = false;
struct RecordFrame {
AVFrame* frame;
bool isVideo;
bool fromPeer;
RecordFrame() {}
RecordFrame(AVFrame* f, bool v, bool p)
: frame(f)
, isVideo(v)
, fromPeer(p)
{}
};
InterruptedThreadLoop loop_;
void process();
std::mutex qLock_;
std::deque<RecordFrame> frames_;
};
}; // namespace ring
@@ -56,6 +56,7 @@ public:
void setMtu(uint16_t mtu) { mtu_ = mtu; }
virtual void initRecorder(std::shared_ptr<MediaRecorder>& rec) = 0;
virtual void deinitRecorder(std::shared_ptr<MediaRecorder>& rec) = 0;
protected:
std::recursive_mutex mutex_;
@@ -63,8 +63,6 @@ VideoInput::VideoInput()
VideoInput::~VideoInput()
{
if (auto rec = recorder_.lock())
rec->stopRecording();
#if defined(__ANDROID__) || defined(RING_UWP) || (defined(TARGET_OS_IOS) && TARGET_OS_IOS)
/* we need to stop the loop and notify the condition variable
* to unblock the process loop */
@@ -232,11 +230,6 @@ bool VideoInput::captureFrame()
return static_cast<bool>(decoder_);
case MediaDecoder::Status::FrameFinished:
{
auto rec = recorder_.lock();
if (rec && rec->isRecording())
rec->recordData(frame.pointer(), decoder_->getStream("v:local"));
}
publishFrame();
return true;
// continue decoding
@@ -613,6 +606,12 @@ int VideoInput::getPixelFormat() const
DeviceParams VideoInput::getParams() const
{ return decOpts_; }
MediaStream
VideoInput::getStream() const
{
return decoder_->getStream("v:local");
}
void
VideoInput::foundDecOpts(const DeviceParams& params)
{
@@ -622,15 +621,4 @@ VideoInput::foundDecOpts(const DeviceParams& params)
}
}
void
VideoInput::initRecorder(const std::shared_ptr<MediaRecorder>& rec)
{
if (rec) {
recorder_ = rec;
rec->incrementExpectedStreams(1);
} else {
recorder_.reset();
}
}
}} // namespace ring::video
@@ -25,6 +25,7 @@
#include "noncopyable.h"
#include "threadloop.h"
#include "media_stream.h"
#include "media/media_device.h" // DeviceParams
#include "media/video/video_base.h"
@@ -42,7 +43,6 @@
namespace ring {
class MediaDecoder;
class MediaRecorder;
}
namespace ring { namespace video {
@@ -78,6 +78,7 @@ public:
int getHeight() const;
int getPixelFormat() const;
DeviceParams getParams() const;
MediaStream getStream() const;
std::shared_future<DeviceParams> switchInput(const std::string& resource);
#if defined(__ANDROID__) || defined(RING_UWP) || (defined(TARGET_OS_IOS) && TARGET_OS_IOS)
@@ -89,8 +90,6 @@ public:
void releaseFrame(void *frame);
#endif
void initRecorder(const std::shared_ptr<MediaRecorder>& rec);
private:
NON_COPYABLE(VideoInput);
@@ -144,8 +143,6 @@ private:
void releaseBufferCb(uint8_t* ptr);
std::array<struct VideoFrameBuffer, 8> buffers_;
#endif
std::weak_ptr<MediaRecorder> recorder_;
};
}} // namespace ring::video
@@ -56,8 +56,6 @@ VideoReceiveThread::VideoReceiveThread(const std::string& id,
VideoReceiveThread::~VideoReceiveThread()
{
if (auto rec = recorder_.lock())
rec->stopRecording();
loop_.join();
}
@@ -187,11 +185,6 @@ bool VideoReceiveThread::decodeFrame()
switch (ret) {
case MediaDecoder::Status::FrameFinished:
{
auto rec = recorder_.lock();
if (rec && rec->isRecording())
rec->recordData(frame.pointer(), videoDecoder_->getStream("v:remote"));
}
publishFrame();
return true;
@@ -253,18 +246,17 @@ int VideoReceiveThread::getHeight() const
int VideoReceiveThread::getPixelFormat() const
{ return videoDecoder_->getPixelFormat(); }
void
VideoReceiveThread::triggerKeyFrameRequest()
MediaStream
VideoReceiveThread::getStream() const
{
if (requestKeyFrameCallback_)
requestKeyFrameCallback_(id_);
return videoDecoder_->getStream("v:remote");
}
void
VideoReceiveThread::initRecorder(std::shared_ptr<ring::MediaRecorder>& rec)
VideoReceiveThread::triggerKeyFrameRequest()
{
recorder_ = rec;
rec->incrementExpectedStreams(1);
if (requestKeyFrameCallback_)
requestKeyFrameCallback_(id_);
}
}} // namespace ring::video
@@ -26,6 +26,7 @@
#include "media_codec.h"
#include "media_io_handle.h"
#include "media_device.h"
#include "media_stream.h"
#include "threadloop.h"
#include "noncopyable.h"
@@ -38,7 +39,6 @@
namespace ring {
class SocketPair;
class MediaDecoder;
class MediaRecorder;
} // namespace ring
namespace ring { namespace video {
@@ -60,10 +60,9 @@ public:
int getWidth() const;
int getHeight() const;
int getPixelFormat() const;
MediaStream getStream() const;
void triggerKeyFrameRequest();
void initRecorder(std::shared_ptr<ring::MediaRecorder>& rec);
private:
NON_COPYABLE(VideoReceiveThread);
@@ -89,8 +88,6 @@ private:
static int interruptCb(void *ctx);
static int readFunction(void *opaque, uint8_t *buf, int buf_size);
std::weak_ptr<MediaRecorder> recorder_;
ThreadLoop loop_;
// used by ThreadLoop
@@ -567,11 +567,19 @@ VideoRtpSession::processPacketLoss()
void
VideoRtpSession::initRecorder(std::shared_ptr<MediaRecorder>& rec)
{
// video recording needs to start with keyframes
if (receiveThread_)
receiveThread_->initRecorder(rec);
receiveThread_->attach(rec.get());
if (auto vidInput = std::static_pointer_cast<VideoInput>(videoLocal_))
vidInput->initRecorder(rec);