Migrate call/ to absl::AnyInvocable based TaskQueueBase interface

Bug: webrtc:14245
Change-Id: Ifcdcd343fcba1d850e40813bc08862c42647b0c5
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/268002
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37477}
This commit is contained in:
Danil Chapovalov 2022-07-06 18:35:01 +02:00 committed by WebRTC LUCI CQ
parent 3b526d4501
commit b7128ed172
7 changed files with 40 additions and 38 deletions

View file

@ -526,6 +526,7 @@ if (rtc_include_tests) {
]
absl_deps = [
"//third_party/abseil-cpp/absl/container:inlined_vector",
"//third_party/abseil-cpp/absl/functional:any_invocable",
"//third_party/abseil-cpp/absl/memory",
"//third_party/abseil-cpp/absl/strings",
"//third_party/abseil-cpp/absl/types:optional",

View file

@ -41,11 +41,11 @@ void ResourceAdaptationProcessor::ResourceListenerDelegate::
OnResourceUsageStateMeasured(rtc::scoped_refptr<Resource> resource,
ResourceUsageState usage_state) {
if (!task_queue_->IsCurrent()) {
task_queue_->PostTask(ToQueuedTask(
task_queue_->PostTask(
[this_ref = rtc::scoped_refptr<ResourceListenerDelegate>(this),
resource, usage_state] {
this_ref->OnResourceUsageStateMeasured(resource, usage_state);
}));
});
return;
}
RTC_DCHECK_RUN_ON(task_queue_);
@ -142,8 +142,8 @@ void ResourceAdaptationProcessor::RemoveResource(
void ResourceAdaptationProcessor::RemoveLimitationsImposedByResource(
rtc::scoped_refptr<Resource> resource) {
if (!task_queue_->IsCurrent()) {
task_queue_->PostTask(ToQueuedTask(
[this, resource]() { RemoveLimitationsImposedByResource(resource); }));
task_queue_->PostTask(
[this, resource]() { RemoveLimitationsImposedByResource(resource); });
return;
}
RTC_DCHECK_RUN_ON(task_queue_);

View file

@ -430,8 +430,8 @@ TEST_F(ResourceAdaptationProcessorTest,
SetInputStates(true, kDefaultFrameRate, kDefaultFrameSize);
TaskQueueForTest resource_task_queue("ResourceTaskQueue");
resource_task_queue.PostTask(ToQueuedTask(
[&]() { resource_->SetUsageState(ResourceUsageState::kOveruse); }));
resource_task_queue.PostTask(
[&]() { resource_->SetUsageState(ResourceUsageState::kOveruse); });
EXPECT_EQ_WAIT(1u, restrictions_listener_.restrictions_updated_count(),
kDefaultTimeoutMs);
@ -447,10 +447,10 @@ TEST_F(ResourceAdaptationProcessorTest,
// has passed it on to the processor's task queue.
rtc::Event resource_event;
TaskQueueForTest resource_task_queue("ResourceTaskQueue");
resource_task_queue.PostTask(ToQueuedTask([&]() {
resource_task_queue.PostTask([&]() {
resource_->SetUsageState(ResourceUsageState::kOveruse);
resource_event.Set();
}));
});
EXPECT_TRUE(resource_event.Wait(kDefaultTimeoutMs));
// Now destroy the processor while handling the overuse is in flight.
@ -470,10 +470,10 @@ TEST_F(ResourceAdaptationProcessorTest,
rtc::Event overuse_event;
TaskQueueForTest resource_task_queue("ResourceTaskQueue");
// Queues task for `resource_` overuse while `processor_` is still listening.
resource_task_queue.PostTask(ToQueuedTask([&]() {
resource_task_queue.PostTask([&]() {
resource_->SetUsageState(ResourceUsageState::kOveruse);
overuse_event.Set();
}));
});
EXPECT_TRUE(overuse_event.Wait(kDefaultTimeoutMs));
// Once we know the overuse task is queued, remove `resource_` so that
// `processor_` is not listening to it.

View file

@ -1209,14 +1209,14 @@ void Call::SignalChannelNetworkState(MediaType media, NetworkState state) {
} else {
// TODO(bugs.webrtc.org/11993): Remove workaround when we no longer need to
// post to the worker thread.
worker_thread_->PostTask(ToQueuedTask(task_safety_, std::move(closure)));
worker_thread_->PostTask(SafeTask(task_safety_.flag(), std::move(closure)));
}
}
void Call::OnAudioTransportOverheadChanged(int transport_overhead_per_packet) {
RTC_DCHECK_RUN_ON(network_thread_);
worker_thread_->PostTask(
ToQueuedTask(task_safety_, [this, transport_overhead_per_packet]() {
SafeTask(task_safety_.flag(), [this, transport_overhead_per_packet]() {
// TODO(bugs.webrtc.org/11993): Move this over to the network thread.
RTC_DCHECK_RUN_ON(worker_thread_);
for (auto& kv : audio_send_ssrcs_) {
@ -1408,7 +1408,7 @@ void Call::DeliverRtcp(MediaType media_type, rtc::CopyOnWriteBuffer packet) {
// TODO(bugs.webrtc.org/11993): This should execute directly on the network
// thread.
worker_thread_->PostTask(
ToQueuedTask(task_safety_, [this, packet = std::move(packet)]() {
SafeTask(task_safety_.flag(), [this, packet = std::move(packet)]() {
RTC_DCHECK_RUN_ON(worker_thread_);
receive_stats_.AddReceivedRtcpBytes(static_cast<int>(packet.size()));

View file

@ -113,7 +113,7 @@ class VideoRtcpAndSyncObserver : public test::RtpRtcpObserver,
task_queue_(task_queue) {}
void OnFrame(const VideoFrame& video_frame) override {
task_queue_->PostTask(ToQueuedTask([this]() { CheckStats(); }));
task_queue_->PostTask([this]() { CheckStats(); });
}
void CheckStats() {
@ -343,7 +343,7 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec,
}
task_queue()->PostTask(
ToQueuedTask([to_delete = observer.release()]() { delete to_delete; }));
[to_delete = observer.release()]() { delete to_delete; });
}
TEST_F(CallPerfTest, Synchronization_PlaysOutAudioAndVideoWithoutClockDrift) {
@ -680,7 +680,7 @@ void CallPerfTest::TestMinTransmitBitrate(bool pad_to_min_bitrate) {
private:
// TODO(holmer): Run this with a timer instead of once per packet.
Action OnSendRtp(const uint8_t* packet, size_t length) override {
task_queue_->PostTask(ToQueuedTask(task_safety_flag_, [this]() {
task_queue_->PostTask(SafeTask(task_safety_flag_, [this]() {
VideoSendStream::Stats stats = send_stream_->GetStats();
if (!stats.substreams.empty()) {
@ -1146,7 +1146,7 @@ void CallPerfTest::TestEncodeFramerate(VideoEncoderFactory* encoder_factory,
const Timestamp now = clock_->CurrentTime();
if (now - last_getstats_time_ > kMinGetStatsInterval) {
last_getstats_time_ = now;
task_queue_->PostTask(ToQueuedTask([this, now]() {
task_queue_->PostTask([this, now]() {
VideoSendStream::Stats stats = send_stream_->GetStats();
for (const auto& stat : stats.substreams) {
encode_frame_rate_lists_[stat.first].push_back(
@ -1156,7 +1156,7 @@ void CallPerfTest::TestEncodeFramerate(VideoEncoderFactory* encoder_factory,
VerifyStats();
observation_complete_.Set();
}
}));
});
}
return SEND_PACKET;
}

View file

@ -62,20 +62,20 @@ bool DegradedCall::FakeNetworkPipeOnTaskQueue::Process() {
return false;
}
task_queue_->PostTask(ToQueuedTask(task_safety_, [this, time_to_next] {
task_queue_->PostTask(SafeTask(task_safety_.flag(), [this, time_to_next] {
RTC_DCHECK_RUN_ON(task_queue_);
int64_t next_process_time = *time_to_next + clock_->TimeInMilliseconds();
if (!next_process_ms_ || next_process_time < *next_process_ms_) {
next_process_ms_ = next_process_time;
task_queue_->PostDelayedHighPrecisionTask(
ToQueuedTask(task_safety_,
SafeTask(task_safety_.flag(),
[this] {
RTC_DCHECK_RUN_ON(task_queue_);
if (!Process()) {
next_process_ms_.reset();
}
}),
*time_to_next);
TimeDelta::Millis(*time_to_next));
}
}));
@ -146,8 +146,9 @@ DegradedCall::DegradedCall(
receive_pipe_->SetReceiver(call_->Receiver());
if (receive_configs_.size() > 1) {
call_->network_thread()->PostDelayedTask(
ToQueuedTask(task_safety_, [this] { UpdateReceiveNetworkConfig(); }),
receive_configs_[0].duration.ms());
SafeTask(task_safety_.flag(),
[this] { UpdateReceiveNetworkConfig(); }),
receive_configs_[0].duration);
}
}
if (!send_configs_.empty()) {
@ -157,8 +158,8 @@ DegradedCall::DegradedCall(
call_->network_thread(), task_safety_, clock_, std::move(network));
if (send_configs_.size() > 1) {
call_->network_thread()->PostDelayedTask(
ToQueuedTask(task_safety_, [this] { UpdateSendNetworkConfig(); }),
send_configs_[0].duration.ms());
SafeTask(task_safety_.flag(), [this] { UpdateSendNetworkConfig(); }),
send_configs_[0].duration);
}
}
}
@ -352,8 +353,8 @@ void DegradedCall::UpdateSendNetworkConfig() {
send_config_index_ = (send_config_index_ + 1) % send_configs_.size();
send_simulated_network_->SetConfig(send_configs_[send_config_index_]);
call_->network_thread()->PostDelayedTask(
ToQueuedTask(task_safety_, [this] { UpdateSendNetworkConfig(); }),
send_configs_[send_config_index_].duration.ms());
SafeTask(task_safety_.flag(), [this] { UpdateSendNetworkConfig(); }),
send_configs_[send_config_index_].duration);
}
void DegradedCall::UpdateReceiveNetworkConfig() {
@ -361,7 +362,7 @@ void DegradedCall::UpdateReceiveNetworkConfig() {
receive_simulated_network_->SetConfig(
receive_configs_[receive_config_index_]);
call_->network_thread()->PostDelayedTask(
ToQueuedTask(task_safety_, [this] { UpdateReceiveNetworkConfig(); }),
receive_configs_[receive_config_index_].duration.ms());
SafeTask(task_safety_.flag(), [this] { UpdateReceiveNetworkConfig(); }),
receive_configs_[receive_config_index_].duration);
}
} // namespace webrtc

View file

@ -15,6 +15,7 @@
#include <string>
#include <utility>
#include "absl/functional/any_invocable.h"
#include "call/rtp_transport_controller_send.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
#include "modules/rtp_rtcp/source/byte_io.h"
@ -203,10 +204,9 @@ class RtpVideoSenderTestFixture {
// default thread as the transport queue, explicit checks for the transport
// queue (not just using a SequenceChecker) aren't possible unless such a
// queue is actually active. So RunOnTransportQueue is a convenience function
// that allow for running a closure on the transport queue, similar to
// that allow for running a `task` on the transport queue, similar to
// SendTask().
template <typename Closure>
void RunOnTransportQueue(Closure&& task) {
void RunOnTransportQueue(absl::AnyInvocable<void() &&> task) {
transport_controller_.GetWorkerQueue()->PostTask(std::move(task));
AdvanceTime(TimeDelta::Millis(0));
}