mirror of
https://github.com/mollyim/webrtc.git
synced 2025-05-16 23:30:48 +01:00

TaskQueue posts delayed tasks with millisecond precision. If delayed tasks in the queue have the same wakeup time, we should ensure they are woken up in FIFO order. E.g., when calling `PostDelayedTask(task-i, 0)` in a loop, we expect each `task-i` to be woken up in enqueue order. Co-Author: jiahe.zhang@intel.com Bug: webrtc:13761 Change-Id: I3bc87c2d251f8dffee868a012e828fd42e783afc Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/251960 Reviewed-by: Chen Xing <chxg@google.com> Reviewed-by: Markus Handell <handellm@webrtc.org> Reviewed-by: Stefan Holmer <stefan@webrtc.org> Reviewed-by: Henrik Boström <hbos@webrtc.org> Commit-Queue: Henrik Boström <hbos@webrtc.org> Cr-Commit-Position: refs/heads/main@{#36582}
298 lines
8.5 KiB
C++
298 lines
8.5 KiB
C++
/*
|
|
* Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
|
|
*
|
|
* Use of this source code is governed by a BSD-style license
|
|
* that can be found in the LICENSE file in the root of the source
|
|
* tree. An additional intellectual property rights grant can be found
|
|
* in the file PATENTS. All contributing project authors may
|
|
* be found in the AUTHORS file in the root of the source tree.
|
|
*/
|
|
|
|
#include "modules/utility/source/process_thread_impl.h"
|
|
|
|
#include <string>
|
|
|
|
#include "modules/include/module.h"
|
|
#include "rtc_base/checks.h"
|
|
#include "rtc_base/logging.h"
|
|
#include "rtc_base/time_utils.h"
|
|
#include "rtc_base/trace_event.h"
|
|
|
|
namespace webrtc {
|
|
namespace {
|
|
|
|
// We use this constant internally to signal that a module has requested
|
|
// a callback right away. When this is set, no call to TimeUntilNextProcess
|
|
// should be made, but Process() should be called directly.
|
|
const int64_t kCallProcessImmediately = -1;
|
|
|
|
int64_t GetNextCallbackTime(Module* module, int64_t time_now) {
|
|
int64_t interval = module->TimeUntilNextProcess();
|
|
if (interval < 0) {
|
|
// Falling behind, we should call the callback now.
|
|
return time_now;
|
|
}
|
|
return time_now + interval;
|
|
}
|
|
} // namespace
|
|
|
|
// Out-of-line destructor for the ProcessThread interface; it has no work to
// do of its own.
ProcessThread::~ProcessThread() = default;
|
|
|
|
// static
|
|
std::unique_ptr<ProcessThread> ProcessThread::Create(const char* thread_name) {
|
|
return std::unique_ptr<ProcessThread>(new ProcessThreadImpl(thread_name));
|
|
}
|
|
|
|
ProcessThreadImpl::ProcessThreadImpl(const char* thread_name)
|
|
: stop_(false), thread_name_(thread_name) {}
|
|
|
|
// Destroys the process thread object. Must run on the thread checked by
// `thread_checker_`, with no stop request pending.
//
// Both task containers hold raw pointers (PostTask() stores the result of
// `release()`), so every task that never got to run is deleted here.
ProcessThreadImpl::~ProcessThreadImpl() {
  RTC_DCHECK(thread_checker_.IsCurrent());
  RTC_DCHECK(!stop_);

  // Drain the delayed tasks first, then the immediate queue.
  for (; !delayed_tasks_.empty(); delayed_tasks_.pop()) {
    delete delayed_tasks_.top().task;
  }

  for (; !queue_.empty(); queue_.pop()) {
    delete queue_.front();
  }
}
|
|
|
|
// Self-destruction entry point: logs a warning that the process thread is
// being torn down as a TaskQueue, stops the worker thread and then deletes
// this object. The object must not be touched after this call returns.
void ProcessThreadImpl::Delete() {
  RTC_LOG(LS_WARNING) << "Process thread " << thread_name_
                      << " is destroyed as a TaskQueue.";

  // The worker must be stopped before the object goes away.
  Stop();

  delete this;
}
|
|
|
|
// Doesn't need locking, because the contending thread isn't running.
|
|
void ProcessThreadImpl::Start() RTC_NO_THREAD_SAFETY_ANALYSIS {
|
|
RTC_DCHECK(thread_checker_.IsCurrent());
|
|
RTC_DCHECK(thread_.empty());
|
|
if (!thread_.empty())
|
|
return;
|
|
|
|
RTC_DCHECK(!stop_);
|
|
|
|
for (ModuleCallback& m : modules_)
|
|
m.module->ProcessThreadAttached(this);
|
|
|
|
thread_ = rtc::PlatformThread::SpawnJoinable(
|
|
[this] {
|
|
CurrentTaskQueueSetter set_current(this);
|
|
while (Process()) {
|
|
}
|
|
},
|
|
thread_name_);
|
|
}
|
|
|
|
void ProcessThreadImpl::Stop() {
|
|
RTC_DCHECK(thread_checker_.IsCurrent());
|
|
if (thread_.empty())
|
|
return;
|
|
|
|
{
|
|
// Need to take lock, for synchronization with `thread_`.
|
|
MutexLock lock(&mutex_);
|
|
stop_ = true;
|
|
}
|
|
|
|
wake_up_.Set();
|
|
thread_.Finalize();
|
|
|
|
StopNoLocks();
|
|
}
|
|
|
|
// No locking needed, since this is called after the contending thread is
|
|
// stopped.
|
|
void ProcessThreadImpl::StopNoLocks() RTC_NO_THREAD_SAFETY_ANALYSIS {
|
|
RTC_DCHECK(thread_.empty());
|
|
stop_ = false;
|
|
|
|
for (ModuleCallback& m : modules_)
|
|
m.module->ProcessThreadAttached(nullptr);
|
|
}
|
|
|
|
// Requests that `module` be processed as soon as possible and wakes the
// worker thread. May be called from any thread.
void ProcessThreadImpl::WakeUp(Module* module) {
  // Work out whether we are on the process thread while it is inside a
  // module's Process() call. In that case the mutex is already held further
  // up the stack and must not be taken again.
  bool already_locked = false;
  if (IsCurrent()) {
    RTC_DCHECK_RUN_ON(this);
    already_locked = holds_mutex_;
  }

  if (already_locked) {
    // Called on the process thread from within a module's Process(): skip
    // the locking.
    WakeUpNoLocks(module);
  } else {
    MutexLock lock(&mutex_);
    WakeUpInternal(module);
  }

  wake_up_.Set();
}
|
|
|
|
// Must be called only indirectly from Process, which already holds the lock.
|
|
void ProcessThreadImpl::WakeUpNoLocks(Module* module)
|
|
RTC_NO_THREAD_SAFETY_ANALYSIS {
|
|
RTC_DCHECK_RUN_ON(this);
|
|
WakeUpInternal(module);
|
|
}
|
|
|
|
// Flags every registration of `module` for immediate processing by storing
// the kCallProcessImmediately sentinel in its `next_callback`. Entries for
// other modules are left untouched.
void ProcessThreadImpl::WakeUpInternal(Module* module) {
  for (ModuleCallback& entry : modules_) {
    if (entry.module != module) {
      continue;
    }
    entry.next_callback = kCallProcessImmediately;
  }
}
|
|
|
|
void ProcessThreadImpl::PostTask(std::unique_ptr<QueuedTask> task) {
|
|
// Allowed to be called on any thread, except from a module's Process method.
|
|
if (IsCurrent()) {
|
|
RTC_DCHECK_RUN_ON(this);
|
|
RTC_DCHECK(!holds_mutex_) << "Calling ProcessThread::PostTask from "
|
|
"Module::Process is not supported";
|
|
}
|
|
{
|
|
MutexLock lock(&mutex_);
|
|
queue_.push(task.release());
|
|
}
|
|
wake_up_.Set();
|
|
}
|
|
|
|
void ProcessThreadImpl::PostDelayedTask(std::unique_ptr<QueuedTask> task,
|
|
uint32_t milliseconds) {
|
|
int64_t run_at_ms = rtc::TimeMillis() + milliseconds;
|
|
bool recalculate_wakeup_time;
|
|
{
|
|
MutexLock lock(&mutex_);
|
|
recalculate_wakeup_time =
|
|
delayed_tasks_.empty() || run_at_ms < delayed_tasks_.top().run_at_ms;
|
|
delayed_tasks_.emplace(run_at_ms, sequence_id_++, std::move(task));
|
|
}
|
|
if (recalculate_wakeup_time) {
|
|
wake_up_.Set();
|
|
}
|
|
}
|
|
|
|
// Adds `module` to the set of modules driven by this process thread.
// `from` records the registration site and is used in diagnostics.
//
// If the worker thread is already running, the module is told about the
// attachment right away; otherwise Start() does that later.
void ProcessThreadImpl::RegisterModule(Module* module,
                                       const rtc::Location& from) {
  TRACE_EVENT0("webrtc", "ProcessThreadImpl::RegisterModule");
  RTC_DCHECK(thread_checker_.IsCurrent());
  RTC_DCHECK(module) << from.ToString();

#if RTC_DCHECK_IS_ON
  {
    // Debug-only guard against registering the same module twice.
    MutexLock lock(&mutex_);
    for (const ModuleCallback& existing : modules_) {
      RTC_DCHECK(existing.module != module)
          << "Already registered here: " << existing.location.ToString()
          << "\n"
             "Now attempting from here: "
          << from.ToString();
    }
  }
#endif

  // The module is known not to be in the list yet. Notify it that it is
  // attached to the worker thread; the lock is deliberately not held during
  // this call out.
  const bool worker_running = !thread_.empty();
  if (worker_running) {
    module->ProcessThreadAttached(this);
  }

  {
    MutexLock lock(&mutex_);
    modules_.push_back(ModuleCallback(module, from));
  }

  // The new module may want to be processed sooner than any module already
  // registered, so wake the thread running Process() to make it recompute
  // its waiting time.
  wake_up_.Set();
}
|
|
|
|
// Removes every registration of `module` from this process thread and then
// tells the module that it has been detached. The notification is sent after
// the lock has been released.
void ProcessThreadImpl::DeRegisterModule(Module* module) {
  RTC_DCHECK(thread_checker_.IsCurrent());
  RTC_DCHECK(module);

  {
    MutexLock lock(&mutex_);
    const auto is_target = [module](const ModuleCallback& entry) {
      return entry.module == module;
    };
    modules_.remove_if(is_target);
  }

  // Let the module know that no process thread is driving it any more.
  module->ProcessThreadAttached(nullptr);
}
|
|
|
|
bool ProcessThreadImpl::Process() {
|
|
TRACE_EVENT1("webrtc", "ProcessThreadImpl", "name", thread_name_);
|
|
int64_t now = rtc::TimeMillis();
|
|
int64_t next_checkpoint = now + (1000 * 60);
|
|
RTC_DCHECK_RUN_ON(this);
|
|
{
|
|
MutexLock lock(&mutex_);
|
|
if (stop_)
|
|
return false;
|
|
for (ModuleCallback& m : modules_) {
|
|
// TODO(tommi): Would be good to measure the time TimeUntilNextProcess
|
|
// takes and dcheck if it takes too long (e.g. >=10ms). Ideally this
|
|
// operation should not require taking a lock, so querying all modules
|
|
// should run in a matter of nanoseconds.
|
|
if (m.next_callback == 0)
|
|
m.next_callback = GetNextCallbackTime(m.module, now);
|
|
|
|
// Set to true for the duration of the calls to modules' Process().
|
|
holds_mutex_ = true;
|
|
if (m.next_callback <= now ||
|
|
m.next_callback == kCallProcessImmediately) {
|
|
{
|
|
TRACE_EVENT2("webrtc", "ModuleProcess", "function",
|
|
m.location.function_name(), "file",
|
|
m.location.file_name());
|
|
m.module->Process();
|
|
}
|
|
// Use a new 'now' reference to calculate when the next callback
|
|
// should occur. We'll continue to use 'now' above for the baseline
|
|
// of calculating how long we should wait, to reduce variance.
|
|
int64_t new_now = rtc::TimeMillis();
|
|
m.next_callback = GetNextCallbackTime(m.module, new_now);
|
|
}
|
|
holds_mutex_ = false;
|
|
|
|
if (m.next_callback < next_checkpoint)
|
|
next_checkpoint = m.next_callback;
|
|
}
|
|
|
|
while (!delayed_tasks_.empty() && delayed_tasks_.top().run_at_ms <= now) {
|
|
queue_.push(delayed_tasks_.top().task);
|
|
delayed_tasks_.pop();
|
|
}
|
|
|
|
if (!delayed_tasks_.empty()) {
|
|
next_checkpoint =
|
|
std::min(next_checkpoint, delayed_tasks_.top().run_at_ms);
|
|
}
|
|
|
|
while (!queue_.empty()) {
|
|
QueuedTask* task = queue_.front();
|
|
queue_.pop();
|
|
mutex_.Unlock();
|
|
if (task->Run()) {
|
|
delete task;
|
|
}
|
|
mutex_.Lock();
|
|
}
|
|
}
|
|
|
|
int64_t time_to_wait = next_checkpoint - rtc::TimeMillis();
|
|
if (time_to_wait > 0)
|
|
wake_up_.Wait(static_cast<int>(time_to_wait));
|
|
|
|
return true;
|
|
}
|
|
} // namespace webrtc
|