diff --git a/call/BUILD.gn b/call/BUILD.gn index 19dd5d621d..b4680de860 100644 --- a/call/BUILD.gn +++ b/call/BUILD.gn @@ -206,7 +206,6 @@ rtc_library("rtp_sender") { "../modules/rtp_rtcp", "../modules/rtp_rtcp:rtp_rtcp_format", "../modules/rtp_rtcp:rtp_video_header", - "../modules/utility:utility", "../modules/video_coding:chain_diff_calculator", "../modules/video_coding:codec_globals_headers", "../modules/video_coding:frame_dependencies_calculator", diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc index 1225e58462..5f3039065a 100644 --- a/call/rtp_transport_controller_send.cc +++ b/call/rtp_transport_controller_send.cc @@ -17,6 +17,7 @@ #include "absl/strings/string_view.h" #include "absl/types/optional.h" #include "api/task_queue/pending_task_safety_flag.h" +#include "api/task_queue/task_queue_base.h" #include "api/transport/goog_cc_factory.h" #include "api/transport/network_types.h" #include "api/units/data_rate.h" @@ -76,6 +77,7 @@ RtpTransportControllerSend::RtpTransportControllerSend( : clock_(clock), event_log_(config.event_log), task_queue_factory_(config.task_queue_factory), + task_queue_(TaskQueueBase::Current()), bitrate_configurator_(config.bitrate_config), pacer_started_(false), pacer_(clock, @@ -102,9 +104,6 @@ RtpTransportControllerSend::RtpTransportControllerSend( congestion_window_size_(DataSize::PlusInfinity()), is_congested_(false), retransmission_rate_limiter_(clock, kRetransmitWindowSizeMs), - task_queue_(*config.trials, - "rtp_send_controller", - config.task_queue_factory), field_trials_(*config.trials) { ParseFieldTrial({&relay_bandwidth_cap_}, config.trials->Lookup("WebRTC-Bwe-NetworkRouteConstraints")); @@ -120,15 +119,10 @@ RtpTransportControllerSend::RtpTransportControllerSend( } RtpTransportControllerSend::~RtpTransportControllerSend() { - RTC_DCHECK_RUN_ON(&main_thread_); + RTC_DCHECK_RUN_ON(&sequence_checker_); RTC_DCHECK(video_rtp_senders_.empty()); - if (task_queue_.IsCurrent()) { - // If these repeated tasks run on a task queue owned by - // `task_queue_`, they are stopped when the task queue is deleted. - // Otherwise, stop them here. - pacer_queue_update_task_.Stop(); - controller_task_.Stop(); - } + pacer_queue_update_task_.Stop(); + controller_task_.Stop(); } RtpVideoSenderInterface* RtpTransportControllerSend::CreateRtpVideoSender( @@ -142,7 +136,7 @@ RtpVideoSenderInterface* RtpTransportControllerSend::CreateRtpVideoSender( std::unique_ptr fec_controller, const RtpSenderFrameEncryptionConfig& frame_encryption_config, rtc::scoped_refptr frame_transformer) { - RTC_DCHECK_RUN_ON(&main_thread_); + RTC_DCHECK_RUN_ON(&sequence_checker_); video_rtp_senders_.push_back(std::make_unique( clock_, suspended_ssrcs, states, rtp_config, rtcp_report_interval_ms, send_transport, observers, @@ -157,7 +151,7 @@ RtpVideoSenderInterface* RtpTransportControllerSend::CreateRtpVideoSender( void RtpTransportControllerSend::DestroyRtpVideoSender( RtpVideoSenderInterface* rtp_video_sender) { - RTC_DCHECK_RUN_ON(&main_thread_); + RTC_DCHECK_RUN_ON(&sequence_checker_); std::vector>::iterator it = video_rtp_senders_.end(); for (it = video_rtp_senders_.begin(); it != video_rtp_senders_.end(); ++it) { @@ -195,10 +189,6 @@ absl::optional RtpTransportControllerSend::GetCongestedStateUpdate() return absl::nullopt; } -MaybeWorkerThread* RtpTransportControllerSend::GetWorkerQueue() { - return &task_queue_; -} - PacketRouter* RtpTransportControllerSend::packet_router() { return &packet_router_; } @@ -219,14 +209,14 @@ RtpPacketSender* RtpTransportControllerSend::packet_sender() { void RtpTransportControllerSend::SetAllocatedSendBitrateLimits( BitrateAllocationLimits limits) { - RTC_DCHECK_RUN_ON(&task_queue_); + RTC_DCHECK_RUN_ON(&sequence_checker_); streams_config_.min_total_allocated_bitrate = limits.min_allocatable_rate; streams_config_.max_padding_rate = limits.max_padding_rate; streams_config_.max_total_allocated_bitrate = limits.max_allocatable_rate; UpdateStreamsConfig(); } void RtpTransportControllerSend::SetPacingFactor(float pacing_factor) { - RTC_DCHECK_RUN_ON(&task_queue_); + RTC_DCHECK_RUN_ON(&sequence_checker_); streams_config_.pacing_factor = pacing_factor; UpdateStreamsConfig(); } @@ -240,13 +230,11 @@ RtpTransportControllerSend::GetStreamFeedbackProvider() { void RtpTransportControllerSend::RegisterTargetTransferRateObserver( TargetTransferRateObserver* observer) { - task_queue_.RunOrPost([this, observer] { - RTC_DCHECK_RUN_ON(&task_queue_); - RTC_DCHECK(observer_ == nullptr); - observer_ = observer; - observer_->OnStartRateUpdate(*initial_config_.constraints.starting_rate); - MaybeCreateControllers(); - }); + RTC_DCHECK_RUN_ON(&sequence_checker_); + RTC_DCHECK(observer_ == nullptr); + observer_ = observer; + observer_->OnStartRateUpdate(*initial_config_.constraints.starting_rate); + MaybeCreateControllers(); } bool RtpTransportControllerSend::IsRelevantRouteChange( @@ -269,8 +257,8 @@ bool RtpTransportControllerSend::IsRelevantRouteChange( void RtpTransportControllerSend::OnNetworkRouteChanged( absl::string_view transport_name, const rtc::NetworkRoute& network_route) { + RTC_DCHECK_RUN_ON(&sequence_checker_); // Check if the network route is connected. - if (!network_route.connected) { // TODO(honghaiz): Perhaps handle this in SignalChannelNetworkState and // consider merging these two methods. @@ -300,10 +288,7 @@ void RtpTransportControllerSend::OnNetworkRouteChanged( if (relay_constraint_update.has_value()) { UpdateBitrateConstraints(*relay_constraint_update); } - task_queue_.RunOrPost([this, network_route] { - RTC_DCHECK_RUN_ON(&task_queue_); - transport_overhead_bytes_per_packet_ = network_route.packet_overhead; - }); + transport_overhead_bytes_per_packet_ = network_route.packet_overhead; // No need to reset BWE if this is the first time the network connects. return; } @@ -329,51 +314,42 @@ void RtpTransportControllerSend::OnNetworkRouteChanged( NetworkRouteChange msg; msg.at_time = Timestamp::Millis(clock_->TimeInMilliseconds()); msg.constraints = ConvertConstraints(bitrate_config, clock_); - task_queue_.RunOrPost([this, msg, network_route] { - RTC_DCHECK_RUN_ON(&task_queue_); - transport_overhead_bytes_per_packet_ = network_route.packet_overhead; - if (reset_feedback_on_route_change_) { - transport_feedback_adapter_.SetNetworkRoute(network_route); - } - if (controller_) { - PostUpdates(controller_->OnNetworkRouteChange(msg)); - } else { - UpdateInitialConstraints(msg.constraints); - } - is_congested_ = false; - pacer_.SetCongested(false); - }); + transport_overhead_bytes_per_packet_ = network_route.packet_overhead; + if (reset_feedback_on_route_change_) { + transport_feedback_adapter_.SetNetworkRoute(network_route); + } + if (controller_) { + PostUpdates(controller_->OnNetworkRouteChange(msg)); + } else { + UpdateInitialConstraints(msg.constraints); + } + is_congested_ = false; + pacer_.SetCongested(false); } } void RtpTransportControllerSend::OnNetworkAvailability(bool network_available) { - RTC_DCHECK_RUN_ON(&main_thread_); + RTC_DCHECK_RUN_ON(&sequence_checker_); RTC_LOG(LS_VERBOSE) << "SignalNetworkState " << (network_available ? "Up" : "Down"); NetworkAvailability msg; msg.at_time = Timestamp::Millis(clock_->TimeInMilliseconds()); msg.network_available = network_available; - task_queue_.RunOrPost([this, msg]() { - RTC_DCHECK_RUN_ON(&task_queue_); - if (network_available_ == msg.network_available) - return; - network_available_ = msg.network_available; - if (network_available_) { - pacer_.Resume(); - } else { - pacer_.Pause(); - } - is_congested_ = false; - pacer_.SetCongested(false); - - if (controller_) { - control_handler_->SetNetworkAvailability(network_available_); - PostUpdates(controller_->OnNetworkAvailability(msg)); - UpdateControlState(); - } else { - MaybeCreateControllers(); - } - }); + network_available_ = network_available; + if (network_available) { + pacer_.Resume(); + } else { + pacer_.Pause(); + } + is_congested_ = false; + pacer_.SetCongested(false); + if (controller_) { + control_handler_->SetNetworkAvailability(network_available); + PostUpdates(controller_->OnNetworkAvailability(msg)); + UpdateControlState(); + } else { + MaybeCreateControllers(); + } for (auto& rtp_sender : video_rtp_senders_) { rtp_sender->OnNetworkAvailability(network_available); } @@ -389,11 +365,10 @@ absl::optional RtpTransportControllerSend::GetFirstPacketTime() return pacer_.FirstSentPacketTime(); } void RtpTransportControllerSend::EnablePeriodicAlrProbing(bool enable) { - task_queue_.RunOrPost([this, enable]() { - RTC_DCHECK_RUN_ON(&task_queue_); - streams_config_.requests_alr_probing = enable; - UpdateStreamsConfig(); - }); + RTC_DCHECK_RUN_ON(&sequence_checker_); + + streams_config_.requests_alr_probing = enable; + UpdateStreamsConfig(); } void RtpTransportControllerSend::OnSentPacket( const rtc::SentPacket& sent_packet) { @@ -401,28 +376,22 @@ void RtpTransportControllerSend::OnSentPacket( // TODO(bugs.webrtc.org/137439): Clarify other thread contexts calling in, and // simplify task posting logic when the combined network/worker project // launches. - if (TaskQueueBase::Current() != task_queue_.TaskQueueForPost()) { - // We can't use SafeTask here if we are using an owned task queue, because - // the safety flag will be destroyed when RtpTransportControllerSend is - // destroyed on the worker thread. But we must use SafeTask if we are using - // the worker thread, since the worker thread outlives - // RtpTransportControllerSend. - task_queue_.TaskQueueForPost()->PostTask( - task_queue_.MaybeSafeTask(safety_.flag(), [this, sent_packet]() { - RTC_DCHECK_RUN_ON(&task_queue_); - ProcessSentPacket(sent_packet, /*posted_to_worker=*/true); - })); + if (TaskQueueBase::Current() != task_queue_) { + task_queue_->PostTask(SafeTask(safety_.flag(), [this, sent_packet]() { + RTC_DCHECK_RUN_ON(&sequence_checker_); + ProcessSentPacket(sent_packet, /*posted_to_worker=*/true); + })); return; } - RTC_DCHECK_RUN_ON(&task_queue_); + RTC_DCHECK_RUN_ON(&sequence_checker_); ProcessSentPacket(sent_packet, /*posted_to_worker=*/false); } -// RTC_RUN_ON(task_queue_) void RtpTransportControllerSend::ProcessSentPacket( const rtc::SentPacket& sent_packet, bool posted_to_worker) { + RTC_DCHECK_RUN_ON(&sequence_checker_); absl::optional packet_msg = transport_feedback_adapter_.ProcessSentPacket(sent_packet); if (!packet_msg) @@ -445,18 +414,19 @@ void RtpTransportControllerSend::ProcessSentPacket( // PacketRouter::SendPacket, we need to break the chain here and PostTask to // get out of the lock. In testing, having updates to process happens pretty // rarely so we do not usually get here. - task_queue_.TaskQueueForPost()->PostTask(task_queue_.MaybeSafeTask( - safety_.flag(), - [this, control_update = std::move(control_update)]() mutable { - RTC_DCHECK_RUN_ON(&task_queue_); - ProcessSentPacketUpdates(std::move(control_update)); - })); + task_queue_->PostTask( + SafeTask(safety_.flag(), + [this, control_update = std::move(control_update)]() mutable { + RTC_DCHECK_RUN_ON(&sequence_checker_); + ProcessSentPacketUpdates(std::move(control_update)); + })); } } // RTC_RUN_ON(task_queue_) void RtpTransportControllerSend::ProcessSentPacketUpdates( NetworkControlUpdate updates) { + RTC_DCHECK_RUN_ON(&sequence_checker_); // Only update outstanding data if: // 1. Packet feedback is used. // 2. The packet has not yet received an acknowledgement. @@ -469,28 +439,25 @@ void RtpTransportControllerSend::ProcessSentPacketUpdates( void RtpTransportControllerSend::OnReceivedPacket( const ReceivedPacket& packet_msg) { - task_queue_.RunOrPost([this, packet_msg]() { - RTC_DCHECK_RUN_ON(&task_queue_); - if (controller_) - PostUpdates(controller_->OnReceivedPacket(packet_msg)); - }); + RTC_DCHECK_RUN_ON(&sequence_checker_); + if (controller_) + PostUpdates(controller_->OnReceivedPacket(packet_msg)); } void RtpTransportControllerSend::UpdateBitrateConstraints( const BitrateConstraints& updated) { + RTC_DCHECK_RUN_ON(&sequence_checker_); TargetRateConstraints msg = ConvertConstraints(updated, clock_); - task_queue_.RunOrPost([this, msg]() { - RTC_DCHECK_RUN_ON(&task_queue_); - if (controller_) { - PostUpdates(controller_->OnTargetRateConstraints(msg)); - } else { - UpdateInitialConstraints(msg); - } - }); + if (controller_) { + PostUpdates(controller_->OnTargetRateConstraints(msg)); + } else { + UpdateInitialConstraints(msg); + } } void RtpTransportControllerSend::SetSdpBitrateParameters( const BitrateConstraints& constraints) { + RTC_DCHECK_RUN_ON(&sequence_checker_); absl::optional updated = bitrate_configurator_.UpdateWithSdpParameters(constraints); if (updated.has_value()) { @@ -504,6 +471,7 @@ void RtpTransportControllerSend::SetSdpBitrateParameters( void RtpTransportControllerSend::SetClientBitratePreferences( const BitrateSettings& preferences) { + RTC_DCHECK_RUN_ON(&sequence_checker_); absl::optional updated = bitrate_configurator_.UpdateWithClientPreferences(preferences); if (updated.has_value()) { @@ -523,7 +491,7 @@ RtpTransportControllerSend::ApplyOrLiftRelayCap(bool is_relayed) { void RtpTransportControllerSend::OnTransportOverheadChanged( size_t transport_overhead_bytes_per_packet) { - RTC_DCHECK_RUN_ON(&main_thread_); + RTC_DCHECK_RUN_ON(&sequence_checker_); if (transport_overhead_bytes_per_packet >= kMaxOverheadBytes) { RTC_LOG(LS_ERROR) << "Transport overhead exceeds " << kMaxOverheadBytes; return; @@ -550,6 +518,7 @@ void RtpTransportControllerSend::IncludeOverheadInPacedSender() { } void RtpTransportControllerSend::EnsureStarted() { + RTC_DCHECK_RUN_ON(&sequence_checker_); if (!pacer_started_) { pacer_started_ = true; pacer_.EnsureStarted(); @@ -557,75 +526,64 @@ void RtpTransportControllerSend::EnsureStarted() { } void RtpTransportControllerSend::OnReceivedEstimatedBitrate(uint32_t bitrate) { + RTC_DCHECK_RUN_ON(&sequence_checker_); RemoteBitrateReport msg; msg.receive_time = Timestamp::Millis(clock_->TimeInMilliseconds()); msg.bandwidth = DataRate::BitsPerSec(bitrate); - task_queue_.RunOrPost([this, msg]() { - RTC_DCHECK_RUN_ON(&task_queue_); - if (controller_) - PostUpdates(controller_->OnRemoteBitrateReport(msg)); - }); + if (controller_) + PostUpdates(controller_->OnRemoteBitrateReport(msg)); } void RtpTransportControllerSend::OnReceivedRtcpReceiverReport( const ReportBlockList& report_blocks, int64_t rtt_ms, int64_t now_ms) { - task_queue_.RunOrPost([this, report_blocks, now_ms, rtt_ms]() { - RTC_DCHECK_RUN_ON(&task_queue_); - OnReceivedRtcpReceiverReportBlocks(report_blocks, now_ms); - RoundTripTimeUpdate report; - report.receive_time = Timestamp::Millis(now_ms); - report.round_trip_time = TimeDelta::Millis(rtt_ms); - report.smoothed = false; - if (controller_ && !report.round_trip_time.IsZero()) - PostUpdates(controller_->OnRoundTripTimeUpdate(report)); - }); + RTC_DCHECK_RUN_ON(&sequence_checker_); + OnReceivedRtcpReceiverReportBlocks(report_blocks, now_ms); + RoundTripTimeUpdate report; + report.receive_time = Timestamp::Millis(now_ms); + report.round_trip_time = TimeDelta::Millis(rtt_ms); + report.smoothed = false; + if (controller_ && !report.round_trip_time.IsZero()) + PostUpdates(controller_->OnRoundTripTimeUpdate(report)); } void RtpTransportControllerSend::OnAddPacket( const RtpPacketSendInfo& packet_info) { + RTC_DCHECK_RUN_ON(&sequence_checker_); Timestamp creation_time = Timestamp::Millis(clock_->TimeInMilliseconds()); - - task_queue_.RunOrPost([this, packet_info, creation_time]() { - RTC_DCHECK_RUN_ON(&task_queue_); - feedback_demuxer_.AddPacket(packet_info); - transport_feedback_adapter_.AddPacket( - packet_info, transport_overhead_bytes_per_packet_, creation_time); - }); + feedback_demuxer_.AddPacket(packet_info); + transport_feedback_adapter_.AddPacket( + packet_info, transport_overhead_bytes_per_packet_, creation_time); } void RtpTransportControllerSend::OnTransportFeedback( const rtcp::TransportFeedback& feedback) { + RTC_DCHECK_RUN_ON(&sequence_checker_); auto feedback_time = Timestamp::Millis(clock_->TimeInMilliseconds()); - task_queue_.RunOrPost([this, feedback, feedback_time]() { - RTC_DCHECK_RUN_ON(&task_queue_); - feedback_demuxer_.OnTransportFeedback(feedback); - absl::optional feedback_msg = - transport_feedback_adapter_.ProcessTransportFeedback(feedback, - feedback_time); - if (feedback_msg) { - if (controller_) - PostUpdates(controller_->OnTransportPacketsFeedback(*feedback_msg)); + feedback_demuxer_.OnTransportFeedback(feedback); + absl::optional feedback_msg = + transport_feedback_adapter_.ProcessTransportFeedback(feedback, + feedback_time); + if (feedback_msg) { + if (controller_) + PostUpdates(controller_->OnTransportPacketsFeedback(*feedback_msg)); - // Only update outstanding data if any packet is first time acked. - UpdateCongestedState(); - } - }); + // Only update outstanding data if any packet is first time acked. + UpdateCongestedState(); + } } void RtpTransportControllerSend::OnRemoteNetworkEstimate( NetworkStateEstimate estimate) { + RTC_DCHECK_RUN_ON(&sequence_checker_); if (event_log_) { event_log_->Log(std::make_unique( estimate.link_capacity_lower, estimate.link_capacity_upper)); } estimate.update_time = Timestamp::Millis(clock_->TimeInMilliseconds()); - task_queue_.RunOrPost([this, estimate] { - RTC_DCHECK_RUN_ON(&task_queue_); - if (controller_) - PostUpdates(controller_->OnNetworkStateEstimate(estimate)); - }); + if (controller_) + PostUpdates(controller_->OnNetworkStateEstimate(estimate)); } void RtpTransportControllerSend::MaybeCreateControllers() { @@ -663,12 +621,11 @@ void RtpTransportControllerSend::UpdateInitialConstraints( } void RtpTransportControllerSend::StartProcessPeriodicTasks() { - RTC_DCHECK_RUN_ON(&task_queue_); + RTC_DCHECK_RUN_ON(&sequence_checker_); if (!pacer_queue_update_task_.Running()) { pacer_queue_update_task_ = RepeatingTaskHandle::DelayedStart( - task_queue_.TaskQueueForDelayedTasks(), kPacerQueueUpdateInterval, - [this]() { - RTC_DCHECK_RUN_ON(&task_queue_); + task_queue_, kPacerQueueUpdateInterval, [this]() { + RTC_DCHECK_RUN_ON(&sequence_checker_); TimeDelta expected_queue_time = pacer_.ExpectedQueueTime(); control_handler_->SetPacerQueue(expected_queue_time); UpdateControlState(); @@ -678,8 +635,8 @@ void RtpTransportControllerSend::StartProcessPeriodicTasks() { controller_task_.Stop(); if (process_interval_.IsFinite()) { controller_task_ = RepeatingTaskHandle::DelayedStart( - task_queue_.TaskQueueForDelayedTasks(), process_interval_, [this]() { - RTC_DCHECK_RUN_ON(&task_queue_); + task_queue_, process_interval_, [this]() { + RTC_DCHECK_RUN_ON(&sequence_checker_); UpdateControllerWithTimeInterval(); return process_interval_; }); @@ -722,6 +679,7 @@ void RtpTransportControllerSend::PostUpdates(NetworkControlUpdate update) { void RtpTransportControllerSend::OnReceivedRtcpReceiverReportBlocks( const ReportBlockList& report_blocks, int64_t now_ms) { + RTC_DCHECK_RUN_ON(&sequence_checker_); if (report_blocks.empty()) return; diff --git a/call/rtp_transport_controller_send.h b/call/rtp_transport_controller_send.h index eaac55f829..02a7f524a4 100644 --- a/call/rtp_transport_controller_send.h +++ b/call/rtp_transport_controller_send.h @@ -34,7 +34,6 @@ #include "modules/pacing/packet_router.h" #include "modules/pacing/rtp_packet_pacer.h" #include "modules/pacing/task_queue_paced_sender.h" -#include "modules/utility/maybe_worker_thread.h" #include "rtc_base/network_route.h" #include "rtc_base/race_checker.h" #include "rtc_base/task_queue.h" @@ -75,7 +74,6 @@ class RtpTransportControllerSend final RtpVideoSenderInterface* rtp_video_sender) override; // Implements RtpTransportControllerSendInterface - MaybeWorkerThread* GetWorkerQueue() override; PacketRouter* packet_router() override; NetworkStateEstimateObserver* network_state_estimate_observer() override; @@ -123,85 +121,88 @@ class RtpTransportControllerSend final void OnRemoteNetworkEstimate(NetworkStateEstimate estimate) override; private: - void MaybeCreateControllers() RTC_RUN_ON(task_queue_); + void MaybeCreateControllers() RTC_RUN_ON(sequence_checker_); void UpdateInitialConstraints(TargetRateConstraints new_contraints) - RTC_RUN_ON(task_queue_); + RTC_RUN_ON(sequence_checker_); - void StartProcessPeriodicTasks() RTC_RUN_ON(task_queue_); - void UpdateControllerWithTimeInterval() RTC_RUN_ON(task_queue_); + void StartProcessPeriodicTasks() RTC_RUN_ON(sequence_checker_); + void UpdateControllerWithTimeInterval() RTC_RUN_ON(sequence_checker_); absl::optional ApplyOrLiftRelayCap(bool is_relayed); bool IsRelevantRouteChange(const rtc::NetworkRoute& old_route, const rtc::NetworkRoute& new_route) const; void UpdateBitrateConstraints(const BitrateConstraints& updated); - void UpdateStreamsConfig() RTC_RUN_ON(task_queue_); + void UpdateStreamsConfig() RTC_RUN_ON(sequence_checker_); void OnReceivedRtcpReceiverReportBlocks(const ReportBlockList& report_blocks, int64_t now_ms) - RTC_RUN_ON(task_queue_); - void PostUpdates(NetworkControlUpdate update) RTC_RUN_ON(task_queue_); - void UpdateControlState() RTC_RUN_ON(task_queue_); - void UpdateCongestedState() RTC_RUN_ON(task_queue_); - absl::optional GetCongestedStateUpdate() const RTC_RUN_ON(task_queue_); + RTC_RUN_ON(sequence_checker_); + void PostUpdates(NetworkControlUpdate update) RTC_RUN_ON(sequence_checker_); + void UpdateControlState() RTC_RUN_ON(sequence_checker_); + void UpdateCongestedState() RTC_RUN_ON(sequence_checker_); + absl::optional GetCongestedStateUpdate() const + RTC_RUN_ON(sequence_checker_); void ProcessSentPacket(const rtc::SentPacket& sent_packet, - bool posted_to_worker) RTC_RUN_ON(task_queue_); + bool posted_to_worker) RTC_RUN_ON(sequence_checker_); void ProcessSentPacketUpdates(NetworkControlUpdate updates) - RTC_RUN_ON(task_queue_); + RTC_RUN_ON(sequence_checker_); Clock* const clock_; RtcEventLog* const event_log_; TaskQueueFactory* const task_queue_factory_; - SequenceChecker main_thread_; + SequenceChecker sequence_checker_; + TaskQueueBase* task_queue_; PacketRouter packet_router_; std::vector> video_rtp_senders_ - RTC_GUARDED_BY(&main_thread_); + RTC_GUARDED_BY(&sequence_checker_); RtpBitrateConfigurator bitrate_configurator_; - std::map network_routes_; - bool pacer_started_; + std::map network_routes_ + RTC_GUARDED_BY(sequence_checker_); + bool pacer_started_ RTC_GUARDED_BY(sequence_checker_); TaskQueuePacedSender pacer_; - TargetTransferRateObserver* observer_ RTC_GUARDED_BY(task_queue_); + TargetTransferRateObserver* observer_ RTC_GUARDED_BY(sequence_checker_); TransportFeedbackDemuxer feedback_demuxer_; TransportFeedbackAdapter transport_feedback_adapter_ - RTC_GUARDED_BY(task_queue_); + RTC_GUARDED_BY(sequence_checker_); NetworkControllerFactoryInterface* const controller_factory_override_ - RTC_PT_GUARDED_BY(task_queue_); + RTC_PT_GUARDED_BY(sequence_checker_); const std::unique_ptr - controller_factory_fallback_ RTC_PT_GUARDED_BY(task_queue_); + controller_factory_fallback_ RTC_PT_GUARDED_BY(sequence_checker_); std::unique_ptr control_handler_ - RTC_GUARDED_BY(task_queue_) RTC_PT_GUARDED_BY(task_queue_); + RTC_GUARDED_BY(sequence_checker_) RTC_PT_GUARDED_BY(sequence_checker_); std::unique_ptr controller_ - RTC_GUARDED_BY(task_queue_) RTC_PT_GUARDED_BY(task_queue_); + RTC_GUARDED_BY(sequence_checker_) RTC_PT_GUARDED_BY(sequence_checker_); - TimeDelta process_interval_ RTC_GUARDED_BY(task_queue_); + TimeDelta process_interval_ RTC_GUARDED_BY(sequence_checker_); std::map last_report_blocks_ - RTC_GUARDED_BY(task_queue_); - Timestamp last_report_block_time_ RTC_GUARDED_BY(task_queue_); + RTC_GUARDED_BY(sequence_checker_); + Timestamp last_report_block_time_ RTC_GUARDED_BY(sequence_checker_); - NetworkControllerConfig initial_config_ RTC_GUARDED_BY(task_queue_); - StreamsConfig streams_config_ RTC_GUARDED_BY(task_queue_); + NetworkControllerConfig initial_config_ RTC_GUARDED_BY(sequence_checker_); + StreamsConfig streams_config_ RTC_GUARDED_BY(sequence_checker_); const bool reset_feedback_on_route_change_; const bool add_pacing_to_cwin_; FieldTrialParameter relay_bandwidth_cap_; - size_t transport_overhead_bytes_per_packet_ RTC_GUARDED_BY(task_queue_); - bool network_available_ RTC_GUARDED_BY(task_queue_); - RepeatingTaskHandle pacer_queue_update_task_ RTC_GUARDED_BY(task_queue_); - RepeatingTaskHandle controller_task_ RTC_GUARDED_BY(task_queue_); + size_t transport_overhead_bytes_per_packet_ RTC_GUARDED_BY(sequence_checker_); + bool network_available_ RTC_GUARDED_BY(sequence_checker_); + RepeatingTaskHandle pacer_queue_update_task_ + RTC_GUARDED_BY(sequence_checker_); + RepeatingTaskHandle controller_task_ RTC_GUARDED_BY(sequence_checker_); - DataSize congestion_window_size_ RTC_GUARDED_BY(task_queue_); - bool is_congested_ RTC_GUARDED_BY(task_queue_); + DataSize congestion_window_size_ RTC_GUARDED_BY(sequence_checker_); + bool is_congested_ RTC_GUARDED_BY(sequence_checker_); // Protected by internal locks. RateLimiter retransmission_rate_limiter_; ScopedTaskSafety safety_; - MaybeWorkerThread task_queue_; const FieldTrialsView& field_trials_; }; diff --git a/call/rtp_transport_controller_send_interface.h b/call/rtp_transport_controller_send_interface.h index 44df5aa736..6ac2d84d03 100644 --- a/call/rtp_transport_controller_send_interface.h +++ b/call/rtp_transport_controller_send_interface.h @@ -42,7 +42,6 @@ class TaskQueue; namespace webrtc { class FrameEncryptorInterface; -class MaybeWorkerThread; class TargetTransferRateObserver; class Transport; class PacketRouter; @@ -94,9 +93,6 @@ struct RtpSenderFrameEncryptionConfig { class RtpTransportControllerSendInterface { public: virtual ~RtpTransportControllerSendInterface() {} - // TODO(webrtc:14502): Remove MaybeWorkerThread when experiment has been - // evaluated. - virtual MaybeWorkerThread* GetWorkerQueue() = 0; virtual PacketRouter* packet_router() = 0; virtual RtpVideoSenderInterface* CreateRtpVideoSender( diff --git a/call/rtp_video_sender.cc b/call/rtp_video_sender.cc index 38f48de41a..708aa81f4e 100644 --- a/call/rtp_video_sender.cc +++ b/call/rtp_video_sender.cc @@ -27,7 +27,6 @@ #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" #include "modules/rtp_rtcp/source/rtp_rtcp_impl2.h" #include "modules/rtp_rtcp/source/rtp_sender.h" -#include "modules/utility/maybe_worker_thread.h" #include "modules/video_coding/include/video_codec_interface.h" #include "rtc_base/checks.h" #include "rtc_base/logging.h" diff --git a/call/rtp_video_sender_unittest.cc b/call/rtp_video_sender_unittest.cc index 3181cfb0a4..f7407e7d65 100644 --- a/call/rtp_video_sender_unittest.cc +++ b/call/rtp_video_sender_unittest.cc @@ -193,24 +193,10 @@ class RtpVideoSenderTestFixture { MockTransport& transport() { return transport_; } void AdvanceTime(TimeDelta delta) { time_controller_.AdvanceTime(delta); } - void Stop() { - RunOnTransportQueue([&]() { router_->Stop(); }); - } + void Stop() { router_->Stop(); } void SetActiveModules(const std::vector& active_modules) { - RunOnTransportQueue([&]() { router_->SetActiveModules(active_modules); }); - } - - // Several RtpVideoSender methods expect to be called on the task queue as - // owned by the send transport. While the SequenceChecker may pick up the - // default thread as the transport queue, explicit checks for the transport - // queue (not just using a SequenceChecker) aren't possible unless such a - // queue is actually active. So RunOnTransportQueue is a convenience function - // that allow for running a `task` on the transport queue, similar to - // SendTask(). - void RunOnTransportQueue(absl::AnyInvocable task) { - transport_controller_.GetWorkerQueue()->RunOrPost(std::move(task)); - AdvanceTime(TimeDelta::Zero()); + router_->SetActiveModules(active_modules); } private: diff --git a/call/test/mock_rtp_transport_controller_send.h b/call/test/mock_rtp_transport_controller_send.h index 6e78534de2..0c522dfff4 100644 --- a/call/test/mock_rtp_transport_controller_send.h +++ b/call/test/mock_rtp_transport_controller_send.h @@ -50,7 +50,6 @@ class MockRtpTransportControllerSend DestroyRtpVideoSender, (RtpVideoSenderInterface*), (override)); - MOCK_METHOD(MaybeWorkerThread*, GetWorkerQueue, (), (override)); MOCK_METHOD(PacketRouter*, packet_router, (), (override)); MOCK_METHOD(NetworkStateEstimateObserver*, network_state_estimate_observer, diff --git a/test/scenario/scenario.cc b/test/scenario/scenario.cc index 93377120a1..98f59e6c7d 100644 --- a/test/scenario/scenario.cc +++ b/test/scenario/scenario.cc @@ -80,7 +80,7 @@ Scenario::~Scenario() { if (start_time_.IsFinite()) Stop(); for (auto& call_client : clients_) { - call_client->transport_->Disconnect(); + call_client->SendTask([&] { call_client->transport_->Disconnect(); }); call_client->UnBind(); } }