Skip to content
Snippets Groups Projects
Commit 7a8d39f1 authored by Adrien Béraud's avatar Adrien Béraud
Browse files

peer channel: use deque

Change-Id: I73923252daa1c59a9278b8b90cbf6153f022296e
parent 121c0d45
Branches
No related tags found
No related merge requests found
...@@ -20,7 +20,8 @@ ...@@ -20,7 +20,8 @@
#include <mutex> #include <mutex>
#include <condition_variable> #include <condition_variable>
#include <sstream> #include <deque>
#include <algorithm>
namespace jami { namespace jami {
...@@ -35,39 +36,41 @@ public: ...@@ -35,39 +36,41 @@ public:
std::lock_guard<std::mutex> lk(o.mutex_); std::lock_guard<std::mutex> lk(o.mutex_);
stream_ = std::move(o.stream_); stream_ = std::move(o.stream_);
stop_ = o.stop_; stop_ = o.stop_;
o.cv_.notify_all();
} }
ssize_t isDataAvailable() { ssize_t isDataAvailable() {
std::lock_guard<std::mutex> lk{mutex_}; std::lock_guard<std::mutex> lk{mutex_};
auto pos = stream_.tellg(); return stream_.size();
stream_.seekg(0, std::ios_base::end);
auto available = (stream_.tellg() - pos);
stream_.seekg(pos);
return available;
} }
template <typename Duration> template <typename Duration>
bool wait(Duration timeout) { ssize_t wait(Duration timeout) {
std::unique_lock<std::mutex> lk {mutex_}; std::unique_lock<std::mutex> lk {mutex_};
return cv_.wait_for(lk, timeout, [this]{ return stop_ or !stream_.eof(); }); cv_.wait_for(lk, timeout, [this]{ return stop_ or not stream_.empty(); });
return stream_.size();
} }
std::size_t read(char* output, std::size_t size) { std::size_t read(char* output, std::size_t size) {
std::unique_lock<std::mutex> lk {mutex_}; std::unique_lock<std::mutex> lk {mutex_};
cv_.wait(lk, [&, this]{ cv_.wait(lk, [this]{
if (stop_) return stop_ or not stream_.empty();
return true;
stream_.read(&output[0], size);
return stream_.gcount() > 0;
}); });
return stop_ ? 0 : stream_.gcount(); if (stop_)
return 0;
auto toRead = std::min(size, stream_.size());
if (toRead) {
auto endIt = stream_.begin()+toRead;
std::copy(stream_.begin(), endIt, output);
stream_.erase(stream_.begin(), endIt);
}
return toRead;
} }
void write(const char* data, std::size_t size) { void write(const char* data, std::size_t size) {
std::lock_guard<std::mutex> lk {mutex_}; std::lock_guard<std::mutex> lk {mutex_};
stream_.clear(); stream_.insert(stream_.end(), data, data+size);
stream_.write(data, size); cv_.notify_all();
cv_.notify_one();
} }
void stop() noexcept { void stop() noexcept {
...@@ -85,10 +88,8 @@ private: ...@@ -85,10 +88,8 @@ private:
std::mutex mutex_ {}; std::mutex mutex_ {};
std::condition_variable cv_ {}; std::condition_variable cv_ {};
std::stringstream stream_ {}; std::deque<char> stream_;
bool stop_ {false}; bool stop_ {false};
friend void operator <<(std::vector<char>&, PeerChannel&);
}; };
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment