From b912714a96e23590edcbabf7f7eac716e3e45bb0 Mon Sep 17 00:00:00 2001 From: Tristan Matthews <tristan.matthews@savoirfairelinux.com> Date: Fri, 9 May 2014 16:40:43 -0400 Subject: [PATCH] video: do threading via composition not inheritance Rationale: it's easier to reason about the thread lifecycle. The new ThreadLoop class has the same API as the old SFLThread class, but wraps an std::thread instead of a pthread, exits on exceptional errors via an exception and calls its owner's methods via function handles (setup, process and cleanup) Refs #47343 Change-Id: Ie1eec81ec53cdc5bb6f00a89a916c5c8f9abeb4f --- daemon/src/Makefile.am | 4 +- daemon/src/{sflthread.cpp => threadloop.cpp} | 58 +++++++++++--------- daemon/src/{sflthread.h => threadloop.h} | 39 +++++++------ daemon/src/video/check.h | 2 +- daemon/src/video/video_input.cpp | 12 ++-- daemon/src/video/video_input.h | 9 ++- daemon/src/video/video_mixer.h | 1 - daemon/src/video/video_receive_thread.cpp | 20 +++++-- daemon/src/video/video_receive_thread.h | 9 ++- daemon/src/video/video_rtp_session.cpp | 2 +- 10 files changed, 89 insertions(+), 67 deletions(-) rename daemon/src/{sflthread.cpp => threadloop.cpp} (63%) rename daemon/src/{sflthread.h => threadloop.h} (67%) diff --git a/daemon/src/Makefile.am b/daemon/src/Makefile.am index 8c03a90cd2..cc610fdf19 100644 --- a/daemon/src/Makefile.am +++ b/daemon/src/Makefile.am @@ -85,9 +85,9 @@ libsflphone_la_SOURCES = conference.cpp \ logger.cpp \ numbercleaner.cpp \ fileutils.cpp \ - sflthread.cpp \ + threadloop.cpp \ ip_utils.cpp \ - sflthread.h \ + threadloop.h \ conference.h \ voiplink.h \ preferences.h \ diff --git a/daemon/src/sflthread.cpp b/daemon/src/threadloop.cpp similarity index 63% rename from daemon/src/sflthread.cpp rename to daemon/src/threadloop.cpp index 669714bf60..f175aae322 100644 --- a/daemon/src/sflthread.cpp +++ b/daemon/src/threadloop.cpp @@ -29,63 +29,67 @@ * as that of the covered work. */ -#include "sflthread.h" +#include "threadloop.h" #include "logger.h" -void* SFLThread::run_(void* data) +void ThreadLoop::mainloop() { - SFLThread *obj = static_cast<SFLThread*>(data); - obj->mainloop_(); - return nullptr; -} - -void SFLThread::mainloop_() -{ - if (setup()) { - while (running_) - process(); - cleanup(); - } else - ERROR("setup failed"); + try { + if (setup_()) { + while (running_) + process_(); + cleanup_(); + } else { + ERROR("setup failed"); + } + } catch (const ThreadLoopException &e) { + ERROR("%s", e.what()); + } } -SFLThread::SFLThread() : thread_(), running_(false) +ThreadLoop::ThreadLoop(const std::function<bool()> &setup, + const std::function<void()> &process, + const std::function<void()> &cleanup) + : setup_(setup), process_(process), cleanup_(cleanup) {} -SFLThread::~SFLThread() +ThreadLoop::~ThreadLoop() { if (isRunning()) { - stop(); + ERROR("Thread not stopped by owner"); join(); } } -void SFLThread::start() +void ThreadLoop::start() { if (!running_) { running_ = true; - pthread_create(&thread_, NULL, &run_, this); + thread_ = std::thread(&ThreadLoop::mainloop, this); + } else { + ERROR("Thread already started"); } } -void SFLThread::stop() +void ThreadLoop::stop() { running_ = false; } -void SFLThread::join() +void ThreadLoop::join() { - if (thread_) - pthread_join(thread_, NULL); + stop(); + if (thread_.joinable()) + thread_.join(); } -void SFLThread::exit() +void ThreadLoop::exit() { stop(); - pthread_exit(NULL); + throw ThreadLoopException(); } -bool SFLThread::isRunning() +bool ThreadLoop::isRunning() const { return running_; } diff --git a/daemon/src/sflthread.h b/daemon/src/threadloop.h similarity index 67% rename from daemon/src/sflthread.h rename to daemon/src/threadloop.h index 08c1c09011..a41eb22786 100644 --- a/daemon/src/sflthread.h +++ b/daemon/src/threadloop.h @@ -29,35 +29,42 @@ * as that of the covered work. */ -#ifndef __SFLTHREAD_H__ -#define __SFLTHREAD_H__ +#ifndef __THREADLOOP_H__ +#define __THREADLOOP_H__ -#include <pthread.h> #include <atomic> +#include <thread> +#include <functional> +#include <stdexcept> -class SFLThread { +struct ThreadLoopException : public std::runtime_error { + ThreadLoopException() : std::runtime_error("ThreadLoopException") {} +}; + +class ThreadLoop { public: - SFLThread(); - virtual ~SFLThread(); + ThreadLoop(const std::function<bool()> &setup, + const std::function<void()> &process, + const std::function<void()> &cleanup); + ~ThreadLoop(); void start(); -protected: void exit(); void stop(); void join(); - bool isRunning(); + bool isRunning() const; private: - virtual bool setup() { return true; }; - virtual void process() {}; - virtual void cleanup() {}; + // These must be provided by users of ThreadLoop + std::function<bool()> setup_; + std::function<void()> process_; + std::function<void()> cleanup_; - static void* run_(void*); - void mainloop_(); - pthread_t thread_; + void mainloop(); - std::atomic<bool> running_; + std::atomic<bool> running_ = {false}; + std::thread thread_ = {}; }; -#endif // __SFLTHREAD_H__ +#endif // __THREADLOOP_H__ diff --git a/daemon/src/video/check.h b/daemon/src/video/check.h index 5fb7eb92b9..388268a555 100644 --- a/daemon/src/video/check.h +++ b/daemon/src/video/check.h @@ -35,6 +35,6 @@ // If condition A is false, print the error message in M and exit thread #define EXIT_IF_FAIL(A, M, ...) if (!(A)) { \ - ERROR(M, ##__VA_ARGS__); this->exit(); } + ERROR(M, ##__VA_ARGS__); loop_.exit(); } #endif // CHECK_H_ diff --git a/daemon/src/video/video_input.cpp b/daemon/src/video/video_input.cpp index 3593007d52..61e78ad707 100644 --- a/daemon/src/video/video_input.cpp +++ b/daemon/src/video/video_input.cpp @@ -44,14 +44,16 @@ namespace sfl_video { VideoInput::VideoInput() : VideoGenerator::VideoGenerator() , sink_() + , loop_(std::bind(&VideoInput::setup, this), + std::bind(&VideoInput::process, this), + std::bind(&VideoInput::cleanup, this)) { - start(); + loop_.start(); } VideoInput::~VideoInput() { - stop(); - join(); + loop_.join(); } bool VideoInput::setup() @@ -86,7 +88,7 @@ void VideoInput::cleanup() int VideoInput::interruptCb(void *data) { VideoInput *context = static_cast<VideoInput*>(data); - return not context->isRunning(); + return not context->loop_.isRunning(); } bool VideoInput::captureFrame() @@ -100,7 +102,7 @@ bool VideoInput::captureFrame() case VideoDecoder::Status::ReadError: case VideoDecoder::Status::DecodeError: - stop(); + loop_.stop(); // fallthrough case VideoDecoder::Status::Success: return false; diff --git a/daemon/src/video/video_input.h b/daemon/src/video/video_input.h index 45569962d8..e3e0671ad4 100644 --- a/daemon/src/video/video_input.h +++ b/daemon/src/video/video_input.h @@ -37,7 +37,7 @@ #include "noncopyable.h" #include "shm_sink.h" #include "video_decoder.h" -#include "sflthread.h" +#include "threadloop.h" #include <map> #include <atomic> @@ -45,9 +45,7 @@ namespace sfl_video { -class VideoInput : - public VideoGenerator, - public SFLThread +class VideoInput : public VideoGenerator { public: VideoInput(); @@ -75,6 +73,7 @@ private: std::string input_ = ""; std::string format_ = ""; bool emulateRate_ = false; + ThreadLoop loop_; void createDecoder(); void deleteDecoder(); @@ -83,7 +82,7 @@ private: bool initX11(std::string display); bool initFile(std::string path); - // as SFLThread + // for ThreadLoop bool setup(); void process(); void cleanup(); diff --git a/daemon/src/video/video_mixer.h b/daemon/src/video/video_mixer.h index ce107bdfa5..5baeb6e149 100644 --- a/daemon/src/video/video_mixer.h +++ b/daemon/src/video/video_mixer.h @@ -36,7 +36,6 @@ #include "video_base.h" #include "video_scaler.h" #include "shm_sink.h" -#include "sflthread.h" #include <mutex> #include <list> diff --git a/daemon/src/video/video_receive_thread.cpp b/daemon/src/video/video_receive_thread.cpp index e34e742ee5..09ee7bd4c0 100644 --- a/daemon/src/video/video_receive_thread.cpp +++ b/daemon/src/video/video_receive_thread.cpp @@ -59,12 +59,20 @@ VideoReceiveThread::VideoReceiveThread(const std::string& id, , demuxContext_() , sink_(id) , requestKeyFrameCallback_(0) + , loop_(std::bind(&VideoReceiveThread::setup, this), + std::bind(&VideoReceiveThread::process, this), + std::bind(&VideoReceiveThread::cleanup, this)) {} VideoReceiveThread::~VideoReceiveThread() { - stop(); - join(); + loop_.join(); +} + +void +VideoReceiveThread::startLoop() +{ + loop_.start(); } // We do this setup here instead of the constructor because we don't want the @@ -162,7 +170,7 @@ void VideoReceiveThread::cleanup() int VideoReceiveThread::interruptCb(void *data) { VideoReceiveThread *context = static_cast<VideoReceiveThread*>(data); - return not context->isRunning(); + return not context->loop_.isRunning(); } int VideoReceiveThread::readFunction(void *opaque, uint8_t *buf, int buf_size) @@ -199,7 +207,7 @@ bool VideoReceiveThread::decodeFrame() // fallthrough if we can't request keyframe case VideoDecoder::Status::ReadError: ERROR("VideoDecoder fatal error, stopping it..."); - stop(); + loop_.stop(); default: break; @@ -211,7 +219,7 @@ bool VideoReceiveThread::decodeFrame() void VideoReceiveThread::enterConference() { - if (!isRunning()) + if (!loop_.isRunning()) return; if (detach(&sink_)) { @@ -222,7 +230,7 @@ void VideoReceiveThread::enterConference() void VideoReceiveThread::exitConference() { - if (!isRunning()) + if (!loop_.isRunning()) return; if (dstWidth_ > 0 && dstHeight_ > 0) { diff --git a/daemon/src/video/video_receive_thread.h b/daemon/src/video/video_receive_thread.h index 37b01b6a89..cfab33494c 100644 --- a/daemon/src/video/video_receive_thread.h +++ b/daemon/src/video/video_receive_thread.h @@ -33,7 +33,7 @@ #include "video_decoder.h" #include "shm_sink.h" -#include "sflthread.h" +#include "threadloop.h" #include "noncopyable.h" #include <map> @@ -46,11 +46,12 @@ namespace sfl_video { class SocketPair; -class VideoReceiveThread : public VideoGenerator, public SFLThread { +class VideoReceiveThread : public VideoGenerator { public: VideoReceiveThread(const std::string &id, const std::map<std::string, std::string> &args); ~VideoReceiveThread(); + void startLoop(); void addIOContext(SocketPair &socketPair); void setRequestKeyFrameCallback(void (*)(const std::string &)); @@ -86,7 +87,9 @@ private: static int readFunction(void *opaque, uint8_t *buf, int buf_size); - // as SFLThread + ThreadLoop loop_; + + // used by ThreadLoop bool setup(); void process(); void cleanup(); diff --git a/daemon/src/video/video_rtp_session.cpp b/daemon/src/video/video_rtp_session.cpp index d3c8274814..cbc99beb33 100644 --- a/daemon/src/video/video_rtp_session.cpp +++ b/daemon/src/video/video_rtp_session.cpp @@ -172,7 +172,7 @@ void VideoRtpSession::startReceiver() receiveThread_.reset(new VideoReceiveThread(callID_, rxArgs_)); receiveThread_->setRequestKeyFrameCallback(&SIPVoIPLink::enqueueKeyframeRequest); receiveThread_->addIOContext(*socketPair_); - receiveThread_->start(); + receiveThread_->startLoop(); } else { DEBUG("Video receiving disabled"); if (receiveThread_) -- GitLab