diff --git a/video/BUILD.gn b/video/BUILD.gn index 2404281727..2e355f7e98 100644 --- a/video/BUILD.gn +++ b/video/BUILD.gn @@ -28,6 +28,8 @@ rtc_library("video") { "report_block_stats.h", "rtp_streams_synchronizer.cc", "rtp_streams_synchronizer.h", + "rtp_streams_synchronizer2.cc", + "rtp_streams_synchronizer2.h", "rtp_video_stream_receiver.cc", "rtp_video_stream_receiver.h", "rtp_video_stream_receiver_frame_transformer_delegate.cc", diff --git a/video/rtp_streams_synchronizer.h b/video/rtp_streams_synchronizer.h index 00ef526dc5..6abf5bbe0e 100644 --- a/video/rtp_streams_synchronizer.h +++ b/video/rtp_streams_synchronizer.h @@ -25,11 +25,7 @@ namespace webrtc { class Syncable; -// TODO(bugs.webrtc.org/11489): Remove dependency on ProcessThread/Module. -// Instead make this a single threaded class, constructed on a TQ and -// post a 1 sec timer there. There shouldn't be a need for locking internally -// and the callback from this class, should occur on the construction TQ -// which in turn means that the callback doesn't need locking either. +// DEPRECATED. class RtpStreamsSynchronizer : public Module { public: explicit RtpStreamsSynchronizer(Syncable* syncable_video); diff --git a/video/rtp_streams_synchronizer2.cc b/video/rtp_streams_synchronizer2.cc new file mode 100644 index 0000000000..1a9a3e8026 --- /dev/null +++ b/video/rtp_streams_synchronizer2.cc @@ -0,0 +1,217 @@ +/* + * Copyright (c) 2020 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "video/rtp_streams_synchronizer2.h" + +#include "absl/types/optional.h" +#include "call/syncable.h" +#include "rtc_base/checks.h" +#include "rtc_base/logging.h" +#include "rtc_base/time_utils.h" +#include "rtc_base/trace_event.h" +#include "system_wrappers/include/rtp_to_ntp_estimator.h" + +namespace webrtc { +namespace internal { +namespace { +// Time interval for logging stats. +constexpr int64_t kStatsLogIntervalMs = 10000; +constexpr uint32_t kSyncIntervalMs = 1000; + +bool UpdateMeasurements(StreamSynchronization::Measurements* stream, + const Syncable::Info& info) { + stream->latest_timestamp = info.latest_received_capture_timestamp; + stream->latest_receive_time_ms = info.latest_receive_time_ms; + bool new_rtcp_sr = false; + return stream->rtp_to_ntp.UpdateMeasurements( + info.capture_time_ntp_secs, info.capture_time_ntp_frac, + info.capture_time_source_clock, &new_rtcp_sr); +} +} // namespace + +RtpStreamsSynchronizer::RtpStreamsSynchronizer(TaskQueueBase* main_queue, + Syncable* syncable_video) + : task_queue_(main_queue), + syncable_video_(syncable_video), + last_sync_time_(rtc::TimeNanos()), + last_stats_log_ms_(rtc::TimeMillis()) { + RTC_DCHECK(syncable_video); +} + +RtpStreamsSynchronizer::~RtpStreamsSynchronizer() { + RTC_DCHECK_RUN_ON(&main_checker_); + task_safety_flag_->SetNotAlive(); +} + +void RtpStreamsSynchronizer::ConfigureSync(Syncable* syncable_audio) { + RTC_DCHECK_RUN_ON(&main_checker_); + + // Prevent expensive no-ops. + if (syncable_audio == syncable_audio_) + return; + + syncable_audio_ = syncable_audio; + sync_.reset(nullptr); + if (!syncable_audio_) + return; + + sync_.reset( + new StreamSynchronization(syncable_video_->id(), syncable_audio_->id())); + QueueTimer(); +} + +void RtpStreamsSynchronizer::QueueTimer() { + RTC_DCHECK_RUN_ON(&main_checker_); + if (timer_running_) + return; + + timer_running_ = true; + uint32_t delay = kSyncIntervalMs - (rtc::TimeNanos() - last_sync_time_) / + rtc::kNumNanosecsPerMillisec; + RTC_DCHECK_LE(delay, kSyncIntervalMs); + task_queue_->PostDelayedTask(ToQueuedTask([this, safety = task_safety_flag_] { + if (!safety->alive()) + return; + RTC_DCHECK_RUN_ON(&main_checker_); + timer_running_ = false; + UpdateDelay(); + }), + delay); +} + +void RtpStreamsSynchronizer::UpdateDelay() { + RTC_DCHECK_RUN_ON(&main_checker_); + last_sync_time_ = rtc::TimeNanos(); + + if (!syncable_audio_) + return; + + RTC_DCHECK(sync_.get()); + + QueueTimer(); + + bool log_stats = false; + const int64_t now_ms = rtc::TimeMillis(); + if (now_ms - last_stats_log_ms_ > kStatsLogIntervalMs) { + last_stats_log_ms_ = now_ms; + log_stats = true; + } + + absl::optional audio_info = syncable_audio_->GetInfo(); + if (!audio_info || !UpdateMeasurements(&audio_measurement_, *audio_info)) { + return; + } + + int64_t last_video_receive_ms = video_measurement_.latest_receive_time_ms; + absl::optional video_info = syncable_video_->GetInfo(); + if (!video_info || !UpdateMeasurements(&video_measurement_, *video_info)) { + return; + } + + if (last_video_receive_ms == video_measurement_.latest_receive_time_ms) { + // No new video packet has been received since last update. + return; + } + + int relative_delay_ms; + // Calculate how much later or earlier the audio stream is compared to video. + if (!sync_->ComputeRelativeDelay(audio_measurement_, video_measurement_, + &relative_delay_ms)) { + return; + } + + if (log_stats) { + RTC_LOG(LS_INFO) << "Sync info stats: " << now_ms + << ", {ssrc: " << sync_->audio_stream_id() << ", " + << "cur_delay_ms: " << audio_info->current_delay_ms + << "} {ssrc: " << sync_->video_stream_id() << ", " + << "cur_delay_ms: " << video_info->current_delay_ms + << "} {relative_delay_ms: " << relative_delay_ms << "} "; + } + + TRACE_COUNTER1("webrtc", "SyncCurrentVideoDelay", + video_info->current_delay_ms); + TRACE_COUNTER1("webrtc", "SyncCurrentAudioDelay", + audio_info->current_delay_ms); + TRACE_COUNTER1("webrtc", "SyncRelativeDelay", relative_delay_ms); + + int target_audio_delay_ms = 0; + int target_video_delay_ms = video_info->current_delay_ms; + // Calculate the necessary extra audio delay and desired total video + // delay to get the streams in sync. + if (!sync_->ComputeDelays(relative_delay_ms, audio_info->current_delay_ms, + &target_audio_delay_ms, &target_video_delay_ms)) { + return; + } + + if (log_stats) { + RTC_LOG(LS_INFO) << "Sync delay stats: " << now_ms + << ", {ssrc: " << sync_->audio_stream_id() << ", " + << "target_delay_ms: " << target_audio_delay_ms + << "} {ssrc: " << sync_->video_stream_id() << ", " + << "target_delay_ms: " << target_video_delay_ms << "} "; + } + + syncable_audio_->SetMinimumPlayoutDelay(target_audio_delay_ms); + syncable_video_->SetMinimumPlayoutDelay(target_video_delay_ms); +} + +// TODO(https://bugs.webrtc.org/7065): Move RtpToNtpEstimator out of +// RtpStreamsSynchronizer and into respective receive stream to always populate +// the estimated playout timestamp. +bool RtpStreamsSynchronizer::GetStreamSyncOffsetInMs( + uint32_t rtp_timestamp, + int64_t render_time_ms, + int64_t* video_playout_ntp_ms, + int64_t* stream_offset_ms, + double* estimated_freq_khz) const { + RTC_DCHECK_RUN_ON(&main_checker_); + + if (!syncable_audio_) + return false; + + uint32_t audio_rtp_timestamp; + int64_t time_ms; + if (!syncable_audio_->GetPlayoutRtpTimestamp(&audio_rtp_timestamp, + &time_ms)) { + return false; + } + + int64_t latest_audio_ntp; + if (!audio_measurement_.rtp_to_ntp.Estimate(audio_rtp_timestamp, + &latest_audio_ntp)) { + return false; + } + + syncable_audio_->SetEstimatedPlayoutNtpTimestampMs(latest_audio_ntp, time_ms); + + int64_t latest_video_ntp; + if (!video_measurement_.rtp_to_ntp.Estimate(rtp_timestamp, + &latest_video_ntp)) { + return false; + } + + // Current audio ntp. + int64_t now_ms = rtc::TimeMillis(); + latest_audio_ntp += (now_ms - time_ms); + + // Remove video playout delay. + int64_t time_to_render_ms = render_time_ms - now_ms; + if (time_to_render_ms > 0) + latest_video_ntp -= time_to_render_ms; + + *video_playout_ntp_ms = latest_video_ntp; + *stream_offset_ms = latest_audio_ntp - latest_video_ntp; + *estimated_freq_khz = video_measurement_.rtp_to_ntp.params()->frequency_khz; + return true; +} + +} // namespace internal +} // namespace webrtc diff --git a/video/rtp_streams_synchronizer2.h b/video/rtp_streams_synchronizer2.h new file mode 100644 index 0000000000..353434e6a9 --- /dev/null +++ b/video/rtp_streams_synchronizer2.h @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2020 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef VIDEO_RTP_STREAMS_SYNCHRONIZER2_H_ +#define VIDEO_RTP_STREAMS_SYNCHRONIZER2_H_ + +#include + +#include "rtc_base/synchronization/sequence_checker.h" +#include "rtc_base/task_queue.h" +#include "rtc_base/task_utils/pending_task_safety_flag.h" +#include "video/stream_synchronization.h" + +namespace webrtc { + +class Syncable; + +namespace internal { + +// RtpStreamsSynchronizer is responsible for synchronizing audio and video for +// a given audio receive stream and video receive stream. +class RtpStreamsSynchronizer { + public: + RtpStreamsSynchronizer(TaskQueueBase* main_queue, Syncable* syncable_video); + ~RtpStreamsSynchronizer(); + + void ConfigureSync(Syncable* syncable_audio); + + // Gets the estimated playout NTP timestamp for the video frame with + // |rtp_timestamp| and the sync offset between the current played out audio + // frame and the video frame. Returns true on success, false otherwise. + // The |estimated_freq_khz| is the frequency used in the RTP to NTP timestamp + // conversion. + bool GetStreamSyncOffsetInMs(uint32_t rtp_timestamp, + int64_t render_time_ms, + int64_t* video_playout_ntp_ms, + int64_t* stream_offset_ms, + double* estimated_freq_khz) const; + + private: + void QueueTimer(); + void UpdateDelay(); + + TaskQueueBase* const task_queue_; + + // Used to check if we're running on the main thread/task queue. + // The reason we currently don't use RTC_DCHECK_RUN_ON(task_queue_) is because + // we might be running on an rtc::Thread implementation of TaskQueue, which + // does not consistently set itself as the active TaskQueue. + // Instead, we rely on a SequenceChecker for now. + SequenceChecker main_checker_; + + Syncable* const syncable_video_; + + Syncable* syncable_audio_ RTC_GUARDED_BY(main_checker_) = nullptr; + std::unique_ptr sync_ RTC_GUARDED_BY(main_checker_); + StreamSynchronization::Measurements audio_measurement_ + RTC_GUARDED_BY(main_checker_); + StreamSynchronization::Measurements video_measurement_ + RTC_GUARDED_BY(main_checker_); + int64_t last_sync_time_ RTC_GUARDED_BY(&main_checker_); + int64_t last_stats_log_ms_ RTC_GUARDED_BY(&main_checker_); + bool timer_running_ RTC_GUARDED_BY(main_checker_) = false; + + // Used to signal destruction to potentially pending tasks. + PendingTaskSafetyFlag::Pointer task_safety_flag_ = + PendingTaskSafetyFlag::Create(); +}; + +} // namespace internal +} // namespace webrtc + +#endif // VIDEO_RTP_STREAMS_SYNCHRONIZER2_H_ diff --git a/video/video_receive_stream2.cc b/video/video_receive_stream2.cc index 9f40c4567b..6649fcaf74 100644 --- a/video/video_receive_stream2.cc +++ b/video/video_receive_stream2.cc @@ -32,7 +32,6 @@ #include "call/rtx_receive_stream.h" #include "common_video/include/incoming_video_stream.h" #include "media/base/h264_profile_level_id.h" -#include "modules/utility/include/process_thread.h" #include "modules/video_coding/include/video_codec_interface.h" #include "modules/video_coding/include/video_coding_defines.h" #include "modules/video_coding/include/video_error_codes.h" @@ -194,7 +193,6 @@ VideoReceiveStream2::VideoReceiveStream2( transport_adapter_(config.rtcp_send_transport), config_(std::move(config)), num_cpu_cores_(num_cpu_cores), - process_thread_(process_thread), worker_thread_(current_queue), clock_(clock), call_stats_(call_stats), @@ -211,13 +209,13 @@ VideoReceiveStream2::VideoReceiveStream2( rtp_receive_statistics_.get(), &stats_proxy_, &stats_proxy_, - process_thread_, + process_thread, this, // NackSender nullptr, // Use default KeyFrameRequestSender this, // OnCompleteFrameCallback config_.frame_decryptor, config_.frame_transformer), - rtp_stream_sync_(this), + rtp_stream_sync_(current_queue, this), max_wait_for_keyframe_ms_(KeyframeIntervalSettings::ParseFromFieldTrials() .MaxWaitForKeyframeMs() .value_or(kMaxWaitForKeyFrameMs)), @@ -231,7 +229,6 @@ VideoReceiveStream2::VideoReceiveStream2( RTC_DCHECK(worker_thread_); RTC_DCHECK(config_.renderer); - RTC_DCHECK(process_thread_); RTC_DCHECK(call_stats_); module_process_sequence_checker_.Detach(); @@ -253,7 +250,6 @@ VideoReceiveStream2::VideoReceiveStream2( frame_buffer_.reset( new video_coding::FrameBuffer(clock_, timing_.get(), &stats_proxy_)); - process_thread_->RegisterModule(&rtp_stream_sync_, RTC_FROM_HERE); // Register with RtpStreamReceiverController. media_receiver_ = receiver_controller->CreateReceiver( config_.rtp.remote_ssrc, &rtp_video_stream_receiver_); @@ -273,7 +269,6 @@ VideoReceiveStream2::~VideoReceiveStream2() { RTC_DCHECK_RUN_ON(&worker_sequence_checker_); RTC_LOG(LS_INFO) << "~VideoReceiveStream2: " << config_.ToString(); Stop(); - process_thread_->DeRegisterModule(&rtp_stream_sync_); task_safety_flag_->SetNotAlive(); } @@ -587,7 +582,7 @@ uint32_t VideoReceiveStream2::id() const { } absl::optional VideoReceiveStream2::GetInfo() const { - RTC_DCHECK_RUN_ON(&module_process_sequence_checker_); + RTC_DCHECK_RUN_ON(&worker_sequence_checker_); absl::optional info = rtp_video_stream_receiver_.GetSyncInfo(); @@ -611,8 +606,9 @@ void VideoReceiveStream2::SetEstimatedPlayoutNtpTimestampMs( } void VideoReceiveStream2::SetMinimumPlayoutDelay(int delay_ms) { - RTC_DCHECK_RUN_ON(&module_process_sequence_checker_); - // TODO(bugs.webrtc.org/11489): Consider posting to worker. + RTC_DCHECK_RUN_ON(&worker_sequence_checker_); + // TODO(bugs.webrtc.org/11489): See if we can't get rid of the + // |playout_delay_lock_| rtc::CritScope cs(&playout_delay_lock_); syncable_minimum_playout_delay_ms_ = delay_ms; UpdatePlayoutDelays(); diff --git a/video/video_receive_stream2.h b/video/video_receive_stream2.h index 66fbc05e91..9f32c1d6e4 100644 --- a/video/video_receive_stream2.h +++ b/video/video_receive_stream2.h @@ -29,7 +29,7 @@ #include "rtc_base/task_queue.h" #include "system_wrappers/include/clock.h" #include "video/receive_statistics_proxy2.h" -#include "video/rtp_streams_synchronizer.h" +#include "video/rtp_streams_synchronizer2.h" #include "video/rtp_video_stream_receiver.h" #include "video/transport_adapter.h" #include "video/video_stream_decoder2.h" @@ -181,7 +181,6 @@ class VideoReceiveStream2 : public webrtc::VideoReceiveStream, TransportAdapter transport_adapter_; const VideoReceiveStream::Config config_; const int num_cpu_cores_; - ProcessThread* const process_thread_; TaskQueueBase* const worker_thread_; Clock* const clock_;