-
Rafaël Carré authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
mainbuffer.cpp 15.30 KiB
/*
* Copyright (C) 2004, 2005, 2006, 2008, 2009, 2010, 2011 Savoir-Faire Linux Inc.
* Author : Alexandre Savard <alexandre.savard@savoirfairelinux.com>
*
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*
* Additional permission under GNU GPL version 3 section 7:
*
* If you modify this program, or any covered work, by linking or
* combining it with the OpenSSL project's OpenSSL library (or a
* modified version of that library), containing parts covered by the
* terms of the OpenSSL or SSLeay licenses, Savoir-Faire Linux Inc.
* grants you additional permission to convey the resulting work.
* Corresponding Source for a non-source form of such a combination
* shall include the source code for the parts of OpenSSL used as well
* as that of the covered work.
*/
#include "mainbuffer.h"
#include <utility> // for std::pair
#include "manager.h"
/**
 * Build an empty MainBuffer whose internal sampling rate starts at 8000.
 */
MainBuffer::MainBuffer()
    : _internalSamplingRate (8000)
{}
/**
 * Destructor; it has no explicit work to do.
 *
 * NOTE(review): entries still stored in the call-id and ring-buffer maps are
 * not deleted here -- confirm that every binding is released before teardown.
 */
MainBuffer::~MainBuffer()
{}
/**
 * Raise the internal sampling rate.
 *
 * The rate only ever goes up: a request that is not strictly greater than
 * the current rate is ignored. When the rate does change, every ring buffer
 * is flushed before the new value is stored.
 *
 * @param sr Requested sampling rate.
 */
void MainBuffer::setInternalSamplingRate (int sr)
{
    if (sr <= _internalSamplingRate)
        return;

    flushAllBuffers();
    _internalSamplingRate = sr;
}
/**
 * Look up the set of call ids registered under call_id.
 *
 * @param call_id Key of the set in the call-id map.
 * @return The stored set, or NULL when no entry exists for call_id.
 */
CallIDSet* MainBuffer::getCallIDSet (std::string call_id)
{
    CallIDMap::iterator found = _callIDMap.find (call_id);

    if (found == _callIDMap.end())
        return NULL;

    return found->second;
}
void MainBuffer::createCallIDSet (std::string set_id)
{
_callIDMap.insert (std::pair<std::string, CallIDSet*> (set_id, new CallIDSet));
}
bool MainBuffer::removeCallIDSet (std::string set_id)
{
CallIDSet* callid_set = getCallIDSet (set_id);
if (!callid_set) {
_debug ("removeCallIDSet error callid set %s does not exist!", set_id.c_str());
return false;
}
if (_callIDMap.erase (set_id) == 0) {
_debug ("removeCallIDSet error while removing callid set %s!", set_id.c_str());
return false;
}
delete callid_set;
callid_set = NULL;
return true;
}
void MainBuffer::addCallIDtoSet (std::string set_id, std::string call_id)
{
CallIDSet* callid_set = getCallIDSet (set_id);
callid_set->insert (call_id);
}
void MainBuffer::removeCallIDfromSet (std::string set_id, std::string call_id)
{
CallIDSet* callid_set = getCallIDSet (set_id);
if (callid_set != NULL) {
if (callid_set->erase (call_id) != 0) {
} else {
_debug ("removeCallIDfromSet error while removing callid %s from set %s!", call_id.c_str(), set_id.c_str());
}
} else {
_debug ("removeCallIDfromSet error callid set %s does not exist!", set_id.c_str());
}
}
/**
 * Look up the ring buffer registered under call_id.
 *
 * @param call_id Key of the ring buffer.
 * @return The stored ring buffer, or NULL when none is registered.
 */
RingBuffer* MainBuffer::getRingBuffer (std::string call_id)
{
    RingBufferMap::iterator found = _ringBufferMap.find (call_id);

    // A missing buffer is an expected situation here, so nothing is logged.
    return (found != _ringBufferMap.end()) ? found->second : NULL;
}
/**
 * Create a ring buffer of SIZEBUF for call_id and register it.
 *
 * If a ring buffer is already registered under call_id, that buffer is
 * returned and nothing new is allocated. Assuming RingBufferMap is a
 * unique-key map, inserting a second buffer under the same key would fail
 * silently and hand the caller a buffer that is never tracked nor deleted.
 *
 * @param call_id Key under which the buffer is stored.
 * @return The ring buffer registered under call_id.
 */
