From fa52efadf16eadef63edea1b7467d603c909e578 Mon Sep 17 00:00:00 2001 From: Danil Chapovalov Date: Thu, 21 Feb 2019 11:13:58 +0100 Subject: [PATCH] Migrate stdlib task queue to TaskQueueBase interface Bug: webrtc:10191 Change-Id: I16e13b69dce7cafa545977e9ac253b6e57312690 Reviewed-on: https://webrtc-review.googlesource.com/c/123760 Reviewed-by: Karl Wiberg Commit-Queue: Danil Chapovalov Cr-Commit-Position: refs/heads/master@{#26796} --- api/task_queue/BUILD.gn | 4 +- ...c => default_task_queue_factory_stdlib.cc} | 6 +- rtc_base/BUILD.gn | 26 +- rtc_base/task_queue_stdlib.cc | 225 +++++------------- rtc_base/task_queue_stdlib.h | 24 ++ 5 files changed, 101 insertions(+), 184 deletions(-) rename api/task_queue/{default_task_queue_factory_unimplemented.cc => default_task_queue_factory_stdlib.cc} (74%) create mode 100644 rtc_base/task_queue_stdlib.h diff --git a/api/task_queue/BUILD.gn b/api/task_queue/BUILD.gn index 01fdab5f77..578a212465 100644 --- a/api/task_queue/BUILD.gn +++ b/api/task_queue/BUILD.gn @@ -106,9 +106,9 @@ rtc_source_set("default_task_queue_factory_impl") { deps += [ "../../rtc_base:rtc_task_queue_win" ] } else { sources = [ - "default_task_queue_factory_unimplemented.cc", + "default_task_queue_factory_stdlib.cc", ] - deps += [ "../../rtc_base:checks" ] + deps += [ "../../rtc_base:rtc_task_queue_stdlib" ] } } diff --git a/api/task_queue/default_task_queue_factory_unimplemented.cc b/api/task_queue/default_task_queue_factory_stdlib.cc similarity index 74% rename from api/task_queue/default_task_queue_factory_unimplemented.cc rename to api/task_queue/default_task_queue_factory_stdlib.cc index d4020f8a82..ca7d720cbe 100644 --- a/api/task_queue/default_task_queue_factory_unimplemented.cc +++ b/api/task_queue/default_task_queue_factory_stdlib.cc @@ -10,14 +10,12 @@ #include #include "api/task_queue/task_queue_factory.h" -#include "rtc_base/checks.h" +#include "rtc_base/task_queue_stdlib.h" namespace webrtc { std::unique_ptr CreateDefaultTaskQueueFactory() { - RTC_CHECK(false) - << "Default task queue is not implemented for current platform, " - "overwrite the task queue implementation by setting global factory."; + return CreateTaskQueueStdlibFactory(); } } // namespace webrtc diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn index 13c9957ae7..63b4f3f165 100644 --- a/rtc_base/BUILD.gn +++ b/rtc_base/BUILD.gn @@ -576,9 +576,10 @@ if (is_win) { } rtc_source_set("rtc_task_queue_stdlib") { - visibility = [ ":rtc_task_queue_impl" ] + visibility = [ "../api/task_queue:default_task_queue_factory_impl" ] sources = [ "task_queue_stdlib.cc", + "task_queue_stdlib.h", ] deps = [ ":checks", @@ -586,30 +587,23 @@ rtc_source_set("rtc_task_queue_stdlib") { ":logging", ":macromagic", ":platform_thread", - ":refcount", ":rtc_event", ":rtc_task_queue_api", ":safe_conversions", ":timeutils", - "../api:scoped_refptr", + "../api/task_queue", + "../api/task_queue:task_queue_factory", + "//third_party/abseil-cpp/absl/memory", + "//third_party/abseil-cpp/absl/strings", ] } rtc_source_set("rtc_task_queue_impl") { visibility = [ "*" ] - if (rtc_enable_libevent || is_mac || is_ios || - (is_win && current_os != "winuwp")) { - deps = [ - "../api/task_queue:default_task_queue_factory_impl", - "../api/task_queue:global_task_queue_factory", - ] - } else { - if (is_win && current_os == "winuwp") { - deps = [ - ":rtc_task_queue_stdlib", - ] - } - } + deps = [ + "../api/task_queue:default_task_queue_factory_impl", + "../api/task_queue:global_task_queue_factory", + ] } rtc_source_set("sequenced_task_checker") { diff --git a/rtc_base/task_queue_stdlib.cc b/rtc_base/task_queue_stdlib.cc index 2b9d5a2ffc..88128b5c56 100644 --- a/rtc_base/task_queue_stdlib.cc +++ b/rtc_base/task_queue_stdlib.cc @@ -8,83 +8,55 @@ * be found in the AUTHORS file in the root of the source tree. */ -#include "rtc_base/task_queue.h" +#include "rtc_base/task_queue_stdlib.h" #include #include -#include -#include #include #include #include +#include "absl/memory/memory.h" +#include "absl/strings/string_view.h" +#include "api/task_queue/queued_task.h" +#include "api/task_queue/task_queue_base.h" #include "rtc_base/checks.h" #include "rtc_base/critical_section.h" #include "rtc_base/event.h" #include "rtc_base/logging.h" #include "rtc_base/platform_thread.h" -#include "rtc_base/ref_count.h" -#include "rtc_base/ref_counted_object.h" #include "rtc_base/thread_annotations.h" #include "rtc_base/time_utils.h" -namespace rtc { +namespace webrtc { namespace { -using Priority = TaskQueue::Priority; - -ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) { +rtc::ThreadPriority TaskQueuePriorityToThreadPriority( + TaskQueueFactory::Priority priority) { switch (priority) { - case Priority::HIGH: - return kRealtimePriority; - case Priority::LOW: - return kLowPriority; - case Priority::NORMAL: - return kNormalPriority; + case TaskQueueFactory::Priority::HIGH: + return rtc::kRealtimePriority; + case TaskQueueFactory::Priority::LOW: + return rtc::kLowPriority; + case TaskQueueFactory::Priority::NORMAL: + return rtc::kNormalPriority; default: RTC_NOTREACHED(); - return kNormalPriority; + return rtc::kNormalPriority; } - return kNormalPriority; } -} // namespace - -class TaskQueue::Impl : public RefCountInterface { +class TaskQueueStdlib final : public TaskQueueBase { public: - Impl(const char* queue_name, TaskQueue* queue, Priority priority); - ~Impl() override; + TaskQueueStdlib(absl::string_view queue_name, rtc::ThreadPriority priority); + ~TaskQueueStdlib() override = default; - static TaskQueue::Impl* Current(); - static TaskQueue* CurrentQueue(); - - // Used for DCHECKing the current queue. - bool IsCurrent() const; - - template >::value>::type* = nullptr> - void PostTask(Closure&& closure) { - PostTask(NewClosure(std::forward(closure))); - } - - void PostTask(std::unique_ptr task); - void PostTaskAndReply(std::unique_ptr task, - std::unique_ptr reply, - TaskQueue::Impl* reply_queue); - - void PostDelayedTask(std::unique_ptr task, uint32_t milliseconds); - - class WorkerThread : public PlatformThread { - public: - WorkerThread(ThreadRunFunction func, - void* obj, - const char* thread_name, - ThreadPriority priority) - : PlatformThread(func, obj, thread_name, priority) {} - }; + void Delete() override; + void PostTask(std::unique_ptr task) override; + void PostDelayedTask(std::unique_ptr task, + uint32_t milliseconds) override; + private: using OrderId = uint64_t; struct DelayedEntryTimeout { @@ -103,38 +75,26 @@ class TaskQueue::Impl : public RefCountInterface { int64_t sleep_time_ms_{}; }; - protected: NextTask GetNextTask(); - private: - // The ThreadQueue::Current() method requires that the current thread - // returns the task queue if the current thread is the active task - // queue and this variable holds the value needed in thread_local to - // on the initialized worker thread holding the queue. - static thread_local TaskQueue::Impl* thread_context_; - static void ThreadMain(void* context); void ProcessTasks(); void NotifyWake(); - // The back pointer from the owner task queue object - // from this implementation detail. - TaskQueue* const queue_; - // Indicates if the thread has started. - Event started_; + rtc::Event started_; // Indicates if the thread has stopped. - Event stopped_; + rtc::Event stopped_; // Signaled whenever a new task is pending. - Event flag_notify_; + rtc::Event flag_notify_; // Contains the active worker thread assigned to processing // tasks (including delayed tasks). - WorkerThread thread_; + rtc::PlatformThread thread_; rtc::CriticalSection pending_lock_; @@ -160,57 +120,34 @@ class TaskQueue::Impl : public RefCountInterface { RTC_GUARDED_BY(pending_lock_); }; -// static -thread_local TaskQueue::Impl* TaskQueue::Impl::thread_context_ = nullptr; - -TaskQueue::Impl::Impl(const char* queue_name, - TaskQueue* queue, - Priority priority) - : queue_(queue), - started_(/*manual_reset=*/false, /*initially_signaled=*/false), +TaskQueueStdlib::TaskQueueStdlib(absl::string_view queue_name, + rtc::ThreadPriority priority) + : started_(/*manual_reset=*/false, /*initially_signaled=*/false), stopped_(/*manual_reset=*/false, /*initially_signaled=*/false), flag_notify_(/*manual_reset=*/false, /*initially_signaled=*/false), - thread_(&TaskQueue::Impl::ThreadMain, - this, - queue_name, - TaskQueuePriorityToThreadPriority(priority)) { - RTC_DCHECK(queue_name); + thread_(&TaskQueueStdlib::ThreadMain, this, queue_name, priority) { thread_.Start(); - started_.Wait(Event::kForever); + started_.Wait(rtc::Event::kForever); } -TaskQueue::Impl::~Impl() { +void TaskQueueStdlib::Delete() { RTC_DCHECK(!IsCurrent()); { - CritScope lock(&pending_lock_); + rtc::CritScope lock(&pending_lock_); thread_should_quit_ = true; } NotifyWake(); - stopped_.Wait(Event::kForever); + stopped_.Wait(rtc::Event::kForever); thread_.Stop(); + delete this; } -// static -TaskQueue::Impl* TaskQueue::Impl::Current() { - return thread_context_; -} - -// static -TaskQueue* TaskQueue::Impl::CurrentQueue() { - TaskQueue::Impl* current = Current(); - return current ? current->queue_ : nullptr; -} - -bool TaskQueue::Impl::IsCurrent() const { - return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef()); -} - -void TaskQueue::Impl::PostTask(std::unique_ptr task) { +void TaskQueueStdlib::PostTask(std::unique_ptr task) { { - CritScope lock(&pending_lock_); + rtc::CritScope lock(&pending_lock_); OrderId order = thread_posting_order_++; pending_queue_.push(std::pair>( @@ -220,7 +157,7 @@ void TaskQueue::Impl::PostTask(std::unique_ptr task) { NotifyWake(); } -void TaskQueue::Impl::PostDelayedTask(std::unique_ptr task, +void TaskQueueStdlib::PostDelayedTask(std::unique_ptr task, uint32_t milliseconds) { auto fire_at = rtc::TimeMillis() + milliseconds; @@ -228,7 +165,7 @@ void TaskQueue::Impl::PostDelayedTask(std::unique_ptr task, delay.next_fire_at_ms_ = fire_at; { - CritScope lock(&pending_lock_); + rtc::CritScope lock(&pending_lock_); delay.order_ = ++thread_posting_order_; delayed_queue_[delay] = std::move(task); } @@ -236,25 +173,12 @@ void TaskQueue::Impl::PostDelayedTask(std::unique_ptr task, NotifyWake(); } -void TaskQueue::Impl::PostTaskAndReply(std::unique_ptr task, - std::unique_ptr reply, - TaskQueue::Impl* reply_queue) { - QueuedTask* task_ptr = task.release(); - QueuedTask* reply_task_ptr = reply.release(); - PostTask([task_ptr, reply_task_ptr, reply_queue]() { - if (task_ptr->Run()) - delete task_ptr; - - reply_queue->PostTask(std::unique_ptr(reply_task_ptr)); - }); -} - -TaskQueue::Impl::NextTask TaskQueue::Impl::GetNextTask() { +TaskQueueStdlib::NextTask TaskQueueStdlib::GetNextTask() { NextTask result{}; auto tick = rtc::TimeMillis(); - CritScope lock(&pending_lock_); + rtc::CritScope lock(&pending_lock_); if (thread_should_quit_) { result.final_task_ = true; @@ -295,13 +219,13 @@ TaskQueue::Impl::NextTask TaskQueue::Impl::GetNextTask() { } // static -void TaskQueue::Impl::ThreadMain(void* context) { - TaskQueue::Impl* me = static_cast(context); +void TaskQueueStdlib::ThreadMain(void* context) { + TaskQueueStdlib* me = static_cast(context); + CurrentTaskQueueSetter set_current(me); me->ProcessTasks(); } -void TaskQueue::Impl::ProcessTasks() { - thread_context_ = this; +void TaskQueueStdlib::ProcessTasks() { started_.Set(); while (true) { @@ -321,7 +245,7 @@ void TaskQueue::Impl::ProcessTasks() { } if (0 == task.sleep_time_ms_) - flag_notify_.Wait(Event::kForever); + flag_notify_.Wait(rtc::Event::kForever); else flag_notify_.Wait(task.sleep_time_ms_); } @@ -329,7 +253,7 @@ void TaskQueue::Impl::ProcessTasks() { stopped_.Set(); } -void TaskQueue::Impl::NotifyWake() { +void TaskQueueStdlib::NotifyWake() { // The queue holds pending tasks to complete. Either tasks are to be // executed immediately or tasks are to be run at some future delayed time. // For immediate tasks the task queue's thread is busy running the task and @@ -357,43 +281,20 @@ void TaskQueue::Impl::NotifyWake() { flag_notify_.Set(); } -// Boilerplate for the PIMPL pattern. -TaskQueue::TaskQueue(const char* queue_name, Priority priority) - : impl_(new RefCountedObject(queue_name, this, priority)) { +class TaskQueueStdlibFactory final : public TaskQueueFactory { + public: + std::unique_ptr CreateTaskQueue( + absl::string_view name, + Priority priority) const override { + return std::unique_ptr( + new TaskQueueStdlib(name, TaskQueuePriorityToThreadPriority(priority))); + } +}; + +} // namespace + +std::unique_ptr CreateTaskQueueStdlibFactory() { + return absl::make_unique(); } -TaskQueue::~TaskQueue() {} - -// static -TaskQueue* TaskQueue::Current() { - return TaskQueue::Impl::CurrentQueue(); -} - -// Used for DCHECKing the current queue. -bool TaskQueue::IsCurrent() const { - return impl_->IsCurrent(); -} - -void TaskQueue::PostTask(std::unique_ptr task) { - return TaskQueue::impl_->PostTask(std::move(task)); -} - -void TaskQueue::PostTaskAndReply(std::unique_ptr task, - std::unique_ptr reply, - TaskQueue* reply_queue) { - return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply), - reply_queue->impl_.get()); -} - -void TaskQueue::PostTaskAndReply(std::unique_ptr task, - std::unique_ptr reply) { - return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply), - impl_.get()); -} - -void TaskQueue::PostDelayedTask(std::unique_ptr task, - uint32_t milliseconds) { - return TaskQueue::impl_->PostDelayedTask(std::move(task), milliseconds); -} - -} // namespace rtc +} // namespace webrtc diff --git a/rtc_base/task_queue_stdlib.h b/rtc_base/task_queue_stdlib.h new file mode 100644 index 0000000000..fb03dff3d8 --- /dev/null +++ b/rtc_base/task_queue_stdlib.h @@ -0,0 +1,24 @@ +/* + * Copyright 2019 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef RTC_BASE_TASK_QUEUE_STDLIB_H_ +#define RTC_BASE_TASK_QUEUE_STDLIB_H_ + +#include + +#include "api/task_queue/task_queue_factory.h" + +namespace webrtc { + +std::unique_ptr CreateTaskQueueStdlibFactory(); + +} // namespace webrtc + +#endif // RTC_BASE_TASK_QUEUE_STDLIB_H_