diff --git a/api/task_queue/BUILD.gn b/api/task_queue/BUILD.gn index 59aed762d5..01fdab5f77 100644 --- a/api/task_queue/BUILD.gn +++ b/api/task_queue/BUILD.gn @@ -99,6 +99,11 @@ rtc_source_set("default_task_queue_factory_impl") { "default_task_queue_factory_gcd.cc", ] deps += [ "../../rtc_base:rtc_task_queue_gcd" ] + } else if (is_win && current_os != "winuwp") { + sources = [ + "default_task_queue_factory_win.cc", + ] + deps += [ "../../rtc_base:rtc_task_queue_win" ] } else { sources = [ "default_task_queue_factory_unimplemented.cc", diff --git a/api/task_queue/default_task_queue_factory_win.cc b/api/task_queue/default_task_queue_factory_win.cc new file mode 100644 index 0000000000..493ea66ea5 --- /dev/null +++ b/api/task_queue/default_task_queue_factory_win.cc @@ -0,0 +1,21 @@ +/* + * Copyright 2019 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include + +#include "api/task_queue/task_queue_factory.h" +#include "rtc_base/task_queue_win.h" + +namespace webrtc { + +std::unique_ptr CreateDefaultTaskQueueFactory() { + return CreateTaskQueueWinFactory(); +} + +} // namespace webrtc diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn index ff8af81a68..13c9957ae7 100644 --- a/rtc_base/BUILD.gn +++ b/rtc_base/BUILD.gn @@ -552,9 +552,10 @@ if (is_mac || is_ios) { if (is_win) { rtc_source_set("rtc_task_queue_win") { - visibility = [ ":rtc_task_queue_impl" ] + visibility = [ "../api/task_queue:default_task_queue_factory_impl" ] sources = [ "task_queue_win.cc", + "task_queue_win.h", ] deps = [ ":checks", @@ -562,12 +563,14 @@ if (is_win) { ":logging", ":macromagic", ":platform_thread", - ":refcount", ":rtc_event", ":rtc_task_queue_api", ":safe_conversions", ":timeutils", - "../api:scoped_refptr", + "../api/task_queue", + "../api/task_queue:task_queue_factory", + "//third_party/abseil-cpp/absl/memory", + "//third_party/abseil-cpp/absl/strings", ] } } @@ -594,22 +597,17 @@ rtc_source_set("rtc_task_queue_stdlib") { rtc_source_set("rtc_task_queue_impl") { visibility = [ "*" ] - if (rtc_enable_libevent || is_mac || is_ios) { + if (rtc_enable_libevent || is_mac || is_ios || + (is_win && current_os != "winuwp")) { deps = [ "../api/task_queue:default_task_queue_factory_impl", "../api/task_queue:global_task_queue_factory", ] } else { - if (is_win) { - if (current_os == "winuwp") { - deps = [ - ":rtc_task_queue_stdlib", - ] - } else { - deps = [ - ":rtc_task_queue_win", - ] - } + if (is_win && current_os == "winuwp") { + deps = [ + ":rtc_task_queue_stdlib", + ] } } } diff --git a/rtc_base/task_queue_win.cc b/rtc_base/task_queue_win.cc index b73aa95493..696eda3bae 100644 --- a/rtc_base/task_queue_win.cc +++ b/rtc_base/task_queue_win.cc @@ -8,7 +8,7 @@ * be found in the AUTHORS file in the root of the source tree. */ -#include "rtc_base/task_queue.h" +#include "rtc_base/task_queue_win.h" // clang-format off // clang formating would change include order. @@ -27,6 +27,10 @@ #include #include +#include "absl/memory/memory.h" +#include "absl/strings/string_view.h" +#include "api/task_queue/queued_task.h" +#include "api/task_queue/task_queue_base.h" #include "rtc_base/arraysize.h" #include "rtc_base/checks.h" #include "rtc_base/critical_section.h" @@ -34,62 +38,40 @@ #include "rtc_base/logging.h" #include "rtc_base/numerics/safe_conversions.h" #include "rtc_base/platform_thread.h" -#include "rtc_base/ref_count.h" -#include "rtc_base/ref_counted_object.h" #include "rtc_base/time_utils.h" -namespace rtc { +namespace webrtc { namespace { #define WM_RUN_TASK WM_USER + 1 #define WM_QUEUE_DELAYED_TASK WM_USER + 2 -using Priority = TaskQueue::Priority; - -DWORD g_queue_ptr_tls = 0; - -BOOL CALLBACK InitializeTls(PINIT_ONCE init_once, void* param, void** context) { - g_queue_ptr_tls = TlsAlloc(); - return TRUE; -} - -DWORD GetQueuePtrTls() { - static INIT_ONCE init_once = INIT_ONCE_STATIC_INIT; - ::InitOnceExecuteOnce(&init_once, InitializeTls, nullptr, nullptr); - return g_queue_ptr_tls; -} - -struct ThreadStartupData { - Event* started; - void* thread_context; -}; - void CALLBACK InitializeQueueThread(ULONG_PTR param) { MSG msg; ::PeekMessage(&msg, nullptr, WM_USER, WM_USER, PM_NOREMOVE); - ThreadStartupData* data = reinterpret_cast(param); - ::TlsSetValue(GetQueuePtrTls(), data->thread_context); - data->started->Set(); + rtc::Event* data = reinterpret_cast(param); + data->Set(); } -ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) { +rtc::ThreadPriority TaskQueuePriorityToThreadPriority( + TaskQueueFactory::Priority priority) { switch (priority) { - case Priority::HIGH: - return kRealtimePriority; - case Priority::LOW: - return kLowPriority; - case Priority::NORMAL: - return kNormalPriority; + case TaskQueueFactory::Priority::HIGH: + return rtc::kRealtimePriority; + case TaskQueueFactory::Priority::LOW: + return rtc::kLowPriority; + case TaskQueueFactory::Priority::NORMAL: + return rtc::kNormalPriority; default: RTC_NOTREACHED(); break; } - return kNormalPriority; + return rtc::kNormalPriority; } int64_t GetTick() { static const UINT kPeriod = 1; bool high_res = (timeBeginPeriod(kPeriod) == TIMERR_NOERROR); - int64_t ret = TimeMillis(); + int64_t ret = rtc::TimeMillis(); if (high_res) timeEndPeriod(kPeriod); return ret; @@ -168,81 +150,56 @@ class MultimediaTimer { RTC_DISALLOW_COPY_AND_ASSIGN(MultimediaTimer); }; -} // namespace - -class TaskQueue::Impl : public RefCountInterface { +class TaskQueueWin : public TaskQueueBase { public: - Impl(const char* queue_name, TaskQueue* queue, Priority priority); - ~Impl() override; + TaskQueueWin(absl::string_view queue_name, rtc::ThreadPriority priority); + ~TaskQueueWin() override = default; - static TaskQueue::Impl* Current(); - static TaskQueue* CurrentQueue(); - - // Used for DCHECKing the current queue. - bool IsCurrent() const; - - template >::value>::type* = nullptr> - void PostTask(Closure&& closure) { - PostTask(NewClosure(std::forward(closure))); - } - - void PostTask(std::unique_ptr task); - void PostDelayedTask(std::unique_ptr task, uint32_t milliseconds); + void Delete() override; + void PostTask(std::unique_ptr task) override; + void PostDelayedTask(std::unique_ptr task, + uint32_t milliseconds) override; void RunPendingTasks(); private: static void ThreadMain(void* context); - class WorkerThread : public PlatformThread { + class WorkerThread : public rtc::PlatformThread { public: - WorkerThread(ThreadRunFunction func, + WorkerThread(rtc::ThreadRunFunction func, void* obj, - const char* thread_name, - ThreadPriority priority) + absl::string_view thread_name, + rtc::ThreadPriority priority) : PlatformThread(func, obj, thread_name, priority) {} bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data) { - return PlatformThread::QueueAPC(apc_function, data); + return rtc::PlatformThread::QueueAPC(apc_function, data); } }; - class ThreadState { - public: - explicit ThreadState(HANDLE in_queue) : in_queue_(in_queue) {} - ~ThreadState() {} + void RunThreadMain(); + bool ProcessQueuedMessages(); + void RunDueTasks(); + void ScheduleNextTimer(); + void CancelTimers(); - void RunThreadMain(); - - private: - bool ProcessQueuedMessages(); - void RunDueTasks(); - void ScheduleNextTimer(); - void CancelTimers(); - - // Since priority_queue<> by defult orders items in terms of - // largest->smallest, using std::less<>, and we want smallest->largest, - // we would like to use std::greater<> here. Alas it's only available in - // C++14 and later, so we roll our own compare template that that relies on - // operator<(). - template - struct greater { - bool operator()(const T& l, const T& r) { return l > r; } - }; - - MultimediaTimer timer_; - std::priority_queue, - greater> - timer_tasks_; - UINT_PTR timer_id_ = 0; - HANDLE in_queue_; + // Since priority_queue<> by defult orders items in terms of + // largest->smallest, using std::less<>, and we want smallest->largest, + // we would like to use std::greater<> here. Alas it's only available in + // C++14 and later, so we roll our own compare template that that relies on + // operator<(). + template + struct greater { + bool operator()(const T& l, const T& r) { return l > r; } }; - TaskQueue* const queue_; + MultimediaTimer timer_; + std::priority_queue, + greater> + timer_tasks_; + UINT_PTR timer_id_ = 0; WorkerThread thread_; rtc::CriticalSection pending_lock_; std::queue> pending_ @@ -250,26 +207,19 @@ class TaskQueue::Impl : public RefCountInterface { HANDLE in_queue_; }; -TaskQueue::Impl::Impl(const char* queue_name, - TaskQueue* queue, - Priority priority) - : queue_(queue), - thread_(&TaskQueue::Impl::ThreadMain, - this, - queue_name, - TaskQueuePriorityToThreadPriority(priority)), +TaskQueueWin::TaskQueueWin(absl::string_view queue_name, + rtc::ThreadPriority priority) + : thread_(&TaskQueueWin::ThreadMain, this, queue_name, priority), in_queue_(::CreateEvent(nullptr, true, false, nullptr)) { - RTC_DCHECK(queue_name); RTC_DCHECK(in_queue_); thread_.Start(); - Event event(false, false); - ThreadStartupData startup = {&event, this}; + rtc::Event event(false, false); RTC_CHECK(thread_.QueueAPC(&InitializeQueueThread, - reinterpret_cast(&startup))); - event.Wait(Event::kForever); + reinterpret_cast(&event))); + event.Wait(rtc::Event::kForever); } -TaskQueue::Impl::~Impl() { +void TaskQueueWin::Delete() { RTC_DCHECK(!IsCurrent()); while (!::PostThreadMessage(thread_.GetThreadRef(), WM_QUIT, 0, 0)) { RTC_CHECK_EQ(ERROR_NOT_ENOUGH_QUOTA, ::GetLastError()); @@ -277,31 +227,17 @@ TaskQueue::Impl::~Impl() { } thread_.Stop(); ::CloseHandle(in_queue_); + delete this; } -// static -TaskQueue::Impl* TaskQueue::Impl::Current() { - return static_cast(::TlsGetValue(GetQueuePtrTls())); -} - -// static -TaskQueue* TaskQueue::Impl::CurrentQueue() { - TaskQueue::Impl* current = Current(); - return current ? current->queue_ : nullptr; -} - -bool TaskQueue::Impl::IsCurrent() const { - return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef()); -} - -void TaskQueue::Impl::PostTask(std::unique_ptr task) { +void TaskQueueWin::PostTask(std::unique_ptr task) { rtc::CritScope lock(&pending_lock_); pending_.push(std::move(task)); ::SetEvent(in_queue_); } -void TaskQueue::Impl::PostDelayedTask(std::unique_ptr task, - uint32_t milliseconds) { +void TaskQueueWin::PostDelayedTask(std::unique_ptr task, + uint32_t milliseconds) { if (!milliseconds) { PostTask(std::move(task)); return; @@ -318,7 +254,7 @@ void TaskQueue::Impl::PostDelayedTask(std::unique_ptr task, } } -void TaskQueue::Impl::RunPendingTasks() { +void TaskQueueWin::RunPendingTasks() { while (true) { std::unique_ptr task; { @@ -335,12 +271,12 @@ void TaskQueue::Impl::RunPendingTasks() { } // static -void TaskQueue::Impl::ThreadMain(void* context) { - ThreadState state(static_cast(context)->in_queue_); - state.RunThreadMain(); +void TaskQueueWin::ThreadMain(void* context) { + static_cast(context)->RunThreadMain(); } -void TaskQueue::Impl::ThreadState::RunThreadMain() { +void TaskQueueWin::RunThreadMain() { + CurrentTaskQueueSetter set_current(this); HANDLE handles[2] = {*timer_.event_for_wait(), in_queue_}; while (true) { // Make sure we do an alertable wait as that's required to allow APCs to run @@ -366,12 +302,12 @@ void TaskQueue::Impl::ThreadState::RunThreadMain() { if (result == (WAIT_OBJECT_0 + 1)) { ::ResetEvent(in_queue_); - TaskQueue::Impl::Current()->RunPendingTasks(); + RunPendingTasks(); } } } -bool TaskQueue::Impl::ThreadState::ProcessQueuedMessages() { +bool TaskQueueWin::ProcessQueuedMessages() { MSG msg = {}; // To protect against overly busy message queues, we limit the time // we process tasks to a few milliseconds. If we don't do that, there's @@ -425,7 +361,7 @@ bool TaskQueue::Impl::ThreadState::ProcessQueuedMessages() { return msg.message != WM_QUIT; } -void TaskQueue::Impl::ThreadState::RunDueTasks() { +void TaskQueueWin::RunDueTasks() { RTC_DCHECK(!timer_tasks_.empty()); auto now = GetTick(); do { @@ -437,7 +373,7 @@ void TaskQueue::Impl::ThreadState::RunDueTasks() { } while (!timer_tasks_.empty()); } -void TaskQueue::Impl::ThreadState::ScheduleNextTimer() { +void TaskQueueWin::ScheduleNextTimer() { RTC_DCHECK_EQ(timer_id_, 0); if (timer_tasks_.empty()) return; @@ -449,7 +385,7 @@ void TaskQueue::Impl::ThreadState::ScheduleNextTimer() { timer_id_ = ::SetTimer(nullptr, 0, milliseconds, nullptr); } -void TaskQueue::Impl::ThreadState::CancelTimers() { +void TaskQueueWin::CancelTimers() { timer_.Cancel(); if (timer_id_) { ::KillTimer(nullptr, timer_id_); @@ -457,30 +393,20 @@ void TaskQueue::Impl::ThreadState::CancelTimers() { } } -// Boilerplate for the PIMPL pattern. -TaskQueue::TaskQueue(const char* queue_name, Priority priority) - : impl_(new RefCountedObject(queue_name, this, priority)) { +class TaskQueueWinFactory : public TaskQueueFactory { + public: + std::unique_ptr CreateTaskQueue( + absl::string_view name, + Priority priority) const override { + return std::unique_ptr( + new TaskQueueWin(name, TaskQueuePriorityToThreadPriority(priority))); + } +}; + +} // namespace + +std::unique_ptr CreateTaskQueueWinFactory() { + return absl::make_unique(); } -TaskQueue::~TaskQueue() {} - -// static -TaskQueue* TaskQueue::Current() { - return TaskQueue::Impl::CurrentQueue(); -} - -// Used for DCHECKing the current queue. -bool TaskQueue::IsCurrent() const { - return impl_->IsCurrent(); -} - -void TaskQueue::PostTask(std::unique_ptr task) { - return TaskQueue::impl_->PostTask(std::move(task)); -} - -void TaskQueue::PostDelayedTask(std::unique_ptr task, - uint32_t milliseconds) { - return TaskQueue::impl_->PostDelayedTask(std::move(task), milliseconds); -} - -} // namespace rtc +} // namespace webrtc diff --git a/rtc_base/task_queue_win.h b/rtc_base/task_queue_win.h new file mode 100644 index 0000000000..972611abc2 --- /dev/null +++ b/rtc_base/task_queue_win.h @@ -0,0 +1,24 @@ +/* + * Copyright 2019 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef RTC_BASE_TASK_QUEUE_WIN_H_ +#define RTC_BASE_TASK_QUEUE_WIN_H_ + +#include + +#include "api/task_queue/task_queue_factory.h" + +namespace webrtc { + +std::unique_ptr CreateTaskQueueWinFactory(); + +} + +#endif // RTC_BASE_TASK_QUEUE_WIN_H_