RingBuffer* MainBuffer::createRingBuffer (std::string call_id)
{
    RingBuffer* existing = getRingBuffer (call_id);

    if (existing != NULL) {
        _debug ("BufferManager: Error: Ringbuffer %s already exists!", call_id.c_str());
        return existing;
    }

    RingBuffer* created = new RingBuffer (SIZEBUF, call_id);
    _ringBufferMap.insert (std::pair<std::string, RingBuffer*> (call_id, created));
    return created;
}
bool MainBuffer::removeRingBuffer (std::string call_id)
{
RingBuffer* ring_buffer = getRingBuffer (call_id);
if (ring_buffer != NULL) {
if (_ringBufferMap.erase (call_id) != 0) {
delete ring_buffer;
return true;
} else {
_error ("BufferManager: Error: Fail to delete ringbuffer %s!", call_id.c_str());
return false;
}
} else {
_debug ("BufferManager: Error: Ringbuffer %s does not exist!", call_id.c_str());
return true;
}
}
void MainBuffer::bindCallID (std::string call_id1, std::string call_id2)
{
ost::MutexLock guard (_mutex);
RingBuffer* ring_buffer;
CallIDSet* callid_set;
if ( (ring_buffer = getRingBuffer (call_id1)) == NULL)
createRingBuffer (call_id1);
if ( (callid_set = getCallIDSet (call_id1)) == NULL)
createCallIDSet (call_id1);
if ( (ring_buffer = getRingBuffer (call_id2)) == NULL)
createRingBuffer (call_id2);
if ( (callid_set = getCallIDSet (call_id2)) == NULL)
createCallIDSet (call_id2);
getRingBuffer (call_id1)->createReadPointer (call_id2);
getRingBuffer (call_id2)->createReadPointer (call_id1);
addCallIDtoSet (call_id1, call_id2);
addCallIDtoSet (call_id2, call_id1);
}
void MainBuffer::bindHalfDuplexOut (std::string process_id, std::string call_id)
{
ost::MutexLock guard (_mutex);
// This method is used only for active calls, if this call does not exist, do nothing
if (!getRingBuffer (call_id))
return;
if (!getCallIDSet (process_id))
createCallIDSet (process_id);
getRingBuffer (call_id)->createReadPointer (process_id);
addCallIDtoSet (process_id, call_id);
}
void MainBuffer::unBindCallID (std::string call_id1, std::string call_id2)
{
ost::MutexLock guard (_mutex);
removeCallIDfromSet (call_id1, call_id2);
removeCallIDfromSet (call_id2, call_id1);
RingBuffer* ringbuffer = getRingBuffer (call_id2);
if (ringbuffer) {
ringbuffer->removeReadPointer (call_id1);
if (ringbuffer->getNbReadPointer() == 0) {
removeCallIDSet (call_id2);
removeRingBuffer (call_id2);
}
}
ringbuffer = getRingBuffer (call_id1);
if (ringbuffer) {
ringbuffer->removeReadPointer (call_id2);
if (ringbuffer->getNbReadPointer() == 0) {
removeCallIDSet (call_id1);
removeRingBuffer (call_id1);
}
}
}
void MainBuffer::unBindHalfDuplexOut (std::string process_id, std::string call_id)
{
ost::MutexLock guard (_mutex);
removeCallIDfromSet (process_id, call_id);
RingBuffer* ringbuffer = getRingBuffer (call_id);
if (ringbuffer) {
ringbuffer->removeReadPointer (process_id);
if (ringbuffer->getNbReadPointer() == 0) {
removeCallIDSet (call_id);
removeRingBuffer (call_id);
}
} else {
_debug ("Error: did not found ringbuffer %s", process_id.c_str());
removeCallIDSet (process_id);
}
CallIDSet* callid_set = getCallIDSet (process_id);
if (callid_set) {
if (callid_set->empty())
removeCallIDSet (process_id);
}
}
void MainBuffer::unBindAll (std::string call_id)
{
CallIDSet* callid_set = getCallIDSet (call_id);
if (callid_set == NULL)
return;
if (callid_set->empty())
return;
CallIDSet temp_set = *callid_set;
CallIDSet::iterator iter_set = temp_set.begin();
while (iter_set != temp_set.end()) {
std::string call_id_in_set = *iter_set;
unBindCallID (call_id, call_id_in_set);
iter_set++;
}
}
void MainBuffer::unBindAllHalfDuplexOut (std::string process_id)
{
CallIDSet* callid_set = getCallIDSet (process_id);
if (!callid_set)
return;
if (callid_set->empty())
return;
CallIDSet temp_set = *callid_set;
CallIDSet::iterator iter_set = temp_set.begin();
while (iter_set != temp_set.end()) {
std::string call_id_in_set = *iter_set;
unBindCallID (process_id, call_id_in_set);
iter_set++;
}
}
void MainBuffer::putData (void *buffer, int toCopy, std::string call_id)
{
ost::MutexLock guard (_mutex);
RingBuffer* ring_buffer = getRingBuffer (call_id);
if (ring_buffer)
ring_buffer->Put (buffer, toCopy);
}
int MainBuffer::getData (void *buffer, int toCopy, std::string call_id)
{
ost::MutexLock guard (_mutex);
CallIDSet* callid_set = getCallIDSet (call_id);
if (!callid_set || callid_set->empty()) {
return 0;
}
if (callid_set->size() == 1) {
CallIDSet::iterator iter_id = callid_set->begin();
if (iter_id != callid_set->end()) {
return getDataByID (buffer, toCopy, *iter_id, call_id);
} else
return 0;
} else {
memset (buffer, 0, toCopy);
int size = 0;
CallIDSet::iterator iter_id = callid_set->begin();
while (iter_id != callid_set->end()) {
int nbSmplToCopy = toCopy / sizeof (SFLDataFormat);
SFLDataFormat mixBuffer[nbSmplToCopy];
memset (mixBuffer, 0, toCopy);
size = getDataByID (mixBuffer, toCopy, *iter_id, call_id);
if (size > 0) {
for (int k = 0; k < nbSmplToCopy; k++) {
( (SFLDataFormat*) (buffer)) [k] += mixBuffer[k];
}
}
iter_id++;
}
return size;
}
}
int MainBuffer::getDataByID (void *buffer, int toCopy, std::string call_id, std::string reader_id)
{
RingBuffer* ring_buffer = getRingBuffer (call_id);
if (!ring_buffer) {
return 0;
}
return ring_buffer->Get (buffer, toCopy, reader_id);
}
/**
 * Report how many bytes call_id can currently read.
 *
 * With one id in call_id's set, this is what that id's ring buffer has
 * available for call_id. With several ids, the buffers are resynchronised
 * with syncBuffers() and the smallest non-zero amount is returned. Runs
 * under _mutex.
 *
 * The minimum is tracked with 0 meaning "nothing seen yet" rather than a
 * magic upper bound, so buffers holding any amount of data are accounted
 * for.
 *
 * @param call_id Id of the reader.
 * @return Bytes available; 0 when call_id has no set, an empty set, or every
 *         source reports 0.
 */
