Update TaskQueueLibevent implementation to absl::AnyInvocable

Bug: webrtc:14245, webrtc:12889
Change-Id: I1aa20e3d5645c270abd1bee0c45c6982e799eaa4
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/268767
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37563}
This commit is contained in:
Danil Chapovalov 2022-07-19 14:12:43 +02:00 committed by WebRTC LUCI CQ
parent ed7526c3f1
commit 30c2a31309
2 changed files with 53 additions and 51 deletions

View file

@ -645,10 +645,12 @@ if (rtc_enable_libevent) {
":safe_conversions",
":timeutils",
"../api/task_queue",
"../api/units:time_delta",
"synchronization:mutex",
]
absl_deps = [
"//third_party/abseil-cpp/absl/container:inlined_vector",
"//third_party/abseil-cpp/absl/functional:any_invocable",
"//third_party/abseil-cpp/absl/strings",
]
if (rtc_build_libevent) {

View file

@ -24,9 +24,10 @@
#include <utility>
#include "absl/container/inlined_vector.h"
#include "absl/functional/any_invocable.h"
#include "absl/strings/string_view.h"
#include "api/task_queue/queued_task.h"
#include "api/task_queue/task_queue_base.h"
#include "api/units/time_delta.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "rtc_base/numerics/safe_conversions.h"
@ -106,14 +107,18 @@ class TaskQueueLibevent final : public TaskQueueBase {
TaskQueueLibevent(absl::string_view queue_name, rtc::ThreadPriority priority);
void Delete() override;
void PostTask(std::unique_ptr<QueuedTask> task) override;
void PostDelayedTask(std::unique_ptr<QueuedTask> task,
uint32_t milliseconds) override;
void PostTask(absl::AnyInvocable<void() &&> task) override;
void PostDelayedTask(absl::AnyInvocable<void() &&> task,
TimeDelta delay) override;
void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
TimeDelta delay) override;
private:
class SetTimerTask;
struct TimerEvent;
void PostDelayedTaskOnTaskQueue(absl::AnyInvocable<void() &&> task,
TimeDelta delay);
~TaskQueueLibevent() override = default;
static void OnWakeup(int socket, short flags, void* context); // NOLINT
@ -126,43 +131,20 @@ class TaskQueueLibevent final : public TaskQueueBase {
event wakeup_event_;
rtc::PlatformThread thread_;
Mutex pending_lock_;
absl::InlinedVector<std::unique_ptr<QueuedTask>, 4> pending_
absl::InlinedVector<absl::AnyInvocable<void() &&>, 4> pending_
RTC_GUARDED_BY(pending_lock_);
// Holds a list of events pending timers for cleanup when the loop exits.
std::list<TimerEvent*> pending_timers_;
};
struct TaskQueueLibevent::TimerEvent {
TimerEvent(TaskQueueLibevent* task_queue, std::unique_ptr<QueuedTask> task)
TimerEvent(TaskQueueLibevent* task_queue, absl::AnyInvocable<void() &&> task)
: task_queue(task_queue), task(std::move(task)) {}
~TimerEvent() { event_del(&ev); }
event ev;
TaskQueueLibevent* task_queue;
std::unique_ptr<QueuedTask> task;
};
class TaskQueueLibevent::SetTimerTask : public QueuedTask {
public:
SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
: task_(std::move(task)),
milliseconds_(milliseconds),
posted_(rtc::Time32()) {}
private:
bool Run() override {
// Compensate for the time that has passed since construction
// and until we got here.
uint32_t post_time = rtc::Time32() - posted_;
TaskQueueLibevent::Current()->PostDelayedTask(
std::move(task_),
post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
return true;
}
std::unique_ptr<QueuedTask> task_;
const uint32_t milliseconds_;
const uint32_t posted_;
absl::AnyInvocable<void() &&> task;
};
TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name,
@ -219,7 +201,7 @@ void TaskQueueLibevent::Delete() {
delete this;
}
void TaskQueueLibevent::PostTask(std::unique_ptr<QueuedTask> task) {
void TaskQueueLibevent::PostTask(absl::AnyInvocable<void() &&> task) {
{
MutexLock lock(&pending_lock_);
bool had_pending_tasks = !pending_.empty();
@ -242,21 +224,43 @@ void TaskQueueLibevent::PostTask(std::unique_ptr<QueuedTask> task) {
sizeof(message));
}
void TaskQueueLibevent::PostDelayedTask(std::unique_ptr<QueuedTask> task,
uint32_t milliseconds) {
void TaskQueueLibevent::PostDelayedTaskOnTaskQueue(
absl::AnyInvocable<void() &&> task,
TimeDelta delay) {
// libevent api is not thread safe by default, thus event_add need to be
// called on the `thread_`.
RTC_DCHECK(IsCurrent());
TimerEvent* timer = new TimerEvent(this, std::move(task));
EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueueLibevent::RunTimer,
timer);
pending_timers_.push_back(timer);
timeval tv = {.tv_sec = rtc::dchecked_cast<int>(delay.us() / 1'000'000),
.tv_usec = rtc::dchecked_cast<int>(delay.us() % 1'000'000)};
event_add(&timer->ev, &tv);
}
void TaskQueueLibevent::PostDelayedTask(absl::AnyInvocable<void() &&> task,
TimeDelta delay) {
if (IsCurrent()) {
TimerEvent* timer = new TimerEvent(this, std::move(task));
EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueueLibevent::RunTimer,
timer);
pending_timers_.push_back(timer);
timeval tv = {rtc::dchecked_cast<int>(milliseconds / 1000),
rtc::dchecked_cast<int>(milliseconds % 1000) * 1000};
event_add(&timer->ev, &tv);
PostDelayedTaskOnTaskQueue(std::move(task), delay);
} else {
PostTask(std::make_unique<SetTimerTask>(std::move(task), milliseconds));
int64_t posted_us = rtc::TimeMicros();
PostTask([posted_us, delay, task = std::move(task), this]() mutable {
// Compensate for the time that has passed since the posting.
TimeDelta post_time = TimeDelta::Micros(rtc::TimeMicros() - posted_us);
PostDelayedTaskOnTaskQueue(
std::move(task), std::max(delay - post_time, TimeDelta::Zero()));
});
}
}
void TaskQueueLibevent::PostDelayedHighPrecisionTask(
absl::AnyInvocable<void() &&> task,
TimeDelta delay) {
PostDelayedTask(std::move(task), delay);
}
// static
void TaskQueueLibevent::OnWakeup(int socket,
short flags, // NOLINT
@ -271,19 +275,16 @@ void TaskQueueLibevent::OnWakeup(int socket,
event_base_loopbreak(me->event_base_);
break;
case kRunTasks: {
absl::InlinedVector<std::unique_ptr<QueuedTask>, 4> tasks;
absl::InlinedVector<absl::AnyInvocable<void() &&>, 4> tasks;
{
MutexLock lock(&me->pending_lock_);
tasks.swap(me->pending_);
}
RTC_DCHECK(!tasks.empty());
for (auto& task : tasks) {
if (task->Run()) {
task.reset();
} else {
// `false` means the task should *not* be deleted.
task.release();
}
std::move(task)();
// Prefer to delete the `task` before running the next one.
task = nullptr;
}
break;
}
@ -298,8 +299,7 @@ void TaskQueueLibevent::RunTimer(int fd,
short flags, // NOLINT
void* context) {
TimerEvent* timer = static_cast<TimerEvent*>(context);
if (!timer->task->Run())
timer->task.release();
std::move(timer->task)();
timer->task_queue->pending_timers_.remove(timer);
delete timer;
}