From 4ccdf932e1706caee26b3faa33753d27f93d79ad Mon Sep 17 00:00:00 2001 From: Tommi Date: Mon, 17 May 2021 14:50:10 +0200 Subject: [PATCH] VideoRtpReceiver & AudioRtpReceiver threading fixes. For implementations where the signaling and worker threads are not the same thread, this significantly cuts down on Thread::Invoke()s that would block the signaling thread while waiting for the worker thread. For Audio and Video Rtp receivers, the following methods now do not block the signaling thread: * GetParameters * SetJitterBufferMinimumDelay * GetSources * SetFrameDecryptor / GetFrameDecryptor * SetDepacketizerToDecoderFrameTransformer Importantly this change also makes the track() accessor accessible directly from the application thread (bypassing the proxy) since for receiver objects, the track object is const. Other changes: * Remove RefCountedObject inheritance, use make_ref_counted instead. * Every member variable in the rtp receiver classes is now RTC_GUARDED * Stop() now fully clears up worker thread state, and Stop() is consistently called before destruction. This means that there's one thread hop instead of at least 4 before (sometimes more), per receiver. * OnChanged triggered volume for audio tracks is done asynchronously. * Deleted most of the JitterBufferDelay implementation. Turns out that it was largely unnecessary overhead and complexity. It seems that these two classes are copy/pasted to a large extent so further refactoring would be good in the future, as to not have to fix each issue twice. Bug: chromium:1184611 Change-Id: I1ba5c3abbd1b0571f7d12850d64004fd2d83e5e2 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/218605 Commit-Queue: Tommi Reviewed-by: Markus Handell Cr-Commit-Position: refs/heads/master@{#34022} --- api/rtp_receiver_interface.h | 32 +-- pc/BUILD.gn | 50 +--- pc/audio_rtp_receiver.cc | 248 ++++++++++++-------- pc/audio_rtp_receiver.h | 69 +++--- pc/jitter_buffer_delay.cc | 45 +--- pc/jitter_buffer_delay.h | 31 +-- pc/jitter_buffer_delay_interface.h | 39 ---- pc/jitter_buffer_delay_proxy.h | 31 --- pc/jitter_buffer_delay_unittest.cc | 64 ++---- pc/remote_audio_source.cc | 36 ++- pc/rtp_receiver.cc | 16 -- pc/rtp_receiver.h | 7 - pc/rtp_sender_receiver_unittest.cc | 38 +++- pc/rtp_transceiver.cc | 14 +- pc/rtp_transceiver_unittest.cc | 2 + pc/rtp_transmission_manager.cc | 21 +- pc/stats_collector.cc | 30 ++- pc/test/fake_peer_connection_for_stats.h | 2 +- pc/video_rtp_receiver.cc | 276 +++++++++++++---------- pc/video_rtp_receiver.h | 92 +++++--- pc/video_rtp_receiver_unittest.cc | 25 +- 21 files changed, 585 insertions(+), 583 deletions(-) delete mode 100644 pc/jitter_buffer_delay_interface.h delete mode 100644 pc/jitter_buffer_delay_proxy.h diff --git a/api/rtp_receiver_interface.h b/api/rtp_receiver_interface.h index e0ace545af..d2645eda8c 100644 --- a/api/rtp_receiver_interface.h +++ b/api/rtp_receiver_interface.h @@ -100,11 +100,13 @@ class RTC_EXPORT RtpReceiverInterface : public rtc::RefCountInterface { // before it is sent across the network. This will decrypt the entire frame // using the user provided decryption mechanism regardless of whether SRTP is // enabled or not. + // TODO(bugs.webrtc.org/12772): Remove. virtual void SetFrameDecryptor( rtc::scoped_refptr frame_decryptor); // Returns a pointer to the frame decryptor set previously by the // user. This can be used to update the state of the object. + // TODO(bugs.webrtc.org/12772): Remove. virtual rtc::scoped_refptr GetFrameDecryptor() const; // Sets a frame transformer between the depacketizer and the decoder to enable @@ -120,27 +122,31 @@ class RTC_EXPORT RtpReceiverInterface : public rtc::RefCountInterface { // Define proxy for RtpReceiverInterface. // TODO(deadbeef): Move this to .cc file and out of api/. What threads methods // are called on is an implementation detail. -BEGIN_PRIMARY_PROXY_MAP(RtpReceiver) +BEGIN_PROXY_MAP(RtpReceiver) PROXY_PRIMARY_THREAD_DESTRUCTOR() -PROXY_CONSTMETHOD0(rtc::scoped_refptr, track) +BYPASS_PROXY_CONSTMETHOD0(rtc::scoped_refptr, track) PROXY_CONSTMETHOD0(rtc::scoped_refptr, dtls_transport) PROXY_CONSTMETHOD0(std::vector, stream_ids) PROXY_CONSTMETHOD0(std::vector>, streams) BYPASS_PROXY_CONSTMETHOD0(cricket::MediaType, media_type) BYPASS_PROXY_CONSTMETHOD0(std::string, id) -PROXY_CONSTMETHOD0(RtpParameters, GetParameters) +PROXY_SECONDARY_CONSTMETHOD0(RtpParameters, GetParameters) PROXY_METHOD1(void, SetObserver, RtpReceiverObserverInterface*) -PROXY_METHOD1(void, SetJitterBufferMinimumDelay, absl::optional) -PROXY_CONSTMETHOD0(std::vector, GetSources) -PROXY_METHOD1(void, - SetFrameDecryptor, - rtc::scoped_refptr) -PROXY_CONSTMETHOD0(rtc::scoped_refptr, - GetFrameDecryptor) -PROXY_METHOD1(void, - SetDepacketizerToDecoderFrameTransformer, - rtc::scoped_refptr) +PROXY_SECONDARY_METHOD1(void, + SetJitterBufferMinimumDelay, + absl::optional) +PROXY_SECONDARY_CONSTMETHOD0(std::vector, GetSources) +// TODO(bugs.webrtc.org/12772): Remove. +PROXY_SECONDARY_METHOD1(void, + SetFrameDecryptor, + rtc::scoped_refptr) +// TODO(bugs.webrtc.org/12772): Remove. +PROXY_SECONDARY_CONSTMETHOD0(rtc::scoped_refptr, + GetFrameDecryptor) +PROXY_SECONDARY_METHOD1(void, + SetDepacketizerToDecoderFrameTransformer, + rtc::scoped_refptr) END_PROXY_MAP() } // namespace webrtc diff --git a/pc/BUILD.gn b/pc/BUILD.gn index 35487c7f5a..3039ec6f66 100644 --- a/pc/BUILD.gn +++ b/pc/BUILD.gn @@ -216,8 +216,6 @@ rtc_library("peerconnection") { ":connection_context", ":dtmf_sender", ":jitter_buffer_delay", - ":jitter_buffer_delay_interface", - ":jitter_buffer_delay_proxy", ":media_protocol_names", ":media_stream", ":peer_connection_message_handler", @@ -502,8 +500,6 @@ rtc_library("audio_rtp_receiver") { deps = [ ":audio_track", ":jitter_buffer_delay", - ":jitter_buffer_delay_interface", - ":jitter_buffer_delay_proxy", ":media_stream", ":remote_audio_source", ":rtp_receiver", @@ -520,6 +516,9 @@ rtc_library("audio_rtp_receiver") { "../rtc_base:checks", "../rtc_base:refcount", "../rtc_base:threading", + "../rtc_base/system:no_unique_address", + "../rtc_base/task_utils:pending_task_safety_flag", + "../rtc_base/task_utils:to_queued_task", ] absl_deps = [ "//third_party/abseil-cpp/absl/algorithm:container", @@ -535,8 +534,6 @@ rtc_library("video_rtp_receiver") { ] deps = [ ":jitter_buffer_delay", - ":jitter_buffer_delay_interface", - ":jitter_buffer_delay_proxy", ":media_stream", ":rtp_receiver", ":video_rtp_track_source", @@ -556,6 +553,7 @@ rtc_library("video_rtp_receiver") { "../rtc_base:checks", "../rtc_base:rtc_base_approved", "../rtc_base:threading", + "../rtc_base/system:no_unique_address", ] absl_deps = [ "//third_party/abseil-cpp/absl/algorithm:container", @@ -615,19 +613,6 @@ rtc_library("video_track") { ] } -rtc_source_set("jitter_buffer_delay_interface") { - sources = [ "jitter_buffer_delay_interface.h" ] - deps = [ - "../media:rtc_media_base", - "../rtc_base:refcount", - ] - absl_deps = [ - "//third_party/abseil-cpp/absl/algorithm:container", - "//third_party/abseil-cpp/absl/strings", - "//third_party/abseil-cpp/absl/types:optional", - ] -} - rtc_source_set("sdp_state_provider") { sources = [ "sdp_state_provider.h" ] deps = [ @@ -636,35 +621,19 @@ rtc_source_set("sdp_state_provider") { ] } -rtc_source_set("jitter_buffer_delay_proxy") { - sources = [ "jitter_buffer_delay_proxy.h" ] - deps = [ - ":jitter_buffer_delay_interface", - "../api:libjingle_peerconnection_api", - "../media:rtc_media_base", - ] -} - rtc_library("jitter_buffer_delay") { sources = [ "jitter_buffer_delay.cc", "jitter_buffer_delay.h", ] deps = [ - ":jitter_buffer_delay_interface", "../api:sequence_checker", - "../media:rtc_media_base", - "../rtc_base", "../rtc_base:checks", - "../rtc_base:refcount", + "../rtc_base:safe_conversions", "../rtc_base:safe_minmax", - "../rtc_base:threading", - ] - absl_deps = [ - "//third_party/abseil-cpp/absl/algorithm:container", - "//third_party/abseil-cpp/absl/strings", - "//third_party/abseil-cpp/absl/types:optional", + "../rtc_base/system:no_unique_address", ] + absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] } rtc_library("remote_audio_source") { @@ -1024,7 +993,6 @@ if (rtc_include_tests && !build_with_chromium) { ":dtmf_sender", ":integration_test_helpers", ":jitter_buffer_delay", - ":jitter_buffer_delay_interface", ":media_stream", ":peerconnection", ":remote_audio_source", @@ -1092,6 +1060,7 @@ if (rtc_include_tests && !build_with_chromium) { "../test:field_trial", "../test:fileutils", "../test:rtp_test_utils", + "../test:test_common", "../test/pc/sctp:fake_sctp_transport", "./scenario_tests:pc_scenario_tests", "//third_party/abseil-cpp/absl/algorithm:container", @@ -1181,7 +1150,6 @@ if (rtc_include_tests && !build_with_chromium) { ":audio_track", ":dtmf_sender", ":jitter_buffer_delay", - ":jitter_buffer_delay_interface", ":media_stream", ":pc_test_utils", ":peerconnection", @@ -1291,7 +1259,6 @@ if (rtc_include_tests && !build_with_chromium) { "test/frame_generator_capturer_video_track_source.h", "test/mock_channel_interface.h", "test/mock_data_channel.h", - "test/mock_delayable.h", "test/mock_peer_connection_observers.h", "test/mock_rtp_receiver_internal.h", "test/mock_rtp_sender_internal.h", @@ -1303,7 +1270,6 @@ if (rtc_include_tests && !build_with_chromium) { deps = [ ":jitter_buffer_delay", - ":jitter_buffer_delay_interface", ":libjingle_peerconnection", ":peerconnection", ":rtc_pc_base", diff --git a/pc/audio_rtp_receiver.cc b/pc/audio_rtp_receiver.cc index 382b5a0fa1..5f815c589e 100644 --- a/pc/audio_rtp_receiver.cc +++ b/pc/audio_rtp_receiver.cc @@ -18,11 +18,10 @@ #include "api/media_stream_track_proxy.h" #include "api/sequence_checker.h" #include "pc/audio_track.h" -#include "pc/jitter_buffer_delay.h" -#include "pc/jitter_buffer_delay_proxy.h" #include "rtc_base/checks.h" #include "rtc_base/location.h" #include "rtc_base/logging.h" +#include "rtc_base/task_utils/to_queued_task.h" namespace webrtc { @@ -52,10 +51,7 @@ AudioRtpReceiver::AudioRtpReceiver( AudioTrack::Create(receiver_id, source_))), cached_track_enabled_(track_->enabled()), attachment_id_(GenerateUniqueId()), - delay_(JitterBufferDelayProxy::Create( - rtc::Thread::Current(), - worker_thread_, - rtc::make_ref_counted(worker_thread))) { + worker_thread_safety_(PendingTaskSafetyFlag::CreateDetachedInactive()) { RTC_DCHECK(worker_thread_); RTC_DCHECK(track_->GetSource()->remote()); track_->RegisterObserver(this); @@ -64,140 +60,188 @@ AudioRtpReceiver::AudioRtpReceiver( } AudioRtpReceiver::~AudioRtpReceiver() { + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); + RTC_DCHECK(stopped_); + RTC_DCHECK(!media_channel_); + track_->GetSource()->UnregisterAudioObserver(this); track_->UnregisterObserver(this); - Stop(); } void AudioRtpReceiver::OnChanged() { + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); if (cached_track_enabled_ != track_->enabled()) { cached_track_enabled_ = track_->enabled(); - Reconfigure(); + worker_thread_->PostTask(ToQueuedTask( + worker_thread_safety_, + [this, enabled = cached_track_enabled_, volume = cached_volume_]() { + RTC_DCHECK_RUN_ON(worker_thread_); + Reconfigure(enabled, volume); + })); } } -bool AudioRtpReceiver::SetOutputVolume(double volume) { +// RTC_RUN_ON(worker_thread_) +void AudioRtpReceiver::SetOutputVolume_w(double volume) { RTC_DCHECK_GE(volume, 0.0); RTC_DCHECK_LE(volume, 10.0); - RTC_DCHECK(media_channel_); - RTC_DCHECK(!stopped_); - return worker_thread_->Invoke(RTC_FROM_HERE, [&] { - return ssrc_ ? media_channel_->SetOutputVolume(*ssrc_, volume) - : media_channel_->SetDefaultOutputVolume(volume); - }); + ssrc_ ? media_channel_->SetOutputVolume(*ssrc_, volume) + : media_channel_->SetDefaultOutputVolume(volume); } void AudioRtpReceiver::OnSetVolume(double volume) { + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); RTC_DCHECK_GE(volume, 0); RTC_DCHECK_LE(volume, 10); - cached_volume_ = volume; - if (!media_channel_ || stopped_) { - RTC_LOG(LS_ERROR) - << "AudioRtpReceiver::OnSetVolume: No audio channel exists."; + if (stopped_) return; - } + + cached_volume_ = volume; + // When the track is disabled, the volume of the source, which is the // corresponding WebRtc Voice Engine channel will be 0. So we do not allow // setting the volume to the source when the track is disabled. - if (!stopped_ && track_->enabled()) { - if (!SetOutputVolume(cached_volume_)) { - RTC_NOTREACHED(); - } + if (track_->enabled()) { + worker_thread_->PostTask( + ToQueuedTask(worker_thread_safety_, [this, volume = cached_volume_]() { + RTC_DCHECK_RUN_ON(worker_thread_); + SetOutputVolume_w(volume); + })); } } +rtc::scoped_refptr AudioRtpReceiver::dtls_transport() + const { + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); + return dtls_transport_; +} + std::vector AudioRtpReceiver::stream_ids() const { + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); std::vector stream_ids(streams_.size()); for (size_t i = 0; i < streams_.size(); ++i) stream_ids[i] = streams_[i]->id(); return stream_ids; } +std::vector> +AudioRtpReceiver::streams() const { + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); + return streams_; +} + RtpParameters AudioRtpReceiver::GetParameters() const { - if (!media_channel_ || stopped_) { + RTC_DCHECK_RUN_ON(worker_thread_); + if (!media_channel_) return RtpParameters(); - } - return worker_thread_->Invoke(RTC_FROM_HERE, [&] { - return ssrc_ ? media_channel_->GetRtpReceiveParameters(*ssrc_) - : media_channel_->GetDefaultRtpReceiveParameters(); - }); + return ssrc_ ? media_channel_->GetRtpReceiveParameters(*ssrc_) + : media_channel_->GetDefaultRtpReceiveParameters(); } void AudioRtpReceiver::SetFrameDecryptor( rtc::scoped_refptr frame_decryptor) { + RTC_DCHECK_RUN_ON(worker_thread_); frame_decryptor_ = std::move(frame_decryptor); // Special Case: Set the frame decryptor to any value on any existing channel. - if (media_channel_ && ssrc_.has_value() && !stopped_) { - worker_thread_->Invoke(RTC_FROM_HERE, [&] { - media_channel_->SetFrameDecryptor(*ssrc_, frame_decryptor_); - }); + if (media_channel_ && ssrc_) { + media_channel_->SetFrameDecryptor(*ssrc_, frame_decryptor_); } } rtc::scoped_refptr AudioRtpReceiver::GetFrameDecryptor() const { + RTC_DCHECK_RUN_ON(worker_thread_); return frame_decryptor_; } void AudioRtpReceiver::Stop() { + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); // TODO(deadbeef): Need to do more here to fully stop receiving packets. - if (stopped_) { - return; + if (!stopped_) { + source_->SetState(MediaSourceInterface::kEnded); + stopped_ = true; } - source_->SetState(MediaSourceInterface::kEnded); - if (media_channel_) { - // Allow that SetOutputVolume fail. This is the normal case when the - // underlying media channel has already been deleted. - SetOutputVolume(0.0); - } - stopped_ = true; + + worker_thread_->Invoke(RTC_FROM_HERE, [&]() { + RTC_DCHECK_RUN_ON(worker_thread_); + if (media_channel_) + SetOutputVolume_w(0.0); + SetMediaChannel_w(nullptr); + }); } void AudioRtpReceiver::StopAndEndTrack() { + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); Stop(); track_->internal()->set_ended(); } void AudioRtpReceiver::RestartMediaChannel(absl::optional ssrc) { - RTC_DCHECK(media_channel_); - if (!stopped_ && ssrc_ == ssrc) { - return; - } + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); + bool ok = worker_thread_->Invoke( + RTC_FROM_HERE, [&, enabled = cached_track_enabled_, + volume = cached_volume_, was_stopped = stopped_]() { + RTC_DCHECK_RUN_ON(worker_thread_); + if (!media_channel_) { + RTC_DCHECK(was_stopped); + return false; // Can't restart. + } + + if (!was_stopped && ssrc_ == ssrc) { + // Already running with that ssrc. + RTC_DCHECK(worker_thread_safety_->alive()); + return true; + } + + if (!was_stopped) { + source_->Stop(media_channel_, ssrc_); + } + + ssrc_ = std::move(ssrc); + source_->Start(media_channel_, ssrc_); + if (ssrc_) { + media_channel_->SetBaseMinimumPlayoutDelayMs(*ssrc_, delay_.GetMs()); + } + + Reconfigure(enabled, volume); + return true; + }); + + if (!ok) + return; - if (!stopped_) { - source_->Stop(media_channel_, ssrc_); - delay_->OnStop(); - } - ssrc_ = ssrc; stopped_ = false; - source_->Start(media_channel_, ssrc); - delay_->OnStart(media_channel_, ssrc.value_or(0)); - Reconfigure(); } void AudioRtpReceiver::SetupMediaChannel(uint32_t ssrc) { - if (!media_channel_) { - RTC_LOG(LS_ERROR) - << "AudioRtpReceiver::SetupMediaChannel: No audio channel exists."; - return; - } + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); RestartMediaChannel(ssrc); } void AudioRtpReceiver::SetupUnsignaledMediaChannel() { - if (!media_channel_) { - RTC_LOG(LS_ERROR) << "AudioRtpReceiver::SetupUnsignaledMediaChannel: No " - "audio channel exists."; - } + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); RestartMediaChannel(absl::nullopt); } +uint32_t AudioRtpReceiver::ssrc() const { + RTC_DCHECK_RUN_ON(worker_thread_); + return ssrc_.value_or(0); +} + void AudioRtpReceiver::set_stream_ids(std::vector stream_ids) { + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); SetStreams(CreateStreamsFromIds(std::move(stream_ids))); } +void AudioRtpReceiver::set_transport( + rtc::scoped_refptr dtls_transport) { + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); + dtls_transport_ = std::move(dtls_transport); +} + void AudioRtpReceiver::SetStreams( const std::vector>& streams) { + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); // Remove remote track from any streams that are going away. for (const auto& existing_stream : streams_) { bool removed = true; @@ -230,51 +274,42 @@ void AudioRtpReceiver::SetStreams( } std::vector AudioRtpReceiver::GetSources() const { - if (!media_channel_ || !ssrc_ || stopped_) { + RTC_DCHECK_RUN_ON(worker_thread_); + if (!media_channel_ || !ssrc_) { return {}; } - return worker_thread_->Invoke>( - RTC_FROM_HERE, [&] { return media_channel_->GetSources(*ssrc_); }); + return media_channel_->GetSources(*ssrc_); } void AudioRtpReceiver::SetDepacketizerToDecoderFrameTransformer( rtc::scoped_refptr frame_transformer) { - worker_thread_->Invoke( - RTC_FROM_HERE, [this, frame_transformer = std::move(frame_transformer)] { - RTC_DCHECK_RUN_ON(worker_thread_); - frame_transformer_ = frame_transformer; - if (media_channel_ && ssrc_.has_value() && !stopped_) { - media_channel_->SetDepacketizerToDecoderFrameTransformer( - *ssrc_, frame_transformer); - } - }); + RTC_DCHECK_RUN_ON(worker_thread_); + if (media_channel_) { + media_channel_->SetDepacketizerToDecoderFrameTransformer(ssrc_.value_or(0), + frame_transformer); + } + frame_transformer_ = std::move(frame_transformer); } -void AudioRtpReceiver::Reconfigure() { - if (!media_channel_ || stopped_) { - RTC_LOG(LS_ERROR) - << "AudioRtpReceiver::Reconfigure: No audio channel exists."; - return; - } - if (!SetOutputVolume(track_->enabled() ? cached_volume_ : 0)) { - RTC_NOTREACHED(); - } - // Reattach the frame decryptor if we were reconfigured. - MaybeAttachFrameDecryptorToMediaChannel( - ssrc_, worker_thread_, frame_decryptor_, media_channel_, stopped_); +// RTC_RUN_ON(worker_thread_) +void AudioRtpReceiver::Reconfigure(bool track_enabled, double volume) { + RTC_DCHECK(media_channel_); - if (media_channel_ && ssrc_.has_value() && !stopped_) { - worker_thread_->Invoke(RTC_FROM_HERE, [this] { - RTC_DCHECK_RUN_ON(worker_thread_); - if (!frame_transformer_) - return; - media_channel_->SetDepacketizerToDecoderFrameTransformer( - *ssrc_, frame_transformer_); - }); + SetOutputVolume_w(track_enabled ? volume : 0); + + if (ssrc_ && frame_decryptor_) { + // Reattach the frame decryptor if we were reconfigured. + media_channel_->SetFrameDecryptor(*ssrc_, frame_decryptor_); + } + + if (frame_transformer_) { + media_channel_->SetDepacketizerToDecoderFrameTransformer( + ssrc_.value_or(0), frame_transformer_); } } void AudioRtpReceiver::SetObserver(RtpReceiverObserverInterface* observer) { + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); observer_ = observer; // Deliver any notifications the observer may have missed by being set late. if (received_first_packet_ && observer_) { @@ -284,16 +319,35 @@ void AudioRtpReceiver::SetObserver(RtpReceiverObserverInterface* observer) { void AudioRtpReceiver::SetJitterBufferMinimumDelay( absl::optional delay_seconds) { - delay_->Set(delay_seconds); + RTC_DCHECK_RUN_ON(worker_thread_); + delay_.Set(delay_seconds); + if (media_channel_ && ssrc_) + media_channel_->SetBaseMinimumPlayoutDelayMs(*ssrc_, delay_.GetMs()); } void AudioRtpReceiver::SetMediaChannel(cricket::MediaChannel* media_channel) { + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); RTC_DCHECK(media_channel == nullptr || media_channel->media_type() == media_type()); + + if (stopped_ && !media_channel) + return; + + worker_thread_->Invoke(RTC_FROM_HERE, [&] { + RTC_DCHECK_RUN_ON(worker_thread_); + SetMediaChannel_w(media_channel); + }); +} + +// RTC_RUN_ON(worker_thread_) +void AudioRtpReceiver::SetMediaChannel_w(cricket::MediaChannel* media_channel) { + media_channel ? worker_thread_safety_->SetAlive() + : worker_thread_safety_->SetNotAlive(); media_channel_ = static_cast(media_channel); } void AudioRtpReceiver::NotifyFirstPacketReceived() { + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); if (observer_) { observer_->OnFirstPacketReceived(media_type()); } diff --git a/pc/audio_rtp_receiver.h b/pc/audio_rtp_receiver.h index 789d4a0f52..7f2e557126 100644 --- a/pc/audio_rtp_receiver.h +++ b/pc/audio_rtp_receiver.h @@ -12,6 +12,7 @@ #define PC_AUDIO_RTP_RECEIVER_H_ #include + #include #include @@ -25,13 +26,16 @@ #include "api/rtp_parameters.h" #include "api/rtp_receiver_interface.h" #include "api/scoped_refptr.h" +#include "api/sequence_checker.h" #include "api/transport/rtp/rtp_source.h" #include "media/base/media_channel.h" #include "pc/audio_track.h" -#include "pc/jitter_buffer_delay_interface.h" +#include "pc/jitter_buffer_delay.h" #include "pc/remote_audio_source.h" #include "pc/rtp_receiver.h" #include "rtc_base/ref_counted_object.h" +#include "rtc_base/system/no_unique_address.h" +#include "rtc_base/task_utils/pending_task_safety_flag.h" #include "rtc_base/thread.h" #include "rtc_base/thread_annotations.h" @@ -39,7 +43,7 @@ namespace webrtc { class AudioRtpReceiver : public ObserverInterface, public AudioSourceInterface::AudioObserver, - public rtc::RefCountedObject { + public RtpReceiverInternal { public: AudioRtpReceiver(rtc::Thread* worker_thread, std::string receiver_id, @@ -59,22 +63,16 @@ class AudioRtpReceiver : public ObserverInterface, // AudioSourceInterface::AudioObserver implementation void OnSetVolume(double volume) override; - rtc::scoped_refptr audio_track() const { - return track_.get(); - } + rtc::scoped_refptr audio_track() const { return track_; } // RtpReceiverInterface implementation rtc::scoped_refptr track() const override { - return track_.get(); - } - rtc::scoped_refptr dtls_transport() const override { - return dtls_transport_; + return track_; } + rtc::scoped_refptr dtls_transport() const override; std::vector stream_ids() const override; std::vector> streams() - const override { - return streams_; - } + const override; cricket::MediaType media_type() const override { return cricket::MEDIA_TYPE_AUDIO; @@ -95,13 +93,11 @@ class AudioRtpReceiver : public ObserverInterface, void StopAndEndTrack() override; void SetupMediaChannel(uint32_t ssrc) override; void SetupUnsignaledMediaChannel() override; - uint32_t ssrc() const override { return ssrc_.value_or(0); } + uint32_t ssrc() const override; void NotifyFirstPacketReceived() override; void set_stream_ids(std::vector stream_ids) override; void set_transport( - rtc::scoped_refptr dtls_transport) override { - dtls_transport_ = dtls_transport; - } + rtc::scoped_refptr dtls_transport) override; void SetStreams(const std::vector>& streams) override; void SetObserver(RtpReceiverObserverInterface* observer) override; @@ -119,29 +115,40 @@ class AudioRtpReceiver : public ObserverInterface, private: void RestartMediaChannel(absl::optional ssrc); - void Reconfigure(); - bool SetOutputVolume(double volume); + void Reconfigure(bool track_enabled, double volume) + RTC_RUN_ON(worker_thread_); + void SetOutputVolume_w(double volume) RTC_RUN_ON(worker_thread_); + void SetMediaChannel_w(cricket::MediaChannel* media_channel) + RTC_RUN_ON(worker_thread_); + RTC_NO_UNIQUE_ADDRESS SequenceChecker signaling_thread_checker_; rtc::Thread* const worker_thread_; const std::string id_; const rtc::scoped_refptr source_; const rtc::scoped_refptr> track_; - cricket::VoiceMediaChannel* media_channel_ = nullptr; - absl::optional ssrc_; - std::vector> streams_; - bool cached_track_enabled_; - double cached_volume_ = 1; - bool stopped_ = true; - RtpReceiverObserverInterface* observer_ = nullptr; - bool received_first_packet_ = false; - int attachment_id_ = 0; - rtc::scoped_refptr frame_decryptor_; - rtc::scoped_refptr dtls_transport_; - // Allows to thread safely change playout delay. Handles caching cases if + cricket::VoiceMediaChannel* media_channel_ RTC_GUARDED_BY(worker_thread_) = + nullptr; + absl::optional ssrc_ RTC_GUARDED_BY(worker_thread_); + std::vector> streams_ + RTC_GUARDED_BY(&signaling_thread_checker_); + bool cached_track_enabled_ RTC_GUARDED_BY(&signaling_thread_checker_); + double cached_volume_ RTC_GUARDED_BY(&signaling_thread_checker_) = 1.0; + bool stopped_ RTC_GUARDED_BY(&signaling_thread_checker_) = true; + RtpReceiverObserverInterface* observer_ + RTC_GUARDED_BY(&signaling_thread_checker_) = nullptr; + bool received_first_packet_ RTC_GUARDED_BY(&signaling_thread_checker_) = + false; + const int attachment_id_; + rtc::scoped_refptr frame_decryptor_ + RTC_GUARDED_BY(worker_thread_); + rtc::scoped_refptr dtls_transport_ + RTC_GUARDED_BY(&signaling_thread_checker_); + // Stores and updates the playout delay. Handles caching cases if // |SetJitterBufferMinimumDelay| is called before start. - rtc::scoped_refptr delay_; + JitterBufferDelay delay_ RTC_GUARDED_BY(worker_thread_); rtc::scoped_refptr frame_transformer_ RTC_GUARDED_BY(worker_thread_); + const rtc::scoped_refptr worker_thread_safety_; }; } // namespace webrtc diff --git a/pc/jitter_buffer_delay.cc b/pc/jitter_buffer_delay.cc index 3fdf823d24..801cef7215 100644 --- a/pc/jitter_buffer_delay.cc +++ b/pc/jitter_buffer_delay.cc @@ -14,7 +14,6 @@ #include "rtc_base/checks.h" #include "rtc_base/numerics/safe_conversions.h" #include "rtc_base/numerics/safe_minmax.h" -#include "rtc_base/thread.h" namespace { constexpr int kDefaultDelay = 0; @@ -23,43 +22,21 @@ constexpr int kMaximumDelayMs = 10000; namespace webrtc { -JitterBufferDelay::JitterBufferDelay(rtc::Thread* worker_thread) - : signaling_thread_(rtc::Thread::Current()), worker_thread_(worker_thread) { - RTC_DCHECK(worker_thread_); -} - -void JitterBufferDelay::OnStart(cricket::Delayable* media_channel, - uint32_t ssrc) { - RTC_DCHECK_RUN_ON(signaling_thread_); - - media_channel_ = media_channel; - ssrc_ = ssrc; - - // Trying to apply cached delay for the audio stream. - if (cached_delay_seconds_) { - Set(cached_delay_seconds_.value()); - } -} - -void JitterBufferDelay::OnStop() { - RTC_DCHECK_RUN_ON(signaling_thread_); - // Assume that audio stream is no longer present. - media_channel_ = nullptr; - ssrc_ = absl::nullopt; +JitterBufferDelay::JitterBufferDelay() { + worker_thread_checker_.Detach(); } void JitterBufferDelay::Set(absl::optional delay_seconds) { - RTC_DCHECK_RUN_ON(worker_thread_); - - // TODO(kuddai) propagate absl::optional deeper down as default preference. - int delay_ms = - rtc::saturated_cast(delay_seconds.value_or(kDefaultDelay) * 1000); - delay_ms = rtc::SafeClamp(delay_ms, 0, kMaximumDelayMs); - + RTC_DCHECK_RUN_ON(&worker_thread_checker_); cached_delay_seconds_ = delay_seconds; - if (media_channel_ && ssrc_) { - media_channel_->SetBaseMinimumPlayoutDelayMs(ssrc_.value(), delay_ms); - } +} + +int JitterBufferDelay::GetMs() const { + RTC_DCHECK_RUN_ON(&worker_thread_checker_); + return rtc::SafeClamp( + rtc::saturated_cast(cached_delay_seconds_.value_or(kDefaultDelay) * + 1000), + 0, kMaximumDelayMs); } } // namespace webrtc diff --git a/pc/jitter_buffer_delay.h b/pc/jitter_buffer_delay.h index 8edfc6ce20..dc10e3d2ba 100644 --- a/pc/jitter_buffer_delay.h +++ b/pc/jitter_buffer_delay.h @@ -14,36 +14,25 @@ #include #include "absl/types/optional.h" -#include "media/base/delayable.h" -#include "pc/jitter_buffer_delay_interface.h" -#include "rtc_base/thread.h" +#include "api/sequence_checker.h" +#include "rtc_base/system/no_unique_address.h" namespace webrtc { // JitterBufferDelay converts delay from seconds to milliseconds for the // underlying media channel. It also handles cases when user sets delay before -// the start of media_channel by caching its request. Note, this class is not -// thread safe. Its thread safe version is defined in -// pc/jitter_buffer_delay_proxy.h -class JitterBufferDelay : public JitterBufferDelayInterface { +// the start of media_channel by caching its request. +class JitterBufferDelay { public: - // Must be called on signaling thread. - explicit JitterBufferDelay(rtc::Thread* worker_thread); + JitterBufferDelay(); - void OnStart(cricket::Delayable* media_channel, uint32_t ssrc) override; - - void OnStop() override; - - void Set(absl::optional delay_seconds) override; + void Set(absl::optional delay_seconds); + int GetMs() const; private: - // Throughout webrtc source, sometimes it is also called as |main_thread_|. - rtc::Thread* const signaling_thread_; - rtc::Thread* const worker_thread_; - // Media channel and ssrc together uniqely identify audio stream. - cricket::Delayable* media_channel_ = nullptr; - absl::optional ssrc_; - absl::optional cached_delay_seconds_; + RTC_NO_UNIQUE_ADDRESS SequenceChecker worker_thread_checker_; + absl::optional cached_delay_seconds_ + RTC_GUARDED_BY(&worker_thread_checker_); }; } // namespace webrtc diff --git a/pc/jitter_buffer_delay_interface.h b/pc/jitter_buffer_delay_interface.h deleted file mode 100644 index f2132d318d..0000000000 --- a/pc/jitter_buffer_delay_interface.h +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2019 The WebRTC project authors. All Rights Reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#ifndef PC_JITTER_BUFFER_DELAY_INTERFACE_H_ -#define PC_JITTER_BUFFER_DELAY_INTERFACE_H_ - -#include - -#include "absl/types/optional.h" -#include "media/base/delayable.h" -#include "rtc_base/ref_count.h" - -namespace webrtc { - -// JitterBufferDelay delivers user's queries to the underlying media channel. It -// can describe either video or audio delay for receiving stream. "Interface" -// suffix in the interface name is required to be compatible with api/proxy.cc -class JitterBufferDelayInterface : public rtc::RefCountInterface { - public: - // OnStart allows to uniqely identify to which receiving stream playout - // delay must correpond through |media_channel| and |ssrc| pair. - virtual void OnStart(cricket::Delayable* media_channel, uint32_t ssrc) = 0; - - // Indicates that underlying receiving stream is stopped. - virtual void OnStop() = 0; - - virtual void Set(absl::optional delay_seconds) = 0; -}; - -} // namespace webrtc - -#endif // PC_JITTER_BUFFER_DELAY_INTERFACE_H_ diff --git a/pc/jitter_buffer_delay_proxy.h b/pc/jitter_buffer_delay_proxy.h deleted file mode 100644 index 91729d6935..0000000000 --- a/pc/jitter_buffer_delay_proxy.h +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2019 The WebRTC project authors. All Rights Reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#ifndef PC_JITTER_BUFFER_DELAY_PROXY_H_ -#define PC_JITTER_BUFFER_DELAY_PROXY_H_ - -#include - -#include "api/proxy.h" -#include "media/base/delayable.h" -#include "pc/jitter_buffer_delay_interface.h" - -namespace webrtc { - -BEGIN_PROXY_MAP(JitterBufferDelay) -PROXY_PRIMARY_THREAD_DESTRUCTOR() -PROXY_METHOD2(void, OnStart, cricket::Delayable*, uint32_t) -PROXY_METHOD0(void, OnStop) -PROXY_SECONDARY_METHOD1(void, Set, absl::optional) -END_PROXY_MAP() - -} // namespace webrtc - -#endif // PC_JITTER_BUFFER_DELAY_PROXY_H_ diff --git a/pc/jitter_buffer_delay_unittest.cc b/pc/jitter_buffer_delay_unittest.cc index 61adb2d2bc..b00075ceb5 100644 --- a/pc/jitter_buffer_delay_unittest.cc +++ b/pc/jitter_buffer_delay_unittest.cc @@ -13,79 +13,47 @@ #include #include "absl/types/optional.h" -#include "api/scoped_refptr.h" -#include "pc/test/mock_delayable.h" -#include "rtc_base/ref_counted_object.h" -#include "rtc_base/thread.h" -#include "test/gmock.h" #include "test/gtest.h" -using ::testing::Return; - -namespace { -constexpr int kSsrc = 1234; -} // namespace - namespace webrtc { class JitterBufferDelayTest : public ::testing::Test { public: - JitterBufferDelayTest() - : delay_( - rtc::make_ref_counted(rtc::Thread::Current())) {} + JitterBufferDelayTest() {} protected: - rtc::scoped_refptr delay_; - MockDelayable delayable_; + JitterBufferDelay delay_; }; TEST_F(JitterBufferDelayTest, Set) { - delay_->OnStart(&delayable_, kSsrc); - - EXPECT_CALL(delayable_, SetBaseMinimumPlayoutDelayMs(kSsrc, 3000)) - .WillOnce(Return(true)); - // Delay in seconds. - delay_->Set(3.0); + delay_.Set(3.0); + EXPECT_EQ(delay_.GetMs(), 3000); } -TEST_F(JitterBufferDelayTest, Caching) { - // Check that value is cached before start. - delay_->Set(4.0); - - // Check that cached value applied on the start. - EXPECT_CALL(delayable_, SetBaseMinimumPlayoutDelayMs(kSsrc, 4000)) - .WillOnce(Return(true)); - delay_->OnStart(&delayable_, kSsrc); +TEST_F(JitterBufferDelayTest, DefaultValue) { + EXPECT_EQ(delay_.GetMs(), 0); // Default value is 0ms. } TEST_F(JitterBufferDelayTest, Clamping) { - delay_->OnStart(&delayable_, kSsrc); - // In current Jitter Buffer implementation (Audio or Video) maximum supported // value is 10000 milliseconds. - EXPECT_CALL(delayable_, SetBaseMinimumPlayoutDelayMs(kSsrc, 10000)) - .WillOnce(Return(true)); - delay_->Set(10.5); + delay_.Set(10.5); + EXPECT_EQ(delay_.GetMs(), 10000); // Test int overflow. - EXPECT_CALL(delayable_, SetBaseMinimumPlayoutDelayMs(kSsrc, 10000)) - .WillOnce(Return(true)); - delay_->Set(21474836470.0); + delay_.Set(21474836470.0); + EXPECT_EQ(delay_.GetMs(), 10000); - EXPECT_CALL(delayable_, SetBaseMinimumPlayoutDelayMs(kSsrc, 0)) - .WillOnce(Return(true)); - delay_->Set(-21474836470.0); + delay_.Set(-21474836470.0); + EXPECT_EQ(delay_.GetMs(), 0); // Boundary value in seconds to milliseconds conversion. - EXPECT_CALL(delayable_, SetBaseMinimumPlayoutDelayMs(kSsrc, 0)) - .WillOnce(Return(true)); - delay_->Set(0.0009); + delay_.Set(0.0009); + EXPECT_EQ(delay_.GetMs(), 0); - EXPECT_CALL(delayable_, SetBaseMinimumPlayoutDelayMs(kSsrc, 0)) - .WillOnce(Return(true)); - - delay_->Set(-2.0); + delay_.Set(-2.0); + EXPECT_EQ(delay_.GetMs(), 0); } } // namespace webrtc diff --git a/pc/remote_audio_source.cc b/pc/remote_audio_source.cc index 9e65f6781c..dc890e737c 100644 --- a/pc/remote_audio_source.cc +++ b/pc/remote_audio_source.cc @@ -61,7 +61,7 @@ RemoteAudioSource::RemoteAudioSource( } RemoteAudioSource::~RemoteAudioSource() { - RTC_DCHECK(main_thread_->IsCurrent()); + RTC_DCHECK_RUN_ON(main_thread_); RTC_DCHECK(audio_observers_.empty()); if (!sinks_.empty()) { RTC_LOG(LS_WARNING) @@ -71,32 +71,28 @@ RemoteAudioSource::~RemoteAudioSource() { void RemoteAudioSource::Start(cricket::VoiceMediaChannel* media_channel, absl::optional ssrc) { - RTC_DCHECK_RUN_ON(main_thread_); - RTC_DCHECK(media_channel); + RTC_DCHECK_RUN_ON(worker_thread_); // Register for callbacks immediately before AddSink so that we always get // notified when a channel goes out of scope (signaled when "AudioDataProxy" // is destroyed). - worker_thread_->Invoke(RTC_FROM_HERE, [&] { - ssrc ? media_channel->SetRawAudioSink( - *ssrc, std::make_unique(this)) - : media_channel->SetDefaultRawAudioSink( - std::make_unique(this)); - }); + RTC_DCHECK(media_channel); + ssrc ? media_channel->SetRawAudioSink(*ssrc, + std::make_unique(this)) + : media_channel->SetDefaultRawAudioSink( + std::make_unique(this)); } void RemoteAudioSource::Stop(cricket::VoiceMediaChannel* media_channel, absl::optional ssrc) { - RTC_DCHECK_RUN_ON(main_thread_); + RTC_DCHECK_RUN_ON(worker_thread_); RTC_DCHECK(media_channel); - - worker_thread_->Invoke(RTC_FROM_HERE, [&] { - ssrc ? media_channel->SetRawAudioSink(*ssrc, nullptr) - : media_channel->SetDefaultRawAudioSink(nullptr); - }); + ssrc ? media_channel->SetRawAudioSink(*ssrc, nullptr) + : media_channel->SetDefaultRawAudioSink(nullptr); } void RemoteAudioSource::SetState(SourceState new_state) { + RTC_DCHECK_RUN_ON(main_thread_); if (state_ != new_state) { state_ = new_state; FireOnChanged(); @@ -104,12 +100,12 @@ void RemoteAudioSource::SetState(SourceState new_state) { } MediaSourceInterface::SourceState RemoteAudioSource::state() const { - RTC_DCHECK(main_thread_->IsCurrent()); + RTC_DCHECK_RUN_ON(main_thread_); return state_; } bool RemoteAudioSource::remote() const { - RTC_DCHECK(main_thread_->IsCurrent()); + RTC_DCHECK_RUN_ON(main_thread_); return true; } @@ -135,7 +131,7 @@ void RemoteAudioSource::UnregisterAudioObserver(AudioObserver* observer) { } void RemoteAudioSource::AddSink(AudioTrackSinkInterface* sink) { - RTC_DCHECK(main_thread_->IsCurrent()); + RTC_DCHECK_RUN_ON(main_thread_); RTC_DCHECK(sink); if (state_ != MediaSourceInterface::kLive) { @@ -149,7 +145,7 @@ void RemoteAudioSource::AddSink(AudioTrackSinkInterface* sink) { } void RemoteAudioSource::RemoveSink(AudioTrackSinkInterface* sink) { - RTC_DCHECK(main_thread_->IsCurrent()); + RTC_DCHECK_RUN_ON(main_thread_); RTC_DCHECK(sink); MutexLock lock(&sink_lock_); @@ -184,7 +180,7 @@ void RemoteAudioSource::OnAudioChannelGone() { } void RemoteAudioSource::OnMessage(rtc::Message* msg) { - RTC_DCHECK(main_thread_->IsCurrent()); + RTC_DCHECK_RUN_ON(main_thread_); sinks_.clear(); SetState(MediaSourceInterface::kEnded); // Will possibly delete this RemoteAudioSource since it is reference counted diff --git a/pc/rtp_receiver.cc b/pc/rtp_receiver.cc index 694a8215a6..88f32d88e5 100644 --- a/pc/rtp_receiver.cc +++ b/pc/rtp_receiver.cc @@ -39,20 +39,4 @@ RtpReceiverInternal::CreateStreamsFromIds(std::vector stream_ids) { return streams; } -// Attempt to attach the frame decryptor to the current media channel on the -// correct worker thread only if both the media channel exists and a ssrc has -// been allocated to the stream. -void RtpReceiverInternal::MaybeAttachFrameDecryptorToMediaChannel( - const absl::optional& ssrc, - rtc::Thread* worker_thread, - rtc::scoped_refptr frame_decryptor, - cricket::MediaChannel* media_channel, - bool stopped) { - if (media_channel && frame_decryptor && ssrc.has_value() && !stopped) { - worker_thread->Invoke(RTC_FROM_HERE, [&] { - media_channel->SetFrameDecryptor(*ssrc, frame_decryptor); - }); - } -} - } // namespace webrtc diff --git a/pc/rtp_receiver.h b/pc/rtp_receiver.h index 22fa75360f..73fc5b9858 100644 --- a/pc/rtp_receiver.h +++ b/pc/rtp_receiver.h @@ -92,13 +92,6 @@ class RtpReceiverInternal : public RtpReceiverInterface { static std::vector> CreateStreamsFromIds(std::vector stream_ids); - - static void MaybeAttachFrameDecryptorToMediaChannel( - const absl::optional& ssrc, - rtc::Thread* worker_thread, - rtc::scoped_refptr frame_decryptor, - cricket::MediaChannel* media_channel, - bool stopped); }; } // namespace webrtc diff --git a/pc/rtp_sender_receiver_unittest.cc b/pc/rtp_sender_receiver_unittest.cc index 625f29b34b..10dc894518 100644 --- a/pc/rtp_sender_receiver_unittest.cc +++ b/pc/rtp_sender_receiver_unittest.cc @@ -63,6 +63,7 @@ #include "rtc_base/thread.h" #include "test/gmock.h" #include "test/gtest.h" +#include "test/run_loop.h" using ::testing::_; using ::testing::ContainerEq; @@ -299,9 +300,9 @@ class RtpSenderReceiverTest void CreateAudioRtpReceiver( std::vector> streams = {}) { - audio_rtp_receiver_ = - new AudioRtpReceiver(rtc::Thread::Current(), kAudioTrackId, streams, - /*is_unified_plan=*/true); + audio_rtp_receiver_ = rtc::make_ref_counted( + rtc::Thread::Current(), kAudioTrackId, streams, + /*is_unified_plan=*/true); audio_rtp_receiver_->SetMediaChannel(voice_media_channel_); audio_rtp_receiver_->SetupMediaChannel(kAudioSsrc); audio_track_ = audio_rtp_receiver_->audio_track(); @@ -310,8 +311,8 @@ class RtpSenderReceiverTest void CreateVideoRtpReceiver( std::vector> streams = {}) { - video_rtp_receiver_ = - new VideoRtpReceiver(rtc::Thread::Current(), kVideoTrackId, streams); + video_rtp_receiver_ = rtc::make_ref_counted( + rtc::Thread::Current(), kVideoTrackId, streams); video_rtp_receiver_->SetMediaChannel(video_media_channel_); video_rtp_receiver_->SetupMediaChannel(kVideoSsrc); video_track_ = video_rtp_receiver_->video_track(); @@ -330,19 +331,25 @@ class RtpSenderReceiverTest video_media_channel_->AddRecvStream(stream_params); uint32_t primary_ssrc = stream_params.first_ssrc(); - video_rtp_receiver_ = - new VideoRtpReceiver(rtc::Thread::Current(), kVideoTrackId, streams); + video_rtp_receiver_ = rtc::make_ref_counted( + rtc::Thread::Current(), kVideoTrackId, streams); video_rtp_receiver_->SetMediaChannel(video_media_channel_); video_rtp_receiver_->SetupMediaChannel(primary_ssrc); video_track_ = video_rtp_receiver_->video_track(); } void DestroyAudioRtpReceiver() { + if (!audio_rtp_receiver_) + return; + audio_rtp_receiver_->Stop(); audio_rtp_receiver_ = nullptr; VerifyVoiceChannelNoOutput(); } void DestroyVideoRtpReceiver() { + if (!video_rtp_receiver_) + return; + video_rtp_receiver_->Stop(); video_rtp_receiver_ = nullptr; VerifyVideoChannelNoOutput(); } @@ -498,6 +505,7 @@ class RtpSenderReceiverTest } protected: + test::RunLoop run_loop_; rtc::Thread* const network_thread_; rtc::Thread* const worker_thread_; webrtc::RtcEventLogNull event_log_; @@ -599,11 +607,15 @@ TEST_F(RtpSenderReceiverTest, RemoteAudioTrackDisable) { EXPECT_TRUE(voice_media_channel_->GetOutputVolume(kAudioSsrc, &volume)); EXPECT_EQ(1, volume); + // Handling of enable/disable is applied asynchronously. audio_track_->set_enabled(false); + run_loop_.Flush(); + EXPECT_TRUE(voice_media_channel_->GetOutputVolume(kAudioSsrc, &volume)); EXPECT_EQ(0, volume); audio_track_->set_enabled(true); + run_loop_.Flush(); EXPECT_TRUE(voice_media_channel_->GetOutputVolume(kAudioSsrc, &volume)); EXPECT_EQ(1, volume); @@ -636,6 +648,7 @@ TEST_F(RtpSenderReceiverTest, RemoteVideoTrackState) { EXPECT_EQ(webrtc::MediaStreamTrackInterface::kEnded, video_track_->state()); EXPECT_EQ(webrtc::MediaSourceInterface::kEnded, video_track_->GetSource()->state()); + DestroyVideoRtpReceiver(); } // Currently no action is taken when a remote video track is disabled or @@ -657,22 +670,27 @@ TEST_F(RtpSenderReceiverTest, RemoteAudioTrackSetVolume) { double volume; audio_track_->GetSource()->SetVolume(0.5); + run_loop_.Flush(); EXPECT_TRUE(voice_media_channel_->GetOutputVolume(kAudioSsrc, &volume)); EXPECT_EQ(0.5, volume); // Disable the audio track, this should prevent setting the volume. audio_track_->set_enabled(false); + RTC_DCHECK_EQ(worker_thread_, run_loop_.task_queue()); + run_loop_.Flush(); audio_track_->GetSource()->SetVolume(0.8); EXPECT_TRUE(voice_media_channel_->GetOutputVolume(kAudioSsrc, &volume)); EXPECT_EQ(0, volume); // When the track is enabled, the previously set volume should take effect. audio_track_->set_enabled(true); + run_loop_.Flush(); EXPECT_TRUE(voice_media_channel_->GetOutputVolume(kAudioSsrc, &volume)); EXPECT_EQ(0.8, volume); // Try changing volume one more time. audio_track_->GetSource()->SetVolume(0.9); + run_loop_.Flush(); EXPECT_TRUE(voice_media_channel_->GetOutputVolume(kAudioSsrc, &volume)); EXPECT_EQ(0.9, volume); @@ -683,12 +701,14 @@ TEST_F(RtpSenderReceiverTest, AudioRtpReceiverDelay) { CreateAudioRtpReceiver(); VerifyRtpReceiverDelayBehaviour(voice_media_channel_, audio_rtp_receiver_.get(), kAudioSsrc); + DestroyAudioRtpReceiver(); } TEST_F(RtpSenderReceiverTest, VideoRtpReceiverDelay) { CreateVideoRtpReceiver(); VerifyRtpReceiverDelayBehaviour(video_media_channel_, video_rtp_receiver_.get(), kVideoSsrc); + DestroyVideoRtpReceiver(); } // Test that the media channel isn't enabled for sending if the audio sender @@ -1582,6 +1602,7 @@ TEST_F(RtpSenderReceiverTest, AudioReceiverCanSetFrameDecryptor) { audio_rtp_receiver_->SetFrameDecryptor(fake_frame_decryptor); EXPECT_EQ(fake_frame_decryptor.get(), audio_rtp_receiver_->GetFrameDecryptor().get()); + DestroyAudioRtpReceiver(); } // Validate that the default FrameEncryptor setting is nullptr. @@ -1593,6 +1614,7 @@ TEST_F(RtpSenderReceiverTest, AudioReceiverCannotSetFrameDecryptorAfterStop) { audio_rtp_receiver_->Stop(); audio_rtp_receiver_->SetFrameDecryptor(fake_frame_decryptor); // TODO(webrtc:9926) - Validate media channel not set once fakes updated. + DestroyAudioRtpReceiver(); } // Validate that the default FrameEncryptor setting is nullptr. @@ -1627,6 +1649,7 @@ TEST_F(RtpSenderReceiverTest, VideoReceiverCanSetFrameDecryptor) { video_rtp_receiver_->SetFrameDecryptor(fake_frame_decryptor); EXPECT_EQ(fake_frame_decryptor.get(), video_rtp_receiver_->GetFrameDecryptor().get()); + DestroyVideoRtpReceiver(); } // Validate that the default FrameEncryptor setting is nullptr. @@ -1638,6 +1661,7 @@ TEST_F(RtpSenderReceiverTest, VideoReceiverCannotSetFrameDecryptorAfterStop) { video_rtp_receiver_->Stop(); video_rtp_receiver_->SetFrameDecryptor(fake_frame_decryptor); // TODO(webrtc:9926) - Validate media channel not set once fakes updated. + DestroyVideoRtpReceiver(); } // Checks that calling the internal methods for get/set parameters do not diff --git a/pc/rtp_transceiver.cc b/pc/rtp_transceiver.cc index 0b7de31372..a78b9d6be6 100644 --- a/pc/rtp_transceiver.cc +++ b/pc/rtp_transceiver.cc @@ -211,14 +211,10 @@ void RtpTransceiver::SetChannel(cricket::ChannelInterface* channel) { for (const auto& receiver : receivers_) { if (!channel_) { - // TODO(tommi): This can internally block and hop to the worker thread. - // It's likely that SetMediaChannel also does that, so perhaps we should - // require SetMediaChannel(nullptr) to also Stop() and skip this call. receiver->internal()->Stop(); + } else { + receiver->internal()->SetMediaChannel(channel_->media_channel()); } - - receiver->internal()->SetMediaChannel(channel_ ? channel_->media_channel() - : nullptr); } } @@ -268,12 +264,8 @@ bool RtpTransceiver::RemoveReceiver(RtpReceiverInterface* receiver) { if (it == receivers_.end()) { return false; } + // `Stop()` will clear the internally cached pointer to the media channel. (*it)->internal()->Stop(); - // After the receiver has been removed, there's no guarantee that the - // contained media channel isn't deleted shortly after this. To make sure that - // the receiver doesn't spontaneously try to use it's (potentially stale) - // media channel reference, we clear it out. - (*it)->internal()->SetMediaChannel(nullptr); receivers_.erase(it); return true; } diff --git a/pc/rtp_transceiver_unittest.cc b/pc/rtp_transceiver_unittest.cc index f9f4f205ea..0128e912e3 100644 --- a/pc/rtp_transceiver_unittest.cc +++ b/pc/rtp_transceiver_unittest.cc @@ -93,6 +93,7 @@ class RtpTransceiverUnifiedPlanTest : public ::testing::Test { rtc::Thread::Current(), sender_), RtpReceiverProxyWithInternal::Create( + rtc::Thread::Current(), rtc::Thread::Current(), receiver_), channel_manager_.get(), @@ -162,6 +163,7 @@ class RtpTransceiverTestForHeaderExtensions : public ::testing::Test { rtc::Thread::Current(), sender_), RtpReceiverProxyWithInternal::Create( + rtc::Thread::Current(), rtc::Thread::Current(), receiver_), channel_manager_.get(), diff --git a/pc/rtp_transmission_manager.cc b/pc/rtp_transmission_manager.cc index eaf29b889f..9040a69699 100644 --- a/pc/rtp_transmission_manager.cc +++ b/pc/rtp_transmission_manager.cc @@ -11,6 +11,7 @@ #include "pc/rtp_transmission_manager.h" #include +#include #include "absl/types/optional.h" #include "api/peer_connection_interface.h" @@ -240,15 +241,17 @@ RtpTransmissionManager::CreateReceiver(cricket::MediaType media_type, receiver; if (media_type == cricket::MEDIA_TYPE_AUDIO) { receiver = RtpReceiverProxyWithInternal::Create( - signaling_thread(), - new AudioRtpReceiver(worker_thread(), receiver_id, - std::vector({}), IsUnifiedPlan())); + signaling_thread(), worker_thread(), + rtc::make_ref_counted(worker_thread(), receiver_id, + std::vector({}), + IsUnifiedPlan())); NoteUsageEvent(UsageEvent::AUDIO_ADDED); } else { RTC_DCHECK_EQ(media_type, cricket::MEDIA_TYPE_VIDEO); receiver = RtpReceiverProxyWithInternal::Create( - signaling_thread(), new VideoRtpReceiver(worker_thread(), receiver_id, - std::vector({}))); + signaling_thread(), worker_thread(), + rtc::make_ref_counted(worker_thread(), receiver_id, + std::vector({}))); NoteUsageEvent(UsageEvent::VIDEO_ADDED); } return receiver; @@ -453,7 +456,7 @@ void RtpTransmissionManager::CreateAudioReceiver( streams.push_back(rtc::scoped_refptr(stream)); // TODO(https://crbug.com/webrtc/9480): When we remove remote_streams(), use // the constructor taking stream IDs instead. - auto* audio_receiver = new AudioRtpReceiver( + auto audio_receiver = rtc::make_ref_counted( worker_thread(), remote_sender_info.sender_id, streams, IsUnifiedPlan()); audio_receiver->SetMediaChannel(voice_media_channel()); if (remote_sender_info.sender_id == kDefaultAudioSenderId) { @@ -462,7 +465,7 @@ void RtpTransmissionManager::CreateAudioReceiver( audio_receiver->SetupMediaChannel(remote_sender_info.first_ssrc); } auto receiver = RtpReceiverProxyWithInternal::Create( - signaling_thread(), audio_receiver); + signaling_thread(), worker_thread(), std::move(audio_receiver)); GetAudioTransceiver()->internal()->AddReceiver(receiver); Observer()->OnAddTrack(receiver, streams); NoteUsageEvent(UsageEvent::AUDIO_ADDED); @@ -476,7 +479,7 @@ void RtpTransmissionManager::CreateVideoReceiver( streams.push_back(rtc::scoped_refptr(stream)); // TODO(https://crbug.com/webrtc/9480): When we remove remote_streams(), use // the constructor taking stream IDs instead. - auto* video_receiver = new VideoRtpReceiver( + auto video_receiver = rtc::make_ref_counted( worker_thread(), remote_sender_info.sender_id, streams); video_receiver->SetMediaChannel(video_media_channel()); if (remote_sender_info.sender_id == kDefaultVideoSenderId) { @@ -485,7 +488,7 @@ void RtpTransmissionManager::CreateVideoReceiver( video_receiver->SetupMediaChannel(remote_sender_info.first_ssrc); } auto receiver = RtpReceiverProxyWithInternal::Create( - signaling_thread(), video_receiver); + signaling_thread(), worker_thread(), std::move(video_receiver)); GetVideoTransceiver()->internal()->AddReceiver(receiver); Observer()->OnAddTrack(receiver, streams); NoteUsageEvent(UsageEvent::VIDEO_ADDED); diff --git a/pc/stats_collector.cc b/pc/stats_collector.cc index 864876435d..6d4c224cb6 100644 --- a/pc/stats_collector.cc +++ b/pc/stats_collector.cc @@ -1163,9 +1163,10 @@ void StatsCollector::ExtractMediaInfo( std::vector> gatherers; + auto transceivers = pc_->GetTransceiversInternal(); { rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls; - for (const auto& transceiver : pc_->GetTransceiversInternal()) { + for (const auto& transceiver : transceivers) { cricket::ChannelInterface* channel = transceiver->internal()->channel(); if (!channel) { continue; @@ -1176,20 +1177,37 @@ void StatsCollector::ExtractMediaInfo( gatherer->transport_name = transport_names_by_mid.at(gatherer->mid); for (const auto& sender : transceiver->internal()->senders()) { - std::string track_id = (sender->track() ? sender->track()->id() : ""); + auto track = sender->track(); + std::string track_id = (track ? track->id() : ""); gatherer->sender_track_id_by_ssrc.insert( std::make_pair(sender->ssrc(), track_id)); } - for (const auto& receiver : transceiver->internal()->receivers()) { - gatherer->receiver_track_id_by_ssrc.insert(std::make_pair( - receiver->internal()->ssrc(), receiver->track()->id())); - } + + // Populating `receiver_track_id_by_ssrc` will be done on the worker + // thread as the `ssrc` property of the receiver needs to be accessed + // there. + gatherers.push_back(std::move(gatherer)); } } pc_->worker_thread()->Invoke(RTC_FROM_HERE, [&] { rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls; + // Populate `receiver_track_id_by_ssrc` for the gatherers. + int i = 0; + for (const auto& transceiver : transceivers) { + cricket::ChannelInterface* channel = transceiver->internal()->channel(); + if (!channel) + continue; + MediaChannelStatsGatherer* gatherer = gatherers[i++].get(); + RTC_DCHECK_EQ(gatherer->mid, channel->content_name()); + + for (const auto& receiver : transceiver->internal()->receivers()) { + gatherer->receiver_track_id_by_ssrc.insert(std::make_pair( + receiver->internal()->ssrc(), receiver->track()->id())); + } + } + for (auto it = gatherers.begin(); it != gatherers.end(); /* incremented manually */) { MediaChannelStatsGatherer* gatherer = it->get(); diff --git a/pc/test/fake_peer_connection_for_stats.h b/pc/test/fake_peer_connection_for_stats.h index ab2375aa7b..4cdbd82162 100644 --- a/pc/test/fake_peer_connection_for_stats.h +++ b/pc/test/fake_peer_connection_for_stats.h @@ -182,7 +182,7 @@ class FakePeerConnectionForStats : public FakePeerConnectionBase { // TODO(steveanton): Switch tests to use RtpTransceivers directly. auto receiver_proxy = RtpReceiverProxyWithInternal::Create( - signaling_thread_, receiver); + signaling_thread_, worker_thread_, receiver); GetOrCreateFirstTransceiverOfType(receiver->media_type()) ->internal() ->AddReceiver(receiver_proxy); diff --git a/pc/video_rtp_receiver.cc b/pc/video_rtp_receiver.cc index 374770a935..34cfe96f28 100644 --- a/pc/video_rtp_receiver.cc +++ b/pc/video_rtp_receiver.cc @@ -17,8 +17,6 @@ #include "api/video/recordable_encoded_frame.h" #include "api/video_track_source_proxy.h" -#include "pc/jitter_buffer_delay.h" -#include "pc/jitter_buffer_delay_proxy.h" #include "pc/video_track.h" #include "rtc_base/checks.h" #include "rtc_base/location.h" @@ -39,7 +37,7 @@ VideoRtpReceiver::VideoRtpReceiver( const std::vector>& streams) : worker_thread_(worker_thread), id_(receiver_id), - source_(new RefCountedObject(this)), + source_(rtc::make_ref_counted(&source_callback_)), track_(VideoTrackProxyWithInternal::Create( rtc::Thread::Current(), worker_thread, @@ -49,111 +47,130 @@ VideoRtpReceiver::VideoRtpReceiver( worker_thread, source_), worker_thread))), - attachment_id_(GenerateUniqueId()), - delay_(JitterBufferDelayProxy::Create( - rtc::Thread::Current(), - worker_thread, - new rtc::RefCountedObject(worker_thread))) { + attachment_id_(GenerateUniqueId()) { RTC_DCHECK(worker_thread_); SetStreams(streams); source_->SetState(MediaSourceInterface::kLive); } VideoRtpReceiver::~VideoRtpReceiver() { - // Since cricket::VideoRenderer is not reference counted, - // we need to remove it from the channel before we are deleted. - Stop(); - // Make sure we can't be called by the |source_| anymore. - worker_thread_->Invoke(RTC_FROM_HERE, - [this] { source_->ClearCallback(); }); + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); + RTC_DCHECK(stopped_); + RTC_DCHECK(!media_channel_); } std::vector VideoRtpReceiver::stream_ids() const { + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); std::vector stream_ids(streams_.size()); for (size_t i = 0; i < streams_.size(); ++i) stream_ids[i] = streams_[i]->id(); return stream_ids; } +rtc::scoped_refptr VideoRtpReceiver::dtls_transport() + const { + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); + return dtls_transport_; +} + +std::vector> +VideoRtpReceiver::streams() const { + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); + return streams_; +} + RtpParameters VideoRtpReceiver::GetParameters() const { - if (!media_channel_ || stopped_) { + RTC_DCHECK_RUN_ON(worker_thread_); + if (!media_channel_) return RtpParameters(); - } - return worker_thread_->Invoke(RTC_FROM_HERE, [&] { - return ssrc_ ? media_channel_->GetRtpReceiveParameters(*ssrc_) - : media_channel_->GetDefaultRtpReceiveParameters(); - }); + return ssrc_ ? media_channel_->GetRtpReceiveParameters(*ssrc_) + : media_channel_->GetDefaultRtpReceiveParameters(); } void VideoRtpReceiver::SetFrameDecryptor( rtc::scoped_refptr frame_decryptor) { + RTC_DCHECK_RUN_ON(worker_thread_); frame_decryptor_ = std::move(frame_decryptor); // Special Case: Set the frame decryptor to any value on any existing channel. - if (media_channel_ && ssrc_.has_value() && !stopped_) { - worker_thread_->Invoke(RTC_FROM_HERE, [&] { - media_channel_->SetFrameDecryptor(*ssrc_, frame_decryptor_); - }); + if (media_channel_ && ssrc_) { + media_channel_->SetFrameDecryptor(*ssrc_, frame_decryptor_); } } rtc::scoped_refptr VideoRtpReceiver::GetFrameDecryptor() const { + RTC_DCHECK_RUN_ON(worker_thread_); return frame_decryptor_; } void VideoRtpReceiver::SetDepacketizerToDecoderFrameTransformer( rtc::scoped_refptr frame_transformer) { - worker_thread_->Invoke(RTC_FROM_HERE, [&] { - RTC_DCHECK_RUN_ON(worker_thread_); - frame_transformer_ = std::move(frame_transformer); - if (media_channel_ && !stopped_) { - media_channel_->SetDepacketizerToDecoderFrameTransformer( - ssrc_.value_or(0), frame_transformer_); - } - }); + RTC_DCHECK_RUN_ON(worker_thread_); + frame_transformer_ = std::move(frame_transformer); + if (media_channel_) { + media_channel_->SetDepacketizerToDecoderFrameTransformer( + ssrc_.value_or(0), frame_transformer_); + } } void VideoRtpReceiver::Stop() { + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); // TODO(deadbeef): Need to do more here to fully stop receiving packets. - if (stopped_) { - return; + + if (!stopped_) { + source_->SetState(MediaSourceInterface::kEnded); + stopped_ = true; } - source_->SetState(MediaSourceInterface::kEnded); - if (!media_channel_) { - RTC_LOG(LS_WARNING) << "VideoRtpReceiver::Stop: No video channel exists."; - } else { - // Allow that SetSink fails. This is the normal case when the underlying - // media channel has already been deleted. - worker_thread_->Invoke(RTC_FROM_HERE, [&] { - RTC_DCHECK_RUN_ON(worker_thread_); + + worker_thread_->Invoke(RTC_FROM_HERE, [&] { + RTC_DCHECK_RUN_ON(worker_thread_); + if (media_channel_) { SetSink(nullptr); - }); - } - delay_->OnStop(); - stopped_ = true; + SetMediaChannel_w(nullptr); + } + source_->ClearCallback(); + }); } void VideoRtpReceiver::StopAndEndTrack() { + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); Stop(); track_->internal()->set_ended(); } void VideoRtpReceiver::RestartMediaChannel(absl::optional ssrc) { - RTC_DCHECK(media_channel_); - if (!stopped_ && ssrc_ == ssrc) { - return; - } - worker_thread_->Invoke(RTC_FROM_HERE, [&] { + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); + + // `stopped_` will be `true` on construction. RestartMediaChannel + // can in this case function like "ensure started" and flip `stopped_` + // to false. + + // TODO(tommi): Can we restart the media channel without blocking? + bool ok = worker_thread_->Invoke(RTC_FROM_HERE, [&, was_stopped = + stopped_] { RTC_DCHECK_RUN_ON(worker_thread_); - if (!stopped_) { + if (!media_channel_) { + // Ignore further negotiations if we've already been stopped and don't + // have an associated media channel. + RTC_DCHECK(was_stopped); + return false; // Can't restart. + } + + if (!was_stopped && ssrc_ == ssrc) { + // Already running with that ssrc. + return true; + } + + // Disconnect from the previous ssrc. + if (!was_stopped) { SetSink(nullptr); } + bool encoded_sink_enabled = saved_encoded_sink_enabled_; SetEncodedSinkEnabled(false); - stopped_ = false; - - ssrc_ = ssrc; + // Set up the new ssrc. + ssrc_ = std::move(ssrc); SetSink(source_->sink()); if (encoded_sink_enabled) { SetEncodedSinkEnabled(true); @@ -163,47 +180,62 @@ void VideoRtpReceiver::RestartMediaChannel(absl::optional ssrc) { media_channel_->SetDepacketizerToDecoderFrameTransformer( ssrc_.value_or(0), frame_transformer_); } + + if (media_channel_ && ssrc_) { + if (frame_decryptor_) { + media_channel_->SetFrameDecryptor(*ssrc_, frame_decryptor_); + } + + media_channel_->SetBaseMinimumPlayoutDelayMs(*ssrc_, delay_.GetMs()); + } + + return true; }); - // Attach any existing frame decryptor to the media channel. - MaybeAttachFrameDecryptorToMediaChannel( - ssrc, worker_thread_, frame_decryptor_, media_channel_, stopped_); - // TODO(bugs.webrtc.org/8694): Stop using 0 to mean unsignalled SSRC - // value. - delay_->OnStart(media_channel_, ssrc.value_or(0)); + if (!ok) + return; + + stopped_ = false; } +// RTC_RUN_ON(worker_thread_) void VideoRtpReceiver::SetSink(rtc::VideoSinkInterface* sink) { - RTC_DCHECK(media_channel_); if (ssrc_) { media_channel_->SetSink(*ssrc_, sink); - return; + } else { + media_channel_->SetDefaultSink(sink); } - media_channel_->SetDefaultSink(sink); } void VideoRtpReceiver::SetupMediaChannel(uint32_t ssrc) { - if (!media_channel_) { - RTC_LOG(LS_ERROR) - << "VideoRtpReceiver::SetupMediaChannel: No video channel exists."; - } + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); RestartMediaChannel(ssrc); } void VideoRtpReceiver::SetupUnsignaledMediaChannel() { - if (!media_channel_) { - RTC_LOG(LS_ERROR) << "VideoRtpReceiver::SetupUnsignaledMediaChannel: No " - "video channel exists."; - } + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); RestartMediaChannel(absl::nullopt); } +uint32_t VideoRtpReceiver::ssrc() const { + RTC_DCHECK_RUN_ON(worker_thread_); + return ssrc_.value_or(0); +} + void VideoRtpReceiver::set_stream_ids(std::vector stream_ids) { + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); SetStreams(CreateStreamsFromIds(std::move(stream_ids))); } +void VideoRtpReceiver::set_transport( + rtc::scoped_refptr dtls_transport) { + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); + dtls_transport_ = std::move(dtls_transport); +} + void VideoRtpReceiver::SetStreams( const std::vector>& streams) { + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); // Remove remote track from any streams that are going away. for (const auto& existing_stream : streams_) { bool removed = true; @@ -236,6 +268,7 @@ void VideoRtpReceiver::SetStreams( } void VideoRtpReceiver::SetObserver(RtpReceiverObserverInterface* observer) { + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); observer_ = observer; // Deliver any notifications the observer may have missed by being set late. if (received_first_packet_ && observer_) { @@ -245,40 +278,57 @@ void VideoRtpReceiver::SetObserver(RtpReceiverObserverInterface* observer) { void VideoRtpReceiver::SetJitterBufferMinimumDelay( absl::optional delay_seconds) { - delay_->Set(delay_seconds); + RTC_DCHECK_RUN_ON(worker_thread_); + delay_.Set(delay_seconds); + if (media_channel_ && ssrc_) + media_channel_->SetBaseMinimumPlayoutDelayMs(*ssrc_, delay_.GetMs()); } void VideoRtpReceiver::SetMediaChannel(cricket::MediaChannel* media_channel) { + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); RTC_DCHECK(media_channel == nullptr || media_channel->media_type() == media_type()); + + if (stopped_ && !media_channel) + return; + worker_thread_->Invoke(RTC_FROM_HERE, [&] { RTC_DCHECK_RUN_ON(worker_thread_); - bool encoded_sink_enabled = saved_encoded_sink_enabled_; - if (encoded_sink_enabled && media_channel_) { - // Turn off the old sink, if any. - SetEncodedSinkEnabled(false); - } - - media_channel_ = static_cast(media_channel); - - if (media_channel_) { - if (saved_generate_keyframe_) { - // TODO(bugs.webrtc.org/8694): Stop using 0 to mean unsignalled SSRC - media_channel_->GenerateKeyFrame(ssrc_.value_or(0)); - saved_generate_keyframe_ = false; - } - if (encoded_sink_enabled) { - SetEncodedSinkEnabled(true); - } - if (frame_transformer_) { - media_channel_->SetDepacketizerToDecoderFrameTransformer( - ssrc_.value_or(0), frame_transformer_); - } - } + SetMediaChannel_w(media_channel); }); } +// RTC_RUN_ON(worker_thread_) +void VideoRtpReceiver::SetMediaChannel_w(cricket::MediaChannel* media_channel) { + if (media_channel == media_channel_) + return; + + bool encoded_sink_enabled = saved_encoded_sink_enabled_; + if (encoded_sink_enabled && media_channel_) { + // Turn off the old sink, if any. + SetEncodedSinkEnabled(false); + } + + media_channel_ = static_cast(media_channel); + + if (media_channel_) { + if (saved_generate_keyframe_) { + // TODO(bugs.webrtc.org/8694): Stop using 0 to mean unsignalled SSRC + media_channel_->GenerateKeyFrame(ssrc_.value_or(0)); + saved_generate_keyframe_ = false; + } + if (encoded_sink_enabled) { + SetEncodedSinkEnabled(true); + } + if (frame_transformer_) { + media_channel_->SetDepacketizerToDecoderFrameTransformer( + ssrc_.value_or(0), frame_transformer_); + } + } +} + void VideoRtpReceiver::NotifyFirstPacketReceived() { + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); if (observer_) { observer_->OnFirstPacketReceived(media_type()); } @@ -286,11 +336,10 @@ void VideoRtpReceiver::NotifyFirstPacketReceived() { } std::vector VideoRtpReceiver::GetSources() const { - if (!media_channel_ || !ssrc_ || stopped_) { - return {}; - } - return worker_thread_->Invoke>( - RTC_FROM_HERE, [&] { return media_channel_->GetSources(*ssrc_); }); + RTC_DCHECK_RUN_ON(worker_thread_); + if (!ssrc_ || !media_channel_) + return std::vector(); + return media_channel_->GetSources(*ssrc_); } void VideoRtpReceiver::OnGenerateKeyFrame() { @@ -316,20 +365,21 @@ void VideoRtpReceiver::OnEncodedSinkEnabled(bool enable) { saved_encoded_sink_enabled_ = enable; } +// RTC_RUN_ON(worker_thread_) void VideoRtpReceiver::SetEncodedSinkEnabled(bool enable) { - if (media_channel_) { - if (enable) { - // TODO(bugs.webrtc.org/8694): Stop using 0 to mean unsignalled SSRC - auto source = source_; - media_channel_->SetRecordableEncodedFrameCallback( - ssrc_.value_or(0), - [source = std::move(source)](const RecordableEncodedFrame& frame) { - source->BroadcastRecordableEncodedFrame(frame); - }); - } else { - // TODO(bugs.webrtc.org/8694): Stop using 0 to mean unsignalled SSRC - media_channel_->ClearRecordableEncodedFrameCallback(ssrc_.value_or(0)); - } + if (!media_channel_) + return; + + // TODO(bugs.webrtc.org/8694): Stop using 0 to mean unsignalled SSRC + const auto ssrc = ssrc_.value_or(0); + + if (enable) { + media_channel_->SetRecordableEncodedFrameCallback( + ssrc, [source = source_](const RecordableEncodedFrame& frame) { + source->BroadcastRecordableEncodedFrame(frame); + }); + } else { + media_channel_->ClearRecordableEncodedFrameCallback(ssrc); } } diff --git a/pc/video_rtp_receiver.h b/pc/video_rtp_receiver.h index 8e36af6dfa..89e15a5c79 100644 --- a/pc/video_rtp_receiver.h +++ b/pc/video_rtp_receiver.h @@ -32,18 +32,18 @@ #include "api/video/video_sink_interface.h" #include "api/video/video_source_interface.h" #include "media/base/media_channel.h" -#include "pc/jitter_buffer_delay_interface.h" +#include "pc/jitter_buffer_delay.h" #include "pc/rtp_receiver.h" #include "pc/video_rtp_track_source.h" #include "pc/video_track.h" #include "rtc_base/ref_counted_object.h" +#include "rtc_base/system/no_unique_address.h" #include "rtc_base/thread.h" #include "rtc_base/thread_annotations.h" namespace webrtc { -class VideoRtpReceiver : public rtc::RefCountedObject, - public VideoRtpTrackSource::Callback { +class VideoRtpReceiver : public RtpReceiverInternal { public: // An SSRC of 0 will create a receiver that will match the first SSRC it // sees. Must be called on signaling thread. @@ -59,23 +59,16 @@ class VideoRtpReceiver : public rtc::RefCountedObject, virtual ~VideoRtpReceiver(); - rtc::scoped_refptr video_track() const { - return track_.get(); - } + rtc::scoped_refptr video_track() const { return track_; } // RtpReceiverInterface implementation rtc::scoped_refptr track() const override { - return track_.get(); - } - rtc::scoped_refptr dtls_transport() const override { - return dtls_transport_; + return track_; } + rtc::scoped_refptr dtls_transport() const override; std::vector stream_ids() const override; std::vector> streams() - const override { - return streams_; - } - + const override; cricket::MediaType media_type() const override { return cricket::MEDIA_TYPE_VIDEO; } @@ -98,13 +91,11 @@ class VideoRtpReceiver : public rtc::RefCountedObject, void StopAndEndTrack() override; void SetupMediaChannel(uint32_t ssrc) override; void SetupUnsignaledMediaChannel() override; - uint32_t ssrc() const override { return ssrc_.value_or(0); } + uint32_t ssrc() const override; void NotifyFirstPacketReceived() override; void set_stream_ids(std::vector stream_ids) override; void set_transport( - rtc::scoped_refptr dtls_transport) override { - dtls_transport_ = dtls_transport; - } + rtc::scoped_refptr dtls_transport) override; void SetStreams(const std::vector>& streams) override; @@ -123,33 +114,68 @@ class VideoRtpReceiver : public rtc::RefCountedObject, void RestartMediaChannel(absl::optional ssrc); void SetSink(rtc::VideoSinkInterface* sink) RTC_RUN_ON(worker_thread_); + void SetMediaChannel_w(cricket::MediaChannel* media_channel) + RTC_RUN_ON(worker_thread_); // VideoRtpTrackSource::Callback - void OnGenerateKeyFrame() override; - void OnEncodedSinkEnabled(bool enable) override; + void OnGenerateKeyFrame(); + void OnEncodedSinkEnabled(bool enable); + void SetEncodedSinkEnabled(bool enable) RTC_RUN_ON(worker_thread_); + class SourceCallback : public VideoRtpTrackSource::Callback { + public: + explicit SourceCallback(VideoRtpReceiver* receiver) : receiver_(receiver) {} + ~SourceCallback() override = default; + + private: + void OnGenerateKeyFrame() override { receiver_->OnGenerateKeyFrame(); } + void OnEncodedSinkEnabled(bool enable) override { + receiver_->OnEncodedSinkEnabled(enable); + } + + VideoRtpReceiver* const receiver_; + } source_callback_{this}; + + RTC_NO_UNIQUE_ADDRESS SequenceChecker signaling_thread_checker_; rtc::Thread* const worker_thread_; const std::string id_; - cricket::VideoMediaChannel* media_channel_ = nullptr; - absl::optional ssrc_; + // See documentation for `stopped_` below for when a valid media channel + // has been assigned and when this pointer will be null. + cricket::VideoMediaChannel* media_channel_ RTC_GUARDED_BY(worker_thread_) = + nullptr; + absl::optional ssrc_ RTC_GUARDED_BY(worker_thread_); // |source_| is held here to be able to change the state of the source when // the VideoRtpReceiver is stopped. - rtc::scoped_refptr source_; - rtc::scoped_refptr> track_; - std::vector> streams_; - bool stopped_ = true; - RtpReceiverObserverInterface* observer_ = nullptr; - bool received_first_packet_ = false; - int attachment_id_ = 0; - rtc::scoped_refptr frame_decryptor_; - rtc::scoped_refptr dtls_transport_; + const rtc::scoped_refptr source_; + const rtc::scoped_refptr> track_; + std::vector> streams_ + RTC_GUARDED_BY(&signaling_thread_checker_); + // `stopped` is state that's used on the signaling thread to indicate whether + // a valid `media_channel_` has been assigned and configured. When an instance + // of VideoRtpReceiver is initially created, `stopped_` is true and will + // remain true until either `SetupMediaChannel` or + // `SetupUnsignaledMediaChannel` is called after assigning a media channel. + // After that, `stopped_` will remain false until `Stop()` is called. + // Note, for checking the state of the class on the worker thread, + // check `media_channel_` instead, as that's the main worker thread state. + bool stopped_ RTC_GUARDED_BY(&signaling_thread_checker_) = true; + RtpReceiverObserverInterface* observer_ + RTC_GUARDED_BY(&signaling_thread_checker_) = nullptr; + bool received_first_packet_ RTC_GUARDED_BY(&signaling_thread_checker_) = + false; + const int attachment_id_; + rtc::scoped_refptr frame_decryptor_ + RTC_GUARDED_BY(worker_thread_); + rtc::scoped_refptr dtls_transport_ + RTC_GUARDED_BY(&signaling_thread_checker_); rtc::scoped_refptr frame_transformer_ RTC_GUARDED_BY(worker_thread_); - // Allows to thread safely change jitter buffer delay. Handles caching cases + // Stores the minimum jitter buffer delay. Handles caching cases // if |SetJitterBufferMinimumDelay| is called before start. - rtc::scoped_refptr delay_; + JitterBufferDelay delay_ RTC_GUARDED_BY(worker_thread_); + // Records if we should generate a keyframe when |media_channel_| gets set up // or switched. bool saved_generate_keyframe_ RTC_GUARDED_BY(worker_thread_) = false; diff --git a/pc/video_rtp_receiver_unittest.cc b/pc/video_rtp_receiver_unittest.cc index e65bc8c7ed..3a8099d30f 100644 --- a/pc/video_rtp_receiver_unittest.cc +++ b/pc/video_rtp_receiver_unittest.cc @@ -17,8 +17,10 @@ #include "test/gmock.h" using ::testing::_; +using ::testing::AnyNumber; using ::testing::InSequence; using ::testing::Mock; +using ::testing::NiceMock; using ::testing::SaveArg; using ::testing::StrictMock; @@ -53,19 +55,26 @@ class VideoRtpReceiverTest : public testing::Test { VideoRtpReceiverTest() : worker_thread_(rtc::Thread::Create()), channel_(nullptr, cricket::VideoOptions()), - receiver_(new VideoRtpReceiver(worker_thread_.get(), - "receiver", - {"stream"})) { + receiver_(rtc::make_ref_counted( + worker_thread_.get(), + std::string("receiver"), + std::vector({"stream"}))) { worker_thread_->Start(); receiver_->SetMediaChannel(&channel_); } + ~VideoRtpReceiverTest() override { + // Clear expectations that tests may have set up before calling Stop(). + Mock::VerifyAndClearExpectations(&channel_); + receiver_->Stop(); + } + webrtc::VideoTrackSourceInterface* Source() { return receiver_->streams()[0]->FindVideoTrack("receiver")->GetSource(); } std::unique_ptr worker_thread_; - MockVideoMediaChannel channel_; + NiceMock channel_; rtc::scoped_refptr receiver_; }; @@ -98,6 +107,10 @@ TEST_F(VideoRtpReceiverTest, // Switching to a new channel should now not cause calls to GenerateKeyFrame. StrictMock channel4(nullptr, cricket::VideoOptions()); receiver_->SetMediaChannel(&channel4); + + // We must call Stop() here since the mock media channels live on the stack + // and `receiver_` still has a pointer to those objects. + receiver_->Stop(); } TEST_F(VideoRtpReceiverTest, EnablesEncodedOutput) { @@ -131,6 +144,10 @@ TEST_F(VideoRtpReceiverTest, DisablesEnablesEncodedOutputOnChannelSwitch) { Source()->RemoveEncodedSink(&sink); StrictMock channel3(nullptr, cricket::VideoOptions()); receiver_->SetMediaChannel(&channel3); + + // We must call Stop() here since the mock media channels live on the stack + // and `receiver_` still has a pointer to those objects. + receiver_->Stop(); } TEST_F(VideoRtpReceiverTest, BroadcastsEncodedFramesWhenEnabled) {