Commit b912714a authored by Tristan Matthews's avatar Tristan Matthews

video: do threading via composition not inheritance

Rationale: it's easier to reason about the thread lifecycle. The new
ThreadLoop class has the same API as the old SFLThread class, but wraps
an std::thread instead of a pthread, exits on exceptional errors via an
exception and calls its owner's methods via function handles (setup,
process and cleanup)

Refs #47343

Change-Id: Ie1eec81ec53cdc5bb6f00a89a916c5c8f9abeb4f
parent fbc862ee
......@@ -85,9 +85,9 @@ libsflphone_la_SOURCES = conference.cpp \
logger.cpp \
numbercleaner.cpp \
fileutils.cpp \
sflthread.cpp \
threadloop.cpp \
ip_utils.cpp \
sflthread.h \
threadloop.h \
conference.h \
voiplink.h \
preferences.h \
......
......@@ -29,63 +29,67 @@
* as that of the covered work.
*/
#include "sflthread.h"
#include "threadloop.h"
#include "logger.h"
void* SFLThread::run_(void* data)
void ThreadLoop::mainloop()
{
SFLThread *obj = static_cast<SFLThread*>(data);
obj->mainloop_();
return nullptr;
}
void SFLThread::mainloop_()
{
if (setup()) {
while (running_)
process();
cleanup();
} else
ERROR("setup failed");
try {
if (setup_()) {
while (running_)
process_();
cleanup_();
} else {
ERROR("setup failed");
}
} catch (const ThreadLoopException &e) {
ERROR("%s", e.what());
}
}
SFLThread::SFLThread() : thread_(), running_(false)
ThreadLoop::ThreadLoop(const std::function<bool()> &setup,
const std::function<void()> &process,
const std::function<void()> &cleanup)
: setup_(setup), process_(process), cleanup_(cleanup)
{}
SFLThread::~SFLThread()
ThreadLoop::~ThreadLoop()
{
if (isRunning()) {
stop();
ERROR("Thread not stopped by owner");
join();
}
}
void SFLThread::start()
void ThreadLoop::start()
{
if (!running_) {
running_ = true;
pthread_create(&thread_, NULL, &run_, this);
thread_ = std::thread(&ThreadLoop::mainloop, this);
} else {
ERROR("Thread already started");
}
}
void SFLThread::stop()
void ThreadLoop::stop()
{
running_ = false;
}
void SFLThread::join()
void ThreadLoop::join()
{
if (thread_)
pthread_join(thread_, NULL);
stop();
if (thread_.joinable())
thread_.join();
}
void SFLThread::exit()
void ThreadLoop::exit()
{
stop();
pthread_exit(NULL);
throw ThreadLoopException();
}
bool SFLThread::isRunning()
bool ThreadLoop::isRunning() const
{
return running_;
}
......@@ -29,35 +29,42 @@
* as that of the covered work.
*/
#ifndef __SFLTHREAD_H__
#define __SFLTHREAD_H__
#ifndef __THREADLOOP_H__
#define __THREADLOOP_H__
#include <pthread.h>
#include <atomic>
#include <thread>
#include <functional>
#include <stdexcept>
class SFLThread {
struct ThreadLoopException : public std::runtime_error {
ThreadLoopException() : std::runtime_error("ThreadLoopException") {}
};
class ThreadLoop {
public:
SFLThread();
virtual ~SFLThread();
ThreadLoop(const std::function<bool()> &setup,
const std::function<void()> &process,
const std::function<void()> &cleanup);
~ThreadLoop();
void start();
protected:
void exit();
void stop();
void join();
bool isRunning();
bool isRunning() const;
private:
virtual bool setup() { return true; };
virtual void process() {};
virtual void cleanup() {};
// These must be provided by users of ThreadLoop
std::function<bool()> setup_;
std::function<void()> process_;
std::function<void()> cleanup_;
static void* run_(void*);
void mainloop_();
pthread_t thread_;
void mainloop();
std::atomic<bool> running_;
std::atomic<bool> running_ = {false};
std::thread thread_ = {};
};
#endif // __SFLTHREAD_H__
#endif // __THREADLOOP_H__
......@@ -35,6 +35,6 @@
// If condition A is false, print the error message in M and exit thread
#define EXIT_IF_FAIL(A, M, ...) if (!(A)) { \
ERROR(M, ##__VA_ARGS__); this->exit(); }
ERROR(M, ##__VA_ARGS__); loop_.exit(); }
#endif // CHECK_H_
......@@ -44,14 +44,16 @@ namespace sfl_video {
VideoInput::VideoInput() :
VideoGenerator::VideoGenerator()
, sink_()
, loop_(std::bind(&VideoInput::setup, this),
std::bind(&VideoInput::process, this),
std::bind(&VideoInput::cleanup, this))
{
start();
loop_.start();
}
VideoInput::~VideoInput()
{
stop();
join();
loop_.join();
}
bool VideoInput::setup()
......@@ -86,7 +88,7 @@ void VideoInput::cleanup()
int VideoInput::interruptCb(void *data)
{
VideoInput *context = static_cast<VideoInput*>(data);
return not context->isRunning();
return not context->loop_.isRunning();
}
bool VideoInput::captureFrame()
......@@ -100,7 +102,7 @@ bool VideoInput::captureFrame()
case VideoDecoder::Status::ReadError:
case VideoDecoder::Status::DecodeError:
stop();
loop_.stop();
// fallthrough
case VideoDecoder::Status::Success:
return false;
......
......@@ -37,7 +37,7 @@
#include "noncopyable.h"
#include "shm_sink.h"
#include "video_decoder.h"
#include "sflthread.h"
#include "threadloop.h"
#include <map>
#include <atomic>
......@@ -45,9 +45,7 @@
namespace sfl_video {
class VideoInput :
public VideoGenerator,
public SFLThread
class VideoInput : public VideoGenerator
{
public:
VideoInput();
......@@ -75,6 +73,7 @@ private:
std::string input_ = "";
std::string format_ = "";
bool emulateRate_ = false;
ThreadLoop loop_;
void createDecoder();
void deleteDecoder();
......@@ -83,7 +82,7 @@ private:
bool initX11(std::string display);
bool initFile(std::string path);
// as SFLThread
// for ThreadLoop
bool setup();
void process();
void cleanup();
......
......@@ -36,7 +36,6 @@
#include "video_base.h"
#include "video_scaler.h"
#include "shm_sink.h"
#include "sflthread.h"
#include <mutex>
#include <list>
......
......@@ -59,12 +59,20 @@ VideoReceiveThread::VideoReceiveThread(const std::string& id,
, demuxContext_()
, sink_(id)
, requestKeyFrameCallback_(0)
, loop_(std::bind(&VideoReceiveThread::setup, this),
std::bind(&VideoReceiveThread::process, this),
std::bind(&VideoReceiveThread::cleanup, this))
{}
VideoReceiveThread::~VideoReceiveThread()
{
stop();
join();
loop_.join();
}
void
VideoReceiveThread::startLoop()
{
loop_.start();
}
// We do this setup here instead of the constructor because we don't want the
......@@ -162,7 +170,7 @@ void VideoReceiveThread::cleanup()
int VideoReceiveThread::interruptCb(void *data)
{
VideoReceiveThread *context = static_cast<VideoReceiveThread*>(data);
return not context->isRunning();
return not context->loop_.isRunning();
}
int VideoReceiveThread::readFunction(void *opaque, uint8_t *buf, int buf_size)
......@@ -199,7 +207,7 @@ bool VideoReceiveThread::decodeFrame()
// fallthrough if we can't request keyframe
case VideoDecoder::Status::ReadError:
ERROR("VideoDecoder fatal error, stopping it...");
stop();
loop_.stop();
default:
break;
......@@ -211,7 +219,7 @@ bool VideoReceiveThread::decodeFrame()
void VideoReceiveThread::enterConference()
{
if (!isRunning())
if (!loop_.isRunning())
return;
if (detach(&sink_)) {
......@@ -222,7 +230,7 @@ void VideoReceiveThread::enterConference()
void VideoReceiveThread::exitConference()
{
if (!isRunning())
if (!loop_.isRunning())
return;
if (dstWidth_ > 0 && dstHeight_ > 0) {
......
......@@ -33,7 +33,7 @@
#include "video_decoder.h"
#include "shm_sink.h"
#include "sflthread.h"
#include "threadloop.h"
#include "noncopyable.h"
#include <map>
......@@ -46,11 +46,12 @@ namespace sfl_video {
class SocketPair;
class VideoReceiveThread : public VideoGenerator, public SFLThread {
class VideoReceiveThread : public VideoGenerator {
public:
VideoReceiveThread(const std::string &id,
const std::map<std::string, std::string> &args);
~VideoReceiveThread();
void startLoop();
void addIOContext(SocketPair &socketPair);
void setRequestKeyFrameCallback(void (*)(const std::string &));
......@@ -86,7 +87,9 @@ private:
static int readFunction(void *opaque, uint8_t *buf, int buf_size);
// as SFLThread
ThreadLoop loop_;
// used by ThreadLoop
bool setup();
void process();
void cleanup();
......
......@@ -172,7 +172,7 @@ void VideoRtpSession::startReceiver()
receiveThread_.reset(new VideoReceiveThread(callID_, rxArgs_));
receiveThread_->setRequestKeyFrameCallback(&SIPVoIPLink::enqueueKeyframeRequest);
receiveThread_->addIOContext(*socketPair_);
receiveThread_->start();
receiveThread_->startLoop();
} else {
DEBUG("Video receiving disabled");
if (receiveThread_)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment