Use non-recursive mutex in rtc::Thread

With rtc::Thread::Clear removed, there are no longer calls to external code while holding the mutex and thus it doesn't need to be recursive.

Bug: webrtc:11567
Change-Id: If350684be0bfcbc33806019bd986138905aec66d
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/276542
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#38179}
This commit is contained in:
Danil Chapovalov 2022-09-23 12:39:21 +02:00 committed by WebRTC LUCI CQ
parent 0deda15c96
commit 3ab76d945c
2 changed files with 22 additions and 17 deletions

View file

@ -41,6 +41,7 @@
#include "rtc_base/internal/default_socket_server.h" #include "rtc_base/internal/default_socket_server.h"
#include "rtc_base/logging.h" #include "rtc_base/logging.h"
#include "rtc_base/null_socket_server.h" #include "rtc_base/null_socket_server.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/time_utils.h" #include "rtc_base/time_utils.h"
#include "rtc_base/trace_event.h" #include "rtc_base/trace_event.h"
@ -74,6 +75,7 @@ class ScopedAutoReleasePool {
namespace rtc { namespace rtc {
namespace { namespace {
using ::webrtc::MutexLock;
using ::webrtc::TimeDelta; using ::webrtc::TimeDelta;
class RTC_SCOPED_LOCKABLE MarkProcessingCritScope { class RTC_SCOPED_LOCKABLE MarkProcessingCritScope {
@ -426,8 +428,8 @@ absl::AnyInvocable<void() &&> Thread::Get(int cmsWait) {
int64_t cmsDelayNext = kForever; int64_t cmsDelayNext = kForever;
{ {
// All queue operations need to be locked, but nothing else in this loop // All queue operations need to be locked, but nothing else in this loop
// can happen inside the crit. // can happen while holding the `mutex_`.
CritScope cs(&crit_); MutexLock lock(&mutex_);
// Check for delayed messages that have been triggered and calculate the // Check for delayed messages that have been triggered and calculate the
// next trigger time. // next trigger time.
while (!delayed_messages_.empty()) { while (!delayed_messages_.empty()) {
@ -491,7 +493,7 @@ void Thread::PostTask(absl::AnyInvocable<void() &&> task) {
// Signal for the multiplexer to return // Signal for the multiplexer to return
{ {
CritScope cs(&crit_); MutexLock lock(&mutex_);
messages_.push(std::move(task)); messages_.push(std::move(task));
} }
WakeUpSocketServer(); WakeUpSocketServer();
@ -510,7 +512,7 @@ void Thread::PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
int64_t delay_ms = delay.RoundUpTo(webrtc::TimeDelta::Millis(1)).ms<int>(); int64_t delay_ms = delay.RoundUpTo(webrtc::TimeDelta::Millis(1)).ms<int>();
int64_t run_time_ms = TimeAfter(delay_ms); int64_t run_time_ms = TimeAfter(delay_ms);
{ {
CritScope cs(&crit_); MutexLock lock(&mutex_);
delayed_messages_.push({.delay_ms = delay_ms, delayed_messages_.push({.delay_ms = delay_ms,
.run_time_ms = run_time_ms, .run_time_ms = run_time_ms,
.message_number = delayed_next_num_, .message_number = delayed_next_num_,
@ -525,7 +527,7 @@ void Thread::PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
} }
int Thread::GetDelay() { int Thread::GetDelay() {
CritScope cs(&crit_); MutexLock lock(&mutex_);
if (!messages_.empty()) if (!messages_.empty())
return 0; return 0;
@ -786,8 +788,10 @@ void Thread::BlockingCall(rtc::FunctionView<void()> functor) {
absl::Cleanup cleanup = [this, &ready, current_thread, absl::Cleanup cleanup = [this, &ready, current_thread,
done = done_event.get()] { done = done_event.get()] {
if (current_thread) { if (current_thread) {
CritScope cs(&crit_); {
MutexLock lock(&mutex_);
ready = true; ready = true;
}
current_thread->socketserver()->WakeUp(); current_thread->socketserver()->WakeUp();
} else { } else {
done->Set(); done->Set();
@ -796,14 +800,14 @@ void Thread::BlockingCall(rtc::FunctionView<void()> functor) {
PostTask([functor, cleanup = std::move(cleanup)] { functor(); }); PostTask([functor, cleanup = std::move(cleanup)] { functor(); });
if (current_thread) { if (current_thread) {
bool waited = false; bool waited = false;
crit_.Enter(); mutex_.Lock();
while (!ready) { while (!ready) {
crit_.Leave(); mutex_.Unlock();
current_thread->socketserver()->Wait(SocketServer::kForever, false); current_thread->socketserver()->Wait(SocketServer::kForever, false);
waited = true; waited = true;
crit_.Enter(); mutex_.Lock();
} }
crit_.Leave(); mutex_.Unlock();
// Our Wait loop above may have consumed some WakeUp events for this // Our Wait loop above may have consumed some WakeUp events for this
// Thread, that weren't relevant to this Send. Losing these WakeUps can // Thread, that weren't relevant to this Send. Losing these WakeUps can

View file

@ -38,6 +38,7 @@
#include "rtc_base/location.h" #include "rtc_base/location.h"
#include "rtc_base/platform_thread_types.h" #include "rtc_base/platform_thread_types.h"
#include "rtc_base/socket_server.h" #include "rtc_base/socket_server.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/system/rtc_export.h" #include "rtc_base/system/rtc_export.h"
#include "rtc_base/thread_annotations.h" #include "rtc_base/thread_annotations.h"
@ -270,7 +271,7 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
bool empty() const { return size() == 0u; } bool empty() const { return size() == 0u; }
size_t size() const { size_t size() const {
CritScope cs(&crit_); webrtc::MutexLock lock(&mutex_);
return messages_.size() + delayed_messages_.size(); return messages_.size() + delayed_messages_.size();
} }
@ -429,7 +430,7 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
// Perform cleanup; subclasses must call this from the destructor, // Perform cleanup; subclasses must call this from the destructor,
// and are not expected to actually hold the lock. // and are not expected to actually hold the lock.
void DoDestroy() RTC_EXCLUSIVE_LOCKS_REQUIRED(&crit_); void DoDestroy() RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void WakeUpSocketServer(); void WakeUpSocketServer();
@ -482,16 +483,16 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
// Called by the ThreadManager when being unset as the current thread. // Called by the ThreadManager when being unset as the current thread.
void ClearCurrentTaskQueue(); void ClearCurrentTaskQueue();
std::queue<absl::AnyInvocable<void() &&>> messages_ RTC_GUARDED_BY(crit_); std::queue<absl::AnyInvocable<void() &&>> messages_ RTC_GUARDED_BY(mutex_);
std::priority_queue<DelayedMessage> delayed_messages_ RTC_GUARDED_BY(crit_); std::priority_queue<DelayedMessage> delayed_messages_ RTC_GUARDED_BY(mutex_);
uint32_t delayed_next_num_ RTC_GUARDED_BY(crit_); uint32_t delayed_next_num_ RTC_GUARDED_BY(mutex_);
#if RTC_DCHECK_IS_ON #if RTC_DCHECK_IS_ON
uint32_t blocking_call_count_ RTC_GUARDED_BY(this) = 0; uint32_t blocking_call_count_ RTC_GUARDED_BY(this) = 0;
uint32_t could_be_blocking_call_count_ RTC_GUARDED_BY(this) = 0; uint32_t could_be_blocking_call_count_ RTC_GUARDED_BY(this) = 0;
std::vector<Thread*> allowed_threads_ RTC_GUARDED_BY(this); std::vector<Thread*> allowed_threads_ RTC_GUARDED_BY(this);
bool invoke_policy_enabled_ RTC_GUARDED_BY(this) = false; bool invoke_policy_enabled_ RTC_GUARDED_BY(this) = false;
#endif #endif
RecursiveCriticalSection crit_; mutable webrtc::Mutex mutex_;
bool fInitialized_; bool fInitialized_;
bool fDestroyed_; bool fDestroyed_;