Commit b9a1978c authored by Tristan Matthews's avatar Tristan Matthews

* #18668: audiortp: use pthreads, favour composition over inheritance

parent f2ef16f2
......@@ -132,7 +132,7 @@ void AudioRtpFactory::start(const std::vector<AudioCodec*> &audioCodecs)
if (rtpSession_ == NULL)
throw AudioRtpFactoryException("RTP session was null when trying to start audio thread");
rtpSession_->startRtpThread(audioCodecs);
rtpSession_->startRtpThreads(audioCodecs);
}
void AudioRtpFactory::stop()
......
......@@ -40,7 +40,7 @@
#include "manager.h"
namespace sfl {
AudioRtpSession::AudioRtpSession(SIPCall &call, ost::RTPDataQueue &queue, ost::Thread &thread) :
AudioRtpSession::AudioRtpSession(SIPCall &call, ost::RTPDataQueue &queue) :
AudioRtpRecordHandler(call)
, call_(call)
, timestamp_(0)
......@@ -51,7 +51,6 @@ AudioRtpSession::AudioRtpSession(SIPCall &call, ost::RTPDataQueue &queue, ost::T
, remote_ip_()
, remote_port_(0)
, timestampCount_(0)
, thread_(thread)
{
queue_.setTypeOfService(ost::RTPDataQueue::tosEnhanced);
}
......@@ -232,13 +231,9 @@ void AudioRtpSession::updateDestinationIpAddress()
}
void AudioRtpSession::startRtpThread(const std::vector<AudioCodec*> &audioCodecs)
void AudioRtpSession::prepareRtpReceiveThread(const std::vector<AudioCodec*> &audioCodecs)
{
if (isStarted_)
return;
DEBUG("Starting main thread");
DEBUG("Preparing receiving thread");
isStarted_ = true;
setSessionTimeouts();
setSessionMedia(audioCodecs);
......@@ -248,7 +243,6 @@ void AudioRtpSession::startRtpThread(const std::vector<AudioCodec*> &audioCodecs
#endif
queue_.enableStack();
thread_.start();
}
......@@ -263,4 +257,15 @@ int AudioRtpSession::getIncrementForDTMF() const
return timestampIncrement_;
}
void AudioRtpSession::startRtpThreads(const std::vector<AudioCodec*> &audioCodecs)
{
if (isStarted_)
return;
prepareRtpReceiveThread(audioCodecs);
// implemented in subclasses
startReceiveThread();
startSendThread();
}
}
......@@ -40,9 +40,6 @@
#include "noncopyable.h"
class SIPCall;
namespace ost {
class Thread;
}
namespace sfl {
......@@ -54,12 +51,12 @@ class AudioRtpSession : public AudioRtpRecordHandler {
* Constructor
* @param sipcall The pointer on the SIP call
*/
AudioRtpSession(SIPCall &sipcall, ost::RTPDataQueue &queue, ost::Thread &thread);
AudioRtpSession(SIPCall &sipcall, ost::RTPDataQueue &queue);
virtual ~AudioRtpSession();
void updateSessionMedia(const std::vector<AudioCodec*> &audioCodecs);
virtual void startRtpThread(const std::vector<AudioCodec*> &audioCodecs);
void startRtpThreads(const std::vector<AudioCodec*> &audioCodecs);
/**
* Used mostly when receiving a reinvite
......@@ -77,6 +74,10 @@ class AudioRtpSession : public AudioRtpRecordHandler {
virtual std::vector<uint8> getLocalMasterSalt() const = 0;
protected:
ost::RTPDataQueue &queue_;
bool isStarted_;
void prepareRtpReceiveThread(const std::vector<AudioCodec*> &audioCodecs);
/**
* Set the audio codec for this RTP session
*/
......@@ -117,11 +118,10 @@ class AudioRtpSession : public AudioRtpRecordHandler {
*/
unsigned int transportRate_;
ost::RTPDataQueue &queue_;
bool isStarted_;
private:
NON_COPYABLE(AudioRtpSession);
virtual void startReceiveThread() = 0;
virtual void startSendThread() = 0;
/**
* Set RTP Sockets send/receive timeouts
......@@ -152,8 +152,6 @@ class AudioRtpSession : public AudioRtpRecordHandler {
* Timestamp reset frequency specified in number of packet sent
*/
short timestampCount_;
ost::Thread &thread_;
};
}
......
......@@ -39,31 +39,44 @@
namespace sfl {
AudioSymmetricRtpSession::AudioSymmetricRtpSession(SIPCall &call) :
ost::TimerPort()
, ost::SymmetricRTPSession(ost::InetHostAddress(call.getLocalIp().c_str()), call.getLocalAudioPort())
, AudioRtpSession(call, *this, *this)
, rtpThread_(*this)
, audioCodecs_()
ost::SymmetricRTPSession(ost::InetHostAddress(call.getLocalIp().c_str()), call.getLocalAudioPort())
, AudioRtpSession(call, *this)
, rtpSendThread_(*this)
{
DEBUG("Setting new RTP session with destination %s:%d",
call_.getLocalIp().c_str(), call_.getLocalAudioPort());
audioRtpRecord_.callId_ = call_.getCallId();
}
AudioSymmetricRtpSession::~AudioSymmetricRtpSession()
AudioSymmetricRtpSession::AudioRtpSendThread::AudioRtpSendThread(AudioSymmetricRtpSession &session) :
running_(false), rtpSession_(session), thread_(0)
{}
AudioSymmetricRtpSession::AudioRtpSendThread::~AudioRtpSendThread()
{
if (rtpThread_.running_) {
rtpThread_.running_ = false;
rtpThread_.join();
}
running_ = false;
if (thread_)
pthread_join(thread_, NULL);
}
AudioSymmetricRtpSession::AudioRtpThread::AudioRtpThread(AudioSymmetricRtpSession &session) : running_(true), rtpSession_(session)
{}
void AudioSymmetricRtpSession::AudioRtpSendThread::start()
{
running_ = true;
pthread_create(&thread_, NULL, &runCallback, this);
}
void AudioSymmetricRtpSession::AudioRtpThread::run()
void *
AudioSymmetricRtpSession::AudioRtpSendThread::runCallback(void *data)
{
TimerPort::setTimer(rtpSession_.transportRate_);
AudioSymmetricRtpSession::AudioRtpSendThread *context = static_cast<AudioSymmetricRtpSession::AudioRtpSendThread*>(data);
context->run();
return NULL;
}
void AudioSymmetricRtpSession::AudioRtpSendThread::run()
{
ost::TimerPort::setTimer(rtpSession_.transportRate_);
const int MS_TO_USEC = 1000;
while (running_) {
// Send session
......@@ -72,20 +85,19 @@ void AudioSymmetricRtpSession::AudioRtpThread::run()
else
rtpSession_.sendMicData();
Thread::sleep(TimerPort::getTimer());
usleep(ost::TimerPort::getTimer() * MS_TO_USEC);
TimerPort::incTimer(rtpSession_.transportRate_);
ost::TimerPort::incTimer(rtpSession_.transportRate_);
}
}
void AudioSymmetricRtpSession::startRtpThread(const std::vector<AudioCodec*> &audioCodecs)
void AudioSymmetricRtpSession::startReceiveThread()
{
DEBUG("Starting main thread");
if (isStarted_)
return;
ost::SymmetricRTPSession::start();
}
audioCodecs_ = audioCodecs;
AudioRtpSession::startRtpThread(audioCodecs);
startSymmetricRtpThread();
void AudioSymmetricRtpSession::startSendThread()
{
rtpSendThread_.start();
}
}
......@@ -60,16 +60,11 @@ class AudioSymmetricRtpSession : public ost::TimerPort, public ost::SymmetricRTP
* @param call The SIP call
*/
AudioSymmetricRtpSession(SIPCall &call);
~AudioSymmetricRtpSession();
virtual bool onRTPPacketRecv(ost::IncomingRTPPkt& pkt) {
return AudioRtpSession::onRTPPacketRecv(pkt);
}
void startSymmetricRtpThread() {
rtpThread_.start();
}
virtual void setLocalMasterKey(const std::vector<uint8>& key UNUSED) const {}
virtual void setLocalMasterSalt(const std::vector<uint8>& key UNUSED) const {};
......@@ -83,22 +78,25 @@ class AudioSymmetricRtpSession : public ost::TimerPort, public ost::SymmetricRTP
private:
NON_COPYABLE(AudioSymmetricRtpSession);
class AudioRtpThread : public ost::Thread, public ost::TimerPort {
class AudioRtpSendThread : public ost::TimerPort {
public:
AudioRtpThread(AudioSymmetricRtpSession &session);
virtual void run();
AudioRtpSendThread(AudioSymmetricRtpSession &session);
~AudioRtpSendThread();
void start();
bool running_;
private:
NON_COPYABLE(AudioRtpThread);
NON_COPYABLE(AudioRtpSendThread);
static void *runCallback(void *data);
void run();
pthread_t thread_;
AudioSymmetricRtpSession &rtpSession_;
};
void startRtpThread(const std::vector<AudioCodec*> &audioCodecs);
AudioRtpThread rtpThread_ ;
std::vector<sfl::AudioCodec*> audioCodecs_;
void startReceiveThread();
void startSendThread();
AudioRtpSendThread rtpSendThread_ ;
};
}
......
......@@ -48,9 +48,9 @@ namespace sfl {
AudioZrtpSession::AudioZrtpSession(SIPCall &call, const std::string &zidFilename) :
ost::SymmetricZRTPSession(ost::InetHostAddress(call.getLocalIp().c_str()), call.getLocalAudioPort())
, AudioRtpSession(call, *this, *this)
, AudioRtpSession(call, *this)
, zidFilename_(zidFilename)
, rtpThread_(*this)
, rtpSendThread_(*this)
{
initializeZid();
DEBUG("Setting new RTP session with destination %s:%d",
......@@ -58,15 +58,6 @@ AudioZrtpSession::AudioZrtpSession(SIPCall &call, const std::string &zidFilename
audioRtpRecord_.callId_ = call_.getCallId();
}
AudioZrtpSession::~AudioZrtpSession()
{
if (rtpThread_.running_) {
rtpThread_.running_ = false;
rtpThread_.join();
}
}
void AudioZrtpSession::initializeZid()
{
if (zidFilename_.empty())
......@@ -121,10 +112,32 @@ void AudioZrtpSession::sendMicData()
queue_.sendImmediate(timestamp_, getMicDataEncoded(), compSize);
}
AudioZrtpSession::AudioZrtpThread::AudioZrtpThread(AudioZrtpSession &session) : running_(true), zrtpSession_(session)
AudioZrtpSession::AudioZrtpSendThread::AudioZrtpSendThread(AudioZrtpSession &session) :
running_(true), zrtpSession_(session), thread_(0)
{}
void AudioZrtpSession::AudioZrtpThread::run()
AudioZrtpSession::AudioZrtpSendThread::~AudioZrtpSendThread()
{
running_ = false;
if (thread_)
pthread_join(thread_, NULL);
}
void
AudioZrtpSession::AudioZrtpSendThread::start()
{
pthread_create(&thread_, NULL, &runCallback, this);
}
void *
AudioZrtpSession::AudioZrtpSendThread::runCallback(void *data)
{
AudioZrtpSession::AudioZrtpSendThread *context = static_cast<AudioZrtpSession::AudioZrtpSendThread*>(data);
context->run();
return NULL;
}
void AudioZrtpSession::AudioZrtpSendThread::run()
{
DEBUG("Entering Audio zrtp thread main loop %s", running_ ? "running" : "not running");
......@@ -150,13 +163,14 @@ int AudioZrtpSession::getIncrementForDTMF() const
return 160;
}
void AudioZrtpSession::startRtpThread(const std::vector<AudioCodec*> &audioCodecs)
void AudioZrtpSession::startReceiveThread()
{
if (isStarted_)
return;
ost::SymmetricZRTPSession::start();
}
AudioRtpSession::startRtpThread(audioCodecs);
startZrtpThread();
void AudioZrtpSession::startSendThread()
{
rtpSendThread_.start();
}
}
......@@ -60,11 +60,6 @@ class AudioZrtpSession :
public AudioRtpSession {
public:
AudioZrtpSession(SIPCall &call, const std::string& zidFilename);
~AudioZrtpSession();
void startZrtpThread() {
rtpThread_.start();
}
virtual bool onRTPPacketRecv(ost::IncomingRTPPkt &pkt) {
return AudioRtpSession::onRTPPacketRecv(pkt);
......@@ -81,25 +76,28 @@ class AudioZrtpSession :
private:
NON_COPYABLE(AudioZrtpSession);
class AudioZrtpThread : public ost::Thread, public ost::TimerPort {
class AudioZrtpSendThread : public ost::TimerPort {
public:
AudioZrtpThread(AudioZrtpSession &session);
virtual void run();
AudioZrtpSendThread(AudioZrtpSession &session);
~AudioZrtpSendThread();
void start();
bool running_;
private:
NON_COPYABLE(AudioZrtpThread);
static void *runCallback(void *data);
void run();
NON_COPYABLE(AudioZrtpSendThread);
AudioZrtpSession &zrtpSession_;
pthread_t thread_;
};
void sendMicData();
void initializeZid();
std::string zidFilename_;
void startRtpThread(const std::vector<AudioCodec*> &audioCodecs);
void startReceiveThread();
void startSendThread();
virtual int getIncrementForDTMF() const;
AudioZrtpThread rtpThread_;
AudioZrtpSendThread rtpSendThread_;
};
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment