diff --git a/src/ice_socket.h b/src/ice_socket.h index 0e8862d574b03deb34b1d428d2de22a69995dd54..3db1509fc3d546b0f9fc04b37713d3965be81a74 100644 --- a/src/ice_socket.h +++ b/src/ice_socket.h @@ -45,7 +45,6 @@ class IceSocket : ice_transport_(std::move(iceTransport)), compId_(compId) {} void close(); - ssize_t recv(unsigned char* buf, size_t len); ssize_t send(const unsigned char* buf, size_t len); ssize_t waitForData(std::chrono::milliseconds timeout); void setOnRecv(IceRecvCb cb); diff --git a/src/ice_transport.cpp b/src/ice_transport.cpp index 3ec8061e4ab35303591ebc381b3ede684ded9290..074193e34c5fb0f5b7ad60ec059eab3c7cfc4214 100644 --- a/src/ice_transport.cpp +++ b/src/ice_transport.cpp @@ -788,7 +788,11 @@ IceTransport::Impl::onReceiveData(unsigned comp_id, void *pkt, pj_size_t size) if (io.cb) { io.cb((uint8_t*)pkt, size); } else { - peerChannels_.at(comp_id-1).write((char*)pkt, size); + std::error_code ec; + auto err = peerChannels_.at(comp_id-1).write((char*)pkt, size, ec); + if (err < 0) { + JAMI_ERR("[ice:%p] rx: channel is closed", this); + } } } @@ -1176,31 +1180,32 @@ IceTransport::getCandidateFromSDP(const std::string& line, IceCandidate& cand) } ssize_t -IceTransport::recv(int comp_id, unsigned char* buf, size_t len) +IceTransport::recv(int comp_id, unsigned char* buf, size_t len, std::error_code& ec) { - sip_utils::register_thread(); auto &io = pimpl_->compIO_[comp_id]; std::lock_guard<std::mutex> lk(io.mutex); if (io.queue.empty()) { - return 0; + ec = std::make_error_code(std::errc::resource_unavailable_try_again); + return -1; } auto& packet = io.queue.front(); const auto count = std::min(len, packet.data.size()); std::copy_n(packet.data.begin(), count, buf); if (count == packet.data.size()) { - io.queue.pop_front(); + io.queue.pop_front(); } else { - packet.data.erase(packet.data.begin(), packet.data.begin() + count); + packet.data.erase(packet.data.begin(), packet.data.begin() + count); } + ec.clear(); return count; } ssize_t -IceTransport::recvfrom(int comp_id, char *buf, size_t len) { - return pimpl_->peerChannels_.at(comp_id).read(buf, len); +IceTransport::recvfrom(int comp_id, char *buf, size_t len, std::error_code& ec) { + return pimpl_->peerChannels_.at(comp_id).read(buf, len, ec); } void @@ -1286,7 +1291,7 @@ IceTransport::isDataAvailable(int comp_id) ssize_t IceTransport::waitForData(int comp_id, std::chrono::milliseconds timeout, std::error_code& ec) { - return pimpl_->peerChannels_.at(comp_id).wait(timeout); + return pimpl_->peerChannels_.at(comp_id).wait(timeout, ec); } std::vector<SDP> @@ -1434,14 +1439,9 @@ IceSocketTransport::read(ValueType* buf, std::size_t len, std::error_code& ec) if (!ice_->isRunning()) return 0; try { auto res = reliable_ - ? ice_->recvfrom(compId_, reinterpret_cast<char *>(buf), len) - : ice_->recv(compId_, buf, len); - if (res < 0) { - ec.assign(errno, std::generic_category()); - return 0; - } - ec.clear(); - return res; + ? ice_->recvfrom(compId_, reinterpret_cast<char *>(buf), len, ec) + : ice_->recv(compId_, buf, len, ec); + return (res < 0) ? 0 : res; } catch (const std::exception &e) { JAMI_ERR("IceSocketTransport::read exception: %s", e.what()); } @@ -1468,14 +1468,6 @@ IceSocket::close() ice_transport_.reset(); } -ssize_t -IceSocket::recv(unsigned char* buf, size_t len) -{ - if (!ice_transport_.get()) - return -1; - return ice_transport_->recv(compId_, buf, len); -} - ssize_t IceSocket::send(const unsigned char* buf, size_t len) { diff --git a/src/ice_transport.h b/src/ice_transport.h index 432ed3d7edb3e5c567eaf83521e36df2a66eff13..1a16a6df37cbf58a6d2c08d08c95ae1271b6bd16 100644 --- a/src/ice_transport.h +++ b/src/ice_transport.h @@ -183,8 +183,8 @@ public: void setOnRecv(unsigned comp_id, IceRecvCb cb); - ssize_t recv(int comp_id, unsigned char* buf, size_t len); - ssize_t recvfrom(int comp_id, char *buf, size_t len); + ssize_t recv(int comp_id, unsigned char* buf, size_t len, std::error_code& ec); + ssize_t recvfrom(int comp_id, char *buf, size_t len, std::error_code& ec); ssize_t send(int comp_id, const unsigned char* buf, size_t len); diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index 665c34ee7dff962ef33e898eb61b3422bac38361..7e987ed42b08b021776ce019acc58294b5f4d0ac 100644 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -346,11 +346,7 @@ IceSocketEndpoint::read(ValueType* buf, std::size_t len, std::error_code& ec) if (ice_) { if (!ice_->isRunning()) return 0; try { - auto res = ice_->recvfrom(compId_, reinterpret_cast<char *>(buf), len); - if (res < 0) - ec.assign(errno, std::generic_category()); - else - ec.clear(); + auto res = ice_->recvfrom(compId_, reinterpret_cast<char *>(buf), len, ec); return (res >= 0) ? res : 0; } catch (const std::exception &e) { JAMI_ERR("IceSocketEndpoint::read exception: %s", e.what()); diff --git a/src/security/tls_session.cpp b/src/security/tls_session.cpp index 7099218cbbe384390eec6fc0b84e252297d9eb8c..c3e563aecfdf568b419c1585cd9c1713700aa269 100644 --- a/src/security/tls_session.cpp +++ b/src/security/tls_session.cpp @@ -664,12 +664,17 @@ TlsSession::TlsSessionImpl::waitForRawData(std::chrono::milliseconds timeout) { if (transport_.isReliable()) { std::error_code ec; - if (transport_.waitForData(timeout, ec) <= 0) { + auto err = transport_.waitForData(timeout, ec); + if (err <= 0) { // shutdown? if (state_ == TlsSessionState::SHUTDOWN) { gnutls_transport_set_errno(session_, EINTR); return -1; } + if (ec) { + gnutls_transport_set_errno(session_, ec.value()); + return -1; + } return 0; } return 1; @@ -683,7 +688,7 @@ TlsSession::TlsSessionImpl::waitForRawData(std::chrono::milliseconds timeout) return -1; } if (rxQueue_.empty()) { - JAMI_ERR("[TLS] waitForRawData: timeout after %u ms", timeout.count()); + JAMI_ERR("[TLS] waitForRawData: timeout after %ld ms", timeout.count()); return 0; } return 1; diff --git a/src/transport/peer_channel.h b/src/transport/peer_channel.h index d65bb6516b1ef5e6de812f9a77894b1762e0104d..13d190405d1951a5751d556005da28e8dcaf768f 100644 --- a/src/transport/peer_channel.h +++ b/src/transport/peer_channel.h @@ -45,32 +45,50 @@ public: } template <typename Duration> - ssize_t wait(Duration timeout) { + ssize_t wait(Duration timeout, std::error_code& ec) { std::unique_lock<std::mutex> lk {mutex_}; cv_.wait_for(lk, timeout, [this]{ return stop_ or not stream_.empty(); }); + if (stop_) { + ec = std::make_error_code(std::errc::interrupted); + return -1; + } + ec.clear(); return stream_.size(); } - std::size_t read(char* output, std::size_t size) { + ssize_t read(char* output, std::size_t size, std::error_code& ec) { std::unique_lock<std::mutex> lk {mutex_}; cv_.wait(lk, [this]{ return stop_ or not stream_.empty(); }); - if (stop_) + if (stream_.size()) { + auto toRead = std::min(size, stream_.size()); + if (toRead) { + auto endIt = stream_.begin()+toRead; + std::copy(stream_.begin(), endIt, output); + stream_.erase(stream_.begin(), endIt); + } + ec.clear(); + return toRead; + } + if (stop_) { + ec.clear(); return 0; - auto toRead = std::min(size, stream_.size()); - if (toRead) { - auto endIt = stream_.begin()+toRead; - std::copy(stream_.begin(), endIt, output); - stream_.erase(stream_.begin(), endIt); } - return toRead; + ec = std::make_error_code(std::errc::resource_unavailable_try_again); + return -1; } - void write(const char* data, std::size_t size) { + ssize_t write(const char* data, std::size_t size, std::error_code& ec) { std::lock_guard<std::mutex> lk {mutex_}; + if (stop_) { + ec = std::make_error_code(std::errc::broken_pipe); + return -1; + } stream_.insert(stream_.end(), data, data+size); cv_.notify_all(); + ec.clear(); + return size; } void stop() noexcept { diff --git a/src/turn_transport.cpp b/src/turn_transport.cpp index de7cc684a61a69f521aa2fbf985462f21b04c2ef..c0727749e5937cf75413791eae3b0aced2e52370 100644 --- a/src/turn_transport.cpp +++ b/src/turn_transport.cpp @@ -154,7 +154,11 @@ TurnTransportPimpl::onRxData(const uint8_t* pkt, unsigned pkt_len, auto channel_it = peerChannels_.find(peer_addr); if (channel_it == std::end(peerChannels_)) return; - channel_it->second.write((const char*)pkt, pkt_len); + std::error_code ec; + auto ret = channel_it->second.write((const char*)pkt, pkt_len, ec); + if (ret < 0) { + JAMI_ERR("TURN rx: channel is closed"); + } } } @@ -359,20 +363,13 @@ TurnTransport::sendto(const IpAddr& peer, const std::vector<char>& buffer) return sendto(peer, &buffer[0], buffer.size()); } -std::size_t -TurnTransport::recvfrom(const IpAddr& peer, char* buffer, std::size_t size) +ssize_t +TurnTransport::recvfrom(const IpAddr& peer, char* buffer, std::size_t size, std::error_code& ec) { MutexLock lk {pimpl_->apiMutex_}; auto& channel = pimpl_->peerChannels_.at(peer); lk.unlock(); - return channel.read(buffer, size); -} - -void -TurnTransport::recvfrom(const IpAddr& peer, std::vector<char>& result) -{ - auto res = recvfrom(peer, result.data(), result.size()); - result.resize(res); + return channel.read(buffer, size, ec); } std::vector<IpAddr> @@ -389,7 +386,7 @@ TurnTransport::waitForData(const IpAddr& peer, std::chrono::milliseconds timeout MutexLock lk {pimpl_->apiMutex_}; auto& channel = pimpl_->peerChannels_.at(peer); lk.unlock(); - return channel.wait(timeout); + return channel.wait(timeout, ec); } //============================================================================== @@ -435,17 +432,7 @@ std::size_t ConnectedTurnTransport::read(ValueType* buf, std::size_t size, std::error_code& ec) { if (size > 0) { - try { - size = turn_.recvfrom(peer_, reinterpret_cast<char*>(buf), size); - } catch (const sip_utils::PjsipFailure& ex) { - ec = ex.code(); - return 0; - } - - if (size == 0) { - ec = std::make_error_code(std::errc::broken_pipe); - return 0; - } + return turn_.recvfrom(peer_, reinterpret_cast<char*>(buf), size, ec); } ec.clear(); diff --git a/src/turn_transport.h b/src/turn_transport.h index 5d76d6e8f909b28e7dd7420e7b9701c3e5702156..c667a918be571d6fcd855f285f6ec1bced1942e1 100644 --- a/src/turn_transport.h +++ b/src/turn_transport.h @@ -105,7 +105,6 @@ public: /// Collect pending data from a given peer. /// /// Data are read from given \a peer incoming buffer until EOF or \a data size() is reached. - /// \a data is resized with exact number of characters read. /// If \a peer is not connected this function raise an exception. /// If \a peer exists but no data are available this method blocks until TURN deconnection /// or at first incoming character. @@ -114,11 +113,9 @@ public: /// \param [in,out] pre-dimensionned character vector to write incoming data /// \exception std::out_of_range \a peer is not connected yet /// - void recvfrom(const IpAddr& peer, std::vector<char>& data); - /// Works as recvfrom() vector version but accept a simple char array. /// - std::size_t recvfrom(const IpAddr& peer, char* buffer, std::size_t size); + ssize_t recvfrom(const IpAddr& peer, char* buffer, std::size_t size, std::error_code& ec); /// Send data to given peer through the TURN tunnel. ///