Commit 4a56a7fa authored by Tristan Matthews

* #6210: video_rtp_thread -> video_send/receive_thread

parent 6f23f6ff
......@@ -7,7 +7,8 @@ libvideo_endpoint_la_SOURCES = video_endpoint.cpp video_endpoint.h \
libav_utils.cpp libav_utils.h \
video_rtp_session.cpp video_rtp_session.h \
video_rtp_factory.cpp video_rtp_factory.h \
video_rtp_thread.h video_rtp_thread.cpp
video_send_thread.h video_send_thread.cpp \
video_receive_thread.h video_receive_thread.cpp
libvideo_endpoint_la_LIBADD = @LIBAVCODEC_LIBS@ @LIBAVFORMAT_LIBS@ @LIBAVDEVICE_LIBS@ @LIBSWSCALE_LIBS@ @CCRTP_LIBS@
AM_CXXFLAGS=@LIBAVCODEC_CFLAGS@ @LIBAVFORMAT_CFLAGS@ @LIBAVDEVICE_CFLAGS@ @LIBSWSCALE_CFLAGS@ @CCRTP_CFLAGS@
......
/*
* Copyright (C) 2004, 2005, 2006, 2008, 2009, 2010, 2011 Savoir-Faire Linux Inc.
* Author: Tristan Matthews <tristan.matthews@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*
* Additional permission under GNU GPL version 3 section 7:
*
* If you modify this program, or any covered work, by linking or
* combining it with the OpenSSL project's OpenSSL library (or a
* modified version of that library), containing parts covered by the
* terms of the OpenSSL or SSLeay licenses, Savoir-Faire Linux Inc.
* grants you additional permission to convey the resulting work.
* Corresponding Source for a non-source form of such a combination
* shall include the source code for the parts of OpenSSL used as well
* as that of the covered work.
*/
#define __STDC_CONSTANT_MACROS
#include "video_receive_thread.h"
// libav includes
extern "C" {
#include <libavcodec/avcodec.h>
#include <libavformat/avformat.h>
#include <libavutil/opt.h>
#include <libavdevice/avdevice.h>
#include <libswscale/swscale.h>
}
// shm includes
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h> /* semaphore functions and structs. */
#include <sys/shm.h>
namespace sfl_video {
namespace { // anonymous namespace
#if _SEM_SEMUN_UNDEFINED
union semun
{
int val; /* value for SETVAL */
struct semid_ds *buf; /* buffer for IPC_STAT & IPC_SET */
unsigned short int *array; /* array for GETALL & SETALL */
struct seminfo *__buf; /* buffer for IPC_INFO */
};
#endif
typedef struct {
unsigned size;
unsigned width;
unsigned height;
} FrameInfo;
#define TEMPFILE "/tmp/frame.txt"
void postFrameSize(unsigned width, unsigned height, unsigned numBytes)
{
FILE *tmp = fopen(TEMPFILE, "w");
/* write to file*/
fprintf(tmp, "%u\n", numBytes);
fprintf(tmp, "%u\n", width);
fprintf(tmp, "%u\n", height);
fclose(tmp);
}
int createSemSet()
{
/* this variable will contain the semaphore set. */
int sem_set_id;
key_t key = ftok("/tmp", 'b');
/* semaphore value, for semctl(). */
union semun sem_val;
/* first we create a semaphore set with a single semaphore, */
/* whose counter is initialized to '0'. */
sem_set_id = semget(key, 1, 0600 | IPC_CREAT);
if (sem_set_id == -1) {
perror("semget");
exit(1);
}
sem_val.val = 0;
semctl(sem_set_id, 0, SETVAL, sem_val);
return sem_set_id;
}
void
cleanupSemaphore(int sem_set_id)
{
semctl(sem_set_id, 0, IPC_RMID);
}
/*
* function: sem_signal. signals the process that a frame is ready.
* input: semaphore set ID.
* output: none.
*/
void
sem_signal(int sem_set_id)
{
/* structure for semaphore operations. */
struct sembuf sem_op;
/* signal the semaphore - increase its value by one. */
sem_op.sem_num = 0;
sem_op.sem_op = 1;
sem_op.sem_flg = 0;
semop(sem_set_id, &sem_op, 1);
}
/* join and/or create a shared memory segment */
int createShm(unsigned numBytes)
{
key_t key;
int shm_id;
/* connect to and possibly create a segment with 644 permissions
(rw-r--r--) */
key = ftok("/tmp", 'c');
shm_id = shmget(key, numBytes, 0644 | IPC_CREAT);
return shm_id;
}
/* attach a shared memory segment */
uint8_t *attachShm(int shm_id)
{
uint8_t *data = NULL;
/* attach to the segment and get a pointer to it */
data = reinterpret_cast<uint8_t*>(shmat(shm_id, (void *)0, 0));
if (data == (uint8_t *)(-1)) {
perror("shmat");
data = NULL;
}
return data;
}
void detachShm(uint8_t *data)
{
/* detach from the segment: */
if (shmdt(data) == -1) {
perror("shmdt");
}
}
void destroyShm(int shm_id)
{
/* destroy it */
shmctl(shm_id, IPC_RMID, NULL);
}
void cleanupShm(int shm_id, uint8_t *data)
{
detachShm(data);
destroyShm(shm_id);
}
int bufferSizeRGB24(int width, int height)
{
int numBytes;
// determine required buffer size and allocate buffer
numBytes = avpicture_get_size(PIX_FMT_RGB24, width, height);
return numBytes * sizeof(uint8_t);
}
} // end anonymous namespace
void VideoReceiveThread::setup()
{
av_register_all();
avdevice_register_all();
AVInputFormat *file_iformat = 0;
// Open video file
if (av_open_input_file(&inputCtx_, args_["input"].c_str(), file_iformat, 0, NULL) != 0)
{
std::cerr << "Could not open input file " << args_["input"] <<
std::endl;
cleanup();
}
// retrieve stream information
if (av_find_stream_info(inputCtx_) < 0)
{
std::cerr << "Could not find stream info!" << std::endl;
cleanup();
}
// find the first video stream from the input
unsigned i;
for (i = 0; i < inputCtx_->nb_streams; i++)
{
if (inputCtx_->streams[i]->codec->codec_type == AVMEDIA_TYPE_VIDEO)
{
videoStreamIndex_ = i;
break;
}
}
if (videoStreamIndex_ == -1)
{
std::cerr << "Could not find video stream!" << std::endl;
cleanup();
}
// Get a pointer to the codec context for the video stream
decoderCtx_ = inputCtx_->streams[videoStreamIndex_]->codec;
// find the decoder for the video stream
AVCodec *inputDecoder = avcodec_find_decoder(decoderCtx_->codec_id);
if (inputDecoder == NULL)
{
std::cerr << "Unsupported codec!" << std::endl;
cleanup();
}
// open codec
if (avcodec_open(decoderCtx_, inputDecoder) < 0)
{
std::cerr << "Could not open codec!" << std::endl;
cleanup();
}
scaledPicture_ = avcodec_alloc_frame();
if (scaledPicture_ == 0)
{
std::cerr << "Could not allocated output frame!" << std::endl;
cleanup();
}
unsigned numBytes;
// determine required buffer size and allocate buffer
numBytes = bufferSizeRGB24(decoderCtx_->width, decoderCtx_->height);
/*printf("%u bytes\n", numBytes);*/
postFrameSize(decoderCtx_->width, decoderCtx_->height, numBytes);
// create shared memory segment and attach to it
shmID_ = createShm(numBytes);
shmBuffer_ = attachShm(shmID_);
semSetID_ = createSemSet();
// assign appropriate parts of buffer to image planes in scaledPicture
avpicture_fill(reinterpret_cast<AVPicture *>(scaledPicture_),
reinterpret_cast<uint8_t*>(shmBuffer_),
PIX_FMT_RGB24, decoderCtx_->width, decoderCtx_->height);
// allocate video frame
rawFrame_ = avcodec_alloc_frame();
}
void VideoReceiveThread::cleanup()
{
// free shared memory
cleanupSemaphore(semSetID_);
cleanupShm(shmID_, shmBuffer_);
// free the scaled frame
av_free(scaledPicture_);
// free the YUV frame
av_free(rawFrame_);
// doesn't need to be freed, we didn't use avcodec_alloc_context
avcodec_close(decoderCtx_);
// close the video file
av_close_input_file(inputCtx_);
std::cerr << "Exitting the decoder thread" << std::endl;
// exit this thread
exit();
}
SwsContext * VideoReceiveThread::createScalingContext()
{
// Create scaling context, no scaling done here
SwsContext *imgConvertCtx = sws_getContext(decoderCtx_->width,
decoderCtx_->height, decoderCtx_->pix_fmt, decoderCtx_->width,
decoderCtx_->height, PIX_FMT_RGB24, SWS_BICUBIC,
NULL, NULL, NULL);
if (imgConvertCtx == 0)
{
std::cerr << "Cannot init the conversion context!" << std::endl;
cleanup();
}
return imgConvertCtx;
}
// initialize videoStreamIndex_ to -1 so setup() can detect a missing video stream
VideoReceiveThread::VideoReceiveThread(const std::map<std::string, std::string> &args) :
args_(args), interrupted_(false), videoStreamIndex_(-1) {}
void VideoReceiveThread::run()
{
setup();
AVPacket inpacket;
int frameFinished;
SwsContext *imgConvertCtx = createScalingContext();
while (not interrupted_ and av_read_frame(inputCtx_, &inpacket) >= 0)
{
// is this a packet from the video stream?
if (inpacket.stream_index == videoStreamIndex_)
{
// decode video frame from camera
avcodec_decode_video2(decoderCtx_, rawFrame_, &frameFinished, &inpacket);
if (frameFinished)
{
sws_scale(imgConvertCtx, rawFrame_->data, rawFrame_->linesize,
0, decoderCtx_->height, scaledPicture_->data,
scaledPicture_->linesize);
/* signal the semaphore that a new frame is ready */
sem_signal(semSetID_);
}
}
// free the packet that was allocated by av_read_frame
av_free_packet(&inpacket);
}
// free resources, exit thread
cleanup();
}
void VideoReceiveThread::stop()
{
// FIXME: not thread safe, add mutex
interrupted_ = true;
}
VideoReceiveThread::~VideoReceiveThread()
{
terminate();
}
} // end namespace sfl_video
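The decode loop above is the producer half of a simple SysV IPC handoff: each decoded frame is scaled into the shared-memory buffer, its geometry is advertised in /tmp/frame.txt, and the semaphore is bumped once per frame. For illustration only (not part of this commit), a consumer process could pick frames up roughly as sketched below, reusing the same ftok() keys; all names are hypothetical and error handling is minimal.
// --- consumer-side sketch (illustrative only, not part of this commit) ---
// Assumes the producer above: RGB24 frames in a SysV shm segment keyed by
// ftok("/tmp", 'c'), one semaphore keyed by ftok("/tmp", 'b') incremented
// once per frame, and /tmp/frame.txt holding numBytes/width/height.
#include <stdio.h>
#include <stdint.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/shm.h>
int main()
{
    unsigned numBytes = 0, width = 0, height = 0;
    FILE *info = fopen("/tmp/frame.txt", "r");
    if (info == NULL or fscanf(info, "%u %u %u", &numBytes, &width, &height) != 3)
        return 1;
    fclose(info);
    // attach (read-only) to the segment created by createShm()
    int shm_id = shmget(ftok("/tmp", 'c'), numBytes, 0644);
    uint8_t *frame = reinterpret_cast<uint8_t*>(shmat(shm_id, 0, SHM_RDONLY));
    // open the semaphore set created by createSemSet()
    int sem_set_id = semget(ftok("/tmp", 'b'), 1, 0600);
    for (;;) {
        struct sembuf sem_op = {0, -1, 0}; // block until sem_signal() runs
        if (semop(sem_set_id, &sem_op, 1) == -1)
            break;
        // 'frame' now holds width x height RGB24 pixels; hand it to a renderer here
    }
    shmdt(frame);
    return 0;
}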
/*
* Copyright (C) 2011 Savoir-Faire Linux Inc.
* Author: Tristan Matthews <tristan.matthews@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*
* Additional permission under GNU GPL version 3 section 7:
*
* If you modify this program, or any covered work, by linking or
* combining it with the OpenSSL project's OpenSSL library (or a
* modified version of that library), containing parts covered by the
* terms of the OpenSSL or SSLeay licenses, Savoir-Faire Linux Inc.
* grants you additional permission to convey the resulting work.
* Corresponding Source for a non-source form of such a combination
* shall include the source code for the parts of OpenSSL used as well
* as that of the covered work.
*/
#ifndef _VIDEO_RECEIVE_THREAD_H_
#define _VIDEO_RECEIVE_THREAD_H_
#include <cc++/thread.h>
#include <map>
#include <string>
class SwsContext;
class AVCodecContext;
class AVStream;
class AVFormatContext;
class AVFrame;
namespace sfl_video {
class VideoReceiveThread : public ost::Thread {
private:
std::map<std::string, std::string> args_;
volatile int interrupted_;
/*-------------------------------------------------------------*/
/* These variables should be used in thread (i.e. run()) only! */
/*-------------------------------------------------------------*/
uint8_t *scaledPictureBuf_;
uint8_t *shmBuffer_;
int shmID_;
int semSetID_;
AVCodecContext *decoderCtx_;
AVFrame *rawFrame_;
AVFrame *scaledPicture_;
int videoStreamIndex_;
AVFormatContext *inputCtx_;
void setup();
void cleanup();
SwsContext * createScalingContext();
public:
VideoReceiveThread(const std::map<std::string, std::string> &args);
virtual ~VideoReceiveThread();
virtual void run();
void stop();
};
}
#endif // _VIDEO_RECEIVE_THREAD_H_
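For reference, a minimal caller of the new class might look like the sketch below. It assumes only what this commit shows: the args map keyed by "input" and the ost::Thread start()/stop()/join() calls that VideoRtpSession makes further down; the function name is hypothetical.
// --- usage sketch (illustrative only, not part of this commit) ---
#include <map>
#include <string>
#include "video_receive_thread.h"
void receiveFromSdp()
{
    std::map<std::string, std::string> args;
    args["input"] = "test.sdp";       // same key VideoRtpSession sets below
    sfl_video::VideoReceiveThread receiveThread(args);
    receiveThread.start();            // runs setup() and the decode loop
    // ... frames are written to shared memory until we ask it to stop ...
    receiveThread.stop();             // sets interrupted_ (see FIXME in stop())
    receiveThread.join();             // same teardown VideoRtpSession::stop() uses
}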
......@@ -34,7 +34,8 @@
#include <sstream>
#include <map>
#include <string>
#include "video_rtp_thread.h"
#include "video_send_thread.h"
#include "video_receive_thread.h"
namespace sfl_video {
......@@ -49,7 +50,7 @@ VideoRtpSession::VideoRtpSession(const std::string &input,
void VideoRtpSession::test()
{
assert(rtpSendThread_.get() == 0);
assert(sendThread_.get() == 0);
std::cerr << "Capturing from " << input_ << ", encoding to " << codec_ <<
" at " << bitrate_ << " bps, sending to " << destinationURI_ <<
std::endl;
......@@ -62,18 +63,18 @@ void VideoRtpSession::test()
args["bitrate"] = bitstr.str();
args["destination"] = destinationURI_;
rtpSendThread_.reset(new VideoRtpSendThread(args));
rtpSendThread_->start();
sendThread_.reset(new VideoSendThread(args));
sendThread_->start();
rtpSendThread_->waitForSDP();
sendThread_->waitForSDP();
args["input"] = "test.sdp";
rtpReceiveThread_.reset(new VideoRtpReceiveThread(args));
rtpReceiveThread_->start();
receiveThread_.reset(new VideoReceiveThread(args));
receiveThread_->start();
}
void VideoRtpSession::start()
{
assert(rtpSendThread_.get() == 0);
assert(sendThread_.get() == 0);
std::cerr << "Capturing from " << input_ << ", encoding to " << codec_ <<
" at " << bitrate_ << " bps, sending to " << destinationURI_ <<
std::endl;
......@@ -86,28 +87,28 @@ void VideoRtpSession::start()
args["bitrate"] = bitstr.str();
args["destination"] = destinationURI_;
rtpSendThread_.reset(new VideoRtpSendThread(args));
rtpSendThread_->start();
sendThread_.reset(new VideoSendThread(args));
sendThread_->start();
args["input"] = "test.sdp";
rtpReceiveThread_.reset(new VideoRtpReceiveThread(args));
rtpReceiveThread_->start();
receiveThread_.reset(new VideoReceiveThread(args));
receiveThread_->start();
}
void VideoRtpSession::stop()
{
std::cerr << "Stopping video rtp session " << std::endl;
// FIXME: all kinds of evil!!! interrupted should be atomic
rtpReceiveThread_->stop();
rtpReceiveThread_->join();
receiveThread_->stop();
receiveThread_->join();
rtpSendThread_->stop();
rtpSendThread_->join();
sendThread_->stop();
sendThread_->join();
std::cerr << "cancelled video rtp session " << std::endl;
// destroy objects
rtpReceiveThread_.reset();
rtpSendThread_.reset();
receiveThread_.reset();
sendThread_.reset();
}
} // end namespace sfl_video
......@@ -36,8 +36,8 @@
namespace sfl_video {
class VideoRtpSendThread;
class VideoRtpReceiveThread;
class VideoSendThread;
class VideoReceiveThread;
class VideoRtpSession {
public:
......@@ -52,8 +52,8 @@ class VideoRtpSession {
const std::string codec_;
const int bitrate_;
const std::string destinationURI_;
std::tr1::shared_ptr<VideoRtpSendThread> rtpSendThread_;
std::tr1::shared_ptr<VideoRtpReceiveThread> rtpReceiveThread_;
std::tr1::shared_ptr<VideoSendThread> sendThread_;
std::tr1::shared_ptr<VideoReceiveThread> receiveThread_;
};
}
......
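Taken together, the session now owns one VideoSendThread and one VideoReceiveThread. The constructor signature is cut off in this hunk, so the sketch below assumes it takes the four members listed in the header above (input, codec, bitrate, destination URI); the argument values are hypothetical and the whole thing is illustrative only.
// --- session usage sketch (illustrative; constructor arguments are assumed) ---
#include "video_rtp_session.h"
void runCall()
{
    // hypothetical values; real ones come from the daemon's configuration
    sfl_video::VideoRtpSession session("/dev/video0", "mpeg4", 1000000,
                                       "rtp://127.0.0.1:5000");
    session.start();   // spawns sendThread_ then receiveThread_, as above
    // ... call in progress ...
    session.stop();    // stop/join/reset both threads, as in stop() above
}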
......@@ -30,7 +30,7 @@
#define __STDC_CONSTANT_MACROS
#include "video_rtp_thread.h"
#include "video_send_thread.h"
// libav includes
extern "C" {
......@@ -41,316 +41,9 @@ extern "C" {
#include <libswscale/swscale.h>
}
// shm includes
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h> /* semaphore functions and structs. */
#include <sys/shm.h>
namespace sfl_video {
namespace { // anonymous namespace
#if _SEM_SEMUN_UNDEFINED
union semun
{
int val; /* value for SETVAL */
struct semid_ds *buf; /* buffer for IPC_STAT & IPC_SET */
unsigned short int *array; /* array for GETALL & SETALL */
struct seminfo *__buf; /* buffer for IPC_INFO */
};
#endif
typedef struct {
unsigned size;
unsigned width;
unsigned height;
} FrameInfo;
#define TEMPFILE "/tmp/frame.txt"
void postFrameSize(unsigned width, unsigned height, unsigned numBytes)
{
FILE *tmp = fopen(TEMPFILE, "w");
/* write to file*/
fprintf(tmp, "%u\n", numBytes);
fprintf(tmp, "%u\n", width);
fprintf(tmp, "%u\n", height);
fclose(tmp);
}
int createSemSet()
{
/* this variable will contain the semaphore set. */
int sem_set_id;
key_t key = ftok("/tmp", 'b');
/* semaphore value, for semctl(). */
union semun sem_val;
/* first we create a semaphore set with a single semaphore, */
/* whose counter is initialized to '0'. */
sem_set_id = semget(key, 1, 0600 | IPC_CREAT);
if (sem_set_id == -1) {