From 948e40cfdfccd3e6cbbb15d259b30ef9ffb45cf4 Mon Sep 17 00:00:00 2001 From: Tommi Date: Mon, 31 May 2021 12:39:57 +0200 Subject: [PATCH] Add thread guards and constness to Call members. Bug: webrtc:11993 Change-Id: I8f6f6fb800f19b9fa2071a1d159dfe9334ab20cb Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/220606 Commit-Queue: Tommi Reviewed-by: Danil Chapovalov Cr-Commit-Position: refs/heads/master@{#34161} --- call/call.cc | 72 +++++++++++++++++++++++++++++----------------------- 1 file changed, 40 insertions(+), 32 deletions(-) diff --git a/call/call.cc b/call/call.cc index 678ea5506b..de30d6540f 100644 --- a/call/call.cc +++ b/call/call.cc @@ -361,7 +361,7 @@ class Call final : public webrtc::Call, void NotifyBweOfReceivedPacket(const RtpPacketReceived& packet, MediaType media_type) - RTC_SHARED_LOCKS_REQUIRED(worker_thread_); + RTC_RUN_ON(worker_thread_); void UpdateAggregateNetworkState(); @@ -369,10 +369,6 @@ class Call final : public webrtc::Call, // callbacks have been registered. void EnsureStarted() RTC_RUN_ON(worker_thread_); - rtc::TaskQueue* send_transport_queue() const { - return transport_send_ptr_->GetWorkerQueue(); - } - Clock* const clock_; TaskQueueFactory* const task_queue_factory_; TaskQueueBase* const worker_thread_; @@ -382,10 +378,12 @@ class Call final : public webrtc::Call, const rtc::scoped_refptr module_process_thread_; const std::unique_ptr call_stats_; const std::unique_ptr bitrate_allocator_; - Call::Config config_; + const Call::Config config_ RTC_GUARDED_BY(worker_thread_); + // Maps to config_.trials, can be used from any thread via `trials()`. + const WebRtcKeyValueConfig& trials_; - NetworkState audio_network_state_; - NetworkState video_network_state_; + NetworkState audio_network_state_ RTC_GUARDED_BY(worker_thread_); + NetworkState video_network_state_ RTC_GUARDED_BY(worker_thread_); // TODO(bugs.webrtc.org/11993): Move aggregate_network_up_ over to the // network thread. bool aggregate_network_up_ RTC_GUARDED_BY(worker_thread_); @@ -403,8 +401,10 @@ class Call final : public webrtc::Call, // TODO(nisse): Should eventually be injected at creation, // with a single object in the bundled case. - RtpStreamReceiverController audio_receiver_controller_; - RtpStreamReceiverController video_receiver_controller_; + RtpStreamReceiverController audio_receiver_controller_ + RTC_GUARDED_BY(worker_thread_); + RtpStreamReceiverController video_receiver_controller_ + RTC_GUARDED_BY(worker_thread_); // This extra map is used for receive processing which is // independent of media type. @@ -457,15 +457,13 @@ class Call final : public webrtc::Call, RtpPayloadStateMap suspended_video_payload_states_ RTC_GUARDED_BY(worker_thread_); - webrtc::RtcEventLog* event_log_; + webrtc::RtcEventLog* const event_log_; // TODO(bugs.webrtc.org/11993) ready to move receive stats access to the // network thread. ReceiveStats receive_stats_ RTC_GUARDED_BY(worker_thread_); uint32_t last_bandwidth_bps_ RTC_GUARDED_BY(worker_thread_); - // TODO(holmer): Remove this lock once BitrateController no longer calls - // OnNetworkChanged from multiple threads. uint32_t min_allocated_send_bitrate_bps_ RTC_GUARDED_BY(worker_thread_); uint32_t configured_max_padding_bitrate_bps_ RTC_GUARDED_BY(worker_thread_); AvgCounter estimated_send_bitrate_kbps_counter_ @@ -482,16 +480,21 @@ class Call final : public webrtc::Call, // Note that |task_safety_| needs to be at a greater scope than the task queue // owned by |transport_send_| since calls might arrive on the network thread // while Call is being deleted and the task queue is being torn down. - ScopedTaskSafety task_safety_; + const ScopedTaskSafety task_safety_; // Caches transport_send_.get(), to avoid racing with destructor. // Note that this is declared before transport_send_ to ensure that it is not // invalidated until no more tasks can be running on the transport_send_ task // queue. - RtpTransportControllerSendInterface* const transport_send_ptr_; + // For more details on the background of this member variable, see: + // https://webrtc-review.googlesource.com/c/src/+/63023/9/call/call.cc + // https://bugs.chromium.org/p/chromium/issues/detail?id=992640 + RtpTransportControllerSendInterface* const transport_send_ptr_ + RTC_GUARDED_BY(send_transport_queue_); // Declared last since it will issue callbacks from a task queue. Declaring it // last ensures that it is destroyed first and any running tasks are finished. - std::unique_ptr transport_send_; + const std::unique_ptr transport_send_; + rtc::TaskQueue* const send_transport_queue_; bool is_started_ RTC_GUARDED_BY(worker_thread_) = false; @@ -748,6 +751,7 @@ Call::Call(Clock* clock, call_stats_(new CallStats(clock_, worker_thread_)), bitrate_allocator_(new BitrateAllocator(this)), config_(config), + trials_(*config.trials), audio_network_state_(kNetworkDown), video_network_state_(kNetworkDown), aggregate_network_up_(false), @@ -768,11 +772,13 @@ Call::Call(Clock* clock, video_send_delay_stats_(new SendDelayStats(clock_)), start_ms_(clock_->TimeInMilliseconds()), transport_send_ptr_(transport_send.get()), - transport_send_(std::move(transport_send)) { + transport_send_(std::move(transport_send)), + send_transport_queue_(transport_send_->GetWorkerQueue()) { RTC_DCHECK(config.event_log != nullptr); RTC_DCHECK(config.trials != nullptr); RTC_DCHECK(network_thread_); RTC_DCHECK(worker_thread_->IsCurrent()); + RTC_DCHECK(send_transport_queue_); // Do not remove this call; it is here to convince the compiler that the // WebRTC source timestamp string needs to be in the final binary. @@ -827,10 +833,10 @@ void Call::EnsureStarted() { // This call seems to kick off a number of things, so probably better left // off being kicked off on request rather than in the ctor. - transport_send_ptr_->RegisterTargetTransferRateObserver(this); + transport_send_->RegisterTargetTransferRateObserver(this); module_process_thread_->EnsureStarted(); - transport_send_ptr_->EnsureStarted(); + transport_send_->EnsureStarted(); } void Call::SetClientBitratePreferences(const BitrateSettings& preferences) { @@ -861,7 +867,7 @@ webrtc::AudioSendStream* Call::CreateAudioSendStream( AudioSendStream* send_stream = new AudioSendStream( clock_, config, config_.audio_state, task_queue_factory_, - module_process_thread_->process_thread(), transport_send_ptr_, + module_process_thread_->process_thread(), transport_send_.get(), bitrate_allocator_.get(), event_log_, call_stats_->AsRtcpRttStats(), suspended_rtp_state); RTC_DCHECK(audio_send_ssrcs_.find(config.rtp.ssrc) == @@ -922,7 +928,7 @@ webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream( // set it up asynchronously on the network thread (the registration and // |audio_receiver_controller_| need to live on the network thread). AudioReceiveStream* receive_stream = new AudioReceiveStream( - clock_, &audio_receiver_controller_, transport_send_ptr_->packet_router(), + clock_, &audio_receiver_controller_, transport_send_->packet_router(), module_process_thread_->process_thread(), config_.neteq_factory, config, config_.audio_state, event_log_); @@ -999,7 +1005,7 @@ webrtc::VideoSendStream* Call::CreateVideoSendStream( VideoSendStream* send_stream = new VideoSendStream( clock_, num_cpu_cores_, module_process_thread_->process_thread(), - task_queue_factory_, call_stats_->AsRtcpRttStats(), transport_send_ptr_, + task_queue_factory_, call_stats_->AsRtcpRttStats(), transport_send_.get(), bitrate_allocator_.get(), video_send_delay_stats_.get(), event_log_, std::move(config), std::move(encoder_config), suspended_video_send_ssrcs_, suspended_video_payload_states_, std::move(fec_controller)); @@ -1022,6 +1028,7 @@ webrtc::VideoSendStream* Call::CreateVideoSendStream( webrtc::VideoSendStream* Call::CreateVideoSendStream( webrtc::VideoSendStream::Config config, VideoEncoderConfig encoder_config) { + RTC_DCHECK_RUN_ON(worker_thread_); if (config_.fec_controller_factory) { RTC_LOG(LS_INFO) << "External FEC Controller will be used."; } @@ -1090,7 +1097,7 @@ webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream( // |video_receiver_controller_| need to live on the network thread). VideoReceiveStream2* receive_stream = new VideoReceiveStream2( task_queue_factory_, worker_thread_, &video_receiver_controller_, - num_cpu_cores_, transport_send_ptr_->packet_router(), + num_cpu_cores_, transport_send_->packet_router(), std::move(configuration), module_process_thread_->process_thread(), call_stats_.get(), clock_, new VCMTiming(clock_)); @@ -1194,7 +1201,7 @@ void Call::AddAdaptationResource(rtc::scoped_refptr resource) { } RtpTransportControllerSendInterface* Call::GetTransportControllerSend() { - return transport_send_ptr_; + return transport_send_.get(); } Call::Stats Call::GetStats() const { @@ -1204,7 +1211,7 @@ Call::Stats Call::GetStats() const { // TODO(srte): It is unclear if we only want to report queues if network is // available. stats.pacer_delay_ms = - aggregate_network_up_ ? transport_send_ptr_->GetPacerQueuingDelayMs() : 0; + aggregate_network_up_ ? transport_send_->GetPacerQueuingDelayMs() : 0; stats.rtt_ms = call_stats_->LastProcessedRtt(); @@ -1221,7 +1228,7 @@ Call::Stats Call::GetStats() const { } const WebRtcKeyValueConfig& Call::trials() const { - return *config_.trials; + return trials_; } TaskQueueBase* Call::network_thread() const { @@ -1303,7 +1310,7 @@ void Call::UpdateAggregateNetworkState() { } aggregate_network_up_ = aggregate_network_up; - transport_send_ptr_->OnNetworkAvailability(aggregate_network_up); + transport_send_->OnNetworkAvailability(aggregate_network_up); } void Call::OnSentPacket(const rtc::SentPacket& sent_packet) { @@ -1315,16 +1322,16 @@ void Call::OnSentPacket(const rtc::SentPacket& sent_packet) { // implementations that either just do a PostTask or use locking. video_send_delay_stats_->OnSentPacket(sent_packet.packet_id, clock_->TimeInMilliseconds()); - transport_send_ptr_->OnSentPacket(sent_packet); + transport_send_->OnSentPacket(sent_packet); } void Call::OnStartRateUpdate(DataRate start_rate) { - RTC_DCHECK_RUN_ON(send_transport_queue()); + RTC_DCHECK_RUN_ON(send_transport_queue_); bitrate_allocator_->UpdateStartRate(start_rate.bps()); } void Call::OnTargetTransferRate(TargetTransferRate msg) { - RTC_DCHECK_RUN_ON(send_transport_queue()); + RTC_DCHECK_RUN_ON(send_transport_queue_); uint32_t target_bitrate_bps = msg.target_rate.bps(); // For controlling the rate of feedback messages. @@ -1354,7 +1361,7 @@ void Call::OnTargetTransferRate(TargetTransferRate msg) { } void Call::OnAllocationLimitsChanged(BitrateAllocationLimits limits) { - RTC_DCHECK_RUN_ON(send_transport_queue()); + RTC_DCHECK_RUN_ON(send_transport_queue_); transport_send_ptr_->SetAllocatedSendBitrateLimits(limits); @@ -1581,6 +1588,7 @@ void Call::OnRecoveredPacket(const uint8_t* packet, size_t length) { video_receiver_controller_.OnRtpPacket(parsed_packet); } +// RTC_RUN_ON(worker_thread_) void Call::NotifyBweOfReceivedPacket(const RtpPacketReceived& packet, MediaType media_type) { auto it = receive_rtp_config_.find(packet.Ssrc()); @@ -1596,7 +1604,7 @@ void Call::NotifyBweOfReceivedPacket(const RtpPacketReceived& packet, if (header.extension.hasAbsoluteSendTime) { packet_msg.send_time = header.extension.GetAbsoluteSendTimestamp(); } - transport_send_ptr_->OnReceivedPacket(packet_msg); + transport_send_->OnReceivedPacket(packet_msg); if (!use_send_side_bwe && header.extension.hasTransportSequenceNumber) { // Inconsistent configuration of send side BWE. Do nothing.