diff --git a/api/task_queue/BUILD.gn b/api/task_queue/BUILD.gn index 8884bd3fb4..ebb7e39dec 100644 --- a/api/task_queue/BUILD.gn +++ b/api/task_queue/BUILD.gn @@ -72,7 +72,31 @@ rtc_source_set("default_task_queue_factory") { # TODO(bugs.webrtc.org/10284): Include implementation unconditionally when # global task queue factory is removed. if (rtc_link_task_queue_impl) { - sources += [ "default_task_queue_factory.cc" ] + deps += [ ":default_task_queue_factory_impl" ] + } +} + +# TODO(bugs.webrtc.org/10191): Merge back to default_task_queue_factory when +# rtc_task_queue_impl build target is removed. +rtc_source_set("default_task_queue_factory_impl") { + # Include the implementation when rtc_link_task_queue_impl is set to default + # value of true or when explicit dependency on "rtc_task_queue_impl" is added. + visibility = [ + ":default_task_queue_factory", + "../../rtc_base:rtc_task_queue_impl", + ] + deps = [ + ":task_queue_factory", + ] + if (rtc_enable_libevent) { + sources = [ + "default_task_queue_factory_libevent.cc", + ] + deps += [ "../../rtc_base:rtc_task_queue_libevent" ] + } else { + sources = [ + "default_task_queue_factory_unimplemented.cc", + ] deps += [ "../../rtc_base:checks" ] } } diff --git a/api/task_queue/default_task_queue_factory_libevent.cc b/api/task_queue/default_task_queue_factory_libevent.cc new file mode 100644 index 0000000000..f2fb418fd3 --- /dev/null +++ b/api/task_queue/default_task_queue_factory_libevent.cc @@ -0,0 +1,21 @@ +/* + * Copyright 2019 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include + +#include "api/task_queue/task_queue_factory.h" +#include "rtc_base/task_queue_libevent.h" + +namespace webrtc { + +std::unique_ptr CreateDefaultTaskQueueFactory() { + return CreateTaskQueueLibeventFactory(); +} + +} // namespace webrtc diff --git a/api/task_queue/default_task_queue_factory.cc b/api/task_queue/default_task_queue_factory_unimplemented.cc similarity index 91% rename from api/task_queue/default_task_queue_factory.cc rename to api/task_queue/default_task_queue_factory_unimplemented.cc index b3e86bb4df..d4020f8a82 100644 --- a/api/task_queue/default_task_queue_factory.cc +++ b/api/task_queue/default_task_queue_factory_unimplemented.cc @@ -7,8 +7,9 @@ * in the file PATENTS. All contributing project authors may * be found in the AUTHORS file in the root of the source tree. */ -#include "api/task_queue/default_task_queue_factory.h" +#include +#include "api/task_queue/task_queue_factory.h" #include "rtc_base/checks.h" namespace webrtc { diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn index 44623812f4..c152533469 100644 --- a/rtc_base/BUILD.gn +++ b/rtc_base/BUILD.gn @@ -507,11 +507,10 @@ rtc_source_set("rtc_task_queue_api") { if (rtc_enable_libevent) { rtc_source_set("rtc_task_queue_libevent") { - visibility = [ ":rtc_task_queue_impl" ] + visibility = [ "../api/task_queue:default_task_queue_factory_impl" ] sources = [ "task_queue_libevent.cc", - "task_queue_posix.cc", - "task_queue_posix.h", + "task_queue_libevent.h", ] deps = [ ":checks", @@ -520,12 +519,12 @@ if (rtc_enable_libevent) { ":macromagic", ":platform_thread", ":platform_thread_types", - ":refcount", - ":rtc_task_queue_api", ":safe_conversions", ":timeutils", - "../api:scoped_refptr", - "system:unused", + "../api/task_queue", + "../api/task_queue:task_queue_factory", + "//third_party/abseil-cpp/absl/memory", + "//third_party/abseil-cpp/absl/strings", ] if (rtc_build_libevent) { deps += [ "//base/third_party/libevent" ] @@ -597,7 +596,8 @@ rtc_source_set("rtc_task_queue_impl") { visibility = [ "*" ] if (rtc_enable_libevent) { deps = [ - ":rtc_task_queue_libevent", + "../api/task_queue:default_task_queue_factory_impl", + "../api/task_queue:global_task_queue_factory", ] } else { if (is_mac || is_ios) { diff --git a/rtc_base/task_queue_libevent.cc b/rtc_base/task_queue_libevent.cc index 5ff177c3d3..38704c89ae 100644 --- a/rtc_base/task_queue_libevent.cc +++ b/rtc_base/task_queue_libevent.cc @@ -8,7 +8,7 @@ * be found in the AUTHORS file in the root of the source tree. */ -#include "rtc_base/task_queue.h" +#include "rtc_base/task_queue_libevent.h" #include #include @@ -22,7 +22,10 @@ #include #include -#include "api/scoped_refptr.h" +#include "absl/memory/memory.h" +#include "absl/strings/string_view.h" +#include "api/task_queue/queued_task.h" +#include "api/task_queue/task_queue_base.h" #include "base/third_party/libevent/event.h" #include "rtc_base/checks.h" #include "rtc_base/critical_section.h" @@ -30,22 +33,15 @@ #include "rtc_base/numerics/safe_conversions.h" #include "rtc_base/platform_thread.h" #include "rtc_base/platform_thread_types.h" -#include "rtc_base/ref_count.h" -#include "rtc_base/ref_counted_object.h" -#include "rtc_base/system/unused.h" -#include "rtc_base/task_queue_posix.h" #include "rtc_base/thread_annotations.h" #include "rtc_base/time_utils.h" -namespace rtc { -using internal::GetQueuePtrTls; -using internal::AutoSetCurrentQueuePtr; - +namespace webrtc { namespace { -static const char kQuit = 1; -static const char kRunTask = 2; +constexpr char kQuit = 1; +constexpr char kRunTask = 2; -using Priority = TaskQueue::Priority; +using Priority = TaskQueueFactory::Priority; // This ignores the SIGPIPE signal on the calling thread. // This signal can be fired when trying to write() to a pipe that's being @@ -67,14 +63,6 @@ void IgnoreSigPipeSignalOnCurrentThread() { pthread_sigmask(SIG_BLOCK, &sigpipe_mask, nullptr); } -struct TimerEvent { - explicit TimerEvent(std::unique_ptr task) - : task(std::move(task)) {} - ~TimerEvent() { event_del(&ev); } - event ev; - std::unique_ptr task; -}; - bool SetNonBlocking(int fd) { const int flags = fcntl(fd, F_GETFL); RTC_CHECK(flags != -1); @@ -101,78 +89,76 @@ void EventAssign(struct event* ev, #endif } -ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) { +rtc::ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) { switch (priority) { case Priority::HIGH: - return kRealtimePriority; + return rtc::kRealtimePriority; case Priority::LOW: - return kLowPriority; + return rtc::kLowPriority; case Priority::NORMAL: - return kNormalPriority; + return rtc::kNormalPriority; default: RTC_NOTREACHED(); break; } - return kNormalPriority; + return rtc::kNormalPriority; } -} // namespace -class TaskQueue::Impl : public RefCountInterface { +class TaskQueueLibevent final : public TaskQueueBase { public: - explicit Impl(const char* queue_name, - TaskQueue* queue, - Priority priority = Priority::NORMAL); - ~Impl() override; + TaskQueueLibevent(absl::string_view queue_name, rtc::ThreadPriority priority); - static TaskQueue::Impl* Current(); - static TaskQueue* CurrentQueue(); - - // Used for DCHECKing the current queue. - bool IsCurrent() const; - - void PostTask(std::unique_ptr task); - void PostDelayedTask(std::unique_ptr task, uint32_t milliseconds); + void Delete() override; + void PostTask(std::unique_ptr task) override; + void PostDelayedTask(std::unique_ptr task, + uint32_t milliseconds) override; private: + class SetTimerTask; + struct TimerEvent; + + ~TaskQueueLibevent() override = default; + static void ThreadMain(void* context); static void OnWakeup(int socket, short flags, void* context); // NOLINT static void RunTask(int fd, short flags, void* context); // NOLINT static void RunTimer(int fd, short flags, void* context); // NOLINT - class SetTimerTask; - - struct QueueContext; - TaskQueue* const queue_; + bool is_active_ = true; int wakeup_pipe_in_ = -1; int wakeup_pipe_out_ = -1; event_base* event_base_; - std::unique_ptr wakeup_event_; - PlatformThread thread_; + event wakeup_event_; + rtc::PlatformThread thread_; rtc::CriticalSection pending_lock_; std::list> pending_ RTC_GUARDED_BY(pending_lock_); -}; - -struct TaskQueue::Impl::QueueContext { - explicit QueueContext(TaskQueue::Impl* q) : queue(q), is_active(true) {} - TaskQueue::Impl* queue; - bool is_active; // Holds a list of events pending timers for cleanup when the loop exits. std::list pending_timers_; }; -class TaskQueue::Impl::SetTimerTask : public QueuedTask { +struct TaskQueueLibevent::TimerEvent { + TimerEvent(TaskQueueLibevent* task_queue, std::unique_ptr task) + : task_queue(task_queue), task(std::move(task)) {} + ~TimerEvent() { event_del(&ev); } + + event ev; + TaskQueueLibevent* task_queue; + std::unique_ptr task; +}; + +class TaskQueueLibevent::SetTimerTask : public QueuedTask { public: SetTimerTask(std::unique_ptr task, uint32_t milliseconds) : task_(std::move(task)), milliseconds_(milliseconds), - posted_(Time32()) {} + posted_(rtc::Time32()) {} private: bool Run() override { // Compensate for the time that has passed since construction // and until we got here. - uint32_t post_time = Time32() - posted_; - TaskQueue::Impl::Current()->PostDelayedTask( + uint32_t post_time = rtc::Time32() - posted_; + TaskQueueLibevent::Current()->PostDelayedTask( std::move(task_), post_time > milliseconds_ ? 0 : milliseconds_ - post_time); return true; @@ -183,17 +169,10 @@ class TaskQueue::Impl::SetTimerTask : public QueuedTask { const uint32_t posted_; }; -TaskQueue::Impl::Impl(const char* queue_name, - TaskQueue* queue, - Priority priority /*= NORMAL*/) - : queue_(queue), - event_base_(event_base_new()), - wakeup_event_(new event()), - thread_(&TaskQueue::Impl::ThreadMain, - this, - queue_name, - TaskQueuePriorityToThreadPriority(priority)) { - RTC_DCHECK(queue_name); +TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name, + rtc::ThreadPriority priority) + : event_base_(event_base_new()), + thread_(&TaskQueueLibevent::ThreadMain, this, queue_name, priority) { int fds[2]; RTC_CHECK(pipe(fds) == 0); SetNonBlocking(fds[0]); @@ -201,13 +180,13 @@ TaskQueue::Impl::Impl(const char* queue_name, wakeup_pipe_out_ = fds[0]; wakeup_pipe_in_ = fds[1]; - EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_, + EventAssign(&wakeup_event_, event_base_, wakeup_pipe_out_, EV_READ | EV_PERSIST, OnWakeup, this); - event_add(wakeup_event_.get(), 0); + event_add(&wakeup_event_, 0); thread_.Start(); } -TaskQueue::Impl::~Impl() { +void TaskQueueLibevent::Delete() { RTC_DCHECK(!IsCurrent()); struct timespec ts; char message = kQuit; @@ -221,7 +200,7 @@ TaskQueue::Impl::~Impl() { thread_.Stop(); - event_del(wakeup_event_.get()); + event_del(&wakeup_event_); IgnoreSigPipeSignalOnCurrentThread(); @@ -231,48 +210,30 @@ TaskQueue::Impl::~Impl() { wakeup_pipe_out_ = -1; event_base_free(event_base_); + delete this; } -// static -TaskQueue::Impl* TaskQueue::Impl::Current() { - QueueContext* ctx = - static_cast(pthread_getspecific(GetQueuePtrTls())); - return ctx ? ctx->queue : nullptr; -} - -// static -TaskQueue* TaskQueue::Impl::CurrentQueue() { - TaskQueue::Impl* current = Current(); - if (current) { - return current->queue_; - } - return nullptr; -} - -bool TaskQueue::Impl::IsCurrent() const { - return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef()); -} - -void TaskQueue::Impl::PostTask(std::unique_ptr task) { +void TaskQueueLibevent::PostTask(std::unique_ptr task) { RTC_DCHECK(task.get()); // libevent isn't thread safe. This means that we can't use methods such // as event_base_once to post tasks to the worker thread from a different // thread. However, we can use it when posting from the worker thread itself. if (IsCurrent()) { - if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::Impl::RunTask, - task.get(), nullptr) == 0) { + if (event_base_once(event_base_, -1, EV_TIMEOUT, + &TaskQueueLibevent::RunTask, task.get(), + nullptr) == 0) { task.release(); } } else { QueuedTask* task_id = task.get(); // Only used for comparison. { - CritScope lock(&pending_lock_); + rtc::CritScope lock(&pending_lock_); pending_.push_back(std::move(task)); } char message = kRunTask; if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) { RTC_LOG(WARNING) << "Failed to queue task."; - CritScope lock(&pending_lock_); + rtc::CritScope lock(&pending_lock_); pending_.remove_if([task_id](std::unique_ptr& t) { return t.get() == task_id; }); @@ -280,61 +241,55 @@ void TaskQueue::Impl::PostTask(std::unique_ptr task) { } } -void TaskQueue::Impl::PostDelayedTask(std::unique_ptr task, - uint32_t milliseconds) { +void TaskQueueLibevent::PostDelayedTask(std::unique_ptr task, + uint32_t milliseconds) { if (IsCurrent()) { - TimerEvent* timer = new TimerEvent(std::move(task)); - EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueue::Impl::RunTimer, + TimerEvent* timer = new TimerEvent(this, std::move(task)); + EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueueLibevent::RunTimer, timer); - QueueContext* ctx = - static_cast(pthread_getspecific(GetQueuePtrTls())); - ctx->pending_timers_.push_back(timer); + pending_timers_.push_back(timer); timeval tv = {rtc::dchecked_cast(milliseconds / 1000), rtc::dchecked_cast(milliseconds % 1000) * 1000}; event_add(&timer->ev, &tv); } else { - PostTask(std::unique_ptr( - new SetTimerTask(std::move(task), milliseconds))); + PostTask(absl::make_unique(std::move(task), milliseconds)); } } // static -void TaskQueue::Impl::ThreadMain(void* context) { - TaskQueue::Impl* me = static_cast(context); +void TaskQueueLibevent::ThreadMain(void* context) { + TaskQueueLibevent* me = static_cast(context); - QueueContext queue_context(me); - pthread_setspecific(GetQueuePtrTls(), &queue_context); + { + CurrentTaskQueueSetter set_current(me); + while (me->is_active_) + event_base_loop(me->event_base_, 0); + } - while (queue_context.is_active) - event_base_loop(me->event_base_, 0); - - pthread_setspecific(GetQueuePtrTls(), nullptr); - - for (TimerEvent* timer : queue_context.pending_timers_) + for (TimerEvent* timer : me->pending_timers_) delete timer; } // static -void TaskQueue::Impl::OnWakeup(int socket, - short flags, - void* context) { // NOLINT - QueueContext* ctx = - static_cast(pthread_getspecific(GetQueuePtrTls())); - RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket); +void TaskQueueLibevent::OnWakeup(int socket, + short flags, // NOLINT + void* context) { + TaskQueueLibevent* me = static_cast(context); + RTC_DCHECK(me->wakeup_pipe_out_ == socket); char buf; RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf))); switch (buf) { case kQuit: - ctx->is_active = false; - event_base_loopbreak(ctx->queue->event_base_); + me->is_active_ = false; + event_base_loopbreak(me->event_base_); break; case kRunTask: { std::unique_ptr task; { - CritScope lock(&ctx->queue->pending_lock_); - RTC_DCHECK(!ctx->queue->pending_.empty()); - task = std::move(ctx->queue->pending_.front()); - ctx->queue->pending_.pop_front(); + rtc::CritScope lock(&me->pending_lock_); + RTC_DCHECK(!me->pending_.empty()); + task = std::move(me->pending_.front()); + me->pending_.pop_front(); RTC_DCHECK(task.get()); } if (!task->Run()) @@ -348,46 +303,38 @@ void TaskQueue::Impl::OnWakeup(int socket, } // static -void TaskQueue::Impl::RunTask(int fd, short flags, void* context) { // NOLINT +void TaskQueueLibevent::RunTask(int fd, short flags, void* context) { // NOLINT auto* task = static_cast(context); if (task->Run()) delete task; } // static -void TaskQueue::Impl::RunTimer(int fd, short flags, void* context) { // NOLINT +void TaskQueueLibevent::RunTimer(int fd, + short flags, // NOLINT + void* context) { TimerEvent* timer = static_cast(context); if (!timer->task->Run()) timer->task.release(); - QueueContext* ctx = - static_cast(pthread_getspecific(GetQueuePtrTls())); - ctx->pending_timers_.remove(timer); + timer->task_queue->pending_timers_.remove(timer); delete timer; } -TaskQueue::TaskQueue(const char* queue_name, Priority priority) - : impl_(new RefCountedObject(queue_name, this, priority)) { +class TaskQueueLibeventFactory final : public TaskQueueFactory { + public: + std::unique_ptr CreateTaskQueue( + absl::string_view name, + Priority priority) const override { + return std::unique_ptr( + new TaskQueueLibevent(name, + TaskQueuePriorityToThreadPriority(priority))); + } +}; + +} // namespace + +std::unique_ptr CreateTaskQueueLibeventFactory() { + return absl::make_unique(); } -TaskQueue::~TaskQueue() {} - -// static -TaskQueue* TaskQueue::Current() { - return TaskQueue::Impl::CurrentQueue(); -} - -// Used for DCHECKing the current queue. -bool TaskQueue::IsCurrent() const { - return impl_->IsCurrent(); -} - -void TaskQueue::PostTask(std::unique_ptr task) { - return TaskQueue::impl_->PostTask(std::move(task)); -} - -void TaskQueue::PostDelayedTask(std::unique_ptr task, - uint32_t milliseconds) { - return TaskQueue::impl_->PostDelayedTask(std::move(task), milliseconds); -} - -} // namespace rtc +} // namespace webrtc diff --git a/rtc_base/task_queue_libevent.h b/rtc_base/task_queue_libevent.h new file mode 100644 index 0000000000..aaa72d4a1b --- /dev/null +++ b/rtc_base/task_queue_libevent.h @@ -0,0 +1,24 @@ +/* + * Copyright 2019 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef RTC_BASE_TASK_QUEUE_LIBEVENT_H_ +#define RTC_BASE_TASK_QUEUE_LIBEVENT_H_ + +#include + +#include "api/task_queue/task_queue_factory.h" + +namespace webrtc { + +std::unique_ptr CreateTaskQueueLibeventFactory(); + +} // namespace webrtc + +#endif // RTC_BASE_TASK_QUEUE_LIBEVENT_H_