PlatformThread: add support for detached threads.

The change introduces support for detachable PlatformThreads, for which
the Stop() call doesn't wait until the thread has finished executing.

The change also introduces rtc::ThreadAttributes that carries priority
and detachability thread attributes. It additionally refactors all
known use to use the new semantics.

Bug: b:181572711, webrtc:12659
Change-Id: Id96e87c2a0dafabc8047767d241fd5da4505d14c
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/214704
Reviewed-by: Tommi <tommi@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Commit-Queue: Markus Handell <handellm@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#33796}
This commit is contained in:
Markus Handell 2021-04-20 17:41:54 +02:00 committed by Commit Bot
parent 6ef4af9546
commit 97c4458c8f
18 changed files with 278 additions and 182 deletions

View file

@ -429,15 +429,21 @@ class AudioCodingModuleMtTestOldApi : public AudioCodingModuleTestOldApi {
AudioCodingModuleMtTestOldApi() AudioCodingModuleMtTestOldApi()
: AudioCodingModuleTestOldApi(), : AudioCodingModuleTestOldApi(),
send_thread_(CbSendThread, this, "send", rtc::kRealtimePriority), send_thread_(
insert_packet_thread_(CbInsertPacketThread, CbSendThread,
this,
"send",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)),
insert_packet_thread_(
CbInsertPacketThread,
this, this,
"insert_packet", "insert_packet",
rtc::kRealtimePriority), rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)),
pull_audio_thread_(CbPullAudioThread, pull_audio_thread_(
CbPullAudioThread,
this, this,
"pull_audio", "pull_audio",
rtc::kRealtimePriority), rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)),
send_count_(0), send_count_(0),
insert_packet_count_(0), insert_packet_count_(0),
pull_audio_count_(0), pull_audio_count_(0),
@ -693,14 +699,16 @@ class AcmReRegisterIsacMtTestOldApi : public AudioCodingModuleTestOldApi {
AcmReRegisterIsacMtTestOldApi() AcmReRegisterIsacMtTestOldApi()
: AudioCodingModuleTestOldApi(), : AudioCodingModuleTestOldApi(),
receive_thread_(CbReceiveThread, receive_thread_(
CbReceiveThread,
this, this,
"receive", "receive",
rtc::kRealtimePriority), rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)),
codec_registration_thread_(CbCodecRegistrationThread, codec_registration_thread_(
CbCodecRegistrationThread,
this, this,
"codec_registration", "codec_registration",
rtc::kRealtimePriority), rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)),
codec_registered_(false), codec_registered_(false),
receive_packet_count_(0), receive_packet_count_(0),
next_insert_packet_time_ms_(0), next_insert_packet_time_ms_(0),

View file

@ -218,7 +218,7 @@ int32_t FileAudioDevice::StartPlayout() {
_ptrThreadPlay.reset(new rtc::PlatformThread( _ptrThreadPlay.reset(new rtc::PlatformThread(
PlayThreadFunc, this, "webrtc_audio_module_play_thread", PlayThreadFunc, this, "webrtc_audio_module_play_thread",
rtc::kRealtimePriority)); rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)));
_ptrThreadPlay->Start(); _ptrThreadPlay->Start();
RTC_LOG(LS_INFO) << "Started playout capture to output file: " RTC_LOG(LS_INFO) << "Started playout capture to output file: "
@ -278,7 +278,7 @@ int32_t FileAudioDevice::StartRecording() {
_ptrThreadRec.reset(new rtc::PlatformThread( _ptrThreadRec.reset(new rtc::PlatformThread(
RecThreadFunc, this, "webrtc_audio_module_capture_thread", RecThreadFunc, this, "webrtc_audio_module_capture_thread",
rtc::kRealtimePriority)); rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)));
_ptrThreadRec->Start(); _ptrThreadRec->Start();

View file

