Remove locking from RtpStreamsSynchronizer.

Remove dependency on ProcessThread.

Instead RtpStreamsSynchronizer uses the worker thread
and makes callbacks on the same thread. That in turn
simplifies locking for VideoReceiveStream2, which we'll
take advantage of later.

Bug: webrtc:11489
Change-Id: Id9a5a7977771b92e420a09cc472cfb43de5627cc
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/174221
Commit-Queue: Tommi <tommi@webrtc.org>
Reviewed-by: Magnus Flodman <mflodman@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31200}
This commit is contained in:
Tommi 2020-05-10 19:03:43 +02:00 committed by Commit Bot
parent d7e08c8cf8
commit ad84d0254a
6 changed files with 307 additions and 17 deletions

View file

@ -28,6 +28,8 @@ rtc_library("video") {
"report_block_stats.h",
"rtp_streams_synchronizer.cc",
"rtp_streams_synchronizer.h",
"rtp_streams_synchronizer2.cc",
"rtp_streams_synchronizer2.h",
"rtp_video_stream_receiver.cc",
"rtp_video_stream_receiver.h",
"rtp_video_stream_receiver_frame_transformer_delegate.cc",

View file

@ -25,11 +25,7 @@ namespace webrtc {
class Syncable;
// TODO(bugs.webrtc.org/11489): Remove dependency on ProcessThread/Module.
// Instead make this a single threaded class, constructed on a TQ and
// post a 1 sec timer there. There shouldn't be a need for locking internally
// and the callback from this class, should occur on the construction TQ
// which in turn means that the callback doesn't need locking either.
// DEPRECATED.
class RtpStreamsSynchronizer : public Module {
public:
explicit RtpStreamsSynchronizer(Syncable* syncable_video);

View file

@ -0,0 +1,217 @@
/*
* Copyright (c) 2020 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "video/rtp_streams_synchronizer2.h"
#include "absl/types/optional.h"
#include "call/syncable.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "rtc_base/time_utils.h"
#include "rtc_base/trace_event.h"
#include "system_wrappers/include/rtp_to_ntp_estimator.h"
namespace webrtc {
namespace internal {
namespace {
// Time interval for logging stats.
constexpr int64_t kStatsLogIntervalMs = 10000;
constexpr uint32_t kSyncIntervalMs = 1000;
bool UpdateMeasurements(StreamSynchronization::Measurements* stream,
const Syncable::Info& info) {
stream->latest_timestamp = info.latest_received_capture_timestamp;
stream->latest_receive_time_ms = info.latest_receive_time_ms;
bool new_rtcp_sr = false;
return stream->rtp_to_ntp.UpdateMeasurements(
info.capture_time_ntp_secs, info.capture_time_ntp_frac,
info.capture_time_source_clock, &new_rtcp_sr);
}
} // namespace
RtpStreamsSynchronizer::RtpStreamsSynchronizer(TaskQueueBase* main_queue,
Syncable* syncable_video)
: task_queue_(main_queue),
syncable_video_(syncable_video),
last_sync_time_(rtc::TimeNanos()),
last_stats_log_ms_(rtc::TimeMillis()) {
RTC_DCHECK(syncable_video);
}
RtpStreamsSynchronizer::~RtpStreamsSynchronizer() {
RTC_DCHECK_RUN_ON(&main_checker_);
task_safety_flag_->SetNotAlive();
}
void RtpStreamsSynchronizer::ConfigureSync(Syncable* syncable_audio) {
RTC_DCHECK_RUN_ON(&main_checker_);
// Prevent expensive no-ops.
if (syncable_audio == syncable_audio_)
return;
syncable_audio_ = syncable_audio;
sync_.reset(nullptr);
if (!syncable_audio_)
return;
sync_.reset(
new StreamSynchronization(syncable_video_->id(), syncable_audio_->id()));
QueueTimer();
}
void RtpStreamsSynchronizer::QueueTimer() {
RTC_DCHECK_RUN_ON(&main_checker_);
if (timer_running_)
return;
timer_running_ = true;
uint32_t delay = kSyncIntervalMs - (rtc::TimeNanos() - last_sync_time_) /
rtc::kNumNanosecsPerMillisec;
RTC_DCHECK_LE(delay, kSyncIntervalMs);
task_queue_->PostDelayedTask(ToQueuedTask([this, safety = task_safety_flag_] {
if (!safety->alive())
return;
RTC_DCHECK_RUN_ON(&main_checker_);
timer_running_ = false;
UpdateDelay();
}),
delay);
}
void RtpStreamsSynchronizer::UpdateDelay() {
RTC_DCHECK_RUN_ON(&main_checker_);
last_sync_time_ = rtc::TimeNanos();
if (!syncable_audio_)
return;
RTC_DCHECK(sync_.get());
QueueTimer();
bool log_stats = false;
const int64_t now_ms = rtc::TimeMillis();
if (now_ms - last_stats_log_ms_ > kStatsLogIntervalMs) {
last_stats_log_ms_ = now_ms;
log_stats = true;
}
absl::optional<Syncable::Info> audio_info = syncable_audio_->GetInfo();
if (!audio_info || !UpdateMeasurements(&audio_measurement_, *audio_info)) {
return;
}
int64_t last_video_receive_ms = video_measurement_.latest_receive_time_ms;
absl::optional<Syncable::Info> video_info = syncable_video_->GetInfo();
if (!video_info || !UpdateMeasurements(&video_measurement_, *video_info)) {
return;
}
if (last_video_receive_ms == video_measurement_.latest_receive_time_ms) {
// No new video packet has been received since last update.
return;
}
int relative_delay_ms;
// Calculate how much later or earlier the audio stream is compared to video.
if (!sync_->ComputeRelativeDelay(audio_measurement_, video_measurement_,
&relative_delay_ms)) {
return;
}
if (log_stats) {
RTC_LOG(LS_INFO) << "Sync info stats: " << now_ms
<< ", {ssrc: " << sync_->audio_stream_id() << ", "
<< "cur_delay_ms: " << audio_info->current_delay_ms
<< "} {ssrc: " << sync_->video_stream_id() << ", "
<< "cur_delay_ms: " << video_info->current_delay_ms
<< "} {relative_delay_ms: " << relative_delay_ms << "} ";
}
TRACE_COUNTER1("webrtc", "SyncCurrentVideoDelay",
video_info->current_delay_ms);
TRACE_COUNTER1("webrtc", "SyncCurrentAudioDelay",
audio_info->current_delay_ms);
TRACE_COUNTER1("webrtc", "SyncRelativeDelay", relative_delay_ms);
int target_audio_delay_ms = 0;
int target_video_delay_ms = video_info->current_delay_ms;
// Calculate the necessary extra audio delay and desired total video
// delay to get the streams in sync.
if (!sync_->ComputeDelays(relative_delay_ms, audio_info->current_delay_ms,
&target_audio_delay_ms, &target_video_delay_ms)) {
return;
}
if (log_stats) {
RTC_LOG(LS_INFO) << "Sync delay stats: " << now_ms
<< ", {ssrc: " << sync_->audio_stream_id() << ", "
<< "target_delay_ms: " << target_audio_delay_ms
<< "} {ssrc: " << sync_->video_stream_id() << ", "
<< "target_delay_ms: " << target_video_delay_ms << "} ";
}
syncable_audio_->SetMinimumPlayoutDelay(target_audio_delay_ms);
syncable_video_->SetMinimumPlayoutDelay(target_video_delay_ms);
}
// TODO(https://bugs.webrtc.org/7065): Move RtpToNtpEstimator out of
// RtpStreamsSynchronizer and into respective receive stream to always populate
// the estimated playout timestamp.
bool RtpStreamsSynchronizer::GetStreamSyncOffsetInMs(
uint32_t rtp_timestamp,
int64_t render_time_ms,
int64_t* video_playout_ntp_ms,
int64_t* stream_offset_ms,
double* estimated_freq_khz) const {
RTC_DCHECK_RUN_ON(&main_checker_);
if (!syncable_audio_)
return false;
uint32_t audio_rtp_timestamp;
int64_t time_ms;
if (!syncable_audio_->GetPlayoutRtpTimestamp(&audio_rtp_timestamp,
&time_ms)) {
return false;
}
int64_t latest_audio_ntp;
if (!audio_measurement_.rtp_to_ntp.Estimate(audio_rtp_timestamp,
&latest_audio_ntp)) {
return false;
}
syncable_audio_->SetEstimatedPlayoutNtpTimestampMs(latest_audio_ntp, time_ms);
int64_t latest_video_ntp;
if (!video_measurement_.rtp_to_ntp.Estimate(rtp_timestamp,
&latest_video_ntp)) {
return false;
}
// Current audio ntp.
int64_t now_ms = rtc::TimeMillis();
latest_audio_ntp += (now_ms - time_ms);
// Remove video playout delay.
int64_t time_to_render_ms = render_time_ms - now_ms;
if (time_to_render_ms > 0)
latest_video_ntp -= time_to_render_ms;
*video_playout_ntp_ms = latest_video_ntp;
*stream_offset_ms = latest_audio_ntp - latest_video_ntp;
*estimated_freq_khz = video_measurement_.rtp_to_ntp.params()->frequency_khz;
return true;
}
} // namespace internal
} // namespace webrtc

View file

@ -0,0 +1,80 @@
/*
* Copyright (c) 2020 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef VIDEO_RTP_STREAMS_SYNCHRONIZER2_H_
#define VIDEO_RTP_STREAMS_SYNCHRONIZER2_H_
#include <memory>
#include "rtc_base/synchronization/sequence_checker.h"
#include "rtc_base/task_queue.h"
#include "rtc_base/task_utils/pending_task_safety_flag.h"
#include "video/stream_synchronization.h"
namespace webrtc {
class Syncable;
namespace internal {
// RtpStreamsSynchronizer is responsible for synchronizing audio and video for
// a given audio receive stream and video receive stream.
class RtpStreamsSynchronizer {
public:
RtpStreamsSynchronizer(TaskQueueBase* main_queue, Syncable* syncable_video);
~RtpStreamsSynchronizer();
void ConfigureSync(Syncable* syncable_audio);
// Gets the estimated playout NTP timestamp for the video frame with
// |rtp_timestamp| and the sync offset between the current played out audio
// frame and the video frame. Returns true on success, false otherwise.
// The |estimated_freq_khz| is the frequency used in the RTP to NTP timestamp
// conversion.
bool GetStreamSyncOffsetInMs(uint32_t rtp_timestamp,
int64_t render_time_ms,
int64_t* video_playout_ntp_ms,
int64_t* stream_offset_ms,
double* estimated_freq_khz) const;
private:
void QueueTimer();
void UpdateDelay();
TaskQueueBase* const task_queue_;
// Used to check if we're running on the main thread/task queue.
// The reason we currently don't use RTC_DCHECK_RUN_ON(task_queue_) is because
// we might be running on an rtc::Thread implementation of TaskQueue, which
// does not consistently set itself as the active TaskQueue.
// Instead, we rely on a SequenceChecker for now.
SequenceChecker main_checker_;
Syncable* const syncable_video_;
Syncable* syncable_audio_ RTC_GUARDED_BY(main_checker_) = nullptr;
std::unique_ptr<StreamSynchronization> sync_ RTC_GUARDED_BY(main_checker_);
StreamSynchronization::Measurements audio_measurement_
RTC_GUARDED_BY(main_checker_);
StreamSynchronization::Measurements video_measurement_
RTC_GUARDED_BY(main_checker_);
int64_t last_sync_time_ RTC_GUARDED_BY(&main_checker_);
int64_t last_stats_log_ms_ RTC_GUARDED_BY(&main_checker_);
bool timer_running_ RTC_GUARDED_BY(main_checker_) = false;
// Used to signal destruction to potentially pending tasks.
PendingTaskSafetyFlag::Pointer task_safety_flag_ =
PendingTaskSafetyFlag::Create();
};
} // namespace internal
} // namespace webrtc
#endif // VIDEO_RTP_STREAMS_SYNCHRONIZER2_H_

View file

@ -32,7 +32,6 @@
#include "call/rtx_receive_stream.h"
#include "common_video/include/incoming_video_stream.h"
#include "media/base/h264_profile_level_id.h"
#include "modules/utility/include/process_thread.h"
#include "modules/video_coding/include/video_codec_interface.h"
#include "modules/video_coding/include/video_coding_defines.h"
#include "modules/video_coding/include/video_error_codes.h"
@ -194,7 +193,6 @@ VideoReceiveStream2::VideoReceiveStream2(
transport_adapter_(config.rtcp_send_transport),
config_(std::move(config)),
num_cpu_cores_(num_cpu_cores),
process_thread_(process_thread),
worker_thread_(current_queue),
clock_(clock),
call_stats_(call_stats),
@ -211,13 +209,13 @@ VideoReceiveStream2::VideoReceiveStream2(
rtp_receive_statistics_.get(),
&stats_proxy_,
&stats_proxy_,
process_thread_,
process_thread,
this, // NackSender
nullptr, // Use default KeyFrameRequestSender
this, // OnCompleteFrameCallback
config_.frame_decryptor,
config_.frame_transformer),
rtp_stream_sync_(this),
rtp_stream_sync_(current_queue, this),
max_wait_for_keyframe_ms_(KeyframeIntervalSettings::ParseFromFieldTrials()
.MaxWaitForKeyframeMs()
.value_or(kMaxWaitForKeyFrameMs)),
@ -231,7 +229,6 @@ VideoReceiveStream2::VideoReceiveStream2(
RTC_DCHECK(worker_thread_);
RTC_DCHECK(config_.renderer);
RTC_DCHECK(process_thread_);
RTC_DCHECK(call_stats_);
module_process_sequence_checker_.Detach();
@ -253,7 +250,6 @@ VideoReceiveStream2::VideoReceiveStream2(
frame_buffer_.reset(
new video_coding::FrameBuffer(clock_, timing_.get(), &stats_proxy_));
process_thread_->RegisterModule(&rtp_stream_sync_, RTC_FROM_HERE);
// Register with RtpStreamReceiverController.
media_receiver_ = receiver_controller->CreateReceiver(
config_.rtp.remote_ssrc, &rtp_video_stream_receiver_);
@ -273,7 +269,6 @@ VideoReceiveStream2::~VideoReceiveStream2() {
RTC_DCHECK_RUN_ON(&worker_sequence_checker_);
RTC_LOG(LS_INFO) << "~VideoReceiveStream2: " << config_.ToString();
Stop();
process_thread_->DeRegisterModule(&rtp_stream_sync_);
task_safety_flag_->SetNotAlive();
}
@ -587,7 +582,7 @@ uint32_t VideoReceiveStream2::id() const {
}
absl::optional<Syncable::Info> VideoReceiveStream2::GetInfo() const {
RTC_DCHECK_RUN_ON(&module_process_sequence_checker_);
RTC_DCHECK_RUN_ON(&worker_sequence_checker_);
absl::optional<Syncable::Info> info =
rtp_video_stream_receiver_.GetSyncInfo();
@ -611,8 +606,9 @@ void VideoReceiveStream2::SetEstimatedPlayoutNtpTimestampMs(
}
void VideoReceiveStream2::SetMinimumPlayoutDelay(int delay_ms) {
RTC_DCHECK_RUN_ON(&module_process_sequence_checker_);
// TODO(bugs.webrtc.org/11489): Consider posting to worker.
RTC_DCHECK_RUN_ON(&worker_sequence_checker_);
// TODO(bugs.webrtc.org/11489): See if we can't get rid of the
// |playout_delay_lock_|
rtc::CritScope cs(&playout_delay_lock_);
syncable_minimum_playout_delay_ms_ = delay_ms;
UpdatePlayoutDelays();

View file

@ -29,7 +29,7 @@
#include "rtc_base/task_queue.h"
#include "system_wrappers/include/clock.h"
#include "video/receive_statistics_proxy2.h"
#include "video/rtp_streams_synchronizer.h"
#include "video/rtp_streams_synchronizer2.h"
#include "video/rtp_video_stream_receiver.h"
#include "video/transport_adapter.h"
#include "video/video_stream_decoder2.h"
@ -181,7 +181,6 @@ class VideoReceiveStream2 : public webrtc::VideoReceiveStream,
TransportAdapter transport_adapter_;
const VideoReceiveStream::Config config_;
const int num_cpu_cores_;
ProcessThread* const process_thread_;
TaskQueueBase* const worker_thread_;
Clock* const clock_;