Delete ProcessThread creation from test TimeController as unused

Bug: webrtc:7219
Change-Id: Ia34f24a804b8a1e06b089774e37cac6e6d749e82
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/266366
Reviewed-by: Artem Titov <titovartem@webrtc.org>
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37311}
This commit is contained in:
Danil Chapovalov 2022-06-21 11:38:32 +02:00 committed by WebRTC LUCI CQ
parent 4d3ba77975
commit 24b0543ee0
12 changed files with 1 additions and 409 deletions

View file

@ -778,7 +778,6 @@ rtc_source_set("time_controller") {
]
deps = [
"../modules/utility",
"../rtc_base",
"../rtc_base:threading",
"../rtc_base/synchronization:yield_policy",

View file

@ -30,7 +30,6 @@ specific_include_rules = {
"+modules/audio_processing/include/audio_processing.h",
],
"time_controller\.h": [
"+modules/utility/include/process_thread.h",
"+rtc_base/synchronization/yield_policy.h",
"+system_wrappers/include/clock.h",
],

View file

@ -17,7 +17,6 @@
#include "api/task_queue/task_queue_factory.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "modules/utility/include/process_thread.h"
#include "rtc_base/synchronization/yield_policy.h"
#include "rtc_base/thread.h"
#include "system_wrappers/include/clock.h"
@ -41,9 +40,6 @@ class TimeController {
// is destroyed.
std::unique_ptr<TaskQueueFactory> CreateTaskQueueFactory();
// Creates a process thread.
virtual std::unique_ptr<ProcessThread> CreateProcessThread(
const char* thread_name) = 0;
// Creates an rtc::Thread instance. If `socket_server` is nullptr, a default
// noop socket server is created.
// Returned thread is not null and started.

View file

@ -15,8 +15,6 @@ rtc_library("time_controller") {
"external_time_controller.h",
"real_time_controller.cc",
"real_time_controller.h",
"simulated_process_thread.cc",
"simulated_process_thread.h",
"simulated_task_queue.cc",
"simulated_task_queue.h",
"simulated_thread.cc",
@ -33,8 +31,6 @@ rtc_library("time_controller") {
"../../api/task_queue:to_queued_task",
"../../api/units:time_delta",
"../../api/units:timestamp",
"../../modules:module_api",
"../../modules/utility:utility",
"../../rtc_base",
"../../rtc_base:checks",
"../../rtc_base:null_socket_server",

View file

@ -19,110 +19,12 @@
#include "api/task_queue/task_queue_factory.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "modules/include/module.h"
#include "modules/utility/include/process_thread.h"
#include "rtc_base/checks.h"
#include "rtc_base/synchronization/yield_policy.h"
#include "test/time_controller/simulated_time_controller.h"
namespace webrtc {
// Wraps a ProcessThread so that it can reschedule the time controller whenever
// an external call changes the ProcessThread's state. For example, when a new
// module is registered, the ProcessThread may need to be called sooner than the
// time controller's currently-scheduled deadline.
class ExternalTimeController::ProcessThreadWrapper : public ProcessThread {
public:
ProcessThreadWrapper(ExternalTimeController* parent,
std::unique_ptr<ProcessThread> thread)
: parent_(parent), thread_(std::move(thread)) {}
void Start() override {
parent_->UpdateTime();
thread_->Start();
parent_->ScheduleNext();
}
void Stop() override {
parent_->UpdateTime();
thread_->Stop();
parent_->ScheduleNext();
}
void WakeUp(Module* module) override {
parent_->UpdateTime();
thread_->WakeUp(GetWrapper(module));
parent_->ScheduleNext();
}
void PostTask(std::unique_ptr<QueuedTask> task) override {
parent_->UpdateTime();
thread_->PostTask(std::move(task));
parent_->ScheduleNext();
}
void PostDelayedTask(std::unique_ptr<QueuedTask> task,
uint32_t milliseconds) override {
parent_->UpdateTime();
thread_->PostDelayedTask(std::move(task), milliseconds);
parent_->ScheduleNext();
}
void RegisterModule(Module* module, const rtc::Location& from) override {
parent_->UpdateTime();
module_wrappers_.emplace(module, new ModuleWrapper(module, this));
thread_->RegisterModule(GetWrapper(module), from);
parent_->ScheduleNext();
}
void DeRegisterModule(Module* module) override {
parent_->UpdateTime();
thread_->DeRegisterModule(GetWrapper(module));
parent_->ScheduleNext();
module_wrappers_.erase(module);
}
private:
class ModuleWrapper : public Module {
public:
ModuleWrapper(Module* module, ProcessThreadWrapper* thread)
: module_(module), thread_(thread) {}
int64_t TimeUntilNextProcess() override {
return module_->TimeUntilNextProcess();
}
void Process() override { module_->Process(); }
void ProcessThreadAttached(ProcessThread* process_thread) override {
if (process_thread) {
module_->ProcessThreadAttached(thread_);
} else {
module_->ProcessThreadAttached(nullptr);
}
}
private:
Module* module_;
ProcessThreadWrapper* thread_;
};
void Delete() override {
// ProcessThread shouldn't be deleted as a TaskQueue.
RTC_DCHECK_NOTREACHED();
}
ModuleWrapper* GetWrapper(Module* module) {
auto it = module_wrappers_.find(module);
RTC_DCHECK(it != module_wrappers_.end());
return it->second.get();
}
ExternalTimeController* const parent_;
std::unique_ptr<ProcessThread> thread_;
std::map<Module*, std::unique_ptr<ModuleWrapper>> module_wrappers_;
};
// Wraps a TaskQueue so that it can reschedule the time controller whenever
// an external call schedules a new task.
class ExternalTimeController::TaskQueueWrapper : public TaskQueueBase {
@ -187,12 +89,6 @@ TaskQueueFactory* ExternalTimeController::GetTaskQueueFactory() {
return this;
}
std::unique_ptr<ProcessThread> ExternalTimeController::CreateProcessThread(
const char* thread_name) {
return std::make_unique<ProcessThreadWrapper>(
this, impl_.CreateProcessThread(thread_name));
}
void ExternalTimeController::AdvanceTime(TimeDelta duration) {
alarm_->Sleep(duration);
}

View file

@ -19,7 +19,6 @@
#include "api/test/time_controller.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "modules/utility/include/process_thread.h"
#include "system_wrappers/include/clock.h"
#include "test/time_controller/simulated_time_controller.h"
@ -35,8 +34,6 @@ class ExternalTimeController : public TimeController, public TaskQueueFactory {
// Implementation of TimeController.
Clock* GetClock() override;
TaskQueueFactory* GetTaskQueueFactory() override;
std::unique_ptr<ProcessThread> CreateProcessThread(
const char* thread_name) override;
void AdvanceTime(TimeDelta duration) override;
std::unique_ptr<rtc::Thread> CreateThread(
const std::string& name,
@ -49,7 +46,6 @@ class ExternalTimeController : public TimeController, public TaskQueueFactory {
TaskQueueFactory::Priority priority) const override;
private:
class ProcessThreadWrapper;
class TaskQueueWrapper;
// Executes any tasks scheduled at or before the current time. May call

View file

@ -44,11 +44,6 @@ TaskQueueFactory* RealTimeController::GetTaskQueueFactory() {
return task_queue_factory_.get();
}
std::unique_ptr<ProcessThread> RealTimeController::CreateProcessThread(
const char* thread_name) {
return ProcessThread::Create(thread_name);
}
std::unique_ptr<rtc::Thread> RealTimeController::CreateThread(
const std::string& name,
std::unique_ptr<rtc::SocketServer> socket_server) {

View file

@ -16,7 +16,6 @@
#include "api/task_queue/task_queue_factory.h"
#include "api/test/time_controller.h"
#include "api/units/time_delta.h"
#include "modules/utility/include/process_thread.h"
#include "system_wrappers/include/clock.h"
namespace webrtc {
@ -26,8 +25,6 @@ class RealTimeController : public TimeController {
Clock* GetClock() override;
TaskQueueFactory* GetTaskQueueFactory() override;
std::unique_ptr<ProcessThread> CreateProcessThread(
const char* thread_name) override;
std::unique_ptr<rtc::Thread> CreateThread(
const std::string& name,
std::unique_ptr<rtc::SocketServer> socket_server) override;

View file

@ -1,188 +0,0 @@
/*
* Copyright (c) 2020 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "test/time_controller/simulated_process_thread.h"
#include <algorithm>
#include <utility>
namespace webrtc {
namespace {
// Helper function to remove from a std container by value.
template <class C>
bool RemoveByValue(C* vec, typename C::value_type val) {
auto it = std::find(vec->begin(), vec->end(), val);
if (it == vec->end())
return false;
vec->erase(it);
return true;
}
} // namespace
SimulatedProcessThread::SimulatedProcessThread(
sim_time_impl::SimulatedTimeControllerImpl* handler,
absl::string_view name)
: handler_(handler), name_(new char[name.size()]) {
std::copy_n(name.begin(), name.size(), name_);
}
SimulatedProcessThread::~SimulatedProcessThread() {
handler_->Unregister(this);
delete[] name_;
}
void SimulatedProcessThread::RunReady(Timestamp at_time) {
CurrentTaskQueueSetter set_current(this);
MutexLock lock(&lock_);
std::vector<Module*> ready_modules;
for (auto it = delayed_modules_.begin();
it != delayed_modules_.end() && it->first <= at_time;
it = delayed_modules_.erase(it)) {
for (auto module : it->second) {
ready_modules.push_back(module);
}
}
for (auto* module : ready_modules) {
module->Process();
delayed_modules_[GetNextTime(module, at_time)].push_back(module);
}
for (auto it = delayed_tasks_.begin();
it != delayed_tasks_.end() && it->first <= at_time;
it = delayed_tasks_.erase(it)) {
for (auto& task : it->second) {
queue_.push_back(std::move(task));
}
}
while (!queue_.empty()) {
std::unique_ptr<QueuedTask> task = std::move(queue_.front());
queue_.pop_front();
lock_.Unlock();
bool should_delete = task->Run();
RTC_CHECK(should_delete);
lock_.Lock();
}
RTC_DCHECK(queue_.empty());
if (!delayed_modules_.empty()) {
next_run_time_ = delayed_modules_.begin()->first;
} else {
next_run_time_ = Timestamp::PlusInfinity();
}
if (!delayed_tasks_.empty()) {
next_run_time_ = std::min(next_run_time_, delayed_tasks_.begin()->first);
}
}
void SimulatedProcessThread::Start() {
std::vector<Module*> starting;
{
MutexLock lock(&lock_);
if (process_thread_running_)
return;
process_thread_running_ = true;
starting.swap(stopped_modules_);
}
for (auto& module : starting)
module->ProcessThreadAttached(this);
Timestamp at_time = handler_->CurrentTime();
MutexLock lock(&lock_);
for (auto& module : starting)
delayed_modules_[GetNextTime(module, at_time)].push_back(module);
if (!queue_.empty()) {
next_run_time_ = Timestamp::MinusInfinity();
} else if (!delayed_modules_.empty()) {
next_run_time_ = delayed_modules_.begin()->first;
} else {
next_run_time_ = Timestamp::PlusInfinity();
}
}
void SimulatedProcessThread::Stop() {
std::vector<Module*> stopping;
{
MutexLock lock(&lock_);
process_thread_running_ = false;
for (auto& delayed : delayed_modules_) {
for (auto mod : delayed.second)
stopped_modules_.push_back(mod);
}
delayed_modules_.clear();
stopping = stopped_modules_;
}
for (auto& module : stopping)
module->ProcessThreadAttached(nullptr);
}
void SimulatedProcessThread::WakeUp(Module* module) {
MutexLock lock(&lock_);
for (auto it = delayed_modules_.begin(); it != delayed_modules_.end(); ++it) {
if (RemoveByValue(&it->second, module))
break;
}
Timestamp next_time = GetNextTime(module, handler_->CurrentTime());
delayed_modules_[next_time].push_back(module);
next_run_time_ = std::min(next_run_time_, next_time);
}
void SimulatedProcessThread::RegisterModule(Module* module,
const rtc::Location& from) {
module->ProcessThreadAttached(this);
MutexLock lock(&lock_);
if (!process_thread_running_) {
stopped_modules_.push_back(module);
} else {
Timestamp next_time = GetNextTime(module, handler_->CurrentTime());
delayed_modules_[next_time].push_back(module);
next_run_time_ = std::min(next_run_time_, next_time);
}
}
void SimulatedProcessThread::DeRegisterModule(Module* module) {
bool modules_running;
{
MutexLock lock(&lock_);
if (!process_thread_running_) {
RemoveByValue(&stopped_modules_, module);
} else {
for (auto& pair : delayed_modules_) {
if (RemoveByValue(&pair.second, module))
break;
}
}
modules_running = process_thread_running_;
}
if (modules_running)
module->ProcessThreadAttached(nullptr);
}
void SimulatedProcessThread::PostTask(std::unique_ptr<QueuedTask> task) {
MutexLock lock(&lock_);
queue_.emplace_back(std::move(task));
next_run_time_ = Timestamp::MinusInfinity();
}
void SimulatedProcessThread::PostDelayedTask(std::unique_ptr<QueuedTask> task,
uint32_t milliseconds) {
MutexLock lock(&lock_);
Timestamp target_time =
handler_->CurrentTime() + TimeDelta::Millis(milliseconds);
delayed_tasks_[target_time].push_back(std::move(task));
next_run_time_ = std::min(next_run_time_, target_time);
}
Timestamp SimulatedProcessThread::GetNextTime(Module* module,
Timestamp at_time) {
CurrentTaskQueueSetter set_current(this);
return at_time + TimeDelta::Millis(module->TimeUntilNextProcess());
}
} // namespace webrtc

View file

@ -1,73 +0,0 @@
/*
* Copyright (c) 2020 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef TEST_TIME_CONTROLLER_SIMULATED_PROCESS_THREAD_H_
#define TEST_TIME_CONTROLLER_SIMULATED_PROCESS_THREAD_H_
#include <deque>
#include <list>
#include <map>
#include <memory>
#include <vector>
#include "rtc_base/synchronization/mutex.h"
#include "test/time_controller/simulated_time_controller.h"
namespace webrtc {
class SimulatedProcessThread : public ProcessThread,
public sim_time_impl::SimulatedSequenceRunner {
public:
SimulatedProcessThread(sim_time_impl::SimulatedTimeControllerImpl* handler,
absl::string_view name);
virtual ~SimulatedProcessThread();
void RunReady(Timestamp at_time) override;
Timestamp GetNextRunTime() const override {
MutexLock lock(&lock_);
return next_run_time_;
}
TaskQueueBase* GetAsTaskQueue() override { return this; }
// ProcessThread interface
void Start() override;
void Stop() override;
void WakeUp(Module* module) override;
void RegisterModule(Module* module, const rtc::Location& from) override;
void DeRegisterModule(Module* module) override;
void PostTask(std::unique_ptr<QueuedTask> task) override;
void PostDelayedTask(std::unique_ptr<QueuedTask> task,
uint32_t milliseconds) override;
private:
void Delete() override {
// ProcessThread shouldn't be deleted as a TaskQueue.
RTC_DCHECK_NOTREACHED();
}
Timestamp GetNextTime(Module* module, Timestamp at_time);
sim_time_impl::SimulatedTimeControllerImpl* const handler_;
// Using char* to be debugger friendly.
char* name_;
mutable Mutex lock_;
Timestamp next_run_time_ RTC_GUARDED_BY(lock_) = Timestamp::PlusInfinity();
std::deque<std::unique_ptr<QueuedTask>> queue_;
std::map<Timestamp, std::vector<std::unique_ptr<QueuedTask>>> delayed_tasks_
RTC_GUARDED_BY(lock_);
bool process_thread_running_ RTC_GUARDED_BY(lock_) = false;
std::vector<Module*> stopped_modules_ RTC_GUARDED_BY(lock_);
std::map<Timestamp, std::list<Module*>> delayed_modules_
RTC_GUARDED_BY(lock_);
};
} // namespace webrtc
#endif // TEST_TIME_CONTROLLER_SIMULATED_PROCESS_THREAD_H_

View file

@ -18,7 +18,6 @@
#include <vector>
#include "absl/strings/string_view.h"
#include "test/time_controller/simulated_process_thread.h"
#include "test/time_controller/simulated_task_queue.h"
#include "test/time_controller/simulated_thread.h"
@ -50,19 +49,10 @@ SimulatedTimeControllerImpl::CreateTaskQueue(
auto mutable_this = const_cast<SimulatedTimeControllerImpl*>(this);
auto task_queue = std::unique_ptr<SimulatedTaskQueue, TaskQueueDeleter>(
new SimulatedTaskQueue(mutable_this, name));
;
mutable_this->Register(task_queue.get());
return task_queue;
}
std::unique_ptr<ProcessThread> SimulatedTimeControllerImpl::CreateProcessThread(
const char* thread_name) {
auto process_thread =
std::make_unique<SimulatedProcessThread>(this, thread_name);
Register(process_thread.get());
return process_thread;
}
std::unique_ptr<rtc::Thread> SimulatedTimeControllerImpl::CreateThread(
const std::string& name,
std::unique_ptr<rtc::SocketServer> socket_server) {
@ -192,11 +182,6 @@ TaskQueueFactory* GlobalSimulatedTimeController::GetTaskQueueFactory() {
return &impl_;
}
std::unique_ptr<ProcessThread>
GlobalSimulatedTimeController::CreateProcessThread(const char* thread_name) {
return impl_.CreateProcessThread(thread_name);
}
std::unique_ptr<rtc::Thread> GlobalSimulatedTimeController::CreateThread(
const std::string& name,
std::unique_ptr<rtc::SocketServer> socket_server) {

View file

@ -20,8 +20,6 @@
#include "api/sequence_checker.h"
#include "api/test/time_controller.h"
#include "api/units/timestamp.h"
#include "modules/include/module.h"
#include "modules/utility/include/process_thread.h"
#include "rtc_base/fake_clock.h"
#include "rtc_base/platform_thread_types.h"
#include "rtc_base/synchronization/mutex.h"
@ -58,9 +56,7 @@ class SimulatedTimeControllerImpl : public TaskQueueFactory,
// except that if this method is called from a task, the task queue running
// that task is skipped.
void YieldExecution() RTC_LOCKS_EXCLUDED(time_lock_, lock_) override;
// Create process thread with the name `thread_name`.
std::unique_ptr<ProcessThread> CreateProcessThread(const char* thread_name)
RTC_LOCKS_EXCLUDED(time_lock_, lock_);
// Create thread using provided `socket_server`.
std::unique_ptr<rtc::Thread> CreateThread(
const std::string& name,
@ -131,8 +127,6 @@ class GlobalSimulatedTimeController : public TimeController {
Clock* GetClock() override;
TaskQueueFactory* GetTaskQueueFactory() override;
std::unique_ptr<ProcessThread> CreateProcessThread(
const char* thread_name) override;
std::unique_ptr<rtc::Thread> CreateThread(
const std::string& name,
std::unique_ptr<rtc::SocketServer> socket_server) override;