[WebRTC-SendPacketsOnWorkerThread] Cleanup RtpTransportControllerSend

MaybeWorkerThread* GetWorkerQueue() and is removed.
Instead all work is expected to be done on the taskqueue used when
creating the RtpTransportControllerSend.

Bug: webrtc:14502
Change-Id: Iedc30efb8de7592611d6d3c5b5c6cd33c17a60c9
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/300867
Reviewed-by: Evan Shrubsole <eshr@webrtc.org>
Reviewed-by: Erik Språng <sprang@webrtc.org>
Commit-Queue: Per Kjellander <perkj@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#39872}
This commit is contained in:
Per K 2023-04-13 08:59:19 +02:00 committed by WebRTC LUCI CQ
parent 4665d60e09
commit 37879e9867
8 changed files with 149 additions and 211 deletions

View file

@ -206,7 +206,6 @@ rtc_library("rtp_sender") {
"../modules/rtp_rtcp", "../modules/rtp_rtcp",
"../modules/rtp_rtcp:rtp_rtcp_format", "../modules/rtp_rtcp:rtp_rtcp_format",
"../modules/rtp_rtcp:rtp_video_header", "../modules/rtp_rtcp:rtp_video_header",
"../modules/utility:utility",
"../modules/video_coding:chain_diff_calculator", "../modules/video_coding:chain_diff_calculator",
"../modules/video_coding:codec_globals_headers", "../modules/video_coding:codec_globals_headers",
"../modules/video_coding:frame_dependencies_calculator", "../modules/video_coding:frame_dependencies_calculator",

View file

@ -17,6 +17,7 @@
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include "absl/types/optional.h" #include "absl/types/optional.h"
#include "api/task_queue/pending_task_safety_flag.h" #include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/task_queue_base.h"
#include "api/transport/goog_cc_factory.h" #include "api/transport/goog_cc_factory.h"
#include "api/transport/network_types.h" #include "api/transport/network_types.h"
#include "api/units/data_rate.h" #include "api/units/data_rate.h"
@ -76,6 +77,7 @@ RtpTransportControllerSend::RtpTransportControllerSend(
: clock_(clock), : clock_(clock),
event_log_(config.event_log), event_log_(config.event_log),
task_queue_factory_(config.task_queue_factory), task_queue_factory_(config.task_queue_factory),
task_queue_(TaskQueueBase::Current()),
bitrate_configurator_(config.bitrate_config), bitrate_configurator_(config.bitrate_config),
pacer_started_(false), pacer_started_(false),
pacer_(clock, pacer_(clock,
@ -102,9 +104,6 @@ RtpTransportControllerSend::RtpTransportControllerSend(
congestion_window_size_(DataSize::PlusInfinity()), congestion_window_size_(DataSize::PlusInfinity()),
is_congested_(false), is_congested_(false),
retransmission_rate_limiter_(clock, kRetransmitWindowSizeMs), retransmission_rate_limiter_(clock, kRetransmitWindowSizeMs),
task_queue_(*config.trials,
"rtp_send_controller",
config.task_queue_factory),
field_trials_(*config.trials) { field_trials_(*config.trials) {
ParseFieldTrial({&relay_bandwidth_cap_}, ParseFieldTrial({&relay_bandwidth_cap_},
config.trials->Lookup("WebRTC-Bwe-NetworkRouteConstraints")); config.trials->Lookup("WebRTC-Bwe-NetworkRouteConstraints"));
@ -120,15 +119,10 @@ RtpTransportControllerSend::RtpTransportControllerSend(
} }
RtpTransportControllerSend::~RtpTransportControllerSend() { RtpTransportControllerSend::~RtpTransportControllerSend() {
RTC_DCHECK_RUN_ON(&main_thread_); RTC_DCHECK_RUN_ON(&sequence_checker_);
RTC_DCHECK(video_rtp_senders_.empty()); RTC_DCHECK(video_rtp_senders_.empty());
if (task_queue_.IsCurrent()) { pacer_queue_update_task_.Stop();
// If these repeated tasks run on a task queue owned by controller_task_.Stop();
// `task_queue_`, they are stopped when the task queue is deleted.
// Otherwise, stop them here.
pacer_queue_update_task_.Stop();
controller_task_.Stop();
}
} }
RtpVideoSenderInterface* RtpTransportControllerSend::CreateRtpVideoSender( RtpVideoSenderInterface* RtpTransportControllerSend::CreateRtpVideoSender(
@ -142,7 +136,7 @@ RtpVideoSenderInterface* RtpTransportControllerSend::CreateRtpVideoSender(
std::unique_ptr<FecController> fec_controller, std::unique_ptr<FecController> fec_controller,
const RtpSenderFrameEncryptionConfig& frame_encryption_config, const RtpSenderFrameEncryptionConfig& frame_encryption_config,
rtc::scoped_refptr<FrameTransformerInterface> frame_transformer) { rtc::scoped_refptr<FrameTransformerInterface> frame_transformer) {
RTC_DCHECK_RUN_ON(&main_thread_); RTC_DCHECK_RUN_ON(&sequence_checker_);
video_rtp_senders_.push_back(std::make_unique<RtpVideoSender>( video_rtp_senders_.push_back(std::make_unique<RtpVideoSender>(
clock_, suspended_ssrcs, states, rtp_config, rtcp_report_interval_ms, clock_, suspended_ssrcs, states, rtp_config, rtcp_report_interval_ms,
send_transport, observers, send_transport, observers,
@ -157,7 +151,7 @@ RtpVideoSenderInterface* RtpTransportControllerSend::CreateRtpVideoSender(
void RtpTransportControllerSend::DestroyRtpVideoSender( void RtpTransportControllerSend::DestroyRtpVideoSender(
RtpVideoSenderInterface* rtp_video_sender) { RtpVideoSenderInterface* rtp_video_sender) {
RTC_DCHECK_RUN_ON(&main_thread_); RTC_DCHECK_RUN_ON(&sequence_checker_);
std::vector<std::unique_ptr<RtpVideoSenderInterface>>::iterator it = std::vector<std::unique_ptr<RtpVideoSenderInterface>>::iterator it =
video_rtp_senders_.end(); video_rtp_senders_.end();
for (it = video_rtp_senders_.begin(); it != video_rtp_senders_.end(); ++it) { for (it = video_rtp_senders_.begin(); it != video_rtp_senders_.end(); ++it) {
@ -195,10 +189,6 @@ absl::optional<bool> RtpTransportControllerSend::GetCongestedStateUpdate()
return absl::nullopt; return absl::nullopt;
} }
MaybeWorkerThread* RtpTransportControllerSend::GetWorkerQueue() {
return &task_queue_;
}
PacketRouter* RtpTransportControllerSend::packet_router() { PacketRouter* RtpTransportControllerSend::packet_router() {
return &packet_router_; return &packet_router_;
} }
@ -219,14 +209,14 @@ RtpPacketSender* RtpTransportControllerSend::packet_sender() {
void RtpTransportControllerSend::SetAllocatedSendBitrateLimits( void RtpTransportControllerSend::SetAllocatedSendBitrateLimits(
BitrateAllocationLimits limits) { BitrateAllocationLimits limits) {
RTC_DCHECK_RUN_ON(&task_queue_); RTC_DCHECK_RUN_ON(&sequence_checker_);
streams_config_.min_total_allocated_bitrate = limits.min_allocatable_rate; streams_config_.min_total_allocated_bitrate = limits.min_allocatable_rate;
streams_config_.max_padding_rate = limits.max_padding_rate; streams_config_.max_padding_rate = limits.max_padding_rate;
streams_config_.max_total_allocated_bitrate = limits.max_allocatable_rate; streams_config_.max_total_allocated_bitrate = limits.max_allocatable_rate;
UpdateStreamsConfig(); UpdateStreamsConfig();
} }
void RtpTransportControllerSend::SetPacingFactor(float pacing_factor) { void RtpTransportControllerSend::SetPacingFactor(float pacing_factor) {
RTC_DCHECK_RUN_ON(&task_queue_); RTC_DCHECK_RUN_ON(&sequence_checker_);
streams_config_.pacing_factor = pacing_factor; streams_config_.pacing_factor = pacing_factor;
UpdateStreamsConfig(); UpdateStreamsConfig();
} }
@ -240,13 +230,11 @@ RtpTransportControllerSend::GetStreamFeedbackProvider() {
void RtpTransportControllerSend::RegisterTargetTransferRateObserver( void RtpTransportControllerSend::RegisterTargetTransferRateObserver(
TargetTransferRateObserver* observer) { TargetTransferRateObserver* observer) {
task_queue_.RunOrPost([this, observer] { RTC_DCHECK_RUN_ON(&sequence_checker_);
RTC_DCHECK_RUN_ON(&task_queue_); RTC_DCHECK(observer_ == nullptr);
RTC_DCHECK(observer_ == nullptr); observer_ = observer;
observer_ = observer; observer_->OnStartRateUpdate(*initial_config_.constraints.starting_rate);
observer_->OnStartRateUpdate(*initial_config_.constraints.starting_rate); MaybeCreateControllers();
MaybeCreateControllers();
});
} }
bool RtpTransportControllerSend::IsRelevantRouteChange( bool RtpTransportControllerSend::IsRelevantRouteChange(
@ -269,8 +257,8 @@ bool RtpTransportControllerSend::IsRelevantRouteChange(
void RtpTransportControllerSend::OnNetworkRouteChanged( void RtpTransportControllerSend::OnNetworkRouteChanged(
absl::string_view transport_name, absl::string_view transport_name,
const rtc::NetworkRoute& network_route) { const rtc::NetworkRoute& network_route) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
// Check if the network route is connected. // Check if the network route is connected.
if (!network_route.connected) { if (!network_route.connected) {
// TODO(honghaiz): Perhaps handle this in SignalChannelNetworkState and // TODO(honghaiz): Perhaps handle this in SignalChannelNetworkState and
// consider merging these two methods. // consider merging these two methods.
@ -300,10 +288,7 @@ void RtpTransportControllerSend::OnNetworkRouteChanged(
if (relay_constraint_update.has_value()) { if (relay_constraint_update.has_value()) {
UpdateBitrateConstraints(*relay_constraint_update); UpdateBitrateConstraints(*relay_constraint_update);
} }
task_queue_.RunOrPost([this, network_route] { transport_overhead_bytes_per_packet_ = network_route.packet_overhead;
RTC_DCHECK_RUN_ON(&task_queue_);
transport_overhead_bytes_per_packet_ = network_route.packet_overhead;
});
// No need to reset BWE if this is the first time the network connects. // No need to reset BWE if this is the first time the network connects.
return; return;
} }
@ -329,51 +314,42 @@ void RtpTransportControllerSend::OnNetworkRouteChanged(
NetworkRouteChange msg; NetworkRouteChange msg;
msg.at_time = Timestamp::Millis(clock_->TimeInMilliseconds()); msg.at_time = Timestamp::Millis(clock_->TimeInMilliseconds());
msg.constraints = ConvertConstraints(bitrate_config, clock_); msg.constraints = ConvertConstraints(bitrate_config, clock_);
task_queue_.RunOrPost([this, msg, network_route] { transport_overhead_bytes_per_packet_ = network_route.packet_overhead;
RTC_DCHECK_RUN_ON(&task_queue_); if (reset_feedback_on_route_change_) {
transport_overhead_bytes_per_packet_ = network_route.packet_overhead; transport_feedback_adapter_.SetNetworkRoute(network_route);
if (reset_feedback_on_route_change_) { }
transport_feedback_adapter_.SetNetworkRoute(network_route); if (controller_) {
} PostUpdates(controller_->OnNetworkRouteChange(msg));
if (controller_) { } else {
PostUpdates(controller_->OnNetworkRouteChange(msg)); UpdateInitialConstraints(msg.constraints);
} else { }
UpdateInitialConstraints(msg.constraints); is_congested_ = false;
} pacer_.SetCongested(false);
is_congested_ = false;
pacer_.SetCongested(false);
});
} }
} }
void RtpTransportControllerSend::OnNetworkAvailability(bool network_available) { void RtpTransportControllerSend::OnNetworkAvailability(bool network_available) {
RTC_DCHECK_RUN_ON(&main_thread_); RTC_DCHECK_RUN_ON(&sequence_checker_);
RTC_LOG(LS_VERBOSE) << "SignalNetworkState " RTC_LOG(LS_VERBOSE) << "SignalNetworkState "
<< (network_available ? "Up" : "Down"); << (network_available ? "Up" : "Down");
NetworkAvailability msg; NetworkAvailability msg;
msg.at_time = Timestamp::Millis(clock_->TimeInMilliseconds()); msg.at_time = Timestamp::Millis(clock_->TimeInMilliseconds());
msg.network_available = network_available; msg.network_available = network_available;
task_queue_.RunOrPost([this, msg]() { network_available_ = network_available;
RTC_DCHECK_RUN_ON(&task_queue_); if (network_available) {
if (network_available_ == msg.network_available) pacer_.Resume();
return; } else {
network_available_ = msg.network_available; pacer_.Pause();
if (network_available_) { }
pacer_.Resume(); is_congested_ = false;
} else { pacer_.SetCongested(false);
pacer_.Pause();
}
is_congested_ = false;
pacer_.SetCongested(false);
if (controller_) {
control_handler_->SetNetworkAvailability(network_available_);
PostUpdates(controller_->OnNetworkAvailability(msg));
UpdateControlState();
} else {
MaybeCreateControllers();
}
});
if (controller_) {
control_handler_->SetNetworkAvailability(network_available);
PostUpdates(controller_->OnNetworkAvailability(msg));
UpdateControlState();
} else {
MaybeCreateControllers();
}
for (auto& rtp_sender : video_rtp_senders_) { for (auto& rtp_sender : video_rtp_senders_) {
rtp_sender->OnNetworkAvailability(network_available); rtp_sender->OnNetworkAvailability(network_available);
} }
@ -389,11 +365,10 @@ absl::optional<Timestamp> RtpTransportControllerSend::GetFirstPacketTime()
return pacer_.FirstSentPacketTime(); return pacer_.FirstSentPacketTime();
} }
void RtpTransportControllerSend::EnablePeriodicAlrProbing(bool enable) { void RtpTransportControllerSend::EnablePeriodicAlrProbing(bool enable) {
task_queue_.RunOrPost([this, enable]() { RTC_DCHECK_RUN_ON(&sequence_checker_);
RTC_DCHECK_RUN_ON(&task_queue_);
streams_config_.requests_alr_probing = enable; streams_config_.requests_alr_probing = enable;
UpdateStreamsConfig(); UpdateStreamsConfig();
});
} }
void RtpTransportControllerSend::OnSentPacket( void RtpTransportControllerSend::OnSentPacket(
const rtc::SentPacket& sent_packet) { const rtc::SentPacket& sent_packet) {
@ -401,28 +376,22 @@ void RtpTransportControllerSend::OnSentPacket(
// TODO(bugs.webrtc.org/137439): Clarify other thread contexts calling in, and // TODO(bugs.webrtc.org/137439): Clarify other thread contexts calling in, and
// simplify task posting logic when the combined network/worker project // simplify task posting logic when the combined network/worker project
// launches. // launches.
if (TaskQueueBase::Current() != task_queue_.TaskQueueForPost()) { if (TaskQueueBase::Current() != task_queue_) {
// We can't use SafeTask here if we are using an owned task queue, because task_queue_->PostTask(SafeTask(safety_.flag(), [this, sent_packet]() {
// the safety flag will be destroyed when RtpTransportControllerSend is RTC_DCHECK_RUN_ON(&sequence_checker_);
// destroyed on the worker thread. But we must use SafeTask if we are using ProcessSentPacket(sent_packet, /*posted_to_worker=*/true);
// the worker thread, since the worker thread outlives }));
// RtpTransportControllerSend.
task_queue_.TaskQueueForPost()->PostTask(
task_queue_.MaybeSafeTask(safety_.flag(), [this, sent_packet]() {
RTC_DCHECK_RUN_ON(&task_queue_);
ProcessSentPacket(sent_packet, /*posted_to_worker=*/true);
}));
return; return;
} }
RTC_DCHECK_RUN_ON(&task_queue_); RTC_DCHECK_RUN_ON(&sequence_checker_);
ProcessSentPacket(sent_packet, /*posted_to_worker=*/false); ProcessSentPacket(sent_packet, /*posted_to_worker=*/false);
} }
// RTC_RUN_ON(task_queue_)
void RtpTransportControllerSend::ProcessSentPacket( void RtpTransportControllerSend::ProcessSentPacket(
const rtc::SentPacket& sent_packet, const rtc::SentPacket& sent_packet,
bool posted_to_worker) { bool posted_to_worker) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
absl::optional<SentPacket> packet_msg = absl::optional<SentPacket> packet_msg =
transport_feedback_adapter_.ProcessSentPacket(sent_packet); transport_feedback_adapter_.ProcessSentPacket(sent_packet);
if (!packet_msg) if (!packet_msg)
@ -445,18 +414,19 @@ void RtpTransportControllerSend::ProcessSentPacket(
// PacketRouter::SendPacket, we need to break the chain here and PostTask to // PacketRouter::SendPacket, we need to break the chain here and PostTask to
// get out of the lock. In testing, having updates to process happens pretty // get out of the lock. In testing, having updates to process happens pretty
// rarely so we do not usually get here. // rarely so we do not usually get here.
task_queue_.TaskQueueForPost()->PostTask(task_queue_.MaybeSafeTask( task_queue_->PostTask(
safety_.flag(), SafeTask(safety_.flag(),
[this, control_update = std::move(control_update)]() mutable { [this, control_update = std::move(control_update)]() mutable {
RTC_DCHECK_RUN_ON(&task_queue_); RTC_DCHECK_RUN_ON(&sequence_checker_);
ProcessSentPacketUpdates(std::move(control_update)); ProcessSentPacketUpdates(std::move(control_update));
})); }));
} }
} }
// RTC_RUN_ON(task_queue_) // RTC_RUN_ON(task_queue_)
void RtpTransportControllerSend::ProcessSentPacketUpdates( void RtpTransportControllerSend::ProcessSentPacketUpdates(
NetworkControlUpdate updates) { NetworkControlUpdate updates) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
// Only update outstanding data if: // Only update outstanding data if:
// 1. Packet feedback is used. // 1. Packet feedback is used.
// 2. The packet has not yet received an acknowledgement. // 2. The packet has not yet received an acknowledgement.
@ -469,28 +439,25 @@ void RtpTransportControllerSend::ProcessSentPacketUpdates(
void RtpTransportControllerSend::OnReceivedPacket( void RtpTransportControllerSend::OnReceivedPacket(
const ReceivedPacket& packet_msg) { const ReceivedPacket& packet_msg) {
task_queue_.RunOrPost([this, packet_msg]() { RTC_DCHECK_RUN_ON(&sequence_checker_);
RTC_DCHECK_RUN_ON(&task_queue_); if (controller_)
if (controller_) PostUpdates(controller_->OnReceivedPacket(packet_msg));
PostUpdates(controller_->OnReceivedPacket(packet_msg));
});
} }
void RtpTransportControllerSend::UpdateBitrateConstraints( void RtpTransportControllerSend::UpdateBitrateConstraints(
const BitrateConstraints& updated) { const BitrateConstraints& updated) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
TargetRateConstraints msg = ConvertConstraints(updated, clock_); TargetRateConstraints msg = ConvertConstraints(updated, clock_);
task_queue_.RunOrPost([this, msg]() { if (controller_) {
RTC_DCHECK_RUN_ON(&task_queue_); PostUpdates(controller_->OnTargetRateConstraints(msg));
if (controller_) { } else {
PostUpdates(controller_->OnTargetRateConstraints(msg)); UpdateInitialConstraints(msg);
} else { }
UpdateInitialConstraints(msg);
}
});
} }
void RtpTransportControllerSend::SetSdpBitrateParameters( void RtpTransportControllerSend::SetSdpBitrateParameters(
const BitrateConstraints& constraints) { const BitrateConstraints& constraints) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
absl::optional<BitrateConstraints> updated = absl::optional<BitrateConstraints> updated =
bitrate_configurator_.UpdateWithSdpParameters(constraints); bitrate_configurator_.UpdateWithSdpParameters(constraints);
if (updated.has_value()) { if (updated.has_value()) {
@ -504,6 +471,7 @@ void RtpTransportControllerSend::SetSdpBitrateParameters(
void RtpTransportControllerSend::SetClientBitratePreferences( void RtpTransportControllerSend::SetClientBitratePreferences(
const BitrateSettings& preferences) { const BitrateSettings& preferences) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
absl::optional<BitrateConstraints> updated = absl::optional<BitrateConstraints> updated =
bitrate_configurator_.UpdateWithClientPreferences(preferences); bitrate_configurator_.UpdateWithClientPreferences(preferences);
if (updated.has_value()) { if (updated.has_value()) {
@ -523,7 +491,7 @@ RtpTransportControllerSend::ApplyOrLiftRelayCap(bool is_relayed) {
void RtpTransportControllerSend::OnTransportOverheadChanged( void RtpTransportControllerSend::OnTransportOverheadChanged(
size_t transport_overhead_bytes_per_packet) { size_t transport_overhead_bytes_per_packet) {
RTC_DCHECK_RUN_ON(&main_thread_); RTC_DCHECK_RUN_ON(&sequence_checker_);
if (transport_overhead_bytes_per_packet >= kMaxOverheadBytes) { if (transport_overhead_bytes_per_packet >= kMaxOverheadBytes) {
RTC_LOG(LS_ERROR) << "Transport overhead exceeds " << kMaxOverheadBytes; RTC_LOG(LS_ERROR) << "Transport overhead exceeds " << kMaxOverheadBytes;
return; return;
@ -550,6 +518,7 @@ void RtpTransportControllerSend::IncludeOverheadInPacedSender() {
} }
void RtpTransportControllerSend::EnsureStarted() { void RtpTransportControllerSend::EnsureStarted() {
RTC_DCHECK_RUN_ON(&sequence_checker_);
if (!pacer_started_) { if (!pacer_started_) {
pacer_started_ = true; pacer_started_ = true;
pacer_.EnsureStarted(); pacer_.EnsureStarted();
@ -557,75 +526,64 @@ void RtpTransportControllerSend::EnsureStarted() {
} }
void RtpTransportControllerSend::OnReceivedEstimatedBitrate(uint32_t bitrate) { void RtpTransportControllerSend::OnReceivedEstimatedBitrate(uint32_t bitrate) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
RemoteBitrateReport msg; RemoteBitrateReport msg;
msg.receive_time = Timestamp::Millis(clock_->TimeInMilliseconds()); msg.receive_time = Timestamp::Millis(clock_->TimeInMilliseconds());
msg.bandwidth = DataRate::BitsPerSec(bitrate); msg.bandwidth = DataRate::BitsPerSec(bitrate);
task_queue_.RunOrPost([this, msg]() { if (controller_)
RTC_DCHECK_RUN_ON(&task_queue_); PostUpdates(controller_->OnRemoteBitrateReport(msg));
if (controller_)
PostUpdates(controller_->OnRemoteBitrateReport(msg));
});
} }
void RtpTransportControllerSend::OnReceivedRtcpReceiverReport( void RtpTransportControllerSend::OnReceivedRtcpReceiverReport(
const ReportBlockList& report_blocks, const ReportBlockList& report_blocks,
int64_t rtt_ms, int64_t rtt_ms,
int64_t now_ms) { int64_t now_ms) {
task_queue_.RunOrPost([this, report_blocks, now_ms, rtt_ms]() { RTC_DCHECK_RUN_ON(&sequence_checker_);
RTC_DCHECK_RUN_ON(&task_queue_); OnReceivedRtcpReceiverReportBlocks(report_blocks, now_ms);
OnReceivedRtcpReceiverReportBlocks(report_blocks, now_ms); RoundTripTimeUpdate report;
RoundTripTimeUpdate report; report.receive_time = Timestamp::Millis(now_ms);
report.receive_time = Timestamp::Millis(now_ms); report.round_trip_time = TimeDelta::Millis(rtt_ms);
report.round_trip_time = TimeDelta::Millis(rtt_ms); report.smoothed = false;
report.smoothed = false; if (controller_ && !report.round_trip_time.IsZero())
if (controller_ && !report.round_trip_time.IsZero()) PostUpdates(controller_->OnRoundTripTimeUpdate(report));
PostUpdates(controller_->OnRoundTripTimeUpdate(report));
});
} }
void RtpTransportControllerSend::OnAddPacket( void RtpTransportControllerSend::OnAddPacket(
const RtpPacketSendInfo& packet_info) { const RtpPacketSendInfo& packet_info) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
Timestamp creation_time = Timestamp::Millis(clock_->TimeInMilliseconds()); Timestamp creation_time = Timestamp::Millis(clock_->TimeInMilliseconds());
feedback_demuxer_.AddPacket(packet_info);
task_queue_.RunOrPost([this, packet_info, creation_time]() { transport_feedback_adapter_.AddPacket(
RTC_DCHECK_RUN_ON(&task_queue_); packet_info, transport_overhead_bytes_per_packet_, creation_time);
feedback_demuxer_.AddPacket(packet_info);
transport_feedback_adapter_.AddPacket(
packet_info, transport_overhead_bytes_per_packet_, creation_time);
});
} }
void RtpTransportControllerSend::OnTransportFeedback( void RtpTransportControllerSend::OnTransportFeedback(
const rtcp::TransportFeedback& feedback) { const rtcp::TransportFeedback& feedback) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
auto feedback_time = Timestamp::Millis(clock_->TimeInMilliseconds()); auto feedback_time = Timestamp::Millis(clock_->TimeInMilliseconds());
task_queue_.RunOrPost([this, feedback, feedback_time]() { feedback_demuxer_.OnTransportFeedback(feedback);
RTC_DCHECK_RUN_ON(&task_queue_); absl::optional<TransportPacketsFeedback> feedback_msg =
feedback_demuxer_.OnTransportFeedback(feedback); transport_feedback_adapter_.ProcessTransportFeedback(feedback,
absl::optional<TransportPacketsFeedback> feedback_msg = feedback_time);
transport_feedback_adapter_.ProcessTransportFeedback(feedback, if (feedback_msg) {
feedback_time); if (controller_)
if (feedback_msg) { PostUpdates(controller_->OnTransportPacketsFeedback(*feedback_msg));
if (controller_)
PostUpdates(controller_->OnTransportPacketsFeedback(*feedback_msg));
// Only update outstanding data if any packet is first time acked. // Only update outstanding data if any packet is first time acked.
UpdateCongestedState(); UpdateCongestedState();
} }
});
} }
void RtpTransportControllerSend::OnRemoteNetworkEstimate( void RtpTransportControllerSend::OnRemoteNetworkEstimate(
NetworkStateEstimate estimate) { NetworkStateEstimate estimate) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
if (event_log_) { if (event_log_) {
event_log_->Log(std::make_unique<RtcEventRemoteEstimate>( event_log_->Log(std::make_unique<RtcEventRemoteEstimate>(
estimate.link_capacity_lower, estimate.link_capacity_upper)); estimate.link_capacity_lower, estimate.link_capacity_upper));
} }
estimate.update_time = Timestamp::Millis(clock_->TimeInMilliseconds()); estimate.update_time = Timestamp::Millis(clock_->TimeInMilliseconds());
task_queue_.RunOrPost([this, estimate] { if (controller_)
RTC_DCHECK_RUN_ON(&task_queue_); PostUpdates(controller_->OnNetworkStateEstimate(estimate));
if (controller_)
PostUpdates(controller_->OnNetworkStateEstimate(estimate));
});
} }
void RtpTransportControllerSend::MaybeCreateControllers() { void RtpTransportControllerSend::MaybeCreateControllers() {
@ -663,12 +621,11 @@ void RtpTransportControllerSend::UpdateInitialConstraints(
} }
void RtpTransportControllerSend::StartProcessPeriodicTasks() { void RtpTransportControllerSend::StartProcessPeriodicTasks() {
RTC_DCHECK_RUN_ON(&task_queue_); RTC_DCHECK_RUN_ON(&sequence_checker_);
if (!pacer_queue_update_task_.Running()) { if (!pacer_queue_update_task_.Running()) {
pacer_queue_update_task_ = RepeatingTaskHandle::DelayedStart( pacer_queue_update_task_ = RepeatingTaskHandle::DelayedStart(
task_queue_.TaskQueueForDelayedTasks(), kPacerQueueUpdateInterval, task_queue_, kPacerQueueUpdateInterval, [this]() {
[this]() { RTC_DCHECK_RUN_ON(&sequence_checker_);
RTC_DCHECK_RUN_ON(&task_queue_);
TimeDelta expected_queue_time = pacer_.ExpectedQueueTime(); TimeDelta expected_queue_time = pacer_.ExpectedQueueTime();
control_handler_->SetPacerQueue(expected_queue_time); control_handler_->SetPacerQueue(expected_queue_time);
UpdateControlState(); UpdateControlState();
@ -678,8 +635,8 @@ void RtpTransportControllerSend::StartProcessPeriodicTasks() {
controller_task_.Stop(); controller_task_.Stop();
if (process_interval_.IsFinite()) { if (process_interval_.IsFinite()) {
controller_task_ = RepeatingTaskHandle::DelayedStart( controller_task_ = RepeatingTaskHandle::DelayedStart(
task_queue_.TaskQueueForDelayedTasks(), process_interval_, [this]() { task_queue_, process_interval_, [this]() {
RTC_DCHECK_RUN_ON(&task_queue_); RTC_DCHECK_RUN_ON(&sequence_checker_);
UpdateControllerWithTimeInterval(); UpdateControllerWithTimeInterval();
return process_interval_; return process_interval_;
}); });
@ -722,6 +679,7 @@ void RtpTransportControllerSend::PostUpdates(NetworkControlUpdate update) {
void RtpTransportControllerSend::OnReceivedRtcpReceiverReportBlocks( void RtpTransportControllerSend::OnReceivedRtcpReceiverReportBlocks(
const ReportBlockList& report_blocks, const ReportBlockList& report_blocks,
int64_t now_ms) { int64_t now_ms) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
if (report_blocks.empty()) if (report_blocks.empty())
return; return;

View file

@ -34,7 +34,6 @@
#include "modules/pacing/packet_router.h" #include "modules/pacing/packet_router.h"
#include "modules/pacing/rtp_packet_pacer.h" #include "modules/pacing/rtp_packet_pacer.h"
#include "modules/pacing/task_queue_paced_sender.h" #include "modules/pacing/task_queue_paced_sender.h"
#include "modules/utility/maybe_worker_thread.h"
#include "rtc_base/network_route.h" #include "rtc_base/network_route.h"
#include "rtc_base/race_checker.h" #include "rtc_base/race_checker.h"
#include "rtc_base/task_queue.h" #include "rtc_base/task_queue.h"
@ -75,7 +74,6 @@ class RtpTransportControllerSend final
RtpVideoSenderInterface* rtp_video_sender) override; RtpVideoSenderInterface* rtp_video_sender) override;
// Implements RtpTransportControllerSendInterface // Implements RtpTransportControllerSendInterface
MaybeWorkerThread* GetWorkerQueue() override;
PacketRouter* packet_router() override; PacketRouter* packet_router() override;
NetworkStateEstimateObserver* network_state_estimate_observer() override; NetworkStateEstimateObserver* network_state_estimate_observer() override;
@ -123,85 +121,88 @@ class RtpTransportControllerSend final
void OnRemoteNetworkEstimate(NetworkStateEstimate estimate) override; void OnRemoteNetworkEstimate(NetworkStateEstimate estimate) override;
private: private:
void MaybeCreateControllers() RTC_RUN_ON(task_queue_); void MaybeCreateControllers() RTC_RUN_ON(sequence_checker_);
void UpdateInitialConstraints(TargetRateConstraints new_contraints) void UpdateInitialConstraints(TargetRateConstraints new_contraints)
RTC_RUN_ON(task_queue_); RTC_RUN_ON(sequence_checker_);
void StartProcessPeriodicTasks() RTC_RUN_ON(task_queue_); void StartProcessPeriodicTasks() RTC_RUN_ON(sequence_checker_);
void UpdateControllerWithTimeInterval() RTC_RUN_ON(task_queue_); void UpdateControllerWithTimeInterval() RTC_RUN_ON(sequence_checker_);
absl::optional<BitrateConstraints> ApplyOrLiftRelayCap(bool is_relayed); absl::optional<BitrateConstraints> ApplyOrLiftRelayCap(bool is_relayed);
bool IsRelevantRouteChange(const rtc::NetworkRoute& old_route, bool IsRelevantRouteChange(const rtc::NetworkRoute& old_route,
const rtc::NetworkRoute& new_route) const; const rtc::NetworkRoute& new_route) const;
void UpdateBitrateConstraints(const BitrateConstraints& updated); void UpdateBitrateConstraints(const BitrateConstraints& updated);
void UpdateStreamsConfig() RTC_RUN_ON(task_queue_); void UpdateStreamsConfig() RTC_RUN_ON(sequence_checker_);
void OnReceivedRtcpReceiverReportBlocks(const ReportBlockList& report_blocks, void OnReceivedRtcpReceiverReportBlocks(const ReportBlockList& report_blocks,
int64_t now_ms) int64_t now_ms)
RTC_RUN_ON(task_queue_); RTC_RUN_ON(sequence_checker_);
void PostUpdates(NetworkControlUpdate update) RTC_RUN_ON(task_queue_); void PostUpdates(NetworkControlUpdate update) RTC_RUN_ON(sequence_checker_);
void UpdateControlState() RTC_RUN_ON(task_queue_); void UpdateControlState() RTC_RUN_ON(sequence_checker_);
void UpdateCongestedState() RTC_RUN_ON(task_queue_); void UpdateCongestedState() RTC_RUN_ON(sequence_checker_);
absl::optional<bool> GetCongestedStateUpdate() const RTC_RUN_ON(task_queue_); absl::optional<bool> GetCongestedStateUpdate() const
RTC_RUN_ON(sequence_checker_);
void ProcessSentPacket(const rtc::SentPacket& sent_packet, void ProcessSentPacket(const rtc::SentPacket& sent_packet,
bool posted_to_worker) RTC_RUN_ON(task_queue_); bool posted_to_worker) RTC_RUN_ON(sequence_checker_);
void ProcessSentPacketUpdates(NetworkControlUpdate updates) void ProcessSentPacketUpdates(NetworkControlUpdate updates)
RTC_RUN_ON(task_queue_); RTC_RUN_ON(sequence_checker_);
Clock* const clock_; Clock* const clock_;
RtcEventLog* const event_log_; RtcEventLog* const event_log_;
TaskQueueFactory* const task_queue_factory_; TaskQueueFactory* const task_queue_factory_;
SequenceChecker main_thread_; SequenceChecker sequence_checker_;
TaskQueueBase* task_queue_;
PacketRouter packet_router_; PacketRouter packet_router_;
std::vector<std::unique_ptr<RtpVideoSenderInterface>> video_rtp_senders_ std::vector<std::unique_ptr<RtpVideoSenderInterface>> video_rtp_senders_
RTC_GUARDED_BY(&main_thread_); RTC_GUARDED_BY(&sequence_checker_);
RtpBitrateConfigurator bitrate_configurator_; RtpBitrateConfigurator bitrate_configurator_;
std::map<std::string, rtc::NetworkRoute> network_routes_; std::map<std::string, rtc::NetworkRoute> network_routes_
bool pacer_started_; RTC_GUARDED_BY(sequence_checker_);
bool pacer_started_ RTC_GUARDED_BY(sequence_checker_);
TaskQueuePacedSender pacer_; TaskQueuePacedSender pacer_;
TargetTransferRateObserver* observer_ RTC_GUARDED_BY(task_queue_); TargetTransferRateObserver* observer_ RTC_GUARDED_BY(sequence_checker_);
TransportFeedbackDemuxer feedback_demuxer_; TransportFeedbackDemuxer feedback_demuxer_;
TransportFeedbackAdapter transport_feedback_adapter_ TransportFeedbackAdapter transport_feedback_adapter_
RTC_GUARDED_BY(task_queue_); RTC_GUARDED_BY(sequence_checker_);
NetworkControllerFactoryInterface* const controller_factory_override_ NetworkControllerFactoryInterface* const controller_factory_override_
RTC_PT_GUARDED_BY(task_queue_); RTC_PT_GUARDED_BY(sequence_checker_);
const std::unique_ptr<NetworkControllerFactoryInterface> const std::unique_ptr<NetworkControllerFactoryInterface>
controller_factory_fallback_ RTC_PT_GUARDED_BY(task_queue_); controller_factory_fallback_ RTC_PT_GUARDED_BY(sequence_checker_);
std::unique_ptr<CongestionControlHandler> control_handler_ std::unique_ptr<CongestionControlHandler> control_handler_
RTC_GUARDED_BY(task_queue_) RTC_PT_GUARDED_BY(task_queue_); RTC_GUARDED_BY(sequence_checker_) RTC_PT_GUARDED_BY(sequence_checker_);
std::unique_ptr<NetworkControllerInterface> controller_ std::unique_ptr<NetworkControllerInterface> controller_
RTC_GUARDED_BY(task_queue_) RTC_PT_GUARDED_BY(task_queue_); RTC_GUARDED_BY(sequence_checker_) RTC_PT_GUARDED_BY(sequence_checker_);
TimeDelta process_interval_ RTC_GUARDED_BY(task_queue_); TimeDelta process_interval_ RTC_GUARDED_BY(sequence_checker_);
std::map<uint32_t, RTCPReportBlock> last_report_blocks_ std::map<uint32_t, RTCPReportBlock> last_report_blocks_
RTC_GUARDED_BY(task_queue_); RTC_GUARDED_BY(sequence_checker_);
Timestamp last_report_block_time_ RTC_GUARDED_BY(task_queue_); Timestamp last_report_block_time_ RTC_GUARDED_BY(sequence_checker_);
NetworkControllerConfig initial_config_ RTC_GUARDED_BY(task_queue_); NetworkControllerConfig initial_config_ RTC_GUARDED_BY(sequence_checker_);
StreamsConfig streams_config_ RTC_GUARDED_BY(task_queue_); StreamsConfig streams_config_ RTC_GUARDED_BY(sequence_checker_);
const bool reset_feedback_on_route_change_; const bool reset_feedback_on_route_change_;
const bool add_pacing_to_cwin_; const bool add_pacing_to_cwin_;
FieldTrialParameter<DataRate> relay_bandwidth_cap_; FieldTrialParameter<DataRate> relay_bandwidth_cap_;
size_t transport_overhead_bytes_per_packet_ RTC_GUARDED_BY(task_queue_); size_t transport_overhead_bytes_per_packet_ RTC_GUARDED_BY(sequence_checker_);
bool network_available_ RTC_GUARDED_BY(task_queue_); bool network_available_ RTC_GUARDED_BY(sequence_checker_);
RepeatingTaskHandle pacer_queue_update_task_ RTC_GUARDED_BY(task_queue_); RepeatingTaskHandle pacer_queue_update_task_
RepeatingTaskHandle controller_task_ RTC_GUARDED_BY(task_queue_); RTC_GUARDED_BY(sequence_checker_);
RepeatingTaskHandle controller_task_ RTC_GUARDED_BY(sequence_checker_);
DataSize congestion_window_size_ RTC_GUARDED_BY(task_queue_); DataSize congestion_window_size_ RTC_GUARDED_BY(sequence_checker_);
bool is_congested_ RTC_GUARDED_BY(task_queue_); bool is_congested_ RTC_GUARDED_BY(sequence_checker_);
// Protected by internal locks. // Protected by internal locks.
RateLimiter retransmission_rate_limiter_; RateLimiter retransmission_rate_limiter_;
ScopedTaskSafety safety_; ScopedTaskSafety safety_;
MaybeWorkerThread task_queue_;
const FieldTrialsView& field_trials_; const FieldTrialsView& field_trials_;
}; };

View file

@ -42,7 +42,6 @@ class TaskQueue;
namespace webrtc { namespace webrtc {
class FrameEncryptorInterface; class FrameEncryptorInterface;
class MaybeWorkerThread;
class TargetTransferRateObserver; class TargetTransferRateObserver;
class Transport; class Transport;
class PacketRouter; class PacketRouter;
@ -94,9 +93,6 @@ struct RtpSenderFrameEncryptionConfig {
class RtpTransportControllerSendInterface { class RtpTransportControllerSendInterface {
public: public:
virtual ~RtpTransportControllerSendInterface() {} virtual ~RtpTransportControllerSendInterface() {}
// TODO(webrtc:14502): Remove MaybeWorkerThread when experiment has been
// evaluated.
virtual MaybeWorkerThread* GetWorkerQueue() = 0;
virtual PacketRouter* packet_router() = 0; virtual PacketRouter* packet_router() = 0;
virtual RtpVideoSenderInterface* CreateRtpVideoSender( virtual RtpVideoSenderInterface* CreateRtpVideoSender(

View file

@ -27,7 +27,6 @@
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
#include "modules/rtp_rtcp/source/rtp_rtcp_impl2.h" #include "modules/rtp_rtcp/source/rtp_rtcp_impl2.h"
#include "modules/rtp_rtcp/source/rtp_sender.h" #include "modules/rtp_rtcp/source/rtp_sender.h"
#include "modules/utility/maybe_worker_thread.h"
#include "modules/video_coding/include/video_codec_interface.h" #include "modules/video_coding/include/video_codec_interface.h"
#include "rtc_base/checks.h" #include "rtc_base/checks.h"
#include "rtc_base/logging.h" #include "rtc_base/logging.h"

View file

@ -193,24 +193,10 @@ class RtpVideoSenderTestFixture {
MockTransport& transport() { return transport_; } MockTransport& transport() { return transport_; }
void AdvanceTime(TimeDelta delta) { time_controller_.AdvanceTime(delta); } void AdvanceTime(TimeDelta delta) { time_controller_.AdvanceTime(delta); }
void Stop() { void Stop() { router_->Stop(); }
RunOnTransportQueue([&]() { router_->Stop(); });
}
void SetActiveModules(const std::vector<bool>& active_modules) { void SetActiveModules(const std::vector<bool>& active_modules) {
RunOnTransportQueue([&]() { router_->SetActiveModules(active_modules); }); router_->SetActiveModules(active_modules);
}
// Several RtpVideoSender methods expect to be called on the task queue as
// owned by the send transport. While the SequenceChecker may pick up the
// default thread as the transport queue, explicit checks for the transport
// queue (not just using a SequenceChecker) aren't possible unless such a
// queue is actually active. So RunOnTransportQueue is a convenience function
// that allow for running a `task` on the transport queue, similar to
// SendTask().
void RunOnTransportQueue(absl::AnyInvocable<void() &&> task) {
transport_controller_.GetWorkerQueue()->RunOrPost(std::move(task));
AdvanceTime(TimeDelta::Zero());
} }
private: private:

View file

@ -50,7 +50,6 @@ class MockRtpTransportControllerSend
DestroyRtpVideoSender, DestroyRtpVideoSender,
(RtpVideoSenderInterface*), (RtpVideoSenderInterface*),
(override)); (override));
MOCK_METHOD(MaybeWorkerThread*, GetWorkerQueue, (), (override));
MOCK_METHOD(PacketRouter*, packet_router, (), (override)); MOCK_METHOD(PacketRouter*, packet_router, (), (override));
MOCK_METHOD(NetworkStateEstimateObserver*, MOCK_METHOD(NetworkStateEstimateObserver*,
network_state_estimate_observer, network_state_estimate_observer,

View file

@ -80,7 +80,7 @@ Scenario::~Scenario() {
if (start_time_.IsFinite()) if (start_time_.IsFinite())
Stop(); Stop();
for (auto& call_client : clients_) { for (auto& call_client : clients_) {
call_client->transport_->Disconnect(); call_client->SendTask([&] { call_client->transport_->Disconnect(); });
call_client->UnBind(); call_client->UnBind();
} }
} }