Cleanup usage of the rtc::TaskQueue in audio/

Bug: webrtc:14169
Change-Id: I91f158ce072cb1109ec2d8f9e9c8f6a530aa02cd
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/335080
Reviewed-by: Jakob Ivarsson‎ <jakobi@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#41559}
This commit is contained in:
Danil Chapovalov 2024-01-18 11:57:48 +01:00 committed by WebRTC LUCI CQ
parent 192c0628cb
commit b1799b0814
7 changed files with 46 additions and 39 deletions

View file

@ -97,7 +97,6 @@ rtc_library("audio") {
"../rtc_base:refcount", "../rtc_base:refcount",
"../rtc_base:rtc_event", "../rtc_base:rtc_event",
"../rtc_base:rtc_numerics", "../rtc_base:rtc_numerics",
"../rtc_base:rtc_task_queue",
"../rtc_base:safe_conversions", "../rtc_base:safe_conversions",
"../rtc_base:safe_minmax", "../rtc_base:safe_minmax",
"../rtc_base:stringutils", "../rtc_base:stringutils",

View file

@ -28,7 +28,6 @@
#include "rtc_base/experiments/struct_parameters_parser.h" #include "rtc_base/experiments/struct_parameters_parser.h"
#include "rtc_base/race_checker.h" #include "rtc_base/race_checker.h"
#include "rtc_base/synchronization/mutex.h" #include "rtc_base/synchronization/mutex.h"
#include "rtc_base/task_queue.h"
namespace webrtc { namespace webrtc {
class RtcEventLog; class RtcEventLog;

View file

@ -16,8 +16,8 @@
#include "api/frame_transformer_interface.h" #include "api/frame_transformer_interface.h"
#include "api/sequence_checker.h" #include "api/sequence_checker.h"
#include "api/task_queue/task_queue_base.h"
#include "rtc_base/system/no_unique_address.h" #include "rtc_base/system/no_unique_address.h"
#include "rtc_base/task_queue.h"
#include "rtc_base/thread.h" #include "rtc_base/thread.h"
namespace webrtc { namespace webrtc {

View file

@ -39,7 +39,7 @@
#include "rtc_base/rate_limiter.h" #include "rtc_base/rate_limiter.h"
#include "rtc_base/strings/string_builder.h" #include "rtc_base/strings/string_builder.h"
#include "rtc_base/synchronization/mutex.h" #include "rtc_base/synchronization/mutex.h"
#include "rtc_base/task_queue.h" #include "rtc_base/system/no_unique_address.h"
#include "rtc_base/time_utils.h" #include "rtc_base/time_utils.h"
#include "rtc_base/trace_event.h" #include "rtc_base/trace_event.h"
#include "system_wrappers/include/clock.h" #include "system_wrappers/include/clock.h"
@ -171,7 +171,7 @@ class ChannelSend : public ChannelSendInterface,
rtc::ArrayView<const uint8_t> payload, rtc::ArrayView<const uint8_t> payload,
int64_t absolute_capture_timestamp_ms, int64_t absolute_capture_timestamp_ms,
rtc::ArrayView<const uint32_t> csrcs) rtc::ArrayView<const uint32_t> csrcs)
RTC_RUN_ON(encoder_queue_); RTC_RUN_ON(encoder_queue_checker_);
void OnReceivedRtt(int64_t rtt_ms); void OnReceivedRtt(int64_t rtt_ms);
@ -182,7 +182,7 @@ class ChannelSend : public ChannelSendInterface,
// specific threads we know about. The goal is to eventually split up // specific threads we know about. The goal is to eventually split up
// voe::Channel into parts with single-threaded semantics, and thereby reduce // voe::Channel into parts with single-threaded semantics, and thereby reduce
// the need for locks. // the need for locks.
SequenceChecker worker_thread_checker_; RTC_NO_UNIQUE_ADDRESS SequenceChecker worker_thread_checker_;
// Methods accessed from audio and video threads are checked for sequential- // Methods accessed from audio and video threads are checked for sequential-
// only access. We don't necessarily own and control these threads, so thread // only access. We don't necessarily own and control these threads, so thread
// checkers cannot be used. E.g. Chromium may transfer "ownership" from one // checkers cannot be used. E.g. Chromium may transfer "ownership" from one
@ -206,16 +206,16 @@ class ChannelSend : public ChannelSendInterface,
absl::optional<int64_t> last_capture_timestamp_ms_ absl::optional<int64_t> last_capture_timestamp_ms_
RTC_GUARDED_BY(audio_thread_race_checker_); RTC_GUARDED_BY(audio_thread_race_checker_);
RmsLevel rms_level_ RTC_GUARDED_BY(encoder_queue_); RmsLevel rms_level_ RTC_GUARDED_BY(encoder_queue_checker_);
bool input_mute_ RTC_GUARDED_BY(volume_settings_mutex_) = false; bool input_mute_ RTC_GUARDED_BY(volume_settings_mutex_) = false;
bool previous_frame_muted_ RTC_GUARDED_BY(encoder_queue_) = false; bool previous_frame_muted_ RTC_GUARDED_BY(encoder_queue_checker_) = false;
PacketRouter* packet_router_ RTC_GUARDED_BY(&worker_thread_checker_) = PacketRouter* packet_router_ RTC_GUARDED_BY(&worker_thread_checker_) =
nullptr; nullptr;
const std::unique_ptr<RtpPacketSenderProxy> rtp_packet_pacer_proxy_; const std::unique_ptr<RtpPacketSenderProxy> rtp_packet_pacer_proxy_;
const std::unique_ptr<RateLimiter> retransmission_rate_limiter_; const std::unique_ptr<RateLimiter> retransmission_rate_limiter_;
SequenceChecker construction_thread_; RTC_NO_UNIQUE_ADDRESS SequenceChecker construction_thread_;
std::atomic<bool> include_audio_level_indication_ = false; std::atomic<bool> include_audio_level_indication_ = false;
std::atomic<bool> encoder_queue_is_active_ = false; std::atomic<bool> encoder_queue_is_active_ = false;
@ -223,7 +223,7 @@ class ChannelSend : public ChannelSendInterface,
// E2EE Audio Frame Encryption // E2EE Audio Frame Encryption
rtc::scoped_refptr<FrameEncryptorInterface> frame_encryptor_ rtc::scoped_refptr<FrameEncryptorInterface> frame_encryptor_
RTC_GUARDED_BY(encoder_queue_); RTC_GUARDED_BY(encoder_queue_checker_);
// E2EE Frame Encryption Options // E2EE Frame Encryption Options
const webrtc::CryptoOptions crypto_options_; const webrtc::CryptoOptions crypto_options_;
@ -231,15 +231,14 @@ class ChannelSend : public ChannelSendInterface,
// receives callbacks with the transformed frames; delegates calls to // receives callbacks with the transformed frames; delegates calls to
// ChannelSend::SendRtpAudio to send the transformed audio. // ChannelSend::SendRtpAudio to send the transformed audio.
rtc::scoped_refptr<ChannelSendFrameTransformerDelegate> rtc::scoped_refptr<ChannelSendFrameTransformerDelegate>
frame_transformer_delegate_ RTC_GUARDED_BY(encoder_queue_); frame_transformer_delegate_ RTC_GUARDED_BY(encoder_queue_checker_);
mutable Mutex rtcp_counter_mutex_; mutable Mutex rtcp_counter_mutex_;
RtcpPacketTypeCounter rtcp_packet_type_counter_ RtcpPacketTypeCounter rtcp_packet_type_counter_
RTC_GUARDED_BY(rtcp_counter_mutex_); RTC_GUARDED_BY(rtcp_counter_mutex_);
// Defined last to ensure that there are no running tasks when the other std::unique_ptr<TaskQueueBase, TaskQueueDeleter> encoder_queue_;
// members are destroyed. RTC_NO_UNIQUE_ADDRESS SequenceChecker encoder_queue_checker_;
rtc::TaskQueue encoder_queue_;
SdpAudioFormat encoder_format_; SdpAudioFormat encoder_format_;
}; };
@ -268,7 +267,7 @@ class RtpPacketSenderProxy : public RtpPacketSender {
} }
private: private:
SequenceChecker thread_checker_; RTC_NO_UNIQUE_ADDRESS SequenceChecker thread_checker_;
Mutex mutex_; Mutex mutex_;
RtpPacketSender* rtp_packet_pacer_ RTC_GUARDED_BY(&mutex_); RtpPacketSender* rtp_packet_pacer_ RTC_GUARDED_BY(&mutex_);
}; };
@ -279,7 +278,7 @@ int32_t ChannelSend::SendData(AudioFrameType frameType,
const uint8_t* payloadData, const uint8_t* payloadData,
size_t payloadSize, size_t payloadSize,
int64_t absolute_capture_timestamp_ms) { int64_t absolute_capture_timestamp_ms) {
RTC_DCHECK_RUN_ON(&encoder_queue_); RTC_DCHECK_RUN_ON(&encoder_queue_checker_);
rtc::ArrayView<const uint8_t> payload(payloadData, payloadSize); rtc::ArrayView<const uint8_t> payload(payloadData, payloadSize);
if (frame_transformer_delegate_) { if (frame_transformer_delegate_) {
// Asynchronously transform the payload before sending it. After the payload // Asynchronously transform the payload before sending it. After the payload
@ -406,6 +405,7 @@ ChannelSend::ChannelSend(
encoder_queue_(task_queue_factory->CreateTaskQueue( encoder_queue_(task_queue_factory->CreateTaskQueue(
"AudioEncoder", "AudioEncoder",
TaskQueueFactory::Priority::NORMAL)), TaskQueueFactory::Priority::NORMAL)),
encoder_queue_checker_(encoder_queue_.get()),
encoder_format_("x-unknown", 0, 0) { encoder_format_("x-unknown", 0, 0) {
audio_coding_ = AudioCodingModule::Create(); audio_coding_ = AudioCodingModule::Create();
@ -458,6 +458,10 @@ ChannelSend::~ChannelSend() {
StopSend(); StopSend();
int error = audio_coding_->RegisterTransportCallback(NULL); int error = audio_coding_->RegisterTransportCallback(NULL);
RTC_DCHECK_EQ(0, error); RTC_DCHECK_EQ(0, error);
// Delete the encoder task queue first to ensure that there are no running
// tasks when the other members are destroyed.
encoder_queue_ = nullptr;
} }
void ChannelSend::StartSend() { void ChannelSend::StartSend() {
@ -487,8 +491,8 @@ void ChannelSend::StopSend() {
// Wait until all pending encode tasks are executed and clear any remaining // Wait until all pending encode tasks are executed and clear any remaining
// buffers in the encoder. // buffers in the encoder.
rtc::Event flush; rtc::Event flush;
encoder_queue_.PostTask([this, &flush]() { encoder_queue_->PostTask([this, &flush]() {
RTC_DCHECK_RUN_ON(&encoder_queue_); RTC_DCHECK_RUN_ON(&encoder_queue_checker_);
CallEncoder([](AudioEncoder* encoder) { encoder->Reset(); }); CallEncoder([](AudioEncoder* encoder) { encoder->Reset(); });
flush.Set(); flush.Set();
}); });
@ -761,9 +765,9 @@ void ChannelSend::ProcessAndEncodeAudio(
// Profile time between when the audio frame is added to the task queue and // Profile time between when the audio frame is added to the task queue and
// when the task is actually executed. // when the task is actually executed.
audio_frame->UpdateProfileTimeStamp(); audio_frame->UpdateProfileTimeStamp();
encoder_queue_.PostTask( encoder_queue_->PostTask(
[this, audio_frame = std::move(audio_frame)]() mutable { [this, audio_frame = std::move(audio_frame)]() mutable {
RTC_DCHECK_RUN_ON(&encoder_queue_); RTC_DCHECK_RUN_ON(&encoder_queue_checker_);
if (!encoder_queue_is_active_.load()) { if (!encoder_queue_is_active_.load()) {
return; return;
} }
@ -825,8 +829,8 @@ int64_t ChannelSend::GetRTT() const {
void ChannelSend::SetFrameEncryptor( void ChannelSend::SetFrameEncryptor(
rtc::scoped_refptr<FrameEncryptorInterface> frame_encryptor) { rtc::scoped_refptr<FrameEncryptorInterface> frame_encryptor) {
RTC_DCHECK_RUN_ON(&worker_thread_checker_); RTC_DCHECK_RUN_ON(&worker_thread_checker_);
encoder_queue_.PostTask([this, frame_encryptor]() mutable { encoder_queue_->PostTask([this, frame_encryptor]() mutable {
RTC_DCHECK_RUN_ON(&encoder_queue_); RTC_DCHECK_RUN_ON(&encoder_queue_checker_);
frame_encryptor_ = std::move(frame_encryptor); frame_encryptor_ = std::move(frame_encryptor);
}); });
} }
@ -837,9 +841,9 @@ void ChannelSend::SetEncoderToPacketizerFrameTransformer(
if (!frame_transformer) if (!frame_transformer)
return; return;
encoder_queue_.PostTask( encoder_queue_->PostTask(
[this, frame_transformer = std::move(frame_transformer)]() mutable { [this, frame_transformer = std::move(frame_transformer)]() mutable {
RTC_DCHECK_RUN_ON(&encoder_queue_); RTC_DCHECK_RUN_ON(&encoder_queue_checker_);
InitFrameTransformerDelegate(std::move(frame_transformer)); InitFrameTransformerDelegate(std::move(frame_transformer));
}); });
} }
@ -852,7 +856,7 @@ void ChannelSend::OnReceivedRtt(int64_t rtt_ms) {
void ChannelSend::InitFrameTransformerDelegate( void ChannelSend::InitFrameTransformerDelegate(
rtc::scoped_refptr<webrtc::FrameTransformerInterface> frame_transformer) { rtc::scoped_refptr<webrtc::FrameTransformerInterface> frame_transformer) {
RTC_DCHECK_RUN_ON(&encoder_queue_); RTC_DCHECK_RUN_ON(&encoder_queue_checker_);
RTC_DCHECK(frame_transformer); RTC_DCHECK(frame_transformer);
RTC_DCHECK(!frame_transformer_delegate_); RTC_DCHECK(!frame_transformer_delegate_);
@ -864,7 +868,7 @@ void ChannelSend::InitFrameTransformerDelegate(
rtc::ArrayView<const uint8_t> payload, rtc::ArrayView<const uint8_t> payload,
int64_t absolute_capture_timestamp_ms, int64_t absolute_capture_timestamp_ms,
rtc::ArrayView<const uint32_t> csrcs) { rtc::ArrayView<const uint32_t> csrcs) {
RTC_DCHECK_RUN_ON(&encoder_queue_); RTC_DCHECK_RUN_ON(&encoder_queue_checker_);
return SendRtpAudio( return SendRtpAudio(
frameType, payloadType, frameType, payloadType,
rtp_timestamp_with_offset - rtp_rtcp_->StartTimestamp(), payload, rtp_timestamp_with_offset - rtp_rtcp_->StartTimestamp(), payload,
@ -873,7 +877,7 @@ void ChannelSend::InitFrameTransformerDelegate(
frame_transformer_delegate_ = frame_transformer_delegate_ =
rtc::make_ref_counted<ChannelSendFrameTransformerDelegate>( rtc::make_ref_counted<ChannelSendFrameTransformerDelegate>(
std::move(send_audio_callback), std::move(frame_transformer), std::move(send_audio_callback), std::move(frame_transformer),
encoder_queue_.Get()); encoder_queue_.get());
frame_transformer_delegate_->Init(); frame_transformer_delegate_->Init();
} }

View file

@ -94,7 +94,6 @@ rtc_library("audio_egress") {
"../../modules/rtp_rtcp", "../../modules/rtp_rtcp",
"../../modules/rtp_rtcp:rtp_rtcp_format", "../../modules/rtp_rtcp:rtp_rtcp_format",
"../../rtc_base:logging", "../../rtc_base:logging",
"../../rtc_base:rtc_task_queue",
"../../rtc_base:timeutils", "../../rtc_base:timeutils",
"../../rtc_base/synchronization:mutex", "../../rtc_base/synchronization:mutex",
"../../rtc_base/system:no_unique_address", "../../rtc_base/system:no_unique_address",

View file

@ -13,6 +13,7 @@
#include <utility> #include <utility>
#include <vector> #include <vector>
#include "api/sequence_checker.h"
#include "rtc_base/logging.h" #include "rtc_base/logging.h"
namespace webrtc { namespace webrtc {
@ -25,12 +26,17 @@ AudioEgress::AudioEgress(RtpRtcpInterface* rtp_rtcp,
audio_coding_(AudioCodingModule::Create()), audio_coding_(AudioCodingModule::Create()),
encoder_queue_(task_queue_factory->CreateTaskQueue( encoder_queue_(task_queue_factory->CreateTaskQueue(
"AudioEncoder", "AudioEncoder",
TaskQueueFactory::Priority::NORMAL)) { TaskQueueFactory::Priority::NORMAL)),
encoder_queue_checker_(encoder_queue_.get()) {
audio_coding_->RegisterTransportCallback(this); audio_coding_->RegisterTransportCallback(this);
} }
AudioEgress::~AudioEgress() { AudioEgress::~AudioEgress() {
audio_coding_->RegisterTransportCallback(nullptr); audio_coding_->RegisterTransportCallback(nullptr);
// Delete first to ensure that there are no running tasks when the other
// members are destroyed.
encoder_queue_ = nullptr;
} }
bool AudioEgress::IsSending() const { bool AudioEgress::IsSending() const {
@ -73,9 +79,9 @@ void AudioEgress::SendAudioData(std::unique_ptr<AudioFrame> audio_frame) {
RTC_DCHECK_GT(audio_frame->samples_per_channel_, 0); RTC_DCHECK_GT(audio_frame->samples_per_channel_, 0);
RTC_DCHECK_LE(audio_frame->num_channels_, 8); RTC_DCHECK_LE(audio_frame->num_channels_, 8);
encoder_queue_.PostTask( encoder_queue_->PostTask(
[this, audio_frame = std::move(audio_frame)]() mutable { [this, audio_frame = std::move(audio_frame)]() mutable {
RTC_DCHECK_RUN_ON(&encoder_queue_); RTC_DCHECK_RUN_ON(&encoder_queue_checker_);
if (!rtp_rtcp_->SendingMedia()) { if (!rtp_rtcp_->SendingMedia()) {
return; return;
} }
@ -112,7 +118,7 @@ int32_t AudioEgress::SendData(AudioFrameType frame_type,
uint32_t timestamp, uint32_t timestamp,
const uint8_t* payload_data, const uint8_t* payload_data,
size_t payload_size) { size_t payload_size) {
RTC_DCHECK_RUN_ON(&encoder_queue_); RTC_DCHECK_RUN_ON(&encoder_queue_checker_);
rtc::ArrayView<const uint8_t> payload(payload_data, payload_size); rtc::ArrayView<const uint8_t> payload(payload_data, payload_size);
@ -175,8 +181,8 @@ bool AudioEgress::SendTelephoneEvent(int dtmf_event, int duration_ms) {
} }
void AudioEgress::SetMute(bool mute) { void AudioEgress::SetMute(bool mute) {
encoder_queue_.PostTask([this, mute] { encoder_queue_->PostTask([this, mute] {
RTC_DCHECK_RUN_ON(&encoder_queue_); RTC_DCHECK_RUN_ON(&encoder_queue_checker_);
encoder_context_.mute_ = mute; encoder_context_.mute_ = mute;
}); });
} }

View file

@ -16,6 +16,7 @@
#include "api/audio_codecs/audio_format.h" #include "api/audio_codecs/audio_format.h"
#include "api/sequence_checker.h" #include "api/sequence_checker.h"
#include "api/task_queue/task_queue_base.h"
#include "api/task_queue/task_queue_factory.h" #include "api/task_queue/task_queue_factory.h"
#include "audio/audio_level.h" #include "audio/audio_level.h"
#include "audio/utility/audio_frame_operations.h" #include "audio/utility/audio_frame_operations.h"
@ -25,7 +26,7 @@
#include "modules/rtp_rtcp/source/rtp_rtcp_interface.h" #include "modules/rtp_rtcp/source/rtp_rtcp_interface.h"
#include "modules/rtp_rtcp/source/rtp_sender_audio.h" #include "modules/rtp_rtcp/source/rtp_sender_audio.h"
#include "rtc_base/synchronization/mutex.h" #include "rtc_base/synchronization/mutex.h"
#include "rtc_base/task_queue.h" #include "rtc_base/system/no_unique_address.h"
#include "rtc_base/time_utils.h" #include "rtc_base/time_utils.h"
namespace webrtc { namespace webrtc {
@ -146,11 +147,10 @@ class AudioEgress : public AudioSender, public AudioPacketizationCallback {
bool previously_muted_ = false; bool previously_muted_ = false;
}; };
EncoderContext encoder_context_ RTC_GUARDED_BY(encoder_queue_); EncoderContext encoder_context_ RTC_GUARDED_BY(encoder_queue_checker_);
// Defined last to ensure that there are no running tasks when the other std::unique_ptr<TaskQueueBase, TaskQueueDeleter> encoder_queue_;
// members are destroyed. RTC_NO_UNIQUE_ADDRESS SequenceChecker encoder_queue_checker_;
rtc::TaskQueue encoder_queue_;
}; };
} // namespace webrtc } // namespace webrtc