@ -1042,7 +1042,7 @@ int32_t AudioDeviceLinuxALSA::StartRecording() {
// RECORDING // RECORDING
_ptrThreadRec.reset(new rtc::PlatformThread( _ptrThreadRec.reset(new rtc::PlatformThread(
RecThreadFunc, this, "webrtc_audio_module_capture_thread", RecThreadFunc, this, "webrtc_audio_module_capture_thread",
rtc::kRealtimePriority)); rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)));
_ptrThreadRec->Start(); _ptrThreadRec->Start();
@ -1160,7 +1160,7 @@ int32_t AudioDeviceLinuxALSA::StartPlayout() {
// PLAYOUT // PLAYOUT
_ptrThreadPlay.reset(new rtc::PlatformThread( _ptrThreadPlay.reset(new rtc::PlatformThread(
PlayThreadFunc, this, "webrtc_audio_module_play_thread", PlayThreadFunc, this, "webrtc_audio_module_play_thread",
rtc::kRealtimePriority)); rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)));
_ptrThreadPlay->Start(); _ptrThreadPlay->Start();
int errVal = LATE(snd_pcm_prepare)(_handlePlayout); int errVal = LATE(snd_pcm_prepare)(_handlePlayout);

View file

@ -158,16 +158,16 @@ AudioDeviceGeneric::InitStatus AudioDeviceLinuxPulse::Init() {
#endif #endif
// RECORDING // RECORDING
_ptrThreadRec.reset(new rtc::PlatformThread(RecThreadFunc, this, _ptrThreadRec.reset(new rtc::PlatformThread(
"webrtc_audio_module_rec_thread", RecThreadFunc, this, "webrtc_audio_module_rec_thread",
rtc::kRealtimePriority)); rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)));
_ptrThreadRec->Start(); _ptrThreadRec->Start();
// PLAYOUT // PLAYOUT
_ptrThreadPlay.reset(new rtc::PlatformThread( _ptrThreadPlay.reset(new rtc::PlatformThread(
PlayThreadFunc, this, "webrtc_audio_module_play_thread", PlayThreadFunc, this, "webrtc_audio_module_play_thread",
rtc::kRealtimePriority)); rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)));
_ptrThreadPlay->Start(); _ptrThreadPlay->Start();
_initialized = true; _initialized = true;

View file

