Refactor the PlatformThread API.

PlatformThread's API is using old style function pointers, causes
casting, is unintuitive and forces artificial call sequences, and
is additionally possible to misuse in release mode.

Fix this by an API face lift:
1. The class is turned into a handle, which can be empty.
2. The only way of getting a non-empty PlatformThread is by calling
SpawnJoinable or SpawnDetached, clearly conveying the semantics to the
code reader.
3. Handles can be Finalized, which works differently for joinable and
detached threads:
  a) Handles for detached threads are simply closed where applicable.
  b) Joinable threads are joined before handles are closed.
4. The destructor finalizes handles. No explicit call is needed.

Fixed: webrtc:12727
Change-Id: Id00a0464edf4fc9e552b6a1fbb5d2e1280e88811
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/215075
Commit-Queue: Markus Handell <handellm@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Reviewed-by: Tommi <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#33923}
This commit is contained in:
Markus Handell 2021-05-05 10:42:04 +02:00 committed by WebRTC LUCI CQ
parent 1c5c2178fe
commit c89fdd716c
38 changed files with 586 additions and 864 deletions

View file

@ -40,21 +40,14 @@ class CompileTimeTestForGuardedBy {
};
void RunOnDifferentThread(rtc::FunctionView<void()> run) {
struct Object {
static void Run(void* obj) {
auto* me = static_cast<Object*>(obj);
me->run();
me->thread_has_run_event.Set();
}
rtc::FunctionView<void()> run;
rtc::Event thread_has_run_event;
} object{run};
rtc::PlatformThread thread(&Object::Run, &object, "thread");
thread.Start();
EXPECT_TRUE(object.thread_has_run_event.Wait(1000));
thread.Stop();
rtc::Event thread_has_run_event;
rtc::PlatformThread::SpawnJoinable(
[&] {
run();
thread_has_run_event.Set();
},
"thread");
EXPECT_TRUE(thread_has_run_event.Wait(1000));
}
} // namespace

View file

@ -429,21 +429,6 @@ class AudioCodingModuleMtTestOldApi : public AudioCodingModuleTestOldApi {
AudioCodingModuleMtTestOldApi()
: AudioCodingModuleTestOldApi(),
send_thread_(
CbSendThread,
this,
"send",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)),
insert_packet_thread_(
CbInsertPacketThread,
this,
"insert_packet",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)),
pull_audio_thread_(
CbPullAudioThread,
this,
"pull_audio",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)),
send_count_(0),
insert_packet_count_(0),
pull_audio_count_(0),
@ -460,17 +445,38 @@ class AudioCodingModuleMtTestOldApi : public AudioCodingModuleTestOldApi {
void StartThreads() {
quit_.store(false);
send_thread_.Start();
insert_packet_thread_.Start();
pull_audio_thread_.Start();
const auto attributes =
rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime);
send_thread_ = rtc::PlatformThread::SpawnJoinable(
[this] {
while (!quit_.load()) {
CbSendImpl();
}
},
"send", attributes);
insert_packet_thread_ = rtc::PlatformThread::SpawnJoinable(
[this] {
while (!quit_.load()) {
CbInsertPacketImpl();
}
},
"insert_packet", attributes);
pull_audio_thread_ = rtc::PlatformThread::SpawnJoinable(
[this] {
while (!quit_.load()) {
CbPullAudioImpl();
}
},
"pull_audio", attributes);
}
void TearDown() {
AudioCodingModuleTestOldApi::TearDown();
quit_.store(true);
pull_audio_thread_.Stop();
send_thread_.Stop();
insert_packet_thread_.Stop();
pull_audio_thread_.Finalize();
send_thread_.Finalize();
insert_packet_thread_.Finalize();
}
bool RunTest() {
@ -488,14 +494,6 @@ class AudioCodingModuleMtTestOldApi : public AudioCodingModuleTestOldApi {
return false;
}
static void CbSendThread(void* context) {
AudioCodingModuleMtTestOldApi* fixture =
reinterpret_cast<AudioCodingModuleMtTestOldApi*>(context);
while (!fixture->quit_.load()) {
fixture->CbSendImpl();
}
}
// The send thread doesn't have to care about the current simulated time,
// since only the AcmReceiver is using the clock.
void CbSendImpl() {
@ -511,14 +509,6 @@ class AudioCodingModuleMtTestOldApi : public AudioCodingModuleTestOldApi {
}
}
static void CbInsertPacketThread(void* context) {
AudioCodingModuleMtTestOldApi* fixture =
reinterpret_cast<AudioCodingModuleMtTestOldApi*>(context);
while (!fixture->quit_.load()) {
fixture->CbInsertPacketImpl();
}
}
void CbInsertPacketImpl() {
SleepMs(1);
{
@ -533,14 +523,6 @@ class AudioCodingModuleMtTestOldApi : public AudioCodingModuleTestOldApi {
InsertPacket();
}
static void CbPullAudioThread(void* context) {
AudioCodingModuleMtTestOldApi* fixture =
reinterpret_cast<AudioCodingModuleMtTestOldApi*>(context);
while (!fixture->quit_.load()) {
fixture->CbPullAudioImpl();
}
}
void CbPullAudioImpl() {
SleepMs(1);
{
@ -699,16 +681,6 @@ class AcmReRegisterIsacMtTestOldApi : public AudioCodingModuleTestOldApi {
AcmReRegisterIsacMtTestOldApi()
: AudioCodingModuleTestOldApi(),
receive_thread_(
CbReceiveThread,
this,
"receive",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)),
codec_registration_thread_(
CbCodecRegistrationThread,
this,
"codec_registration",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)),
codec_registered_(false),
receive_packet_count_(0),
next_insert_packet_time_ms_(0),
@ -740,28 +712,34 @@ class AcmReRegisterIsacMtTestOldApi : public AudioCodingModuleTestOldApi {
void StartThreads() {
quit_.store(false);
receive_thread_.Start();
codec_registration_thread_.Start();
const auto attributes =
rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime);
receive_thread_ = rtc::PlatformThread::SpawnJoinable(
[this] {
while (!quit_.load() && CbReceiveImpl()) {
}
},
"receive", attributes);
codec_registration_thread_ = rtc::PlatformThread::SpawnJoinable(
[this] {
while (!quit_.load()) {
CbCodecRegistrationImpl();
}
},
"codec_registration", attributes);
}
void TearDown() override {
AudioCodingModuleTestOldApi::TearDown();
quit_.store(true);
receive_thread_.Stop();
codec_registration_thread_.Stop();
receive_thread_.Finalize();
codec_registration_thread_.Finalize();
}
bool RunTest() {
return test_complete_.Wait(10 * 60 * 1000); // 10 minutes' timeout.
}
static void CbReceiveThread(void* context) {
AcmReRegisterIsacMtTestOldApi* fixture =
reinterpret_cast<AcmReRegisterIsacMtTestOldApi*>(context);
while (!fixture->quit_.load() && fixture->CbReceiveImpl()) {
}
}
bool CbReceiveImpl() {
SleepMs(1);
rtc::Buffer encoded;
@ -807,14 +785,6 @@ class AcmReRegisterIsacMtTestOldApi : public AudioCodingModuleTestOldApi {
return true;
}
static void CbCodecRegistrationThread(void* context) {
AcmReRegisterIsacMtTestOldApi* fixture =
reinterpret_cast<AcmReRegisterIsacMtTestOldApi*>(context);
while (!fixture->quit_.load()) {
fixture->CbCodecRegistrationImpl();
}
}
void CbCodecRegistrationImpl() {
SleepMs(1);
if (HasFatalFailure()) {

View file

@ -216,10 +216,13 @@ int32_t FileAudioDevice::StartPlayout() {
}
}
_ptrThreadPlay.reset(new rtc::PlatformThread(
PlayThreadFunc, this, "webrtc_audio_module_play_thread",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)));
_ptrThreadPlay->Start();
_ptrThreadPlay = rtc::PlatformThread::SpawnJoinable(
[this] {
while (PlayThreadProcess()) {
}
},
"webrtc_audio_module_play_thread",
rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime));
RTC_LOG(LS_INFO) << "Started playout capture to output file: "
<< _outputFilename;
@ -233,10 +236,8 @@ int32_t FileAudioDevice::StopPlayout() {
}
// stop playout thread first
if (_ptrThreadPlay) {
_ptrThreadPlay->Stop();
_ptrThreadPlay.reset();
}
if (!_ptrThreadPlay.empty())
_ptrThreadPlay.Finalize();
MutexLock lock(&mutex_);
@ -276,11 +277,13 @@ int32_t FileAudioDevice::StartRecording() {
}
}
_ptrThreadRec.reset(new rtc::PlatformThread(
RecThreadFunc, this, "webrtc_audio_module_capture_thread",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)));
_ptrThreadRec->Start();
_ptrThreadRec = rtc::PlatformThread::SpawnJoinable(
[this] {
while (RecThreadProcess()) {
}
},
"webrtc_audio_module_capture_thread",
rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime));
RTC_LOG(LS_INFO) << "Started recording from input file: " << _inputFilename;
@ -293,10 +296,8 @@ int32_t FileAudioDevice::StopRecording() {
_recording = false;
}
if (_ptrThreadRec) {
_ptrThreadRec->Stop();
_ptrThreadRec.reset();
}
if (!_ptrThreadRec.empty())
_ptrThreadRec.Finalize();
MutexLock lock(&mutex_);
_recordingFramesLeft = 0;
@ -439,18 +440,6 @@ void FileAudioDevice::AttachAudioBuffer(AudioDeviceBuffer* audioBuffer) {
_ptrAudioBuffer->SetPlayoutChannels(0);
}
void FileAudioDevice::PlayThreadFunc(void* pThis) {
FileAudioDevice* device = static_cast<FileAudioDevice*>(pThis);
while (device->PlayThreadProcess()) {
}
}
void FileAudioDevice::RecThreadFunc(void* pThis) {
FileAudioDevice* device = static_cast<FileAudioDevice*>(pThis);
while (device->RecThreadProcess()) {
}
}
bool FileAudioDevice::PlayThreadProcess() {
if (!_playing) {
return false;

View file

@ -17,14 +17,11 @@
#include <string>
#include "modules/audio_device/audio_device_generic.h"
#include "rtc_base/platform_thread.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/system/file_wrapper.h"
#include "rtc_base/time_utils.h"
namespace rtc {
class PlatformThread;
} // namespace rtc
namespace webrtc {
// This is a fake audio device which plays audio from a file as its microphone
@ -145,9 +142,8 @@ class FileAudioDevice : public AudioDeviceGeneric {
size_t _recordingFramesIn10MS;
size_t _playoutFramesIn10MS;
// TODO(pbos): Make plain members instead of pointers and stop resetting them.
std::unique_ptr<rtc::PlatformThread> _ptrThreadRec;
std::unique_ptr<rtc::PlatformThread> _ptrThreadPlay;
rtc::PlatformThread _ptrThreadRec;
rtc::PlatformThread _ptrThreadPlay;
bool _playing;
bool _recording;

View file

@ -178,26 +178,13 @@ int32_t AudioDeviceLinuxALSA::Terminate() {
_mixerManager.Close();
// RECORDING
if (_ptrThreadRec) {
rtc::PlatformThread* tmpThread = _ptrThreadRec.release();
mutex_.Unlock();
tmpThread->Stop();
delete tmpThread;
mutex_.Lock();
}
mutex_.Unlock();
_ptrThreadRec.Finalize();
// PLAYOUT
if (_ptrThreadPlay) {
rtc::PlatformThread* tmpThread = _ptrThreadPlay.release();
mutex_.Unlock();
_ptrThreadPlay.Finalize();
mutex_.Lock();
tmpThread->Stop();
delete tmpThread;
mutex_.Lock();
}
#if defined(WEBRTC_USE_X11)
if (_XDisplay) {
XCloseDisplay(_XDisplay);
@ -1040,11 +1027,13 @@ int32_t AudioDeviceLinuxALSA::StartRecording() {
return -1;
}
// RECORDING
_ptrThreadRec.reset(new rtc::PlatformThread(
RecThreadFunc, this, "webrtc_audio_module_capture_thread",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)));
_ptrThreadRec->Start();
_ptrThreadRec = rtc::PlatformThread::SpawnJoinable(
[this] {
while (RecThreadProcess()) {
}
},
"webrtc_audio_module_capture_thread",
rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime));
errVal = LATE(snd_pcm_prepare)(_handleRecord);
if (errVal < 0) {
@ -1088,10 +1077,7 @@ int32_t AudioDeviceLinuxALSA::StopRecordingLocked() {
_recIsInitialized = false;
_recording = false;
if (_ptrThreadRec) {
_ptrThreadRec->Stop();
_ptrThreadRec.reset();
}
_ptrThreadRec.Finalize();
_recordingFramesLeft = 0;
if (_recordingBuffer) {
@ -1158,10 +1144,13 @@ int32_t AudioDeviceLinuxALSA::StartPlayout() {
}
// PLAYOUT
_ptrThreadPlay.reset(new rtc::PlatformThread(
PlayThreadFunc, this, "webrtc_audio_module_play_thread",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)));
_ptrThreadPlay->Start();
_ptrThreadPlay = rtc::PlatformThread::SpawnJoinable(
[this] {
while (PlayThreadProcess()) {
}
},
"webrtc_audio_module_play_thread",
rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime));
int errVal = LATE(snd_pcm_prepare)(_handlePlayout);
if (errVal < 0) {
@ -1191,10 +1180,7 @@ int32_t AudioDeviceLinuxALSA::StopPlayoutLocked() {
_playing = false;
// stop playout thread first
if (_ptrThreadPlay) {
_ptrThreadPlay->Stop();
_ptrThreadPlay.reset();
}
_ptrThreadPlay.Finalize();
_playoutFramesLeft = 0;
delete[] _playoutBuffer;
@ -1469,18 +1455,6 @@ int32_t AudioDeviceLinuxALSA::ErrorRecovery(int32_t error,
// Thread Methods
// ============================================================================
void AudioDeviceLinuxALSA::PlayThreadFunc(void* pThis) {
AudioDeviceLinuxALSA* device = static_cast<AudioDeviceLinuxALSA*>(pThis);
while (device->PlayThreadProcess()) {
}
}
void AudioDeviceLinuxALSA::RecThreadFunc(void* pThis) {
AudioDeviceLinuxALSA* device = static_cast<AudioDeviceLinuxALSA*>(pThis);
while (device->RecThreadProcess()) {
}
}
bool AudioDeviceLinuxALSA::PlayThreadProcess() {
if (!_playing)
return false;

View file

@ -155,10 +155,8 @@ class AudioDeviceLinuxALSA : public AudioDeviceGeneric {
Mutex mutex_;
// TODO(pbos): Make plain members and start/stop instead of resetting these
// pointers. A thread can be reused.
std::unique_ptr<rtc::PlatformThread> _ptrThreadRec;
std::unique_ptr<rtc::PlatformThread> _ptrThreadPlay;
rtc::PlatformThread _ptrThreadRec;
rtc::PlatformThread _ptrThreadPlay;
AudioMixerManagerLinuxALSA _mixerManager;

View file

@ -15,6 +15,7 @@
#include "modules/audio_device/linux/latebindingsymboltable_linux.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "rtc_base/platform_thread.h"
WebRTCPulseSymbolTable* GetPulseSymbolTable() {
static WebRTCPulseSymbolTable* pulse_symbol_table =
@ -158,18 +159,22 @@ AudioDeviceGeneric::InitStatus AudioDeviceLinuxPulse::Init() {
#endif
// RECORDING
_ptrThreadRec.reset(new rtc::PlatformThread(
RecThreadFunc, this, "webrtc_audio_module_rec_thread",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)));
_ptrThreadRec->Start();
const auto attributes =
rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime);
_ptrThreadRec = rtc::PlatformThread::SpawnJoinable(
[this] {
while (RecThreadProcess()) {
}
},
"webrtc_audio_module_rec_thread", attributes);
// PLAYOUT
_ptrThreadPlay.reset(new rtc::PlatformThread(
PlayThreadFunc, this, "webrtc_audio_module_play_thread",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)));
_ptrThreadPlay->Start();
_ptrThreadPlay = rtc::PlatformThread::SpawnJoinable(
[this] {
while (PlayThreadProcess()) {
}
},
"webrtc_audio_module_play_thread", attributes);
_initialized = true;
return InitStatus::OK;
@ -187,22 +192,12 @@ int32_t AudioDeviceLinuxPulse::Terminate() {
_mixerManager.Close();
// RECORDING
if (_ptrThreadRec) {
rtc::PlatformThread* tmpThread = _ptrThreadRec.release();
_timeEventRec.Set();
tmpThread->Stop();
delete tmpThread;
}
_timeEventRec.Set();
_ptrThreadRec.Finalize();
// PLAYOUT
if (_ptrThreadPlay) {
rtc::PlatformThread* tmpThread = _ptrThreadPlay.release();
_timeEventPlay.Set();
tmpThread->Stop();
delete tmpThread;
}
_timeEventPlay.Set();
_ptrThreadPlay.Finalize();
// Terminate PulseAudio
if (TerminatePulseAudio() < 0) {
@ -1981,18 +1976,6 @@ int32_t AudioDeviceLinuxPulse::ProcessRecordedData(int8_t* bufferData,
return 0;
}
void AudioDeviceLinuxPulse::PlayThreadFunc(void* pThis) {
AudioDeviceLinuxPulse* device = static_cast<AudioDeviceLinuxPulse*>(pThis);
while (device->PlayThreadProcess()) {
}
}
void AudioDeviceLinuxPulse::RecThreadFunc(void* pThis) {
AudioDeviceLinuxPulse* device = static_cast<AudioDeviceLinuxPulse*>(pThis);
while (device->RecThreadProcess()) {
}
}
bool AudioDeviceLinuxPulse::PlayThreadProcess() {
if (!_timeEventPlay.Wait(1000)) {
return true;

View file

@ -268,9 +268,8 @@ class AudioDeviceLinuxPulse : public AudioDeviceGeneric {
rtc::Event _recStartEvent;
rtc::Event _playStartEvent;
// TODO(pbos): Remove unique_ptr and use directly without resetting.
std::unique_ptr<rtc::PlatformThread> _ptrThreadPlay;
std::unique_ptr<rtc::PlatformThread> _ptrThreadRec;
rtc::PlatformThread _ptrThreadPlay;
rtc::PlatformThread _ptrThreadRec;
AudioMixerManagerLinuxPulse _mixerManager;

View file

@ -166,8 +166,8 @@ AudioDeviceMac::~AudioDeviceMac() {
Terminate();
}
RTC_DCHECK(!capture_worker_thread_.get());
RTC_DCHECK(!render_worker_thread_.get());
RTC_DCHECK(capture_worker_thread_.empty());
RTC_DCHECK(render_worker_thread_.empty());
if (_paRenderBuffer) {
delete _paRenderBuffer;
@ -1308,12 +1308,14 @@ int32_t AudioDeviceMac::StartRecording() {
return -1;
}
RTC_DCHECK(!capture_worker_thread_.get());
capture_worker_thread_.reset(new rtc::PlatformThread(
RunCapture, this, "CaptureWorkerThread",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)));
RTC_DCHECK(capture_worker_thread_.get());
capture_worker_thread_->Start();
RTC_DCHECK(capture_worker_thread_.empty());
capture_worker_thread_ = rtc::PlatformThread::SpawnJoinable(
[this] {
while (CaptureWorkerThread()) {
}
},
"CaptureWorkerThread",
rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime));
OSStatus err = noErr;
if (_twoDevices) {
@ -1395,10 +1397,9 @@ int32_t AudioDeviceMac::StopRecording() {
// Setting this signal will allow the worker thread to be stopped.
AtomicSet32(&_captureDeviceIsAlive, 0);
if (capture_worker_thread_.get()) {
if (!capture_worker_thread_.empty()) {
mutex_.Unlock();
capture_worker_thread_->Stop();
capture_worker_thread_.reset();
capture_worker_thread_.Finalize();
mutex_.Lock();
}
@ -1444,11 +1445,14 @@ int32_t AudioDeviceMac::StartPlayout() {
return 0;
}
RTC_DCHECK(!render_worker_thread_.get());
render_worker_thread_.reset(new rtc::PlatformThread(
RunRender, this, "RenderWorkerThread",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)));
render_worker_thread_->Start();
RTC_DCHECK(render_worker_thread_.empty());
render_worker_thread_ = rtc::PlatformThread::SpawnJoinable(
[this] {
while (RenderWorkerThread()) {
}
},
"RenderWorkerThread",
rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime));
if (_twoDevices || !_recording) {
OSStatus err = noErr;
@ -1506,10 +1510,9 @@ int32_t AudioDeviceMac::StopPlayout() {
// Setting this signal will allow the worker thread to be stopped.
AtomicSet32(&_renderDeviceIsAlive, 0);
if (render_worker_thread_.get()) {
if (!render_worker_thread_.empty()) {
mutex_.Unlock();
render_worker_thread_->Stop();
render_worker_thread_.reset();
render_worker_thread_.Finalize();
mutex_.Lock();
}
@ -2371,12 +2374,6 @@ OSStatus AudioDeviceMac::implInConverterProc(UInt32* numberDataPackets,
return 0;
}
void AudioDeviceMac::RunRender(void* ptrThis) {
AudioDeviceMac* device = static_cast<AudioDeviceMac*>(ptrThis);
while (device->RenderWorkerThread()) {
}
}
bool AudioDeviceMac::RenderWorkerThread() {
PaRingBufferSize numSamples =
ENGINE_PLAY_BUF_SIZE_IN_SAMPLES * _outDesiredFormat.mChannelsPerFrame;
@ -2442,12 +2439,6 @@ bool AudioDeviceMac::RenderWorkerThread() {
return true;
}
void AudioDeviceMac::RunCapture(void* ptrThis) {
AudioDeviceMac* device = static_cast<AudioDeviceMac*>(ptrThis);
while (device->CaptureWorkerThread()) {
}
}
bool AudioDeviceMac::CaptureWorkerThread() {
OSStatus err = noErr;
UInt32 noRecSamples =

View file

@ -21,15 +21,12 @@
#include "modules/audio_device/mac/audio_mixer_manager_mac.h"
#include "rtc_base/event.h"
#include "rtc_base/logging.h"
#include "rtc_base/platform_thread.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/thread_annotations.h"
struct PaUtilRingBuffer;
namespace rtc {
class PlatformThread;
} // namespace rtc
namespace webrtc {
const uint32_t N_REC_SAMPLES_PER_SEC = 48000;
@ -271,13 +268,11 @@ class AudioDeviceMac : public AudioDeviceGeneric {
rtc::Event _stopEventRec;
rtc::Event _stopEvent;
// TODO(pbos): Replace with direct members, just start/stop, no need to
// recreate the thread.
// Only valid/running between calls to StartRecording and StopRecording.
std::unique_ptr<rtc::PlatformThread> capture_worker_thread_;
rtc::PlatformThread capture_worker_thread_;
// Only valid/running between calls to StartPlayout and StopPlayout.
std::unique_ptr<rtc::PlatformThread> render_worker_thread_;
rtc::PlatformThread render_worker_thread_;
AudioMixerManagerMac _mixerManager;

View file

@ -119,11 +119,6 @@ const char* SessionDisconnectReasonToString(
}
}
void Run(void* obj) {
RTC_DCHECK(obj);
reinterpret_cast<CoreAudioBase*>(obj)->ThreadRun();
}
// Returns true if the selected audio device supports low latency, i.e, if it
// is possible to initialize the engine using periods less than the default
// period (10ms).
@ -553,24 +548,19 @@ bool CoreAudioBase::Start() {
// Audio thread should be alive during internal restart since the restart
// callback is triggered on that thread and it also makes the restart
// sequence less complex.
RTC_DCHECK(audio_thread_);
RTC_DCHECK(!audio_thread_.empty());
}
// Start an audio thread but only if one does not already exist (which is the
// case during restart).
if (!audio_thread_) {
audio_thread_ = std::make_unique<rtc::PlatformThread>(
Run, this, IsInput() ? "wasapi_capture_thread" : "wasapi_render_thread",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority));
RTC_DCHECK(audio_thread_);
audio_thread_->Start();
if (!audio_thread_->IsRunning()) {
StopThread();
RTC_LOG(LS_ERROR) << "Failed to start audio thread";
return false;
}
RTC_DLOG(INFO) << "Started thread with name: " << audio_thread_->name()
<< " and id: " << audio_thread_->GetThreadRef();
if (audio_thread_.empty()) {
const absl::string_view name =
IsInput() ? "wasapi_capture_thread" : "wasapi_render_thread";
audio_thread_ = rtc::PlatformThread::SpawnJoinable(
[this] { ThreadRun(); }, name,
rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime));
RTC_DLOG(INFO) << "Started thread with name: " << name
<< " and handle: " << *audio_thread_.GetHandle();
}
// Start streaming data between the endpoint buffer and the audio engine.
@ -697,14 +687,11 @@ bool CoreAudioBase::Restart() {
void CoreAudioBase::StopThread() {
RTC_DLOG(INFO) << __FUNCTION__;
RTC_DCHECK(!IsRestarting());
if (audio_thread_) {
if (audio_thread_->IsRunning()) {
RTC_DLOG(INFO) << "Sets stop_event...";
SetEvent(stop_event_.Get());
RTC_DLOG(INFO) << "PlatformThread::Stop...";
audio_thread_->Stop();
}
audio_thread_.reset();
if (!audio_thread_.empty()) {
RTC_DLOG(INFO) << "Sets stop_event...";
SetEvent(stop_event_.Get());
RTC_DLOG(INFO) << "PlatformThread::Finalize...";
audio_thread_.Finalize();
// Ensure that we don't quit the main thread loop immediately next
// time Start() is called.
@ -717,7 +704,7 @@ bool CoreAudioBase::HandleRestartEvent() {
RTC_DLOG(INFO) << __FUNCTION__ << "[" << DirectionToString(direction())
<< "]";
RTC_DCHECK_RUN_ON(&thread_checker_audio_);
RTC_DCHECK(audio_thread_);
RTC_DCHECK(!audio_thread_.empty());
RTC_DCHECK(IsRestarting());
// Let each client (input and/or output) take care of its own restart
// sequence since each side might need unique actions.

View file

@ -158,7 +158,7 @@ class CoreAudioBase : public IAudioSessionEvents {
// Set when restart process starts and cleared when restart stops
// successfully. Accessed atomically.
std::atomic<bool> is_restarting_;
std::unique_ptr<rtc::PlatformThread> audio_thread_;
rtc::PlatformThread audio_thread_;
Microsoft::WRL::ComPtr<IAudioSessionControl> audio_session_control_;
void StopThread();

View file

@ -387,33 +387,6 @@ class AudioProcessingImplLockTest
void SetUp() override;
void TearDown() override;
// Thread callback for the render thread
static void RenderProcessorThreadFunc(void* context) {
AudioProcessingImplLockTest* impl =
reinterpret_cast<AudioProcessingImplLockTest*>(context);
while (!impl->MaybeEndTest()) {
impl->render_thread_state_.Process();
}
}
// Thread callback for the capture thread
static void CaptureProcessorThreadFunc(void* context) {
AudioProcessingImplLockTest* impl =
reinterpret_cast<AudioProcessingImplLockTest*>(context);
while (!impl->MaybeEndTest()) {
impl->capture_thread_state_.Process();
}
}
// Thread callback for the stats thread
static void StatsProcessorThreadFunc(void* context) {
AudioProcessingImplLockTest* impl =
reinterpret_cast<AudioProcessingImplLockTest*>(context);
while (!impl->MaybeEndTest()) {
impl->stats_thread_state_.Process();
}
}
// Tests whether all the required render and capture side calls have been
// done.
bool TestDone() {
@ -423,9 +396,28 @@ class AudioProcessingImplLockTest
// Start the threads used in the test.
void StartThreads() {
render_thread_.Start();
capture_thread_.Start();
stats_thread_.Start();
const auto attributes =
rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime);
render_thread_ = rtc::PlatformThread::SpawnJoinable(
[this] {
while (!MaybeEndTest())
render_thread_state_.Process();
},
"render", attributes);
capture_thread_ = rtc::PlatformThread::SpawnJoinable(
[this] {
while (!MaybeEndTest()) {
capture_thread_state_.Process();
}
},
"capture", attributes);
stats_thread_ = rtc::PlatformThread::SpawnJoinable(
[this] {
while (!MaybeEndTest())
stats_thread_state_.Process();
},
"stats", attributes);
}
// Event handlers for the test.
@ -434,9 +426,6 @@ class AudioProcessingImplLockTest
rtc::Event capture_call_event_;
// Thread related variables.
rtc::PlatformThread render_thread_;
rtc::PlatformThread capture_thread_;
rtc::PlatformThread stats_thread_;
mutable RandomGenerator rand_gen_;
std::unique_ptr<AudioProcessing> apm_;
@ -445,6 +434,9 @@ class AudioProcessingImplLockTest
RenderProcessor render_thread_state_;
CaptureProcessor capture_thread_state_;
StatsProcessor stats_thread_state_;
rtc::PlatformThread render_thread_;
rtc::PlatformThread capture_thread_;
rtc::PlatformThread stats_thread_;
};
// Sleeps a random time between 0 and max_sleep milliseconds.
@ -485,22 +477,7 @@ void PopulateAudioFrame(float amplitude,
}
AudioProcessingImplLockTest::AudioProcessingImplLockTest()
: render_thread_(
RenderProcessorThreadFunc,
this,
"render",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)),
capture_thread_(
CaptureProcessorThreadFunc,
this,
"capture",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)),
stats_thread_(
StatsProcessorThreadFunc,
this,
"stats",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)),
apm_(AudioProcessingBuilderForTesting().Create()),
: apm_(AudioProcessingBuilderForTesting().Create()),
render_thread_state_(kMaxFrameSize,
&rand_gen_,
&render_call_event_,
@ -552,9 +529,6 @@ void AudioProcessingImplLockTest::SetUp() {
void AudioProcessingImplLockTest::TearDown() {
render_call_event_.Set();
capture_call_event_.Set();
render_thread_.Stop();
capture_thread_.Stop();
stats_thread_.Stop();
}
StatsProcessor::StatsProcessor(RandomGenerator* rand_gen,

View file

@ -391,17 +391,7 @@ class TimedThreadApiProcessor {
class CallSimulator : public ::testing::TestWithParam<SimulationConfig> {
public:
CallSimulator()
: render_thread_(new rtc::PlatformThread(
RenderProcessorThreadFunc,
this,
"render",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))),
capture_thread_(new rtc::PlatformThread(
CaptureProcessorThreadFunc,
this,
"capture",
rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))),
rand_gen_(42U),
: rand_gen_(42U),
simulation_config_(static_cast<SimulationConfig>(GetParam())) {}
// Run the call simulation with a timeout.
@ -436,13 +426,10 @@ class CallSimulator : public ::testing::TestWithParam<SimulationConfig> {
static const int kMinNumFramesToProcess = 150;
static const int32_t kTestTimeout = 3 * 10 * kMinNumFramesToProcess;
// ::testing::TestWithParam<> implementation.
void TearDown() override { StopThreads(); }
// Stop all running threads.
void StopThreads() {
render_thread_->Stop();
capture_thread_->Stop();
render_thread_.Finalize();
capture_thread_.Finalize();
}
// Simulator and APM setup.
@ -533,32 +520,28 @@ class CallSimulator : public ::testing::TestWithParam<SimulationConfig> {
kMinNumFramesToProcess, kCaptureInputFloatLevel, num_capture_channels));
}
// Thread callback for the render thread.
static void RenderProcessorThreadFunc(void* context) {
CallSimulator* call_simulator = reinterpret_cast<CallSimulator*>(context);
while (call_simulator->render_thread_state_->Process()) {
}
}
// Thread callback for the capture thread.
static void CaptureProcessorThreadFunc(void* context) {
CallSimulator* call_simulator = reinterpret_cast<CallSimulator*>(context);
while (call_simulator->capture_thread_state_->Process()) {
}
}
// Start the threads used in the test.
void StartThreads() {
ASSERT_NO_FATAL_FAILURE(render_thread_->Start());
ASSERT_NO_FATAL_FAILURE(capture_thread_->Start());
const auto attributes =
rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime);
render_thread_ = rtc::PlatformThread::SpawnJoinable(
[this] {
while (render_thread_state_->Process()) {
}
},
"render", attributes);
capture_thread_ = rtc::PlatformThread::SpawnJoinable(
[this] {
while (capture_thread_state_->Process()) {
}
},
"capture", attributes);
}
// Event handler for the test.
rtc::Event test_complete_;
// Thread related variables.
std::unique_ptr<rtc::PlatformThread> render_thread_;
std::unique_ptr<rtc::PlatformThread> capture_thread_;
Random rand_gen_;
std::unique_ptr<AudioProcessing> apm_;
@ -567,6 +550,8 @@ class CallSimulator : public ::testing::TestWithParam<SimulationConfig> {
LockedFlag capture_call_checker_;
std::unique_ptr<TimedThreadApiProcessor> render_thread_state_;
std::unique_ptr<TimedThreadApiProcessor> capture_thread_state_;
rtc::PlatformThread render_thread_;
rtc::PlatformThread capture_thread_;
};
// Implements the callback functionality for the threads.

View file

@ -48,13 +48,12 @@ void TestScreenDrawerLock(
~Task() = default;
static void RunTask(void* me) {
Task* task = static_cast<Task*>(me);
std::unique_ptr<ScreenDrawerLock> lock = task->ctor_();
void RunTask() {
std::unique_ptr<ScreenDrawerLock> lock = ctor_();
ASSERT_TRUE(!!lock);
task->created_->store(true);
created_->store(true);
// Wait for the main thread to get the signal of created_.
while (!task->ready_.load()) {
while (!ready_.load()) {
SleepMs(1);
}
// At this point, main thread should begin to create a second lock. Though
@ -77,8 +76,8 @@ void TestScreenDrawerLock(
const rtc::FunctionView<std::unique_ptr<ScreenDrawerLock>()> ctor_;
} task(&created, ready, ctor);
rtc::PlatformThread lock_thread(&Task::RunTask, &task, "lock_thread");
lock_thread.Start();
auto lock_thread = rtc::PlatformThread::SpawnJoinable(
[&task] { task.RunTask(); }, "lock_thread");
// Wait for the first lock in Task::RunTask() to be created.
// TODO(zijiehe): Find a better solution to wait for the creation of the first
@ -95,7 +94,6 @@ void TestScreenDrawerLock(
ASSERT_GT(kLockDurationMs, rtc::TimeMillis() - start_ms);
ctor();
ASSERT_LE(kLockDurationMs, rtc::TimeMillis() - start_ms);
lock_thread.Stop();
}
} // namespace

View file

@ -48,7 +48,6 @@ ProcessThreadImpl::ProcessThreadImpl(const char* thread_name)
ProcessThreadImpl::~ProcessThreadImpl() {
RTC_DCHECK(thread_checker_.IsCurrent());
RTC_DCHECK(!thread_.get());
RTC_DCHECK(!stop_);
while (!delayed_tasks_.empty()) {
@ -72,8 +71,8 @@ void ProcessThreadImpl::Delete() {
// Doesn't need locking, because the contending thread isn't running.
void ProcessThreadImpl::Start() RTC_NO_THREAD_SAFETY_ANALYSIS {
RTC_DCHECK(thread_checker_.IsCurrent());
RTC_DCHECK(!thread_.get());
if (thread_.get())
RTC_DCHECK(thread_.empty());
if (!thread_.empty())
return;
RTC_DCHECK(!stop_);
@ -81,14 +80,18 @@ void ProcessThreadImpl::Start() RTC_NO_THREAD_SAFETY_ANALYSIS {
for (ModuleCallback& m : modules_)
m.module->ProcessThreadAttached(this);
thread_.reset(
new rtc::PlatformThread(&ProcessThreadImpl::Run, this, thread_name_));
thread_->Start();
thread_ = rtc::PlatformThread::SpawnJoinable(
[this] {
CurrentTaskQueueSetter set_current(this);
while (Process()) {
}
},
thread_name_);
}
void ProcessThreadImpl::Stop() {
RTC_DCHECK(thread_checker_.IsCurrent());
if (!thread_.get())
if (thread_.empty())
return;
{
@ -98,9 +101,7 @@ void ProcessThreadImpl::Stop() {
}
wake_up_.Set();
thread_->Stop();
thread_.reset();
thread_.Finalize();
StopNoLocks();
}
@ -108,7 +109,7 @@ void ProcessThreadImpl::Stop() {
// No locking needed, since this is called after the contending thread is
// stopped.
void ProcessThreadImpl::StopNoLocks() RTC_NO_THREAD_SAFETY_ANALYSIS {
RTC_DCHECK(!thread_);
RTC_DCHECK(thread_.empty());
stop_ = false;
for (ModuleCallback& m : modules_)
@ -199,7 +200,7 @@ void ProcessThreadImpl::RegisterModule(Module* module,
// Now that we know the module isn't in the list, we'll call out to notify
// the module that it's attached to the worker thread. We don't hold
// the lock while we make this call.
if (thread_.get())
if (!thread_.empty())
module->ProcessThreadAttached(this);
{
@ -227,14 +228,6 @@ void ProcessThreadImpl::DeRegisterModule(Module* module) {
module->ProcessThreadAttached(nullptr);
}
// static
void ProcessThreadImpl::Run(void* obj) {
ProcessThreadImpl* impl = static_cast<ProcessThreadImpl*>(obj);
CurrentTaskQueueSetter set_current(impl);
while (impl->Process()) {
}
}
bool ProcessThreadImpl::Process() {
TRACE_EVENT1("webrtc", "ProcessThreadImpl", "name", thread_name_);
int64_t now = rtc::TimeMillis();

View file

@ -45,7 +45,6 @@ class ProcessThreadImpl : public ProcessThread {
void DeRegisterModule(Module* module) override;
protected:
static void Run(void* obj);
bool Process();
private:
@ -97,8 +96,7 @@ class ProcessThreadImpl : public ProcessThread {
SequenceChecker thread_checker_;
rtc::Event wake_up_;
// TODO(pbos): Remove unique_ptr and stop recreating the thread.
std::unique_ptr<rtc::PlatformThread> thread_;
rtc::PlatformThread thread_;
ModuleList modules_ RTC_GUARDED_BY(mutex_);
// Set to true when calling Process, to allow reentrant calls to WakeUp.

View file

@ -240,12 +240,15 @@ int32_t VideoCaptureModuleV4L2::StartCapture(
}
// start capture thread;
if (!_captureThread) {
if (_captureThread.empty()) {
quit_ = false;
_captureThread.reset(new rtc::PlatformThread(
VideoCaptureModuleV4L2::CaptureThread, this, "CaptureThread",
rtc::ThreadAttributes().SetPriority(rtc::kHighPriority)));
_captureThread->Start();
_captureThread = rtc::PlatformThread::SpawnJoinable(
[this] {
while (CaptureProcess()) {
}
},
"CaptureThread",
rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kHigh));
}
// Needed to start UVC camera - from the uvcview application
@ -261,14 +264,13 @@ int32_t VideoCaptureModuleV4L2::StartCapture(
}
int32_t VideoCaptureModuleV4L2::StopCapture() {
if (_captureThread) {
if (!_captureThread.empty()) {
{
MutexLock lock(&capture_lock_);
quit_ = true;
}
// Make sure the capture thread stop stop using the critsect.
_captureThread->Stop();
_captureThread.reset();
// Make sure the capture thread stops using the mutex.
_captureThread.Finalize();
}
MutexLock lock(&capture_lock_);
@ -356,11 +358,6 @@ bool VideoCaptureModuleV4L2::CaptureStarted() {
return _captureStarted;
}
void VideoCaptureModuleV4L2::CaptureThread(void* obj) {
VideoCaptureModuleV4L2* capture = static_cast<VideoCaptureModuleV4L2*>(obj);
while (capture->CaptureProcess()) {
}
}
bool VideoCaptureModuleV4L2::CaptureProcess() {
int retVal = 0;
fd_set rSet;

View file

@ -41,8 +41,7 @@ class VideoCaptureModuleV4L2 : public VideoCaptureImpl {
bool AllocateVideoBuffers();
bool DeAllocateVideoBuffers();
// TODO(pbos): Stop using unique_ptr and resetting the thread.
std::unique_ptr<rtc::PlatformThread> _captureThread;
rtc::PlatformThread _captureThread;
Mutex capture_lock_;
bool quit_ RTC_GUARDED_BY(capture_lock_);
int32_t _deviceId;

View file

@ -245,6 +245,7 @@ rtc_library("platform_thread") {
absl_deps = [
"//third_party/abseil-cpp/absl/memory",
"//third_party/abseil-cpp/absl/strings",
"//third_party/abseil-cpp/absl/types:optional",
]
}
@ -561,7 +562,10 @@ if (is_win) {
"../api/task_queue",
"synchronization:mutex",
]
absl_deps = [ "//third_party/abseil-cpp/absl/strings" ]
absl_deps = [
"//third_party/abseil-cpp/absl/strings",
"//third_party/abseil-cpp/absl/types:optional",
]
}
}
@ -1413,6 +1417,7 @@ if (rtc_include_tests) {
absl_deps = [
"//third_party/abseil-cpp/absl/base:core_headers",
"//third_party/abseil-cpp/absl/memory",
"//third_party/abseil-cpp/absl/types:optional",
]
}

View file

@ -123,7 +123,7 @@ void AsyncResolver::Start(const SocketAddress& addr) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
RTC_DCHECK(!destroy_called_);
addr_ = addr;
auto thread_function =
PlatformThread::SpawnDetached(
[this, addr, caller_task_queue = webrtc::TaskQueueBase::Current(),
state = state_] {
std::vector<IPAddress> addresses;
@ -146,14 +146,8 @@ void AsyncResolver::Start(const SocketAddress& addr) {
}
}));
}
};
PlatformThread thread(RunResolution,
new std::function<void()>(std::move(thread_function)),
"NameResolution", ThreadAttributes().SetDetached());
thread.Start();
// Although |thread| is detached, the PlatformThread contract mandates to call
// Stop() before destruction. The call doesn't actually stop anything.
thread.Stop();
},
"AsyncResolver");
}
bool AsyncResolver::GetResolvedAddress(int family, SocketAddress* addr) const {

View file

@ -30,8 +30,7 @@ const int kProcessingTimeMillisecs = 500;
const int kWorkingThreads = 2;
// Consumes approximately kProcessingTimeMillisecs of CPU time in single thread.
void WorkingFunction(void* counter_pointer) {
int64_t* counter = reinterpret_cast<int64_t*>(counter_pointer);
void WorkingFunction(int64_t* counter) {
*counter = 0;
int64_t stop_cpu_time =
rtc::GetThreadCpuTimeNanos() +
@ -62,14 +61,12 @@ TEST(CpuTimeTest, MAYBE_TEST(TwoThreads)) {
int64_t thread_start_time_nanos = GetThreadCpuTimeNanos();
int64_t counter1;
int64_t counter2;
PlatformThread thread1(WorkingFunction, reinterpret_cast<void*>(&counter1),
"Thread1");
PlatformThread thread2(WorkingFunction, reinterpret_cast<void*>(&counter2),
"Thread2");
thread1.Start();
thread2.Start();
thread1.Stop();
thread2.Stop();
auto thread1 = PlatformThread::SpawnJoinable(
[&counter1] { WorkingFunction(&counter1); }, "Thread1");
auto thread2 = PlatformThread::SpawnJoinable(
[&counter2] { WorkingFunction(&counter2); }, "Thread2");
thread1.Finalize();
thread2.Finalize();
EXPECT_GE(counter1, 0);
EXPECT_GE(counter2, 0);

View file

@ -329,33 +329,28 @@ class PerfTestData {
class PerfTestThread {
public:
PerfTestThread() : thread_(&ThreadFunc, this, "CsPerf") {}
void Start(PerfTestData* data, int repeats, int id) {
RTC_DCHECK(!thread_.IsRunning());
RTC_DCHECK(!data_);
data_ = data;
repeats_ = repeats;
my_id_ = id;
thread_.Start();
thread_ = PlatformThread::SpawnJoinable(
[this] {
for (int i = 0; i < repeats_; ++i)
data_->AddToCounter(my_id_);
},
"CsPerf");
}
void Stop() {
RTC_DCHECK(thread_.IsRunning());
RTC_DCHECK(data_);
thread_.Stop();
thread_.Finalize();
repeats_ = 0;
data_ = nullptr;
my_id_ = 0;
}
private:
static void ThreadFunc(void* param) {
PerfTestThread* me = static_cast<PerfTestThread*>(param);
for (int i = 0; i < me->repeats_; ++i)
me->data_->AddToCounter(me->my_id_);
}
PlatformThread thread_;
PerfTestData* data_ = nullptr;
int repeats_ = 0;

View file

@ -79,19 +79,12 @@ namespace rtc {
namespace tracing {
namespace {
static void EventTracingThreadFunc(void* params);
// Atomic-int fast path for avoiding logging when disabled.
static volatile int g_event_logging_active = 0;
// TODO(pbos): Log metadata for all threads, etc.
class EventLogger final {
public:
EventLogger()
: logging_thread_(EventTracingThreadFunc,
this,
"EventTracingThread",
ThreadAttributes().SetPriority(kLowPriority)) {}
~EventLogger() { RTC_DCHECK(thread_checker_.IsCurrent()); }
void AddTraceEvent(const char* name,
@ -209,7 +202,8 @@ class EventLogger final {
rtc::AtomicOps::CompareAndSwap(&g_event_logging_active, 0, 1));
// Finally start, everything should be set up now.
logging_thread_.Start();
logging_thread_ =
PlatformThread::SpawnJoinable([this] { Log(); }, "EventTracingThread");
TRACE_EVENT_INSTANT0("webrtc", "EventLogger::Start");
}
@ -223,7 +217,7 @@ class EventLogger final {
// Wake up logging thread to finish writing.
shutdown_event_.Set();
// Join the logging thread.
logging_thread_.Stop();
logging_thread_.Finalize();
}
private:
@ -326,10 +320,6 @@ class EventLogger final {
bool output_file_owned_ = false;
};
static void EventTracingThreadFunc(void* params) {
static_cast<EventLogger*>(params)->Log();
}
static EventLogger* volatile g_event_logger = nullptr;
static const char* const kDisabledTracePrefix = TRACE_DISABLED_BY_DEFAULT("");
const unsigned char* InternalGetCategoryEnabled(const char* name) {

View file

@ -43,22 +43,21 @@ TEST(EventTest, AutoReset) {
class SignalerThread {
public:
SignalerThread() : thread_(&ThreadFn, this, "EventPerf") {}
void Start(Event* writer, Event* reader) {
writer_ = writer;
reader_ = reader;
thread_.Start();
thread_ = PlatformThread::SpawnJoinable(
[this] {
while (!stop_event_.Wait(0)) {
writer_->Set();
reader_->Wait(Event::kForever);
}
},
"EventPerf");
}
void Stop() {
stop_event_.Set();
thread_.Stop();
}
static void ThreadFn(void* param) {
auto* me = static_cast<SignalerThread*>(param);
while (!me->stop_event_.Wait(0)) {
me->writer_->Set();
me->reader_->Wait(Event::kForever);
}
thread_.Finalize();
}
Event stop_event_;
Event* writer_;

View file

@ -160,18 +160,13 @@ TEST(LogTest, MultipleStreams) {
class LogThread {
public:
LogThread() : thread_(&ThreadEntry, this, "LogThread") {}
~LogThread() { thread_.Stop(); }
void Start() { thread_.Start(); }
void Start() {
thread_ = PlatformThread::SpawnJoinable(
[] { RTC_LOG(LS_VERBOSE) << "RTC_LOG"; }, "LogThread");
}
private:
void Run() { RTC_LOG(LS_VERBOSE) << "RTC_LOG"; }
static void ThreadEntry(void* p) { static_cast<LogThread*>(p)->Run(); }
PlatformThread thread_;
Event event_;
};
// Ensure we don't crash when adding/removing streams while threads are going.

View file

@ -10,32 +10,37 @@
#include "rtc_base/platform_thread.h"
#include <algorithm>
#include <memory>
#if !defined(WEBRTC_WIN)
#include <sched.h>
#endif
#include <stdint.h>
#include <time.h>
#include <algorithm>
#include "absl/memory/memory.h"
#include "rtc_base/checks.h"
namespace rtc {
namespace {
struct ThreadStartData {
ThreadRunFunction run_function;
void* obj;
std::string thread_name;
ThreadPriority priority;
};
#if defined(WEBRTC_WIN)
int Win32PriorityFromThreadPriority(ThreadPriority priority) {
switch (priority) {
case ThreadPriority::kLow:
return THREAD_PRIORITY_BELOW_NORMAL;
case ThreadPriority::kNormal:
return THREAD_PRIORITY_NORMAL;
case ThreadPriority::kHigh:
return THREAD_PRIORITY_ABOVE_NORMAL;
case ThreadPriority::kRealtime:
return THREAD_PRIORITY_TIME_CRITICAL;
}
}
#endif
bool SetPriority(ThreadPriority priority) {
#if defined(WEBRTC_WIN)
return SetThreadPriority(GetCurrentThread(), priority) != FALSE;
return SetThreadPriority(GetCurrentThread(),
Win32PriorityFromThreadPriority(priority)) != FALSE;
#elif defined(__native_client__) || defined(WEBRTC_FUCHSIA)
// Setting thread priorities is not supported in NaCl or Fuchsia.
return true;
@ -59,21 +64,18 @@ bool SetPriority(ThreadPriority priority) {
const int top_prio = max_prio - 1;
const int low_prio = min_prio + 1;
switch (priority) {
case kLowPriority:
case ThreadPriority::kLow:
param.sched_priority = low_prio;
break;
case kNormalPriority:
case ThreadPriority::kNormal:
// The -1 ensures that the kHighPriority is always greater or equal to
// kNormalPriority.
param.sched_priority = (low_prio + top_prio - 1) / 2;
break;
case kHighPriority:
case ThreadPriority::kHigh:
param.sched_priority = std::max(top_prio - 2, low_prio);
break;
case kHighestPriority:
param.sched_priority = std::max(top_prio - 1, low_prio);
break;
case kRealtimePriority:
case ThreadPriority::kRealtime:
param.sched_priority = top_prio;
break;
}
@ -81,124 +83,129 @@ bool SetPriority(ThreadPriority priority) {
#endif // defined(WEBRTC_WIN)
}
void RunPlatformThread(std::unique_ptr<ThreadStartData> data) {
rtc::SetCurrentThreadName(data->thread_name.c_str());
data->thread_name.clear();
SetPriority(data->priority);
data->run_function(data->obj);
}
#if defined(WEBRTC_WIN)
DWORD WINAPI StartThread(void* param) {
DWORD WINAPI RunPlatformThread(void* param) {
// The GetLastError() function only returns valid results when it is called
// after a Win32 API function that returns a "failed" result. A crash dump
// contains the result from GetLastError() and to make sure it does not
// falsely report a Windows error we call SetLastError here.
::SetLastError(ERROR_SUCCESS);
RunPlatformThread(absl::WrapUnique(static_cast<ThreadStartData*>(param)));
auto function = static_cast<std::function<void()>*>(param);
(*function)();
delete function;
return 0;
}
#else
void* StartThread(void* param) {
RunPlatformThread(absl::WrapUnique(static_cast<ThreadStartData*>(param)));
void* RunPlatformThread(void* param) {
auto function = static_cast<std::function<void()>*>(param);
(*function)();
delete function;
return 0;
}
#endif // defined(WEBRTC_WIN)
} // namespace
PlatformThread::PlatformThread(ThreadRunFunction func,
void* obj,
absl::string_view thread_name,
ThreadAttributes attributes)
: run_function_(func),
attributes_(attributes),
obj_(obj),
name_(thread_name) {
RTC_DCHECK(func);
RTC_DCHECK(!name_.empty());
// TODO(tommi): Consider lowering the limit to 15 (limit on Linux).
RTC_DCHECK(name_.length() < 64);
PlatformThread::PlatformThread(Handle handle, bool joinable)
: handle_(handle), joinable_(joinable) {}
PlatformThread::PlatformThread(PlatformThread&& rhs)
: handle_(rhs.handle_), joinable_(rhs.joinable_) {
rhs.handle_ = absl::nullopt;
}
PlatformThread& PlatformThread::operator=(PlatformThread&& rhs) {
Finalize();
handle_ = rhs.handle_;
joinable_ = rhs.joinable_;
rhs.handle_ = absl::nullopt;
return *this;
}
PlatformThread::~PlatformThread() {
RTC_DCHECK_RUN_ON(&thread_checker_);
RTC_DCHECK(!thread_);
#if defined(WEBRTC_WIN)
RTC_DCHECK(!thread_id_);
#endif // defined(WEBRTC_WIN)
Finalize();
}
void PlatformThread::Start() {
RTC_DCHECK_RUN_ON(&thread_checker_);
RTC_DCHECK(!thread_) << "Thread already started?";
ThreadStartData* data =
new ThreadStartData{run_function_, obj_, name_, attributes_.priority};
PlatformThread PlatformThread::SpawnJoinable(
std::function<void()> thread_function,
absl::string_view name,
ThreadAttributes attributes) {
return SpawnThread(std::move(thread_function), name, attributes,
/*joinable=*/true);
}
PlatformThread PlatformThread::SpawnDetached(
std::function<void()> thread_function,
absl::string_view name,
ThreadAttributes attributes) {
return SpawnThread(std::move(thread_function), name, attributes,
/*joinable=*/false);
}
absl::optional<PlatformThread::Handle> PlatformThread::GetHandle() const {
return handle_;
}
#if defined(WEBRTC_WIN)
bool PlatformThread::QueueAPC(PAPCFUNC function, ULONG_PTR data) {
RTC_DCHECK(handle_.has_value());
return handle_.has_value() ? QueueUserAPC(function, *handle_, data) != FALSE
: false;
}
#endif
void PlatformThread::Finalize() {
if (!handle_.has_value())
return;
#if defined(WEBRTC_WIN)
if (joinable_)
WaitForSingleObject(*handle_, INFINITE);
CloseHandle(*handle_);
#else
if (joinable_)
RTC_CHECK_EQ(0, pthread_join(*handle_, nullptr));
#endif
handle_ = absl::nullopt;
}
PlatformThread PlatformThread::SpawnThread(
std::function<void()> thread_function,
absl::string_view name,
ThreadAttributes attributes,
bool joinable) {
RTC_DCHECK(thread_function);
RTC_DCHECK(!name.empty());
// TODO(tommi): Consider lowering the limit to 15 (limit on Linux).
RTC_DCHECK(name.length() < 64);
auto start_thread_function_ptr =
new std::function<void()>([thread_function = std::move(thread_function),
name = std::string(name), attributes] {
rtc::SetCurrentThreadName(name.c_str());
SetPriority(attributes.priority);
thread_function();
});
#if defined(WEBRTC_WIN)
// See bug 2902 for background on STACK_SIZE_PARAM_IS_A_RESERVATION.
// Set the reserved stack stack size to 1M, which is the default on Windows
// and Linux.
thread_ = ::CreateThread(nullptr, 1024 * 1024, &StartThread, data,
STACK_SIZE_PARAM_IS_A_RESERVATION, &thread_id_);
RTC_CHECK(thread_) << "CreateThread failed";
RTC_DCHECK(thread_id_);
DWORD thread_id = 0;
PlatformThread::Handle handle = ::CreateThread(
nullptr, 1024 * 1024, &RunPlatformThread, start_thread_function_ptr,
STACK_SIZE_PARAM_IS_A_RESERVATION, &thread_id);
RTC_CHECK(handle) << "CreateThread failed";
#else
pthread_attr_t attr;
pthread_attr_init(&attr);
// Set the stack stack size to 1M.
pthread_attr_setstacksize(&attr, 1024 * 1024);
pthread_attr_setdetachstate(&attr, attributes_.joinable
? PTHREAD_CREATE_JOINABLE
: PTHREAD_CREATE_DETACHED);
RTC_CHECK_EQ(0, pthread_create(&thread_, &attr, &StartThread, data));
pthread_attr_setdetachstate(
&attr, joinable ? PTHREAD_CREATE_JOINABLE : PTHREAD_CREATE_DETACHED);
PlatformThread::Handle handle;
RTC_CHECK_EQ(0, pthread_create(&handle, &attr, &RunPlatformThread,
start_thread_function_ptr));
pthread_attr_destroy(&attr);
#endif // defined(WEBRTC_WIN)
return PlatformThread(handle, joinable);
}
bool PlatformThread::IsRunning() const {
RTC_DCHECK_RUN_ON(&thread_checker_);
#if defined(WEBRTC_WIN)
return thread_ != nullptr;
#else
return thread_ != 0;
#endif // defined(WEBRTC_WIN)
}
PlatformThreadRef PlatformThread::GetThreadRef() const {
#if defined(WEBRTC_WIN)
return thread_id_;
#else
return thread_;
#endif // defined(WEBRTC_WIN)
}
void PlatformThread::Stop() {
RTC_DCHECK_RUN_ON(&thread_checker_);
if (!IsRunning())
return;
#if defined(WEBRTC_WIN)
if (attributes_.joinable) {
WaitForSingleObject(thread_, INFINITE);
}
CloseHandle(thread_);
thread_ = nullptr;
thread_id_ = 0;
#else
if (attributes_.joinable) {
RTC_CHECK_EQ(0, pthread_join(thread_, nullptr));
}
thread_ = 0;
#endif // defined(WEBRTC_WIN)
}
#if defined(WEBRTC_WIN)
bool PlatformThread::QueueAPC(PAPCFUNC function, ULONG_PTR data) {
RTC_DCHECK_RUN_ON(&thread_checker_);
RTC_DCHECK(IsRunning());
return QueueUserAPC(function, thread_, data) != FALSE;
}
#endif
} // namespace rtc

View file

@ -11,103 +11,101 @@
#ifndef RTC_BASE_PLATFORM_THREAD_H_
#define RTC_BASE_PLATFORM_THREAD_H_
#ifndef WEBRTC_WIN
#include <pthread.h>
#endif
#include <functional>
#include <string>
#include "absl/strings/string_view.h"
#include "api/sequence_checker.h"
#include "rtc_base/constructor_magic.h"
#include "absl/types/optional.h"
#include "rtc_base/platform_thread_types.h"
namespace rtc {
// Callback function that the spawned thread will enter once spawned.
typedef void (*ThreadRunFunction)(void*);
enum ThreadPriority {
#ifdef WEBRTC_WIN
kLowPriority = THREAD_PRIORITY_BELOW_NORMAL,
kNormalPriority = THREAD_PRIORITY_NORMAL,
kHighPriority = THREAD_PRIORITY_ABOVE_NORMAL,
kHighestPriority = THREAD_PRIORITY_HIGHEST,
kRealtimePriority = THREAD_PRIORITY_TIME_CRITICAL
#else
kLowPriority = 1,
kNormalPriority = 2,
kHighPriority = 3,
kHighestPriority = 4,
kRealtimePriority = 5
#endif
enum class ThreadPriority {
kLow = 1,
kNormal,
kHigh,
kRealtime,
};
struct ThreadAttributes {
ThreadPriority priority = kNormalPriority;
bool joinable = true;
ThreadPriority priority = ThreadPriority::kNormal;
ThreadAttributes& SetPriority(ThreadPriority priority_param) {
priority = priority_param;
return *this;
}
ThreadAttributes& SetDetached() {
joinable = false;
return *this;
}
};
// Represents a simple worker thread. The implementation must be assumed
// to be single threaded, meaning that all methods of the class, must be
// called from the same thread, including instantiation.
class PlatformThread {
// Represents a simple worker thread.
class PlatformThread final {
public:
PlatformThread(ThreadRunFunction func,
void* obj,
absl::string_view thread_name,
ThreadAttributes attributes = ThreadAttributes());
// Handle is the base platform thread handle.
#if defined(WEBRTC_WIN)
using Handle = HANDLE;
#else
using Handle = pthread_t;
#endif // defined(WEBRTC_WIN)
// This ctor creates the PlatformThread with an unset handle (returning true
// in empty()) and is provided for convenience.
// TODO(bugs.webrtc.org/12727) Look into if default and move support can be
// removed.
PlatformThread() = default;
// Moves |rhs| into this, storing an empty state in |rhs|.
// TODO(bugs.webrtc.org/12727) Look into if default and move support can be
// removed.
PlatformThread(PlatformThread&& rhs);
// Moves |rhs| into this, storing an empty state in |rhs|.
// TODO(bugs.webrtc.org/12727) Look into if default and move support can be
// removed.
PlatformThread& operator=(PlatformThread&& rhs);
// For a PlatformThread that's been spawned joinable, the destructor suspends
// the calling thread until the created thread exits unless the thread has
// already exited.
virtual ~PlatformThread();
const std::string& name() const { return name_; }
// Finalizes any allocated resources.
// For a PlatformThread that's been spawned joinable, Finalize() suspends
// the calling thread until the created thread exits unless the thread has
// already exited.
// empty() returns true after completion.
void Finalize();
// Spawns a thread and tries to set thread priority according to the priority
// from when CreateThread was called.
// Start can only be called after the constructor or after a call to Stop().
void Start();
// Returns true if default constructed, moved from, or Finalize()ed.
bool empty() const { return !handle_.has_value(); }
bool IsRunning() const;
// Creates a started joinable thread which will be joined when the returned
// PlatformThread destructs or Finalize() is called.
static PlatformThread SpawnJoinable(
std::function<void()> thread_function,
absl::string_view name,
ThreadAttributes attributes = ThreadAttributes());
// Returns an identifier for the worker thread that can be used to do
// thread checks.
PlatformThreadRef GetThreadRef() const;
// Creates a started detached thread. The caller has to use external
// synchronization as nothing is provided by the PlatformThread construct.
static PlatformThread SpawnDetached(
std::function<void()> thread_function,
absl::string_view name,
ThreadAttributes attributes = ThreadAttributes());
// Stop() prepares the PlatformThread for destruction or another call to
// Start(). For a PlatformThread that's been created with
// ThreadAttributes::joinable true (the default), Stop() suspends the calling
// thread until the created thread exits unless the thread has already exited.
// Stop() can only be called after calling Start().
void Stop();
// Returns the base platform thread handle of this thread.
absl::optional<Handle> GetHandle() const;
protected:
#if defined(WEBRTC_WIN)
// Exposed to derived classes to allow for special cases specific to Windows.
// Queue a Windows APC function that runs when the thread is alertable.
bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data);
#endif
private:
ThreadRunFunction const run_function_ = nullptr;
const ThreadAttributes attributes_;
void* const obj_;
// TODO(pbos): Make sure call sites use string literals and update to a const
// char* instead of a std::string.
const std::string name_;
webrtc::SequenceChecker thread_checker_;
#if defined(WEBRTC_WIN)
HANDLE thread_ = nullptr;
DWORD thread_id_ = 0;
#else
pthread_t thread_ = 0;
#endif // defined(WEBRTC_WIN)
RTC_DISALLOW_COPY_AND_ASSIGN(PlatformThread);
PlatformThread(Handle handle, bool joinable);
static PlatformThread SpawnThread(std::function<void()> thread_function,
absl::string_view name,
ThreadAttributes attributes,
bool joinable);
absl::optional<Handle> handle_;
bool joinable_ = false;
};
} // namespace rtc

View file

@ -10,69 +10,73 @@
#include "rtc_base/platform_thread.h"
#include "absl/types/optional.h"
#include "rtc_base/event.h"
#include "system_wrappers/include/sleep.h"
#include "test/gmock.h"
namespace rtc {
namespace {
void NullRunFunction(void* obj) {}
// Function that sets a boolean.
void SetFlagRunFunction(void* obj) {
bool* obj_as_bool = static_cast<bool*>(obj);
*obj_as_bool = true;
TEST(PlatformThreadTest, DefaultConstructedIsEmpty) {
PlatformThread thread;
EXPECT_EQ(thread.GetHandle(), absl::nullopt);
EXPECT_TRUE(thread.empty());
}
void StdFunctionRunFunction(void* obj) {
std::function<void()>* fun = static_cast<std::function<void()>*>(obj);
(*fun)();
TEST(PlatformThreadTest, StartFinalize) {
PlatformThread thread = PlatformThread::SpawnJoinable([] {}, "1");
EXPECT_NE(thread.GetHandle(), absl::nullopt);
EXPECT_FALSE(thread.empty());
thread.Finalize();
EXPECT_TRUE(thread.empty());
thread = PlatformThread::SpawnDetached([] {}, "2");
EXPECT_FALSE(thread.empty());
thread.Finalize();
EXPECT_TRUE(thread.empty());
}
} // namespace
TEST(PlatformThreadTest, StartStop) {
PlatformThread thread(&NullRunFunction, nullptr, "PlatformThreadTest");
EXPECT_TRUE(thread.name() == "PlatformThreadTest");
EXPECT_TRUE(thread.GetThreadRef() == 0);
thread.Start();
EXPECT_TRUE(thread.GetThreadRef() != 0);
thread.Stop();
EXPECT_TRUE(thread.GetThreadRef() == 0);
TEST(PlatformThreadTest, MovesEmpty) {
PlatformThread thread1;
PlatformThread thread2 = std::move(thread1);
EXPECT_TRUE(thread1.empty());
EXPECT_TRUE(thread2.empty());
}
TEST(PlatformThreadTest, StartStop2) {
PlatformThread thread1(&NullRunFunction, nullptr, "PlatformThreadTest1");
PlatformThread thread2(&NullRunFunction, nullptr, "PlatformThreadTest2");
EXPECT_TRUE(thread1.GetThreadRef() == thread2.GetThreadRef());
thread1.Start();
thread2.Start();
EXPECT_TRUE(thread1.GetThreadRef() != thread2.GetThreadRef());
thread2.Stop();
thread1.Stop();
TEST(PlatformThreadTest, MovesHandles) {
PlatformThread thread1 = PlatformThread::SpawnJoinable([] {}, "1");
PlatformThread thread2 = std::move(thread1);
EXPECT_TRUE(thread1.empty());
EXPECT_FALSE(thread2.empty());
thread1 = PlatformThread::SpawnDetached([] {}, "2");
thread2 = std::move(thread1);
EXPECT_TRUE(thread1.empty());
EXPECT_FALSE(thread2.empty());
}
TEST(PlatformThreadTest,
TwoThreadHandlesAreDifferentWhenStartedAndEqualWhenJoined) {
PlatformThread thread1 = PlatformThread();
PlatformThread thread2 = PlatformThread();
EXPECT_EQ(thread1.GetHandle(), thread2.GetHandle());
thread1 = PlatformThread::SpawnJoinable([] {}, "1");
thread2 = PlatformThread::SpawnJoinable([] {}, "2");
EXPECT_NE(thread1.GetHandle(), thread2.GetHandle());
thread1.Finalize();
EXPECT_NE(thread1.GetHandle(), thread2.GetHandle());
thread2.Finalize();
EXPECT_EQ(thread1.GetHandle(), thread2.GetHandle());
}
TEST(PlatformThreadTest, RunFunctionIsCalled) {
bool flag = false;
PlatformThread thread(&SetFlagRunFunction, &flag, "RunFunctionIsCalled");
thread.Start();
// At this point, the flag may be either true or false.
thread.Stop();
// We expect the thread to have run at least once.
PlatformThread::SpawnJoinable([&] { flag = true; }, "T");
EXPECT_TRUE(flag);
}
TEST(PlatformThreadTest, JoinsThread) {
// This test flakes if there are problems with the join implementation.
EXPECT_TRUE(ThreadAttributes().joinable);
rtc::Event event;
std::function<void()> thread_function = [&] { event.Set(); };
PlatformThread thread(&StdFunctionRunFunction, &thread_function, "T");
thread.Start();
thread.Stop();
PlatformThread::SpawnJoinable([&] { event.Set(); }, "T");
EXPECT_TRUE(event.Wait(/*give_up_after_ms=*/0));
}
@ -83,18 +87,14 @@ TEST(PlatformThreadTest, StopsBeforeDetachedThreadExits) {
rtc::Event thread_started;
rtc::Event thread_continue;
rtc::Event thread_exiting;
std::function<void()> thread_function = [&] {
thread_started.Set();
thread_continue.Wait(Event::kForever);
flag = true;
thread_exiting.Set();
};
{
PlatformThread thread(&StdFunctionRunFunction, &thread_function, "T",
ThreadAttributes().SetDetached());
thread.Start();
thread.Stop();
}
PlatformThread::SpawnDetached(
[&] {
thread_started.Set();
thread_continue.Wait(Event::kForever);
flag = true;
thread_exiting.Set();
},
"T");
thread_started.Wait(Event::kForever);
EXPECT_FALSE(flag);
thread_continue.Set();

View file

@ -127,10 +127,6 @@ class ThreadTask {
rtc::Event end_signal_;
};
void RunTask(void* thread_task) {
reinterpret_cast<ThreadTask*>(thread_task)->Run();
}
TEST_F(RateLimitTest, MultiThreadedUsage) {
// Simple sanity test, with different threads calling the various methods.
// Runs a few simple tasks, each on its own thread, but coordinated with
@ -149,8 +145,8 @@ TEST_F(RateLimitTest, MultiThreadedUsage) {
EXPECT_TRUE(rate_limiter_->SetWindowSize(kWindowSizeMs / 2));
}
} set_window_size_task(rate_limiter.get());
rtc::PlatformThread thread1(RunTask, &set_window_size_task, "Thread1");
thread1.Start();
auto thread1 = rtc::PlatformThread::SpawnJoinable(
[&set_window_size_task] { set_window_size_task.Run(); }, "Thread1");
class SetMaxRateTask : public ThreadTask {
public:
@ -160,8 +156,8 @@ TEST_F(RateLimitTest, MultiThreadedUsage) {
void DoRun() override { rate_limiter_->SetMaxRate(kMaxRateBps * 2); }
} set_max_rate_task(rate_limiter.get());
rtc::PlatformThread thread2(RunTask, &set_max_rate_task, "Thread2");
thread2.Start();
auto thread2 = rtc::PlatformThread::SpawnJoinable(
[&set_max_rate_task] { set_max_rate_task.Run(); }, "Thread2");
class UseRateTask : public ThreadTask {
public:
@ -177,8 +173,8 @@ TEST_F(RateLimitTest, MultiThreadedUsage) {
SimulatedClock* const clock_;
} use_rate_task(rate_limiter.get(), &clock_);
rtc::PlatformThread thread3(RunTask, &use_rate_task, "Thread3");
thread3.Start();
auto thread3 = rtc::PlatformThread::SpawnJoinable(
[&use_rate_task] { use_rate_task.Run(); }, "Thread3");
set_window_size_task.start_signal_.Set();
EXPECT_TRUE(set_window_size_task.end_signal_.Wait(kMaxTimeoutMs));
@ -191,10 +187,6 @@ TEST_F(RateLimitTest, MultiThreadedUsage) {
// All rate consumed.
EXPECT_FALSE(rate_limiter->TryUseRate(1));
thread1.Stop();
thread2.Stop();
thread3.Stop();
}
} // namespace webrtc

View file

@ -93,16 +93,12 @@ void EventAssign(struct event* ev,
rtc::ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
switch (priority) {
case Priority::HIGH:
return rtc::kRealtimePriority;
return rtc::ThreadPriority::kRealtime;
case Priority::LOW:
return rtc::kLowPriority;
return rtc::ThreadPriority::kLow;
case Priority::NORMAL:
return rtc::kNormalPriority;
default:
RTC_NOTREACHED();
break;
return rtc::ThreadPriority::kNormal;
}
return rtc::kNormalPriority;
}
class TaskQueueLibevent final : public TaskQueueBase {
@ -120,7 +116,6 @@ class TaskQueueLibevent final : public TaskQueueBase {
~TaskQueueLibevent() override = default;
static void ThreadMain(void* context);
static void OnWakeup(int socket, short flags, void* context); // NOLINT
static void RunTimer(int fd, short flags, void* context); // NOLINT
@ -172,11 +167,7 @@ class TaskQueueLibevent::SetTimerTask : public QueuedTask {
TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name,
rtc::ThreadPriority priority)
: event_base_(event_base_new()),
thread_(&TaskQueueLibevent::ThreadMain,
this,
queue_name,
rtc::ThreadAttributes().SetPriority(priority)) {
: event_base_(event_base_new()) {
int fds[2];
RTC_CHECK(pipe(fds) == 0);
SetNonBlocking(fds[0]);
@ -187,7 +178,18 @@ TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name,
EventAssign(&wakeup_event_, event_base_, wakeup_pipe_out_,
EV_READ | EV_PERSIST, OnWakeup, this);
event_add(&wakeup_event_, 0);
thread_.Start();
thread_ = rtc::PlatformThread::SpawnJoinable(
[this] {
{
CurrentTaskQueueSetter set_current(this);
while (is_active_)
event_base_loop(event_base_, 0);
}
for (TimerEvent* timer : pending_timers_)
delete timer;
},
queue_name, rtc::ThreadAttributes().SetPriority(priority));
}
void TaskQueueLibevent::Delete() {
@ -202,7 +204,7 @@ void TaskQueueLibevent::Delete() {
nanosleep(&ts, nullptr);
}
thread_.Stop();
thread_.Finalize();
event_del(&wakeup_event_);
@ -255,20 +257,6 @@ void TaskQueueLibevent::PostDelayedTask(std::unique_ptr<QueuedTask> task,
}
}
// static
void TaskQueueLibevent::ThreadMain(void* context) {
TaskQueueLibevent* me = static_cast<TaskQueueLibevent*>(context);
{
CurrentTaskQueueSetter set_current(me);
while (me->is_active_)
event_base_loop(me->event_base_, 0);
}
for (TimerEvent* timer : me->pending_timers_)
delete timer;
}
// static
void TaskQueueLibevent::OnWakeup(int socket,
short flags, // NOLINT

View file

@ -36,14 +36,11 @@ rtc::ThreadPriority TaskQueuePriorityToThreadPriority(
TaskQueueFactory::Priority priority) {
switch (priority) {
case TaskQueueFactory::Priority::HIGH:
return rtc::kRealtimePriority;
return rtc::ThreadPriority::kRealtime;
case TaskQueueFactory::Priority::LOW:
return rtc::kLowPriority;
return rtc::ThreadPriority::kLow;
case TaskQueueFactory::Priority::NORMAL:
return rtc::kNormalPriority;
default:
RTC_NOTREACHED();
return rtc::kNormalPriority;
return rtc::ThreadPriority::kNormal;
}
}
@ -78,8 +75,6 @@ class TaskQueueStdlib final : public TaskQueueBase {
NextTask GetNextTask();
static void ThreadMain(void* context);
void ProcessTasks();
void NotifyWake();
@ -126,11 +121,13 @@ TaskQueueStdlib::TaskQueueStdlib(absl::string_view queue_name,
: started_(/*manual_reset=*/false, /*initially_signaled=*/false),
stopped_(/*manual_reset=*/false, /*initially_signaled=*/false),
flag_notify_(/*manual_reset=*/false, /*initially_signaled=*/false),
thread_(&TaskQueueStdlib::ThreadMain,
this,
queue_name,
rtc::ThreadAttributes().SetPriority(priority)) {
thread_.Start();
thread_(rtc::PlatformThread::SpawnJoinable(
[this] {
CurrentTaskQueueSetter set_current(this);
ProcessTasks();
},
queue_name,
rtc::ThreadAttributes().SetPriority(priority))) {
started_.Wait(rtc::Event::kForever);
}
@ -145,7 +142,7 @@ void TaskQueueStdlib::Delete() {
NotifyWake();
stopped_.Wait(rtc::Event::kForever);
thread_.Stop();
thread_.Finalize();
delete this;
}
@ -222,13 +219,6 @@ TaskQueueStdlib::NextTask TaskQueueStdlib::GetNextTask() {
return result;
}
// static
void TaskQueueStdlib::ThreadMain(void* context) {
TaskQueueStdlib* me = static_cast<TaskQueueStdlib*>(context);
CurrentTaskQueueSetter set_current(me);
me->ProcessTasks();
}
void TaskQueueStdlib::ProcessTasks() {
started_.Set();

View file

@ -29,6 +29,7 @@
#include <utility>
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "api/task_queue/queued_task.h"
#include "api/task_queue/task_queue_base.h"
#include "rtc_base/arraysize.h"
@ -56,16 +57,12 @@ rtc::ThreadPriority TaskQueuePriorityToThreadPriority(
TaskQueueFactory::Priority priority) {
switch (priority) {
case TaskQueueFactory::Priority::HIGH:
return rtc::kRealtimePriority;
return rtc::ThreadPriority::kRealtime;
case TaskQueueFactory::Priority::LOW:
return rtc::kLowPriority;
return rtc::ThreadPriority::kLow;
case TaskQueueFactory::Priority::NORMAL:
return rtc::kNormalPriority;
default:
RTC_NOTREACHED();
break;
return rtc::ThreadPriority::kNormal;
}
return rtc::kNormalPriority;
}
int64_t GetTick() {
@ -167,24 +164,6 @@ class TaskQueueWin : public TaskQueueBase {
void RunPendingTasks();
private:
static void ThreadMain(void* context);
class WorkerThread : public rtc::PlatformThread {
public:
WorkerThread(rtc::ThreadRunFunction func,
void* obj,
absl::string_view thread_name,
rtc::ThreadPriority priority)
: PlatformThread(func,
obj,
thread_name,
rtc::ThreadAttributes().SetPriority(priority)) {}
bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data) {
return rtc::PlatformThread::QueueAPC(apc_function, data);
}
};
void RunThreadMain();
bool ProcessQueuedMessages();
void RunDueTasks();
@ -207,7 +186,7 @@ class TaskQueueWin : public TaskQueueBase {
greater<DelayedTaskInfo>>
timer_tasks_;
UINT_PTR timer_id_ = 0;
WorkerThread thread_;
rtc::PlatformThread thread_;
Mutex pending_lock_;
std::queue<std::unique_ptr<QueuedTask>> pending_
RTC_GUARDED_BY(pending_lock_);
@ -216,10 +195,12 @@ class TaskQueueWin : public TaskQueueBase {
TaskQueueWin::TaskQueueWin(absl::string_view queue_name,
rtc::ThreadPriority priority)
: thread_(&TaskQueueWin::ThreadMain, this, queue_name, priority),
in_queue_(::CreateEvent(nullptr, true, false, nullptr)) {
: in_queue_(::CreateEvent(nullptr, true, false, nullptr)) {
RTC_DCHECK(in_queue_);
thread_.Start();
thread_ = rtc::PlatformThread::SpawnJoinable(
[this] { RunThreadMain(); }, queue_name,
rtc::ThreadAttributes().SetPriority(priority));
rtc::Event event(false, false);
RTC_CHECK(thread_.QueueAPC(&InitializeQueueThread,
reinterpret_cast<ULONG_PTR>(&event)));
@ -228,11 +209,13 @@ TaskQueueWin::TaskQueueWin(absl::string_view queue_name,
void TaskQueueWin::Delete() {
RTC_DCHECK(!IsCurrent());
while (!::PostThreadMessage(thread_.GetThreadRef(), WM_QUIT, 0, 0)) {
RTC_CHECK(thread_.GetHandle() != absl::nullopt);
while (
!::PostThreadMessage(GetThreadId(*thread_.GetHandle()), WM_QUIT, 0, 0)) {
RTC_CHECK_EQ(ERROR_NOT_ENOUGH_QUOTA, ::GetLastError());
Sleep(1);
}
thread_.Stop();
thread_.Finalize();
::CloseHandle(in_queue_);
delete this;
}
@ -255,7 +238,9 @@ void TaskQueueWin::PostDelayedTask(std::unique_ptr<QueuedTask> task,
// and WPARAM is 32bits in 32bit builds. Otherwise, we could pass the
// task pointer and timestamp as LPARAM and WPARAM.
auto* task_info = new DelayedTaskInfo(milliseconds, std::move(task));
if (!::PostThreadMessage(thread_.GetThreadRef(), WM_QUEUE_DELAYED_TASK, 0,
RTC_CHECK(thread_.GetHandle() != absl::nullopt);
if (!::PostThreadMessage(GetThreadId(*thread_.GetHandle()),
WM_QUEUE_DELAYED_TASK, 0,
reinterpret_cast<LPARAM>(task_info))) {
delete task_info;
}
@ -277,11 +262,6 @@ void TaskQueueWin::RunPendingTasks() {
}
}
// static
void TaskQueueWin::ThreadMain(void* context) {
static_cast<TaskQueueWin*>(context)->RunThreadMain();
}
void TaskQueueWin::RunThreadMain() {
CurrentTaskQueueSetter set_current(this);
HANDLE handles[2] = {*timer_.event_for_wait(), in_queue_};

View file

@ -153,28 +153,24 @@ class SleepDeadlock : public DeadlockInterface {
}
};
// This is the function that is exectued by the thread that will deadlock and
// have its stacktrace captured.
void ThreadFunction(void* void_params) {
ThreadParams* params = static_cast<ThreadParams*>(void_params);
params->tid = gettid();
params->deadlock_region_start_address = GetCurrentRelativeExecutionAddress();
params->deadlock_start_event.Set();
params->deadlock_impl->Deadlock();
params->deadlock_region_end_address = GetCurrentRelativeExecutionAddress();
params->deadlock_done_event.Set();
}
void TestStacktrace(std::unique_ptr<DeadlockInterface> deadlock_impl) {
// Set params that will be sent to other thread.
ThreadParams params;
params.deadlock_impl = deadlock_impl.get();
// Spawn thread.
rtc::PlatformThread thread(&ThreadFunction, &params, "StacktraceTest");
thread.Start();
auto thread = rtc::PlatformThread::SpawnJoinable(
[&params] {
params.tid = gettid();
params.deadlock_region_start_address =
GetCurrentRelativeExecutionAddress();
params.deadlock_start_event.Set();
params.deadlock_impl->Deadlock();
params.deadlock_region_end_address =
GetCurrentRelativeExecutionAddress();
params.deadlock_done_event.Set();
},
"StacktraceTest");
// Wait until the thread has entered the deadlock region, and take a very
// brief nap to give it time to reach the actual deadlock.
@ -198,8 +194,6 @@ void TestStacktrace(std::unique_ptr<DeadlockInterface> deadlock_impl) {
<< rtc::ToHex(params.deadlock_region_start_address) << ", "
<< rtc::ToHex(params.deadlock_region_end_address)
<< "] not contained in: " << StackTraceToString(stack_trace);
thread.Stop();
}
class LookoutLogSink final : public rtc::LogSink {
@ -259,13 +253,9 @@ TEST(Stacktrace, TestRtcEventDeadlockDetection) {
// Start a thread that waits for an event.
rtc::Event ev;
rtc::PlatformThread thread(
[](void* arg) {
auto* ev = static_cast<rtc::Event*>(arg);
ev->Wait(rtc::Event::kForever);
},
&ev, "TestRtcEventDeadlockDetection");
thread.Start();
auto thread = rtc::PlatformThread::SpawnJoinable(
[&ev] { ev.Wait(rtc::Event::kForever); },
"TestRtcEventDeadlockDetection");
// The message should appear after 3 sec. We'll wait up to 10 sec in an
// attempt to not be flaky.
@ -273,7 +263,7 @@ TEST(Stacktrace, TestRtcEventDeadlockDetection) {
// Unblock the thread and shut it down.
ev.Set();
thread.Stop();
thread.Finalize();
rtc::LogMessage::RemoveLogToStream(&sink);
}

View file

@ -142,12 +142,9 @@ void DefaultVideoQualityAnalyzer::Start(
int max_threads_count) {
test_label_ = std::move(test_case_name);
for (int i = 0; i < max_threads_count; i++) {
auto thread = std::make_unique<rtc::PlatformThread>(
&DefaultVideoQualityAnalyzer::ProcessComparisonsThread, this,
("DefaultVideoQualityAnalyzerWorker-" + std::to_string(i)).data(),
rtc::ThreadAttributes().SetPriority(rtc::kNormalPriority));
thread->Start();
thread_pool_.push_back(std::move(thread));
thread_pool_.push_back(rtc::PlatformThread::SpawnJoinable(
[this] { ProcessComparisons(); },
"DefaultVideoQualityAnalyzerWorker-" + std::to_string(i)));
}
{
MutexLock lock(&lock_);
@ -547,10 +544,6 @@ void DefaultVideoQualityAnalyzer::Stop() {
}
StopMeasuringCpuProcessTime();
comparison_available_event_.Set();
for (auto& thread : thread_pool_) {
thread->Stop();
}
// PlatformThread have to be deleted on the same thread, where it was created
thread_pool_.clear();
// Perform final Metrics update. On this place analyzer is stopped and no one
@ -677,10 +670,6 @@ void DefaultVideoQualityAnalyzer::AddComparison(
StopExcludingCpuThreadTime();
}
void DefaultVideoQualityAnalyzer::ProcessComparisonsThread(void* obj) {
static_cast<DefaultVideoQualityAnalyzer*>(obj)->ProcessComparisons();
}
void DefaultVideoQualityAnalyzer::ProcessComparisons() {
while (true) {
// Try to pick next comparison to perform from the queue.

View file

@ -560,7 +560,7 @@ class DefaultVideoQualityAnalyzer : public VideoQualityAnalyzerInterface {
std::deque<FrameComparison> comparisons_ RTC_GUARDED_BY(comparison_lock_);
AnalyzerStats analyzer_stats_ RTC_GUARDED_BY(comparison_lock_);
std::vector<std::unique_ptr<rtc::PlatformThread>> thread_pool_;
std::vector<rtc::PlatformThread> thread_pool_;
rtc::Event comparison_available_event_;
Mutex cpu_measurement_lock_;

View file

@ -137,10 +137,12 @@ VideoAnalyzer::VideoAnalyzer(test::LayerFilteringTransport* transport,
}
for (uint32_t i = 0; i < num_cores; ++i) {
rtc::PlatformThread* thread =
new rtc::PlatformThread(&FrameComparisonThread, this, "Analyzer");
thread->Start();
comparison_thread_pool_.push_back(thread);
comparison_thread_pool_.push_back(rtc::PlatformThread::SpawnJoinable(
[this] {
while (CompareFrames()) {
}
},
"Analyzer"));
}
if (!rtp_dump_name.empty()) {
@ -155,10 +157,8 @@ VideoAnalyzer::~VideoAnalyzer() {
MutexLock lock(&comparison_lock_);
quit_ = true;
}
for (rtc::PlatformThread* thread : comparison_thread_pool_) {
thread->Stop();
delete thread;
}
// Joins all threads.
comparison_thread_pool_.clear();
}
void VideoAnalyzer::SetReceiver(PacketReceiver* receiver) {
@ -533,12 +533,6 @@ void VideoAnalyzer::PollStats() {
memory_usage_.AddSample(rtc::GetProcessResidentSizeBytes());
}
void VideoAnalyzer::FrameComparisonThread(void* obj) {
VideoAnalyzer* analyzer = static_cast<VideoAnalyzer*>(obj);
while (analyzer->CompareFrames()) {
}
}
bool VideoAnalyzer::CompareFrames() {
if (AllFramesRecorded())
return false;

View file

@ -302,7 +302,7 @@ class VideoAnalyzer : public PacketReceiver,
const double avg_ssim_threshold_;
bool is_quick_test_enabled_;
std::vector<rtc::PlatformThread*> comparison_thread_pool_;
std::vector<rtc::PlatformThread> comparison_thread_pool_;
rtc::Event comparison_available_event_;
std::deque<FrameComparison> comparisons_ RTC_GUARDED_BY(comparison_lock_);
bool quit_ RTC_GUARDED_BY(comparison_lock_);