int MainBuffer::availForGet (std::string call_id)
{
    ost::MutexLock guard (_mutex);

    CallIDSet* callid_set = getCallIDSet (call_id);

    if (callid_set == NULL || callid_set->empty())
        return 0;

    if (callid_set->size() == 1) {
        CallIDSet::iterator iter_id = callid_set->begin();

        if ( (call_id != default_id) && (*iter_id == call_id)) {
            _debug ("This problem should not occur since we have %i element", (int) callid_set->size());
        }

        return availForGetByID (*iter_id, call_id);
    }

    syncBuffers (call_id);

    int avail_bytes = 0; // 0 == no non-empty source seen so far

    for (CallIDSet::iterator iter_id = callid_set->begin(); iter_id != callid_set->end(); ++iter_id) {
        int nb_bytes = availForGetByID (*iter_id, call_id);

        // Sources reporting 0 do not take part in the minimum.
        if (nb_bytes == 0)
            continue;

        if (avail_bytes == 0 || nb_bytes < avail_bytes)
            avail_bytes = nb_bytes;
    }

    return avail_bytes;
}
int MainBuffer::availForGetByID (std::string call_id, std::string reader_id)
{
if ( (call_id != default_id) && (reader_id == call_id)) {
_error ("MainBuffer: Error: RingBuffer has a readpointer on tiself");
}
RingBuffer* ringbuffer = getRingBuffer (call_id);
if (ringbuffer == NULL) {
_error ("MainBuffer: Error: RingBuffer does not exist");
return 0;
} else
return ringbuffer->AvailForGet (reader_id);
}
int MainBuffer::discard (int toDiscard, std::string call_id)
{
ost::MutexLock guard (_mutex);
CallIDSet* callid_set = getCallIDSet (call_id);
if (callid_set == NULL)
return 0;
if (callid_set->empty()) {
return 0;
}
if (callid_set->size() == 1) {
CallIDSet::iterator iter_id = callid_set->begin();
return discardByID (toDiscard, *iter_id, call_id);
} else {
CallIDSet::iterator iter_id;
for (iter_id = callid_set->begin(); iter_id != callid_set->end(); iter_id++) {
discardByID (toDiscard, *iter_id, call_id);
}
return toDiscard;
}
}
int MainBuffer::discardByID (int toDiscard, std::string call_id, std::string reader_id)
{
RingBuffer* ringbuffer = getRingBuffer (call_id);
if (ringbuffer == NULL)
return 0;
else
return ringbuffer->Discard (toDiscard, reader_id);
}
/**
 * Flush, for reader call_id, every ring buffer listed in call_id's set.
 *
 * Returns immediately when call_id has no set or the set is empty, matching
 * the early exits in discard() and availForGet(). Runs under _mutex.
 *
 * @param call_id Id of the reader.
 */
