Revert "Refactor the PlatformThread API."

This reverts commit c89fdd716c.

Reason for revert: Causes rare compilation error on win-libfuzzer-asan trybot.
See https://ci.chromium.org/p/chromium/builders/try/win-libfuzzer-asan-rel/713745?

Original change's description:
> Refactor the PlatformThread API.
>
> PlatformThread's API is using old style function pointers, causes
> casting, is unintuitive and forces artificial call sequences, and
> is additionally possible to misuse in release mode.
>
> Fix this by an API face lift:
> 1. The class is turned into a handle, which can be empty.
> 2. The only way of getting a non-empty PlatformThread is by calling
> SpawnJoinable or SpawnDetached, clearly conveying the semantics to the
> code reader.
> 3. Handles can be Finalized, which works differently for joinable and
> detached threads:
>   a) Handles for detached threads are simply closed where applicable.
>   b) Joinable threads are joined before handles are closed.
> 4. The destructor finalizes handles. No explicit call is needed.
>
> Fixed: webrtc:12727
> Change-Id: Id00a0464edf4fc9e552b6a1fbb5d2e1280e88811
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/215075
> Commit-Queue: Markus Handell <handellm@webrtc.org>
> Reviewed-by: Harald Alvestrand <hta@webrtc.org>
> Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
> Reviewed-by: Tommi <tommi@webrtc.org>
> Cr-Commit-Position: refs/heads/master@{#33923}

# Not skipping CQ checks because original CL landed > 1 day ago.

TBR=handellm@webrtc.org

Bug: webrtc:12727
Change-Id: Ic0146be8866f6dd3ad9c364fb8646650b8e07419
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/217583
Reviewed-by: Guido Urdaneta <guidou@webrtc.org>
Reviewed-by: Markus Handell <handellm@webrtc.org>
Commit-Queue: Guido Urdaneta <guidou@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#33936}
This commit is contained in:
Guido Urdaneta 2021-05-06 13:12:47 +00:00 committed by WebRTC LUCI CQ
parent a6983c6ea2
commit 793bac569f
38 changed files with 863 additions and 585 deletions

View file

@ -40,14 +40,21 @@ class CompileTimeTestForGuardedBy {
}; };
void RunOnDifferentThread(rtc::FunctionView<void()> run) { void RunOnDifferentThread(rtc::FunctionView<void()> run) {
rtc::Event thread_has_run_event; struct Object {
rtc::PlatformThread::SpawnJoinable( static void Run(void* obj) {
[&] { auto* me = static_cast<Object*>(obj);
run(); me->run();
thread_has_run_event.Set(); me->thread_has_run_event.Set();
}, }
"thread");
EXPECT_TRUE(thread_has_run_event.Wait(1000)); rtc::FunctionView<void()> run;
rtc::Event thread_has_run_event;
} object{run};
rtc::PlatformThread thread(&Object::Run, &object, "thread");
thread.Start();
EXPECT_TRUE(object.thread_has_run_event.Wait(1000));
thread.Stop();
} }
} // namespace } // namespace

View file

