Skip to content
Snippets Groups Projects
Commit 664ea5fa authored by Philippe Gorley's avatar Philippe Gorley Committed by Guillaume Roguez
Browse files

video: fix keyframe request


Fixes a regression where keyframe requests were sent by the wrong peer.

Corrects artifacts in a shorter amount of time.

Change-Id: I2b981b46c07422a4289f378e2a5deaaba0047a3b
Reviewed-by: default avatarGuillaume Roguez <guillaume.roguez@savoirfairelinux.com>
parent 53b54697
No related branches found
No related tags found
No related merge requests found
...@@ -57,7 +57,7 @@ public: ...@@ -57,7 +57,7 @@ public:
protected: protected:
std::recursive_mutex mutex_; std::recursive_mutex mutex_;
std::shared_ptr<SocketPair> socketPair_; std::unique_ptr<SocketPair> socketPair_;
const std::string callID_; const std::string callID_;
MediaDescription send_; MediaDescription send_;
......
...@@ -65,6 +65,7 @@ static constexpr int NET_POLL_TIMEOUT = 100; /* poll() timeout in ms */ ...@@ -65,6 +65,7 @@ static constexpr int NET_POLL_TIMEOUT = 100; /* poll() timeout in ms */
static constexpr int RTP_MAX_PACKET_LENGTH = 2048; static constexpr int RTP_MAX_PACKET_LENGTH = 2048;
static constexpr auto UDP_HEADER_SIZE = 8; static constexpr auto UDP_HEADER_SIZE = 8;
static constexpr auto SRTP_OVERHEAD = 10; static constexpr auto SRTP_OVERHEAD = 10;
static constexpr uint32_t RTCP_RR_FRACTION_MASK = 0xFF000000;
enum class DataType : unsigned { RTP=1<<0, RTCP=1<<1 }; enum class DataType : unsigned { RTP=1<<0, RTCP=1<<1 };
...@@ -488,6 +489,13 @@ SocketPair::writeCallback(uint8_t* buf, int buf_size) ...@@ -488,6 +489,13 @@ SocketPair::writeCallback(uint8_t* buf, int buf_size)
buf = srtpContext_->encryptbuf; buf = srtpContext_->encryptbuf;
} }
// check if we're sending an RR, if so, detect packet loss
// buf_size gives length of buffer, not just header
if (isRTCP && static_cast<unsigned>(buf_size) >= sizeof(rtcpRRHeader)) {
auto header = reinterpret_cast<rtcpRRHeader*>(buf);
rtcpPacketLoss_ = (header->pt == 201 && ntohl(header->fraction_lost) & RTCP_RR_FRACTION_MASK);
}
do { do {
if (interrupted_) if (interrupted_)
return -EINTR; return -EINTR;
...@@ -497,4 +505,12 @@ SocketPair::writeCallback(uint8_t* buf, int buf_size) ...@@ -497,4 +505,12 @@ SocketPair::writeCallback(uint8_t* buf, int buf_size)
return ret < 0 ? -errno : ret; return ret < 0 ? -errno : ret;
} }
bool
SocketPair::rtcpPacketLossDetected() const
{
// set to false on checking packet loss to avoid burst of keyframe requests
bool b = true;
return rtcpPacketLoss_.compare_exchange_strong(b, false);
}
} // namespace ring } // namespace ring
...@@ -107,6 +107,7 @@ class SocketPair { ...@@ -107,6 +107,7 @@ class SocketPair {
void stopSendOp(bool state = true); void stopSendOp(bool state = true);
std::vector<rtcpRRHeader> getRtcpInfo(); std::vector<rtcpRRHeader> getRtcpInfo();
bool rtcpPacketLossDetected() const;
private: private:
NON_COPYABLE(SocketPair); NON_COPYABLE(SocketPair);
...@@ -139,6 +140,8 @@ class SocketPair { ...@@ -139,6 +140,8 @@ class SocketPair {
std::list<rtcpRRHeader> listRtcpHeader_; std::list<rtcpRRHeader> listRtcpHeader_;
std::mutex rtcpInfo_mutex_; std::mutex rtcpInfo_mutex_;
static constexpr unsigned MAX_LIST_SIZE {20}; static constexpr unsigned MAX_LIST_SIZE {20};
mutable std::atomic_bool rtcpPacketLoss_ {false};
}; };
......
...@@ -36,8 +36,6 @@ namespace ring { namespace video { ...@@ -36,8 +36,6 @@ namespace ring { namespace video {
using std::string; using std::string;
static constexpr uint32_t RTCP_RR_FRACTION_MASK = 0xFF000000;
VideoReceiveThread::VideoReceiveThread(const std::string& id, VideoReceiveThread::VideoReceiveThread(const std::string& id,
const std::string &sdp, const std::string &sdp,
const bool isReset, const bool isReset,
...@@ -164,27 +162,15 @@ int VideoReceiveThread::readFunction(void *opaque, uint8_t *buf, int buf_size) ...@@ -164,27 +162,15 @@ int VideoReceiveThread::readFunction(void *opaque, uint8_t *buf, int buf_size)
return is.gcount(); return is.gcount();
} }
void VideoReceiveThread::addIOContext(std::shared_ptr<SocketPair> socketPair) void VideoReceiveThread::addIOContext(SocketPair& socketPair)
{ {
demuxContext_.reset(socketPair->createIOContext(mtu_)); demuxContext_.reset(socketPair.createIOContext(mtu_));
socketPair_ = socketPair;
} }
bool VideoReceiveThread::decodeFrame() bool VideoReceiveThread::decodeFrame()
{ {
const auto ret = videoDecoder_->decode(getNewFrame()); const auto ret = videoDecoder_->decode(getNewFrame());
if (requestKeyFrameCallback_) {
auto rtcpPackets = socketPair_->getRtcpInfo();
if (rtcpPackets.size() != 0) {
// recent packet loss detected, ask for keyframe
if (ntohl(rtcpPackets.back().fraction_lost) & RTCP_RR_FRACTION_MASK) {
RING_DBG("Sending keyframe request");
requestKeyFrameCallback_(id_);
}
}
}
switch (ret) { switch (ret) {
case MediaDecoder::Status::FrameFinished: case MediaDecoder::Status::FrameFinished:
publishFrame(); publishFrame();
...@@ -249,4 +235,11 @@ bool ...@@ -249,4 +235,11 @@ bool
VideoReceiveThread::restartDecoder() const VideoReceiveThread::restartDecoder() const
{ return restartDecoder_.load(); } { return restartDecoder_.load(); }
void
VideoReceiveThread::triggerKeyFrameRequest()
{
if (requestKeyFrameCallback_)
requestKeyFrameCallback_(id_);
}
}} // namespace ring::video }} // namespace ring::video
...@@ -51,7 +51,7 @@ public: ...@@ -51,7 +51,7 @@ public:
~VideoReceiveThread(); ~VideoReceiveThread();
void startLoop(); void startLoop();
void addIOContext(std::shared_ptr<SocketPair> socketPair); void addIOContext(SocketPair& socketPair);
void setRequestKeyFrameCallback(void (*)(const std::string &)); void setRequestKeyFrameCallback(void (*)(const std::string &));
void enterConference(); void enterConference();
void exitConference(); void exitConference();
...@@ -61,6 +61,7 @@ public: ...@@ -61,6 +61,7 @@ public:
int getHeight() const; int getHeight() const;
int getPixelFormat() const; int getPixelFormat() const;
bool restartDecoder() const; bool restartDecoder() const;
void triggerKeyFrameRequest();
private: private:
NON_COPYABLE(VideoReceiveThread); NON_COPYABLE(VideoReceiveThread);
...@@ -81,7 +82,6 @@ private: ...@@ -81,7 +82,6 @@ private:
std::atomic_bool restartDecoder_; std::atomic_bool restartDecoder_;
bool isReset_; bool isReset_;
uint16_t mtu_; uint16_t mtu_;
std::shared_ptr<SocketPair> socketPair_;
void (*requestKeyFrameCallback_)(const std::string &); void (*requestKeyFrameCallback_)(const std::string &);
void openDecoder(); void openDecoder();
......
...@@ -48,6 +48,9 @@ using std::string; ...@@ -48,6 +48,9 @@ using std::string;
constexpr static auto NEWPARAMS_TIMEOUT = std::chrono::milliseconds(1000); constexpr static auto NEWPARAMS_TIMEOUT = std::chrono::milliseconds(1000);
// how long (in seconds) to wait before rechecking for packet loss
static constexpr auto RTCP_PACKET_LOSS_INTERVAL = std::chrono::milliseconds(1000);
VideoRtpSession::VideoRtpSession(const string &callID, VideoRtpSession::VideoRtpSession(const string &callID,
const DeviceParams& localVideoParams) : const DeviceParams& localVideoParams) :
RtpSession(callID), localVideoParams_(localVideoParams) RtpSession(callID), localVideoParams_(localVideoParams)
...@@ -60,6 +63,9 @@ VideoRtpSession::VideoRtpSession(const string &callID, ...@@ -60,6 +63,9 @@ VideoRtpSession::VideoRtpSession(const string &callID,
, receiverRestartThread_([]{ return true; }, , receiverRestartThread_([]{ return true; },
[this]{ processReceiverRestart(); }, [this]{ processReceiverRestart(); },
[]{}) []{})
, packetLossThread_([] { return true; },
[this]{ processPacketLoss(); },
[](){})
{ {
setupVideoBitrateInfo(); // reset bitrate setupVideoBitrateInfo(); // reset bitrate
} }
...@@ -157,14 +163,16 @@ void VideoRtpSession::startReceiver() ...@@ -157,14 +163,16 @@ void VideoRtpSession::startReceiver()
// XXX keyframe requests can timeout if unanswered // XXX keyframe requests can timeout if unanswered
receiveThread_->setRequestKeyFrameCallback(&SIPVoIPLink::enqueueKeyframeRequest); receiveThread_->setRequestKeyFrameCallback(&SIPVoIPLink::enqueueKeyframeRequest);
receiverRestartThread_.start(); receiverRestartThread_.start();
receiveThread_->addIOContext(socketPair_); receiveThread_->addIOContext(*socketPair_);
receiveThread_->startLoop(); receiveThread_->startLoop();
packetLossThread_.start();
} else { } else {
RING_DBG("Video receiving disabled"); RING_DBG("Video receiving disabled");
receiverRestartThread_.join(); receiverRestartThread_.join();
if (receiveThread_) if (receiveThread_)
receiveThread_->detach(videoMixer_.get()); receiveThread_->detach(videoMixer_.get());
receiveThread_.reset(); receiveThread_.reset();
packetLossThread_.join();
} }
} }
...@@ -218,7 +226,7 @@ void VideoRtpSession::stop() ...@@ -218,7 +226,7 @@ void VideoRtpSession::stop()
std::lock_guard<std::recursive_mutex> lock(mutex_); std::lock_guard<std::recursive_mutex> lock(mutex_);
rtcpCheckerThread_.join(); rtcpCheckerThread_.join();
receiverRestartThread_.join(); receiverRestartThread_.join();
packetLossThread_.join();
if (videoLocal_) if (videoLocal_)
videoLocal_->detach(sender_.get()); videoLocal_->detach(sender_.get());
...@@ -588,4 +596,13 @@ VideoRtpSession::processReceiverRestart() ...@@ -588,4 +596,13 @@ VideoRtpSession::processReceiverRestart()
receiverRestartThread_.wait_for(std::chrono::seconds(RECEIVER_RESTART_INTERVAL)); receiverRestartThread_.wait_for(std::chrono::seconds(RECEIVER_RESTART_INTERVAL));
} }
void
VideoRtpSession::processPacketLoss()
{
if (packetLossThread_.wait_for(RTCP_PACKET_LOSS_INTERVAL,
[this]{return socketPair_->rtcpPacketLossDetected();})) {
receiveThread_->triggerKeyFrameRequest();
}
}
}} // namespace ring::video }} // namespace ring::video
...@@ -136,6 +136,9 @@ private: ...@@ -136,6 +136,9 @@ private:
InterruptedThreadLoop receiverRestartThread_; InterruptedThreadLoop receiverRestartThread_;
void processReceiverRestart(); void processReceiverRestart();
InterruptedThreadLoop packetLossThread_;
void processPacketLoss();
}; };
}} // namespace ring::video }} // namespace ring::video
......
...@@ -84,7 +84,7 @@ VideoSender::update(Observable<std::shared_ptr<VideoFrame>>* /*obs*/, ...@@ -84,7 +84,7 @@ VideoSender::update(Observable<std::shared_ptr<VideoFrame>>* /*obs*/,
void void
VideoSender::forceKeyFrame() VideoSender::forceKeyFrame()
{ {
RING_DBG("Peer has requested a key frame"); RING_DBG("Key frame requested");
++forceKeyFrame_; ++forceKeyFrame_;
} }
......
...@@ -97,6 +97,17 @@ public: ...@@ -97,6 +97,17 @@ public:
cv_.wait_for(lk, rel_time, [this](){return isStopping();}); cv_.wait_for(lk, rel_time, [this](){return isStopping();});
} }
template <typename Rep, typename Period, typename Pred>
bool
wait_for(const std::chrono::duration<Rep, Period>& rel_time, Pred&& pred)
{
if (std::this_thread::get_id() != get_id())
throw std::runtime_error("can not call wait_for outside thread context");
std::unique_lock<std::mutex> lk(mutex_);
return cv_.wait_for(lk, rel_time, [this, pred]{ return isStopping() || pred(); });
}
private: private:
std::mutex mutex_; std::mutex mutex_;
std::condition_variable cv_; std::condition_variable cv_;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment