Fix TaskQueueLibevent::PostTask when used on the same TaskQueue

Stop using event_base_once because it doesn't guarantee to free QueuedTask when task not run and thus may break TaskQueue guarantee all posted tasks are eventually deleted

Bug: webrtc:10731, webrtc:10278
Change-Id: Id073a6092cf603cac5768da7a0770371053b20cc
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/141420
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#28241}
This commit is contained in:
Danil Chapovalov 2019-06-11 18:01:56 +02:00 committed by Commit Bot
parent eceb537086
commit 00e71ef49e

View file

@ -121,7 +121,6 @@ class TaskQueueLibevent final : public TaskQueueBase {
static void ThreadMain(void* context);
static void OnWakeup(int socket, short flags, void* context); // NOLINT
static void RunTask(int fd, short flags, void* context); // NOLINT
static void RunTimer(int fd, short flags, void* context); // NOLINT
bool is_active_ = true;
@ -214,30 +213,18 @@ void TaskQueueLibevent::Delete() {
}
void TaskQueueLibevent::PostTask(std::unique_ptr<QueuedTask> task) {
RTC_DCHECK(task.get());
// libevent isn't thread safe. This means that we can't use methods such
// as event_base_once to post tasks to the worker thread from a different
// thread. However, we can use it when posting from the worker thread itself.
if (IsCurrent()) {
if (event_base_once(event_base_, -1, EV_TIMEOUT,
&TaskQueueLibevent::RunTask, task.get(),
nullptr) == 0) {
task.release();
}
} else {
QueuedTask* task_id = task.get(); // Only used for comparison.
{
rtc::CritScope lock(&pending_lock_);
pending_.push_back(std::move(task));
}
char message = kRunTask;
if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
RTC_LOG(WARNING) << "Failed to queue task.";
rtc::CritScope lock(&pending_lock_);
pending_.remove_if([task_id](std::unique_ptr<QueuedTask>& t) {
return t.get() == task_id;
});
}
QueuedTask* task_id = task.get(); // Only used for comparison.
{
rtc::CritScope lock(&pending_lock_);
pending_.push_back(std::move(task));
}
char message = kRunTask;
if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
RTC_LOG(WARNING) << "Failed to queue task.";
rtc::CritScope lock(&pending_lock_);
pending_.remove_if([task_id](std::unique_ptr<QueuedTask>& t) {
return t.get() == task_id;
});
}
}
@ -302,13 +289,6 @@ void TaskQueueLibevent::OnWakeup(int socket,
}
}
// static
void TaskQueueLibevent::RunTask(int fd, short flags, void* context) { // NOLINT
auto* task = static_cast<QueuedTask*>(context);
if (task->Run())
delete task;
}
// static
void TaskQueueLibevent::RunTimer(int fd,
short flags, // NOLINT