From ad84d0254ab199f4cd57fc6928c540390c3fcdcf Mon Sep 17 00:00:00 2001 From: Tommi Date: Sun, 10 May 2020 19:03:43 +0200 Subject: [PATCH] Remove locking from RtpStreamsSynchronizer. Remove dependency on ProcessThread. Instead RtpStreamsSynchronizer uses the worker thread and makes callbacks on the same thread. That in turn simplifies locking for VideoReceiveStream2, which we'll take advantage of later. Bug: webrtc:11489 Change-Id: Id9a5a7977771b92e420a09cc472cfb43de5627cc Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/174221 Commit-Queue: Tommi Reviewed-by: Magnus Flodman Cr-Commit-Position: refs/heads/master@{#31200} --- video/BUILD.gn | 2 + video/rtp_streams_synchronizer.h | 6 +- video/rtp_streams_synchronizer2.cc | 217 +++++++++++++++++++++++++++++ video/rtp_streams_synchronizer2.h | 80 +++++++++++ video/video_receive_stream2.cc | 16 +-- video/video_receive_stream2.h | 3 +- 6 files changed, 307 insertions(+), 17 deletions(-) create mode 100644 video/rtp_streams_synchronizer2.cc create mode 100644 video/rtp_streams_synchronizer2.h diff --git a/video/BUILD.gn b/video/BUILD.gn index 2404281727..2e355f7e98 100644 --- a/video/BUILD.gn +++ b/video/BUILD.gn @@ -28,6 +28,8 @@ rtc_library("video") { "report_block_stats.h", "rtp_streams_synchronizer.cc", "rtp_streams_synchronizer.h", + "rtp_streams_synchronizer2.cc", + "rtp_streams_synchronizer2.h", "rtp_video_stream_receiver.cc", "rtp_video_stream_receiver.h", "rtp_video_stream_receiver_frame_transformer_delegate.cc", diff --git a/video/rtp_streams_synchronizer.h b/video/rtp_streams_synchronizer.h index 00ef526dc5..6abf5bbe0e 100644 --- a/video/rtp_streams_synchronizer.h +++ b/video/rtp_streams_synchronizer.h @@ -25,11 +25,7 @@ namespace webrtc { class Syncable; -// TODO(bugs.webrtc.org/11489): Remove dependency on ProcessThread/Module. -// Instead make this a single threaded class, constructed on a TQ and -// post a 1 sec timer there. There shouldn't be a need for locking internally -// and the callback from this class, should occur on the construction TQ -// which in turn means that the callback doesn't need locking either. +// DEPRECATED. class RtpStreamsSynchronizer : public Module { public: explicit RtpStreamsSynchronizer(Syncable* syncable_video); diff --git a/video/rtp_streams_synchronizer2.cc b/video/rtp_streams_synchronizer2.cc new file mode 100644 index 0000000000..1a9a3e8026 --- /dev/null +++ b/video/rtp_streams_synchronizer2.cc @@ -0,0 +1,217 @@ +/* + * Copyright (c) 2020 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "video/rtp_streams_synchronizer2.h" + +#include "absl/types/optional.h" +#include "call/syncable.h" +#include "rtc_base/checks.h" +#include "rtc_base/logging.h" +#include "rtc_base/time_utils.h" +#include "rtc_base/trace_event.h" +#include "system_wrappers/include/rtp_to_ntp_estimator.h" + +namespace webrtc { +namespace internal { +namespace { +// Time interval for logging stats. +constexpr int64_t kStatsLogIntervalMs = 10000; +constexpr uint32_t kSyncIntervalMs = 1000; + +bool UpdateMeasurements(StreamSynchronization::Measurements* stream, + const Syncable::Info& info) { + stream->latest_timestamp = info.latest_received_capture_timestamp; + stream->latest_receive_time_ms = info.latest_receive_time_ms; + bool new_rtcp_sr = false; + return stream->rtp_to_ntp.UpdateMeasurements( + info.capture_time_ntp_secs, info.capture_time_ntp_frac, + info.capture_time_source_clock, &new_rtcp_sr); +} +} // namespace + +RtpStreamsSynchronizer::RtpStreamsSynchronizer(TaskQueueBase* main_queue, + Syncable* syncable_video) + : task_queue_(main_queue), + syncable_video_(syncable_video), + last_sync_time_(rtc::TimeNanos()), + last_stats_log_ms_(rtc::TimeMillis()) { + RTC_DCHECK(syncable_video); +} + +RtpStreamsSynchronizer::~RtpStreamsSynchronizer() { + RTC_DCHECK_RUN_ON(&main_checker_); + task_safety_flag_->SetNotAlive(); +} + +void RtpStreamsSynchronizer::ConfigureSync(Syncable* syncable_audio) { + RTC_DCHECK_RUN_ON(&main_checker_); + + // Prevent expensive no-ops. + if (syncable_audio == syncable_audio_) + return; + + syncable_audio_ = syncable_audio; + sync_.reset(nullptr); + if (!syncable_audio_) + return; + + sync_.reset( + new StreamSynchronization(syncable_video_->id(), syncable_audio_->id())); + QueueTimer(); +} + +void RtpStreamsSynchronizer::QueueTimer() { + RTC_DCHECK_RUN_ON(&main_checker_); + if (timer_running_) + return; + + timer_running_ = true; + uint32_t delay = kSyncIntervalMs - (rtc::TimeNanos() - last_sync_time_) / + rtc::kNumNanosecsPerMillisec; + RTC_DCHECK_LE(delay, kSyncIntervalMs); + task_queue_->PostDelayedTask(ToQueuedTask([this, safety = task_safety_flag_] { + if (!safety->alive()) + return; + RTC_DCHECK_RUN_ON(&main_checker_); + timer_running_ = false; + UpdateDelay(); + }), + delay); +} + +void RtpStreamsSynchronizer::UpdateDelay() { + RTC_DCHECK_RUN_ON(&main_checker_); + last_sync_time_ = rtc::TimeNanos(); + + if (!syncable_audio_) + return; + + RTC_DCHECK(sync_.get()); + + QueueTimer(); + + bool log_stats = false; + const int64_t now_ms = rtc::TimeMillis(); + if (now_ms - last_stats_log_ms_ > kStatsLogIntervalMs) { + last_stats_log_ms_ = now_ms; + log_stats = true; + } + + absl::optional audio_info = syncable_audio_->GetInfo(); + if (!audio_info || !UpdateMeasurements(&audio_measurement_, *audio_info)) { + return; + } + + int64_t last_video_receive_ms = video_measurement_.latest_receive_time_ms; + absl::optional video_info = syncable_video_->GetInfo(); + if (!video_info || !UpdateMeasurements(&video_measurement_, *video_info)) { + return; + } + + if (last_video_receive_ms == video_measurement_.latest_receive_time_ms) { + // No new video packet has been received since last update. + return; + } + + int relative_delay_ms; + // Calculate how much later or earlier the audio stream is compared to video. + if (!sync_->ComputeRelativeDelay(audio_measurement_, video_measurement_, + &relative_delay_ms)) { + return; + } + + if (log_stats) { + RTC_LOG(LS_INFO) << "Sync info stats: " << now_ms + << ", {ssrc: " << sync_->audio_stream_id() << ", " + << "cur_delay_ms: " << audio_info->current_delay_ms + << "} {ssrc: " << sync_->video_stream_id() << ", " + << "cur_delay_ms: " << video_info->current_delay_ms + << "} {relative_delay_ms: " << relative_delay_ms << "} "; + } + + TRACE_COUNTER1("webrtc", "SyncCurrentVideoDelay", + video_info->current_delay_ms); + TRACE_COUNTER1("webrtc", "SyncCurrentAudioDelay", + audio_info->current_delay_ms); + TRACE_COUNTER1("webrtc", "SyncRelativeDelay", relative_delay_ms); + + int target_audio_delay_ms = 0; + int target_video_delay_ms = video_info->current_delay_ms; + // Calculate the necessary extra audio delay and desired total video + // delay to get the streams in sync. + if (!sync_->ComputeDelays(relative_delay_ms, audio_info->current_delay_ms, + &target_audio_delay_ms, &target_video_delay_ms)) { + return; + } + + if (log_stats) { + RTC_LOG(LS_INFO) << "Sync delay stats: " << now_ms + << ", {ssrc: " << sync_->audio_stream_id() << ", " + << "target_delay_ms: " << target_audio_delay_ms + << "} {ssrc: " << sync_->video_stream_id() << ", " + << "target_delay_ms: " << target_video_delay_ms << "} "; + } + + syncable_audio_->SetMinimumPlayoutDelay(target_audio_delay_ms); + syncable_video_->SetMinimumPlayoutDelay(target_video_delay_ms); +} + +// TODO(https://bugs.webrtc.org/7065): Move RtpToNtpEstimator out of +// RtpStreamsSynchronizer and into respective receive stream to always populate +// the estimated playout timestamp. +bool RtpStreamsSynchronizer::GetStreamSyncOffsetInMs( + uint32_t rtp_timestamp, + int64_t render_time_ms, + int64_t* video_playout_ntp_ms, + int64_t* stream_offset_ms, + double* estimated_freq_khz) const { + RTC_DCHECK_RUN_ON(&main_checker_); + + if (!syncable_audio_) + return false; + + uint32_t audio_rtp_timestamp; + int64_t time_ms; + if (!syncable_audio_->GetPlayoutRtpTimestamp(&audio_rtp_timestamp, + &time_ms)) { + return false; + } + + int64_t latest_audio_ntp; + if (!audio_measurement_.rtp_to_ntp.Estimate(audio_rtp_timestamp, + &latest_audio_ntp)) { + return false; + } + + syncable_audio_->SetEstimatedPlayoutNtpTimestampMs(latest_audio_ntp, time_ms); + + int64_t latest_video_ntp; + if (!video_measurement_.rtp_to_ntp.Estimate(rtp_timestamp, + &latest_video_ntp)) { + return false; + } + + // Current audio ntp. + int64_t now_ms = rtc::TimeMillis(); + latest_audio_ntp += (now_ms - time_ms); + + // Remove video playout delay. + int64_t time_to_render_ms = render_time_ms - now_ms; + if (time_to_render_ms > 0) + latest_video_ntp -= time_to_render_ms; + + *video_playout_ntp_ms = latest_video_ntp; + *stream_offset_ms = latest_audio_ntp - latest_video_ntp; + *estimated_freq_khz = video_measurement_.rtp_to_ntp.params()->frequency_khz; + return true; +} + +} // namespace internal +} // namespace webrtc diff --git a/video/rtp_streams_synchronizer2.h b/video/rtp_streams_synchronizer2.h new file mode 100644 index 0000000000..353434e6a9 --- /dev/null +++ b/video/rtp_streams_synchronizer2.h @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2020 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef VIDEO_RTP_STREAMS_SYNCHRONIZER2_H_ +#define VIDEO_RTP_STREAMS_SYNCHRONIZER2_H_ + +#include + +#include "rtc_base/synchronization/sequence_checker.h" +#include "rtc_base/task_queue.h" +#include "rtc_base/task_utils/pending_task_safety_flag.h" +#include "video/stream_synchronization.h" + +namespace webrtc { + +class Syncable; + +namespace internal { + +// RtpStreamsSynchronizer is responsible for synchronizing audio and video for +// a given audio receive stream and video receive stream. +class RtpStreamsSynchronizer { + public: + RtpStreamsSynchronizer(TaskQueueBase* main_queue, Syncable* syncable_video); + ~RtpStreamsSynchronizer(); + + void ConfigureSync(Syncable* syncable_audio); + + // Gets the estimated playout NTP timestamp for the video frame with + // |rtp_timestamp| and the sync offset between the current played out audio + // frame and the video frame. Returns true on success, false otherwise. + // The |estimated_freq_khz| is the frequency used in the RTP to NTP timestamp + // conversion. + bool GetStreamSyncOffsetInMs(uint32_t rtp_timestamp, + int64_t render_time_ms, + int64_t* video_playout_ntp_ms, + int64_t* stream_offset_ms, + double* estimated_freq_khz) const; + + private: + void QueueTimer(); + void UpdateDelay(); + + TaskQueueBase* const task_queue_; + + // Used to check if we're running on the main thread/task queue. + // The reason we currently don't use RTC_DCHECK_RUN_ON(task_queue_) is because + // we might be running on an rtc::Thread implementation of TaskQueue, which + // does not consistently set itself as the active TaskQueue. + // Instead, we rely on a SequenceChecker for now. + SequenceChecker main_checker_; + + Syncable* const syncable_video_; + + Syncable* syncable_audio_ RTC_GUARDED_BY(main_checker_) = nullptr; + std::unique_ptr sync_ RTC_GUARDED_BY(main_checker_); + StreamSynchronization::Measurements audio_measurement_ + RTC_GUARDED_BY(main_checker_); + StreamSynchronization::Measurements video_measurement_ + RTC_GUARDED_BY(main_checker_); + int64_t last_sync_time_ RTC_GUARDED_BY(&main_checker_); + int64_t last_stats_log_ms_ RTC_GUARDED_BY(&main_checker_); + bool timer_running_ RTC_GUARDED_BY(main_checker_) = false; + + // Used to signal destruction to potentially pending tasks. + PendingTaskSafetyFlag::Pointer task_safety_flag_ = + PendingTaskSafetyFlag::Create(); +}; + +} // namespace internal +} // namespace webrtc + +#endif // VIDEO_RTP_STREAMS_SYNCHRONIZER2_H_ diff --git a/video/video_receive_stream2.cc b/video/video_receive_stream2.cc index 9f40c4567b..6649fcaf74 100644 --- a/video/video_receive_stream2.cc +++ b/video/video_receive_stream2.cc @@ -32,7 +32,6 @@ #include "call/rtx_receive_stream.h" #include "common_video/include/incoming_video_stream.h" #include "media/base/h264_profile_level_id.h" -#include "modules/utility/include/process_thread.h" #include "modules/video_coding/include/video_codec_interface.h" #include "modules/video_coding/include/video_coding_defines.h" #include "modules/video_coding/include/video_error_codes.h" @@ -194,7 +193,6 @@ VideoReceiveStream2::VideoReceiveStream2( transport_adapter_(config.rtcp_send_transport), config_(std::move(config)), num_cpu_cores_(num_cpu_cores), - process_thread_(process_thread), worker_thread_(current_queue), clock_(clock), call_stats_(call_stats), @@ -211,13 +209,13 @@ VideoReceiveStream2::VideoReceiveStream2( rtp_receive_statistics_.get(), &stats_proxy_, &stats_proxy_, - process_thread_, + process_thread, this, // NackSender nullptr, // Use default KeyFrameRequestSender this, // OnCompleteFrameCallback config_.frame_decryptor, config_.frame_transformer), - rtp_stream_sync_(this), + rtp_stream_sync_(current_queue, this), max_wait_for_keyframe_ms_(KeyframeIntervalSettings::ParseFromFieldTrials() .MaxWaitForKeyframeMs() .value_or(kMaxWaitForKeyFrameMs)), @@ -231,7 +229,6 @@ VideoReceiveStream2::VideoReceiveStream2( RTC_DCHECK(worker_thread_); RTC_DCHECK(config_.renderer); - RTC_DCHECK(process_thread_); RTC_DCHECK(call_stats_); module_process_sequence_checker_.Detach(); @@ -253,7 +250,6 @@ VideoReceiveStream2::VideoReceiveStream2( frame_buffer_.reset( new video_coding::FrameBuffer(clock_, timing_.get(), &stats_proxy_)); - process_thread_->RegisterModule(&rtp_stream_sync_, RTC_FROM_HERE); // Register with RtpStreamReceiverController. media_receiver_ = receiver_controller->CreateReceiver( config_.rtp.remote_ssrc, &rtp_video_stream_receiver_); @@ -273,7 +269,6 @@ VideoReceiveStream2::~VideoReceiveStream2() { RTC_DCHECK_RUN_ON(&worker_sequence_checker_); RTC_LOG(LS_INFO) << "~VideoReceiveStream2: " << config_.ToString(); Stop(); - process_thread_->DeRegisterModule(&rtp_stream_sync_); task_safety_flag_->SetNotAlive(); } @@ -587,7 +582,7 @@ uint32_t VideoReceiveStream2::id() const { } absl::optional VideoReceiveStream2::GetInfo() const { - RTC_DCHECK_RUN_ON(&module_process_sequence_checker_); + RTC_DCHECK_RUN_ON(&worker_sequence_checker_); absl::optional info = rtp_video_stream_receiver_.GetSyncInfo(); @@ -611,8 +606,9 @@ void VideoReceiveStream2::SetEstimatedPlayoutNtpTimestampMs( } void VideoReceiveStream2::SetMinimumPlayoutDelay(int delay_ms) { - RTC_DCHECK_RUN_ON(&module_process_sequence_checker_); - // TODO(bugs.webrtc.org/11489): Consider posting to worker. + RTC_DCHECK_RUN_ON(&worker_sequence_checker_); + // TODO(bugs.webrtc.org/11489): See if we can't get rid of the + // |playout_delay_lock_| rtc::CritScope cs(&playout_delay_lock_); syncable_minimum_playout_delay_ms_ = delay_ms; UpdatePlayoutDelays(); diff --git a/video/video_receive_stream2.h b/video/video_receive_stream2.h index 66fbc05e91..9f32c1d6e4 100644 --- a/video/video_receive_stream2.h +++ b/video/video_receive_stream2.h @@ -29,7 +29,7 @@ #include "rtc_base/task_queue.h" #include "system_wrappers/include/clock.h" #include "video/receive_statistics_proxy2.h" -#include "video/rtp_streams_synchronizer.h" +#include "video/rtp_streams_synchronizer2.h" #include "video/rtp_video_stream_receiver.h" #include "video/transport_adapter.h" #include "video/video_stream_decoder2.h" @@ -181,7 +181,6 @@ class VideoReceiveStream2 : public webrtc::VideoReceiveStream, TransportAdapter transport_adapter_; const VideoReceiveStream::Config config_; const int num_cpu_cores_; - ProcessThread* const process_thread_; TaskQueueBase* const worker_thread_; Clock* const clock_;