Commit c455eceb authored by Philippe Gorley's avatar Philippe Gorley Committed by Adrien Béraud

recorder: remove dynamic casts

Have an intermediate object subscribe to the media sources while also
keeping a copy of the stream name. The recorder can then keep a
reference to these to figure out the source of the frame, instead of
knowing what objects it subscribes to.

Change-Id: I4a34c8b035301eefc90645cf6cb500dbcd879f91
parent fc83b1f8
...@@ -196,23 +196,25 @@ AudioRtpSession::setMuted(bool isMuted) ...@@ -196,23 +196,25 @@ AudioRtpSession::setMuted(bool isMuted)
void void
AudioRtpSession::initRecorder(std::shared_ptr<MediaRecorder>& rec) AudioRtpSession::initRecorder(std::shared_ptr<MediaRecorder>& rec)
{ {
if (receiveThread_) { if (receiveThread_)
receiveThread_->attach(rec.get()); receiveThread_->attach(rec->addStream(receiveThread_->getInfo()));
rec->addStream(receiveThread_->getInfo()); if (auto input = ring::getAudioInput(callID_))
} input->attach(rec->addStream(input->getInfo()));
if (auto input = ring::getAudioInput(callID_)) {
input->attach(rec.get());
rec->addStream(input->getInfo());
}
} }
void void
AudioRtpSession::deinitRecorder(std::shared_ptr<MediaRecorder>& rec) AudioRtpSession::deinitRecorder(std::shared_ptr<MediaRecorder>& rec)
{ {
if (receiveThread_) if (receiveThread_) {
receiveThread_->detach(rec.get()); if (auto ob = rec->getStream(receiveThread_->getInfo().name)) {
if (auto input = ring::getAudioInput(callID_)) receiveThread_->detach(ob);
input->detach(rec.get()); }
}
if (auto input = ring::getAudioInput(callID_)) {
if (auto ob = rec->getStream(input->getInfo().name)) {
input->detach(ob);
}
}
} }
} // namespace ring } // namespace ring
...@@ -73,16 +73,14 @@ LocalRecorder::startRecording() ...@@ -73,16 +73,14 @@ LocalRecorder::startRecording()
audioInput_ = ring::getAudioInput(path_); audioInput_ = ring::getAudioInput(path_);
audioInput_->setFormat(AudioFormat::STEREO()); audioInput_->setFormat(AudioFormat::STEREO());
recorder_->addStream(audioInput_->getInfo()); audioInput_->attach(recorder_->addStream(audioInput_->getInfo()));
audioInput_->attach(recorder_.get());
#ifdef RING_VIDEO #ifdef RING_VIDEO
// video recording // video recording
if (!isAudioOnly_) { if (!isAudioOnly_) {
videoInput_ = std::static_pointer_cast<video::VideoInput>(ring::getVideoCamera()); videoInput_ = std::static_pointer_cast<video::VideoInput>(ring::getVideoCamera());
if (videoInput_) { if (videoInput_) {
recorder_->addStream(videoInput_->getInfo()); videoInput_->attach(recorder_->addStream(videoInput_->getInfo()));
videoInput_->attach(recorder_.get());
} else { } else {
RING_ERR() << "Unable to record video (no video input)"; RING_ERR() << "Unable to record video (no video input)";
return false; return false;
...@@ -98,9 +96,11 @@ LocalRecorder::stopRecording() ...@@ -98,9 +96,11 @@ LocalRecorder::stopRecording()
{ {
Recordable::stopRecording(); Recordable::stopRecording();
Manager::instance().getRingBufferPool().unBindHalfDuplexOut(path_, RingBufferPool::DEFAULT_ID); Manager::instance().getRingBufferPool().unBindHalfDuplexOut(path_, RingBufferPool::DEFAULT_ID);
audioInput_->detach(recorder_.get()); if (auto ob = recorder_->getStream(audioInput_->getInfo().name))
audioInput_->detach(ob);
if (videoInput_) if (videoInput_)
videoInput_->detach(recorder_.get()); if (auto ob = recorder_->getStream(videoInput_->getInfo().name))
videoInput_->detach(ob);
audioInput_.reset(); audioInput_.reset();
videoInput_.reset(); videoInput_.reset();
} }
......
...@@ -19,9 +19,6 @@ ...@@ -19,9 +19,6 @@
*/ */
#include "libav_deps.h" // MUST BE INCLUDED FIRST #include "libav_deps.h" // MUST BE INCLUDED FIRST
#include "audio/audio_input.h"
#include "audio/audio_receive_thread.h"
#include "audio/audio_sender.h"
#include "client/ring_signal.h" #include "client/ring_signal.h"
#include "fileutils.h" #include "fileutils.h"
#include "logger.h" #include "logger.h"
...@@ -29,8 +26,6 @@ ...@@ -29,8 +26,6 @@
#include "media_recorder.h" #include "media_recorder.h"
#include "system_codec_container.h" #include "system_codec_container.h"
#include "thread_pool.h" #include "thread_pool.h"
#include "video/video_input.h"
#include "video/video_receive_thread.h"
#include <algorithm> #include <algorithm>
#include <iomanip> #include <iomanip>
...@@ -119,45 +114,50 @@ MediaRecorder::stopRecording() ...@@ -119,45 +114,50 @@ MediaRecorder::stopRecording()
} }
} }
int Observer<std::shared_ptr<MediaFrame>>*
MediaRecorder::addStream(const MediaStream& ms) MediaRecorder::addStream(const MediaStream& ms)
{ {
if (audioOnly_ && ms.isVideo) { if (audioOnly_ && ms.isVideo) {
RING_ERR() << "Trying to add video stream to audio only recording"; RING_ERR() << "Trying to add video stream to audio only recording";
return -1; return nullptr;
} }
if (streams_.insert(std::make_pair(ms.name, ms)).second) { auto ptr = std::make_unique<StreamObserver>(ms, [this, ms](const std::shared_ptr<MediaFrame>& frame) {
onFrame(ms.name, frame);
});
auto p = streams_.insert(std::make_pair(ms.name, std::move(ptr)));
if (p.second) {
RING_DBG() << "Recorder input #" << streams_.size() << ": " << ms; RING_DBG() << "Recorder input #" << streams_.size() << ": " << ms;
if (ms.isVideo) if (ms.isVideo)
hasVideo_ = true; hasVideo_ = true;
else else
hasAudio_ = true; hasAudio_ = true;
return 0; return p.first->second.get();
} else { } else {
RING_ERR() << "Could not add stream '" << ms.name << "' to record"; RING_WARN() << "Recorder already has '" << ms.name << "' as input";
return -1; return p.first->second.get();
} }
} }
Observer<std::shared_ptr<MediaFrame>>*
MediaRecorder::getStream(const std::string& name) const
{
const auto it = streams_.find(name);
if (it != streams_.cend())
return it->second.get();
return nullptr;
}
void void
MediaRecorder::update(Observable<std::shared_ptr<MediaFrame>>* ob, const std::shared_ptr<MediaFrame>& m) MediaRecorder::onFrame(const std::string& name, const std::shared_ptr<MediaFrame>& frame)
{ {
if (!isRecording_) if (!isRecording_)
return; return;
std::string name;
if (dynamic_cast<AudioReceiveThread*>(ob))
name = "a:remote";
else if (dynamic_cast<AudioInput*>(ob))
name = "a:local";
else if (dynamic_cast<video::VideoReceiveThread*>(ob))
name = "v:remote";
else if (dynamic_cast<video::VideoInput*>(ob))
name = "v:local";
// copy frame to not mess with the original frame's pts (does not actually copy frame data) // copy frame to not mess with the original frame's pts (does not actually copy frame data)
MediaFrame clone; MediaFrame clone;
clone.copyFrom(*m); clone.copyFrom(*frame);
clone.pointer()->pts -= streams_[name].firstTimestamp; clone.pointer()->pts -= streams_[name]->info.firstTimestamp;
if (clone.pointer()->width > 0 && clone.pointer()->height > 0) if (clone.pointer()->width > 0 && clone.pointer()->height > 0)
videoFilter_->feedInput(clone.pointer(), name); videoFilter_->feedInput(clone.pointer(), name);
else else
...@@ -262,19 +262,19 @@ MediaRecorder::setupVideoOutput() ...@@ -262,19 +262,19 @@ MediaRecorder::setupVideoOutput()
{ {
MediaStream encoderStream, peer, local; MediaStream encoderStream, peer, local;
auto it = std::find_if(streams_.begin(), streams_.end(), [](const auto& pair){ auto it = std::find_if(streams_.begin(), streams_.end(), [](const auto& pair){
return pair.second.isVideo && pair.second.name.find("remote") != std::string::npos; return pair.second->info.isVideo && pair.second->info.name.find("remote") != std::string::npos;
}); });
if (it != streams_.end()) { if (it != streams_.end()) {
peer = it->second; peer = it->second->info;
} }
it = std::find_if(streams_.begin(), streams_.end(), [](const auto& pair){ it = std::find_if(streams_.begin(), streams_.end(), [](const auto& pair){
return pair.second.isVideo && pair.second.name.find("local") != std::string::npos; return pair.second->info.isVideo && pair.second->info.name.find("local") != std::string::npos;
}); });
if (it != streams_.end()) { if (it != streams_.end()) {
local = it->second; local = it->second->info;
} }
// vp8 supports only yuv420p // vp8 supports only yuv420p
...@@ -348,19 +348,19 @@ MediaRecorder::setupAudioOutput() ...@@ -348,19 +348,19 @@ MediaRecorder::setupAudioOutput()
{ {
MediaStream encoderStream, peer, local; MediaStream encoderStream, peer, local;
auto it = std::find_if(streams_.begin(), streams_.end(), [](const auto& pair){ auto it = std::find_if(streams_.begin(), streams_.end(), [](const auto& pair){
return !pair.second.isVideo && pair.second.name.find("remote") != std::string::npos; return !pair.second->info.isVideo && pair.second->info.name.find("remote") != std::string::npos;
}); });
if (it != streams_.end()) { if (it != streams_.end()) {
peer = it->second; peer = it->second->info;
} }
it = std::find_if(streams_.begin(), streams_.end(), [](const auto& pair){ it = std::find_if(streams_.begin(), streams_.end(), [](const auto& pair){
return !pair.second.isVideo && pair.second.name.find("local") != std::string::npos; return !pair.second->info.isVideo && pair.second->info.name.find("local") != std::string::npos;
}); });
if (it != streams_.end()) { if (it != streams_.end()) {
local = it->second; local = it->second->info;
} }
// resample to common audio format, so any player can play the file // resample to common audio format, so any player can play the file
......
...@@ -27,9 +27,6 @@ ...@@ -27,9 +27,6 @@
#include "media_stream.h" #include "media_stream.h"
#include "noncopyable.h" #include "noncopyable.h"
#include "observer.h" #include "observer.h"
#ifdef RING_VIDEO
#include "video/video_base.h"
#endif
#include <map> #include <map>
#include <memory> #include <memory>
...@@ -41,8 +38,7 @@ ...@@ -41,8 +38,7 @@
namespace ring { namespace ring {
class MediaRecorder : public Observer<std::shared_ptr<MediaFrame>> class MediaRecorder : public std::enable_shared_from_this<MediaRecorder>
, public std::enable_shared_from_this<MediaRecorder>
{ {
public: public:
MediaRecorder(); MediaRecorder();
...@@ -84,8 +80,14 @@ public: ...@@ -84,8 +80,14 @@ public:
void setMetadata(const std::string& title, const std::string& desc); void setMetadata(const std::string& title, const std::string& desc);
/** /**
* Adds a stream to the recorder. Caller must then attach this to the media source.
*/ */
int addStream(const MediaStream& ms); Observer<std::shared_ptr<MediaFrame>>* addStream(const MediaStream& ms);
/**
* Gets the stream observer so the caller can detach it from the media source.
*/
Observer<std::shared_ptr<MediaFrame>>* getStream(const std::string& name) const;
/** /**
* Starts the record. Streams must have been added using Observable::attach and * Starts the record. Streams must have been added using Observable::attach and
...@@ -98,14 +100,29 @@ public: ...@@ -98,14 +100,29 @@ public:
*/ */
void stopRecording(); void stopRecording();
/**
* Updates the recorder with an audio or video frame.
*/
void update(Observable<std::shared_ptr<MediaFrame>>* ob, const std::shared_ptr<MediaFrame>& a) override;
private: private:
NON_COPYABLE(MediaRecorder); NON_COPYABLE(MediaRecorder);
struct StreamObserver : public Observer<std::shared_ptr<MediaFrame>> {
const MediaStream info;
StreamObserver(const MediaStream& ms, std::function<void(const std::shared_ptr<MediaFrame>&)> func)
: info(ms), cb_(func)
{};
~StreamObserver() {};
void update(Observable<std::shared_ptr<MediaFrame>>* /*ob*/, const std::shared_ptr<MediaFrame>& m) override
{
cb_(m);
}
private:
std::function<void(const std::shared_ptr<MediaFrame>&)> cb_;
};
void onFrame(const std::string& name, const std::shared_ptr<MediaFrame>& frame);
void flush(); void flush();
void reset(); void reset();
...@@ -117,7 +134,7 @@ private: ...@@ -117,7 +134,7 @@ private:
std::mutex mutex_; // protect against concurrent file writes std::mutex mutex_; // protect against concurrent file writes
std::map<std::string, const MediaStream> streams_; std::map<std::string, std::unique_ptr<StreamObserver>> streams_;
std::string path_; std::string path_;
std::tm startTime_; std::tm startTime_;
......
...@@ -568,22 +568,30 @@ void ...@@ -568,22 +568,30 @@ void
VideoRtpSession::initRecorder(std::shared_ptr<MediaRecorder>& rec) VideoRtpSession::initRecorder(std::shared_ptr<MediaRecorder>& rec)
{ {
if (receiveThread_) { if (receiveThread_) {
receiveThread_->attach(rec.get()); if (auto ob = rec->addStream(receiveThread_->getInfo())) {
rec->addStream(receiveThread_->getInfo()); receiveThread_->attach(ob);
}
} }
if (auto vidInput = std::static_pointer_cast<VideoInput>(videoLocal_)) { if (auto input = std::static_pointer_cast<VideoInput>(videoLocal_)) {
vidInput->attach(rec.get()); if (auto ob = rec->addStream(input->getInfo())) {
rec->addStream(vidInput->getInfo()); input->attach(ob);
}
} }
} }
void void
VideoRtpSession::deinitRecorder(std::shared_ptr<MediaRecorder>& rec) VideoRtpSession::deinitRecorder(std::shared_ptr<MediaRecorder>& rec)
{ {
if (receiveThread_) if (receiveThread_) {
receiveThread_->detach(rec.get()); if (auto ob = rec->getStream(receiveThread_->getInfo().name)) {
if (auto vidInput = std::static_pointer_cast<VideoInput>(videoLocal_)) receiveThread_->detach(ob);
vidInput->detach(rec.get()); }
}
if (auto input = std::static_pointer_cast<VideoInput>(videoLocal_)) {
if (auto ob = rec->getStream(input->getInfo().name)) {
input->detach(ob);
}
}
} }
}} // namespace ring::video }} // namespace ring::video
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment