[WebRTC-SendPacketsOnWorkerThread] Delete MaybeWorkerThread

This helper class is no longer used.


Bug: webrtc:14502
Change-Id: I7940de762ebb9a7c6d04927603f249f5b0061051
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/301161
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Reviewed-by: Erik Språng <sprang@webrtc.org>
Commit-Queue: Per Kjellander <perkj@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#39880}
This commit is contained in:
Per K 2023-04-13 10:08:45 +02:00 committed by WebRTC LUCI CQ
parent 1f251dd67e
commit 8b5bf6dd05
6 changed files with 3 additions and 398 deletions

View file

@ -227,7 +227,6 @@ if (rtc_include_tests && !build_with_chromium) {
"pacing:pacing_unittests",
"remote_bitrate_estimator:remote_bitrate_estimator_unittests",
"rtp_rtcp:rtp_rtcp_unittests",
"utility:utility_unittests",
"video_coding:video_coding_unittests",
"video_coding/deprecated:deprecated_unittests",
"video_coding/timing:timing_unittests",

View file

@ -20,14 +20,13 @@
#include "absl/types/optional.h"
#include "api/field_trials_view.h"
#include "api/sequence_checker.h"
#include "api/task_queue/task_queue_factory.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/units/data_size.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "modules/pacing/pacing_controller.h"
#include "modules/pacing/rtp_packet_pacer.h"
#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
#include "modules/utility/maybe_worker_thread.h"
#include "rtc_base/experiments/field_trial_parser.h"
#include "rtc_base/numerics/exp_filter.h"
#include "rtc_base/thread_annotations.h"

View file

@ -9,38 +9,16 @@
import("../../webrtc.gni")
rtc_source_set("utility") {
sources = [
"maybe_worker_thread.cc",
"maybe_worker_thread.h",
]
deps = [
"../../api:field_trials_view",
"../../api:sequence_checker",
"../../api/task_queue",
"../../api/task_queue:pending_task_safety_flag",
"../../rtc_base:checks",
"../../rtc_base:logging",
"../../rtc_base:macromagic",
"../../rtc_base:rtc_event",
"../../rtc_base:rtc_task_queue",
]
absl_deps = [
"//third_party/abseil-cpp/absl/functional:any_invocable",
"//third_party/abseil-cpp/absl/strings:strings",
]
if (is_android) {
visibility = [ "*" ]
sources += [
sources = [
"include/helpers_android.h",
"include/jvm_android.h",
"source/helpers_android.cc",
"source/jvm_android.cc",
]
deps += [
deps = [
"../../api:sequence_checker",
"../../rtc_base:checks",
"../../rtc_base:logging",
@ -49,26 +27,3 @@ rtc_source_set("utility") {
]
}
}
if (rtc_include_tests) {
rtc_library("utility_unittests") {
testonly = true
sources = [ "maybe_worker_thread_unittests.cc" ]
deps = [
":utility",
"../../api:sequence_checker",
"../../api/task_queue",
"../../api/task_queue:default_task_queue_factory",
"../../api/task_queue:pending_task_safety_flag",
"../../api/units:time_delta",
"../../rtc_base:rtc_event",
"../../rtc_base:threading",
"../../test:explicit_key_value_config",
"../../test:field_trial",
"../../test:test_main",
"../../test:test_support",
"../../test/time_controller",
]
}
}

View file

@ -1,99 +0,0 @@
/*
* Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "modules/utility/maybe_worker_thread.h"
#include <utility>
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/task_queue_base.h"
#include "rtc_base/checks.h"
#include "rtc_base/event.h"
#include "rtc_base/logging.h"
#include "rtc_base/task_queue.h"
namespace webrtc {
MaybeWorkerThread::MaybeWorkerThread(const FieldTrialsView& field_trials,
absl::string_view task_queue_name,
TaskQueueFactory* factory)
: owned_task_queue_(
!field_trials.IsDisabled("WebRTC-SendPacketsOnWorkerThread")
? nullptr
: factory->CreateTaskQueue(task_queue_name,
rtc::TaskQueue::Priority::NORMAL)),
worker_thread_(TaskQueueBase::Current()) {
RTC_DCHECK(worker_thread_);
RTC_LOG(LS_INFO) << "WebRTC-SendPacketsOnWorkerThread"
<< (owned_task_queue_ ? " Disabled" : " Enabled");
}
MaybeWorkerThread::~MaybeWorkerThread() {
RTC_DCHECK_RUN_ON(&sequence_checker_);
if (owned_task_queue_) {
// owned_task_queue_ must be a valid pointer when the task queue is
// destroyed since there may be tasks that use this object that run when the
// task queue is deleted.
owned_task_queue_->Delete();
owned_task_queue_.release();
}
}
void MaybeWorkerThread::RunSynchronous(absl::AnyInvocable<void() &&> task) {
if (owned_task_queue_) {
rtc::Event thread_sync_event;
auto closure = [&thread_sync_event, task = std::move(task)]() mutable {
std::move(task)();
thread_sync_event.Set();
};
owned_task_queue_->PostTask(std::move(closure));
thread_sync_event.Wait(rtc::Event::kForever);
} else {
RTC_DCHECK_RUN_ON(&sequence_checker_);
std::move(task)();
}
}
void MaybeWorkerThread::RunOrPost(absl::AnyInvocable<void() &&> task) {
if (owned_task_queue_) {
owned_task_queue_->PostTask(std::move(task));
} else {
RTC_DCHECK_RUN_ON(&sequence_checker_);
std::move(task)();
}
}
TaskQueueBase* MaybeWorkerThread::TaskQueueForDelayedTasks() const {
RTC_DCHECK(IsCurrent());
return owned_task_queue_ ? owned_task_queue_.get() : worker_thread_;
}
TaskQueueBase* MaybeWorkerThread::TaskQueueForPost() const {
return owned_task_queue_ ? owned_task_queue_.get() : worker_thread_;
}
bool MaybeWorkerThread::IsCurrent() const {
if (owned_task_queue_) {
return owned_task_queue_->IsCurrent();
}
return worker_thread_->IsCurrent();
}
absl::AnyInvocable<void() &&> MaybeWorkerThread::MaybeSafeTask(
rtc::scoped_refptr<PendingTaskSafetyFlag> flag,
absl::AnyInvocable<void() &&> task) {
if (owned_task_queue_) {
return task;
} else {
return SafeTask(std::move(flag), std::move(task));
}
}
} // namespace webrtc

View file

@ -1,86 +0,0 @@
/*
* Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef MODULES_UTILITY_MAYBE_WORKER_THREAD_H_
#define MODULES_UTILITY_MAYBE_WORKER_THREAD_H_
#include <memory>
#include "absl/strings/string_view.h"
#include "api/field_trials_view.h"
#include "api/sequence_checker.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/task_queue_base.h"
#include "api/task_queue/task_queue_factory.h"
#include "rtc_base/thread_annotations.h"
namespace webrtc {
// Helper class used by experiment to replace usage of the
// RTP worker task queue owned by RtpTransportControllerSend, and the pacer task
// queue owned by TaskQueuePacedSender with the one and only worker thread.
// Tasks will run on the target sequence which is either the worker thread or
// one of these task queues depending on the field trial
// "WebRTC-SendPacketsOnWorkerThread".
// This class is assumed to be created on the worker thread and the worker
// thread is assumed to outlive an instance of this class.
//
// Experiment can be tracked in
// https://bugs.chromium.org/p/webrtc/issues/detail?id=14502
//
// After experiment evaluation, this class should be deleted.
// Calls to RunOrPost and RunSynchronous should be removed and the task should
// be invoked immediately.
// Instead of MaybeSafeTask a SafeTask should be used when posting tasks.
class RTC_LOCKABLE MaybeWorkerThread {
public:
MaybeWorkerThread(const FieldTrialsView& field_trials,
absl::string_view task_queue_name,
TaskQueueFactory* factory);
~MaybeWorkerThread();
// Runs `task` immediately on the worker thread if in experiment, otherwise
// post the task on the task queue.
void RunOrPost(absl::AnyInvocable<void() &&> task);
// Runs `task` immediately on the worker thread if in experiment, otherwise
// post the task on the task queue and use an even to wait for completion.
void RunSynchronous(absl::AnyInvocable<void() &&> task);
// Used for posting delayed or repeated tasks on the worker thread or task
// queue depending on the field trial. DCHECKs that this method is called on
// the target sequence.
TaskQueueBase* TaskQueueForDelayedTasks() const;
// Used when a task has to be posted from one sequence to the target
// sequence. A task should only be posted if a sequence hop is needed.
TaskQueueBase* TaskQueueForPost() const;
// Workaround to use a SafeTask only if the target sequence is the worker
// thread. This is used when a SafeTask can not be used because the object
// that posted the task is not destroyed on the target sequence. Instead, the
// caller has to guarantee that this MaybeWorkerThread is destroyed first
// since that guarantee that the posted task is deleted or run before the
// owning class.
absl::AnyInvocable<void() &&> MaybeSafeTask(
rtc::scoped_refptr<PendingTaskSafetyFlag> flag,
absl::AnyInvocable<void() &&> task);
// To implement macro RTC_DCHECK_RUN_ON.
// Implementation delegate to the actual used sequence.
bool IsCurrent() const;
private:
SequenceChecker sequence_checker_;
std::unique_ptr<TaskQueueBase, TaskQueueDeleter> owned_task_queue_;
TaskQueueBase* const worker_thread_;
};
} // namespace webrtc
#endif // MODULES_UTILITY_MAYBE_WORKER_THREAD_H_

View file

@ -1,163 +0,0 @@
/*
* Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include <memory>
#include "api/sequence_checker.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/task_queue_factory.h"
#include "api/units/time_delta.h"
#include "modules/utility/maybe_worker_thread.h"
#include "rtc_base/event.h"
#include "test/explicit_key_value_config.h"
#include "test/gtest.h"
#include "test/time_controller/real_time_controller.h"
namespace webrtc {
namespace {
constexpr char kFieldTrialEnabledString[] =
"WebRTC-SendPacketsOnWorkerThread/Enabled/";
constexpr char kFieldTrialDisabledString[] =
"WebRTC-SendPacketsOnWorkerThread/Disabled/";
TEST(MaybeWorkerThreadTest, RunOrPostRunOnWorkerThreadInExperiment) {
test::ExplicitKeyValueConfig field_trial(kFieldTrialEnabledString);
RealTimeController controller;
MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory());
SequenceChecker checker;
bool run = false;
m.RunOrPost([&] {
EXPECT_TRUE(checker.IsCurrent());
run = true;
});
EXPECT_TRUE(run);
}
TEST(MaybeWorkerThreadTest, RunOrPostPostsOnTqPerDefault) {
test::ExplicitKeyValueConfig field_trial(kFieldTrialDisabledString);
RealTimeController controller;
MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory());
SequenceChecker checker;
rtc::Event event;
m.RunOrPost([&] {
EXPECT_FALSE(checker.IsCurrent());
event.Set();
});
EXPECT_TRUE(event.Wait(TimeDelta::Seconds(10)));
}
TEST(MaybeWorkerThreadTest, RunSynchronousRunOnWorkerThreadInExperiment) {
test::ExplicitKeyValueConfig field_trial(kFieldTrialEnabledString);
RealTimeController controller;
MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory());
SequenceChecker checker;
bool run = false;
m.RunSynchronous([&] {
EXPECT_TRUE(checker.IsCurrent());
run = true;
});
EXPECT_TRUE(run);
}
TEST(MaybeWorkerThreadTest, RunSynchronousRunOnTqPerDefault) {
test::ExplicitKeyValueConfig field_trial(kFieldTrialDisabledString);
RealTimeController controller;
MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory());
SequenceChecker checker;
bool run = false;
m.RunSynchronous([&] {
EXPECT_FALSE(checker.IsCurrent());
run = true;
});
EXPECT_TRUE(run);
}
TEST(MaybeWorkerThreadTest, MaybeSafeTaskDoesNotReturnSafeTaskPerDefault) {
// We cant really test that the return value from MaybeSafeTask is a SafeTask.
// But we can test that the safety flag does not have more references after a
// call.
test::ExplicitKeyValueConfig field_trial(kFieldTrialDisabledString);
RealTimeController controller;
MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory());
rtc::scoped_refptr<PendingTaskSafetyFlag> flag =
PendingTaskSafetyFlag::Create();
auto closure = m.MaybeSafeTask(flag, [] {});
EXPECT_EQ(flag->Release(), rtc::RefCountReleaseStatus::kDroppedLastRef);
flag.release();
}
TEST(MaybeWorkerThreadTest, MaybeSafeTaskDoesNotReturnSafeTaskInExperiment) {
// We cant really test that the return value from MaybeSafeTask is a SafeTask.
// But we can test that the safety flag does have one more references after a
// call.
test::ExplicitKeyValueConfig field_trial(kFieldTrialEnabledString);
RealTimeController controller;
MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory());
rtc::scoped_refptr<PendingTaskSafetyFlag> flag =
PendingTaskSafetyFlag::Create();
auto closure = m.MaybeSafeTask(flag, [] {});
EXPECT_EQ(flag->Release(), rtc::RefCountReleaseStatus::kOtherRefsRemained);
flag.release();
}
TEST(MaybeWorkerThreadTest, IsCurrentBehavesCorrectPerDefault) {
test::ExplicitKeyValueConfig field_trial(kFieldTrialDisabledString);
RealTimeController controller;
MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory());
EXPECT_FALSE(m.IsCurrent());
m.RunSynchronous([&] { EXPECT_TRUE(m.IsCurrent()); });
}
TEST(MaybeWorkerThreadTest, IsCurrentBehavesCorrectInExperiment) {
test::ExplicitKeyValueConfig field_trial(kFieldTrialEnabledString);
RealTimeController controller;
MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory());
EXPECT_TRUE(m.IsCurrent());
auto tq = controller.GetTaskQueueFactory()->CreateTaskQueue(
"tq", TaskQueueFactory::Priority::NORMAL);
rtc::Event event;
tq->PostTask([&] {
EXPECT_FALSE(m.IsCurrent());
event.Set();
});
ASSERT_TRUE(event.Wait(TimeDelta::Seconds(10)));
}
TEST(MaybeWorkerThreadTest, IsCurrentCanBeCalledInDestructorPerDefault) {
test::ExplicitKeyValueConfig field_trial(kFieldTrialDisabledString);
RealTimeController controller;
{
MaybeWorkerThread m(field_trial, "test_tq",
controller.GetTaskQueueFactory());
m.RunOrPost([&] { EXPECT_TRUE(m.IsCurrent()); });
}
}
TEST(MaybeWorkerThreadTest, IsCurrentCanBeCalledInDestructorInExperiment) {
test::ExplicitKeyValueConfig field_trial(kFieldTrialEnabledString);
RealTimeController controller;
{
MaybeWorkerThread m(field_trial, "test_tq",
controller.GetTaskQueueFactory());
m.RunOrPost([&] { EXPECT_TRUE(m.IsCurrent()); });
}
}
} // namespace
} // namespace webrtc