@ -1310,7 +1310,8 @@ int32_t AudioDeviceMac::StartRecording() {
RTC_DCHECK(!capture_worker_thread_.get()); RTC_DCHECK(!capture_worker_thread_.get());
capture_worker_thread_.reset(new rtc::PlatformThread( capture_worker_thread_.reset(new rtc::PlatformThread(
RunCapture, this, "CaptureWorkerThread", rtc::kRealtimePriority)); RunCapture, this, "CaptureWorkerThread",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)));
RTC_DCHECK(capture_worker_thread_.get()); RTC_DCHECK(capture_worker_thread_.get());
capture_worker_thread_->Start(); capture_worker_thread_->Start();
@ -1445,7 +1446,8 @@ int32_t AudioDeviceMac::StartPlayout() {
RTC_DCHECK(!render_worker_thread_.get()); RTC_DCHECK(!render_worker_thread_.get());
render_worker_thread_.reset(new rtc::PlatformThread( render_worker_thread_.reset(new rtc::PlatformThread(
RunRender, this, "RenderWorkerThread", rtc::kRealtimePriority)); RunRender, this, "RenderWorkerThread",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)));
render_worker_thread_->Start(); render_worker_thread_->Start();
if (_twoDevices || !_recording) { if (_twoDevices || !_recording) {

View file

@ -9,15 +9,16 @@
*/ */
#include "modules/audio_device/win/core_audio_base_win.h" #include "modules/audio_device/win/core_audio_base_win.h"
#include "modules/audio_device/audio_device_buffer.h"
#include <memory> #include <memory>
#include <string> #include <string>
#include "modules/audio_device/audio_device_buffer.h"
#include "rtc_base/arraysize.h" #include "rtc_base/arraysize.h"
#include "rtc_base/checks.h" #include "rtc_base/checks.h"
#include "rtc_base/logging.h" #include "rtc_base/logging.h"
#include "rtc_base/numerics/safe_conversions.h" #include "rtc_base/numerics/safe_conversions.h"
#include "rtc_base/platform_thread.h"
#include "rtc_base/time_utils.h" #include "rtc_base/time_utils.h"
#include "rtc_base/win/scoped_com_initializer.h" #include "rtc_base/win/scoped_com_initializer.h"
#include "rtc_base/win/windows_version.h" #include "rtc_base/win/windows_version.h"
@ -560,7 +561,7 @@ bool CoreAudioBase::Start() {
if (!audio_thread_) { if (!audio_thread_) {
audio_thread_ = std::make_unique<rtc::PlatformThread>( audio_thread_ = std::make_unique<rtc::PlatformThread>(
Run, this, IsInput() ? "wasapi_capture_thread" : "wasapi_render_thread", Run, this, IsInput() ? "wasapi_capture_thread" : "wasapi_render_thread",
rtc::kRealtimePriority); rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority));
RTC_DCHECK(audio_thread_); RTC_DCHECK(audio_thread_);
audio_thread_->Start(); audio_thread_->Start();
if (!audio_thread_->IsRunning()) { if (!audio_thread_->IsRunning()) {

View file

@ -485,18 +485,21 @@ void PopulateAudioFrame(float amplitude,
} }
AudioProcessingImplLockTest::AudioProcessingImplLockTest() AudioProcessingImplLockTest::AudioProcessingImplLockTest()
: render_thread_(RenderProcessorThreadFunc, : render_thread_(
RenderProcessorThreadFunc,
this, this,
"render", "render",
rtc::kRealtimePriority), rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)),
capture_thread_(CaptureProcessorThreadFunc, capture_thread_(
CaptureProcessorThreadFunc,
this, this,
"capture", "capture",
rtc::kRealtimePriority), rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)),
stats_thread_(StatsProcessorThreadFunc, stats_thread_(
StatsProcessorThreadFunc,
this, this,
"stats", "stats",
rtc::kNormalPriority), rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)),
apm_(AudioProcessingBuilderForTesting().Create()), apm_(AudioProcessingBuilderForTesting().Create()),
render_thread_state_(kMaxFrameSize, render_thread_state_(kMaxFrameSize,
&rand_gen_, &rand_gen_,

View file

@ -391,14 +391,16 @@ class TimedThreadApiProcessor {
class CallSimulator : public ::testing::TestWithParam<SimulationConfig> { class CallSimulator : public ::testing::TestWithParam<SimulationConfig> {
public: public:
CallSimulator() CallSimulator()
: render_thread_(new rtc::PlatformThread(RenderProcessorThreadFunc, : render_thread_(new rtc::PlatformThread(
RenderProcessorThreadFunc,
this, this,
"render", "render",
rtc::kRealtimePriority)), rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))),
capture_thread_(new rtc::PlatformThread(CaptureProcessorThreadFunc, capture_thread_(new rtc::PlatformThread(
CaptureProcessorThreadFunc,
this, this,
"capture", "capture",
rtc::kRealtimePriority)), rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))),
rand_gen_(42U), rand_gen_(42U),
simulation_config_(static_cast<SimulationConfig>(GetParam())) {} simulation_config_(static_cast<SimulationConfig>(GetParam())) {}

View file

@ -243,9 +243,9 @@ int32_t VideoCaptureModuleV4L2::StartCapture(
// start capture thread; // start capture thread;
if (!_captureThread) { if (!_captureThread) {
quit_ = false; quit_ = false;
_captureThread.reset( _captureThread.reset(new rtc::PlatformThread(
new rtc::PlatformThread(VideoCaptureModuleV4L2::CaptureThread, this, VideoCaptureModuleV4L2::CaptureThread, this, "CaptureThread",
"CaptureThread", rtc::kHighPriority)); rtc::ThreadAttributes().SetPriority(rtc::kHighPriority)));
_captureThread->Start(); _captureThread->Start();
} }

View file

@ -238,7 +238,10 @@ rtc_library("platform_thread") {
":timeutils", ":timeutils",
"../api:sequence_checker", "../api:sequence_checker",
] ]
absl_deps = [ "//third_party/abseil-cpp/absl/strings" ] absl_deps = [
"//third_party/abseil-cpp/absl/memory",
"//third_party/abseil-cpp/absl/strings",
]
} }
rtc_library("rtc_event") { rtc_library("rtc_event") {

View file

@ -91,7 +91,7 @@ class EventLogger final {
: logging_thread_(EventTracingThreadFunc, : logging_thread_(EventTracingThreadFunc,
this, this,
"EventTracingThread", "EventTracingThread",
kLowPriority) {} ThreadAttributes().SetPriority(kLowPriority)) {}
~EventLogger() { RTC_DCHECK(thread_checker_.IsCurrent()); } ~EventLogger() { RTC_DCHECK(thread_checker_.IsCurrent()); }
void AddTraceEvent(const char* name, void AddTraceEvent(const char* name,

View file

@ -10,6 +10,8 @@
#include "rtc_base/platform_thread.h" #include "rtc_base/platform_thread.h"
#include <memory>
#if !defined(WEBRTC_WIN) #if !defined(WEBRTC_WIN)
#include <sched.h> #include <sched.h>
#endif #endif
@ -18,123 +20,22 @@
#include <algorithm> #include <algorithm>
#include "absl/memory/memory.h"
#include "rtc_base/checks.h" #include "rtc_base/checks.h"
namespace rtc { namespace rtc {
namespace { namespace {
#if !defined(WEBRTC_WIN) struct ThreadStartData {
struct ThreadAttributes { ThreadRunFunction run_function;
ThreadAttributes() { pthread_attr_init(&attr); } void* obj;
~ThreadAttributes() { pthread_attr_destroy(&attr); } std::string thread_name;
pthread_attr_t* operator&() { return &attr; } ThreadPriority priority;
pthread_attr_t attr;
}; };
#endif // defined(WEBRTC_WIN)
} // namespace
PlatformThread::PlatformThread(ThreadRunFunction func, bool SetPriority(ThreadPriority priority) {
void* obj,
absl::string_view thread_name,
ThreadPriority priority /*= kNormalPriority*/)
: run_function_(func), priority_(priority), obj_(obj), name_(thread_name) {
RTC_DCHECK(func);
RTC_DCHECK(!name_.empty());
// TODO(tommi): Consider lowering the limit to 15 (limit on Linux).
RTC_DCHECK(name_.length() < 64);
spawned_thread_checker_.Detach();
}
PlatformThread::~PlatformThread() {
RTC_DCHECK(thread_checker_.IsCurrent());
#if defined(WEBRTC_WIN) #if defined(WEBRTC_WIN)
RTC_DCHECK(!thread_); return SetThreadPriority(GetCurrentThread(), priority) != FALSE;
RTC_DCHECK(!thread_id_);
#endif // defined(WEBRTC_WIN)
}
#if defined(WEBRTC_WIN)
DWORD WINAPI PlatformThread::StartThread(void* param) {
// The GetLastError() function only returns valid results when it is called
// after a Win32 API function that returns a "failed" result. A crash dump
// contains the result from GetLastError() and to make sure it does not
// falsely report a Windows error we call SetLastError here.
::SetLastError(ERROR_SUCCESS);
static_cast<PlatformThread*>(param)->Run();
return 0;
}
#else
void* PlatformThread::StartThread(void* param) {
static_cast<PlatformThread*>(param)->Run();
return 0;
}
#endif // defined(WEBRTC_WIN)
void PlatformThread::Start() {
RTC_DCHECK(thread_checker_.IsCurrent());
RTC_DCHECK(!thread_) << "Thread already started?";
#if defined(WEBRTC_WIN)
// See bug 2902 for background on STACK_SIZE_PARAM_IS_A_RESERVATION.
// Set the reserved stack stack size to 1M, which is the default on Windows
// and Linux.
thread_ = ::CreateThread(nullptr, 1024 * 1024, &StartThread, this,
STACK_SIZE_PARAM_IS_A_RESERVATION, &thread_id_);
RTC_CHECK(thread_) << "CreateThread failed";
RTC_DCHECK(thread_id_);
#else
ThreadAttributes attr;
// Set the stack stack size to 1M.
pthread_attr_setstacksize(&attr, 1024 * 1024);
RTC_CHECK_EQ(0, pthread_create(&thread_, &attr, &StartThread, this));
#endif // defined(WEBRTC_WIN)
}
bool PlatformThread::IsRunning() const {
RTC_DCHECK(thread_checker_.IsCurrent());
#if defined(WEBRTC_WIN)
return thread_ != nullptr;
#else
return thread_ != 0;
#endif // defined(WEBRTC_WIN)
}
PlatformThreadRef PlatformThread::GetThreadRef() const {
#if defined(WEBRTC_WIN)
return thread_id_;
#else
return thread_;
#endif // defined(WEBRTC_WIN)
}
void PlatformThread::Stop() {
RTC_DCHECK(thread_checker_.IsCurrent());
if (!IsRunning())
return;
#if defined(WEBRTC_WIN)
WaitForSingleObject(thread_, INFINITE);
CloseHandle(thread_);
thread_ = nullptr;
thread_id_ = 0;
#else
RTC_CHECK_EQ(0, pthread_join(thread_, nullptr));
thread_ = 0;
#endif // defined(WEBRTC_WIN)
spawned_thread_checker_.Detach();
}
void PlatformThread::Run() {
// Attach the worker thread checker to this thread.
RTC_DCHECK(spawned_thread_checker_.IsCurrent());
rtc::SetCurrentThreadName(name_.c_str());
SetPriority(priority_);
run_function_(obj_);
}
bool PlatformThread::SetPriority(ThreadPriority priority) {
RTC_DCHECK(spawned_thread_checker_.IsCurrent());
#if defined(WEBRTC_WIN)
return SetThreadPriority(thread_, priority) != FALSE;
#elif defined(__native_client__) || defined(WEBRTC_FUCHSIA) #elif defined(__native_client__) || defined(WEBRTC_FUCHSIA)
// Setting thread priorities is not supported in NaCl or Fuchsia. // Setting thread priorities is not supported in NaCl or Fuchsia.
return true; return true;
@ -176,13 +77,124 @@ bool PlatformThread::SetPriority(ThreadPriority priority) {
param.sched_priority = top_prio; param.sched_priority = top_prio;
break; break;
} }
return pthread_setschedparam(thread_, policy, &param) == 0; return pthread_setschedparam(pthread_self(), policy, &param) == 0;
#endif // defined(WEBRTC_WIN)
}
void RunPlatformThread(std::unique_ptr<ThreadStartData> data) {
rtc::SetCurrentThreadName(data->thread_name.c_str());
data->thread_name.clear();
SetPriority(data->priority);
data->run_function(data->obj);
}
#if defined(WEBRTC_WIN)
DWORD WINAPI StartThread(void* param) {
// The GetLastError() function only returns valid results when it is called
// after a Win32 API function that returns a "failed" result. A crash dump
// contains the result from GetLastError() and to make sure it does not
// falsely report a Windows error we call SetLastError here.
::SetLastError(ERROR_SUCCESS);
RunPlatformThread(absl::WrapUnique(static_cast<ThreadStartData*>(param)));
return 0;
}
#else
void* StartThread(void* param) {
RunPlatformThread(absl::WrapUnique(static_cast<ThreadStartData*>(param)));
return 0;
}
#endif // defined(WEBRTC_WIN)
} // namespace
PlatformThread::PlatformThread(ThreadRunFunction func,
void* obj,
absl::string_view thread_name,
ThreadAttributes attributes)
: run_function_(func),
attributes_(attributes),
obj_(obj),
name_(thread_name) {
RTC_DCHECK(func);
RTC_DCHECK(!name_.empty());
// TODO(tommi): Consider lowering the limit to 15 (limit on Linux).
RTC_DCHECK(name_.length() < 64);
}
PlatformThread::~PlatformThread() {
RTC_DCHECK_RUN_ON(&thread_checker_);
RTC_DCHECK(!thread_);
#if defined(WEBRTC_WIN)
RTC_DCHECK(!thread_id_);
#endif // defined(WEBRTC_WIN)
}
void PlatformThread::Start() {
RTC_DCHECK_RUN_ON(&thread_checker_);
RTC_DCHECK(!thread_) << "Thread already started?";
ThreadStartData* data =
new ThreadStartData{run_function_, obj_, name_, attributes_.priority};
#if defined(WEBRTC_WIN)
// See bug 2902 for background on STACK_SIZE_PARAM_IS_A_RESERVATION.
// Set the reserved stack stack size to 1M, which is the default on Windows
// and Linux.
thread_ = ::CreateThread(nullptr, 1024 * 1024, &StartThread, data,
STACK_SIZE_PARAM_IS_A_RESERVATION, &thread_id_);
RTC_CHECK(thread_) << "CreateThread failed";
RTC_DCHECK(thread_id_);
#else
pthread_attr_t attr;
pthread_attr_init(&attr);
// Set the stack stack size to 1M.
pthread_attr_setstacksize(&attr, 1024 * 1024);
pthread_attr_setdetachstate(&attr, attributes_.joinable
? PTHREAD_CREATE_JOINABLE
: PTHREAD_CREATE_DETACHED);
RTC_CHECK_EQ(0, pthread_create(&thread_, &attr, &StartThread, data));
pthread_attr_destroy(&attr);
#endif // defined(WEBRTC_WIN)
}
bool PlatformThread::IsRunning() const {
RTC_DCHECK_RUN_ON(&thread_checker_);
#if defined(WEBRTC_WIN)
return thread_ != nullptr;
#else
return thread_ != 0;
#endif // defined(WEBRTC_WIN)
}
PlatformThreadRef PlatformThread::GetThreadRef() const {
#if defined(WEBRTC_WIN)
return thread_id_;
#else
return thread_;
#endif // defined(WEBRTC_WIN)
}
void PlatformThread::Stop() {
RTC_DCHECK_RUN_ON(&thread_checker_);
if (!IsRunning())
return;
#if defined(WEBRTC_WIN)
if (attributes_.joinable) {
WaitForSingleObject(thread_, INFINITE);
}
CloseHandle(thread_);
thread_ = nullptr;
thread_id_ = 0;
#else
if (attributes_.joinable) {
RTC_CHECK_EQ(0, pthread_join(thread_, nullptr));
}
thread_ = 0;
#endif // defined(WEBRTC_WIN) #endif // defined(WEBRTC_WIN)
} }
#if defined(WEBRTC_WIN) #if defined(WEBRTC_WIN)
bool PlatformThread::QueueAPC(PAPCFUNC function, ULONG_PTR data) { bool PlatformThread::QueueAPC(PAPCFUNC function, ULONG_PTR data) {
RTC_DCHECK(thread_checker_.IsCurrent()); RTC_DCHECK_RUN_ON(&thread_checker_);
RTC_DCHECK(IsRunning()); RTC_DCHECK(IsRunning());
return QueueUserAPC(function, thread_, data) != FALSE; return QueueUserAPC(function, thread_, data) != FALSE;

View file

@ -42,6 +42,20 @@ enum ThreadPriority {
#endif #endif
}; };
struct ThreadAttributes {
ThreadPriority priority = kNormalPriority;
bool joinable = true;
ThreadAttributes& SetPriority(ThreadPriority priority_param) {
priority = priority_param;
return *this;
}
ThreadAttributes& SetDetached() {
joinable = false;
return *this;
}
};
// Represents a simple worker thread. The implementation must be assumed // Represents a simple worker thread. The implementation must be assumed
// to be single threaded, meaning that all methods of the class, must be // to be single threaded, meaning that all methods of the class, must be
// called from the same thread, including instantiation. // called from the same thread, including instantiation.
@ -50,13 +64,14 @@ class PlatformThread {
PlatformThread(ThreadRunFunction func, PlatformThread(ThreadRunFunction func,
void* obj, void* obj,
absl::string_view thread_name, absl::string_view thread_name,
ThreadPriority priority = kNormalPriority); ThreadAttributes attributes = ThreadAttributes());
virtual ~PlatformThread(); virtual ~PlatformThread();
const std::string& name() const { return name_; } const std::string& name() const { return name_; }
// Spawns a thread and tries to set thread priority according to the priority // Spawns a thread and tries to set thread priority according to the priority
// from when CreateThread was called. // from when CreateThread was called.
// Start can only be called after the constructor or after a call to Stop().
void Start(); void Start();
bool IsRunning() const; bool IsRunning() const;
@ -65,7 +80,11 @@ class PlatformThread {
// thread checks. // thread checks.
PlatformThreadRef GetThreadRef() const; PlatformThreadRef GetThreadRef() const;
// Stops (joins) the spawned thread. // Stop() prepares the PlatformThread for destruction or another call to
// Start(). For a PlatformThread that's been created with
// ThreadAttributes::joinable true (the default), Stop() suspends the calling
// thread until the created thread exits unless the thread has already exited.
// Stop() can only be called after calling Start().
void Stop(); void Stop();
protected: protected:
@ -75,25 +94,17 @@ class PlatformThread {
#endif #endif
private: private:
void Run();
bool SetPriority(ThreadPriority priority);
ThreadRunFunction const run_function_ = nullptr; ThreadRunFunction const run_function_ = nullptr;
const ThreadPriority priority_ = kNormalPriority; const ThreadAttributes attributes_;
void* const obj_; void* const obj_;
// TODO(pbos): Make sure call sites use string literals and update to a const // TODO(pbos): Make sure call sites use string literals and update to a const
// char* instead of a std::string. // char* instead of a std::string.
const std::string name_; const std::string name_;
webrtc::SequenceChecker thread_checker_; webrtc::SequenceChecker thread_checker_;
webrtc::SequenceChecker spawned_thread_checker_;
#if defined(WEBRTC_WIN) #if defined(WEBRTC_WIN)
static DWORD WINAPI StartThread(void* param);
HANDLE thread_ = nullptr; HANDLE thread_ = nullptr;
DWORD thread_id_ = 0; DWORD thread_id_ = 0;
#else #else
static void* StartThread(void* param);
pthread_t thread_ = 0; pthread_t thread_ = 0;
#endif // defined(WEBRTC_WIN) #endif // defined(WEBRTC_WIN)
RTC_DISALLOW_COPY_AND_ASSIGN(PlatformThread); RTC_DISALLOW_COPY_AND_ASSIGN(PlatformThread);

View file

@ -10,7 +10,9 @@
#include "rtc_base/platform_thread.h" #include "rtc_base/platform_thread.h"
#include "test/gtest.h" #include "rtc_base/event.h"
#include "system_wrappers/include/sleep.h"
#include "test/gmock.h"
namespace rtc { namespace rtc {
namespace { namespace {
@ -23,6 +25,11 @@ void SetFlagRunFunction(void* obj) {
*obj_as_bool = true; *obj_as_bool = true;
} }
void StdFunctionRunFunction(void* obj) {
std::function<void()>* fun = static_cast<std::function<void()>*>(obj);
(*fun)();
}
} // namespace } // namespace
TEST(PlatformThreadTest, StartStop) { TEST(PlatformThreadTest, StartStop) {
@ -58,4 +65,41 @@ TEST(PlatformThreadTest, RunFunctionIsCalled) {
EXPECT_TRUE(flag); EXPECT_TRUE(flag);
} }
TEST(PlatformThreadTest, JoinsThread) {
// This test flakes if there are problems with the join implementation.
EXPECT_TRUE(ThreadAttributes().joinable);
rtc::Event event;
std::function<void()> thread_function = [&] { event.Set(); };
PlatformThread thread(&StdFunctionRunFunction, &thread_function, "T");
thread.Start();
thread.Stop();
EXPECT_TRUE(event.Wait(/*give_up_after_ms=*/0));
}
TEST(PlatformThreadTest, StopsBeforeDetachedThreadExits) {
// This test flakes if there are problems with the detached thread
// implementation.
bool flag = false;
rtc::Event thread_started;
rtc::Event thread_continue;
rtc::Event thread_exiting;
std::function<void()> thread_function = [&] {
thread_started.Set();
thread_continue.Wait(Event::kForever);
flag = true;
thread_exiting.Set();
};
{
PlatformThread thread(&StdFunctionRunFunction, &thread_function, "T",
ThreadAttributes().SetDetached());
thread.Start();
thread.Stop();
}
thread_started.Wait(Event::kForever);
EXPECT_FALSE(flag);
thread_continue.Set();
thread_exiting.Wait(Event::kForever);
EXPECT_TRUE(flag);
}
} // namespace rtc } // namespace rtc

View file

@ -173,7 +173,10 @@ class TaskQueueLibevent::SetTimerTask : public QueuedTask {
TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name, TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name,
rtc::ThreadPriority priority) rtc::ThreadPriority priority)
: event_base_(event_base_new()), : event_base_(event_base_new()),
thread_(&TaskQueueLibevent::ThreadMain, this, queue_name, priority) { thread_(&TaskQueueLibevent::ThreadMain,
this,
queue_name,
rtc::ThreadAttributes().SetPriority(priority)) {
int fds[2]; int fds[2];
RTC_CHECK(pipe(fds) == 0); RTC_CHECK(pipe(fds) == 0);
SetNonBlocking(fds[0]); SetNonBlocking(fds[0]);

View file

@ -126,7 +126,10 @@ TaskQueueStdlib::TaskQueueStdlib(absl::string_view queue_name,
: started_(/*manual_reset=*/false, /*initially_signaled=*/false), : started_(/*manual_reset=*/false, /*initially_signaled=*/false),
stopped_(/*manual_reset=*/false, /*initially_signaled=*/false), stopped_(/*manual_reset=*/false, /*initially_signaled=*/false),
flag_notify_(/*manual_reset=*/false, /*initially_signaled=*/false), flag_notify_(/*manual_reset=*/false, /*initially_signaled=*/false),
thread_(&TaskQueueStdlib::ThreadMain, this, queue_name, priority) { thread_(&TaskQueueStdlib::ThreadMain,
this,
queue_name,
rtc::ThreadAttributes().SetPriority(priority)) {
thread_.Start(); thread_.Start();
started_.Wait(rtc::Event::kForever); started_.Wait(rtc::Event::kForever);
} }

View file

@ -175,7 +175,10 @@ class TaskQueueWin : public TaskQueueBase {
void* obj, void* obj,
absl::string_view thread_name, absl::string_view thread_name,
rtc::ThreadPriority priority) rtc::ThreadPriority priority)
: PlatformThread(func, obj, thread_name, priority) {} : PlatformThread(func,
obj,
thread_name,
rtc::ThreadAttributes().SetPriority(priority)) {}
bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data) { bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data) {
return rtc::PlatformThread::QueueAPC(apc_function, data); return rtc::PlatformThread::QueueAPC(apc_function, data);

View file

@ -21,6 +21,7 @@
#include "common_video/libyuv/include/webrtc_libyuv.h" #include "common_video/libyuv/include/webrtc_libyuv.h"
#include "rtc_base/cpu_time.h" #include "rtc_base/cpu_time.h"
#include "rtc_base/logging.h" #include "rtc_base/logging.h"
#include "rtc_base/platform_thread.h"
#include "rtc_base/strings/string_builder.h" #include "rtc_base/strings/string_builder.h"
#include "rtc_base/time_utils.h" #include "rtc_base/time_utils.h"
#include "rtc_tools/frame_analyzer/video_geometry_aligner.h" #include "rtc_tools/frame_analyzer/video_geometry_aligner.h"
@ -144,7 +145,7 @@ void DefaultVideoQualityAnalyzer::Start(
auto thread = std::make_unique<rtc::PlatformThread>( auto thread = std::make_unique<rtc::PlatformThread>(
&DefaultVideoQualityAnalyzer::ProcessComparisonsThread, this, &DefaultVideoQualityAnalyzer::ProcessComparisonsThread, this,
("DefaultVideoQualityAnalyzerWorker-" + std::to_string(i)).data(), ("DefaultVideoQualityAnalyzerWorker-" + std::to_string(i)).data(),
rtc::ThreadPriority::kNormalPriority); rtc::ThreadAttributes().SetPriority(rtc::kNormalPriority));
thread->Start(); thread->Start();
thread_pool_.push_back(std::move(thread)); thread_pool_.push_back(std::move(thread));
} }