[WebRTC-SendPacketsOnWorkerThread] Cleanup AudioSendStream

This remove use of MaybeWorkerThread* rtp_transport_queue_ from
AudioSendStream.  The worker queue is alwauys assumed ot be used where
rtp_transport_queue_ was used.

Bug: webrtc:14502
Change-Id: Ia516ce7340d712671e0ecb301bba9d66e7216973
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/300400
Reviewed-by: Evan Shrubsole <eshr@webrtc.org>
Reviewed-by: Jakob Ivarsson‎ <jakobi@webrtc.org>
Commit-Queue: Per Kjellander <perkj@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#39816}
This commit is contained in:
Per K 2023-04-05 14:58:18 +02:00 committed by WebRTC LUCI CQ
parent b70a36e770
commit dd557fdb1e
4 changed files with 28 additions and 68 deletions

View file

@ -86,7 +86,6 @@ rtc_library("audio") {
"../modules/pacing", "../modules/pacing",
"../modules/rtp_rtcp", "../modules/rtp_rtcp",
"../modules/rtp_rtcp:rtp_rtcp_format", "../modules/rtp_rtcp:rtp_rtcp_format",
"../modules/utility:utility",
"../rtc_base:audio_format_to_string", "../rtc_base:audio_format_to_string",
"../rtc_base:buffer", "../rtc_base:buffer",
"../rtc_base:checks", "../rtc_base:checks",
@ -196,7 +195,6 @@ if (rtc_include_tests) {
"../modules/pacing", "../modules/pacing",
"../modules/rtp_rtcp:mock_rtp_rtcp", "../modules/rtp_rtcp:mock_rtp_rtcp",
"../modules/rtp_rtcp:rtp_rtcp_format", "../modules/rtp_rtcp:rtp_rtcp_format",
"../modules/utility:utility",
"../rtc_base:checks", "../rtc_base:checks",
"../rtc_base:gunit_helpers", "../rtc_base:gunit_helpers",
"../rtc_base:macromagic", "../rtc_base:macromagic",

View file

@ -147,7 +147,6 @@ AudioSendStream::AudioSendStream(
const FieldTrialsView& field_trials) const FieldTrialsView& field_trials)
: clock_(clock), : clock_(clock),
field_trials_(field_trials), field_trials_(field_trials),
rtp_transport_queue_(rtp_transport->GetWorkerQueue()),
allocate_audio_without_feedback_( allocate_audio_without_feedback_(
field_trials_.IsEnabled("WebRTC-Audio-ABWENoTWCC")), field_trials_.IsEnabled("WebRTC-Audio-ABWENoTWCC")),
enable_audio_alr_probing_( enable_audio_alr_probing_(
@ -164,7 +163,6 @@ AudioSendStream::AudioSendStream(
rtp_rtcp_module_(channel_send_->GetRtpRtcp()), rtp_rtcp_module_(channel_send_->GetRtpRtcp()),
suspended_rtp_state_(suspended_rtp_state) { suspended_rtp_state_(suspended_rtp_state) {
RTC_LOG(LS_INFO) << "AudioSendStream: " << config.rtp.ssrc; RTC_LOG(LS_INFO) << "AudioSendStream: " << config.rtp.ssrc;
RTC_DCHECK(rtp_transport_queue_);
RTC_DCHECK(audio_state_); RTC_DCHECK(audio_state_);
RTC_DCHECK(channel_send_); RTC_DCHECK(channel_send_);
RTC_DCHECK(bitrate_allocator_); RTC_DCHECK(bitrate_allocator_);
@ -182,10 +180,6 @@ AudioSendStream::~AudioSendStream() {
RTC_LOG(LS_INFO) << "~AudioSendStream: " << config_.rtp.ssrc; RTC_LOG(LS_INFO) << "~AudioSendStream: " << config_.rtp.ssrc;
RTC_DCHECK(!sending_); RTC_DCHECK(!sending_);
channel_send_->ResetSenderCongestionControlObjects(); channel_send_->ResetSenderCongestionControlObjects();
// Blocking call to synchronize state with worker queue to ensure that there
// are no pending tasks left that keeps references to audio.
rtp_transport_queue_->RunSynchronous([] {});
} }
const webrtc::AudioSendStream::Config& AudioSendStream::GetConfig() const { const webrtc::AudioSendStream::Config& AudioSendStream::GetConfig() const {
@ -510,7 +504,7 @@ void AudioSendStream::DeliverRtcp(const uint8_t* packet, size_t length) {
} }
uint32_t AudioSendStream::OnBitrateUpdated(BitrateAllocationUpdate update) { uint32_t AudioSendStream::OnBitrateUpdated(BitrateAllocationUpdate update) {
RTC_DCHECK_RUN_ON(rtp_transport_queue_); RTC_DCHECK_RUN_ON(&worker_thread_checker_);
// Pick a target bitrate between the constraints. Overrules the allocator if // Pick a target bitrate between the constraints. Overrules the allocator if
// it 1) allocated a bitrate of zero to disable the stream or 2) allocated a // it 1) allocated a bitrate of zero to disable the stream or 2) allocated a
@ -825,6 +819,7 @@ void AudioSendStream::ReconfigureBitrateObserver(
} }
void AudioSendStream::ConfigureBitrateObserver() { void AudioSendStream::ConfigureBitrateObserver() {
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
// This either updates the current observer or adds a new observer. // This either updates the current observer or adds a new observer.
// TODO(srte): Add overhead compensation here. // TODO(srte): Add overhead compensation here.
auto constraints = GetMinMaxBitrateConstraints(); auto constraints = GetMinMaxBitrateConstraints();
@ -846,30 +841,24 @@ void AudioSendStream::ConfigureBitrateObserver() {
priority_bitrate += min_overhead; priority_bitrate += min_overhead;
} }
if (allocation_settings_.priority_bitrate_raw) if (allocation_settings_.priority_bitrate_raw) {
priority_bitrate = *allocation_settings_.priority_bitrate_raw; priority_bitrate = *allocation_settings_.priority_bitrate_raw;
}
rtp_transport_queue_->RunOrPost([this, constraints, priority_bitrate,
config_bitrate_priority =
config_.bitrate_priority] {
RTC_DCHECK_RUN_ON(rtp_transport_queue_);
bitrate_allocator_->AddObserver( bitrate_allocator_->AddObserver(
this, this,
MediaStreamAllocationConfig{ MediaStreamAllocationConfig{
constraints->min.bps<uint32_t>(), constraints->max.bps<uint32_t>(), constraints->min.bps<uint32_t>(), constraints->max.bps<uint32_t>(), 0,
0, priority_bitrate.bps(), true, priority_bitrate.bps(), true,
allocation_settings_.bitrate_priority.value_or( allocation_settings_.bitrate_priority.value_or(
config_bitrate_priority)}); config_.bitrate_priority)});
});
registered_with_allocator_ = true; registered_with_allocator_ = true;
} }
void AudioSendStream::RemoveBitrateObserver() { void AudioSendStream::RemoveBitrateObserver() {
registered_with_allocator_ = false; registered_with_allocator_ = false;
rtp_transport_queue_->RunSynchronous([this] {
RTC_DCHECK_RUN_ON(rtp_transport_queue_);
bitrate_allocator_->RemoveObserver(this); bitrate_allocator_->RemoveObserver(this);
});
} }
absl::optional<AudioSendStream::TargetAudioBitrateConstraints> absl::optional<AudioSendStream::TargetAudioBitrateConstraints>
@ -930,10 +919,7 @@ void AudioSendStream::UpdateCachedTargetAudioBitrateConstraints() {
if (!new_constraints.has_value()) { if (!new_constraints.has_value()) {
return; return;
} }
rtp_transport_queue_->RunOrPost([this, new_constraints]() {
RTC_DCHECK_RUN_ON(rtp_transport_queue_);
cached_constraints_ = new_constraints; cached_constraints_ = new_constraints;
});
} }
} // namespace internal } // namespace internal

View file

@ -25,7 +25,6 @@
#include "call/audio_state.h" #include "call/audio_state.h"
#include "call/bitrate_allocator.h" #include "call/bitrate_allocator.h"
#include "modules/rtp_rtcp/source/rtp_rtcp_interface.h" #include "modules/rtp_rtcp/source/rtp_rtcp_interface.h"
#include "modules/utility/maybe_worker_thread.h"
#include "rtc_base/experiments/struct_parameters_parser.h" #include "rtc_base/experiments/struct_parameters_parser.h"
#include "rtc_base/race_checker.h" #include "rtc_base/race_checker.h"
#include "rtc_base/synchronization/mutex.h" #include "rtc_base/synchronization/mutex.h"
@ -173,7 +172,6 @@ class AudioSendStream final : public webrtc::AudioSendStream,
SequenceChecker worker_thread_checker_; SequenceChecker worker_thread_checker_;
rtc::RaceChecker audio_capture_race_checker_; rtc::RaceChecker audio_capture_race_checker_;
MaybeWorkerThread* rtp_transport_queue_;
const bool allocate_audio_without_feedback_; const bool allocate_audio_without_feedback_;
const bool force_no_audio_feedback_ = allocate_audio_without_feedback_; const bool force_no_audio_feedback_ = allocate_audio_without_feedback_;
@ -196,10 +194,10 @@ class AudioSendStream final : public webrtc::AudioSendStream,
webrtc::voe::AudioLevel audio_level_ RTC_GUARDED_BY(audio_level_lock_); webrtc::voe::AudioLevel audio_level_ RTC_GUARDED_BY(audio_level_lock_);
BitrateAllocatorInterface* const bitrate_allocator_ BitrateAllocatorInterface* const bitrate_allocator_
RTC_GUARDED_BY(rtp_transport_queue_); RTC_GUARDED_BY(worker_thread_checker_);
// Constrains cached to be accessed from `rtp_transport_queue_`.
absl::optional<AudioSendStream::TargetAudioBitrateConstraints> absl::optional<AudioSendStream::TargetAudioBitrateConstraints>
cached_constraints_ RTC_GUARDED_BY(rtp_transport_queue_) = absl::nullopt; cached_constraints_ RTC_GUARDED_BY(worker_thread_checker_) =
absl::nullopt;
RtpTransportControllerSendInterface* const rtp_transport_; RtpTransportControllerSendInterface* const rtp_transport_;
RtpRtcpInterface* const rtp_rtcp_module_; RtpRtcpInterface* const rtp_rtcp_module_;

View file

@ -30,7 +30,6 @@
#include "modules/audio_processing/include/mock_audio_processing.h" #include "modules/audio_processing/include/mock_audio_processing.h"
#include "modules/rtp_rtcp/mocks/mock_rtcp_bandwidth_observer.h" #include "modules/rtp_rtcp/mocks/mock_rtcp_bandwidth_observer.h"
#include "modules/rtp_rtcp/mocks/mock_rtp_rtcp.h" #include "modules/rtp_rtcp/mocks/mock_rtp_rtcp.h"
#include "modules/utility/maybe_worker_thread.h"
#include "system_wrappers/include/clock.h" #include "system_wrappers/include/clock.h"
#include "test/gtest.h" #include "test/gtest.h"
#include "test/mock_audio_encoder.h" #include "test/mock_audio_encoder.h"
@ -155,9 +154,6 @@ struct ConfigHelper {
? nullptr ? nullptr
: rtc::make_ref_counted<NiceMock<MockAudioProcessing>>()), : rtc::make_ref_counted<NiceMock<MockAudioProcessing>>()),
bitrate_allocator_(&limit_observer_), bitrate_allocator_(&limit_observer_),
worker_queue_(field_trials,
"ConfigHelper_worker_queue",
time_controller_.GetTaskQueueFactory()),
audio_encoder_(nullptr) { audio_encoder_(nullptr) {
using ::testing::Invoke; using ::testing::Invoke;
@ -188,8 +184,6 @@ struct ConfigHelper {
} }
std::unique_ptr<internal::AudioSendStream> CreateAudioSendStream() { std::unique_ptr<internal::AudioSendStream> CreateAudioSendStream() {
EXPECT_CALL(rtp_transport_, GetWorkerQueue())
.WillRepeatedly(Return(&worker_queue_));
return std::unique_ptr<internal::AudioSendStream>( return std::unique_ptr<internal::AudioSendStream>(
new internal::AudioSendStream( new internal::AudioSendStream(
time_controller_.GetClock(), stream_config_, audio_state_, time_controller_.GetClock(), stream_config_, audio_state_,
@ -319,8 +313,6 @@ struct ConfigHelper {
} }
} }
MaybeWorkerThread* worker() { return &worker_queue_; }
test::ScopedKeyValueConfig field_trials; test::ScopedKeyValueConfig field_trials;
private: private:
@ -336,9 +328,6 @@ struct ConfigHelper {
::testing::NiceMock<MockRtpRtcpInterface> rtp_rtcp_; ::testing::NiceMock<MockRtpRtcpInterface> rtp_rtcp_;
::testing::NiceMock<MockLimitObserver> limit_observer_; ::testing::NiceMock<MockLimitObserver> limit_observer_;
BitrateAllocator bitrate_allocator_; BitrateAllocator bitrate_allocator_;
// `worker_queue` is defined last to ensure all pending tasks are cancelled
// and deleted before any other members.
MaybeWorkerThread worker_queue_;
std::unique_ptr<AudioEncoder> audio_encoder_; std::unique_ptr<AudioEncoder> audio_encoder_;
}; };
@ -636,8 +625,7 @@ TEST(AudioSendStreamTest, DoesNotPassHigherBitrateThanMaxBitrate) {
update.packet_loss_ratio = 0; update.packet_loss_ratio = 0;
update.round_trip_time = TimeDelta::Millis(50); update.round_trip_time = TimeDelta::Millis(50);
update.bwe_period = TimeDelta::Millis(6000); update.bwe_period = TimeDelta::Millis(6000);
helper.worker()->RunSynchronous( send_stream->OnBitrateUpdated(update);
[&] { send_stream->OnBitrateUpdated(update); });
} }
} }
@ -653,8 +641,7 @@ TEST(AudioSendStreamTest, SSBweTargetInRangeRespected) {
BitrateAllocationUpdate update; BitrateAllocationUpdate update;
update.target_bitrate = update.target_bitrate =
DataRate::BitsPerSec(helper.config().max_bitrate_bps - 5000); DataRate::BitsPerSec(helper.config().max_bitrate_bps - 5000);
helper.worker()->RunSynchronous( send_stream->OnBitrateUpdated(update);
[&] { send_stream->OnBitrateUpdated(update); });
} }
} }
@ -670,8 +657,7 @@ TEST(AudioSendStreamTest, SSBweFieldTrialMinRespected) {
Eq(DataRate::KilobitsPerSec(6))))); Eq(DataRate::KilobitsPerSec(6)))));
BitrateAllocationUpdate update; BitrateAllocationUpdate update;
update.target_bitrate = DataRate::KilobitsPerSec(1); update.target_bitrate = DataRate::KilobitsPerSec(1);
helper.worker()->RunSynchronous( send_stream->OnBitrateUpdated(update);
[&] { send_stream->OnBitrateUpdated(update); });
} }
} }
@ -687,8 +673,7 @@ TEST(AudioSendStreamTest, SSBweFieldTrialMaxRespected) {
Eq(DataRate::KilobitsPerSec(64))))); Eq(DataRate::KilobitsPerSec(64)))));
BitrateAllocationUpdate update; BitrateAllocationUpdate update;
update.target_bitrate = DataRate::KilobitsPerSec(128); update.target_bitrate = DataRate::KilobitsPerSec(128);
helper.worker()->RunSynchronous( send_stream->OnBitrateUpdated(update);
[&] { send_stream->OnBitrateUpdated(update); });
} }
} }
@ -708,8 +693,7 @@ TEST(AudioSendStreamTest, SSBweWithOverhead) {
&BitrateAllocationUpdate::target_bitrate, Eq(bitrate)))); &BitrateAllocationUpdate::target_bitrate, Eq(bitrate))));
BitrateAllocationUpdate update; BitrateAllocationUpdate update;
update.target_bitrate = bitrate; update.target_bitrate = bitrate;
helper.worker()->RunSynchronous( send_stream->OnBitrateUpdated(update);
[&] { send_stream->OnBitrateUpdated(update); });
} }
} }
@ -729,8 +713,7 @@ TEST(AudioSendStreamTest, SSBweWithOverheadMinRespected) {
&BitrateAllocationUpdate::target_bitrate, Eq(bitrate)))); &BitrateAllocationUpdate::target_bitrate, Eq(bitrate))));
BitrateAllocationUpdate update; BitrateAllocationUpdate update;
update.target_bitrate = DataRate::KilobitsPerSec(1); update.target_bitrate = DataRate::KilobitsPerSec(1);
helper.worker()->RunSynchronous( send_stream->OnBitrateUpdated(update);
[&] { send_stream->OnBitrateUpdated(update); });
} }
} }
@ -750,8 +733,7 @@ TEST(AudioSendStreamTest, SSBweWithOverheadMaxRespected) {
&BitrateAllocationUpdate::target_bitrate, Eq(bitrate)))); &BitrateAllocationUpdate::target_bitrate, Eq(bitrate))));
BitrateAllocationUpdate update; BitrateAllocationUpdate update;
update.target_bitrate = DataRate::KilobitsPerSec(128); update.target_bitrate = DataRate::KilobitsPerSec(128);
helper.worker()->RunSynchronous( send_stream->OnBitrateUpdated(update);
[&] { send_stream->OnBitrateUpdated(update); });
} }
} }
@ -769,8 +751,7 @@ TEST(AudioSendStreamTest, ProbingIntervalOnBitrateUpdated) {
update.packet_loss_ratio = 0; update.packet_loss_ratio = 0;
update.round_trip_time = TimeDelta::Millis(50); update.round_trip_time = TimeDelta::Millis(50);
update.bwe_period = TimeDelta::Millis(5000); update.bwe_period = TimeDelta::Millis(5000);
helper.worker()->RunSynchronous( send_stream->OnBitrateUpdated(update);
[&] { send_stream->OnBitrateUpdated(update); });
} }
} }
@ -872,8 +853,7 @@ TEST(AudioSendStreamTest, AudioOverheadChanged) {
DataRate::BitsPerSec(helper.config().max_bitrate_bps) + DataRate::BitsPerSec(helper.config().max_bitrate_bps) +
kMaxOverheadRate; kMaxOverheadRate;
EXPECT_CALL(*helper.channel_send(), OnBitrateAllocation); EXPECT_CALL(*helper.channel_send(), OnBitrateAllocation);
helper.worker()->RunSynchronous( send_stream->OnBitrateUpdated(update);
[&] { send_stream->OnBitrateUpdated(update); });
EXPECT_EQ(audio_overhead_per_packet_bytes, EXPECT_EQ(audio_overhead_per_packet_bytes,
send_stream->TestOnlyGetPerPacketOverheadBytes()); send_stream->TestOnlyGetPerPacketOverheadBytes());
@ -881,8 +861,7 @@ TEST(AudioSendStreamTest, AudioOverheadChanged) {
EXPECT_CALL(*helper.rtp_rtcp(), ExpectedPerPacketOverhead) EXPECT_CALL(*helper.rtp_rtcp(), ExpectedPerPacketOverhead)
.WillRepeatedly(Return(audio_overhead_per_packet_bytes + 20)); .WillRepeatedly(Return(audio_overhead_per_packet_bytes + 20));
EXPECT_CALL(*helper.channel_send(), OnBitrateAllocation); EXPECT_CALL(*helper.channel_send(), OnBitrateAllocation);
helper.worker()->RunSynchronous( send_stream->OnBitrateUpdated(update);
[&] { send_stream->OnBitrateUpdated(update); });
EXPECT_EQ(audio_overhead_per_packet_bytes + 20, EXPECT_EQ(audio_overhead_per_packet_bytes + 20,
send_stream->TestOnlyGetPerPacketOverheadBytes()); send_stream->TestOnlyGetPerPacketOverheadBytes());
@ -906,8 +885,7 @@ TEST(AudioSendStreamTest, OnAudioAndTransportOverheadChanged) {
DataRate::BitsPerSec(helper.config().max_bitrate_bps) + DataRate::BitsPerSec(helper.config().max_bitrate_bps) +
kMaxOverheadRate; kMaxOverheadRate;
EXPECT_CALL(*helper.channel_send(), OnBitrateAllocation); EXPECT_CALL(*helper.channel_send(), OnBitrateAllocation);
helper.worker()->RunSynchronous( send_stream->OnBitrateUpdated(update);
[&] { send_stream->OnBitrateUpdated(update); });
EXPECT_EQ( EXPECT_EQ(
transport_overhead_per_packet_bytes + audio_overhead_per_packet_bytes, transport_overhead_per_packet_bytes + audio_overhead_per_packet_bytes,