@ -429,6 +429,21 @@ class AudioCodingModuleMtTestOldApi : public AudioCodingModuleTestOldApi {
AudioCodingModuleMtTestOldApi() AudioCodingModuleMtTestOldApi()
: AudioCodingModuleTestOldApi(), : AudioCodingModuleTestOldApi(),
send_thread_(
CbSendThread,
this,
"send",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)),
insert_packet_thread_(
CbInsertPacketThread,
this,
"insert_packet",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)),
pull_audio_thread_(
CbPullAudioThread,
this,
"pull_audio",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)),
send_count_(0), send_count_(0),
insert_packet_count_(0), insert_packet_count_(0),
pull_audio_count_(0), pull_audio_count_(0),
@ -445,38 +460,17 @@ class AudioCodingModuleMtTestOldApi : public AudioCodingModuleTestOldApi {
void StartThreads() { void StartThreads() {
quit_.store(false); quit_.store(false);
send_thread_.Start();
const auto attributes = insert_packet_thread_.Start();
rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime); pull_audio_thread_.Start();
send_thread_ = rtc::PlatformThread::SpawnJoinable(
[this] {
while (!quit_.load()) {
CbSendImpl();
}
},
"send", attributes);
insert_packet_thread_ = rtc::PlatformThread::SpawnJoinable(
[this] {
while (!quit_.load()) {
CbInsertPacketImpl();
}
},
"insert_packet", attributes);
pull_audio_thread_ = rtc::PlatformThread::SpawnJoinable(
[this] {
while (!quit_.load()) {
CbPullAudioImpl();
}
},
"pull_audio", attributes);
} }
void TearDown() { void TearDown() {
AudioCodingModuleTestOldApi::TearDown(); AudioCodingModuleTestOldApi::TearDown();
quit_.store(true); quit_.store(true);
pull_audio_thread_.Finalize(); pull_audio_thread_.Stop();
send_thread_.Finalize(); send_thread_.Stop();
insert_packet_thread_.Finalize(); insert_packet_thread_.Stop();
} }
bool RunTest() { bool RunTest() {
@ -494,6 +488,14 @@ class AudioCodingModuleMtTestOldApi : public AudioCodingModuleTestOldApi {
return false; return false;
} }
static void CbSendThread(void* context) {
AudioCodingModuleMtTestOldApi* fixture =
reinterpret_cast<AudioCodingModuleMtTestOldApi*>(context);
while (!fixture->quit_.load()) {
fixture->CbSendImpl();
}
}
// The send thread doesn't have to care about the current simulated time, // The send thread doesn't have to care about the current simulated time,
// since only the AcmReceiver is using the clock. // since only the AcmReceiver is using the clock.
void CbSendImpl() { void CbSendImpl() {
@ -509,6 +511,14 @@ class AudioCodingModuleMtTestOldApi : public AudioCodingModuleTestOldApi {
} }
} }
static void CbInsertPacketThread(void* context) {
AudioCodingModuleMtTestOldApi* fixture =
reinterpret_cast<AudioCodingModuleMtTestOldApi*>(context);
while (!fixture->quit_.load()) {
fixture->CbInsertPacketImpl();
}
}
void CbInsertPacketImpl() { void CbInsertPacketImpl() {
SleepMs(1); SleepMs(1);
{ {
@ -523,6 +533,14 @@ class AudioCodingModuleMtTestOldApi : public AudioCodingModuleTestOldApi {
InsertPacket(); InsertPacket();
} }
static void CbPullAudioThread(void* context) {
AudioCodingModuleMtTestOldApi* fixture =
reinterpret_cast<AudioCodingModuleMtTestOldApi*>(context);
while (!fixture->quit_.load()) {
fixture->CbPullAudioImpl();
}
}
void CbPullAudioImpl() { void CbPullAudioImpl() {
SleepMs(1); SleepMs(1);
{ {
@ -681,6 +699,16 @@ class AcmReRegisterIsacMtTestOldApi : public AudioCodingModuleTestOldApi {
AcmReRegisterIsacMtTestOldApi() AcmReRegisterIsacMtTestOldApi()
: AudioCodingModuleTestOldApi(), : AudioCodingModuleTestOldApi(),
receive_thread_(
CbReceiveThread,
this,
"receive",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)),
codec_registration_thread_(
CbCodecRegistrationThread,
this,
"codec_registration",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)),
codec_registered_(false), codec_registered_(false),
receive_packet_count_(0), receive_packet_count_(0),
next_insert_packet_time_ms_(0), next_insert_packet_time_ms_(0),
@ -712,34 +740,28 @@ class AcmReRegisterIsacMtTestOldApi : public AudioCodingModuleTestOldApi {
void StartThreads() { void StartThreads() {
quit_.store(false); quit_.store(false);
const auto attributes = receive_thread_.Start();
rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime); codec_registration_thread_.Start();
receive_thread_ = rtc::PlatformThread::SpawnJoinable(
[this] {
while (!quit_.load() && CbReceiveImpl()) {
}
},
"receive", attributes);
codec_registration_thread_ = rtc::PlatformThread::SpawnJoinable(
[this] {
while (!quit_.load()) {
CbCodecRegistrationImpl();
}
},
"codec_registration", attributes);
} }
void TearDown() override { void TearDown() override {
AudioCodingModuleTestOldApi::TearDown(); AudioCodingModuleTestOldApi::TearDown();
quit_.store(true); quit_.store(true);
receive_thread_.Finalize(); receive_thread_.Stop();
codec_registration_thread_.Finalize(); codec_registration_thread_.Stop();
} }
bool RunTest() { bool RunTest() {
return test_complete_.Wait(10 * 60 * 1000); // 10 minutes' timeout. return test_complete_.Wait(10 * 60 * 1000); // 10 minutes' timeout.
} }
static void CbReceiveThread(void* context) {
AcmReRegisterIsacMtTestOldApi* fixture =
reinterpret_cast<AcmReRegisterIsacMtTestOldApi*>(context);
while (!fixture->quit_.load() && fixture->CbReceiveImpl()) {
}
}
bool CbReceiveImpl() { bool CbReceiveImpl() {
SleepMs(1); SleepMs(1);
rtc::Buffer encoded; rtc::Buffer encoded;
@ -785,6 +807,14 @@ class AcmReRegisterIsacMtTestOldApi : public AudioCodingModuleTestOldApi {
return true; return true;
} }
static void CbCodecRegistrationThread(void* context) {
AcmReRegisterIsacMtTestOldApi* fixture =
reinterpret_cast<AcmReRegisterIsacMtTestOldApi*>(context);
while (!fixture->quit_.load()) {
fixture->CbCodecRegistrationImpl();
}
}
void CbCodecRegistrationImpl() { void CbCodecRegistrationImpl() {
SleepMs(1); SleepMs(1);
if (HasFatalFailure()) { if (HasFatalFailure()) {

View file

@ -216,13 +216,10 @@ int32_t FileAudioDevice::StartPlayout() {
} }
} }
_ptrThreadPlay = rtc::PlatformThread::SpawnJoinable( _ptrThreadPlay.reset(new rtc::PlatformThread(
[this] { PlayThreadFunc, this, "webrtc_audio_module_play_thread",
while (PlayThreadProcess()) { rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)));
} _ptrThreadPlay->Start();
},
"webrtc_audio_module_play_thread",
rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime));
RTC_LOG(LS_INFO) << "Started playout capture to output file: " RTC_LOG(LS_INFO) << "Started playout capture to output file: "
<< _outputFilename; << _outputFilename;
@ -236,8 +233,10 @@ int32_t FileAudioDevice::StopPlayout() {
} }
// stop playout thread first // stop playout thread first
if (!_ptrThreadPlay.empty()) if (_ptrThreadPlay) {
_ptrThreadPlay.Finalize(); _ptrThreadPlay->Stop();
_ptrThreadPlay.reset();
}
MutexLock lock(&mutex_); MutexLock lock(&mutex_);
@ -277,13 +276,11 @@ int32_t FileAudioDevice::StartRecording() {
} }
} }
_ptrThreadRec = rtc::PlatformThread::SpawnJoinable( _ptrThreadRec.reset(new rtc::PlatformThread(
[this] { RecThreadFunc, this, "webrtc_audio_module_capture_thread",
while (RecThreadProcess()) { rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)));
}
}, _ptrThreadRec->Start();
"webrtc_audio_module_capture_thread",
rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime));
RTC_LOG(LS_INFO) << "Started recording from input file: " << _inputFilename; RTC_LOG(LS_INFO) << "Started recording from input file: " << _inputFilename;
@ -296,8 +293,10 @@ int32_t FileAudioDevice::StopRecording() {
_recording = false; _recording = false;
} }
if (!_ptrThreadRec.empty()) if (_ptrThreadRec) {
_ptrThreadRec.Finalize(); _ptrThreadRec->Stop();
_ptrThreadRec.reset();
}
MutexLock lock(&mutex_); MutexLock lock(&mutex_);
_recordingFramesLeft = 0; _recordingFramesLeft = 0;
@ -440,6 +439,18 @@ void FileAudioDevice::AttachAudioBuffer(AudioDeviceBuffer* audioBuffer) {
_ptrAudioBuffer->SetPlayoutChannels(0); _ptrAudioBuffer->SetPlayoutChannels(0);
} }
void FileAudioDevice::PlayThreadFunc(void* pThis) {
FileAudioDevice* device = static_cast<FileAudioDevice*>(pThis);
while (device->PlayThreadProcess()) {
}
}
void FileAudioDevice::RecThreadFunc(void* pThis) {
FileAudioDevice* device = static_cast<FileAudioDevice*>(pThis);
while (device->RecThreadProcess()) {
}
}
bool FileAudioDevice::PlayThreadProcess() { bool FileAudioDevice::PlayThreadProcess() {
if (!_playing) { if (!_playing) {
return false; return false;

View file

@ -17,11 +17,14 @@
#include <string> #include <string>
#include "modules/audio_device/audio_device_generic.h" #include "modules/audio_device/audio_device_generic.h"
#include "rtc_base/platform_thread.h"
#include "rtc_base/synchronization/mutex.h" #include "rtc_base/synchronization/mutex.h"
#include "rtc_base/system/file_wrapper.h" #include "rtc_base/system/file_wrapper.h"
#include "rtc_base/time_utils.h" #include "rtc_base/time_utils.h"
namespace rtc {
class PlatformThread;
} // namespace rtc
namespace webrtc { namespace webrtc {
// This is a fake audio device which plays audio from a file as its microphone // This is a fake audio device which plays audio from a file as its microphone
@ -142,8 +145,9 @@ class FileAudioDevice : public AudioDeviceGeneric {
size_t _recordingFramesIn10MS; size_t _recordingFramesIn10MS;
size_t _playoutFramesIn10MS; size_t _playoutFramesIn10MS;
rtc::PlatformThread _ptrThreadRec; // TODO(pbos): Make plain members instead of pointers and stop resetting them.
rtc::PlatformThread _ptrThreadPlay; std::unique_ptr<rtc::PlatformThread> _ptrThreadRec;
std::unique_ptr<rtc::PlatformThread> _ptrThreadPlay;
bool _playing; bool _playing;
bool _recording; bool _recording;

View file

@ -178,13 +178,26 @@ int32_t AudioDeviceLinuxALSA::Terminate() {
_mixerManager.Close(); _mixerManager.Close();
// RECORDING // RECORDING
mutex_.Unlock(); if (_ptrThreadRec) {
_ptrThreadRec.Finalize(); rtc::PlatformThread* tmpThread = _ptrThreadRec.release();
mutex_.Unlock();
tmpThread->Stop();
delete tmpThread;
mutex_.Lock();
}
// PLAYOUT // PLAYOUT
_ptrThreadPlay.Finalize(); if (_ptrThreadPlay) {
mutex_.Lock(); rtc::PlatformThread* tmpThread = _ptrThreadPlay.release();
mutex_.Unlock();
tmpThread->Stop();
delete tmpThread;
mutex_.Lock();
}
#if defined(WEBRTC_USE_X11) #if defined(WEBRTC_USE_X11)
if (_XDisplay) { if (_XDisplay) {
XCloseDisplay(_XDisplay); XCloseDisplay(_XDisplay);
@ -1027,13 +1040,11 @@ int32_t AudioDeviceLinuxALSA::StartRecording() {
return -1; return -1;
} }
// RECORDING // RECORDING
_ptrThreadRec = rtc::PlatformThread::SpawnJoinable( _ptrThreadRec.reset(new rtc::PlatformThread(
[this] { RecThreadFunc, this, "webrtc_audio_module_capture_thread",
while (RecThreadProcess()) { rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)));
}
}, _ptrThreadRec->Start();
"webrtc_audio_module_capture_thread",
rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime));
errVal = LATE(snd_pcm_prepare)(_handleRecord); errVal = LATE(snd_pcm_prepare)(_handleRecord);
if (errVal < 0) { if (errVal < 0) {
@ -1077,7 +1088,10 @@ int32_t AudioDeviceLinuxALSA::StopRecordingLocked() {
_recIsInitialized = false; _recIsInitialized = false;
_recording = false; _recording = false;
_ptrThreadRec.Finalize(); if (_ptrThreadRec) {
_ptrThreadRec->Stop();
_ptrThreadRec.reset();
}
_recordingFramesLeft = 0; _recordingFramesLeft = 0;
if (_recordingBuffer) { if (_recordingBuffer) {
@ -1144,13 +1158,10 @@ int32_t AudioDeviceLinuxALSA::StartPlayout() {
} }
// PLAYOUT // PLAYOUT
_ptrThreadPlay = rtc::PlatformThread::SpawnJoinable( _ptrThreadPlay.reset(new rtc::PlatformThread(
[this] { PlayThreadFunc, this, "webrtc_audio_module_play_thread",
while (PlayThreadProcess()) { rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)));
} _ptrThreadPlay->Start();
},
"webrtc_audio_module_play_thread",
rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime));
int errVal = LATE(snd_pcm_prepare)(_handlePlayout); int errVal = LATE(snd_pcm_prepare)(_handlePlayout);
if (errVal < 0) { if (errVal < 0) {
@ -1180,7 +1191,10 @@ int32_t AudioDeviceLinuxALSA::StopPlayoutLocked() {
_playing = false; _playing = false;
// stop playout thread first // stop playout thread first
_ptrThreadPlay.Finalize(); if (_ptrThreadPlay) {
_ptrThreadPlay->Stop();
_ptrThreadPlay.reset();
}
_playoutFramesLeft = 0; _playoutFramesLeft = 0;
delete[] _playoutBuffer; delete[] _playoutBuffer;
@ -1455,6 +1469,18 @@ int32_t AudioDeviceLinuxALSA::ErrorRecovery(int32_t error,
// Thread Methods // Thread Methods
// ============================================================================ // ============================================================================
void AudioDeviceLinuxALSA::PlayThreadFunc(void* pThis) {
AudioDeviceLinuxALSA* device = static_cast<AudioDeviceLinuxALSA*>(pThis);
while (device->PlayThreadProcess()) {
}
}
void AudioDeviceLinuxALSA::RecThreadFunc(void* pThis) {
AudioDeviceLinuxALSA* device = static_cast<AudioDeviceLinuxALSA*>(pThis);
while (device->RecThreadProcess()) {
}
}
bool AudioDeviceLinuxALSA::PlayThreadProcess() { bool AudioDeviceLinuxALSA::PlayThreadProcess() {
if (!_playing) if (!_playing)
return false; return false;

View file

@ -155,8 +155,10 @@ class AudioDeviceLinuxALSA : public AudioDeviceGeneric {
Mutex mutex_; Mutex mutex_;
rtc::PlatformThread _ptrThreadRec; // TODO(pbos): Make plain members and start/stop instead of resetting these
rtc::PlatformThread _ptrThreadPlay; // pointers. A thread can be reused.
std::unique_ptr<rtc::PlatformThread> _ptrThreadRec;
std::unique_ptr<rtc::PlatformThread> _ptrThreadPlay;
AudioMixerManagerLinuxALSA _mixerManager; AudioMixerManagerLinuxALSA _mixerManager;

View file

@ -15,7 +15,6 @@
#include "modules/audio_device/linux/latebindingsymboltable_linux.h" #include "modules/audio_device/linux/latebindingsymboltable_linux.h"
#include "rtc_base/checks.h" #include "rtc_base/checks.h"
#include "rtc_base/logging.h" #include "rtc_base/logging.h"
#include "rtc_base/platform_thread.h"
WebRTCPulseSymbolTable* GetPulseSymbolTable() { WebRTCPulseSymbolTable* GetPulseSymbolTable() {
static WebRTCPulseSymbolTable* pulse_symbol_table = static WebRTCPulseSymbolTable* pulse_symbol_table =
@ -159,22 +158,18 @@ AudioDeviceGeneric::InitStatus AudioDeviceLinuxPulse::Init() {
#endif #endif
// RECORDING // RECORDING
const auto attributes = _ptrThreadRec.reset(new rtc::PlatformThread(
rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime); RecThreadFunc, this, "webrtc_audio_module_rec_thread",
_ptrThreadRec = rtc::PlatformThread::SpawnJoinable( rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)));
[this] {
while (RecThreadProcess()) { _ptrThreadRec->Start();
}
},
"webrtc_audio_module_rec_thread", attributes);
// PLAYOUT // PLAYOUT
_ptrThreadPlay = rtc::PlatformThread::SpawnJoinable( _ptrThreadPlay.reset(new rtc::PlatformThread(
[this] { PlayThreadFunc, this, "webrtc_audio_module_play_thread",
while (PlayThreadProcess()) { rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)));
} _ptrThreadPlay->Start();
},
"webrtc_audio_module_play_thread", attributes);
_initialized = true; _initialized = true;
return InitStatus::OK; return InitStatus::OK;
@ -192,12 +187,22 @@ int32_t AudioDeviceLinuxPulse::Terminate() {
_mixerManager.Close(); _mixerManager.Close();
// RECORDING // RECORDING
_timeEventRec.Set(); if (_ptrThreadRec) {
_ptrThreadRec.Finalize(); rtc::PlatformThread* tmpThread = _ptrThreadRec.release();
_timeEventRec.Set();
tmpThread->Stop();
delete tmpThread;
}
// PLAYOUT // PLAYOUT
_timeEventPlay.Set(); if (_ptrThreadPlay) {
_ptrThreadPlay.Finalize(); rtc::PlatformThread* tmpThread = _ptrThreadPlay.release();
_timeEventPlay.Set();
tmpThread->Stop();
delete tmpThread;
}
// Terminate PulseAudio // Terminate PulseAudio
if (TerminatePulseAudio() < 0) { if (TerminatePulseAudio() < 0) {
@ -1976,6 +1981,18 @@ int32_t AudioDeviceLinuxPulse::ProcessRecordedData(int8_t* bufferData,
return 0; return 0;
} }
void AudioDeviceLinuxPulse::PlayThreadFunc(void* pThis) {
AudioDeviceLinuxPulse* device = static_cast<AudioDeviceLinuxPulse*>(pThis);
while (device->PlayThreadProcess()) {
}
}
void AudioDeviceLinuxPulse::RecThreadFunc(void* pThis) {
AudioDeviceLinuxPulse* device = static_cast<AudioDeviceLinuxPulse*>(pThis);
while (device->RecThreadProcess()) {
}
}
bool AudioDeviceLinuxPulse::PlayThreadProcess() { bool AudioDeviceLinuxPulse::PlayThreadProcess() {
if (!_timeEventPlay.Wait(1000)) { if (!_timeEventPlay.Wait(1000)) {
return true; return true;

View file

@ -268,8 +268,9 @@ class AudioDeviceLinuxPulse : public AudioDeviceGeneric {
rtc::Event _recStartEvent; rtc::Event _recStartEvent;
rtc::Event _playStartEvent; rtc::Event _playStartEvent;
rtc::PlatformThread _ptrThreadPlay; // TODO(pbos): Remove unique_ptr and use directly without resetting.
rtc::PlatformThread _ptrThreadRec; std::unique_ptr<rtc::PlatformThread> _ptrThreadPlay;
std::unique_ptr<rtc::PlatformThread> _ptrThreadRec;
AudioMixerManagerLinuxPulse _mixerManager; AudioMixerManagerLinuxPulse _mixerManager;

View file

@ -166,8 +166,8 @@ AudioDeviceMac::~AudioDeviceMac() {
Terminate(); Terminate();
} }
RTC_DCHECK(capture_worker_thread_.empty()); RTC_DCHECK(!capture_worker_thread_.get());
RTC_DCHECK(render_worker_thread_.empty()); RTC_DCHECK(!render_worker_thread_.get());
if (_paRenderBuffer) { if (_paRenderBuffer) {
delete _paRenderBuffer; delete _paRenderBuffer;
@ -1308,14 +1308,12 @@ int32_t AudioDeviceMac::StartRecording() {
return -1; return -1;
} }
RTC_DCHECK(capture_worker_thread_.empty()); RTC_DCHECK(!capture_worker_thread_.get());
capture_worker_thread_ = rtc::PlatformThread::SpawnJoinable( capture_worker_thread_.reset(new rtc::PlatformThread(
[this] { RunCapture, this, "CaptureWorkerThread",
while (CaptureWorkerThread()) { rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)));
} RTC_DCHECK(capture_worker_thread_.get());
}, capture_worker_thread_->Start();
"CaptureWorkerThread",
rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime));
OSStatus err = noErr; OSStatus err = noErr;
if (_twoDevices) { if (_twoDevices) {
@ -1397,9 +1395,10 @@ int32_t AudioDeviceMac::StopRecording() {
// Setting this signal will allow the worker thread to be stopped. // Setting this signal will allow the worker thread to be stopped.
AtomicSet32(&_captureDeviceIsAlive, 0); AtomicSet32(&_captureDeviceIsAlive, 0);
if (!capture_worker_thread_.empty()) { if (capture_worker_thread_.get()) {
mutex_.Unlock(); mutex_.Unlock();
capture_worker_thread_.Finalize(); capture_worker_thread_->Stop();
capture_worker_thread_.reset();
mutex_.Lock(); mutex_.Lock();
} }
@ -1445,14 +1444,11 @@ int32_t AudioDeviceMac::StartPlayout() {
return 0; return 0;
} }
RTC_DCHECK(render_worker_thread_.empty()); RTC_DCHECK(!render_worker_thread_.get());
render_worker_thread_ = rtc::PlatformThread::SpawnJoinable( render_worker_thread_.reset(new rtc::PlatformThread(
[this] { RunRender, this, "RenderWorkerThread",
while (RenderWorkerThread()) { rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)));
} render_worker_thread_->Start();
},
"RenderWorkerThread",
rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime));
if (_twoDevices || !_recording) { if (_twoDevices || !_recording) {
OSStatus err = noErr; OSStatus err = noErr;
@ -1510,9 +1506,10 @@ int32_t AudioDeviceMac::StopPlayout() {
// Setting this signal will allow the worker thread to be stopped. // Setting this signal will allow the worker thread to be stopped.
AtomicSet32(&_renderDeviceIsAlive, 0); AtomicSet32(&_renderDeviceIsAlive, 0);
if (!render_worker_thread_.empty()) { if (render_worker_thread_.get()) {
mutex_.Unlock(); mutex_.Unlock();
render_worker_thread_.Finalize(); render_worker_thread_->Stop();
render_worker_thread_.reset();
mutex_.Lock(); mutex_.Lock();
} }
@ -2374,6 +2371,12 @@ OSStatus AudioDeviceMac::implInConverterProc(UInt32* numberDataPackets,
return 0; return 0;
} }
void AudioDeviceMac::RunRender(void* ptrThis) {
AudioDeviceMac* device = static_cast<AudioDeviceMac*>(ptrThis);
while (device->RenderWorkerThread()) {
}
}
bool AudioDeviceMac::RenderWorkerThread() { bool AudioDeviceMac::RenderWorkerThread() {
PaRingBufferSize numSamples = PaRingBufferSize numSamples =
ENGINE_PLAY_BUF_SIZE_IN_SAMPLES * _outDesiredFormat.mChannelsPerFrame; ENGINE_PLAY_BUF_SIZE_IN_SAMPLES * _outDesiredFormat.mChannelsPerFrame;
@ -2439,6 +2442,12 @@ bool AudioDeviceMac::RenderWorkerThread() {
return true; return true;
} }
void AudioDeviceMac::RunCapture(void* ptrThis) {
AudioDeviceMac* device = static_cast<AudioDeviceMac*>(ptrThis);
while (device->CaptureWorkerThread()) {
}
}
bool AudioDeviceMac::CaptureWorkerThread() { bool AudioDeviceMac::CaptureWorkerThread() {
OSStatus err = noErr; OSStatus err = noErr;
UInt32 noRecSamples = UInt32 noRecSamples =

View file

@ -21,12 +21,15 @@
#include "modules/audio_device/mac/audio_mixer_manager_mac.h" #include "modules/audio_device/mac/audio_mixer_manager_mac.h"
#include "rtc_base/event.h" #include "rtc_base/event.h"
#include "rtc_base/logging.h" #include "rtc_base/logging.h"
#include "rtc_base/platform_thread.h"
#include "rtc_base/synchronization/mutex.h" #include "rtc_base/synchronization/mutex.h"
#include "rtc_base/thread_annotations.h" #include "rtc_base/thread_annotations.h"
struct PaUtilRingBuffer; struct PaUtilRingBuffer;
namespace rtc {
class PlatformThread;
} // namespace rtc
namespace webrtc { namespace webrtc {
const uint32_t N_REC_SAMPLES_PER_SEC = 48000; const uint32_t N_REC_SAMPLES_PER_SEC = 48000;
@ -268,11 +271,13 @@ class AudioDeviceMac : public AudioDeviceGeneric {
rtc::Event _stopEventRec; rtc::Event _stopEventRec;
rtc::Event _stopEvent; rtc::Event _stopEvent;
// TODO(pbos): Replace with direct members, just start/stop, no need to
// recreate the thread.
// Only valid/running between calls to StartRecording and StopRecording. // Only valid/running between calls to StartRecording and StopRecording.
rtc::PlatformThread capture_worker_thread_; std::unique_ptr<rtc::PlatformThread> capture_worker_thread_;
// Only valid/running between calls to StartPlayout and StopPlayout. // Only valid/running between calls to StartPlayout and StopPlayout.
rtc::PlatformThread render_worker_thread_; std::unique_ptr<rtc::PlatformThread> render_worker_thread_;
AudioMixerManagerMac _mixerManager; AudioMixerManagerMac _mixerManager;

View file

@ -119,6 +119,11 @@ const char* SessionDisconnectReasonToString(
} }
} }
void Run(void* obj) {
RTC_DCHECK(obj);
reinterpret_cast<CoreAudioBase*>(obj)->ThreadRun();
}
// Returns true if the selected audio device supports low latency, i.e, if it // Returns true if the selected audio device supports low latency, i.e, if it
// is possible to initialize the engine using periods less than the default // is possible to initialize the engine using periods less than the default
// period (10ms). // period (10ms).
@ -548,19 +553,24 @@ bool CoreAudioBase::Start() {
// Audio thread should be alive during internal restart since the restart // Audio thread should be alive during internal restart since the restart
// callback is triggered on that thread and it also makes the restart // callback is triggered on that thread and it also makes the restart
// sequence less complex. // sequence less complex.
RTC_DCHECK(!audio_thread_.empty()); RTC_DCHECK(audio_thread_);
} }
// Start an audio thread but only if one does not already exist (which is the // Start an audio thread but only if one does not already exist (which is the
// case during restart). // case during restart).
if (audio_thread_.empty()) { if (!audio_thread_) {
const absl::string_view name = audio_thread_ = std::make_unique<rtc::PlatformThread>(
IsInput() ? "wasapi_capture_thread" : "wasapi_render_thread"; Run, this, IsInput() ? "wasapi_capture_thread" : "wasapi_render_thread",
audio_thread_ = rtc::PlatformThread::SpawnJoinable( rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority));
[this] { ThreadRun(); }, name, RTC_DCHECK(audio_thread_);
rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime)); audio_thread_->Start();
RTC_DLOG(INFO) << "Started thread with name: " << name if (!audio_thread_->IsRunning()) {
<< " and handle: " << *audio_thread_.GetHandle(); StopThread();
RTC_LOG(LS_ERROR) << "Failed to start audio thread";
return false;
}
RTC_DLOG(INFO) << "Started thread with name: " << audio_thread_->name()
<< " and id: " << audio_thread_->GetThreadRef();
} }
// Start streaming data between the endpoint buffer and the audio engine. // Start streaming data between the endpoint buffer and the audio engine.
@ -687,11 +697,14 @@ bool CoreAudioBase::Restart() {
void CoreAudioBase::StopThread() { void CoreAudioBase::StopThread() {
RTC_DLOG(INFO) << __FUNCTION__; RTC_DLOG(INFO) << __FUNCTION__;
RTC_DCHECK(!IsRestarting()); RTC_DCHECK(!IsRestarting());
if (!audio_thread_.empty()) { if (audio_thread_) {
RTC_DLOG(INFO) << "Sets stop_event..."; if (audio_thread_->IsRunning()) {
SetEvent(stop_event_.Get()); RTC_DLOG(INFO) << "Sets stop_event...";
RTC_DLOG(INFO) << "PlatformThread::Finalize..."; SetEvent(stop_event_.Get());
audio_thread_.Finalize(); RTC_DLOG(INFO) << "PlatformThread::Stop...";
audio_thread_->Stop();
}
audio_thread_.reset();
// Ensure that we don't quit the main thread loop immediately next // Ensure that we don't quit the main thread loop immediately next
// time Start() is called. // time Start() is called.
@ -704,7 +717,7 @@ bool CoreAudioBase::HandleRestartEvent() {
RTC_DLOG(INFO) << __FUNCTION__ << "[" << DirectionToString(direction()) RTC_DLOG(INFO) << __FUNCTION__ << "[" << DirectionToString(direction())
<< "]"; << "]";
RTC_DCHECK_RUN_ON(&thread_checker_audio_); RTC_DCHECK_RUN_ON(&thread_checker_audio_);
RTC_DCHECK(!audio_thread_.empty()); RTC_DCHECK(audio_thread_);
RTC_DCHECK(IsRestarting()); RTC_DCHECK(IsRestarting());
// Let each client (input and/or output) take care of its own restart // Let each client (input and/or output) take care of its own restart
// sequence since each side might need unique actions. // sequence since each side might need unique actions.

View file

@ -158,7 +158,7 @@ class CoreAudioBase : public IAudioSessionEvents {
// Set when restart process starts and cleared when restart stops // Set when restart process starts and cleared when restart stops
// successfully. Accessed atomically. // successfully. Accessed atomically.
std::atomic<bool> is_restarting_; std::atomic<bool> is_restarting_;
rtc::PlatformThread audio_thread_; std::unique_ptr<rtc::PlatformThread> audio_thread_;
Microsoft::WRL::ComPtr<IAudioSessionControl> audio_session_control_; Microsoft::WRL::ComPtr<IAudioSessionControl> audio_session_control_;
void StopThread(); void StopThread();

View file

@ -387,6 +387,33 @@ class AudioProcessingImplLockTest
void SetUp() override; void SetUp() override;
void TearDown() override; void TearDown() override;
// Thread callback for the render thread
static void RenderProcessorThreadFunc(void* context) {
AudioProcessingImplLockTest* impl =
reinterpret_cast<AudioProcessingImplLockTest*>(context);
while (!impl->MaybeEndTest()) {
impl->render_thread_state_.Process();
}
}
// Thread callback for the capture thread
static void CaptureProcessorThreadFunc(void* context) {
AudioProcessingImplLockTest* impl =
reinterpret_cast<AudioProcessingImplLockTest*>(context);
while (!impl->MaybeEndTest()) {
impl->capture_thread_state_.Process();
}
}
// Thread callback for the stats thread
static void StatsProcessorThreadFunc(void* context) {
AudioProcessingImplLockTest* impl =
reinterpret_cast<AudioProcessingImplLockTest*>(context);
while (!impl->MaybeEndTest()) {
impl->stats_thread_state_.Process();
}
}
// Tests whether all the required render and capture side calls have been // Tests whether all the required render and capture side calls have been
// done. // done.
bool TestDone() { bool TestDone() {
@ -396,28 +423,9 @@ class AudioProcessingImplLockTest
// Start the threads used in the test. // Start the threads used in the test.
void StartThreads() { void StartThreads() {
const auto attributes = render_thread_.Start();
rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime); capture_thread_.Start();
render_thread_ = rtc::PlatformThread::SpawnJoinable( stats_thread_.Start();
[this] {
while (!MaybeEndTest())
render_thread_state_.Process();
},
"render", attributes);
capture_thread_ = rtc::PlatformThread::SpawnJoinable(
[this] {
while (!MaybeEndTest()) {
capture_thread_state_.Process();
}
},
"capture", attributes);
stats_thread_ = rtc::PlatformThread::SpawnJoinable(
[this] {
while (!MaybeEndTest())
stats_thread_state_.Process();
},
"stats", attributes);
} }
// Event handlers for the test. // Event handlers for the test.
@ -426,6 +434,9 @@ class AudioProcessingImplLockTest
rtc::Event capture_call_event_; rtc::Event capture_call_event_;
// Thread related variables. // Thread related variables.
rtc::PlatformThread render_thread_;
rtc::PlatformThread capture_thread_;
rtc::PlatformThread stats_thread_;
mutable RandomGenerator rand_gen_; mutable RandomGenerator rand_gen_;
std::unique_ptr<AudioProcessing> apm_; std::unique_ptr<AudioProcessing> apm_;
@ -434,9 +445,6 @@ class AudioProcessingImplLockTest
RenderProcessor render_thread_state_; RenderProcessor render_thread_state_;
CaptureProcessor capture_thread_state_; CaptureProcessor capture_thread_state_;
StatsProcessor stats_thread_state_; StatsProcessor stats_thread_state_;
rtc::PlatformThread render_thread_;
rtc::PlatformThread capture_thread_;
rtc::PlatformThread stats_thread_;
}; };
// Sleeps a random time between 0 and max_sleep milliseconds. // Sleeps a random time between 0 and max_sleep milliseconds.
@ -477,7 +485,22 @@ void PopulateAudioFrame(float amplitude,
} }
AudioProcessingImplLockTest::AudioProcessingImplLockTest() AudioProcessingImplLockTest::AudioProcessingImplLockTest()
: apm_(AudioProcessingBuilderForTesting().Create()), : render_thread_(
RenderProcessorThreadFunc,
this,
"render",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)),
capture_thread_(
CaptureProcessorThreadFunc,
this,
"capture",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)),
stats_thread_(
StatsProcessorThreadFunc,
this,
"stats",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)),
apm_(AudioProcessingBuilderForTesting().Create()),
render_thread_state_(kMaxFrameSize, render_thread_state_(kMaxFrameSize,
&rand_gen_, &rand_gen_,
&render_call_event_, &render_call_event_,
@ -529,6 +552,9 @@ void AudioProcessingImplLockTest::SetUp() {
void AudioProcessingImplLockTest::TearDown() { void AudioProcessingImplLockTest::TearDown() {
render_call_event_.Set(); render_call_event_.Set();
capture_call_event_.Set(); capture_call_event_.Set();
render_thread_.Stop();
capture_thread_.Stop();
stats_thread_.Stop();
} }
StatsProcessor::StatsProcessor(RandomGenerator* rand_gen, StatsProcessor::StatsProcessor(RandomGenerator* rand_gen,

View file

@ -391,7 +391,17 @@ class TimedThreadApiProcessor {
class CallSimulator : public ::testing::TestWithParam<SimulationConfig> { class CallSimulator : public ::testing::TestWithParam<SimulationConfig> {
public: public:
CallSimulator() CallSimulator()
: rand_gen_(42U), : render_thread_(new rtc::PlatformThread(
RenderProcessorThreadFunc,
this,
"render",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))),
capture_thread_(new rtc::PlatformThread(
CaptureProcessorThreadFunc,
this,
"capture",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))),
rand_gen_(42U),
simulation_config_(static_cast<SimulationConfig>(GetParam())) {} simulation_config_(static_cast<SimulationConfig>(GetParam())) {}
// Run the call simulation with a timeout. // Run the call simulation with a timeout.
@ -426,10 +436,13 @@ class CallSimulator : public ::testing::TestWithParam<SimulationConfig> {
static const int kMinNumFramesToProcess = 150; static const int kMinNumFramesToProcess = 150;
static const int32_t kTestTimeout = 3 * 10 * kMinNumFramesToProcess; static const int32_t kTestTimeout = 3 * 10 * kMinNumFramesToProcess;
// ::testing::TestWithParam<> implementation.
void TearDown() override { StopThreads(); }
// Stop all running threads. // Stop all running threads.
void StopThreads() { void StopThreads() {
render_thread_.Finalize(); render_thread_->Stop();
capture_thread_.Finalize(); capture_thread_->Stop();
} }
// Simulator and APM setup. // Simulator and APM setup.
@ -520,28 +533,32 @@ class CallSimulator : public ::testing::TestWithParam<SimulationConfig> {
kMinNumFramesToProcess, kCaptureInputFloatLevel, num_capture_channels)); kMinNumFramesToProcess, kCaptureInputFloatLevel, num_capture_channels));
} }
// Thread callback for the render thread.
static void RenderProcessorThreadFunc(void* context) {
CallSimulator* call_simulator = reinterpret_cast<CallSimulator*>(context);
while (call_simulator->render_thread_state_->Process()) {
}
}
// Thread callback for the capture thread.
static void CaptureProcessorThreadFunc(void* context) {
CallSimulator* call_simulator = reinterpret_cast<CallSimulator*>(context);
while (call_simulator->capture_thread_state_->Process()) {
}
}
// Start the threads used in the test. // Start the threads used in the test.
void StartThreads() { void StartThreads() {
const auto attributes = ASSERT_NO_FATAL_FAILURE(render_thread_->Start());
rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime); ASSERT_NO_FATAL_FAILURE(capture_thread_->Start());
render_thread_ = rtc::PlatformThread::SpawnJoinable(
[this] {
while (render_thread_state_->Process()) {
}
},
"render", attributes);
capture_thread_ = rtc::PlatformThread::SpawnJoinable(
[this] {
while (capture_thread_state_->Process()) {
}
},
"capture", attributes);
} }
// Event handler for the test. // Event handler for the test.
rtc::Event test_complete_; rtc::Event test_complete_;
// Thread related variables. // Thread related variables.
std::unique_ptr<rtc::PlatformThread> render_thread_;
std::unique_ptr<rtc::PlatformThread> capture_thread_;
Random rand_gen_; Random rand_gen_;
std::unique_ptr<AudioProcessing> apm_; std::unique_ptr<AudioProcessing> apm_;
@ -550,8 +567,6 @@ class CallSimulator : public ::testing::TestWithParam<SimulationConfig> {
LockedFlag capture_call_checker_; LockedFlag capture_call_checker_;
std::unique_ptr<TimedThreadApiProcessor> render_thread_state_; std::unique_ptr<TimedThreadApiProcessor> render_thread_state_;
std::unique_ptr<TimedThreadApiProcessor> capture_thread_state_; std::unique_ptr<TimedThreadApiProcessor> capture_thread_state_;
rtc::PlatformThread render_thread_;
rtc::PlatformThread capture_thread_;
}; };
// Implements the callback functionality for the threads. // Implements the callback functionality for the threads.

View file

@ -48,12 +48,13 @@ void TestScreenDrawerLock(
~Task() = default; ~Task() = default;
void RunTask() { static void RunTask(void* me) {
std::unique_ptr<ScreenDrawerLock> lock = ctor_(); Task* task = static_cast<Task*>(me);
std::unique_ptr<ScreenDrawerLock> lock = task->ctor_();
ASSERT_TRUE(!!lock); ASSERT_TRUE(!!lock);
created_->store(true); task->created_->store(true);
// Wait for the main thread to get the signal of created_. // Wait for the main thread to get the signal of created_.
while (!ready_.load()) { while (!task->ready_.load()) {
SleepMs(1); SleepMs(1);
} }
// At this point, main thread should begin to create a second lock. Though // At this point, main thread should begin to create a second lock. Though
@ -76,8 +77,8 @@ void TestScreenDrawerLock(
const rtc::FunctionView<std::unique_ptr<ScreenDrawerLock>()> ctor_; const rtc::FunctionView<std::unique_ptr<ScreenDrawerLock>()> ctor_;
} task(&created, ready, ctor); } task(&created, ready, ctor);
auto lock_thread = rtc::PlatformThread::SpawnJoinable( rtc::PlatformThread lock_thread(&Task::RunTask, &task, "lock_thread");
[&task] { task.RunTask(); }, "lock_thread"); lock_thread.Start();
// Wait for the first lock in Task::RunTask() to be created. // Wait for the first lock in Task::RunTask() to be created.
// TODO(zijiehe): Find a better solution to wait for the creation of the first // TODO(zijiehe): Find a better solution to wait for the creation of the first
@ -94,6 +95,7 @@ void TestScreenDrawerLock(
ASSERT_GT(kLockDurationMs, rtc::TimeMillis() - start_ms); ASSERT_GT(kLockDurationMs, rtc::TimeMillis() - start_ms);
ctor(); ctor();
ASSERT_LE(kLockDurationMs, rtc::TimeMillis() - start_ms); ASSERT_LE(kLockDurationMs, rtc::TimeMillis() - start_ms);
lock_thread.Stop();
} }
} // namespace } // namespace

View file

@ -48,6 +48,7 @@ ProcessThreadImpl::ProcessThreadImpl(const char* thread_name)
ProcessThreadImpl::~ProcessThreadImpl() { ProcessThreadImpl::~ProcessThreadImpl() {
RTC_DCHECK(thread_checker_.IsCurrent()); RTC_DCHECK(thread_checker_.IsCurrent());
RTC_DCHECK(!thread_.get());
RTC_DCHECK(!stop_); RTC_DCHECK(!stop_);
while (!delayed_tasks_.empty()) { while (!delayed_tasks_.empty()) {
@ -71,8 +72,8 @@ void ProcessThreadImpl::Delete() {
// Doesn't need locking, because the contending thread isn't running. // Doesn't need locking, because the contending thread isn't running.
void ProcessThreadImpl::Start() RTC_NO_THREAD_SAFETY_ANALYSIS { void ProcessThreadImpl::Start() RTC_NO_THREAD_SAFETY_ANALYSIS {
RTC_DCHECK(thread_checker_.IsCurrent()); RTC_DCHECK(thread_checker_.IsCurrent());
RTC_DCHECK(thread_.empty()); RTC_DCHECK(!thread_.get());
if (!thread_.empty()) if (thread_.get())
return; return;
RTC_DCHECK(!stop_); RTC_DCHECK(!stop_);
@ -80,18 +81,14 @@ void ProcessThreadImpl::Start() RTC_NO_THREAD_SAFETY_ANALYSIS {
for (ModuleCallback& m : modules_) for (ModuleCallback& m : modules_)
m.module->ProcessThreadAttached(this); m.module->ProcessThreadAttached(this);
thread_ = rtc::PlatformThread::SpawnJoinable( thread_.reset(
[this] { new rtc::PlatformThread(&ProcessThreadImpl::Run, this, thread_name_));
CurrentTaskQueueSetter set_current(this); thread_->Start();
while (Process()) {
}
},
thread_name_);
} }
void ProcessThreadImpl::Stop() { void ProcessThreadImpl::Stop() {
RTC_DCHECK(thread_checker_.IsCurrent()); RTC_DCHECK(thread_checker_.IsCurrent());
if (thread_.empty()) if (!thread_.get())
return; return;
{ {
@ -101,7 +98,9 @@ void ProcessThreadImpl::Stop() {
} }
wake_up_.Set(); wake_up_.Set();
thread_.Finalize();
thread_->Stop();
thread_.reset();
StopNoLocks(); StopNoLocks();
} }
@ -109,7 +108,7 @@ void ProcessThreadImpl::Stop() {
// No locking needed, since this is called after the contending thread is // No locking needed, since this is called after the contending thread is
// stopped. // stopped.
void ProcessThreadImpl::StopNoLocks() RTC_NO_THREAD_SAFETY_ANALYSIS { void ProcessThreadImpl::StopNoLocks() RTC_NO_THREAD_SAFETY_ANALYSIS {
RTC_DCHECK(thread_.empty()); RTC_DCHECK(!thread_);
stop_ = false; stop_ = false;
for (ModuleCallback& m : modules_) for (ModuleCallback& m : modules_)
@ -200,7 +199,7 @@ void ProcessThreadImpl::RegisterModule(Module* module,
// Now that we know the module isn't in the list, we'll call out to notify // Now that we know the module isn't in the list, we'll call out to notify
// the module that it's attached to the worker thread. We don't hold // the module that it's attached to the worker thread. We don't hold
// the lock while we make this call. // the lock while we make this call.
if (!thread_.empty()) if (thread_.get())
module->ProcessThreadAttached(this); module->ProcessThreadAttached(this);
{ {
@ -228,6 +227,14 @@ void ProcessThreadImpl::DeRegisterModule(Module* module) {
module->ProcessThreadAttached(nullptr); module->ProcessThreadAttached(nullptr);
} }
// static
void ProcessThreadImpl::Run(void* obj) {
ProcessThreadImpl* impl = static_cast<ProcessThreadImpl*>(obj);
CurrentTaskQueueSetter set_current(impl);
while (impl->Process()) {
}
}
bool ProcessThreadImpl::Process() { bool ProcessThreadImpl::Process() {
TRACE_EVENT1("webrtc", "ProcessThreadImpl", "name", thread_name_); TRACE_EVENT1("webrtc", "ProcessThreadImpl", "name", thread_name_);
int64_t now = rtc::TimeMillis(); int64_t now = rtc::TimeMillis();

View file

@ -45,6 +45,7 @@ class ProcessThreadImpl : public ProcessThread {
void DeRegisterModule(Module* module) override; void DeRegisterModule(Module* module) override;
protected: protected:
static void Run(void* obj);
bool Process(); bool Process();
private: private:
@ -96,7 +97,8 @@ class ProcessThreadImpl : public ProcessThread {
SequenceChecker thread_checker_; SequenceChecker thread_checker_;
rtc::Event wake_up_; rtc::Event wake_up_;
rtc::PlatformThread thread_; // TODO(pbos): Remove unique_ptr and stop recreating the thread.
std::unique_ptr<rtc::PlatformThread> thread_;
ModuleList modules_ RTC_GUARDED_BY(mutex_); ModuleList modules_ RTC_GUARDED_BY(mutex_);
// Set to true when calling Process, to allow reentrant calls to WakeUp. // Set to true when calling Process, to allow reentrant calls to WakeUp.

View file

@ -240,15 +240,12 @@ int32_t VideoCaptureModuleV4L2::StartCapture(
} }
// start capture thread; // start capture thread;
if (_captureThread.empty()) { if (!_captureThread) {
quit_ = false; quit_ = false;
_captureThread = rtc::PlatformThread::SpawnJoinable( _captureThread.reset(new rtc::PlatformThread(
[this] { VideoCaptureModuleV4L2::CaptureThread, this, "CaptureThread",
while (CaptureProcess()) { rtc::ThreadAttributes().SetPriority(rtc::kHighPriority)));
} _captureThread->Start();
},
"CaptureThread",
rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kHigh));
} }
// Needed to start UVC camera - from the uvcview application // Needed to start UVC camera - from the uvcview application
@ -264,13 +261,14 @@ int32_t VideoCaptureModuleV4L2::StartCapture(
} }
int32_t VideoCaptureModuleV4L2::StopCapture() { int32_t VideoCaptureModuleV4L2::StopCapture() {
if (!_captureThread.empty()) { if (_captureThread) {
{ {
MutexLock lock(&capture_lock_); MutexLock lock(&capture_lock_);
quit_ = true; quit_ = true;
} }
// Make sure the capture thread stops using the mutex. // Make sure the capture thread stop stop using the critsect.
_captureThread.Finalize(); _captureThread->Stop();
_captureThread.reset();
} }
MutexLock lock(&capture_lock_); MutexLock lock(&capture_lock_);
@ -358,6 +356,11 @@ bool VideoCaptureModuleV4L2::CaptureStarted() {
return _captureStarted; return _captureStarted;
} }
void VideoCaptureModuleV4L2::CaptureThread(void* obj) {
VideoCaptureModuleV4L2* capture = static_cast<VideoCaptureModuleV4L2*>(obj);
while (capture->CaptureProcess()) {
}
}
bool VideoCaptureModuleV4L2::CaptureProcess() { bool VideoCaptureModuleV4L2::CaptureProcess() {
int retVal = 0; int retVal = 0;
fd_set rSet; fd_set rSet;

View file

@ -41,7 +41,8 @@ class VideoCaptureModuleV4L2 : public VideoCaptureImpl {
bool AllocateVideoBuffers(); bool AllocateVideoBuffers();
bool DeAllocateVideoBuffers(); bool DeAllocateVideoBuffers();
rtc::PlatformThread _captureThread; // TODO(pbos): Stop using unique_ptr and resetting the thread.
std::unique_ptr<rtc::PlatformThread> _captureThread;
Mutex capture_lock_; Mutex capture_lock_;
bool quit_ RTC_GUARDED_BY(capture_lock_); bool quit_ RTC_GUARDED_BY(capture_lock_);
int32_t _deviceId; int32_t _deviceId;

View file

@ -245,7 +245,6 @@ rtc_library("platform_thread") {
absl_deps = [ absl_deps = [
"//third_party/abseil-cpp/absl/memory", "//third_party/abseil-cpp/absl/memory",
"//third_party/abseil-cpp/absl/strings", "//third_party/abseil-cpp/absl/strings",
"//third_party/abseil-cpp/absl/types:optional",
] ]
} }
@ -562,10 +561,7 @@ if (is_win) {
"../api/task_queue", "../api/task_queue",
"synchronization:mutex", "synchronization:mutex",
] ]
absl_deps = [ absl_deps = [ "//third_party/abseil-cpp/absl/strings" ]
"//third_party/abseil-cpp/absl/strings",
"//third_party/abseil-cpp/absl/types:optional",
]
} }
} }
@ -1417,7 +1413,6 @@ if (rtc_include_tests) {
absl_deps = [ absl_deps = [
"//third_party/abseil-cpp/absl/base:core_headers", "//third_party/abseil-cpp/absl/base:core_headers",
"//third_party/abseil-cpp/absl/memory", "//third_party/abseil-cpp/absl/memory",
"//third_party/abseil-cpp/absl/types:optional",
] ]
} }

View file

@ -123,7 +123,7 @@ void AsyncResolver::Start(const SocketAddress& addr) {
RTC_DCHECK_RUN_ON(&sequence_checker_); RTC_DCHECK_RUN_ON(&sequence_checker_);
RTC_DCHECK(!destroy_called_); RTC_DCHECK(!destroy_called_);
addr_ = addr; addr_ = addr;
PlatformThread::SpawnDetached( auto thread_function =
[this, addr, caller_task_queue = webrtc::TaskQueueBase::Current(), [this, addr, caller_task_queue = webrtc::TaskQueueBase::Current(),
state = state_] { state = state_] {
std::vector<IPAddress> addresses; std::vector<IPAddress> addresses;
@ -146,8 +146,14 @@ void AsyncResolver::Start(const SocketAddress& addr) {
} }
})); }));
} }
}, };
"AsyncResolver"); PlatformThread thread(RunResolution,
new std::function<void()>(std::move(thread_function)),
"NameResolution", ThreadAttributes().SetDetached());
thread.Start();
// Although |thread| is detached, the PlatformThread contract mandates to call
// Stop() before destruction. The call doesn't actually stop anything.
thread.Stop();
} }
bool AsyncResolver::GetResolvedAddress(int family, SocketAddress* addr) const { bool AsyncResolver::GetResolvedAddress(int family, SocketAddress* addr) const {

View file

@ -30,7 +30,8 @@ const int kProcessingTimeMillisecs = 500;
const int kWorkingThreads = 2; const int kWorkingThreads = 2;
// Consumes approximately kProcessingTimeMillisecs of CPU time in single thread. // Consumes approximately kProcessingTimeMillisecs of CPU time in single thread.
void WorkingFunction(int64_t* counter) { void WorkingFunction(void* counter_pointer) {
int64_t* counter = reinterpret_cast<int64_t*>(counter_pointer);
*counter = 0; *counter = 0;
int64_t stop_cpu_time = int64_t stop_cpu_time =
rtc::GetThreadCpuTimeNanos() + rtc::GetThreadCpuTimeNanos() +
@ -61,12 +62,14 @@ TEST(CpuTimeTest, MAYBE_TEST(TwoThreads)) {
int64_t thread_start_time_nanos = GetThreadCpuTimeNanos(); int64_t thread_start_time_nanos = GetThreadCpuTimeNanos();
int64_t counter1; int64_t counter1;
int64_t counter2; int64_t counter2;
auto thread1 = PlatformThread::SpawnJoinable( PlatformThread thread1(WorkingFunction, reinterpret_cast<void*>(&counter1),
[&counter1] { WorkingFunction(&counter1); }, "Thread1"); "Thread1");
auto thread2 = PlatformThread::SpawnJoinable( PlatformThread thread2(WorkingFunction, reinterpret_cast<void*>(&counter2),
[&counter2] { WorkingFunction(&counter2); }, "Thread2"); "Thread2");
thread1.Finalize(); thread1.Start();
thread2.Finalize(); thread2.Start();
thread1.Stop();
thread2.Stop();
EXPECT_GE(counter1, 0); EXPECT_GE(counter1, 0);
EXPECT_GE(counter2, 0); EXPECT_GE(counter2, 0);

View file

@ -329,28 +329,33 @@ class PerfTestData {
class PerfTestThread { class PerfTestThread {
public: public:
PerfTestThread() : thread_(&ThreadFunc, this, "CsPerf") {}
void Start(PerfTestData* data, int repeats, int id) { void Start(PerfTestData* data, int repeats, int id) {
RTC_DCHECK(!thread_.IsRunning());
RTC_DCHECK(!data_); RTC_DCHECK(!data_);
data_ = data; data_ = data;
repeats_ = repeats; repeats_ = repeats;
my_id_ = id; my_id_ = id;
thread_ = PlatformThread::SpawnJoinable( thread_.Start();
[this] {
for (int i = 0; i < repeats_; ++i)
data_->AddToCounter(my_id_);
},
"CsPerf");
} }
void Stop() { void Stop() {
RTC_DCHECK(thread_.IsRunning());
RTC_DCHECK(data_); RTC_DCHECK(data_);
thread_.Finalize(); thread_.Stop();
repeats_ = 0; repeats_ = 0;
data_ = nullptr; data_ = nullptr;
my_id_ = 0; my_id_ = 0;
} }
private: private:
static void ThreadFunc(void* param) {
PerfTestThread* me = static_cast<PerfTestThread*>(param);
for (int i = 0; i < me->repeats_; ++i)
me->data_->AddToCounter(me->my_id_);
}
PlatformThread thread_; PlatformThread thread_;
PerfTestData* data_ = nullptr; PerfTestData* data_ = nullptr;
int repeats_ = 0; int repeats_ = 0;

View file

@ -79,12 +79,19 @@ namespace rtc {
namespace tracing { namespace tracing {
namespace { namespace {
static void EventTracingThreadFunc(void* params);
// Atomic-int fast path for avoiding logging when disabled. // Atomic-int fast path for avoiding logging when disabled.
static volatile int g_event_logging_active = 0; static volatile int g_event_logging_active = 0;
// TODO(pbos): Log metadata for all threads, etc. // TODO(pbos): Log metadata for all threads, etc.
class EventLogger final { class EventLogger final {
public: public:
EventLogger()
: logging_thread_(EventTracingThreadFunc,
this,
"EventTracingThread",
ThreadAttributes().SetPriority(kLowPriority)) {}
~EventLogger() { RTC_DCHECK(thread_checker_.IsCurrent()); } ~EventLogger() { RTC_DCHECK(thread_checker_.IsCurrent()); }
void AddTraceEvent(const char* name, void AddTraceEvent(const char* name,
@ -202,8 +209,7 @@ class EventLogger final {
rtc::AtomicOps::CompareAndSwap(&g_event_logging_active, 0, 1)); rtc::AtomicOps::CompareAndSwap(&g_event_logging_active, 0, 1));
// Finally start, everything should be set up now. // Finally start, everything should be set up now.
logging_thread_ = logging_thread_.Start();
PlatformThread::SpawnJoinable([this] { Log(); }, "EventTracingThread");
TRACE_EVENT_INSTANT0("webrtc", "EventLogger::Start"); TRACE_EVENT_INSTANT0("webrtc", "EventLogger::Start");
} }
@ -217,7 +223,7 @@ class EventLogger final {
// Wake up logging thread to finish writing. // Wake up logging thread to finish writing.
shutdown_event_.Set(); shutdown_event_.Set();
// Join the logging thread. // Join the logging thread.
logging_thread_.Finalize(); logging_thread_.Stop();
} }
private: private:
@ -320,6 +326,10 @@ class EventLogger final {
bool output_file_owned_ = false; bool output_file_owned_ = false;
}; };
static void EventTracingThreadFunc(void* params) {
static_cast<EventLogger*>(params)->Log();
}
static EventLogger* volatile g_event_logger = nullptr; static EventLogger* volatile g_event_logger = nullptr;
static const char* const kDisabledTracePrefix = TRACE_DISABLED_BY_DEFAULT(""); static const char* const kDisabledTracePrefix = TRACE_DISABLED_BY_DEFAULT("");
const unsigned char* InternalGetCategoryEnabled(const char* name) { const unsigned char* InternalGetCategoryEnabled(const char* name) {

View file

@ -43,21 +43,22 @@ TEST(EventTest, AutoReset) {
class SignalerThread { class SignalerThread {
public: public:
SignalerThread() : thread_(&ThreadFn, this, "EventPerf") {}
void Start(Event* writer, Event* reader) { void Start(Event* writer, Event* reader) {
writer_ = writer; writer_ = writer;
reader_ = reader; reader_ = reader;
thread_ = PlatformThread::SpawnJoinable( thread_.Start();
[this] {
while (!stop_event_.Wait(0)) {
writer_->Set();
reader_->Wait(Event::kForever);
}
},
"EventPerf");
} }
void Stop() { void Stop() {
stop_event_.Set(); stop_event_.Set();
thread_.Finalize(); thread_.Stop();
}
static void ThreadFn(void* param) {
auto* me = static_cast<SignalerThread*>(param);
while (!me->stop_event_.Wait(0)) {
me->writer_->Set();
me->reader_->Wait(Event::kForever);
}
} }
Event stop_event_; Event stop_event_;
Event* writer_; Event* writer_;

View file

@ -160,13 +160,18 @@ TEST(LogTest, MultipleStreams) {
class LogThread { class LogThread {
public: public:
void Start() { LogThread() : thread_(&ThreadEntry, this, "LogThread") {}
thread_ = PlatformThread::SpawnJoinable( ~LogThread() { thread_.Stop(); }
[] { RTC_LOG(LS_VERBOSE) << "RTC_LOG"; }, "LogThread");
} void Start() { thread_.Start(); }
private: private:
void Run() { RTC_LOG(LS_VERBOSE) << "RTC_LOG"; }
static void ThreadEntry(void* p) { static_cast<LogThread*>(p)->Run(); }
PlatformThread thread_; PlatformThread thread_;
Event event_;
}; };
// Ensure we don't crash when adding/removing streams while threads are going. // Ensure we don't crash when adding/removing streams while threads are going.

View file

@ -10,37 +10,32 @@
#include "rtc_base/platform_thread.h" #include "rtc_base/platform_thread.h"
#include <algorithm>
#include <memory> #include <memory>
#if !defined(WEBRTC_WIN) #if !defined(WEBRTC_WIN)
#include <sched.h> #include <sched.h>
#endif #endif
#include <stdint.h>
#include <time.h>
#include <algorithm>
#include "absl/memory/memory.h"
#include "rtc_base/checks.h" #include "rtc_base/checks.h"
namespace rtc { namespace rtc {
namespace {
#if defined(WEBRTC_WIN) namespace {
int Win32PriorityFromThreadPriority(ThreadPriority priority) { struct ThreadStartData {
switch (priority) { ThreadRunFunction run_function;
case ThreadPriority::kLow: void* obj;
return THREAD_PRIORITY_BELOW_NORMAL; std::string thread_name;
case ThreadPriority::kNormal: ThreadPriority priority;
return THREAD_PRIORITY_NORMAL; };
case ThreadPriority::kHigh:
return THREAD_PRIORITY_ABOVE_NORMAL;
case ThreadPriority::kRealtime:
return THREAD_PRIORITY_TIME_CRITICAL;
}
}
#endif
bool SetPriority(ThreadPriority priority) { bool SetPriority(ThreadPriority priority) {
#if defined(WEBRTC_WIN) #if defined(WEBRTC_WIN)
return SetThreadPriority(GetCurrentThread(), return SetThreadPriority(GetCurrentThread(), priority) != FALSE;
Win32PriorityFromThreadPriority(priority)) != FALSE;
#elif defined(__native_client__) || defined(WEBRTC_FUCHSIA) #elif defined(__native_client__) || defined(WEBRTC_FUCHSIA)
// Setting thread priorities is not supported in NaCl or Fuchsia. // Setting thread priorities is not supported in NaCl or Fuchsia.
return true; return true;
@ -64,18 +59,21 @@ bool SetPriority(ThreadPriority priority) {
const int top_prio = max_prio - 1; const int top_prio = max_prio - 1;
const int low_prio = min_prio + 1; const int low_prio = min_prio + 1;
switch (priority) { switch (priority) {
case ThreadPriority::kLow: case kLowPriority:
param.sched_priority = low_prio; param.sched_priority = low_prio;
break; break;
case ThreadPriority::kNormal: case kNormalPriority:
// The -1 ensures that the kHighPriority is always greater or equal to // The -1 ensures that the kHighPriority is always greater or equal to
// kNormalPriority. // kNormalPriority.
param.sched_priority = (low_prio + top_prio - 1) / 2; param.sched_priority = (low_prio + top_prio - 1) / 2;
break; break;
case ThreadPriority::kHigh: case kHighPriority:
param.sched_priority = std::max(top_prio - 2, low_prio); param.sched_priority = std::max(top_prio - 2, low_prio);
break; break;
case ThreadPriority::kRealtime: case kHighestPriority:
param.sched_priority = std::max(top_prio - 1, low_prio);
break;
case kRealtimePriority:
param.sched_priority = top_prio; param.sched_priority = top_prio;
break; break;
} }
@ -83,129 +81,124 @@ bool SetPriority(ThreadPriority priority) {
#endif // defined(WEBRTC_WIN) #endif // defined(WEBRTC_WIN)
} }
void RunPlatformThread(std::unique_ptr<ThreadStartData> data) {
rtc::SetCurrentThreadName(data->thread_name.c_str());
data->thread_name.clear();
SetPriority(data->priority);
data->run_function(data->obj);
}
#if defined(WEBRTC_WIN) #if defined(WEBRTC_WIN)
DWORD WINAPI RunPlatformThread(void* param) { DWORD WINAPI StartThread(void* param) {
// The GetLastError() function only returns valid results when it is called // The GetLastError() function only returns valid results when it is called
// after a Win32 API function that returns a "failed" result. A crash dump // after a Win32 API function that returns a "failed" result. A crash dump
// contains the result from GetLastError() and to make sure it does not // contains the result from GetLastError() and to make sure it does not
// falsely report a Windows error we call SetLastError here. // falsely report a Windows error we call SetLastError here.
::SetLastError(ERROR_SUCCESS); ::SetLastError(ERROR_SUCCESS);
auto function = static_cast<std::function<void()>*>(param); RunPlatformThread(absl::WrapUnique(static_cast<ThreadStartData*>(param)));
(*function)();
delete function;
return 0; return 0;
} }
#else #else
void* RunPlatformThread(void* param) { void* StartThread(void* param) {
auto function = static_cast<std::function<void()>*>(param); RunPlatformThread(absl::WrapUnique(static_cast<ThreadStartData*>(param)));
(*function)();
delete function;
return 0; return 0;
} }
#endif // defined(WEBRTC_WIN) #endif // defined(WEBRTC_WIN)
} // namespace } // namespace
PlatformThread::PlatformThread(Handle handle, bool joinable) PlatformThread::PlatformThread(ThreadRunFunction func,
: handle_(handle), joinable_(joinable) {} void* obj,
absl::string_view thread_name,
PlatformThread::PlatformThread(PlatformThread&& rhs) ThreadAttributes attributes)
: handle_(rhs.handle_), joinable_(rhs.joinable_) { : run_function_(func),
rhs.handle_ = absl::nullopt; attributes_(attributes),
} obj_(obj),
name_(thread_name) {
PlatformThread& PlatformThread::operator=(PlatformThread&& rhs) { RTC_DCHECK(func);
Finalize(); RTC_DCHECK(!name_.empty());
handle_ = rhs.handle_; // TODO(tommi): Consider lowering the limit to 15 (limit on Linux).
joinable_ = rhs.joinable_; RTC_DCHECK(name_.length() < 64);
rhs.handle_ = absl::nullopt;
return *this;
} }
PlatformThread::~PlatformThread() { PlatformThread::~PlatformThread() {
Finalize(); RTC_DCHECK_RUN_ON(&thread_checker_);
} RTC_DCHECK(!thread_);
PlatformThread PlatformThread::SpawnJoinable(
std::function<void()> thread_function,
absl::string_view name,
ThreadAttributes attributes) {
return SpawnThread(std::move(thread_function), name, attributes,
/*joinable=*/true);
}
PlatformThread PlatformThread::SpawnDetached(
std::function<void()> thread_function,
absl::string_view name,
ThreadAttributes attributes) {
return SpawnThread(std::move(thread_function), name, attributes,
/*joinable=*/false);
}
absl::optional<PlatformThread::Handle> PlatformThread::GetHandle() const {
return handle_;
}
#if defined(WEBRTC_WIN) #if defined(WEBRTC_WIN)
bool PlatformThread::QueueAPC(PAPCFUNC function, ULONG_PTR data) { RTC_DCHECK(!thread_id_);
RTC_DCHECK(handle_.has_value()); #endif // defined(WEBRTC_WIN)
return handle_.has_value() ? QueueUserAPC(function, *handle_, data) != FALSE
: false;
}
#endif
void PlatformThread::Finalize() {
if (!handle_.has_value())
return;
#if defined(WEBRTC_WIN)
if (joinable_)
WaitForSingleObject(*handle_, INFINITE);
CloseHandle(*handle_);
#else
if (joinable_)
RTC_CHECK_EQ(0, pthread_join(*handle_, nullptr));
#endif
handle_ = absl::nullopt;
} }
PlatformThread PlatformThread::SpawnThread( void PlatformThread::Start() {
std::function<void()> thread_function, RTC_DCHECK_RUN_ON(&thread_checker_);
absl::string_view name, RTC_DCHECK(!thread_) << "Thread already started?";
ThreadAttributes attributes, ThreadStartData* data =
bool joinable) { new ThreadStartData{run_function_, obj_, name_, attributes_.priority};
RTC_DCHECK(thread_function);
RTC_DCHECK(!name.empty());
// TODO(tommi): Consider lowering the limit to 15 (limit on Linux).
RTC_DCHECK(name.length() < 64);
auto start_thread_function_ptr =
new std::function<void()>([thread_function = std::move(thread_function),
name = std::string(name), attributes] {
rtc::SetCurrentThreadName(name.c_str());
SetPriority(attributes.priority);
thread_function();
});
#if defined(WEBRTC_WIN) #if defined(WEBRTC_WIN)
// See bug 2902 for background on STACK_SIZE_PARAM_IS_A_RESERVATION. // See bug 2902 for background on STACK_SIZE_PARAM_IS_A_RESERVATION.
// Set the reserved stack stack size to 1M, which is the default on Windows // Set the reserved stack stack size to 1M, which is the default on Windows
// and Linux. // and Linux.
DWORD thread_id = 0; thread_ = ::CreateThread(nullptr, 1024 * 1024, &StartThread, data,
PlatformThread::Handle handle = ::CreateThread( STACK_SIZE_PARAM_IS_A_RESERVATION, &thread_id_);
nullptr, 1024 * 1024, &RunPlatformThread, start_thread_function_ptr, RTC_CHECK(thread_) << "CreateThread failed";
STACK_SIZE_PARAM_IS_A_RESERVATION, &thread_id); RTC_DCHECK(thread_id_);
RTC_CHECK(handle) << "CreateThread failed";
#else #else
pthread_attr_t attr; pthread_attr_t attr;
pthread_attr_init(&attr); pthread_attr_init(&attr);
// Set the stack stack size to 1M. // Set the stack stack size to 1M.
pthread_attr_setstacksize(&attr, 1024 * 1024); pthread_attr_setstacksize(&attr, 1024 * 1024);
pthread_attr_setdetachstate( pthread_attr_setdetachstate(&attr, attributes_.joinable
&attr, joinable ? PTHREAD_CREATE_JOINABLE : PTHREAD_CREATE_DETACHED); ? PTHREAD_CREATE_JOINABLE
PlatformThread::Handle handle; : PTHREAD_CREATE_DETACHED);
RTC_CHECK_EQ(0, pthread_create(&handle, &attr, &RunPlatformThread, RTC_CHECK_EQ(0, pthread_create(&thread_, &attr, &StartThread, data));
start_thread_function_ptr));
pthread_attr_destroy(&attr); pthread_attr_destroy(&attr);
#endif // defined(WEBRTC_WIN) #endif // defined(WEBRTC_WIN)
return PlatformThread(handle, joinable);
} }
bool PlatformThread::IsRunning() const {
RTC_DCHECK_RUN_ON(&thread_checker_);
#if defined(WEBRTC_WIN)
return thread_ != nullptr;
#else
return thread_ != 0;
#endif // defined(WEBRTC_WIN)
}
PlatformThreadRef PlatformThread::GetThreadRef() const {
#if defined(WEBRTC_WIN)
return thread_id_;
#else
return thread_;
#endif // defined(WEBRTC_WIN)
}
void PlatformThread::Stop() {
RTC_DCHECK_RUN_ON(&thread_checker_);
if (!IsRunning())
return;
#if defined(WEBRTC_WIN)
if (attributes_.joinable) {
WaitForSingleObject(thread_, INFINITE);
}
CloseHandle(thread_);
thread_ = nullptr;
thread_id_ = 0;
#else
if (attributes_.joinable) {
RTC_CHECK_EQ(0, pthread_join(thread_, nullptr));
}
thread_ = 0;
#endif // defined(WEBRTC_WIN)
}
#if defined(WEBRTC_WIN)
bool PlatformThread::QueueAPC(PAPCFUNC function, ULONG_PTR data) {
RTC_DCHECK_RUN_ON(&thread_checker_);
RTC_DCHECK(IsRunning());
return QueueUserAPC(function, thread_, data) != FALSE;
}
#endif
} // namespace rtc } // namespace rtc

View file

@ -11,101 +11,103 @@
#ifndef RTC_BASE_PLATFORM_THREAD_H_ #ifndef RTC_BASE_PLATFORM_THREAD_H_
#define RTC_BASE_PLATFORM_THREAD_H_ #define RTC_BASE_PLATFORM_THREAD_H_
#include <functional> #ifndef WEBRTC_WIN
#include <pthread.h>
#endif
#include <string> #include <string>
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include "absl/types/optional.h" #include "api/sequence_checker.h"
#include "rtc_base/constructor_magic.h"
#include "rtc_base/platform_thread_types.h" #include "rtc_base/platform_thread_types.h"
namespace rtc { namespace rtc {
enum class ThreadPriority { // Callback function that the spawned thread will enter once spawned.
kLow = 1, typedef void (*ThreadRunFunction)(void*);
kNormal,
kHigh, enum ThreadPriority {
kRealtime, #ifdef WEBRTC_WIN
kLowPriority = THREAD_PRIORITY_BELOW_NORMAL,
kNormalPriority = THREAD_PRIORITY_NORMAL,
kHighPriority = THREAD_PRIORITY_ABOVE_NORMAL,
kHighestPriority = THREAD_PRIORITY_HIGHEST,
kRealtimePriority = THREAD_PRIORITY_TIME_CRITICAL
#else
kLowPriority = 1,
kNormalPriority = 2,
kHighPriority = 3,
kHighestPriority = 4,
kRealtimePriority = 5
#endif
}; };
struct ThreadAttributes { struct ThreadAttributes {
ThreadPriority priority = ThreadPriority::kNormal; ThreadPriority priority = kNormalPriority;
bool joinable = true;
ThreadAttributes& SetPriority(ThreadPriority priority_param) { ThreadAttributes& SetPriority(ThreadPriority priority_param) {
priority = priority_param; priority = priority_param;
return *this; return *this;
} }
ThreadAttributes& SetDetached() {
joinable = false;
return *this;
}
}; };
// Represents a simple worker thread. // Represents a simple worker thread. The implementation must be assumed
class PlatformThread final { // to be single threaded, meaning that all methods of the class, must be
// called from the same thread, including instantiation.
class PlatformThread {
public: public:
// Handle is the base platform thread handle. PlatformThread(ThreadRunFunction func,
#if defined(WEBRTC_WIN) void* obj,
using Handle = HANDLE; absl::string_view thread_name,
#else ThreadAttributes attributes = ThreadAttributes());
using Handle = pthread_t;
#endif // defined(WEBRTC_WIN)
// This ctor creates the PlatformThread with an unset handle (returning true
// in empty()) and is provided for convenience.
// TODO(bugs.webrtc.org/12727) Look into if default and move support can be
// removed.
PlatformThread() = default;
// Moves |rhs| into this, storing an empty state in |rhs|.
// TODO(bugs.webrtc.org/12727) Look into if default and move support can be
// removed.
PlatformThread(PlatformThread&& rhs);
// Moves |rhs| into this, storing an empty state in |rhs|.
// TODO(bugs.webrtc.org/12727) Look into if default and move support can be
// removed.
PlatformThread& operator=(PlatformThread&& rhs);
// For a PlatformThread that's been spawned joinable, the destructor suspends
// the calling thread until the created thread exits unless the thread has
// already exited.
virtual ~PlatformThread(); virtual ~PlatformThread();
// Finalizes any allocated resources. const std::string& name() const { return name_; }
// For a PlatformThread that's been spawned joinable, Finalize() suspends
// the calling thread until the created thread exits unless the thread has
// already exited.
// empty() returns true after completion.
void Finalize();
// Returns true if default constructed, moved from, or Finalize()ed. // Spawns a thread and tries to set thread priority according to the priority
bool empty() const { return !handle_.has_value(); } // from when CreateThread was called.
// Start can only be called after the constructor or after a call to Stop().
void Start();
// Creates a started joinable thread which will be joined when the returned bool IsRunning() const;
// PlatformThread destructs or Finalize() is called.
static PlatformThread SpawnJoinable(
std::function<void()> thread_function,
absl::string_view name,
ThreadAttributes attributes = ThreadAttributes());
// Creates a started detached thread. The caller has to use external // Returns an identifier for the worker thread that can be used to do
// synchronization as nothing is provided by the PlatformThread construct. // thread checks.
static PlatformThread SpawnDetached( PlatformThreadRef GetThreadRef() const;
std::function<void()> thread_function,
absl::string_view name,
ThreadAttributes attributes = ThreadAttributes());
// Returns the base platform thread handle of this thread. // Stop() prepares the PlatformThread for destruction or another call to
absl::optional<Handle> GetHandle() const; // Start(). For a PlatformThread that's been created with
// ThreadAttributes::joinable true (the default), Stop() suspends the calling
// thread until the created thread exits unless the thread has already exited.
// Stop() can only be called after calling Start().
void Stop();
protected:
#if defined(WEBRTC_WIN) #if defined(WEBRTC_WIN)
// Queue a Windows APC function that runs when the thread is alertable. // Exposed to derived classes to allow for special cases specific to Windows.
bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data); bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data);
#endif #endif
private: private:
PlatformThread(Handle handle, bool joinable); ThreadRunFunction const run_function_ = nullptr;
static PlatformThread SpawnThread(std::function<void()> thread_function, const ThreadAttributes attributes_;
absl::string_view name, void* const obj_;
ThreadAttributes attributes, // TODO(pbos): Make sure call sites use string literals and update to a const
bool joinable); // char* instead of a std::string.
const std::string name_;
absl::optional<Handle> handle_; webrtc::SequenceChecker thread_checker_;
bool joinable_ = false; #if defined(WEBRTC_WIN)
HANDLE thread_ = nullptr;
DWORD thread_id_ = 0;
#else
pthread_t thread_ = 0;
#endif // defined(WEBRTC_WIN)
RTC_DISALLOW_COPY_AND_ASSIGN(PlatformThread);
}; };
} // namespace rtc } // namespace rtc

View file

@ -10,73 +10,69 @@
#include "rtc_base/platform_thread.h" #include "rtc_base/platform_thread.h"
#include "absl/types/optional.h"
#include "rtc_base/event.h" #include "rtc_base/event.h"
#include "system_wrappers/include/sleep.h" #include "system_wrappers/include/sleep.h"
#include "test/gmock.h" #include "test/gmock.h"
namespace rtc { namespace rtc {
namespace {
TEST(PlatformThreadTest, DefaultConstructedIsEmpty) { void NullRunFunction(void* obj) {}
PlatformThread thread;
EXPECT_EQ(thread.GetHandle(), absl::nullopt); // Function that sets a boolean.
EXPECT_TRUE(thread.empty()); void SetFlagRunFunction(void* obj) {
bool* obj_as_bool = static_cast<bool*>(obj);
*obj_as_bool = true;
} }
TEST(PlatformThreadTest, StartFinalize) { void StdFunctionRunFunction(void* obj) {
PlatformThread thread = PlatformThread::SpawnJoinable([] {}, "1"); std::function<void()>* fun = static_cast<std::function<void()>*>(obj);
EXPECT_NE(thread.GetHandle(), absl::nullopt); (*fun)();
EXPECT_FALSE(thread.empty());
thread.Finalize();
EXPECT_TRUE(thread.empty());
thread = PlatformThread::SpawnDetached([] {}, "2");
EXPECT_FALSE(thread.empty());
thread.Finalize();
EXPECT_TRUE(thread.empty());
} }
TEST(PlatformThreadTest, MovesEmpty) { } // namespace
PlatformThread thread1;
PlatformThread thread2 = std::move(thread1); TEST(PlatformThreadTest, StartStop) {
EXPECT_TRUE(thread1.empty()); PlatformThread thread(&NullRunFunction, nullptr, "PlatformThreadTest");
EXPECT_TRUE(thread2.empty()); EXPECT_TRUE(thread.name() == "PlatformThreadTest");
EXPECT_TRUE(thread.GetThreadRef() == 0);
thread.Start();
EXPECT_TRUE(thread.GetThreadRef() != 0);
thread.Stop();
EXPECT_TRUE(thread.GetThreadRef() == 0);
} }
TEST(PlatformThreadTest, MovesHandles) { TEST(PlatformThreadTest, StartStop2) {
PlatformThread thread1 = PlatformThread::SpawnJoinable([] {}, "1"); PlatformThread thread1(&NullRunFunction, nullptr, "PlatformThreadTest1");
PlatformThread thread2 = std::move(thread1); PlatformThread thread2(&NullRunFunction, nullptr, "PlatformThreadTest2");
EXPECT_TRUE(thread1.empty()); EXPECT_TRUE(thread1.GetThreadRef() == thread2.GetThreadRef());
EXPECT_FALSE(thread2.empty()); thread1.Start();
thread1 = PlatformThread::SpawnDetached([] {}, "2"); thread2.Start();
thread2 = std::move(thread1); EXPECT_TRUE(thread1.GetThreadRef() != thread2.GetThreadRef());
EXPECT_TRUE(thread1.empty()); thread2.Stop();
EXPECT_FALSE(thread2.empty()); thread1.Stop();
}
TEST(PlatformThreadTest,
TwoThreadHandlesAreDifferentWhenStartedAndEqualWhenJoined) {
PlatformThread thread1 = PlatformThread();
PlatformThread thread2 = PlatformThread();
EXPECT_EQ(thread1.GetHandle(), thread2.GetHandle());
thread1 = PlatformThread::SpawnJoinable([] {}, "1");
thread2 = PlatformThread::SpawnJoinable([] {}, "2");
EXPECT_NE(thread1.GetHandle(), thread2.GetHandle());
thread1.Finalize();
EXPECT_NE(thread1.GetHandle(), thread2.GetHandle());
thread2.Finalize();
EXPECT_EQ(thread1.GetHandle(), thread2.GetHandle());
} }
TEST(PlatformThreadTest, RunFunctionIsCalled) { TEST(PlatformThreadTest, RunFunctionIsCalled) {
bool flag = false; bool flag = false;
PlatformThread::SpawnJoinable([&] { flag = true; }, "T"); PlatformThread thread(&SetFlagRunFunction, &flag, "RunFunctionIsCalled");
thread.Start();
// At this point, the flag may be either true or false.
thread.Stop();
// We expect the thread to have run at least once.
EXPECT_TRUE(flag); EXPECT_TRUE(flag);
} }
TEST(PlatformThreadTest, JoinsThread) { TEST(PlatformThreadTest, JoinsThread) {
// This test flakes if there are problems with the join implementation. // This test flakes if there are problems with the join implementation.
EXPECT_TRUE(ThreadAttributes().joinable);
rtc::Event event; rtc::Event event;
PlatformThread::SpawnJoinable([&] { event.Set(); }, "T"); std::function<void()> thread_function = [&] { event.Set(); };
PlatformThread thread(&StdFunctionRunFunction, &thread_function, "T");
thread.Start();
thread.Stop();
EXPECT_TRUE(event.Wait(/*give_up_after_ms=*/0)); EXPECT_TRUE(event.Wait(/*give_up_after_ms=*/0));
} }
@ -87,14 +83,18 @@ TEST(PlatformThreadTest, StopsBeforeDetachedThreadExits) {
rtc::Event thread_started; rtc::Event thread_started;
rtc::Event thread_continue; rtc::Event thread_continue;
rtc::Event thread_exiting; rtc::Event thread_exiting;
PlatformThread::SpawnDetached( std::function<void()> thread_function = [&] {
[&] { thread_started.Set();
thread_started.Set(); thread_continue.Wait(Event::kForever);
thread_continue.Wait(Event::kForever); flag = true;
flag = true; thread_exiting.Set();
thread_exiting.Set(); };
}, {
"T"); PlatformThread thread(&StdFunctionRunFunction, &thread_function, "T",
ThreadAttributes().SetDetached());
thread.Start();
thread.Stop();
}
thread_started.Wait(Event::kForever); thread_started.Wait(Event::kForever);
EXPECT_FALSE(flag); EXPECT_FALSE(flag);
thread_continue.Set(); thread_continue.Set();

View file

@ -127,6 +127,10 @@ class ThreadTask {
rtc::Event end_signal_; rtc::Event end_signal_;
}; };
void RunTask(void* thread_task) {
reinterpret_cast<ThreadTask*>(thread_task)->Run();
}
TEST_F(RateLimitTest, MultiThreadedUsage) { TEST_F(RateLimitTest, MultiThreadedUsage) {
// Simple sanity test, with different threads calling the various methods. // Simple sanity test, with different threads calling the various methods.
// Runs a few simple tasks, each on its own thread, but coordinated with // Runs a few simple tasks, each on its own thread, but coordinated with
@ -145,8 +149,8 @@ TEST_F(RateLimitTest, MultiThreadedUsage) {
EXPECT_TRUE(rate_limiter_->SetWindowSize(kWindowSizeMs / 2)); EXPECT_TRUE(rate_limiter_->SetWindowSize(kWindowSizeMs / 2));
} }
} set_window_size_task(rate_limiter.get()); } set_window_size_task(rate_limiter.get());
auto thread1 = rtc::PlatformThread::SpawnJoinable( rtc::PlatformThread thread1(RunTask, &set_window_size_task, "Thread1");
[&set_window_size_task] { set_window_size_task.Run(); }, "Thread1"); thread1.Start();
class SetMaxRateTask : public ThreadTask { class SetMaxRateTask : public ThreadTask {
public: public:
@ -156,8 +160,8 @@ TEST_F(RateLimitTest, MultiThreadedUsage) {
void DoRun() override { rate_limiter_->SetMaxRate(kMaxRateBps * 2); } void DoRun() override { rate_limiter_->SetMaxRate(kMaxRateBps * 2); }
} set_max_rate_task(rate_limiter.get()); } set_max_rate_task(rate_limiter.get());
auto thread2 = rtc::PlatformThread::SpawnJoinable( rtc::PlatformThread thread2(RunTask, &set_max_rate_task, "Thread2");
[&set_max_rate_task] { set_max_rate_task.Run(); }, "Thread2"); thread2.Start();
class UseRateTask : public ThreadTask { class UseRateTask : public ThreadTask {
public: public:
@ -173,8 +177,8 @@ TEST_F(RateLimitTest, MultiThreadedUsage) {
SimulatedClock* const clock_; SimulatedClock* const clock_;
} use_rate_task(rate_limiter.get(), &clock_); } use_rate_task(rate_limiter.get(), &clock_);
auto thread3 = rtc::PlatformThread::SpawnJoinable( rtc::PlatformThread thread3(RunTask, &use_rate_task, "Thread3");
[&use_rate_task] { use_rate_task.Run(); }, "Thread3"); thread3.Start();
set_window_size_task.start_signal_.Set(); set_window_size_task.start_signal_.Set();
EXPECT_TRUE(set_window_size_task.end_signal_.Wait(kMaxTimeoutMs)); EXPECT_TRUE(set_window_size_task.end_signal_.Wait(kMaxTimeoutMs));
@ -187,6 +191,10 @@ TEST_F(RateLimitTest, MultiThreadedUsage) {
// All rate consumed. // All rate consumed.
EXPECT_FALSE(rate_limiter->TryUseRate(1)); EXPECT_FALSE(rate_limiter->TryUseRate(1));
thread1.Stop();
thread2.Stop();
thread3.Stop();
} }
} // namespace webrtc } // namespace webrtc

View file

@ -93,12 +93,16 @@ void EventAssign(struct event* ev,
rtc::ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) { rtc::ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
switch (priority) { switch (priority) {
case Priority::HIGH: case Priority::HIGH:
return rtc::ThreadPriority::kRealtime; return rtc::kRealtimePriority;
case Priority::LOW: case Priority::LOW:
return rtc::ThreadPriority::kLow; return rtc::kLowPriority;
case Priority::NORMAL: case Priority::NORMAL:
return rtc::ThreadPriority::kNormal; return rtc::kNormalPriority;
default:
RTC_NOTREACHED();
break;
} }
return rtc::kNormalPriority;
} }
class TaskQueueLibevent final : public TaskQueueBase { class TaskQueueLibevent final : public TaskQueueBase {
@ -116,6 +120,7 @@ class TaskQueueLibevent final : public TaskQueueBase {
~TaskQueueLibevent() override = default; ~TaskQueueLibevent() override = default;
static void ThreadMain(void* context);
static void OnWakeup(int socket, short flags, void* context); // NOLINT static void OnWakeup(int socket, short flags, void* context); // NOLINT
static void RunTimer(int fd, short flags, void* context); // NOLINT static void RunTimer(int fd, short flags, void* context); // NOLINT
@ -167,7 +172,11 @@ class TaskQueueLibevent::SetTimerTask : public QueuedTask {
TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name, TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name,
rtc::ThreadPriority priority) rtc::ThreadPriority priority)
: event_base_(event_base_new()) { : event_base_(event_base_new()),
thread_(&TaskQueueLibevent::ThreadMain,
this,
queue_name,
rtc::ThreadAttributes().SetPriority(priority)) {
int fds[2]; int fds[2];
RTC_CHECK(pipe(fds) == 0); RTC_CHECK(pipe(fds) == 0);
SetNonBlocking(fds[0]); SetNonBlocking(fds[0]);
@ -178,18 +187,7 @@ TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name,
EventAssign(&wakeup_event_, event_base_, wakeup_pipe_out_, EventAssign(&wakeup_event_, event_base_, wakeup_pipe_out_,
EV_READ | EV_PERSIST, OnWakeup, this); EV_READ | EV_PERSIST, OnWakeup, this);
event_add(&wakeup_event_, 0); event_add(&wakeup_event_, 0);
thread_ = rtc::PlatformThread::SpawnJoinable( thread_.Start();
[this] {
{
CurrentTaskQueueSetter set_current(this);
while (is_active_)
event_base_loop(event_base_, 0);
}
for (TimerEvent* timer : pending_timers_)
delete timer;
},
queue_name, rtc::ThreadAttributes().SetPriority(priority));
} }
void TaskQueueLibevent::Delete() { void TaskQueueLibevent::Delete() {
@ -204,7 +202,7 @@ void TaskQueueLibevent::Delete() {
nanosleep(&ts, nullptr); nanosleep(&ts, nullptr);
} }
thread_.Finalize(); thread_.Stop();
event_del(&wakeup_event_); event_del(&wakeup_event_);
@ -257,6 +255,20 @@ void TaskQueueLibevent::PostDelayedTask(std::unique_ptr<QueuedTask> task,
} }
} }
// static
void TaskQueueLibevent::ThreadMain(void* context) {
TaskQueueLibevent* me = static_cast<TaskQueueLibevent*>(context);
{
CurrentTaskQueueSetter set_current(me);
while (me->is_active_)
event_base_loop(me->event_base_, 0);
}
for (TimerEvent* timer : me->pending_timers_)
delete timer;
}
// static // static
void TaskQueueLibevent::OnWakeup(int socket, void TaskQueueLibevent::OnWakeup(int socket,
short flags, // NOLINT short flags, // NOLINT

View file

@ -36,11 +36,14 @@ rtc::ThreadPriority TaskQueuePriorityToThreadPriority(
TaskQueueFactory::Priority priority) { TaskQueueFactory::Priority priority) {
switch (priority) { switch (priority) {
case TaskQueueFactory::Priority::HIGH: case TaskQueueFactory::Priority::HIGH:
return rtc::ThreadPriority::kRealtime; return rtc::kRealtimePriority;
case TaskQueueFactory::Priority::LOW: case TaskQueueFactory::Priority::LOW:
return rtc::ThreadPriority::kLow; return rtc::kLowPriority;
case TaskQueueFactory::Priority::NORMAL: case TaskQueueFactory::Priority::NORMAL:
return rtc::ThreadPriority::kNormal; return rtc::kNormalPriority;
default:
RTC_NOTREACHED();
return rtc::kNormalPriority;
} }
} }
@ -75,6 +78,8 @@ class TaskQueueStdlib final : public TaskQueueBase {
NextTask GetNextTask(); NextTask GetNextTask();
static void ThreadMain(void* context);
void ProcessTasks(); void ProcessTasks();
void NotifyWake(); void NotifyWake();
@ -121,13 +126,11 @@ TaskQueueStdlib::TaskQueueStdlib(absl::string_view queue_name,
: started_(/*manual_reset=*/false, /*initially_signaled=*/false), : started_(/*manual_reset=*/false, /*initially_signaled=*/false),
stopped_(/*manual_reset=*/false, /*initially_signaled=*/false), stopped_(/*manual_reset=*/false, /*initially_signaled=*/false),
flag_notify_(/*manual_reset=*/false, /*initially_signaled=*/false), flag_notify_(/*manual_reset=*/false, /*initially_signaled=*/false),
thread_(rtc::PlatformThread::SpawnJoinable( thread_(&TaskQueueStdlib::ThreadMain,
[this] { this,
CurrentTaskQueueSetter set_current(this); queue_name,
ProcessTasks(); rtc::ThreadAttributes().SetPriority(priority)) {
}, thread_.Start();
queue_name,
rtc::ThreadAttributes().SetPriority(priority))) {
started_.Wait(rtc::Event::kForever); started_.Wait(rtc::Event::kForever);
} }
@ -142,7 +145,7 @@ void TaskQueueStdlib::Delete() {
NotifyWake(); NotifyWake();
stopped_.Wait(rtc::Event::kForever); stopped_.Wait(rtc::Event::kForever);
thread_.Finalize(); thread_.Stop();
delete this; delete this;
} }
@ -219,6 +222,13 @@ TaskQueueStdlib::NextTask TaskQueueStdlib::GetNextTask() {
return result; return result;
} }
// static
void TaskQueueStdlib::ThreadMain(void* context) {
TaskQueueStdlib* me = static_cast<TaskQueueStdlib*>(context);
CurrentTaskQueueSetter set_current(me);
me->ProcessTasks();
}
void TaskQueueStdlib::ProcessTasks() { void TaskQueueStdlib::ProcessTasks() {
started_.Set(); started_.Set();

View file

@ -29,7 +29,6 @@
#include <utility> #include <utility>
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "api/task_queue/queued_task.h" #include "api/task_queue/queued_task.h"
#include "api/task_queue/task_queue_base.h" #include "api/task_queue/task_queue_base.h"
#include "rtc_base/arraysize.h" #include "rtc_base/arraysize.h"
@ -57,12 +56,16 @@ rtc::ThreadPriority TaskQueuePriorityToThreadPriority(
TaskQueueFactory::Priority priority) { TaskQueueFactory::Priority priority) {
switch (priority) { switch (priority) {
case TaskQueueFactory::Priority::HIGH: case TaskQueueFactory::Priority::HIGH:
return rtc::ThreadPriority::kRealtime; return rtc::kRealtimePriority;
case TaskQueueFactory::Priority::LOW: case TaskQueueFactory::Priority::LOW:
return rtc::ThreadPriority::kLow; return rtc::kLowPriority;
case TaskQueueFactory::Priority::NORMAL: case TaskQueueFactory::Priority::NORMAL:
return rtc::ThreadPriority::kNormal; return rtc::kNormalPriority;
default:
RTC_NOTREACHED();
break;
} }
return rtc::kNormalPriority;
} }
int64_t GetTick() { int64_t GetTick() {
@ -164,6 +167,24 @@ class TaskQueueWin : public TaskQueueBase {
void RunPendingTasks(); void RunPendingTasks();
private: private:
static void ThreadMain(void* context);
class WorkerThread : public rtc::PlatformThread {
public:
WorkerThread(rtc::ThreadRunFunction func,
void* obj,
absl::string_view thread_name,
rtc::ThreadPriority priority)
: PlatformThread(func,
obj,
thread_name,
rtc::ThreadAttributes().SetPriority(priority)) {}
bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data) {
return rtc::PlatformThread::QueueAPC(apc_function, data);
}
};
void RunThreadMain(); void RunThreadMain();
bool ProcessQueuedMessages(); bool ProcessQueuedMessages();
void RunDueTasks(); void RunDueTasks();
@ -186,7 +207,7 @@ class TaskQueueWin : public TaskQueueBase {
greater<DelayedTaskInfo>> greater<DelayedTaskInfo>>
timer_tasks_; timer_tasks_;
UINT_PTR timer_id_ = 0; UINT_PTR timer_id_ = 0;
rtc::PlatformThread thread_; WorkerThread thread_;
Mutex pending_lock_; Mutex pending_lock_;
std::queue<std::unique_ptr<QueuedTask>> pending_ std::queue<std::unique_ptr<QueuedTask>> pending_
RTC_GUARDED_BY(pending_lock_); RTC_GUARDED_BY(pending_lock_);
@ -195,12 +216,10 @@ class TaskQueueWin : public TaskQueueBase {
TaskQueueWin::TaskQueueWin(absl::string_view queue_name, TaskQueueWin::TaskQueueWin(absl::string_view queue_name,
rtc::ThreadPriority priority) rtc::ThreadPriority priority)
: in_queue_(::CreateEvent(nullptr, true, false, nullptr)) { : thread_(&TaskQueueWin::ThreadMain, this, queue_name, priority),
in_queue_(::CreateEvent(nullptr, true, false, nullptr)) {
RTC_DCHECK(in_queue_); RTC_DCHECK(in_queue_);
thread_ = rtc::PlatformThread::SpawnJoinable( thread_.Start();
[this] { RunThreadMain(); }, queue_name,
rtc::ThreadAttributes().SetPriority(priority));
rtc::Event event(false, false); rtc::Event event(false, false);
RTC_CHECK(thread_.QueueAPC(&InitializeQueueThread, RTC_CHECK(thread_.QueueAPC(&InitializeQueueThread,
reinterpret_cast<ULONG_PTR>(&event))); reinterpret_cast<ULONG_PTR>(&event)));
@ -209,13 +228,11 @@ TaskQueueWin::TaskQueueWin(absl::string_view queue_name,
void TaskQueueWin::Delete() { void TaskQueueWin::Delete() {
RTC_DCHECK(!IsCurrent()); RTC_DCHECK(!IsCurrent());
RTC_CHECK(thread_.GetHandle() != absl::nullopt); while (!::PostThreadMessage(thread_.GetThreadRef(), WM_QUIT, 0, 0)) {
while (
!::PostThreadMessage(GetThreadId(*thread_.GetHandle()), WM_QUIT, 0, 0)) {
RTC_CHECK_EQ(ERROR_NOT_ENOUGH_QUOTA, ::GetLastError()); RTC_CHECK_EQ(ERROR_NOT_ENOUGH_QUOTA, ::GetLastError());
Sleep(1); Sleep(1);
} }
thread_.Finalize(); thread_.Stop();
::CloseHandle(in_queue_); ::CloseHandle(in_queue_);
delete this; delete this;
} }
@ -238,9 +255,7 @@ void TaskQueueWin::PostDelayedTask(std::unique_ptr<QueuedTask> task,
// and WPARAM is 32bits in 32bit builds. Otherwise, we could pass the // and WPARAM is 32bits in 32bit builds. Otherwise, we could pass the
// task pointer and timestamp as LPARAM and WPARAM. // task pointer and timestamp as LPARAM and WPARAM.
auto* task_info = new DelayedTaskInfo(milliseconds, std::move(task)); auto* task_info = new DelayedTaskInfo(milliseconds, std::move(task));
RTC_CHECK(thread_.GetHandle() != absl::nullopt); if (!::PostThreadMessage(thread_.GetThreadRef(), WM_QUEUE_DELAYED_TASK, 0,
if (!::PostThreadMessage(GetThreadId(*thread_.GetHandle()),
WM_QUEUE_DELAYED_TASK, 0,
reinterpret_cast<LPARAM>(task_info))) { reinterpret_cast<LPARAM>(task_info))) {
delete task_info; delete task_info;
} }
@ -262,6 +277,11 @@ void TaskQueueWin::RunPendingTasks() {
} }
} }
// static
void TaskQueueWin::ThreadMain(void* context) {
static_cast<TaskQueueWin*>(context)->RunThreadMain();
}
void TaskQueueWin::RunThreadMain() { void TaskQueueWin::RunThreadMain() {
CurrentTaskQueueSetter set_current(this); CurrentTaskQueueSetter set_current(this);
HANDLE handles[2] = {*timer_.event_for_wait(), in_queue_}; HANDLE handles[2] = {*timer_.event_for_wait(), in_queue_};

View file

@ -153,24 +153,28 @@ class SleepDeadlock : public DeadlockInterface {
} }
}; };
// This is the function that is exectued by the thread that will deadlock and
// have its stacktrace captured.
void ThreadFunction(void* void_params) {
ThreadParams* params = static_cast<ThreadParams*>(void_params);
params->tid = gettid();
params->deadlock_region_start_address = GetCurrentRelativeExecutionAddress();
params->deadlock_start_event.Set();
params->deadlock_impl->Deadlock();
params->deadlock_region_end_address = GetCurrentRelativeExecutionAddress();
params->deadlock_done_event.Set();
}
void TestStacktrace(std::unique_ptr<DeadlockInterface> deadlock_impl) { void TestStacktrace(std::unique_ptr<DeadlockInterface> deadlock_impl) {
// Set params that will be sent to other thread. // Set params that will be sent to other thread.
ThreadParams params; ThreadParams params;
params.deadlock_impl = deadlock_impl.get(); params.deadlock_impl = deadlock_impl.get();
// Spawn thread. // Spawn thread.
auto thread = rtc::PlatformThread::SpawnJoinable( rtc::PlatformThread thread(&ThreadFunction, &params, "StacktraceTest");
[&params] { thread.Start();
params.tid = gettid();
params.deadlock_region_start_address =
GetCurrentRelativeExecutionAddress();
params.deadlock_start_event.Set();
params.deadlock_impl->Deadlock();
params.deadlock_region_end_address =
GetCurrentRelativeExecutionAddress();
params.deadlock_done_event.Set();
},
"StacktraceTest");
// Wait until the thread has entered the deadlock region, and take a very // Wait until the thread has entered the deadlock region, and take a very
// brief nap to give it time to reach the actual deadlock. // brief nap to give it time to reach the actual deadlock.
@ -194,6 +198,8 @@ void TestStacktrace(std::unique_ptr<DeadlockInterface> deadlock_impl) {
<< rtc::ToHex(params.deadlock_region_start_address) << ", " << rtc::ToHex(params.deadlock_region_start_address) << ", "
<< rtc::ToHex(params.deadlock_region_end_address) << rtc::ToHex(params.deadlock_region_end_address)
<< "] not contained in: " << StackTraceToString(stack_trace); << "] not contained in: " << StackTraceToString(stack_trace);
thread.Stop();
} }
class LookoutLogSink final : public rtc::LogSink { class LookoutLogSink final : public rtc::LogSink {
@ -253,9 +259,13 @@ TEST(Stacktrace, TestRtcEventDeadlockDetection) {
// Start a thread that waits for an event. // Start a thread that waits for an event.
rtc::Event ev; rtc::Event ev;
auto thread = rtc::PlatformThread::SpawnJoinable( rtc::PlatformThread thread(
[&ev] { ev.Wait(rtc::Event::kForever); }, [](void* arg) {
"TestRtcEventDeadlockDetection"); auto* ev = static_cast<rtc::Event*>(arg);
ev->Wait(rtc::Event::kForever);
},
&ev, "TestRtcEventDeadlockDetection");
thread.Start();
// The message should appear after 3 sec. We'll wait up to 10 sec in an // The message should appear after 3 sec. We'll wait up to 10 sec in an
// attempt to not be flaky. // attempt to not be flaky.
@ -263,7 +273,7 @@ TEST(Stacktrace, TestRtcEventDeadlockDetection) {
// Unblock the thread and shut it down. // Unblock the thread and shut it down.
ev.Set(); ev.Set();
thread.Finalize(); thread.Stop();
rtc::LogMessage::RemoveLogToStream(&sink); rtc::LogMessage::RemoveLogToStream(&sink);
} }

View file

@ -142,9 +142,12 @@ void DefaultVideoQualityAnalyzer::Start(
int max_threads_count) { int max_threads_count) {
test_label_ = std::move(test_case_name); test_label_ = std::move(test_case_name);
for (int i = 0; i < max_threads_count; i++) { for (int i = 0; i < max_threads_count; i++) {
thread_pool_.push_back(rtc::PlatformThread::SpawnJoinable( auto thread = std::make_unique<rtc::PlatformThread>(
[this] { ProcessComparisons(); }, &DefaultVideoQualityAnalyzer::ProcessComparisonsThread, this,
"DefaultVideoQualityAnalyzerWorker-" + std::to_string(i))); ("DefaultVideoQualityAnalyzerWorker-" + std::to_string(i)).data(),
rtc::ThreadAttributes().SetPriority(rtc::kNormalPriority));
thread->Start();
thread_pool_.push_back(std::move(thread));
} }
{ {
MutexLock lock(&lock_); MutexLock lock(&lock_);
@ -544,6 +547,10 @@ void DefaultVideoQualityAnalyzer::Stop() {
} }
StopMeasuringCpuProcessTime(); StopMeasuringCpuProcessTime();
comparison_available_event_.Set(); comparison_available_event_.Set();
for (auto& thread : thread_pool_) {
thread->Stop();
}
// PlatformThread have to be deleted on the same thread, where it was created
thread_pool_.clear(); thread_pool_.clear();
// Perform final Metrics update. On this place analyzer is stopped and no one // Perform final Metrics update. On this place analyzer is stopped and no one
@ -670,6 +677,10 @@ void DefaultVideoQualityAnalyzer::AddComparison(
StopExcludingCpuThreadTime(); StopExcludingCpuThreadTime();
} }
void DefaultVideoQualityAnalyzer::ProcessComparisonsThread(void* obj) {
static_cast<DefaultVideoQualityAnalyzer*>(obj)->ProcessComparisons();
}
void DefaultVideoQualityAnalyzer::ProcessComparisons() { void DefaultVideoQualityAnalyzer::ProcessComparisons() {
while (true) { while (true) {
// Try to pick next comparison to perform from the queue. // Try to pick next comparison to perform from the queue.

View file

@ -560,7 +560,7 @@ class DefaultVideoQualityAnalyzer : public VideoQualityAnalyzerInterface {
std::deque<FrameComparison> comparisons_ RTC_GUARDED_BY(comparison_lock_); std::deque<FrameComparison> comparisons_ RTC_GUARDED_BY(comparison_lock_);
AnalyzerStats analyzer_stats_ RTC_GUARDED_BY(comparison_lock_); AnalyzerStats analyzer_stats_ RTC_GUARDED_BY(comparison_lock_);
std::vector<rtc::PlatformThread> thread_pool_; std::vector<std::unique_ptr<rtc::PlatformThread>> thread_pool_;
rtc::Event comparison_available_event_; rtc::Event comparison_available_event_;
Mutex cpu_measurement_lock_; Mutex cpu_measurement_lock_;

View file

@ -137,12 +137,10 @@ VideoAnalyzer::VideoAnalyzer(test::LayerFilteringTransport* transport,
} }
for (uint32_t i = 0; i < num_cores; ++i) { for (uint32_t i = 0; i < num_cores; ++i) {
comparison_thread_pool_.push_back(rtc::PlatformThread::SpawnJoinable( rtc::PlatformThread* thread =
[this] { new rtc::PlatformThread(&FrameComparisonThread, this, "Analyzer");
while (CompareFrames()) { thread->Start();
} comparison_thread_pool_.push_back(thread);
},
"Analyzer"));
} }
if (!rtp_dump_name.empty()) { if (!rtp_dump_name.empty()) {
@ -157,8 +155,10 @@ VideoAnalyzer::~VideoAnalyzer() {
MutexLock lock(&comparison_lock_); MutexLock lock(&comparison_lock_);
quit_ = true; quit_ = true;
} }
// Joins all threads. for (rtc::PlatformThread* thread : comparison_thread_pool_) {
comparison_thread_pool_.clear(); thread->Stop();
delete thread;
}
} }
void VideoAnalyzer::SetReceiver(PacketReceiver* receiver) { void VideoAnalyzer::SetReceiver(PacketReceiver* receiver) {
@ -533,6 +533,12 @@ void VideoAnalyzer::PollStats() {
memory_usage_.AddSample(rtc::GetProcessResidentSizeBytes()); memory_usage_.AddSample(rtc::GetProcessResidentSizeBytes());
} }
void VideoAnalyzer::FrameComparisonThread(void* obj) {
VideoAnalyzer* analyzer = static_cast<VideoAnalyzer*>(obj);
while (analyzer->CompareFrames()) {
}
}
bool VideoAnalyzer::CompareFrames() { bool VideoAnalyzer::CompareFrames() {
if (AllFramesRecorded()) if (AllFramesRecorded())
return false; return false;

View file

@ -302,7 +302,7 @@ class VideoAnalyzer : public PacketReceiver,
const double avg_ssim_threshold_; const double avg_ssim_threshold_;
bool is_quick_test_enabled_; bool is_quick_test_enabled_;
std::vector<rtc::PlatformThread> comparison_thread_pool_; std::vector<rtc::PlatformThread*> comparison_thread_pool_;
rtc::Event comparison_available_event_; rtc::Event comparison_available_event_;
std::deque<FrameComparison> comparisons_ RTC_GUARDED_BY(comparison_lock_); std::deque<FrameComparison> comparisons_ RTC_GUARDED_BY(comparison_lock_);
bool quit_ RTC_GUARDED_BY(comparison_lock_); bool quit_ RTC_GUARDED_BY(comparison_lock_);