diff --git a/src/media/rtp_session.h b/src/media/rtp_session.h index 7f17dc9560627a3e98dbaa943fc28c297ca8c4fa..2a80154ccdc6942be6e1c71069a84d559c56daa2 100644 --- a/src/media/rtp_session.h +++ b/src/media/rtp_session.h @@ -57,7 +57,7 @@ public: protected: std::recursive_mutex mutex_; - std::shared_ptr<SocketPair> socketPair_; + std::unique_ptr<SocketPair> socketPair_; const std::string callID_; MediaDescription send_; diff --git a/src/media/socket_pair.cpp b/src/media/socket_pair.cpp index ff32403803c3d1a4157acf5b6caf1196982d7153..f118c85af4cc8f3d4b4ff1a35f6c23cf51cd93fc 100644 --- a/src/media/socket_pair.cpp +++ b/src/media/socket_pair.cpp @@ -65,6 +65,7 @@ static constexpr int NET_POLL_TIMEOUT = 100; /* poll() timeout in ms */ static constexpr int RTP_MAX_PACKET_LENGTH = 2048; static constexpr auto UDP_HEADER_SIZE = 8; static constexpr auto SRTP_OVERHEAD = 10; +static constexpr uint32_t RTCP_RR_FRACTION_MASK = 0xFF000000; enum class DataType : unsigned { RTP=1<<0, RTCP=1<<1 }; @@ -488,6 +489,13 @@ SocketPair::writeCallback(uint8_t* buf, int buf_size) buf = srtpContext_->encryptbuf; } + // check if we're sending an RR, if so, detect packet loss + // buf_size gives length of buffer, not just header + if (isRTCP && static_cast<unsigned>(buf_size) >= sizeof(rtcpRRHeader)) { + auto header = reinterpret_cast<rtcpRRHeader*>(buf); + rtcpPacketLoss_ = (header->pt == 201 && ntohl(header->fraction_lost) & RTCP_RR_FRACTION_MASK); + } + do { if (interrupted_) return -EINTR; @@ -497,4 +505,12 @@ SocketPair::writeCallback(uint8_t* buf, int buf_size) return ret < 0 ? -errno : ret; } +bool +SocketPair::rtcpPacketLossDetected() const +{ + // set to false on checking packet loss to avoid burst of keyframe requests + bool b = true; + return rtcpPacketLoss_.compare_exchange_strong(b, false); +} + } // namespace ring diff --git a/src/media/socket_pair.h b/src/media/socket_pair.h index 7668e68bde2d9a02c0242f87a2da185d9eb6e373..e6330020b0cc7f9b1a5ce8bfda03652049878452 100644 --- a/src/media/socket_pair.h +++ b/src/media/socket_pair.h @@ -107,6 +107,7 @@ class SocketPair { void stopSendOp(bool state = true); std::vector<rtcpRRHeader> getRtcpInfo(); + bool rtcpPacketLossDetected() const; private: NON_COPYABLE(SocketPair); @@ -139,6 +140,8 @@ class SocketPair { std::list<rtcpRRHeader> listRtcpHeader_; std::mutex rtcpInfo_mutex_; static constexpr unsigned MAX_LIST_SIZE {20}; + + mutable std::atomic_bool rtcpPacketLoss_ {false}; }; diff --git a/src/media/video/video_receive_thread.cpp b/src/media/video/video_receive_thread.cpp index b40fed421a6b299b41e4696467a5838196a3b9b1..10424297715b3b4b827683ecb559f2894de24913 100644 --- a/src/media/video/video_receive_thread.cpp +++ b/src/media/video/video_receive_thread.cpp @@ -36,8 +36,6 @@ namespace ring { namespace video { using std::string; -static constexpr uint32_t RTCP_RR_FRACTION_MASK = 0xFF000000; - VideoReceiveThread::VideoReceiveThread(const std::string& id, const std::string &sdp, const bool isReset, @@ -164,27 +162,15 @@ int VideoReceiveThread::readFunction(void *opaque, uint8_t *buf, int buf_size) return is.gcount(); } -void VideoReceiveThread::addIOContext(std::shared_ptr<SocketPair> socketPair) +void VideoReceiveThread::addIOContext(SocketPair& socketPair) { - demuxContext_.reset(socketPair->createIOContext(mtu_)); - socketPair_ = socketPair; + demuxContext_.reset(socketPair.createIOContext(mtu_)); } bool VideoReceiveThread::decodeFrame() { const auto ret = videoDecoder_->decode(getNewFrame()); - if (requestKeyFrameCallback_) { - auto rtcpPackets = socketPair_->getRtcpInfo(); - if (rtcpPackets.size() != 0) { - // recent packet loss detected, ask for keyframe - if (ntohl(rtcpPackets.back().fraction_lost) & RTCP_RR_FRACTION_MASK) { - RING_DBG("Sending keyframe request"); - requestKeyFrameCallback_(id_); - } - } - } - switch (ret) { case MediaDecoder::Status::FrameFinished: publishFrame(); @@ -249,4 +235,11 @@ bool VideoReceiveThread::restartDecoder() const { return restartDecoder_.load(); } +void +VideoReceiveThread::triggerKeyFrameRequest() +{ + if (requestKeyFrameCallback_) + requestKeyFrameCallback_(id_); +} + }} // namespace ring::video diff --git a/src/media/video/video_receive_thread.h b/src/media/video/video_receive_thread.h index 08d84cdaaa07a66af6ebeae9feb1c720917611d4..20ff0bb5fcdd189f2572cc5b0d2b5ee8a7846247 100644 --- a/src/media/video/video_receive_thread.h +++ b/src/media/video/video_receive_thread.h @@ -51,7 +51,7 @@ public: ~VideoReceiveThread(); void startLoop(); - void addIOContext(std::shared_ptr<SocketPair> socketPair); + void addIOContext(SocketPair& socketPair); void setRequestKeyFrameCallback(void (*)(const std::string &)); void enterConference(); void exitConference(); @@ -61,6 +61,7 @@ public: int getHeight() const; int getPixelFormat() const; bool restartDecoder() const; + void triggerKeyFrameRequest(); private: NON_COPYABLE(VideoReceiveThread); @@ -81,7 +82,6 @@ private: std::atomic_bool restartDecoder_; bool isReset_; uint16_t mtu_; - std::shared_ptr<SocketPair> socketPair_; void (*requestKeyFrameCallback_)(const std::string &); void openDecoder(); diff --git a/src/media/video/video_rtp_session.cpp b/src/media/video/video_rtp_session.cpp index 5d35eb901801ae7fad10b911274fc025e58d141a..b11979d0d301c1d53f15661da3597e821df29cdb 100644 --- a/src/media/video/video_rtp_session.cpp +++ b/src/media/video/video_rtp_session.cpp @@ -48,6 +48,9 @@ using std::string; constexpr static auto NEWPARAMS_TIMEOUT = std::chrono::milliseconds(1000); +// how long (in seconds) to wait before rechecking for packet loss +static constexpr auto RTCP_PACKET_LOSS_INTERVAL = std::chrono::milliseconds(1000); + VideoRtpSession::VideoRtpSession(const string &callID, const DeviceParams& localVideoParams) : RtpSession(callID), localVideoParams_(localVideoParams) @@ -60,6 +63,9 @@ VideoRtpSession::VideoRtpSession(const string &callID, , receiverRestartThread_([]{ return true; }, [this]{ processReceiverRestart(); }, []{}) + , packetLossThread_([] { return true; }, + [this]{ processPacketLoss(); }, + [](){}) { setupVideoBitrateInfo(); // reset bitrate } @@ -157,14 +163,16 @@ void VideoRtpSession::startReceiver() // XXX keyframe requests can timeout if unanswered receiveThread_->setRequestKeyFrameCallback(&SIPVoIPLink::enqueueKeyframeRequest); receiverRestartThread_.start(); - receiveThread_->addIOContext(socketPair_); + receiveThread_->addIOContext(*socketPair_); receiveThread_->startLoop(); + packetLossThread_.start(); } else { RING_DBG("Video receiving disabled"); receiverRestartThread_.join(); if (receiveThread_) receiveThread_->detach(videoMixer_.get()); receiveThread_.reset(); + packetLossThread_.join(); } } @@ -218,7 +226,7 @@ void VideoRtpSession::stop() std::lock_guard<std::recursive_mutex> lock(mutex_); rtcpCheckerThread_.join(); receiverRestartThread_.join(); - + packetLossThread_.join(); if (videoLocal_) videoLocal_->detach(sender_.get()); @@ -588,4 +596,13 @@ VideoRtpSession::processReceiverRestart() receiverRestartThread_.wait_for(std::chrono::seconds(RECEIVER_RESTART_INTERVAL)); } +void +VideoRtpSession::processPacketLoss() +{ + if (packetLossThread_.wait_for(RTCP_PACKET_LOSS_INTERVAL, + [this]{return socketPair_->rtcpPacketLossDetected();})) { + receiveThread_->triggerKeyFrameRequest(); + } +} + }} // namespace ring::video diff --git a/src/media/video/video_rtp_session.h b/src/media/video/video_rtp_session.h index 7eac6d235424b3a5117c5bd17b5b627fbd37ffc5..31c502735ef7655bfa8f7959d018b4a51e49759d 100644 --- a/src/media/video/video_rtp_session.h +++ b/src/media/video/video_rtp_session.h @@ -136,6 +136,9 @@ private: InterruptedThreadLoop receiverRestartThread_; void processReceiverRestart(); + + InterruptedThreadLoop packetLossThread_; + void processPacketLoss(); }; }} // namespace ring::video diff --git a/src/media/video/video_sender.cpp b/src/media/video/video_sender.cpp index b0c3bae15739081a9cfcb5faa6a066fb74df016c..cdb42ed8b2abb82b51fe99ffefe5d1b74293d8e8 100644 --- a/src/media/video/video_sender.cpp +++ b/src/media/video/video_sender.cpp @@ -84,7 +84,7 @@ VideoSender::update(Observable<std::shared_ptr<VideoFrame>>* /*obs*/, void VideoSender::forceKeyFrame() { - RING_DBG("Peer has requested a key frame"); + RING_DBG("Key frame requested"); ++forceKeyFrame_; } diff --git a/src/threadloop.h b/src/threadloop.h index daa422f6d3a4057f97d12cbb4e95ddcd5812b452..fb11cfb7eafd940f815de07f9c3fe1957f72a5c3 100644 --- a/src/threadloop.h +++ b/src/threadloop.h @@ -97,6 +97,17 @@ public: cv_.wait_for(lk, rel_time, [this](){return isStopping();}); } + template <typename Rep, typename Period, typename Pred> + bool + wait_for(const std::chrono::duration<Rep, Period>& rel_time, Pred&& pred) + { + if (std::this_thread::get_id() != get_id()) + throw std::runtime_error("can not call wait_for outside thread context"); + + std::unique_lock<std::mutex> lk(mutex_); + return cv_.wait_for(lk, rel_time, [this, pred]{ return isStopping() || pred(); }); + } + private: std::mutex mutex_; std::condition_variable cv_;