Commit 58b78449 authored by Adrien Béraud's avatar Adrien Béraud

peer channel: properly notify clean shutdown

Change-Id: Ic1381e5f00b386f9af801e138c86d0cee1af6abb
parent 0f22a36f
......@@ -45,7 +45,6 @@ class IceSocket
: ice_transport_(std::move(iceTransport)), compId_(compId) {}
void close();
ssize_t recv(unsigned char* buf, size_t len);
ssize_t send(const unsigned char* buf, size_t len);
ssize_t waitForData(std::chrono::milliseconds timeout);
void setOnRecv(IceRecvCb cb);
......
......@@ -788,7 +788,11 @@ IceTransport::Impl::onReceiveData(unsigned comp_id, void *pkt, pj_size_t size)
if (io.cb) {
io.cb((uint8_t*)pkt, size);
} else {
peerChannels_.at(comp_id-1).write((char*)pkt, size);
std::error_code ec;
auto err = peerChannels_.at(comp_id-1).write((char*)pkt, size, ec);
if (err < 0) {
JAMI_ERR("[ice:%p] rx: channel is closed", this);
}
}
}
......@@ -1176,31 +1180,32 @@ IceTransport::getCandidateFromSDP(const std::string& line, IceCandidate& cand)
}
ssize_t
IceTransport::recv(int comp_id, unsigned char* buf, size_t len)
IceTransport::recv(int comp_id, unsigned char* buf, size_t len, std::error_code& ec)
{
sip_utils::register_thread();
auto &io = pimpl_->compIO_[comp_id];
std::lock_guard<std::mutex> lk(io.mutex);
if (io.queue.empty()) {
return 0;
ec = std::make_error_code(std::errc::resource_unavailable_try_again);
return -1;
}
auto& packet = io.queue.front();
const auto count = std::min(len, packet.data.size());
std::copy_n(packet.data.begin(), count, buf);
if (count == packet.data.size()) {
io.queue.pop_front();
io.queue.pop_front();
} else {
packet.data.erase(packet.data.begin(), packet.data.begin() + count);
packet.data.erase(packet.data.begin(), packet.data.begin() + count);
}
ec.clear();
return count;
}
ssize_t
IceTransport::recvfrom(int comp_id, char *buf, size_t len) {
return pimpl_->peerChannels_.at(comp_id).read(buf, len);
IceTransport::recvfrom(int comp_id, char *buf, size_t len, std::error_code& ec) {
return pimpl_->peerChannels_.at(comp_id).read(buf, len, ec);
}
void
......@@ -1286,7 +1291,7 @@ IceTransport::isDataAvailable(int comp_id)
ssize_t
IceTransport::waitForData(int comp_id, std::chrono::milliseconds timeout, std::error_code& ec)
{
return pimpl_->peerChannels_.at(comp_id).wait(timeout);
return pimpl_->peerChannels_.at(comp_id).wait(timeout, ec);
}
std::vector<SDP>
......@@ -1434,14 +1439,9 @@ IceSocketTransport::read(ValueType* buf, std::size_t len, std::error_code& ec)
if (!ice_->isRunning()) return 0;
try {
auto res = reliable_
? ice_->recvfrom(compId_, reinterpret_cast<char *>(buf), len)
: ice_->recv(compId_, buf, len);
if (res < 0) {
ec.assign(errno, std::generic_category());
return 0;
}
ec.clear();
return res;
? ice_->recvfrom(compId_, reinterpret_cast<char *>(buf), len, ec)
: ice_->recv(compId_, buf, len, ec);
return (res < 0) ? 0 : res;
} catch (const std::exception &e) {
JAMI_ERR("IceSocketTransport::read exception: %s", e.what());
}
......@@ -1468,14 +1468,6 @@ IceSocket::close()
ice_transport_.reset();
}
ssize_t
IceSocket::recv(unsigned char* buf, size_t len)
{
if (!ice_transport_.get())
return -1;
return ice_transport_->recv(compId_, buf, len);
}
ssize_t
IceSocket::send(const unsigned char* buf, size_t len)
{
......
......@@ -183,8 +183,8 @@ public:
void setOnRecv(unsigned comp_id, IceRecvCb cb);
ssize_t recv(int comp_id, unsigned char* buf, size_t len);
ssize_t recvfrom(int comp_id, char *buf, size_t len);
ssize_t recv(int comp_id, unsigned char* buf, size_t len, std::error_code& ec);
ssize_t recvfrom(int comp_id, char *buf, size_t len, std::error_code& ec);
ssize_t send(int comp_id, const unsigned char* buf, size_t len);
......
......@@ -346,11 +346,7 @@ IceSocketEndpoint::read(ValueType* buf, std::size_t len, std::error_code& ec)
if (ice_) {
if (!ice_->isRunning()) return 0;
try {
auto res = ice_->recvfrom(compId_, reinterpret_cast<char *>(buf), len);
if (res < 0)
ec.assign(errno, std::generic_category());
else
ec.clear();
auto res = ice_->recvfrom(compId_, reinterpret_cast<char *>(buf), len, ec);
return (res >= 0) ? res : 0;
} catch (const std::exception &e) {
JAMI_ERR("IceSocketEndpoint::read exception: %s", e.what());
......
......@@ -664,12 +664,17 @@ TlsSession::TlsSessionImpl::waitForRawData(std::chrono::milliseconds timeout)
{
if (transport_.isReliable()) {
std::error_code ec;
if (transport_.waitForData(timeout, ec) <= 0) {
auto err = transport_.waitForData(timeout, ec);
if (err <= 0) {
// shutdown?
if (state_ == TlsSessionState::SHUTDOWN) {
gnutls_transport_set_errno(session_, EINTR);
return -1;
}
if (ec) {
gnutls_transport_set_errno(session_, ec.value());
return -1;
}
return 0;
}
return 1;
......@@ -683,7 +688,7 @@ TlsSession::TlsSessionImpl::waitForRawData(std::chrono::milliseconds timeout)
return -1;
}
if (rxQueue_.empty()) {
JAMI_ERR("[TLS] waitForRawData: timeout after %u ms", timeout.count());
JAMI_ERR("[TLS] waitForRawData: timeout after %ld ms", timeout.count());
return 0;
}
return 1;
......
......@@ -45,32 +45,50 @@ public:
}
template <typename Duration>
ssize_t wait(Duration timeout) {
ssize_t wait(Duration timeout, std::error_code& ec) {
std::unique_lock<std::mutex> lk {mutex_};
cv_.wait_for(lk, timeout, [this]{ return stop_ or not stream_.empty(); });
if (stop_) {
ec = std::make_error_code(std::errc::interrupted);
return -1;
}
ec.clear();
return stream_.size();
}
std::size_t read(char* output, std::size_t size) {
ssize_t read(char* output, std::size_t size, std::error_code& ec) {
std::unique_lock<std::mutex> lk {mutex_};
cv_.wait(lk, [this]{
return stop_ or not stream_.empty();
});
if (stop_)
if (stream_.size()) {
auto toRead = std::min(size, stream_.size());
if (toRead) {
auto endIt = stream_.begin()+toRead;
std::copy(stream_.begin(), endIt, output);
stream_.erase(stream_.begin(), endIt);
}
ec.clear();
return toRead;
}
if (stop_) {
ec.clear();
return 0;
auto toRead = std::min(size, stream_.size());
if (toRead) {
auto endIt = stream_.begin()+toRead;
std::copy(stream_.begin(), endIt, output);
stream_.erase(stream_.begin(), endIt);
}
return toRead;
ec = std::make_error_code(std::errc::resource_unavailable_try_again);
return -1;
}
void write(const char* data, std::size_t size) {
ssize_t write(const char* data, std::size_t size, std::error_code& ec) {
std::lock_guard<std::mutex> lk {mutex_};
if (stop_) {
ec = std::make_error_code(std::errc::broken_pipe);
return -1;
}
stream_.insert(stream_.end(), data, data+size);
cv_.notify_all();
ec.clear();
return size;
}
void stop() noexcept {
......
......@@ -154,7 +154,11 @@ TurnTransportPimpl::onRxData(const uint8_t* pkt, unsigned pkt_len,
auto channel_it = peerChannels_.find(peer_addr);
if (channel_it == std::end(peerChannels_))
return;
channel_it->second.write((const char*)pkt, pkt_len);
std::error_code ec;
auto ret = channel_it->second.write((const char*)pkt, pkt_len, ec);
if (ret < 0) {
JAMI_ERR("TURN rx: channel is closed");
}
}
}
......@@ -359,20 +363,13 @@ TurnTransport::sendto(const IpAddr& peer, const std::vector<char>& buffer)
return sendto(peer, &buffer[0], buffer.size());
}
std::size_t
TurnTransport::recvfrom(const IpAddr& peer, char* buffer, std::size_t size)
ssize_t
TurnTransport::recvfrom(const IpAddr& peer, char* buffer, std::size_t size, std::error_code& ec)
{
MutexLock lk {pimpl_->apiMutex_};
auto& channel = pimpl_->peerChannels_.at(peer);
lk.unlock();
return channel.read(buffer, size);
}
void
TurnTransport::recvfrom(const IpAddr& peer, std::vector<char>& result)
{
auto res = recvfrom(peer, result.data(), result.size());
result.resize(res);
return channel.read(buffer, size, ec);
}
std::vector<IpAddr>
......@@ -389,7 +386,7 @@ TurnTransport::waitForData(const IpAddr& peer, std::chrono::milliseconds timeout
MutexLock lk {pimpl_->apiMutex_};
auto& channel = pimpl_->peerChannels_.at(peer);
lk.unlock();
return channel.wait(timeout);
return channel.wait(timeout, ec);
}
//==============================================================================
......@@ -435,17 +432,7 @@ std::size_t
ConnectedTurnTransport::read(ValueType* buf, std::size_t size, std::error_code& ec)
{
if (size > 0) {
try {
size = turn_.recvfrom(peer_, reinterpret_cast<char*>(buf), size);
} catch (const sip_utils::PjsipFailure& ex) {
ec = ex.code();
return 0;
}
if (size == 0) {
ec = std::make_error_code(std::errc::broken_pipe);
return 0;
}
return turn_.recvfrom(peer_, reinterpret_cast<char*>(buf), size, ec);
}
ec.clear();
......
......@@ -105,7 +105,6 @@ public:
/// Collect pending data from a given peer.
///
/// Data are read from given \a peer incoming buffer until EOF or \a data size() is reached.
/// \a data is resized with exact number of characters read.
/// If \a peer is not connected this function raise an exception.
/// If \a peer exists but no data are available this method blocks until TURN deconnection
/// or at first incoming character.
......@@ -114,11 +113,9 @@ public:
/// \param [in,out] pre-dimensionned character vector to write incoming data
/// \exception std::out_of_range \a peer is not connected yet
///
void recvfrom(const IpAddr& peer, std::vector<char>& data);
/// Works as recvfrom() vector version but accept a simple char array.
///
std::size_t recvfrom(const IpAddr& peer, char* buffer, std::size_t size);
ssize_t recvfrom(const IpAddr& peer, char* buffer, std::size_t size, std::error_code& ec);
/// Send data to given peer through the TURN tunnel.
///
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment