From 82da9324bc41d9033fbc7f9cf7a939df569b3096 Mon Sep 17 00:00:00 2001 From: Markus Handell Date: Fri, 16 Dec 2022 15:50:24 +0100 Subject: [PATCH] Ensure task queues delete closures in task queue context. Bug: webrtc:14449 Change-Id: I90d09d35398c1f8817701662f51cbc6a684a2fe0 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/275773 Commit-Queue: Markus Handell Reviewed-by: Tomas Gunnarsson Reviewed-by: Harald Alvestrand Reviewed-by: Danil Chapovalov Cr-Commit-Position: refs/heads/main@{#38917} --- BUILD.gn | 1 + api/task_queue/BUILD.gn | 1 + api/task_queue/task_queue_base.h | 14 +++++-- api/task_queue/task_queue_test.cc | 56 +++++++++++++++++++++++++- rtc_base/BUILD.gn | 15 +++++++ rtc_base/task_queue_gcd.cc | 7 ++-- rtc_base/task_queue_libevent.cc | 12 +++++- rtc_base/task_queue_stdlib.cc | 13 ++++++ rtc_base/task_queue_stdlib_unittest.cc | 29 +++++++++++++ rtc_base/task_queue_win.cc | 12 ++++++ rtc_base/thread.cc | 2 + 11 files changed, 152 insertions(+), 10 deletions(-) create mode 100644 rtc_base/task_queue_stdlib_unittest.cc diff --git a/BUILD.gn b/BUILD.gn index 259019268b..da094e1591 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -590,6 +590,7 @@ if (rtc_include_tests && !build_with_chromium) { "rtc_base:rtc_operations_chain_unittests", "rtc_base:rtc_task_queue_unittests", "rtc_base:sigslot_unittest", + "rtc_base:task_queue_stdlib_unittest", "rtc_base:untyped_function_unittest", "rtc_base:weak_ptr_unittests", "rtc_base/experiments:experiments_unittests", diff --git a/api/task_queue/BUILD.gn b/api/task_queue/BUILD.gn index dc69686f64..69393b80ff 100644 --- a/api/task_queue/BUILD.gn +++ b/api/task_queue/BUILD.gn @@ -65,6 +65,7 @@ rtc_library("task_queue_test") { ":default_task_queue_factory", ":task_queue", "../../api:field_trials_view", + "../../api:make_ref_counted", "../../api/units:time_delta", "../../rtc_base:refcount", "../../rtc_base:rtc_event", diff --git a/api/task_queue/task_queue_base.h b/api/task_queue/task_queue_base.h index a2cff9c738..f78600de60 100644 --- a/api/task_queue/task_queue_base.h +++ b/api/task_queue/task_queue_base.h @@ -51,10 +51,16 @@ class RTC_LOCKABLE RTC_EXPORT TaskQueueBase { // Schedules a `task` to execute. Tasks are executed in FIFO order. // When a TaskQueue is deleted, pending tasks will not be executed but they - // will be deleted. The deletion of tasks may happen synchronously on the - // TaskQueue or it may happen asynchronously after TaskQueue is deleted. - // This may vary from one implementation to the next so assumptions about - // lifetimes of pending tasks should not be made. + // will be deleted. + // + // As long as tasks are not posted from task destruction, posted tasks are + // guaranteed to be destroyed with Current() pointing to the task queue they + // were posted to, whether they're executed or not. That means SequenceChecker + // works during task destruction, a fact that can be used to guarantee + // thread-compatible object deletion happening on a particular task queue + // which can simplify class design. + // Note that this guarantee does not apply to delayed tasks. + // // May be called on any thread or task queue, including this task queue. virtual void PostTask(absl::AnyInvocable task) = 0; diff --git a/api/task_queue/task_queue_test.cc b/api/task_queue/task_queue_test.cc index 0f6b1d013f..7849f4273d 100644 --- a/api/task_queue/task_queue_test.cc +++ b/api/task_queue/task_queue_test.cc @@ -13,7 +13,7 @@ #include "absl/cleanup/cleanup.h" #include "absl/strings/string_view.h" -#include "api/task_queue/default_task_queue_factory.h" +#include "api/task_queue/task_queue_base.h" #include "api/units/time_delta.h" #include "rtc_base/event.h" #include "rtc_base/ref_counter.h" @@ -22,6 +22,13 @@ namespace webrtc { namespace { +// Avoids a dependency to system_wrappers. +void SleepFor(TimeDelta duration) { + rtc::ScopedAllowBaseSyncPrimitivesForTesting allow; + rtc::Event event; + event.Wait(duration); +} + std::unique_ptr CreateTaskQueue( const std::unique_ptr& factory, absl::string_view task_queue_name, @@ -147,6 +154,53 @@ TEST_P(TaskQueueTest, PostDelayedAfterDestruct) { EXPECT_FALSE(run.Wait(TimeDelta::Zero())); // and should not run. } +TEST_P(TaskQueueTest, PostDelayedHighPrecisionAfterDestruct) { + std::unique_ptr factory = GetParam()(nullptr); + rtc::Event run; + rtc::Event deleted; + auto queue = + CreateTaskQueue(factory, "PostDelayedHighPrecisionAfterDestruct"); + absl::Cleanup cleanup = [&deleted] { deleted.Set(); }; + queue->PostDelayedHighPrecisionTask( + [&run, cleanup = std::move(cleanup)] { run.Set(); }, + TimeDelta::Millis(100)); + // Destroy the queue. + queue = nullptr; + // Task might outlive the TaskQueue, but still should be deleted. + EXPECT_TRUE(deleted.Wait(TimeDelta::Seconds(1))); + EXPECT_FALSE(run.Wait(TimeDelta::Zero())); // and should not run. +} + +TEST_P(TaskQueueTest, PostedUnexecutedClosureDestroyedOnTaskQueue) { + std::unique_ptr factory = GetParam()(nullptr); + auto queue = + CreateTaskQueue(factory, "PostedUnexecutedClosureDestroyedOnTaskQueue"); + TaskQueueBase* queue_ptr = queue.get(); + queue->PostTask([] { SleepFor(TimeDelta::Millis(100)); }); + // Give the task queue a chance to start executing the first lambda. + SleepFor(TimeDelta::Millis(10)); + // Then ensure the next lambda (which is likely not executing yet) is + // destroyed in the task queue context when the queue is deleted. + auto cleanup = absl::Cleanup( + [queue_ptr] { EXPECT_EQ(queue_ptr, TaskQueueBase::Current()); }); + queue->PostTask([cleanup = std::move(cleanup)] {}); + queue = nullptr; +} + +TEST_P(TaskQueueTest, PostedExecutedClosureDestroyedOnTaskQueue) { + std::unique_ptr factory = GetParam()(nullptr); + auto queue = + CreateTaskQueue(factory, "PostedExecutedClosureDestroyedOnTaskQueue"); + TaskQueueBase* queue_ptr = queue.get(); + // Ensure an executed lambda is destroyed on the task queue. + rtc::Event finished; + queue->PostTask([cleanup = absl::Cleanup([queue_ptr, &finished] { + EXPECT_EQ(queue_ptr, TaskQueueBase::Current()); + finished.Set(); + })] {}); + finished.Wait(rtc::Event::kForever); +} + TEST_P(TaskQueueTest, PostAndReuse) { std::unique_ptr factory = GetParam()(nullptr); rtc::Event event; diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn index 8ac6e4102c..5d4cdb33d4 100644 --- a/rtc_base/BUILD.gn +++ b/rtc_base/BUILD.gn @@ -735,6 +735,21 @@ rtc_library("rtc_task_queue_stdlib") { ] } +if (rtc_include_tests) { + rtc_library("task_queue_stdlib_unittest") { + testonly = true + + sources = [ "task_queue_stdlib_unittest.cc" ] + deps = [ + ":gunit_helpers", + ":rtc_task_queue_stdlib", + "../api/task_queue:task_queue_test", + "../test:test_main", + "../test:test_support", + ] + } +} + rtc_library("weak_ptr") { sources = [ "weak_ptr.cc", diff --git a/rtc_base/task_queue_gcd.cc b/rtc_base/task_queue_gcd.cc index e498ba3017..6612b45f49 100644 --- a/rtc_base/task_queue_gcd.cc +++ b/rtc_base/task_queue_gcd.cc @@ -121,11 +121,10 @@ void TaskQueueGcd::PostDelayedHighPrecisionTask( // static void TaskQueueGcd::RunTask(void* task_context) { std::unique_ptr tc(static_cast(task_context)); - if (!tc->queue->is_active_) - return; - CurrentTaskQueueSetter set_current(tc->queue); - std::move(tc->task)(); + if (tc->queue->is_active_) { + std::move(tc->task)(); + } // Delete the task before CurrentTaskQueueSetter clears state that this code // is running on the task queue. tc = nullptr; diff --git a/rtc_base/task_queue_libevent.cc b/rtc_base/task_queue_libevent.cc index f50e5a63df..63633b5198 100644 --- a/rtc_base/task_queue_libevent.cc +++ b/rtc_base/task_queue_libevent.cc @@ -166,10 +166,20 @@ TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name, CurrentTaskQueueSetter set_current(this); while (is_active_) event_base_loop(event_base_, 0); - } + // Ensure remaining deleted tasks are destroyed with Current() set up + // to this task queue. + absl::InlinedVector, 4> pending; + MutexLock lock(&pending_lock_); + pending_.swap(pending); + } for (TimerEvent* timer : pending_timers_) delete timer; + +#if RTC_DCHECK_IS_ON + MutexLock lock(&pending_lock_); + RTC_DCHECK(pending_.empty()); +#endif }, queue_name, rtc::ThreadAttributes().SetPriority(priority)); } diff --git a/rtc_base/task_queue_stdlib.cc b/rtc_base/task_queue_stdlib.cc index 3f39ad13b5..300873ac15 100644 --- a/rtc_base/task_queue_stdlib.cc +++ b/rtc_base/task_queue_stdlib.cc @@ -249,6 +249,19 @@ void TaskQueueStdlib::ProcessTasks() { flag_notify_.Wait(task.sleep_time); } + + // Ensure remaining deleted tasks are destroyed with Current() set up to this + // task queue. + std::queue>> pending_queue; + { + MutexLock lock(&pending_lock_); + pending_queue_.swap(pending_queue); + } + pending_queue = {}; +#if RTC_DCHECK_IS_ON + MutexLock lock(&pending_lock_); + RTC_DCHECK(pending_queue_.empty()); +#endif } void TaskQueueStdlib::NotifyWake() { diff --git a/rtc_base/task_queue_stdlib_unittest.cc b/rtc_base/task_queue_stdlib_unittest.cc new file mode 100644 index 0000000000..0654e9719c --- /dev/null +++ b/rtc_base/task_queue_stdlib_unittest.cc @@ -0,0 +1,29 @@ +/* + * Copyright 2022 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "rtc_base/task_queue_stdlib.h" + +#include "api/task_queue/task_queue_test.h" +#include "test/gtest.h" + +namespace webrtc { +namespace { + +std::unique_ptr CreateTaskQueueFactory( + const webrtc::FieldTrialsView*) { + return CreateTaskQueueStdlibFactory(); +} + +INSTANTIATE_TEST_SUITE_P(TaskQueueStdlib, + TaskQueueTest, + ::testing::Values(CreateTaskQueueFactory)); + +} // namespace +} // namespace webrtc diff --git a/rtc_base/task_queue_win.cc b/rtc_base/task_queue_win.cc index bb8e522c71..9ea7fc60ae 100644 --- a/rtc_base/task_queue_win.cc +++ b/rtc_base/task_queue_win.cc @@ -290,6 +290,18 @@ void TaskQueueWin::RunThreadMain() { RunPendingTasks(); } } + // Ensure remaining deleted tasks are destroyed with Current() set up to this + // task queue. + std::queue> pending; + { + MutexLock lock(&pending_lock_); + pending_.swap(pending); + } + pending = {}; +#if RTC_DCHECK_IS_ON + MutexLock lock(&pending_lock_); + RTC_DCHECK(pending_.empty()); +#endif } bool TaskQueueWin::ProcessQueuedMessages() { diff --git a/rtc_base/thread.cc b/rtc_base/thread.cc index 18a79bc518..8c16949f5a 100644 --- a/rtc_base/thread.cc +++ b/rtc_base/thread.cc @@ -11,6 +11,7 @@ #include "rtc_base/thread.h" #include "absl/strings/string_view.h" +#include "api/task_queue/task_queue_base.h" #include "api/units/time_delta.h" #include "rtc_base/socket_server.h" @@ -391,6 +392,7 @@ void Thread::DoDestroy() { } ThreadManager::Remove(this); // Clear. + CurrentTaskQueueSetter set_current(this); messages_ = {}; delayed_messages_ = {}; }