Skip to content
Snippets Groups Projects
Select Git revision
  • f1f937a5428650ed9c4318f3587baf0e0590acd3
  • master default protected
  • release/202005
  • release/202001
  • release/201912
  • release/201911
  • release/releaseWindowsTestOne
  • release/windowsReleaseTest
  • release/releaseTest
  • release/releaseWindowsTest
  • release/201910
  • release/qt/201910
  • release/windows-test/201910
  • release/201908
  • release/201906
  • release/201905
  • release/201904
  • release/201903
  • release/201902
  • release/201901
  • release/201812
  • 4.0.0
  • 2.2.0
  • 2.1.0
  • 2.0.1
  • 2.0.0
  • 1.4.1
  • 1.4.0
  • 1.3.0
  • 1.2.0
  • 1.1.0
31 results

ringbuffer.cpp

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    ringbuffer.cpp 8.47 KiB
    /*
     *  Copyright (C) 2004-2013 Savoir-Faire Linux Inc.
     *  Author: Alexandre Savard <alexandre.savard@savoirfairelinux.com>
     *  Author: Yan Morin <yan.morin@savoirfairelinux.com>
     *  Author: Laurielle Lea <laurielle.lea@savoirfairelinux.com>
     *  Author: Adrien Beraud <adrien.beraud@gmail.com>
     *
     *  Portions (c) Dominic Mazzoni (Audacity)
     *
     *  This program is free software; you can redistribute it and/or modify
     *  it under the terms of the GNU General Public License as published by
     *  the Free Software Foundation; either version 3 of the License, or
     *  (at your option) any later version.
     *
     *  This program is distributed in the hope that it will be useful,
     *  but WITHOUT ANY WARRANTY; without even the implied warranty of
     *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     *  GNU General Public License for more details.
     *
     *  You should have received a copy of the GNU General Public License
     *  along with this program; if not, write to the Free Software
     *  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA.
     *
     *  Additional permission under GNU GPL version 3 section 7:
     *
     *  If you modify this program, or any covered work, by linking or
     *  combining it with the OpenSSL project's OpenSSL library (or a
     *  modified version of that library), containing parts covered by the
     *  terms of the OpenSSL or SSLeay licenses, Savoir-Faire Linux Inc.
     *  grants you additional permission to convey the resulting work.
     *  Corresponding Source for a non-source form of such a combination
     *  shall include the source code for the parts of OpenSSL used as well
     *  as that of the covered work.
     */
    
    #include "ringbuffer.h"
    #include "logger.h"
    
    #include <chrono>
    #include <utility> // for std::pair
    #include <cstdlib>
    #include <cstring>
    
    namespace {
    // corresponds to 160 ms (about 5 rtp packets)
    const size_t MIN_BUFFER_SIZE = 1024;
    }
    
    // Create  a ring buffer with 'size' bytes
    RingBuffer::RingBuffer(size_t size, const std::string &call_id, AudioFormat format /* = MONO */) :
        endPos_(0)
        , buffer_(std::max(size, MIN_BUFFER_SIZE), format)
        , lock_()
        , readpointers_()
        , buffer_id_(call_id)
    {
    }
    
    void
    RingBuffer::flush(const std::string &call_id)
    {
        storeReadPointer(endPos_, call_id);
    }
    
    
    void
    RingBuffer::flushAll()
    {
        ReadPointer::iterator iter;
    
        for (iter = readpointers_.begin(); iter != readpointers_.end(); ++iter)
            iter->second = endPos_;
    }
    
    size_t
    RingBuffer::putLength() const
    {
        const size_t buffer_size = buffer_.frames();
        if (buffer_size == 0)
            return 0;
        const size_t startPos = (not readpointers_.empty()) ? getSmallestReadPointer() : 0;
        return (endPos_ + buffer_size - startPos) % buffer_size;
    }
    
    size_t RingBuffer::getLength(const std::string &call_id) const
    {
        const size_t buffer_size = buffer_.frames();
        if (buffer_size == 0)
            return 0;
        return (endPos_ + buffer_size - getReadPointer(call_id)) % buffer_size;
    }
    
    void
    RingBuffer::debug()
    {
        DEBUG("Start=%d; End=%d; BufferSize=%d", getSmallestReadPointer(), endPos_, buffer_.frames());
    }
    
    size_t RingBuffer::getReadPointer(const std::string &call_id) const
    {
        if (hasNoReadPointers())
            return 0;
        ReadPointer::const_iterator iter = readpointers_.find(call_id);
        return (iter != readpointers_.end()) ? iter->second : 0;
    }
    
    size_t
    RingBuffer::getSmallestReadPointer() const
    {
        if (hasNoReadPointers())
            return 0;
        size_t smallest = buffer_.frames();
        for(auto const& iter : readpointers_)
            smallest = std::min(smallest, iter.second);
        return smallest;
    }
    
    void
    RingBuffer::storeReadPointer(size_t pointer_value, const std::string &call_id)
    {
        ReadPointer::iterator iter = readpointers_.find(call_id);
    
        if (iter != readpointers_.end())
            iter->second = pointer_value;
        else
            DEBUG("Cannot find \"%s\" readPointer in \"%s\" ringbuffer", call_id.c_str(), buffer_id_.c_str());
    }
    
    
    void
    RingBuffer::createReadPointer(const std::string &call_id)
    {
        std::unique_lock<std::mutex> l(lock_);
        if (!hasThisReadPointer(call_id))
            readpointers_.insert(std::pair<std::string, int> (call_id, endPos_));
    }
    
    
    void
    RingBuffer::removeReadPointer(const std::string &call_id)
    {
        std::unique_lock<std::mutex> l(lock_);
        ReadPointer::iterator iter = readpointers_.find(call_id);
    
        if (iter != readpointers_.end())
            readpointers_.erase(iter);
    }
    
    
    bool
    RingBuffer::hasThisReadPointer(const std::string &call_id) const
    {
        return readpointers_.find(call_id) != readpointers_.end();
    }
    
    
    bool RingBuffer::hasNoReadPointers() const
    {
        return readpointers_.empty();
    }
    
    //
    // For the writer only:
    //
    
    // This one puts some data inside the ring buffer.
    void RingBuffer::put(AudioBuffer& buf)
    {
        std::unique_lock<std::mutex> l(lock_);
        const size_t sample_num = buf.frames();
        const size_t buffer_size = buffer_.frames();
        if (buffer_size == 0)
            return;
    
        size_t len = putLength();
        if (buffer_size - len < sample_num)
            discard(sample_num);
        size_t toCopy = sample_num;
    
        // Add more channels if the input buffer holds more channels than the ring.
        if (buffer_.channels() < buf.channels())
            buffer_.setChannelNum(buf.channels());
    
        size_t in_pos = 0;
        size_t pos = endPos_;
    
        while (toCopy) {
            size_t block = toCopy;
    
            if (block > buffer_size - pos) // Wrap block around ring ?
                block = buffer_size - pos; // Fill in to the end of the buffer
    
            buffer_.copy(buf, block, in_pos, pos);
            in_pos += block;
            pos = (pos + block) % buffer_size;
            toCopy -= block;
        }
    
        endPos_ = pos;
        not_empty_.notify_all();
    }
    
    //
    // For the reader only:
    //
    
    size_t
    RingBuffer::availableForGet(const std::string &call_id) const
    {
        // Used space
        return getLength(call_id);
    }
    
    size_t RingBuffer::get(AudioBuffer& buf, const std::string &call_id)
    {
        std::unique_lock<std::mutex> l(lock_);
    
        if (hasNoReadPointers())
            return 0;
    
        if (not hasThisReadPointer(call_id))
            return 0;
    
        const size_t buffer_size = buffer_.frames();
        if (buffer_size == 0)
            return 0;
    
        size_t len = getLength(call_id);
        const size_t sample_num = buf.frames();
        size_t toCopy = std::min(sample_num, len);
        if (toCopy and toCopy != sample_num) {
            DEBUG("Partial get: %d/%d", toCopy, sample_num);
        }
    
        const size_t copied = toCopy;
    
        size_t dest = 0;
        size_t startPos = getReadPointer(call_id);
    
        while (toCopy > 0) {
            size_t block = toCopy;
    
            if (block > buffer_size - startPos)
                block = buffer_size - startPos;
    
            buf.copy(buffer_, block, startPos, dest);
    
            dest += block;
            startPos = (startPos + block) % buffer_size;
            toCopy -= block;
        }
    
        storeReadPointer(startPos, call_id);
        return copied;
    }
    
    
    size_t RingBuffer::waitForDataAvailable(const std::string &call_id, const size_t min_data_length, const std::chrono::high_resolution_clock::time_point& deadline) const
    {
        std::unique_lock<std::mutex> l(lock_);
        const size_t buffer_size = buffer_.frames();
        if(buffer_size < min_data_length) return 0;
        ReadPointer::const_iterator read_ptr = readpointers_.find(call_id);
        if(read_ptr == readpointers_.end()) return 0;
        size_t getl = 0;
        if (deadline == std::chrono::high_resolution_clock::time_point()) {
            not_empty_.wait(l, [=, &getl]{
                    getl =  (endPos_ + buffer_size - read_ptr->second) % buffer_size;
                    return getl >= min_data_length;
            });
        } else {
            not_empty_.wait_until(l, deadline, [=, &getl]{
                    getl = (endPos_ + buffer_size - read_ptr->second) % buffer_size;
                    return getl >= min_data_length;
            });
        }
        return getl;
    }
    
    size_t
    RingBuffer::discard(size_t toDiscard, const std::string &call_id)
    {
        std::unique_lock<std::mutex> l(lock_);
    
        size_t buffer_size = buffer_.frames();
        if (buffer_size == 0)
            return 0;
    
        size_t len = getLength(call_id);
        if (toDiscard > len)
            toDiscard = len;
    
        size_t startPos = (getReadPointer(call_id) + toDiscard) % buffer_size;
        storeReadPointer(startPos, call_id);
        return toDiscard;
    }
    
    size_t
    RingBuffer::discard(size_t toDiscard)
    {
        const size_t buffer_size = buffer_.frames();
        for (auto & r : readpointers_) {
            size_t dst = (r.second + buffer_size - endPos_) % buffer_size;
            if (dst < toDiscard) {
                DEBUG("%s : discarding: %d frames", r.first.c_str(), toDiscard - dst);
                r.second = (r.second + toDiscard - dst) % buffer_size;
            }
        }
        return toDiscard;
    }