Delete ProcessThread and related Module interface

Bug: webrtc:7219
Change-Id: Id71430a24b21e591494557cf54419d2bc8b3f8c6
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/267400
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Auto-Submit: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37416}
This commit is contained in:
Danil Chapovalov 2022-07-01 13:38:59 +02:00 committed by WebRTC LUCI CQ
parent a5f267d5ac
commit 0fd2ed516b
18 changed files with 17 additions and 995 deletions

View file

@ -186,7 +186,6 @@ if (rtc_include_tests) {
"../modules/pacing",
"../modules/rtp_rtcp:mock_rtp_rtcp",
"../modules/rtp_rtcp:rtp_rtcp_format",
"../modules/utility",
"../rtc_base:checks",
"../rtc_base:macromagic",
"../rtc_base:refcount",

View file

@ -23,7 +23,6 @@ rtc_library("voip_core") {
"../../modules/audio_device:audio_device_api",
"../../modules/audio_mixer:audio_mixer_impl",
"../../modules/audio_processing:api",
"../../modules/utility:utility",
"../../rtc_base:criticalsection",
"../../rtc_base:logging",
"../../rtc_base/synchronization:mutex",
@ -46,7 +45,6 @@ rtc_library("audio_channel") {
"../../modules/audio_device:audio_device_api",
"../../modules/rtp_rtcp",
"../../modules/rtp_rtcp:rtp_rtcp_format",
"../../modules/utility",
"../../rtc_base:criticalsection",
"../../rtc_base:location",
"../../rtc_base:logging",
@ -71,7 +69,6 @@ rtc_library("audio_ingress") {
"../../modules/audio_coding",
"../../modules/rtp_rtcp",
"../../modules/rtp_rtcp:rtp_rtcp_format",
"../../modules/utility",
"../../rtc_base:criticalsection",
"../../rtc_base:logging",
"../../rtc_base:safe_minmax",

View file

@ -30,7 +30,6 @@ if (rtc_include_tests) {
"../../../api/task_queue:default_task_queue_factory",
"../../../modules/audio_device:mock_audio_device",
"../../../modules/audio_processing:mocks",
"../../../modules/utility:mock_process_thread",
"../../../test:audio_codec_mocks",
"../../../test:mock_transport",
"../../../test:run_loop",
@ -53,7 +52,6 @@ if (rtc_include_tests) {
"../../../modules/audio_mixer:audio_mixer_test_utils",
"../../../modules/rtp_rtcp:rtp_rtcp",
"../../../modules/rtp_rtcp:rtp_rtcp_format",
"../../../modules/utility",
"../../../rtc_base:logging",
"../../../test:mock_transport",
"../../../test:test_support",

View file

@ -67,7 +67,6 @@ if (is_android) {
"//api/task_queue:default_task_queue_factory",
"//api/voip:voip_api",
"//api/voip:voip_engine_factory",
"//modules/utility:utility",
"//rtc_base",
"//rtc_base/third_party/sigslot:sigslot",
"//sdk/android:native_api_audio_device_module",

View file

@ -36,10 +36,7 @@ rtc_source_set("module_api_public") {
rtc_source_set("module_api") {
visibility = [ "*" ]
sources = [
"include/module.h",
"include/module_common_types.h",
]
sources = [ "include/module_common_types.h" ]
}
rtc_source_set("module_fec_api") {
@ -221,7 +218,6 @@ if (rtc_include_tests && !build_with_chromium) {
"pacing:pacing_unittests",
"remote_bitrate_estimator:remote_bitrate_estimator_unittests",
"rtp_rtcp:rtp_rtcp_unittests",
"utility:utility_unittests",
"video_coding:video_coding_unittests",
"video_coding/timing:timing_unittests",
"video_processing:video_processing_unittests",

View file

@ -433,7 +433,6 @@ if (rtc_include_tests && !build_with_chromium) {
"../../system_wrappers",
"../../test:fileutils",
"../../test:test_support",
"../utility",
]
absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
if (is_linux || is_chromeos || is_mac || is_win) {
@ -460,6 +459,7 @@ if (rtc_include_tests && !build_with_chromium) {
"../../sdk/android:libjingle_peerconnection_java",
"../../sdk/android:native_api_jni",
"../../sdk/android:native_test_jni_onload",
"../utility",
]
}
if (!rtc_include_internal_audio_device) {

View file

@ -30,7 +30,6 @@ rtc_library("goog_cc") {
":probe_controller",
":pushback_controller",
":send_side_bwe",
"../..:module_api",
"../../../api:field_trials_view",
"../../../api:network_state_predictor_api",
"../../../api/rtc_event_log",

View file

@ -1,63 +0,0 @@
/*
* Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef MODULES_INCLUDE_MODULE_H_
#define MODULES_INCLUDE_MODULE_H_
#include <stdint.h>
namespace webrtc {
class ProcessThread;
class Module {
public:
// Returns the number of milliseconds until the module wants a worker
// thread to call Process.
// This method is called on the same worker thread as Process will
// be called on.
// TODO(tommi): Almost all implementations of this function, need to know
// the current tick count. Consider passing it as an argument. It could
// also improve the accuracy of when the next callback occurs since the
// thread that calls Process() will also have it's tick count reference
// which might not match with what the implementations use.
virtual int64_t TimeUntilNextProcess() = 0;
// Process any pending tasks such as timeouts.
// Called on a worker thread.
virtual void Process() = 0;
// This method is called when the module is attached to a *running* process
// thread or detached from one. In the case of detaching, `process_thread`
// will be nullptr.
//
// This method will be called in the following cases:
//
// * Non-null process_thread:
// * ProcessThread::RegisterModule() is called while the thread is running.
// * ProcessThread::Start() is called and RegisterModule has previously
// been called. The thread will be started immediately after notifying
// all modules.
//
// * Null process_thread:
// * ProcessThread::DeRegisterModule() is called while the thread is
// running.
// * ProcessThread::Stop() was called and the thread has been stopped.
//
// NOTE: This method is not called from the worker thread itself, but from
// the thread that registers/deregisters the module or calls Start/Stop.
virtual void ProcessThreadAttached(ProcessThread* process_thread) {}
protected:
virtual ~Module() {}
};
} // namespace webrtc
#endif // MODULES_INCLUDE_MODULE_H_

View file

@ -32,7 +32,6 @@ rtc_library("pacing") {
deps = [
":interval_budget",
"..:module_api",
"../../api:field_trials_view",
"../../api:field_trials_view",
"../../api:function_view",
@ -63,7 +62,6 @@ rtc_library("pacing") {
"../../system_wrappers:metrics",
"../rtp_rtcp",
"../rtp_rtcp:rtp_rtcp_format",
"../utility",
]
absl_deps = [
"//third_party/abseil-cpp/absl/memory",
@ -106,7 +104,6 @@ if (rtc_include_tests) {
"../../api/units:data_rate",
"../../api/units:time_delta",
"../../api/units:timestamp",
"../../modules/utility:mock_process_thread",
"../../rtc_base:checks",
"../../rtc_base:rtc_base_tests_utils",
"../../rtc_base/experiments:alr_experiment",

View file

@ -23,7 +23,6 @@
#include "api/transport/network_types.h"
#include "api/units/data_rate.h"
#include "modules/pacing/packet_router.h"
#include "modules/utility/include/mock/mock_process_thread.h"
#include "test/gmock.h"
#include "test/gtest.h"
#include "test/scoped_key_value_config.h"

View file

@ -8,68 +8,29 @@
import("../../webrtc.gni")
rtc_library("utility") {
visibility = [ "*" ]
sources = [
"include/process_thread.h",
"source/process_thread_impl.cc",
"source/process_thread_impl.h",
]
if (is_android) {
rtc_library("utility") {
visibility = [ "*" ]
if (is_android) {
sources += [
sources = [
"include/helpers_android.h",
"include/jvm_android.h",
"source/helpers_android.cc",
"source/jvm_android.cc",
]
}
if (is_ios) {
frameworks = [ "AVFoundation.framework" ]
}
deps = [
"..:module_api",
"../../api:sequence_checker",
"../../api/task_queue",
"../../common_audio",
"../../rtc_base:checks",
"../../rtc_base:event_tracer",
"../../rtc_base:location",
"../../rtc_base:logging",
"../../rtc_base:platform_thread",
"../../rtc_base:rtc_event",
"../../rtc_base:timeutils",
"../../rtc_base/system:arch",
"../../system_wrappers",
]
}
rtc_library("mock_process_thread") {
testonly = true
visibility = [ "*" ]
sources = [ "include/mock/mock_process_thread.h" ]
deps = [
":utility",
"../../rtc_base:location",
"../../test:test_support",
]
}
if (rtc_include_tests) {
rtc_library("utility_unittests") {
testonly = true
sources = [ "source/process_thread_impl_unittest.cc" ]
deps = [
":utility",
"..:module_api",
"../../api/task_queue",
"../../api/task_queue:task_queue_test",
"../../rtc_base:location",
"../../rtc_base:timeutils",
"../../test:test_support",
"../../api:sequence_checker",
"../../rtc_base:checks",
"../../rtc_base:logging",
"../../rtc_base:platform_thread",
"../../rtc_base/system:arch",
]
}
} else {
# Add an empty source set so that dependent targets may include utility
# unconditionally.
rtc_source_set("utility") {
visibility = [ "*" ]
}
}

View file

@ -1,41 +0,0 @@
/*
* Copyright (c) 2014 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef MODULES_UTILITY_INCLUDE_MOCK_MOCK_PROCESS_THREAD_H_
#define MODULES_UTILITY_INCLUDE_MOCK_MOCK_PROCESS_THREAD_H_
#include <memory>
#include "modules/utility/include/process_thread.h"
#include "rtc_base/location.h"
#include "test/gmock.h"
namespace webrtc {
class MockProcessThread : public ProcessThread {
public:
MOCK_METHOD(void, Start, (), (override));
MOCK_METHOD(void, Stop, (), (override));
MOCK_METHOD(void, Delete, (), (override));
MOCK_METHOD(void, WakeUp, (Module*), (override));
MOCK_METHOD(void, PostTask, (std::unique_ptr<QueuedTask>), (override));
MOCK_METHOD(void,
PostDelayedTask,
(std::unique_ptr<QueuedTask>, uint32_t),
(override));
MOCK_METHOD(void,
RegisterModule,
(Module*, const rtc::Location&),
(override));
MOCK_METHOD(void, DeRegisterModule, (Module*), (override));
};
} // namespace webrtc
#endif // MODULES_UTILITY_INCLUDE_MOCK_MOCK_PROCESS_THREAD_H_

View file

@ -1,60 +0,0 @@
/*
* Copyright (c) 2011 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef MODULES_UTILITY_INCLUDE_PROCESS_THREAD_H_
#define MODULES_UTILITY_INCLUDE_PROCESS_THREAD_H_
#include <memory>
#include "api/task_queue/queued_task.h"
#include "api/task_queue/task_queue_base.h"
namespace rtc {
class Location;
}
namespace webrtc {
class Module;
// TODO(tommi): ProcessThread probably doesn't need to be a virtual
// interface. There exists one override besides ProcessThreadImpl,
// MockProcessThread, but when looking at how it is used, it seems
// a nullptr might suffice (or simply an actual ProcessThread instance).
class ProcessThread : public TaskQueueBase {
public:
~ProcessThread() override;
static std::unique_ptr<ProcessThread> Create(const char* thread_name);
// Starts the worker thread. Must be called from the construction thread.
virtual void Start() = 0;
// Stops the worker thread. Must be called from the construction thread.
virtual void Stop() = 0;
// Wakes the thread up to give a module a chance to do processing right
// away. This causes the worker thread to wake up and requery the specified
// module for when it should be called back. (Typically the module should
// return 0 from TimeUntilNextProcess on the worker thread at that point).
// Can be called on any thread.
virtual void WakeUp(Module* module) = 0;
// Adds a module that will start to receive callbacks on the worker thread.
// Can be called from any thread.
virtual void RegisterModule(Module* module, const rtc::Location& from) = 0;
// Removes a previously registered module.
// Can be called from any thread.
virtual void DeRegisterModule(Module* module) = 0;
};
} // namespace webrtc
#endif // MODULES_UTILITY_INCLUDE_PROCESS_THREAD_H_

View file

@ -1,298 +0,0 @@
/*
* Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "modules/utility/source/process_thread_impl.h"
#include <string>
#include "modules/include/module.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "rtc_base/time_utils.h"
#include "rtc_base/trace_event.h"
namespace webrtc {
namespace {
// We use this constant internally to signal that a module has requested
// a callback right away. When this is set, no call to TimeUntilNextProcess
// should be made, but Process() should be called directly.
const int64_t kCallProcessImmediately = -1;
int64_t GetNextCallbackTime(Module* module, int64_t time_now) {
int64_t interval = module->TimeUntilNextProcess();
if (interval < 0) {
// Falling behind, we should call the callback now.
return time_now;
}
return time_now + interval;
}
} // namespace
ProcessThread::~ProcessThread() {}
// static
std::unique_ptr<ProcessThread> ProcessThread::Create(const char* thread_name) {
return std::unique_ptr<ProcessThread>(new ProcessThreadImpl(thread_name));
}
ProcessThreadImpl::ProcessThreadImpl(const char* thread_name)
: stop_(false), thread_name_(thread_name) {}
ProcessThreadImpl::~ProcessThreadImpl() {
RTC_DCHECK(thread_checker_.IsCurrent());
RTC_DCHECK(!stop_);
while (!delayed_tasks_.empty()) {
delete delayed_tasks_.top().task;
delayed_tasks_.pop();
}
while (!queue_.empty()) {
delete queue_.front();
queue_.pop();
}
}
void ProcessThreadImpl::Delete() {
RTC_LOG(LS_WARNING) << "Process thread " << thread_name_
<< " is destroyed as a TaskQueue.";
Stop();
delete this;
}
// Doesn't need locking, because the contending thread isn't running.
void ProcessThreadImpl::Start() RTC_NO_THREAD_SAFETY_ANALYSIS {
RTC_DCHECK(thread_checker_.IsCurrent());
RTC_DCHECK(thread_.empty());
if (!thread_.empty())
return;
RTC_DCHECK(!stop_);
for (ModuleCallback& m : modules_)
m.module->ProcessThreadAttached(this);
thread_ = rtc::PlatformThread::SpawnJoinable(
[this] {
CurrentTaskQueueSetter set_current(this);
while (Process()) {
}
},
thread_name_);
}
void ProcessThreadImpl::Stop() {
RTC_DCHECK(thread_checker_.IsCurrent());
if (thread_.empty())
return;
{
// Need to take lock, for synchronization with `thread_`.
MutexLock lock(&mutex_);
stop_ = true;
}
wake_up_.Set();
thread_.Finalize();
StopNoLocks();
}
// No locking needed, since this is called after the contending thread is
// stopped.
void ProcessThreadImpl::StopNoLocks() RTC_NO_THREAD_SAFETY_ANALYSIS {
RTC_DCHECK(thread_.empty());
stop_ = false;
for (ModuleCallback& m : modules_)
m.module->ProcessThreadAttached(nullptr);
}
void ProcessThreadImpl::WakeUp(Module* module) {
// Allowed to be called on any thread.
auto holds_mutex = [this] {
if (!IsCurrent()) {
return false;
}
RTC_DCHECK_RUN_ON(this);
return holds_mutex_;
};
if (holds_mutex()) {
// Avoid locking if called on the ProcessThread, via a module's Process),
WakeUpNoLocks(module);
} else {
MutexLock lock(&mutex_);
WakeUpInternal(module);
}
wake_up_.Set();
}
// Must be called only indirectly from Process, which already holds the lock.
void ProcessThreadImpl::WakeUpNoLocks(Module* module)
RTC_NO_THREAD_SAFETY_ANALYSIS {
RTC_DCHECK_RUN_ON(this);
WakeUpInternal(module);
}
void ProcessThreadImpl::WakeUpInternal(Module* module) {
for (ModuleCallback& m : modules_) {
if (m.module == module)
m.next_callback = kCallProcessImmediately;
}
}
void ProcessThreadImpl::PostTask(std::unique_ptr<QueuedTask> task) {
// Allowed to be called on any thread, except from a module's Process method.
if (IsCurrent()) {
RTC_DCHECK_RUN_ON(this);
RTC_DCHECK(!holds_mutex_) << "Calling ProcessThread::PostTask from "
"Module::Process is not supported";
}
{
MutexLock lock(&mutex_);
queue_.push(task.release());
}
wake_up_.Set();
}
void ProcessThreadImpl::PostDelayedTask(std::unique_ptr<QueuedTask> task,
uint32_t milliseconds) {
int64_t run_at_ms = rtc::TimeMillis() + milliseconds;
bool recalculate_wakeup_time;
{
MutexLock lock(&mutex_);
recalculate_wakeup_time =
delayed_tasks_.empty() || run_at_ms < delayed_tasks_.top().run_at_ms;
delayed_tasks_.emplace(run_at_ms, sequence_id_++, std::move(task));
}
if (recalculate_wakeup_time) {
wake_up_.Set();
}
}
void ProcessThreadImpl::RegisterModule(Module* module,
const rtc::Location& from) {
TRACE_EVENT0("webrtc", "ProcessThreadImpl::RegisterModule");
RTC_DCHECK(thread_checker_.IsCurrent());
RTC_DCHECK(module) << from.ToString();
#if RTC_DCHECK_IS_ON
{
// Catch programmer error.
MutexLock lock(&mutex_);
for (const ModuleCallback& mc : modules_) {
RTC_DCHECK(mc.module != module)
<< "Already registered here: " << mc.location.ToString()
<< "\n"
"Now attempting from here: "
<< from.ToString();
}
}
#endif
// Now that we know the module isn't in the list, we'll call out to notify
// the module that it's attached to the worker thread. We don't hold
// the lock while we make this call.
if (!thread_.empty())
module->ProcessThreadAttached(this);
{
MutexLock lock(&mutex_);
modules_.push_back(ModuleCallback(module, from));
}
// Wake the thread calling ProcessThreadImpl::Process() to update the
// waiting time. The waiting time for the just registered module may be
// shorter than all other registered modules.
wake_up_.Set();
}
void ProcessThreadImpl::DeRegisterModule(Module* module) {
RTC_DCHECK(thread_checker_.IsCurrent());
RTC_DCHECK(module);
{
MutexLock lock(&mutex_);
modules_.remove_if(
[&module](const ModuleCallback& m) { return m.module == module; });
}
// Notify the module that it's been detached.
module->ProcessThreadAttached(nullptr);
}
bool ProcessThreadImpl::Process() {
TRACE_EVENT1("webrtc", "ProcessThreadImpl", "name", thread_name_);
int64_t now = rtc::TimeMillis();
int64_t next_checkpoint = now + (1000 * 60);
RTC_DCHECK_RUN_ON(this);
{
MutexLock lock(&mutex_);
if (stop_)
return false;
for (ModuleCallback& m : modules_) {
// TODO(tommi): Would be good to measure the time TimeUntilNextProcess
// takes and dcheck if it takes too long (e.g. >=10ms). Ideally this
// operation should not require taking a lock, so querying all modules
// should run in a matter of nanoseconds.
if (m.next_callback == 0)
m.next_callback = GetNextCallbackTime(m.module, now);
// Set to true for the duration of the calls to modules' Process().
holds_mutex_ = true;
if (m.next_callback <= now ||
m.next_callback == kCallProcessImmediately) {
{
TRACE_EVENT2("webrtc", "ModuleProcess", "function",
m.location.function_name(), "file",
m.location.file_name());
m.module->Process();
}
// Use a new 'now' reference to calculate when the next callback
// should occur. We'll continue to use 'now' above for the baseline
// of calculating how long we should wait, to reduce variance.
int64_t new_now = rtc::TimeMillis();
m.next_callback = GetNextCallbackTime(m.module, new_now);
}
holds_mutex_ = false;
if (m.next_callback < next_checkpoint)
next_checkpoint = m.next_callback;
}
while (!delayed_tasks_.empty() && delayed_tasks_.top().run_at_ms <= now) {
queue_.push(delayed_tasks_.top().task);
delayed_tasks_.pop();
}
if (!delayed_tasks_.empty()) {
next_checkpoint =
std::min(next_checkpoint, delayed_tasks_.top().run_at_ms);
}
while (!queue_.empty()) {
QueuedTask* task = queue_.front();
queue_.pop();
mutex_.Unlock();
if (task->Run()) {
delete task;
}
mutex_.Lock();
}
}
int64_t time_to_wait = next_checkpoint - rtc::TimeMillis();
if (time_to_wait > 0)
wake_up_.Wait(static_cast<int>(time_to_wait));
return true;
}
} // namespace webrtc

View file

@ -1,128 +0,0 @@
/*
* Copyright (c) 2011 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef MODULES_UTILITY_SOURCE_PROCESS_THREAD_IMPL_H_
#define MODULES_UTILITY_SOURCE_PROCESS_THREAD_IMPL_H_
#include <stdint.h>
#include <list>
#include <memory>
#include <queue>
#include "api/sequence_checker.h"
#include "api/task_queue/queued_task.h"
#include "modules/include/module.h"
#include "modules/utility/include/process_thread.h"
#include "rtc_base/event.h"
#include "rtc_base/location.h"
#include "rtc_base/platform_thread.h"
namespace webrtc {
class ProcessThreadImpl : public ProcessThread {
public:
explicit ProcessThreadImpl(const char* thread_name);
~ProcessThreadImpl() override;
void Start() override;
void Stop() override;
void WakeUp(Module* module) override;
void PostTask(std::unique_ptr<QueuedTask> task) override;
void PostDelayedTask(std::unique_ptr<QueuedTask> task,
uint32_t milliseconds) override;
void RegisterModule(Module* module, const rtc::Location& from) override;
void DeRegisterModule(Module* module) override;
protected:
bool Process();
private:
struct ModuleCallback {
ModuleCallback() = delete;
ModuleCallback(ModuleCallback&& cb) = default;
ModuleCallback(const ModuleCallback& cb) = default;
ModuleCallback(Module* module, const rtc::Location& location)
: module(module), location(location) {}
bool operator==(const ModuleCallback& cb) const {
return cb.module == module;
}
Module* const module;
int64_t next_callback = 0; // Absolute timestamp.
const rtc::Location location;
private:
ModuleCallback& operator=(ModuleCallback&);
};
struct DelayedTask {
DelayedTask(int64_t run_at_ms,
uint64_t sequence_id,
std::unique_ptr<QueuedTask> task)
: run_at_ms(run_at_ms),
sequence_id_(sequence_id),
task(task.release()) {}
friend bool operator<(const DelayedTask& lhs, const DelayedTask& rhs) {
// Earliest DelayedTask should be at the top of the priority queue.
if (lhs.run_at_ms != rhs.run_at_ms) {
return lhs.run_at_ms > rhs.run_at_ms;
}
return lhs.sequence_id_ > rhs.sequence_id_;
}
int64_t run_at_ms;
uint64_t sequence_id_;
// DelayedTask owns the `task`, but some delayed tasks must be removed from
// the std::priority_queue, but mustn't be deleted. std::priority_queue does
// not give non-const access to the values, so storing unique_ptr would
// delete the task as soon as it is remove from the priority queue.
// Thus lifetime of the `task` is managed manually.
QueuedTask* task;
};
typedef std::list<ModuleCallback> ModuleList;
void Delete() override;
// The part of Stop processing that doesn't need any locking.
void StopNoLocks();
void WakeUpNoLocks(Module* module);
void WakeUpInternal(Module* module) RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
// Members protected by this mutex are accessed on the constructor thread and
// on the spawned process thread, and locking is needed only while the process
// thread is running.
Mutex mutex_;
SequenceChecker thread_checker_;
rtc::Event wake_up_;
rtc::PlatformThread thread_;
ModuleList modules_ RTC_GUARDED_BY(mutex_);
// Set to true when calling Process, to allow reentrant calls to WakeUp.
bool holds_mutex_ RTC_GUARDED_BY(this) = false;
std::queue<QueuedTask*> queue_;
// `std::priority_queue` does not guarantee stable sort. For delayed tasks
// with the same wakeup time, use `sequence_id_` to ensure FIFO ordering.
std::priority_queue<DelayedTask> delayed_tasks_ RTC_GUARDED_BY(mutex_);
uint64_t sequence_id_ RTC_GUARDED_BY(mutex_) = 0;
// The `stop_` flag is modified only by the construction thread, protected by
// `thread_checker_`. It is read also by the spawned `thread_`. The latter
// thread must take `mutex_` before access, and for thread safety, the
// constructor thread needs to take `mutex_` when it modifies `stop_` and
// `thread_` is running. Annotations like RTC_GUARDED_BY doesn't support this
// usage pattern.
bool stop_ RTC_GUARDED_BY(mutex_);
const char* thread_name_;
};
} // namespace webrtc
#endif // MODULES_UTILITY_SOURCE_PROCESS_THREAD_IMPL_H_

View file

@ -1,331 +0,0 @@
/*
* Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "modules/utility/source/process_thread_impl.h"
#include <memory>
#include <utility>
#include "api/task_queue/queued_task.h"
#include "api/task_queue/task_queue_test.h"
#include "modules/include/module.h"
#include "rtc_base/location.h"
#include "rtc_base/time_utils.h"
#include "test/gmock.h"
#include "test/gtest.h"
namespace webrtc {
using ::testing::_;
using ::testing::DoAll;
using ::testing::InSequence;
using ::testing::Invoke;
using ::testing::Return;
using ::testing::SetArgPointee;
// The length of time, in milliseconds, to wait for an event to become signaled.
// Set to a fairly large value as there is quite a bit of variation on some
// Windows bots.
static const int kEventWaitTimeout = 500;
class MockModule : public Module {
public:
MOCK_METHOD(int64_t, TimeUntilNextProcess, (), (override));
MOCK_METHOD(void, Process, (), (override));
MOCK_METHOD(void, ProcessThreadAttached, (ProcessThread*), (override));
};
class RaiseEventTask : public QueuedTask {
public:
RaiseEventTask(rtc::Event* event) : event_(event) {}
bool Run() override {
event_->Set();
return true;
}
private:
rtc::Event* event_;
};
ACTION_P(SetEvent, event) {
event->Set();
}
ACTION_P(Increment, counter) {
++(*counter);
}
ACTION_P(SetTimestamp, ptr) {
*ptr = rtc::TimeMillis();
}
TEST(ProcessThreadImpl, StartStop) {
ProcessThreadImpl thread("ProcessThread");
thread.Start();
thread.Stop();
}
TEST(ProcessThreadImpl, MultipleStartStop) {
ProcessThreadImpl thread("ProcessThread");
for (int i = 0; i < 5; ++i) {
thread.Start();
thread.Stop();
}
}
// Verifies that we get at least call back to Process() on the worker thread.
TEST(ProcessThreadImpl, ProcessCall) {
ProcessThreadImpl thread("ProcessThread");
thread.Start();
rtc::Event event;
MockModule module;
EXPECT_CALL(module, TimeUntilNextProcess())
.WillOnce(Return(0))
.WillRepeatedly(Return(1));
EXPECT_CALL(module, Process())
.WillOnce(DoAll(SetEvent(&event), Return()))
.WillRepeatedly(Return());
EXPECT_CALL(module, ProcessThreadAttached(&thread)).Times(1);
thread.RegisterModule(&module, RTC_FROM_HERE);
EXPECT_TRUE(event.Wait(kEventWaitTimeout));
EXPECT_CALL(module, ProcessThreadAttached(nullptr)).Times(1);
thread.Stop();
}
// Same as ProcessCall except the module is registered before the
// call to Start().
TEST(ProcessThreadImpl, ProcessCall2) {
ProcessThreadImpl thread("ProcessThread");
rtc::Event event;
MockModule module;
EXPECT_CALL(module, TimeUntilNextProcess())
.WillOnce(Return(0))
.WillRepeatedly(Return(1));
EXPECT_CALL(module, Process())
.WillOnce(DoAll(SetEvent(&event), Return()))
.WillRepeatedly(Return());
thread.RegisterModule(&module, RTC_FROM_HERE);
EXPECT_CALL(module, ProcessThreadAttached(&thread)).Times(1);
thread.Start();
EXPECT_TRUE(event.Wait(kEventWaitTimeout));
EXPECT_CALL(module, ProcessThreadAttached(nullptr)).Times(1);
thread.Stop();
}
// Tests setting up a module for callbacks and then unregister that module.
// After unregistration, we should not receive any further callbacks.
TEST(ProcessThreadImpl, Deregister) {
ProcessThreadImpl thread("ProcessThread");
rtc::Event event;
int process_count = 0;
MockModule module;
EXPECT_CALL(module, TimeUntilNextProcess())
.WillOnce(Return(0))
.WillRepeatedly(Return(1));
EXPECT_CALL(module, Process())
.WillOnce(DoAll(SetEvent(&event), Increment(&process_count), Return()))
.WillRepeatedly(DoAll(Increment(&process_count), Return()));
thread.RegisterModule(&module, RTC_FROM_HERE);
EXPECT_CALL(module, ProcessThreadAttached(&thread)).Times(1);
thread.Start();
EXPECT_TRUE(event.Wait(kEventWaitTimeout));
EXPECT_CALL(module, ProcessThreadAttached(nullptr)).Times(1);
thread.DeRegisterModule(&module);
EXPECT_GE(process_count, 1);
int count_after_deregister = process_count;
// We shouldn't get any more callbacks.
EXPECT_FALSE(event.Wait(20));
EXPECT_EQ(count_after_deregister, process_count);
thread.Stop();
}
// Helper function for testing receiving a callback after a certain amount of
// time. There's some variance of timing built into it to reduce chance of
// flakiness on bots.
void ProcessCallAfterAFewMs(int64_t milliseconds) {
ProcessThreadImpl thread("ProcessThread");
thread.Start();
rtc::Event event;
MockModule module;
int64_t start_time = 0;
int64_t called_time = 0;
EXPECT_CALL(module, TimeUntilNextProcess())
.WillOnce(DoAll(SetTimestamp(&start_time), Return(milliseconds)))
.WillRepeatedly(Return(milliseconds));
EXPECT_CALL(module, Process())
.WillOnce(DoAll(SetTimestamp(&called_time), SetEvent(&event), Return()))
.WillRepeatedly(Return());
EXPECT_CALL(module, ProcessThreadAttached(&thread)).Times(1);
thread.RegisterModule(&module, RTC_FROM_HERE);
// Add a buffer of 50ms due to slowness of some trybots
// (e.g. win_drmemory_light)
EXPECT_TRUE(event.Wait(milliseconds + 50));
EXPECT_CALL(module, ProcessThreadAttached(nullptr)).Times(1);
thread.Stop();
ASSERT_GT(start_time, 0);
ASSERT_GT(called_time, 0);
// Use >= instead of > since due to rounding and timer accuracy (or lack
// thereof), can make the test run in "0"ms time.
EXPECT_GE(called_time, start_time);
// Check for an acceptable range.
uint32_t diff = called_time - start_time;
EXPECT_GE(diff, milliseconds - 15);
EXPECT_LT(diff, milliseconds + 15);
}
// DISABLED for now since the virtual build bots are too slow :(
// TODO(tommi): Fix.
TEST(ProcessThreadImpl, DISABLED_ProcessCallAfter5ms) {
ProcessCallAfterAFewMs(5);
}
// DISABLED for now since the virtual build bots are too slow :(
// TODO(tommi): Fix.
TEST(ProcessThreadImpl, DISABLED_ProcessCallAfter50ms) {
ProcessCallAfterAFewMs(50);
}
// DISABLED for now since the virtual build bots are too slow :(
// TODO(tommi): Fix.
TEST(ProcessThreadImpl, DISABLED_ProcessCallAfter200ms) {
ProcessCallAfterAFewMs(200);
}
// Runs callbacks with the goal of getting up to 50 callbacks within a second
// (on average 1 callback every 20ms). On real hardware, we're usually pretty
// close to that, but the test bots that run on virtual machines, will
// typically be in the range 30-40 callbacks.
// DISABLED for now since this can take up to 2 seconds to run on the slowest
// build bots.
// TODO(tommi): Fix.
TEST(ProcessThreadImpl, DISABLED_Process50Times) {
ProcessThreadImpl thread("ProcessThread");
thread.Start();
rtc::Event event;
MockModule module;
int callback_count = 0;
// Ask for a callback after 20ms.
EXPECT_CALL(module, TimeUntilNextProcess()).WillRepeatedly(Return(20));
EXPECT_CALL(module, Process())
.WillRepeatedly(DoAll(Increment(&callback_count), Return()));
EXPECT_CALL(module, ProcessThreadAttached(&thread)).Times(1);
thread.RegisterModule(&module, RTC_FROM_HERE);
EXPECT_TRUE(event.Wait(1000));
EXPECT_CALL(module, ProcessThreadAttached(nullptr)).Times(1);
thread.Stop();
printf("Callback count: %i\n", callback_count);
// Check that we got called back up to 50 times.
// Some of the try bots run on slow virtual machines, so the lower bound
// is much more relaxed to avoid flakiness.
EXPECT_GE(callback_count, 25);
EXPECT_LE(callback_count, 50);
}
// Tests that we can wake up the worker thread to give us a callback right
// away when we know the thread is sleeping.
TEST(ProcessThreadImpl, WakeUp) {
ProcessThreadImpl thread("ProcessThread");
thread.Start();
rtc::Event started;
rtc::Event called;
MockModule module;
int64_t start_time;
int64_t called_time;
// Ask for a callback after 1000ms.
// TimeUntilNextProcess will be called twice.
// The first time we use it to get the thread into a waiting state.
// Then we wake the thread and there should not be another call made to
// TimeUntilNextProcess before Process() is called.
// The second time TimeUntilNextProcess is then called, is after Process
// has been called and we don't expect any more calls.
EXPECT_CALL(module, TimeUntilNextProcess())
.WillOnce(
DoAll(SetTimestamp(&start_time), SetEvent(&started), Return(1000)))
.WillOnce(Return(1000));
EXPECT_CALL(module, Process())
.WillOnce(DoAll(SetTimestamp(&called_time), SetEvent(&called), Return()))
.WillRepeatedly(Return());
EXPECT_CALL(module, ProcessThreadAttached(&thread)).Times(1);
thread.RegisterModule(&module, RTC_FROM_HERE);
EXPECT_TRUE(started.Wait(kEventWaitTimeout));
thread.WakeUp(&module);
EXPECT_TRUE(called.Wait(kEventWaitTimeout));
EXPECT_CALL(module, ProcessThreadAttached(nullptr)).Times(1);
thread.Stop();
EXPECT_GE(called_time, start_time);
uint32_t diff = called_time - start_time;
// We should have been called back much quicker than 1sec.
EXPECT_LE(diff, 100u);
}
// Tests that we can post a task that gets run straight away on the worker
// thread.
TEST(ProcessThreadImpl, PostTask) {
ProcessThreadImpl thread("ProcessThread");
rtc::Event task_ran;
std::unique_ptr<RaiseEventTask> task(new RaiseEventTask(&task_ran));
thread.Start();
thread.PostTask(std::move(task));
EXPECT_TRUE(task_ran.Wait(kEventWaitTimeout));
thread.Stop();
}
class ProcessThreadFactory : public TaskQueueFactory {
public:
~ProcessThreadFactory() override = default;
std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
absl::string_view name,
Priority priority) const override {
ProcessThreadImpl* process_thread = new ProcessThreadImpl("thread");
process_thread->Start();
return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(process_thread);
}
};
INSTANTIATE_TEST_SUITE_P(
ProcessThread,
TaskQueueTest,
testing::Values(std::make_unique<ProcessThreadFactory>));
} // namespace webrtc

View file

@ -31,7 +31,6 @@ rtc_library("video_processing") {
"../../api/video:video_rtp_headers",
"../../common_audio",
"../../common_video",
"../../modules/utility",
"../../rtc_base:checks",
"../../rtc_base/system:arch",
"../../system_wrappers",

View file

@ -2463,7 +2463,6 @@ if (rtc_include_tests && !build_with_chromium) {
"../media:rtc_media_tests_utils",
"../modules/audio_processing",
"../modules/audio_processing:api",
"../modules/utility",
"../p2p:p2p_test_utils",
"../p2p:rtc_p2p",
"../rtc_base",