void MainBuffer::flush (std::string call_id)
{
    ost::MutexLock guard (_mutex);

    CallIDSet* callid_set = getCallIDSet (call_id);

    if (callid_set == NULL || callid_set->empty())
        return;

    // One loop covers both the single-source and the multi-source case.
    for (CallIDSet::iterator iter_id = callid_set->begin(); iter_id != callid_set->end(); ++iter_id)
        flushByID (*iter_id, call_id);
}
/**
 * Flush the default ring buffer for the default reader (default_id is used
 * for both). Runs under _mutex.
 */
void MainBuffer::flushDefault()
{
    ost::MutexLock guard (_mutex);

    flushByID (default_id, default_id);
}
void MainBuffer::flushByID (std::string call_id, std::string reader_id)
{
RingBuffer* ringbuffer = getRingBuffer (call_id);
if (ringbuffer != NULL)
ringbuffer->flush (reader_id);
}
void MainBuffer::flushAllBuffers()
{
RingBufferMap::iterator iter_buffer = _ringBufferMap.begin();
while (iter_buffer != _ringBufferMap.end()) {
iter_buffer->second->flushAll();
iter_buffer++;
}
}
void MainBuffer:: syncBuffers (std::string call_id)
{
CallIDSet* callid_set = getCallIDSet (call_id);
if (callid_set == NULL)
return;
if (callid_set->empty()) {
return;
}
if (callid_set->size() == 1) {
// no need to resync, only one session
return;
}
int nbBuffers = 0;
float mean_nbBytes = 0.0;
CallIDSet::iterator iter_id = callid_set->begin();
// compute mean nb byte in buffers
for (iter_id = callid_set->begin(); iter_id != callid_set->end(); iter_id++) {
nbBuffers++;
mean_nbBytes += availForGetByID (*iter_id, call_id);
}
mean_nbBytes = mean_nbBytes / (float) nbBuffers;
// resync buffers in this conference according to the computed mean
for (iter_id = callid_set->begin(); iter_id != callid_set->end(); iter_id++) {
if (availForGetByID (*iter_id, call_id) > (mean_nbBytes + 640))
discardByID (640, *iter_id, call_id);
}
}
void MainBuffer::stateInfo()
{
_debug ("MainBuffer: State info");
CallIDMap::iterator iter_call = _callIDMap.begin();
// print each call and bound call ids
while (iter_call != _callIDMap.end()) {
std::string dbg_str (" Call: ");
dbg_str.append (iter_call->first);
dbg_str.append (" is bound to: ");
CallIDSet* call_id_set = (CallIDSet*) iter_call->second;
CallIDSet::iterator iter_call_id = call_id_set->begin();
while (iter_call_id != call_id_set->end()) {
dbg_str.append (*iter_call_id);
dbg_str.append (", ");
iter_call_id++;
}
_debug ("%s", dbg_str.c_str());
iter_call++;
}
// Print ringbuffers ids and readpointers
RingBufferMap::iterator iter_buffer = _ringBufferMap.begin();
while (iter_buffer != _ringBufferMap.end()) {
RingBuffer* rbuffer = (RingBuffer*) iter_buffer->second;
ReadPointer* rpointer = NULL;
std::string dbg_str (" Buffer: ");
dbg_str.append (iter_buffer->first);
dbg_str.append (" as read pointer: ");
if (rbuffer)
rpointer = rbuffer->getReadPointerList();
if (rpointer) {
ReadPointer::iterator iter_pointer = rpointer->begin();
while (iter_pointer != rpointer->end()) {
dbg_str.append (iter_pointer->first);
dbg_str.append (", ");
iter_pointer++;
}
}
_debug ("%s", dbg_str.c_str());
iter_buffer++;
}
}