Ensure task queues delete closures in task queue context.

Bug: webrtc:14449
Change-Id: I90d09d35398c1f8817701662f51cbc6a684a2fe0
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/275773
Commit-Queue: Markus Handell <handellm@webrtc.org>
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#38917}
This commit is contained in:
Markus Handell 2022-12-16 15:50:24 +01:00 committed by WebRTC LUCI CQ
parent 2bd52fab82
commit 82da9324bc
11 changed files with 152 additions and 10 deletions

View file

@ -590,6 +590,7 @@ if (rtc_include_tests && !build_with_chromium) {
"rtc_base:rtc_operations_chain_unittests",
"rtc_base:rtc_task_queue_unittests",
"rtc_base:sigslot_unittest",
"rtc_base:task_queue_stdlib_unittest",
"rtc_base:untyped_function_unittest",
"rtc_base:weak_ptr_unittests",
"rtc_base/experiments:experiments_unittests",

View file

@ -65,6 +65,7 @@ rtc_library("task_queue_test") {
":default_task_queue_factory",
":task_queue",
"../../api:field_trials_view",
"../../api:make_ref_counted",
"../../api/units:time_delta",
"../../rtc_base:refcount",
"../../rtc_base:rtc_event",

View file

@ -51,10 +51,16 @@ class RTC_LOCKABLE RTC_EXPORT TaskQueueBase {
// Schedules a `task` to execute. Tasks are executed in FIFO order.
// When a TaskQueue is deleted, pending tasks will not be executed but they
// will be deleted. The deletion of tasks may happen synchronously on the
// TaskQueue or it may happen asynchronously after TaskQueue is deleted.
// This may vary from one implementation to the next so assumptions about
// lifetimes of pending tasks should not be made.
// will be deleted.
//
// As long as tasks are not posted from task destruction, posted tasks are
// guaranteed to be destroyed with Current() pointing to the task queue they
// were posted to, whether they're executed or not. That means SequenceChecker
// works during task destruction, a fact that can be used to guarantee
// thread-compatible object deletion happening on a particular task queue
// which can simplify class design.
// Note that this guarantee does not apply to delayed tasks.
//
// May be called on any thread or task queue, including this task queue.
virtual void PostTask(absl::AnyInvocable<void() &&> task) = 0;

View file

@ -13,7 +13,7 @@
#include "absl/cleanup/cleanup.h"
#include "absl/strings/string_view.h"
#include "api/task_queue/default_task_queue_factory.h"
#include "api/task_queue/task_queue_base.h"
#include "api/units/time_delta.h"
#include "rtc_base/event.h"
#include "rtc_base/ref_counter.h"
@ -22,6 +22,13 @@
namespace webrtc {
namespace {
// Avoids a dependency to system_wrappers.
void SleepFor(TimeDelta duration) {
rtc::ScopedAllowBaseSyncPrimitivesForTesting allow;
rtc::Event event;
event.Wait(duration);
}
std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
const std::unique_ptr<webrtc::TaskQueueFactory>& factory,
absl::string_view task_queue_name,
@ -147,6 +154,53 @@ TEST_P(TaskQueueTest, PostDelayedAfterDestruct) {
EXPECT_FALSE(run.Wait(TimeDelta::Zero())); // and should not run.
}
TEST_P(TaskQueueTest, PostDelayedHighPrecisionAfterDestruct) {
std::unique_ptr<webrtc::TaskQueueFactory> factory = GetParam()(nullptr);
rtc::Event run;
rtc::Event deleted;
auto queue =
CreateTaskQueue(factory, "PostDelayedHighPrecisionAfterDestruct");
absl::Cleanup cleanup = [&deleted] { deleted.Set(); };
queue->PostDelayedHighPrecisionTask(
[&run, cleanup = std::move(cleanup)] { run.Set(); },
TimeDelta::Millis(100));
// Destroy the queue.
queue = nullptr;
// Task might outlive the TaskQueue, but still should be deleted.
EXPECT_TRUE(deleted.Wait(TimeDelta::Seconds(1)));
EXPECT_FALSE(run.Wait(TimeDelta::Zero())); // and should not run.
}
TEST_P(TaskQueueTest, PostedUnexecutedClosureDestroyedOnTaskQueue) {
std::unique_ptr<webrtc::TaskQueueFactory> factory = GetParam()(nullptr);
auto queue =
CreateTaskQueue(factory, "PostedUnexecutedClosureDestroyedOnTaskQueue");
TaskQueueBase* queue_ptr = queue.get();
queue->PostTask([] { SleepFor(TimeDelta::Millis(100)); });
// Give the task queue a chance to start executing the first lambda.
SleepFor(TimeDelta::Millis(10));
// Then ensure the next lambda (which is likely not executing yet) is
// destroyed in the task queue context when the queue is deleted.
auto cleanup = absl::Cleanup(
[queue_ptr] { EXPECT_EQ(queue_ptr, TaskQueueBase::Current()); });
queue->PostTask([cleanup = std::move(cleanup)] {});
queue = nullptr;
}
TEST_P(TaskQueueTest, PostedExecutedClosureDestroyedOnTaskQueue) {
std::unique_ptr<webrtc::TaskQueueFactory> factory = GetParam()(nullptr);
auto queue =
CreateTaskQueue(factory, "PostedExecutedClosureDestroyedOnTaskQueue");
TaskQueueBase* queue_ptr = queue.get();
// Ensure an executed lambda is destroyed on the task queue.
rtc::Event finished;
queue->PostTask([cleanup = absl::Cleanup([queue_ptr, &finished] {
EXPECT_EQ(queue_ptr, TaskQueueBase::Current());
finished.Set();
})] {});
finished.Wait(rtc::Event::kForever);
}
TEST_P(TaskQueueTest, PostAndReuse) {
std::unique_ptr<webrtc::TaskQueueFactory> factory = GetParam()(nullptr);
rtc::Event event;

View file

@ -735,6 +735,21 @@ rtc_library("rtc_task_queue_stdlib") {
]
}
if (rtc_include_tests) {
rtc_library("task_queue_stdlib_unittest") {
testonly = true
sources = [ "task_queue_stdlib_unittest.cc" ]
deps = [
":gunit_helpers",
":rtc_task_queue_stdlib",
"../api/task_queue:task_queue_test",
"../test:test_main",
"../test:test_support",
]
}
}
rtc_library("weak_ptr") {
sources = [
"weak_ptr.cc",

View file

@ -121,11 +121,10 @@ void TaskQueueGcd::PostDelayedHighPrecisionTask(
// static
void TaskQueueGcd::RunTask(void* task_context) {
std::unique_ptr<TaskContext> tc(static_cast<TaskContext*>(task_context));
if (!tc->queue->is_active_)
return;
CurrentTaskQueueSetter set_current(tc->queue);
std::move(tc->task)();
if (tc->queue->is_active_) {
std::move(tc->task)();
}
// Delete the task before CurrentTaskQueueSetter clears state that this code
// is running on the task queue.
tc = nullptr;

View file

@ -166,10 +166,20 @@ TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name,
CurrentTaskQueueSetter set_current(this);
while (is_active_)
event_base_loop(event_base_, 0);
}
// Ensure remaining deleted tasks are destroyed with Current() set up
// to this task queue.
absl::InlinedVector<absl::AnyInvocable<void() &&>, 4> pending;
MutexLock lock(&pending_lock_);
pending_.swap(pending);
}
for (TimerEvent* timer : pending_timers_)
delete timer;
#if RTC_DCHECK_IS_ON
MutexLock lock(&pending_lock_);
RTC_DCHECK(pending_.empty());
#endif
},
queue_name, rtc::ThreadAttributes().SetPriority(priority));
}

View file

@ -249,6 +249,19 @@ void TaskQueueStdlib::ProcessTasks() {
flag_notify_.Wait(task.sleep_time);
}
// Ensure remaining deleted tasks are destroyed with Current() set up to this
// task queue.
std::queue<std::pair<OrderId, absl::AnyInvocable<void() &&>>> pending_queue;
{
MutexLock lock(&pending_lock_);
pending_queue_.swap(pending_queue);
}
pending_queue = {};
#if RTC_DCHECK_IS_ON
MutexLock lock(&pending_lock_);
RTC_DCHECK(pending_queue_.empty());
#endif
}
void TaskQueueStdlib::NotifyWake() {

View file

@ -0,0 +1,29 @@
/*
* Copyright 2022 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "rtc_base/task_queue_stdlib.h"
#include "api/task_queue/task_queue_test.h"
#include "test/gtest.h"
namespace webrtc {
namespace {
std::unique_ptr<TaskQueueFactory> CreateTaskQueueFactory(
const webrtc::FieldTrialsView*) {
return CreateTaskQueueStdlibFactory();
}
INSTANTIATE_TEST_SUITE_P(TaskQueueStdlib,
TaskQueueTest,
::testing::Values(CreateTaskQueueFactory));
} // namespace
} // namespace webrtc

View file

@ -290,6 +290,18 @@ void TaskQueueWin::RunThreadMain() {
RunPendingTasks();
}
}
// Ensure remaining deleted tasks are destroyed with Current() set up to this
// task queue.
std::queue<absl::AnyInvocable<void() &&>> pending;
{
MutexLock lock(&pending_lock_);
pending_.swap(pending);
}
pending = {};
#if RTC_DCHECK_IS_ON
MutexLock lock(&pending_lock_);
RTC_DCHECK(pending_.empty());
#endif
}
bool TaskQueueWin::ProcessQueuedMessages() {

View file

@ -11,6 +11,7 @@
#include "rtc_base/thread.h"
#include "absl/strings/string_view.h"
#include "api/task_queue/task_queue_base.h"
#include "api/units/time_delta.h"
#include "rtc_base/socket_server.h"
@ -391,6 +392,7 @@ void Thread::DoDestroy() {
}
ThreadManager::Remove(this);
// Clear.
CurrentTaskQueueSetter set_current(this);
messages_ = {};
delayed_messages_ = {};
}