diff --git a/api/sequence_checker_unittest.cc b/api/sequence_checker_unittest.cc index 21a0894a8e..4029b8c9a0 100644 --- a/api/sequence_checker_unittest.cc +++ b/api/sequence_checker_unittest.cc @@ -40,14 +40,21 @@ class CompileTimeTestForGuardedBy { }; void RunOnDifferentThread(rtc::FunctionView run) { - rtc::Event thread_has_run_event; - rtc::PlatformThread::SpawnJoinable( - [&] { - run(); - thread_has_run_event.Set(); - }, - "thread"); - EXPECT_TRUE(thread_has_run_event.Wait(1000)); + struct Object { + static void Run(void* obj) { + auto* me = static_cast(obj); + me->run(); + me->thread_has_run_event.Set(); + } + + rtc::FunctionView run; + rtc::Event thread_has_run_event; + } object{run}; + + rtc::PlatformThread thread(&Object::Run, &object, "thread"); + thread.Start(); + EXPECT_TRUE(object.thread_has_run_event.Wait(1000)); + thread.Stop(); } } // namespace diff --git a/modules/audio_coding/acm2/audio_coding_module_unittest.cc b/modules/audio_coding/acm2/audio_coding_module_unittest.cc index 5b0577745c..7a962e5ce3 100644 --- a/modules/audio_coding/acm2/audio_coding_module_unittest.cc +++ b/modules/audio_coding/acm2/audio_coding_module_unittest.cc @@ -429,6 +429,21 @@ class AudioCodingModuleMtTestOldApi : public AudioCodingModuleTestOldApi { AudioCodingModuleMtTestOldApi() : AudioCodingModuleTestOldApi(), + send_thread_( + CbSendThread, + this, + "send", + rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)), + insert_packet_thread_( + CbInsertPacketThread, + this, + "insert_packet", + rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)), + pull_audio_thread_( + CbPullAudioThread, + this, + "pull_audio", + rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)), send_count_(0), insert_packet_count_(0), pull_audio_count_(0), @@ -445,38 +460,17 @@ class AudioCodingModuleMtTestOldApi : public AudioCodingModuleTestOldApi { void StartThreads() { quit_.store(false); - - const auto attributes = - rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime); - send_thread_ = rtc::PlatformThread::SpawnJoinable( - [this] { - while (!quit_.load()) { - CbSendImpl(); - } - }, - "send", attributes); - insert_packet_thread_ = rtc::PlatformThread::SpawnJoinable( - [this] { - while (!quit_.load()) { - CbInsertPacketImpl(); - } - }, - "insert_packet", attributes); - pull_audio_thread_ = rtc::PlatformThread::SpawnJoinable( - [this] { - while (!quit_.load()) { - CbPullAudioImpl(); - } - }, - "pull_audio", attributes); + send_thread_.Start(); + insert_packet_thread_.Start(); + pull_audio_thread_.Start(); } void TearDown() { AudioCodingModuleTestOldApi::TearDown(); quit_.store(true); - pull_audio_thread_.Finalize(); - send_thread_.Finalize(); - insert_packet_thread_.Finalize(); + pull_audio_thread_.Stop(); + send_thread_.Stop(); + insert_packet_thread_.Stop(); } bool RunTest() { @@ -494,6 +488,14 @@ class AudioCodingModuleMtTestOldApi : public AudioCodingModuleTestOldApi { return false; } + static void CbSendThread(void* context) { + AudioCodingModuleMtTestOldApi* fixture = + reinterpret_cast(context); + while (!fixture->quit_.load()) { + fixture->CbSendImpl(); + } + } + // The send thread doesn't have to care about the current simulated time, // since only the AcmReceiver is using the clock. void CbSendImpl() { @@ -509,6 +511,14 @@ class AudioCodingModuleMtTestOldApi : public AudioCodingModuleTestOldApi { } } + static void CbInsertPacketThread(void* context) { + AudioCodingModuleMtTestOldApi* fixture = + reinterpret_cast(context); + while (!fixture->quit_.load()) { + fixture->CbInsertPacketImpl(); + } + } + void CbInsertPacketImpl() { SleepMs(1); { @@ -523,6 +533,14 @@ class AudioCodingModuleMtTestOldApi : public AudioCodingModuleTestOldApi { InsertPacket(); } + static void CbPullAudioThread(void* context) { + AudioCodingModuleMtTestOldApi* fixture = + reinterpret_cast(context); + while (!fixture->quit_.load()) { + fixture->CbPullAudioImpl(); + } + } + void CbPullAudioImpl() { SleepMs(1); { @@ -681,6 +699,16 @@ class AcmReRegisterIsacMtTestOldApi : public AudioCodingModuleTestOldApi { AcmReRegisterIsacMtTestOldApi() : AudioCodingModuleTestOldApi(), + receive_thread_( + CbReceiveThread, + this, + "receive", + rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)), + codec_registration_thread_( + CbCodecRegistrationThread, + this, + "codec_registration", + rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)), codec_registered_(false), receive_packet_count_(0), next_insert_packet_time_ms_(0), @@ -712,34 +740,28 @@ class AcmReRegisterIsacMtTestOldApi : public AudioCodingModuleTestOldApi { void StartThreads() { quit_.store(false); - const auto attributes = - rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime); - receive_thread_ = rtc::PlatformThread::SpawnJoinable( - [this] { - while (!quit_.load() && CbReceiveImpl()) { - } - }, - "receive", attributes); - codec_registration_thread_ = rtc::PlatformThread::SpawnJoinable( - [this] { - while (!quit_.load()) { - CbCodecRegistrationImpl(); - } - }, - "codec_registration", attributes); + receive_thread_.Start(); + codec_registration_thread_.Start(); } void TearDown() override { AudioCodingModuleTestOldApi::TearDown(); quit_.store(true); - receive_thread_.Finalize(); - codec_registration_thread_.Finalize(); + receive_thread_.Stop(); + codec_registration_thread_.Stop(); } bool RunTest() { return test_complete_.Wait(10 * 60 * 1000); // 10 minutes' timeout. } + static void CbReceiveThread(void* context) { + AcmReRegisterIsacMtTestOldApi* fixture = + reinterpret_cast(context); + while (!fixture->quit_.load() && fixture->CbReceiveImpl()) { + } + } + bool CbReceiveImpl() { SleepMs(1); rtc::Buffer encoded; @@ -785,6 +807,14 @@ class AcmReRegisterIsacMtTestOldApi : public AudioCodingModuleTestOldApi { return true; } + static void CbCodecRegistrationThread(void* context) { + AcmReRegisterIsacMtTestOldApi* fixture = + reinterpret_cast(context); + while (!fixture->quit_.load()) { + fixture->CbCodecRegistrationImpl(); + } + } + void CbCodecRegistrationImpl() { SleepMs(1); if (HasFatalFailure()) { diff --git a/modules/audio_device/dummy/file_audio_device.cc b/modules/audio_device/dummy/file_audio_device.cc index e345a16c44..90bba05296 100644 --- a/modules/audio_device/dummy/file_audio_device.cc +++ b/modules/audio_device/dummy/file_audio_device.cc @@ -216,13 +216,10 @@ int32_t FileAudioDevice::StartPlayout() { } } - _ptrThreadPlay = rtc::PlatformThread::SpawnJoinable( - [this] { - while (PlayThreadProcess()) { - } - }, - "webrtc_audio_module_play_thread", - rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime)); + _ptrThreadPlay.reset(new rtc::PlatformThread( + PlayThreadFunc, this, "webrtc_audio_module_play_thread", + rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))); + _ptrThreadPlay->Start(); RTC_LOG(LS_INFO) << "Started playout capture to output file: " << _outputFilename; @@ -236,8 +233,10 @@ int32_t FileAudioDevice::StopPlayout() { } // stop playout thread first - if (!_ptrThreadPlay.empty()) - _ptrThreadPlay.Finalize(); + if (_ptrThreadPlay) { + _ptrThreadPlay->Stop(); + _ptrThreadPlay.reset(); + } MutexLock lock(&mutex_); @@ -277,13 +276,11 @@ int32_t FileAudioDevice::StartRecording() { } } - _ptrThreadRec = rtc::PlatformThread::SpawnJoinable( - [this] { - while (RecThreadProcess()) { - } - }, - "webrtc_audio_module_capture_thread", - rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime)); + _ptrThreadRec.reset(new rtc::PlatformThread( + RecThreadFunc, this, "webrtc_audio_module_capture_thread", + rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))); + + _ptrThreadRec->Start(); RTC_LOG(LS_INFO) << "Started recording from input file: " << _inputFilename; @@ -296,8 +293,10 @@ int32_t FileAudioDevice::StopRecording() { _recording = false; } - if (!_ptrThreadRec.empty()) - _ptrThreadRec.Finalize(); + if (_ptrThreadRec) { + _ptrThreadRec->Stop(); + _ptrThreadRec.reset(); + } MutexLock lock(&mutex_); _recordingFramesLeft = 0; @@ -440,6 +439,18 @@ void FileAudioDevice::AttachAudioBuffer(AudioDeviceBuffer* audioBuffer) { _ptrAudioBuffer->SetPlayoutChannels(0); } +void FileAudioDevice::PlayThreadFunc(void* pThis) { + FileAudioDevice* device = static_cast(pThis); + while (device->PlayThreadProcess()) { + } +} + +void FileAudioDevice::RecThreadFunc(void* pThis) { + FileAudioDevice* device = static_cast(pThis); + while (device->RecThreadProcess()) { + } +} + bool FileAudioDevice::PlayThreadProcess() { if (!_playing) { return false; diff --git a/modules/audio_device/dummy/file_audio_device.h b/modules/audio_device/dummy/file_audio_device.h index f4a6b76586..ecb3f2f533 100644 --- a/modules/audio_device/dummy/file_audio_device.h +++ b/modules/audio_device/dummy/file_audio_device.h @@ -17,11 +17,14 @@ #include #include "modules/audio_device/audio_device_generic.h" -#include "rtc_base/platform_thread.h" #include "rtc_base/synchronization/mutex.h" #include "rtc_base/system/file_wrapper.h" #include "rtc_base/time_utils.h" +namespace rtc { +class PlatformThread; +} // namespace rtc + namespace webrtc { // This is a fake audio device which plays audio from a file as its microphone @@ -142,8 +145,9 @@ class FileAudioDevice : public AudioDeviceGeneric { size_t _recordingFramesIn10MS; size_t _playoutFramesIn10MS; - rtc::PlatformThread _ptrThreadRec; - rtc::PlatformThread _ptrThreadPlay; + // TODO(pbos): Make plain members instead of pointers and stop resetting them. + std::unique_ptr _ptrThreadRec; + std::unique_ptr _ptrThreadPlay; bool _playing; bool _recording; diff --git a/modules/audio_device/linux/audio_device_alsa_linux.cc b/modules/audio_device/linux/audio_device_alsa_linux.cc index 9e6bd168fc..eb3466258e 100644 --- a/modules/audio_device/linux/audio_device_alsa_linux.cc +++ b/modules/audio_device/linux/audio_device_alsa_linux.cc @@ -178,13 +178,26 @@ int32_t AudioDeviceLinuxALSA::Terminate() { _mixerManager.Close(); // RECORDING - mutex_.Unlock(); - _ptrThreadRec.Finalize(); + if (_ptrThreadRec) { + rtc::PlatformThread* tmpThread = _ptrThreadRec.release(); + mutex_.Unlock(); + + tmpThread->Stop(); + delete tmpThread; + + mutex_.Lock(); + } // PLAYOUT - _ptrThreadPlay.Finalize(); - mutex_.Lock(); + if (_ptrThreadPlay) { + rtc::PlatformThread* tmpThread = _ptrThreadPlay.release(); + mutex_.Unlock(); + tmpThread->Stop(); + delete tmpThread; + + mutex_.Lock(); + } #if defined(WEBRTC_USE_X11) if (_XDisplay) { XCloseDisplay(_XDisplay); @@ -1027,13 +1040,11 @@ int32_t AudioDeviceLinuxALSA::StartRecording() { return -1; } // RECORDING - _ptrThreadRec = rtc::PlatformThread::SpawnJoinable( - [this] { - while (RecThreadProcess()) { - } - }, - "webrtc_audio_module_capture_thread", - rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime)); + _ptrThreadRec.reset(new rtc::PlatformThread( + RecThreadFunc, this, "webrtc_audio_module_capture_thread", + rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))); + + _ptrThreadRec->Start(); errVal = LATE(snd_pcm_prepare)(_handleRecord); if (errVal < 0) { @@ -1077,7 +1088,10 @@ int32_t AudioDeviceLinuxALSA::StopRecordingLocked() { _recIsInitialized = false; _recording = false; - _ptrThreadRec.Finalize(); + if (_ptrThreadRec) { + _ptrThreadRec->Stop(); + _ptrThreadRec.reset(); + } _recordingFramesLeft = 0; if (_recordingBuffer) { @@ -1144,13 +1158,10 @@ int32_t AudioDeviceLinuxALSA::StartPlayout() { } // PLAYOUT - _ptrThreadPlay = rtc::PlatformThread::SpawnJoinable( - [this] { - while (PlayThreadProcess()) { - } - }, - "webrtc_audio_module_play_thread", - rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime)); + _ptrThreadPlay.reset(new rtc::PlatformThread( + PlayThreadFunc, this, "webrtc_audio_module_play_thread", + rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))); + _ptrThreadPlay->Start(); int errVal = LATE(snd_pcm_prepare)(_handlePlayout); if (errVal < 0) { @@ -1180,7 +1191,10 @@ int32_t AudioDeviceLinuxALSA::StopPlayoutLocked() { _playing = false; // stop playout thread first - _ptrThreadPlay.Finalize(); + if (_ptrThreadPlay) { + _ptrThreadPlay->Stop(); + _ptrThreadPlay.reset(); + } _playoutFramesLeft = 0; delete[] _playoutBuffer; @@ -1455,6 +1469,18 @@ int32_t AudioDeviceLinuxALSA::ErrorRecovery(int32_t error, // Thread Methods // ============================================================================ +void AudioDeviceLinuxALSA::PlayThreadFunc(void* pThis) { + AudioDeviceLinuxALSA* device = static_cast(pThis); + while (device->PlayThreadProcess()) { + } +} + +void AudioDeviceLinuxALSA::RecThreadFunc(void* pThis) { + AudioDeviceLinuxALSA* device = static_cast(pThis); + while (device->RecThreadProcess()) { + } +} + bool AudioDeviceLinuxALSA::PlayThreadProcess() { if (!_playing) return false; diff --git a/modules/audio_device/linux/audio_device_alsa_linux.h b/modules/audio_device/linux/audio_device_alsa_linux.h index 1f4a231640..410afcf42c 100644 --- a/modules/audio_device/linux/audio_device_alsa_linux.h +++ b/modules/audio_device/linux/audio_device_alsa_linux.h @@ -155,8 +155,10 @@ class AudioDeviceLinuxALSA : public AudioDeviceGeneric { Mutex mutex_; - rtc::PlatformThread _ptrThreadRec; - rtc::PlatformThread _ptrThreadPlay; + // TODO(pbos): Make plain members and start/stop instead of resetting these + // pointers. A thread can be reused. + std::unique_ptr _ptrThreadRec; + std::unique_ptr _ptrThreadPlay; AudioMixerManagerLinuxALSA _mixerManager; diff --git a/modules/audio_device/linux/audio_device_pulse_linux.cc b/modules/audio_device/linux/audio_device_pulse_linux.cc index 7742420fc2..942e60da53 100644 --- a/modules/audio_device/linux/audio_device_pulse_linux.cc +++ b/modules/audio_device/linux/audio_device_pulse_linux.cc @@ -15,7 +15,6 @@ #include "modules/audio_device/linux/latebindingsymboltable_linux.h" #include "rtc_base/checks.h" #include "rtc_base/logging.h" -#include "rtc_base/platform_thread.h" WebRTCPulseSymbolTable* GetPulseSymbolTable() { static WebRTCPulseSymbolTable* pulse_symbol_table = @@ -159,22 +158,18 @@ AudioDeviceGeneric::InitStatus AudioDeviceLinuxPulse::Init() { #endif // RECORDING - const auto attributes = - rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime); - _ptrThreadRec = rtc::PlatformThread::SpawnJoinable( - [this] { - while (RecThreadProcess()) { - } - }, - "webrtc_audio_module_rec_thread", attributes); + _ptrThreadRec.reset(new rtc::PlatformThread( + RecThreadFunc, this, "webrtc_audio_module_rec_thread", + rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))); + + _ptrThreadRec->Start(); // PLAYOUT - _ptrThreadPlay = rtc::PlatformThread::SpawnJoinable( - [this] { - while (PlayThreadProcess()) { - } - }, - "webrtc_audio_module_play_thread", attributes); + _ptrThreadPlay.reset(new rtc::PlatformThread( + PlayThreadFunc, this, "webrtc_audio_module_play_thread", + rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))); + _ptrThreadPlay->Start(); + _initialized = true; return InitStatus::OK; @@ -192,12 +187,22 @@ int32_t AudioDeviceLinuxPulse::Terminate() { _mixerManager.Close(); // RECORDING - _timeEventRec.Set(); - _ptrThreadRec.Finalize(); + if (_ptrThreadRec) { + rtc::PlatformThread* tmpThread = _ptrThreadRec.release(); + + _timeEventRec.Set(); + tmpThread->Stop(); + delete tmpThread; + } // PLAYOUT - _timeEventPlay.Set(); - _ptrThreadPlay.Finalize(); + if (_ptrThreadPlay) { + rtc::PlatformThread* tmpThread = _ptrThreadPlay.release(); + + _timeEventPlay.Set(); + tmpThread->Stop(); + delete tmpThread; + } // Terminate PulseAudio if (TerminatePulseAudio() < 0) { @@ -1976,6 +1981,18 @@ int32_t AudioDeviceLinuxPulse::ProcessRecordedData(int8_t* bufferData, return 0; } +void AudioDeviceLinuxPulse::PlayThreadFunc(void* pThis) { + AudioDeviceLinuxPulse* device = static_cast(pThis); + while (device->PlayThreadProcess()) { + } +} + +void AudioDeviceLinuxPulse::RecThreadFunc(void* pThis) { + AudioDeviceLinuxPulse* device = static_cast(pThis); + while (device->RecThreadProcess()) { + } +} + bool AudioDeviceLinuxPulse::PlayThreadProcess() { if (!_timeEventPlay.Wait(1000)) { return true; diff --git a/modules/audio_device/linux/audio_device_pulse_linux.h b/modules/audio_device/linux/audio_device_pulse_linux.h index 0cf89ef011..cfad6b1c15 100644 --- a/modules/audio_device/linux/audio_device_pulse_linux.h +++ b/modules/audio_device/linux/audio_device_pulse_linux.h @@ -268,8 +268,9 @@ class AudioDeviceLinuxPulse : public AudioDeviceGeneric { rtc::Event _recStartEvent; rtc::Event _playStartEvent; - rtc::PlatformThread _ptrThreadPlay; - rtc::PlatformThread _ptrThreadRec; + // TODO(pbos): Remove unique_ptr and use directly without resetting. + std::unique_ptr _ptrThreadPlay; + std::unique_ptr _ptrThreadRec; AudioMixerManagerLinuxPulse _mixerManager; diff --git a/modules/audio_device/mac/audio_device_mac.cc b/modules/audio_device/mac/audio_device_mac.cc index 2088b017a0..f143a43f00 100644 --- a/modules/audio_device/mac/audio_device_mac.cc +++ b/modules/audio_device/mac/audio_device_mac.cc @@ -166,8 +166,8 @@ AudioDeviceMac::~AudioDeviceMac() { Terminate(); } - RTC_DCHECK(capture_worker_thread_.empty()); - RTC_DCHECK(render_worker_thread_.empty()); + RTC_DCHECK(!capture_worker_thread_.get()); + RTC_DCHECK(!render_worker_thread_.get()); if (_paRenderBuffer) { delete _paRenderBuffer; @@ -1308,14 +1308,12 @@ int32_t AudioDeviceMac::StartRecording() { return -1; } - RTC_DCHECK(capture_worker_thread_.empty()); - capture_worker_thread_ = rtc::PlatformThread::SpawnJoinable( - [this] { - while (CaptureWorkerThread()) { - } - }, - "CaptureWorkerThread", - rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime)); + RTC_DCHECK(!capture_worker_thread_.get()); + capture_worker_thread_.reset(new rtc::PlatformThread( + RunCapture, this, "CaptureWorkerThread", + rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))); + RTC_DCHECK(capture_worker_thread_.get()); + capture_worker_thread_->Start(); OSStatus err = noErr; if (_twoDevices) { @@ -1397,9 +1395,10 @@ int32_t AudioDeviceMac::StopRecording() { // Setting this signal will allow the worker thread to be stopped. AtomicSet32(&_captureDeviceIsAlive, 0); - if (!capture_worker_thread_.empty()) { + if (capture_worker_thread_.get()) { mutex_.Unlock(); - capture_worker_thread_.Finalize(); + capture_worker_thread_->Stop(); + capture_worker_thread_.reset(); mutex_.Lock(); } @@ -1445,14 +1444,11 @@ int32_t AudioDeviceMac::StartPlayout() { return 0; } - RTC_DCHECK(render_worker_thread_.empty()); - render_worker_thread_ = rtc::PlatformThread::SpawnJoinable( - [this] { - while (RenderWorkerThread()) { - } - }, - "RenderWorkerThread", - rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime)); + RTC_DCHECK(!render_worker_thread_.get()); + render_worker_thread_.reset(new rtc::PlatformThread( + RunRender, this, "RenderWorkerThread", + rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))); + render_worker_thread_->Start(); if (_twoDevices || !_recording) { OSStatus err = noErr; @@ -1510,9 +1506,10 @@ int32_t AudioDeviceMac::StopPlayout() { // Setting this signal will allow the worker thread to be stopped. AtomicSet32(&_renderDeviceIsAlive, 0); - if (!render_worker_thread_.empty()) { + if (render_worker_thread_.get()) { mutex_.Unlock(); - render_worker_thread_.Finalize(); + render_worker_thread_->Stop(); + render_worker_thread_.reset(); mutex_.Lock(); } @@ -2374,6 +2371,12 @@ OSStatus AudioDeviceMac::implInConverterProc(UInt32* numberDataPackets, return 0; } +void AudioDeviceMac::RunRender(void* ptrThis) { + AudioDeviceMac* device = static_cast(ptrThis); + while (device->RenderWorkerThread()) { + } +} + bool AudioDeviceMac::RenderWorkerThread() { PaRingBufferSize numSamples = ENGINE_PLAY_BUF_SIZE_IN_SAMPLES * _outDesiredFormat.mChannelsPerFrame; @@ -2439,6 +2442,12 @@ bool AudioDeviceMac::RenderWorkerThread() { return true; } +void AudioDeviceMac::RunCapture(void* ptrThis) { + AudioDeviceMac* device = static_cast(ptrThis); + while (device->CaptureWorkerThread()) { + } +} + bool AudioDeviceMac::CaptureWorkerThread() { OSStatus err = noErr; UInt32 noRecSamples = diff --git a/modules/audio_device/mac/audio_device_mac.h b/modules/audio_device/mac/audio_device_mac.h index f9504b64b5..985db9da52 100644 --- a/modules/audio_device/mac/audio_device_mac.h +++ b/modules/audio_device/mac/audio_device_mac.h @@ -21,12 +21,15 @@ #include "modules/audio_device/mac/audio_mixer_manager_mac.h" #include "rtc_base/event.h" #include "rtc_base/logging.h" -#include "rtc_base/platform_thread.h" #include "rtc_base/synchronization/mutex.h" #include "rtc_base/thread_annotations.h" struct PaUtilRingBuffer; +namespace rtc { +class PlatformThread; +} // namespace rtc + namespace webrtc { const uint32_t N_REC_SAMPLES_PER_SEC = 48000; @@ -268,11 +271,13 @@ class AudioDeviceMac : public AudioDeviceGeneric { rtc::Event _stopEventRec; rtc::Event _stopEvent; + // TODO(pbos): Replace with direct members, just start/stop, no need to + // recreate the thread. // Only valid/running between calls to StartRecording and StopRecording. - rtc::PlatformThread capture_worker_thread_; + std::unique_ptr capture_worker_thread_; // Only valid/running between calls to StartPlayout and StopPlayout. - rtc::PlatformThread render_worker_thread_; + std::unique_ptr render_worker_thread_; AudioMixerManagerMac _mixerManager; diff --git a/modules/audio_device/win/core_audio_base_win.cc b/modules/audio_device/win/core_audio_base_win.cc index 7d93fcb14a..59debc07a9 100644 --- a/modules/audio_device/win/core_audio_base_win.cc +++ b/modules/audio_device/win/core_audio_base_win.cc @@ -119,6 +119,11 @@ const char* SessionDisconnectReasonToString( } } +void Run(void* obj) { + RTC_DCHECK(obj); + reinterpret_cast(obj)->ThreadRun(); +} + // Returns true if the selected audio device supports low latency, i.e, if it // is possible to initialize the engine using periods less than the default // period (10ms). @@ -548,19 +553,24 @@ bool CoreAudioBase::Start() { // Audio thread should be alive during internal restart since the restart // callback is triggered on that thread and it also makes the restart // sequence less complex. - RTC_DCHECK(!audio_thread_.empty()); + RTC_DCHECK(audio_thread_); } // Start an audio thread but only if one does not already exist (which is the // case during restart). - if (audio_thread_.empty()) { - const absl::string_view name = - IsInput() ? "wasapi_capture_thread" : "wasapi_render_thread"; - audio_thread_ = rtc::PlatformThread::SpawnJoinable( - [this] { ThreadRun(); }, name, - rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime)); - RTC_DLOG(INFO) << "Started thread with name: " << name - << " and handle: " << *audio_thread_.GetHandle(); + if (!audio_thread_) { + audio_thread_ = std::make_unique( + Run, this, IsInput() ? "wasapi_capture_thread" : "wasapi_render_thread", + rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)); + RTC_DCHECK(audio_thread_); + audio_thread_->Start(); + if (!audio_thread_->IsRunning()) { + StopThread(); + RTC_LOG(LS_ERROR) << "Failed to start audio thread"; + return false; + } + RTC_DLOG(INFO) << "Started thread with name: " << audio_thread_->name() + << " and id: " << audio_thread_->GetThreadRef(); } // Start streaming data between the endpoint buffer and the audio engine. @@ -687,11 +697,14 @@ bool CoreAudioBase::Restart() { void CoreAudioBase::StopThread() { RTC_DLOG(INFO) << __FUNCTION__; RTC_DCHECK(!IsRestarting()); - if (!audio_thread_.empty()) { - RTC_DLOG(INFO) << "Sets stop_event..."; - SetEvent(stop_event_.Get()); - RTC_DLOG(INFO) << "PlatformThread::Finalize..."; - audio_thread_.Finalize(); + if (audio_thread_) { + if (audio_thread_->IsRunning()) { + RTC_DLOG(INFO) << "Sets stop_event..."; + SetEvent(stop_event_.Get()); + RTC_DLOG(INFO) << "PlatformThread::Stop..."; + audio_thread_->Stop(); + } + audio_thread_.reset(); // Ensure that we don't quit the main thread loop immediately next // time Start() is called. @@ -704,7 +717,7 @@ bool CoreAudioBase::HandleRestartEvent() { RTC_DLOG(INFO) << __FUNCTION__ << "[" << DirectionToString(direction()) << "]"; RTC_DCHECK_RUN_ON(&thread_checker_audio_); - RTC_DCHECK(!audio_thread_.empty()); + RTC_DCHECK(audio_thread_); RTC_DCHECK(IsRestarting()); // Let each client (input and/or output) take care of its own restart // sequence since each side might need unique actions. diff --git a/modules/audio_device/win/core_audio_base_win.h b/modules/audio_device/win/core_audio_base_win.h index afcc6a684d..2a57636640 100644 --- a/modules/audio_device/win/core_audio_base_win.h +++ b/modules/audio_device/win/core_audio_base_win.h @@ -158,7 +158,7 @@ class CoreAudioBase : public IAudioSessionEvents { // Set when restart process starts and cleared when restart stops // successfully. Accessed atomically. std::atomic is_restarting_; - rtc::PlatformThread audio_thread_; + std::unique_ptr audio_thread_; Microsoft::WRL::ComPtr audio_session_control_; void StopThread(); diff --git a/modules/audio_processing/audio_processing_impl_locking_unittest.cc b/modules/audio_processing/audio_processing_impl_locking_unittest.cc index 66c1251d4c..1f065ffe52 100644 --- a/modules/audio_processing/audio_processing_impl_locking_unittest.cc +++ b/modules/audio_processing/audio_processing_impl_locking_unittest.cc @@ -387,6 +387,33 @@ class AudioProcessingImplLockTest void SetUp() override; void TearDown() override; + // Thread callback for the render thread + static void RenderProcessorThreadFunc(void* context) { + AudioProcessingImplLockTest* impl = + reinterpret_cast(context); + while (!impl->MaybeEndTest()) { + impl->render_thread_state_.Process(); + } + } + + // Thread callback for the capture thread + static void CaptureProcessorThreadFunc(void* context) { + AudioProcessingImplLockTest* impl = + reinterpret_cast(context); + while (!impl->MaybeEndTest()) { + impl->capture_thread_state_.Process(); + } + } + + // Thread callback for the stats thread + static void StatsProcessorThreadFunc(void* context) { + AudioProcessingImplLockTest* impl = + reinterpret_cast(context); + while (!impl->MaybeEndTest()) { + impl->stats_thread_state_.Process(); + } + } + // Tests whether all the required render and capture side calls have been // done. bool TestDone() { @@ -396,28 +423,9 @@ class AudioProcessingImplLockTest // Start the threads used in the test. void StartThreads() { - const auto attributes = - rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime); - render_thread_ = rtc::PlatformThread::SpawnJoinable( - [this] { - while (!MaybeEndTest()) - render_thread_state_.Process(); - }, - "render", attributes); - capture_thread_ = rtc::PlatformThread::SpawnJoinable( - [this] { - while (!MaybeEndTest()) { - capture_thread_state_.Process(); - } - }, - "capture", attributes); - - stats_thread_ = rtc::PlatformThread::SpawnJoinable( - [this] { - while (!MaybeEndTest()) - stats_thread_state_.Process(); - }, - "stats", attributes); + render_thread_.Start(); + capture_thread_.Start(); + stats_thread_.Start(); } // Event handlers for the test. @@ -426,6 +434,9 @@ class AudioProcessingImplLockTest rtc::Event capture_call_event_; // Thread related variables. + rtc::PlatformThread render_thread_; + rtc::PlatformThread capture_thread_; + rtc::PlatformThread stats_thread_; mutable RandomGenerator rand_gen_; std::unique_ptr apm_; @@ -434,9 +445,6 @@ class AudioProcessingImplLockTest RenderProcessor render_thread_state_; CaptureProcessor capture_thread_state_; StatsProcessor stats_thread_state_; - rtc::PlatformThread render_thread_; - rtc::PlatformThread capture_thread_; - rtc::PlatformThread stats_thread_; }; // Sleeps a random time between 0 and max_sleep milliseconds. @@ -477,7 +485,22 @@ void PopulateAudioFrame(float amplitude, } AudioProcessingImplLockTest::AudioProcessingImplLockTest() - : apm_(AudioProcessingBuilderForTesting().Create()), + : render_thread_( + RenderProcessorThreadFunc, + this, + "render", + rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)), + capture_thread_( + CaptureProcessorThreadFunc, + this, + "capture", + rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)), + stats_thread_( + StatsProcessorThreadFunc, + this, + "stats", + rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority)), + apm_(AudioProcessingBuilderForTesting().Create()), render_thread_state_(kMaxFrameSize, &rand_gen_, &render_call_event_, @@ -529,6 +552,9 @@ void AudioProcessingImplLockTest::SetUp() { void AudioProcessingImplLockTest::TearDown() { render_call_event_.Set(); capture_call_event_.Set(); + render_thread_.Stop(); + capture_thread_.Stop(); + stats_thread_.Stop(); } StatsProcessor::StatsProcessor(RandomGenerator* rand_gen, diff --git a/modules/audio_processing/audio_processing_performance_unittest.cc b/modules/audio_processing/audio_processing_performance_unittest.cc index 9585850296..9063cf4a93 100644 --- a/modules/audio_processing/audio_processing_performance_unittest.cc +++ b/modules/audio_processing/audio_processing_performance_unittest.cc @@ -391,7 +391,17 @@ class TimedThreadApiProcessor { class CallSimulator : public ::testing::TestWithParam { public: CallSimulator() - : rand_gen_(42U), + : render_thread_(new rtc::PlatformThread( + RenderProcessorThreadFunc, + this, + "render", + rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))), + capture_thread_(new rtc::PlatformThread( + CaptureProcessorThreadFunc, + this, + "capture", + rtc::ThreadAttributes().SetPriority(rtc::kRealtimePriority))), + rand_gen_(42U), simulation_config_(static_cast(GetParam())) {} // Run the call simulation with a timeout. @@ -426,10 +436,13 @@ class CallSimulator : public ::testing::TestWithParam { static const int kMinNumFramesToProcess = 150; static const int32_t kTestTimeout = 3 * 10 * kMinNumFramesToProcess; + // ::testing::TestWithParam<> implementation. + void TearDown() override { StopThreads(); } + // Stop all running threads. void StopThreads() { - render_thread_.Finalize(); - capture_thread_.Finalize(); + render_thread_->Stop(); + capture_thread_->Stop(); } // Simulator and APM setup. @@ -520,28 +533,32 @@ class CallSimulator : public ::testing::TestWithParam { kMinNumFramesToProcess, kCaptureInputFloatLevel, num_capture_channels)); } + // Thread callback for the render thread. + static void RenderProcessorThreadFunc(void* context) { + CallSimulator* call_simulator = reinterpret_cast(context); + while (call_simulator->render_thread_state_->Process()) { + } + } + + // Thread callback for the capture thread. + static void CaptureProcessorThreadFunc(void* context) { + CallSimulator* call_simulator = reinterpret_cast(context); + while (call_simulator->capture_thread_state_->Process()) { + } + } + // Start the threads used in the test. void StartThreads() { - const auto attributes = - rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kRealtime); - render_thread_ = rtc::PlatformThread::SpawnJoinable( - [this] { - while (render_thread_state_->Process()) { - } - }, - "render", attributes); - capture_thread_ = rtc::PlatformThread::SpawnJoinable( - [this] { - while (capture_thread_state_->Process()) { - } - }, - "capture", attributes); + ASSERT_NO_FATAL_FAILURE(render_thread_->Start()); + ASSERT_NO_FATAL_FAILURE(capture_thread_->Start()); } // Event handler for the test. rtc::Event test_complete_; // Thread related variables. + std::unique_ptr render_thread_; + std::unique_ptr capture_thread_; Random rand_gen_; std::unique_ptr apm_; @@ -550,8 +567,6 @@ class CallSimulator : public ::testing::TestWithParam { LockedFlag capture_call_checker_; std::unique_ptr render_thread_state_; std::unique_ptr capture_thread_state_; - rtc::PlatformThread render_thread_; - rtc::PlatformThread capture_thread_; }; // Implements the callback functionality for the threads. diff --git a/modules/desktop_capture/screen_drawer_unittest.cc b/modules/desktop_capture/screen_drawer_unittest.cc index 2394260105..c38eee6991 100644 --- a/modules/desktop_capture/screen_drawer_unittest.cc +++ b/modules/desktop_capture/screen_drawer_unittest.cc @@ -48,12 +48,13 @@ void TestScreenDrawerLock( ~Task() = default; - void RunTask() { - std::unique_ptr lock = ctor_(); + static void RunTask(void* me) { + Task* task = static_cast(me); + std::unique_ptr lock = task->ctor_(); ASSERT_TRUE(!!lock); - created_->store(true); + task->created_->store(true); // Wait for the main thread to get the signal of created_. - while (!ready_.load()) { + while (!task->ready_.load()) { SleepMs(1); } // At this point, main thread should begin to create a second lock. Though @@ -76,8 +77,8 @@ void TestScreenDrawerLock( const rtc::FunctionView()> ctor_; } task(&created, ready, ctor); - auto lock_thread = rtc::PlatformThread::SpawnJoinable( - [&task] { task.RunTask(); }, "lock_thread"); + rtc::PlatformThread lock_thread(&Task::RunTask, &task, "lock_thread"); + lock_thread.Start(); // Wait for the first lock in Task::RunTask() to be created. // TODO(zijiehe): Find a better solution to wait for the creation of the first @@ -94,6 +95,7 @@ void TestScreenDrawerLock( ASSERT_GT(kLockDurationMs, rtc::TimeMillis() - start_ms); ctor(); ASSERT_LE(kLockDurationMs, rtc::TimeMillis() - start_ms); + lock_thread.Stop(); } } // namespace diff --git a/modules/utility/source/process_thread_impl.cc b/modules/utility/source/process_thread_impl.cc index cdc2fa1005..dc2a0066e9 100644 --- a/modules/utility/source/process_thread_impl.cc +++ b/modules/utility/source/process_thread_impl.cc @@ -48,6 +48,7 @@ ProcessThreadImpl::ProcessThreadImpl(const char* thread_name) ProcessThreadImpl::~ProcessThreadImpl() { RTC_DCHECK(thread_checker_.IsCurrent()); + RTC_DCHECK(!thread_.get()); RTC_DCHECK(!stop_); while (!delayed_tasks_.empty()) { @@ -71,8 +72,8 @@ void ProcessThreadImpl::Delete() { // Doesn't need locking, because the contending thread isn't running. void ProcessThreadImpl::Start() RTC_NO_THREAD_SAFETY_ANALYSIS { RTC_DCHECK(thread_checker_.IsCurrent()); - RTC_DCHECK(thread_.empty()); - if (!thread_.empty()) + RTC_DCHECK(!thread_.get()); + if (thread_.get()) return; RTC_DCHECK(!stop_); @@ -80,18 +81,14 @@ void ProcessThreadImpl::Start() RTC_NO_THREAD_SAFETY_ANALYSIS { for (ModuleCallback& m : modules_) m.module->ProcessThreadAttached(this); - thread_ = rtc::PlatformThread::SpawnJoinable( - [this] { - CurrentTaskQueueSetter set_current(this); - while (Process()) { - } - }, - thread_name_); + thread_.reset( + new rtc::PlatformThread(&ProcessThreadImpl::Run, this, thread_name_)); + thread_->Start(); } void ProcessThreadImpl::Stop() { RTC_DCHECK(thread_checker_.IsCurrent()); - if (thread_.empty()) + if (!thread_.get()) return; { @@ -101,7 +98,9 @@ void ProcessThreadImpl::Stop() { } wake_up_.Set(); - thread_.Finalize(); + + thread_->Stop(); + thread_.reset(); StopNoLocks(); } @@ -109,7 +108,7 @@ void ProcessThreadImpl::Stop() { // No locking needed, since this is called after the contending thread is // stopped. void ProcessThreadImpl::StopNoLocks() RTC_NO_THREAD_SAFETY_ANALYSIS { - RTC_DCHECK(thread_.empty()); + RTC_DCHECK(!thread_); stop_ = false; for (ModuleCallback& m : modules_) @@ -200,7 +199,7 @@ void ProcessThreadImpl::RegisterModule(Module* module, // Now that we know the module isn't in the list, we'll call out to notify // the module that it's attached to the worker thread. We don't hold // the lock while we make this call. - if (!thread_.empty()) + if (thread_.get()) module->ProcessThreadAttached(this); { @@ -228,6 +227,14 @@ void ProcessThreadImpl::DeRegisterModule(Module* module) { module->ProcessThreadAttached(nullptr); } +// static +void ProcessThreadImpl::Run(void* obj) { + ProcessThreadImpl* impl = static_cast(obj); + CurrentTaskQueueSetter set_current(impl); + while (impl->Process()) { + } +} + bool ProcessThreadImpl::Process() { TRACE_EVENT1("webrtc", "ProcessThreadImpl", "name", thread_name_); int64_t now = rtc::TimeMillis(); diff --git a/modules/utility/source/process_thread_impl.h b/modules/utility/source/process_thread_impl.h index b667bfc68a..b83994cef8 100644 --- a/modules/utility/source/process_thread_impl.h +++ b/modules/utility/source/process_thread_impl.h @@ -45,6 +45,7 @@ class ProcessThreadImpl : public ProcessThread { void DeRegisterModule(Module* module) override; protected: + static void Run(void* obj); bool Process(); private: @@ -96,7 +97,8 @@ class ProcessThreadImpl : public ProcessThread { SequenceChecker thread_checker_; rtc::Event wake_up_; - rtc::PlatformThread thread_; + // TODO(pbos): Remove unique_ptr and stop recreating the thread. + std::unique_ptr thread_; ModuleList modules_ RTC_GUARDED_BY(mutex_); // Set to true when calling Process, to allow reentrant calls to WakeUp. diff --git a/modules/video_capture/linux/video_capture_linux.cc b/modules/video_capture/linux/video_capture_linux.cc index 10f9713ec3..49237cdf19 100644 --- a/modules/video_capture/linux/video_capture_linux.cc +++ b/modules/video_capture/linux/video_capture_linux.cc @@ -240,15 +240,12 @@ int32_t VideoCaptureModuleV4L2::StartCapture( } // start capture thread; - if (_captureThread.empty()) { + if (!_captureThread) { quit_ = false; - _captureThread = rtc::PlatformThread::SpawnJoinable( - [this] { - while (CaptureProcess()) { - } - }, - "CaptureThread", - rtc::ThreadAttributes().SetPriority(rtc::ThreadPriority::kHigh)); + _captureThread.reset(new rtc::PlatformThread( + VideoCaptureModuleV4L2::CaptureThread, this, "CaptureThread", + rtc::ThreadAttributes().SetPriority(rtc::kHighPriority))); + _captureThread->Start(); } // Needed to start UVC camera - from the uvcview application @@ -264,13 +261,14 @@ int32_t VideoCaptureModuleV4L2::StartCapture( } int32_t VideoCaptureModuleV4L2::StopCapture() { - if (!_captureThread.empty()) { + if (_captureThread) { { MutexLock lock(&capture_lock_); quit_ = true; } - // Make sure the capture thread stops using the mutex. - _captureThread.Finalize(); + // Make sure the capture thread stop stop using the critsect. + _captureThread->Stop(); + _captureThread.reset(); } MutexLock lock(&capture_lock_); @@ -358,6 +356,11 @@ bool VideoCaptureModuleV4L2::CaptureStarted() { return _captureStarted; } +void VideoCaptureModuleV4L2::CaptureThread(void* obj) { + VideoCaptureModuleV4L2* capture = static_cast(obj); + while (capture->CaptureProcess()) { + } +} bool VideoCaptureModuleV4L2::CaptureProcess() { int retVal = 0; fd_set rSet; diff --git a/modules/video_capture/linux/video_capture_linux.h b/modules/video_capture/linux/video_capture_linux.h index fa06d72b8d..ddb5d5ba87 100644 --- a/modules/video_capture/linux/video_capture_linux.h +++ b/modules/video_capture/linux/video_capture_linux.h @@ -41,7 +41,8 @@ class VideoCaptureModuleV4L2 : public VideoCaptureImpl { bool AllocateVideoBuffers(); bool DeAllocateVideoBuffers(); - rtc::PlatformThread _captureThread; + // TODO(pbos): Stop using unique_ptr and resetting the thread. + std::unique_ptr _captureThread; Mutex capture_lock_; bool quit_ RTC_GUARDED_BY(capture_lock_); int32_t _deviceId; diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn index 501ca01541..4d186c7040 100644 --- a/rtc_base/BUILD.gn +++ b/rtc_base/BUILD.gn @@ -245,7 +245,6 @@ rtc_library("platform_thread") { absl_deps = [ "//third_party/abseil-cpp/absl/memory", "//third_party/abseil-cpp/absl/strings", - "//third_party/abseil-cpp/absl/types:optional", ] } @@ -562,10 +561,7 @@ if (is_win) { "../api/task_queue", "synchronization:mutex", ] - absl_deps = [ - "//third_party/abseil-cpp/absl/strings", - "//third_party/abseil-cpp/absl/types:optional", - ] + absl_deps = [ "//third_party/abseil-cpp/absl/strings" ] } } @@ -1417,7 +1413,6 @@ if (rtc_include_tests) { absl_deps = [ "//third_party/abseil-cpp/absl/base:core_headers", "//third_party/abseil-cpp/absl/memory", - "//third_party/abseil-cpp/absl/types:optional", ] } diff --git a/rtc_base/async_resolver.cc b/rtc_base/async_resolver.cc index d482b4e681..9e6a2bae1c 100644 --- a/rtc_base/async_resolver.cc +++ b/rtc_base/async_resolver.cc @@ -123,7 +123,7 @@ void AsyncResolver::Start(const SocketAddress& addr) { RTC_DCHECK_RUN_ON(&sequence_checker_); RTC_DCHECK(!destroy_called_); addr_ = addr; - PlatformThread::SpawnDetached( + auto thread_function = [this, addr, caller_task_queue = webrtc::TaskQueueBase::Current(), state = state_] { std::vector addresses; @@ -146,8 +146,14 @@ void AsyncResolver::Start(const SocketAddress& addr) { } })); } - }, - "AsyncResolver"); + }; + PlatformThread thread(RunResolution, + new std::function(std::move(thread_function)), + "NameResolution", ThreadAttributes().SetDetached()); + thread.Start(); + // Although |thread| is detached, the PlatformThread contract mandates to call + // Stop() before destruction. The call doesn't actually stop anything. + thread.Stop(); } bool AsyncResolver::GetResolvedAddress(int family, SocketAddress* addr) const { diff --git a/rtc_base/cpu_time_unittest.cc b/rtc_base/cpu_time_unittest.cc index 94f82f4306..675e86307c 100644 --- a/rtc_base/cpu_time_unittest.cc +++ b/rtc_base/cpu_time_unittest.cc @@ -30,7 +30,8 @@ const int kProcessingTimeMillisecs = 500; const int kWorkingThreads = 2; // Consumes approximately kProcessingTimeMillisecs of CPU time in single thread. -void WorkingFunction(int64_t* counter) { +void WorkingFunction(void* counter_pointer) { + int64_t* counter = reinterpret_cast(counter_pointer); *counter = 0; int64_t stop_cpu_time = rtc::GetThreadCpuTimeNanos() + @@ -61,12 +62,14 @@ TEST(CpuTimeTest, MAYBE_TEST(TwoThreads)) { int64_t thread_start_time_nanos = GetThreadCpuTimeNanos(); int64_t counter1; int64_t counter2; - auto thread1 = PlatformThread::SpawnJoinable( - [&counter1] { WorkingFunction(&counter1); }, "Thread1"); - auto thread2 = PlatformThread::SpawnJoinable( - [&counter2] { WorkingFunction(&counter2); }, "Thread2"); - thread1.Finalize(); - thread2.Finalize(); + PlatformThread thread1(WorkingFunction, reinterpret_cast(&counter1), + "Thread1"); + PlatformThread thread2(WorkingFunction, reinterpret_cast(&counter2), + "Thread2"); + thread1.Start(); + thread2.Start(); + thread1.Stop(); + thread2.Stop(); EXPECT_GE(counter1, 0); EXPECT_GE(counter2, 0); diff --git a/rtc_base/deprecated/recursive_critical_section_unittest.cc b/rtc_base/deprecated/recursive_critical_section_unittest.cc index 9256a76f58..3fb7c519c1 100644 --- a/rtc_base/deprecated/recursive_critical_section_unittest.cc +++ b/rtc_base/deprecated/recursive_critical_section_unittest.cc @@ -329,28 +329,33 @@ class PerfTestData { class PerfTestThread { public: + PerfTestThread() : thread_(&ThreadFunc, this, "CsPerf") {} + void Start(PerfTestData* data, int repeats, int id) { + RTC_DCHECK(!thread_.IsRunning()); RTC_DCHECK(!data_); data_ = data; repeats_ = repeats; my_id_ = id; - thread_ = PlatformThread::SpawnJoinable( - [this] { - for (int i = 0; i < repeats_; ++i) - data_->AddToCounter(my_id_); - }, - "CsPerf"); + thread_.Start(); } void Stop() { + RTC_DCHECK(thread_.IsRunning()); RTC_DCHECK(data_); - thread_.Finalize(); + thread_.Stop(); repeats_ = 0; data_ = nullptr; my_id_ = 0; } private: + static void ThreadFunc(void* param) { + PerfTestThread* me = static_cast(param); + for (int i = 0; i < me->repeats_; ++i) + me->data_->AddToCounter(me->my_id_); + } + PlatformThread thread_; PerfTestData* data_ = nullptr; int repeats_ = 0; diff --git a/rtc_base/event_tracer.cc b/rtc_base/event_tracer.cc index 1a2b41ec5c..0eae375708 100644 --- a/rtc_base/event_tracer.cc +++ b/rtc_base/event_tracer.cc @@ -79,12 +79,19 @@ namespace rtc { namespace tracing { namespace { +static void EventTracingThreadFunc(void* params); + // Atomic-int fast path for avoiding logging when disabled. static volatile int g_event_logging_active = 0; // TODO(pbos): Log metadata for all threads, etc. class EventLogger final { public: + EventLogger() + : logging_thread_(EventTracingThreadFunc, + this, + "EventTracingThread", + ThreadAttributes().SetPriority(kLowPriority)) {} ~EventLogger() { RTC_DCHECK(thread_checker_.IsCurrent()); } void AddTraceEvent(const char* name, @@ -202,8 +209,7 @@ class EventLogger final { rtc::AtomicOps::CompareAndSwap(&g_event_logging_active, 0, 1)); // Finally start, everything should be set up now. - logging_thread_ = - PlatformThread::SpawnJoinable([this] { Log(); }, "EventTracingThread"); + logging_thread_.Start(); TRACE_EVENT_INSTANT0("webrtc", "EventLogger::Start"); } @@ -217,7 +223,7 @@ class EventLogger final { // Wake up logging thread to finish writing. shutdown_event_.Set(); // Join the logging thread. - logging_thread_.Finalize(); + logging_thread_.Stop(); } private: @@ -320,6 +326,10 @@ class EventLogger final { bool output_file_owned_ = false; }; +static void EventTracingThreadFunc(void* params) { + static_cast(params)->Log(); +} + static EventLogger* volatile g_event_logger = nullptr; static const char* const kDisabledTracePrefix = TRACE_DISABLED_BY_DEFAULT(""); const unsigned char* InternalGetCategoryEnabled(const char* name) { diff --git a/rtc_base/event_unittest.cc b/rtc_base/event_unittest.cc index a634d6e426..31118877cf 100644 --- a/rtc_base/event_unittest.cc +++ b/rtc_base/event_unittest.cc @@ -43,21 +43,22 @@ TEST(EventTest, AutoReset) { class SignalerThread { public: + SignalerThread() : thread_(&ThreadFn, this, "EventPerf") {} void Start(Event* writer, Event* reader) { writer_ = writer; reader_ = reader; - thread_ = PlatformThread::SpawnJoinable( - [this] { - while (!stop_event_.Wait(0)) { - writer_->Set(); - reader_->Wait(Event::kForever); - } - }, - "EventPerf"); + thread_.Start(); } void Stop() { stop_event_.Set(); - thread_.Finalize(); + thread_.Stop(); + } + static void ThreadFn(void* param) { + auto* me = static_cast(param); + while (!me->stop_event_.Wait(0)) { + me->writer_->Set(); + me->reader_->Wait(Event::kForever); + } } Event stop_event_; Event* writer_; diff --git a/rtc_base/logging_unittest.cc b/rtc_base/logging_unittest.cc index dc1208f3f6..225d66d13d 100644 --- a/rtc_base/logging_unittest.cc +++ b/rtc_base/logging_unittest.cc @@ -160,13 +160,18 @@ TEST(LogTest, MultipleStreams) { class LogThread { public: - void Start() { - thread_ = PlatformThread::SpawnJoinable( - [] { RTC_LOG(LS_VERBOSE) << "RTC_LOG"; }, "LogThread"); - } + LogThread() : thread_(&ThreadEntry, this, "LogThread") {} + ~LogThread() { thread_.Stop(); } + + void Start() { thread_.Start(); } private: + void Run() { RTC_LOG(LS_VERBOSE) << "RTC_LOG"; } + + static void ThreadEntry(void* p) { static_cast(p)->Run(); } + PlatformThread thread_; + Event event_; }; // Ensure we don't crash when adding/removing streams while threads are going. diff --git a/rtc_base/platform_thread.cc b/rtc_base/platform_thread.cc index 6d369d747e..c5f3bc3951 100644 --- a/rtc_base/platform_thread.cc +++ b/rtc_base/platform_thread.cc @@ -10,37 +10,32 @@ #include "rtc_base/platform_thread.h" -#include #include #if !defined(WEBRTC_WIN) #include #endif +#include +#include +#include + +#include "absl/memory/memory.h" #include "rtc_base/checks.h" namespace rtc { -namespace { -#if defined(WEBRTC_WIN) -int Win32PriorityFromThreadPriority(ThreadPriority priority) { - switch (priority) { - case ThreadPriority::kLow: - return THREAD_PRIORITY_BELOW_NORMAL; - case ThreadPriority::kNormal: - return THREAD_PRIORITY_NORMAL; - case ThreadPriority::kHigh: - return THREAD_PRIORITY_ABOVE_NORMAL; - case ThreadPriority::kRealtime: - return THREAD_PRIORITY_TIME_CRITICAL; - } -} -#endif +namespace { +struct ThreadStartData { + ThreadRunFunction run_function; + void* obj; + std::string thread_name; + ThreadPriority priority; +}; bool SetPriority(ThreadPriority priority) { #if defined(WEBRTC_WIN) - return SetThreadPriority(GetCurrentThread(), - Win32PriorityFromThreadPriority(priority)) != FALSE; + return SetThreadPriority(GetCurrentThread(), priority) != FALSE; #elif defined(__native_client__) || defined(WEBRTC_FUCHSIA) // Setting thread priorities is not supported in NaCl or Fuchsia. return true; @@ -64,18 +59,21 @@ bool SetPriority(ThreadPriority priority) { const int top_prio = max_prio - 1; const int low_prio = min_prio + 1; switch (priority) { - case ThreadPriority::kLow: + case kLowPriority: param.sched_priority = low_prio; break; - case ThreadPriority::kNormal: + case kNormalPriority: // The -1 ensures that the kHighPriority is always greater or equal to // kNormalPriority. param.sched_priority = (low_prio + top_prio - 1) / 2; break; - case ThreadPriority::kHigh: + case kHighPriority: param.sched_priority = std::max(top_prio - 2, low_prio); break; - case ThreadPriority::kRealtime: + case kHighestPriority: + param.sched_priority = std::max(top_prio - 1, low_prio); + break; + case kRealtimePriority: param.sched_priority = top_prio; break; } @@ -83,129 +81,124 @@ bool SetPriority(ThreadPriority priority) { #endif // defined(WEBRTC_WIN) } +void RunPlatformThread(std::unique_ptr data) { + rtc::SetCurrentThreadName(data->thread_name.c_str()); + data->thread_name.clear(); + SetPriority(data->priority); + data->run_function(data->obj); +} + #if defined(WEBRTC_WIN) -DWORD WINAPI RunPlatformThread(void* param) { +DWORD WINAPI StartThread(void* param) { // The GetLastError() function only returns valid results when it is called // after a Win32 API function that returns a "failed" result. A crash dump // contains the result from GetLastError() and to make sure it does not // falsely report a Windows error we call SetLastError here. ::SetLastError(ERROR_SUCCESS); - auto function = static_cast*>(param); - (*function)(); - delete function; + RunPlatformThread(absl::WrapUnique(static_cast(param))); return 0; } #else -void* RunPlatformThread(void* param) { - auto function = static_cast*>(param); - (*function)(); - delete function; +void* StartThread(void* param) { + RunPlatformThread(absl::WrapUnique(static_cast(param))); return 0; } #endif // defined(WEBRTC_WIN) } // namespace -PlatformThread::PlatformThread(Handle handle, bool joinable) - : handle_(handle), joinable_(joinable) {} - -PlatformThread::PlatformThread(PlatformThread&& rhs) - : handle_(rhs.handle_), joinable_(rhs.joinable_) { - rhs.handle_ = absl::nullopt; -} - -PlatformThread& PlatformThread::operator=(PlatformThread&& rhs) { - Finalize(); - handle_ = rhs.handle_; - joinable_ = rhs.joinable_; - rhs.handle_ = absl::nullopt; - return *this; +PlatformThread::PlatformThread(ThreadRunFunction func, + void* obj, + absl::string_view thread_name, + ThreadAttributes attributes) + : run_function_(func), + attributes_(attributes), + obj_(obj), + name_(thread_name) { + RTC_DCHECK(func); + RTC_DCHECK(!name_.empty()); + // TODO(tommi): Consider lowering the limit to 15 (limit on Linux). + RTC_DCHECK(name_.length() < 64); } PlatformThread::~PlatformThread() { - Finalize(); -} - -PlatformThread PlatformThread::SpawnJoinable( - std::function thread_function, - absl::string_view name, - ThreadAttributes attributes) { - return SpawnThread(std::move(thread_function), name, attributes, - /*joinable=*/true); -} - -PlatformThread PlatformThread::SpawnDetached( - std::function thread_function, - absl::string_view name, - ThreadAttributes attributes) { - return SpawnThread(std::move(thread_function), name, attributes, - /*joinable=*/false); -} - -absl::optional PlatformThread::GetHandle() const { - return handle_; -} - + RTC_DCHECK_RUN_ON(&thread_checker_); + RTC_DCHECK(!thread_); #if defined(WEBRTC_WIN) -bool PlatformThread::QueueAPC(PAPCFUNC function, ULONG_PTR data) { - RTC_DCHECK(handle_.has_value()); - return handle_.has_value() ? QueueUserAPC(function, *handle_, data) != FALSE - : false; -} -#endif - -void PlatformThread::Finalize() { - if (!handle_.has_value()) - return; -#if defined(WEBRTC_WIN) - if (joinable_) - WaitForSingleObject(*handle_, INFINITE); - CloseHandle(*handle_); -#else - if (joinable_) - RTC_CHECK_EQ(0, pthread_join(*handle_, nullptr)); -#endif - handle_ = absl::nullopt; + RTC_DCHECK(!thread_id_); +#endif // defined(WEBRTC_WIN) } -PlatformThread PlatformThread::SpawnThread( - std::function thread_function, - absl::string_view name, - ThreadAttributes attributes, - bool joinable) { - RTC_DCHECK(thread_function); - RTC_DCHECK(!name.empty()); - // TODO(tommi): Consider lowering the limit to 15 (limit on Linux). - RTC_DCHECK(name.length() < 64); - auto start_thread_function_ptr = - new std::function([thread_function = std::move(thread_function), - name = std::string(name), attributes] { - rtc::SetCurrentThreadName(name.c_str()); - SetPriority(attributes.priority); - thread_function(); - }); +void PlatformThread::Start() { + RTC_DCHECK_RUN_ON(&thread_checker_); + RTC_DCHECK(!thread_) << "Thread already started?"; + ThreadStartData* data = + new ThreadStartData{run_function_, obj_, name_, attributes_.priority}; #if defined(WEBRTC_WIN) // See bug 2902 for background on STACK_SIZE_PARAM_IS_A_RESERVATION. // Set the reserved stack stack size to 1M, which is the default on Windows // and Linux. - DWORD thread_id = 0; - PlatformThread::Handle handle = ::CreateThread( - nullptr, 1024 * 1024, &RunPlatformThread, start_thread_function_ptr, - STACK_SIZE_PARAM_IS_A_RESERVATION, &thread_id); - RTC_CHECK(handle) << "CreateThread failed"; + thread_ = ::CreateThread(nullptr, 1024 * 1024, &StartThread, data, + STACK_SIZE_PARAM_IS_A_RESERVATION, &thread_id_); + RTC_CHECK(thread_) << "CreateThread failed"; + RTC_DCHECK(thread_id_); #else pthread_attr_t attr; pthread_attr_init(&attr); // Set the stack stack size to 1M. pthread_attr_setstacksize(&attr, 1024 * 1024); - pthread_attr_setdetachstate( - &attr, joinable ? PTHREAD_CREATE_JOINABLE : PTHREAD_CREATE_DETACHED); - PlatformThread::Handle handle; - RTC_CHECK_EQ(0, pthread_create(&handle, &attr, &RunPlatformThread, - start_thread_function_ptr)); + pthread_attr_setdetachstate(&attr, attributes_.joinable + ? PTHREAD_CREATE_JOINABLE + : PTHREAD_CREATE_DETACHED); + RTC_CHECK_EQ(0, pthread_create(&thread_, &attr, &StartThread, data)); pthread_attr_destroy(&attr); #endif // defined(WEBRTC_WIN) - return PlatformThread(handle, joinable); } +bool PlatformThread::IsRunning() const { + RTC_DCHECK_RUN_ON(&thread_checker_); +#if defined(WEBRTC_WIN) + return thread_ != nullptr; +#else + return thread_ != 0; +#endif // defined(WEBRTC_WIN) +} + +PlatformThreadRef PlatformThread::GetThreadRef() const { +#if defined(WEBRTC_WIN) + return thread_id_; +#else + return thread_; +#endif // defined(WEBRTC_WIN) +} + +void PlatformThread::Stop() { + RTC_DCHECK_RUN_ON(&thread_checker_); + if (!IsRunning()) + return; + +#if defined(WEBRTC_WIN) + if (attributes_.joinable) { + WaitForSingleObject(thread_, INFINITE); + } + CloseHandle(thread_); + thread_ = nullptr; + thread_id_ = 0; +#else + if (attributes_.joinable) { + RTC_CHECK_EQ(0, pthread_join(thread_, nullptr)); + } + thread_ = 0; +#endif // defined(WEBRTC_WIN) +} + +#if defined(WEBRTC_WIN) +bool PlatformThread::QueueAPC(PAPCFUNC function, ULONG_PTR data) { + RTC_DCHECK_RUN_ON(&thread_checker_); + RTC_DCHECK(IsRunning()); + + return QueueUserAPC(function, thread_, data) != FALSE; +} +#endif + } // namespace rtc diff --git a/rtc_base/platform_thread.h b/rtc_base/platform_thread.h index 11ccfae3d0..35c0e27432 100644 --- a/rtc_base/platform_thread.h +++ b/rtc_base/platform_thread.h @@ -11,101 +11,103 @@ #ifndef RTC_BASE_PLATFORM_THREAD_H_ #define RTC_BASE_PLATFORM_THREAD_H_ -#include +#ifndef WEBRTC_WIN +#include +#endif #include #include "absl/strings/string_view.h" -#include "absl/types/optional.h" +#include "api/sequence_checker.h" +#include "rtc_base/constructor_magic.h" #include "rtc_base/platform_thread_types.h" namespace rtc { -enum class ThreadPriority { - kLow = 1, - kNormal, - kHigh, - kRealtime, +// Callback function that the spawned thread will enter once spawned. +typedef void (*ThreadRunFunction)(void*); + +enum ThreadPriority { +#ifdef WEBRTC_WIN + kLowPriority = THREAD_PRIORITY_BELOW_NORMAL, + kNormalPriority = THREAD_PRIORITY_NORMAL, + kHighPriority = THREAD_PRIORITY_ABOVE_NORMAL, + kHighestPriority = THREAD_PRIORITY_HIGHEST, + kRealtimePriority = THREAD_PRIORITY_TIME_CRITICAL +#else + kLowPriority = 1, + kNormalPriority = 2, + kHighPriority = 3, + kHighestPriority = 4, + kRealtimePriority = 5 +#endif }; struct ThreadAttributes { - ThreadPriority priority = ThreadPriority::kNormal; + ThreadPriority priority = kNormalPriority; + bool joinable = true; + ThreadAttributes& SetPriority(ThreadPriority priority_param) { priority = priority_param; return *this; } + ThreadAttributes& SetDetached() { + joinable = false; + return *this; + } }; -// Represents a simple worker thread. -class PlatformThread final { +// Represents a simple worker thread. The implementation must be assumed +// to be single threaded, meaning that all methods of the class, must be +// called from the same thread, including instantiation. +class PlatformThread { public: - // Handle is the base platform thread handle. -#if defined(WEBRTC_WIN) - using Handle = HANDLE; -#else - using Handle = pthread_t; -#endif // defined(WEBRTC_WIN) - // This ctor creates the PlatformThread with an unset handle (returning true - // in empty()) and is provided for convenience. - // TODO(bugs.webrtc.org/12727) Look into if default and move support can be - // removed. - PlatformThread() = default; - - // Moves |rhs| into this, storing an empty state in |rhs|. - // TODO(bugs.webrtc.org/12727) Look into if default and move support can be - // removed. - PlatformThread(PlatformThread&& rhs); - - // Moves |rhs| into this, storing an empty state in |rhs|. - // TODO(bugs.webrtc.org/12727) Look into if default and move support can be - // removed. - PlatformThread& operator=(PlatformThread&& rhs); - - // For a PlatformThread that's been spawned joinable, the destructor suspends - // the calling thread until the created thread exits unless the thread has - // already exited. + PlatformThread(ThreadRunFunction func, + void* obj, + absl::string_view thread_name, + ThreadAttributes attributes = ThreadAttributes()); virtual ~PlatformThread(); - // Finalizes any allocated resources. - // For a PlatformThread that's been spawned joinable, Finalize() suspends - // the calling thread until the created thread exits unless the thread has - // already exited. - // empty() returns true after completion. - void Finalize(); + const std::string& name() const { return name_; } - // Returns true if default constructed, moved from, or Finalize()ed. - bool empty() const { return !handle_.has_value(); } + // Spawns a thread and tries to set thread priority according to the priority + // from when CreateThread was called. + // Start can only be called after the constructor or after a call to Stop(). + void Start(); - // Creates a started joinable thread which will be joined when the returned - // PlatformThread destructs or Finalize() is called. - static PlatformThread SpawnJoinable( - std::function thread_function, - absl::string_view name, - ThreadAttributes attributes = ThreadAttributes()); + bool IsRunning() const; - // Creates a started detached thread. The caller has to use external - // synchronization as nothing is provided by the PlatformThread construct. - static PlatformThread SpawnDetached( - std::function thread_function, - absl::string_view name, - ThreadAttributes attributes = ThreadAttributes()); + // Returns an identifier for the worker thread that can be used to do + // thread checks. + PlatformThreadRef GetThreadRef() const; - // Returns the base platform thread handle of this thread. - absl::optional GetHandle() const; + // Stop() prepares the PlatformThread for destruction or another call to + // Start(). For a PlatformThread that's been created with + // ThreadAttributes::joinable true (the default), Stop() suspends the calling + // thread until the created thread exits unless the thread has already exited. + // Stop() can only be called after calling Start(). + void Stop(); + protected: #if defined(WEBRTC_WIN) - // Queue a Windows APC function that runs when the thread is alertable. + // Exposed to derived classes to allow for special cases specific to Windows. bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data); #endif private: - PlatformThread(Handle handle, bool joinable); - static PlatformThread SpawnThread(std::function thread_function, - absl::string_view name, - ThreadAttributes attributes, - bool joinable); - - absl::optional handle_; - bool joinable_ = false; + ThreadRunFunction const run_function_ = nullptr; + const ThreadAttributes attributes_; + void* const obj_; + // TODO(pbos): Make sure call sites use string literals and update to a const + // char* instead of a std::string. + const std::string name_; + webrtc::SequenceChecker thread_checker_; +#if defined(WEBRTC_WIN) + HANDLE thread_ = nullptr; + DWORD thread_id_ = 0; +#else + pthread_t thread_ = 0; +#endif // defined(WEBRTC_WIN) + RTC_DISALLOW_COPY_AND_ASSIGN(PlatformThread); }; } // namespace rtc diff --git a/rtc_base/platform_thread_unittest.cc b/rtc_base/platform_thread_unittest.cc index 0da822cf85..d09772fddc 100644 --- a/rtc_base/platform_thread_unittest.cc +++ b/rtc_base/platform_thread_unittest.cc @@ -10,73 +10,69 @@ #include "rtc_base/platform_thread.h" -#include "absl/types/optional.h" #include "rtc_base/event.h" #include "system_wrappers/include/sleep.h" #include "test/gmock.h" namespace rtc { +namespace { -TEST(PlatformThreadTest, DefaultConstructedIsEmpty) { - PlatformThread thread; - EXPECT_EQ(thread.GetHandle(), absl::nullopt); - EXPECT_TRUE(thread.empty()); +void NullRunFunction(void* obj) {} + +// Function that sets a boolean. +void SetFlagRunFunction(void* obj) { + bool* obj_as_bool = static_cast(obj); + *obj_as_bool = true; } -TEST(PlatformThreadTest, StartFinalize) { - PlatformThread thread = PlatformThread::SpawnJoinable([] {}, "1"); - EXPECT_NE(thread.GetHandle(), absl::nullopt); - EXPECT_FALSE(thread.empty()); - thread.Finalize(); - EXPECT_TRUE(thread.empty()); - thread = PlatformThread::SpawnDetached([] {}, "2"); - EXPECT_FALSE(thread.empty()); - thread.Finalize(); - EXPECT_TRUE(thread.empty()); +void StdFunctionRunFunction(void* obj) { + std::function* fun = static_cast*>(obj); + (*fun)(); } -TEST(PlatformThreadTest, MovesEmpty) { - PlatformThread thread1; - PlatformThread thread2 = std::move(thread1); - EXPECT_TRUE(thread1.empty()); - EXPECT_TRUE(thread2.empty()); +} // namespace + +TEST(PlatformThreadTest, StartStop) { + PlatformThread thread(&NullRunFunction, nullptr, "PlatformThreadTest"); + EXPECT_TRUE(thread.name() == "PlatformThreadTest"); + EXPECT_TRUE(thread.GetThreadRef() == 0); + thread.Start(); + EXPECT_TRUE(thread.GetThreadRef() != 0); + thread.Stop(); + EXPECT_TRUE(thread.GetThreadRef() == 0); } -TEST(PlatformThreadTest, MovesHandles) { - PlatformThread thread1 = PlatformThread::SpawnJoinable([] {}, "1"); - PlatformThread thread2 = std::move(thread1); - EXPECT_TRUE(thread1.empty()); - EXPECT_FALSE(thread2.empty()); - thread1 = PlatformThread::SpawnDetached([] {}, "2"); - thread2 = std::move(thread1); - EXPECT_TRUE(thread1.empty()); - EXPECT_FALSE(thread2.empty()); -} - -TEST(PlatformThreadTest, - TwoThreadHandlesAreDifferentWhenStartedAndEqualWhenJoined) { - PlatformThread thread1 = PlatformThread(); - PlatformThread thread2 = PlatformThread(); - EXPECT_EQ(thread1.GetHandle(), thread2.GetHandle()); - thread1 = PlatformThread::SpawnJoinable([] {}, "1"); - thread2 = PlatformThread::SpawnJoinable([] {}, "2"); - EXPECT_NE(thread1.GetHandle(), thread2.GetHandle()); - thread1.Finalize(); - EXPECT_NE(thread1.GetHandle(), thread2.GetHandle()); - thread2.Finalize(); - EXPECT_EQ(thread1.GetHandle(), thread2.GetHandle()); +TEST(PlatformThreadTest, StartStop2) { + PlatformThread thread1(&NullRunFunction, nullptr, "PlatformThreadTest1"); + PlatformThread thread2(&NullRunFunction, nullptr, "PlatformThreadTest2"); + EXPECT_TRUE(thread1.GetThreadRef() == thread2.GetThreadRef()); + thread1.Start(); + thread2.Start(); + EXPECT_TRUE(thread1.GetThreadRef() != thread2.GetThreadRef()); + thread2.Stop(); + thread1.Stop(); } TEST(PlatformThreadTest, RunFunctionIsCalled) { bool flag = false; - PlatformThread::SpawnJoinable([&] { flag = true; }, "T"); + PlatformThread thread(&SetFlagRunFunction, &flag, "RunFunctionIsCalled"); + thread.Start(); + + // At this point, the flag may be either true or false. + thread.Stop(); + + // We expect the thread to have run at least once. EXPECT_TRUE(flag); } TEST(PlatformThreadTest, JoinsThread) { // This test flakes if there are problems with the join implementation. + EXPECT_TRUE(ThreadAttributes().joinable); rtc::Event event; - PlatformThread::SpawnJoinable([&] { event.Set(); }, "T"); + std::function thread_function = [&] { event.Set(); }; + PlatformThread thread(&StdFunctionRunFunction, &thread_function, "T"); + thread.Start(); + thread.Stop(); EXPECT_TRUE(event.Wait(/*give_up_after_ms=*/0)); } @@ -87,14 +83,18 @@ TEST(PlatformThreadTest, StopsBeforeDetachedThreadExits) { rtc::Event thread_started; rtc::Event thread_continue; rtc::Event thread_exiting; - PlatformThread::SpawnDetached( - [&] { - thread_started.Set(); - thread_continue.Wait(Event::kForever); - flag = true; - thread_exiting.Set(); - }, - "T"); + std::function thread_function = [&] { + thread_started.Set(); + thread_continue.Wait(Event::kForever); + flag = true; + thread_exiting.Set(); + }; + { + PlatformThread thread(&StdFunctionRunFunction, &thread_function, "T", + ThreadAttributes().SetDetached()); + thread.Start(); + thread.Stop(); + } thread_started.Wait(Event::kForever); EXPECT_FALSE(flag); thread_continue.Set(); diff --git a/rtc_base/rate_limiter_unittest.cc b/rtc_base/rate_limiter_unittest.cc index eda644b4ca..8ebf8aa67b 100644 --- a/rtc_base/rate_limiter_unittest.cc +++ b/rtc_base/rate_limiter_unittest.cc @@ -127,6 +127,10 @@ class ThreadTask { rtc::Event end_signal_; }; +void RunTask(void* thread_task) { + reinterpret_cast(thread_task)->Run(); +} + TEST_F(RateLimitTest, MultiThreadedUsage) { // Simple sanity test, with different threads calling the various methods. // Runs a few simple tasks, each on its own thread, but coordinated with @@ -145,8 +149,8 @@ TEST_F(RateLimitTest, MultiThreadedUsage) { EXPECT_TRUE(rate_limiter_->SetWindowSize(kWindowSizeMs / 2)); } } set_window_size_task(rate_limiter.get()); - auto thread1 = rtc::PlatformThread::SpawnJoinable( - [&set_window_size_task] { set_window_size_task.Run(); }, "Thread1"); + rtc::PlatformThread thread1(RunTask, &set_window_size_task, "Thread1"); + thread1.Start(); class SetMaxRateTask : public ThreadTask { public: @@ -156,8 +160,8 @@ TEST_F(RateLimitTest, MultiThreadedUsage) { void DoRun() override { rate_limiter_->SetMaxRate(kMaxRateBps * 2); } } set_max_rate_task(rate_limiter.get()); - auto thread2 = rtc::PlatformThread::SpawnJoinable( - [&set_max_rate_task] { set_max_rate_task.Run(); }, "Thread2"); + rtc::PlatformThread thread2(RunTask, &set_max_rate_task, "Thread2"); + thread2.Start(); class UseRateTask : public ThreadTask { public: @@ -173,8 +177,8 @@ TEST_F(RateLimitTest, MultiThreadedUsage) { SimulatedClock* const clock_; } use_rate_task(rate_limiter.get(), &clock_); - auto thread3 = rtc::PlatformThread::SpawnJoinable( - [&use_rate_task] { use_rate_task.Run(); }, "Thread3"); + rtc::PlatformThread thread3(RunTask, &use_rate_task, "Thread3"); + thread3.Start(); set_window_size_task.start_signal_.Set(); EXPECT_TRUE(set_window_size_task.end_signal_.Wait(kMaxTimeoutMs)); @@ -187,6 +191,10 @@ TEST_F(RateLimitTest, MultiThreadedUsage) { // All rate consumed. EXPECT_FALSE(rate_limiter->TryUseRate(1)); + + thread1.Stop(); + thread2.Stop(); + thread3.Stop(); } } // namespace webrtc diff --git a/rtc_base/task_queue_libevent.cc b/rtc_base/task_queue_libevent.cc index 909698611e..71a9e8a3fe 100644 --- a/rtc_base/task_queue_libevent.cc +++ b/rtc_base/task_queue_libevent.cc @@ -93,12 +93,16 @@ void EventAssign(struct event* ev, rtc::ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) { switch (priority) { case Priority::HIGH: - return rtc::ThreadPriority::kRealtime; + return rtc::kRealtimePriority; case Priority::LOW: - return rtc::ThreadPriority::kLow; + return rtc::kLowPriority; case Priority::NORMAL: - return rtc::ThreadPriority::kNormal; + return rtc::kNormalPriority; + default: + RTC_NOTREACHED(); + break; } + return rtc::kNormalPriority; } class TaskQueueLibevent final : public TaskQueueBase { @@ -116,6 +120,7 @@ class TaskQueueLibevent final : public TaskQueueBase { ~TaskQueueLibevent() override = default; + static void ThreadMain(void* context); static void OnWakeup(int socket, short flags, void* context); // NOLINT static void RunTimer(int fd, short flags, void* context); // NOLINT @@ -167,7 +172,11 @@ class TaskQueueLibevent::SetTimerTask : public QueuedTask { TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name, rtc::ThreadPriority priority) - : event_base_(event_base_new()) { + : event_base_(event_base_new()), + thread_(&TaskQueueLibevent::ThreadMain, + this, + queue_name, + rtc::ThreadAttributes().SetPriority(priority)) { int fds[2]; RTC_CHECK(pipe(fds) == 0); SetNonBlocking(fds[0]); @@ -178,18 +187,7 @@ TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name, EventAssign(&wakeup_event_, event_base_, wakeup_pipe_out_, EV_READ | EV_PERSIST, OnWakeup, this); event_add(&wakeup_event_, 0); - thread_ = rtc::PlatformThread::SpawnJoinable( - [this] { - { - CurrentTaskQueueSetter set_current(this); - while (is_active_) - event_base_loop(event_base_, 0); - } - - for (TimerEvent* timer : pending_timers_) - delete timer; - }, - queue_name, rtc::ThreadAttributes().SetPriority(priority)); + thread_.Start(); } void TaskQueueLibevent::Delete() { @@ -204,7 +202,7 @@ void TaskQueueLibevent::Delete() { nanosleep(&ts, nullptr); } - thread_.Finalize(); + thread_.Stop(); event_del(&wakeup_event_); @@ -257,6 +255,20 @@ void TaskQueueLibevent::PostDelayedTask(std::unique_ptr task, } } +// static +void TaskQueueLibevent::ThreadMain(void* context) { + TaskQueueLibevent* me = static_cast(context); + + { + CurrentTaskQueueSetter set_current(me); + while (me->is_active_) + event_base_loop(me->event_base_, 0); + } + + for (TimerEvent* timer : me->pending_timers_) + delete timer; +} + // static void TaskQueueLibevent::OnWakeup(int socket, short flags, // NOLINT diff --git a/rtc_base/task_queue_stdlib.cc b/rtc_base/task_queue_stdlib.cc index 548f7ef69a..bd5bb97988 100644 --- a/rtc_base/task_queue_stdlib.cc +++ b/rtc_base/task_queue_stdlib.cc @@ -36,11 +36,14 @@ rtc::ThreadPriority TaskQueuePriorityToThreadPriority( TaskQueueFactory::Priority priority) { switch (priority) { case TaskQueueFactory::Priority::HIGH: - return rtc::ThreadPriority::kRealtime; + return rtc::kRealtimePriority; case TaskQueueFactory::Priority::LOW: - return rtc::ThreadPriority::kLow; + return rtc::kLowPriority; case TaskQueueFactory::Priority::NORMAL: - return rtc::ThreadPriority::kNormal; + return rtc::kNormalPriority; + default: + RTC_NOTREACHED(); + return rtc::kNormalPriority; } } @@ -75,6 +78,8 @@ class TaskQueueStdlib final : public TaskQueueBase { NextTask GetNextTask(); + static void ThreadMain(void* context); + void ProcessTasks(); void NotifyWake(); @@ -121,13 +126,11 @@ TaskQueueStdlib::TaskQueueStdlib(absl::string_view queue_name, : started_(/*manual_reset=*/false, /*initially_signaled=*/false), stopped_(/*manual_reset=*/false, /*initially_signaled=*/false), flag_notify_(/*manual_reset=*/false, /*initially_signaled=*/false), - thread_(rtc::PlatformThread::SpawnJoinable( - [this] { - CurrentTaskQueueSetter set_current(this); - ProcessTasks(); - }, - queue_name, - rtc::ThreadAttributes().SetPriority(priority))) { + thread_(&TaskQueueStdlib::ThreadMain, + this, + queue_name, + rtc::ThreadAttributes().SetPriority(priority)) { + thread_.Start(); started_.Wait(rtc::Event::kForever); } @@ -142,7 +145,7 @@ void TaskQueueStdlib::Delete() { NotifyWake(); stopped_.Wait(rtc::Event::kForever); - thread_.Finalize(); + thread_.Stop(); delete this; } @@ -219,6 +222,13 @@ TaskQueueStdlib::NextTask TaskQueueStdlib::GetNextTask() { return result; } +// static +void TaskQueueStdlib::ThreadMain(void* context) { + TaskQueueStdlib* me = static_cast(context); + CurrentTaskQueueSetter set_current(me); + me->ProcessTasks(); +} + void TaskQueueStdlib::ProcessTasks() { started_.Set(); diff --git a/rtc_base/task_queue_win.cc b/rtc_base/task_queue_win.cc index 39dc9b9373..8bfe5e5c44 100644 --- a/rtc_base/task_queue_win.cc +++ b/rtc_base/task_queue_win.cc @@ -29,7 +29,6 @@ #include #include "absl/strings/string_view.h" -#include "absl/types/optional.h" #include "api/task_queue/queued_task.h" #include "api/task_queue/task_queue_base.h" #include "rtc_base/arraysize.h" @@ -57,12 +56,16 @@ rtc::ThreadPriority TaskQueuePriorityToThreadPriority( TaskQueueFactory::Priority priority) { switch (priority) { case TaskQueueFactory::Priority::HIGH: - return rtc::ThreadPriority::kRealtime; + return rtc::kRealtimePriority; case TaskQueueFactory::Priority::LOW: - return rtc::ThreadPriority::kLow; + return rtc::kLowPriority; case TaskQueueFactory::Priority::NORMAL: - return rtc::ThreadPriority::kNormal; + return rtc::kNormalPriority; + default: + RTC_NOTREACHED(); + break; } + return rtc::kNormalPriority; } int64_t GetTick() { @@ -164,6 +167,24 @@ class TaskQueueWin : public TaskQueueBase { void RunPendingTasks(); private: + static void ThreadMain(void* context); + + class WorkerThread : public rtc::PlatformThread { + public: + WorkerThread(rtc::ThreadRunFunction func, + void* obj, + absl::string_view thread_name, + rtc::ThreadPriority priority) + : PlatformThread(func, + obj, + thread_name, + rtc::ThreadAttributes().SetPriority(priority)) {} + + bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data) { + return rtc::PlatformThread::QueueAPC(apc_function, data); + } + }; + void RunThreadMain(); bool ProcessQueuedMessages(); void RunDueTasks(); @@ -186,7 +207,7 @@ class TaskQueueWin : public TaskQueueBase { greater> timer_tasks_; UINT_PTR timer_id_ = 0; - rtc::PlatformThread thread_; + WorkerThread thread_; Mutex pending_lock_; std::queue> pending_ RTC_GUARDED_BY(pending_lock_); @@ -195,12 +216,10 @@ class TaskQueueWin : public TaskQueueBase { TaskQueueWin::TaskQueueWin(absl::string_view queue_name, rtc::ThreadPriority priority) - : in_queue_(::CreateEvent(nullptr, true, false, nullptr)) { + : thread_(&TaskQueueWin::ThreadMain, this, queue_name, priority), + in_queue_(::CreateEvent(nullptr, true, false, nullptr)) { RTC_DCHECK(in_queue_); - thread_ = rtc::PlatformThread::SpawnJoinable( - [this] { RunThreadMain(); }, queue_name, - rtc::ThreadAttributes().SetPriority(priority)); - + thread_.Start(); rtc::Event event(false, false); RTC_CHECK(thread_.QueueAPC(&InitializeQueueThread, reinterpret_cast(&event))); @@ -209,13 +228,11 @@ TaskQueueWin::TaskQueueWin(absl::string_view queue_name, void TaskQueueWin::Delete() { RTC_DCHECK(!IsCurrent()); - RTC_CHECK(thread_.GetHandle() != absl::nullopt); - while ( - !::PostThreadMessage(GetThreadId(*thread_.GetHandle()), WM_QUIT, 0, 0)) { + while (!::PostThreadMessage(thread_.GetThreadRef(), WM_QUIT, 0, 0)) { RTC_CHECK_EQ(ERROR_NOT_ENOUGH_QUOTA, ::GetLastError()); Sleep(1); } - thread_.Finalize(); + thread_.Stop(); ::CloseHandle(in_queue_); delete this; } @@ -238,9 +255,7 @@ void TaskQueueWin::PostDelayedTask(std::unique_ptr task, // and WPARAM is 32bits in 32bit builds. Otherwise, we could pass the // task pointer and timestamp as LPARAM and WPARAM. auto* task_info = new DelayedTaskInfo(milliseconds, std::move(task)); - RTC_CHECK(thread_.GetHandle() != absl::nullopt); - if (!::PostThreadMessage(GetThreadId(*thread_.GetHandle()), - WM_QUEUE_DELAYED_TASK, 0, + if (!::PostThreadMessage(thread_.GetThreadRef(), WM_QUEUE_DELAYED_TASK, 0, reinterpret_cast(task_info))) { delete task_info; } @@ -262,6 +277,11 @@ void TaskQueueWin::RunPendingTasks() { } } +// static +void TaskQueueWin::ThreadMain(void* context) { + static_cast(context)->RunThreadMain(); +} + void TaskQueueWin::RunThreadMain() { CurrentTaskQueueSetter set_current(this); HANDLE handles[2] = {*timer_.event_for_wait(), in_queue_}; diff --git a/sdk/android/native_unittests/stacktrace/stacktrace_unittest.cc b/sdk/android/native_unittests/stacktrace/stacktrace_unittest.cc index b77d86719f..fcd9c9b8f1 100644 --- a/sdk/android/native_unittests/stacktrace/stacktrace_unittest.cc +++ b/sdk/android/native_unittests/stacktrace/stacktrace_unittest.cc @@ -153,24 +153,28 @@ class SleepDeadlock : public DeadlockInterface { } }; +// This is the function that is exectued by the thread that will deadlock and +// have its stacktrace captured. +void ThreadFunction(void* void_params) { + ThreadParams* params = static_cast(void_params); + params->tid = gettid(); + + params->deadlock_region_start_address = GetCurrentRelativeExecutionAddress(); + params->deadlock_start_event.Set(); + params->deadlock_impl->Deadlock(); + params->deadlock_region_end_address = GetCurrentRelativeExecutionAddress(); + + params->deadlock_done_event.Set(); +} + void TestStacktrace(std::unique_ptr deadlock_impl) { // Set params that will be sent to other thread. ThreadParams params; params.deadlock_impl = deadlock_impl.get(); // Spawn thread. - auto thread = rtc::PlatformThread::SpawnJoinable( - [¶ms] { - params.tid = gettid(); - params.deadlock_region_start_address = - GetCurrentRelativeExecutionAddress(); - params.deadlock_start_event.Set(); - params.deadlock_impl->Deadlock(); - params.deadlock_region_end_address = - GetCurrentRelativeExecutionAddress(); - params.deadlock_done_event.Set(); - }, - "StacktraceTest"); + rtc::PlatformThread thread(&ThreadFunction, ¶ms, "StacktraceTest"); + thread.Start(); // Wait until the thread has entered the deadlock region, and take a very // brief nap to give it time to reach the actual deadlock. @@ -194,6 +198,8 @@ void TestStacktrace(std::unique_ptr deadlock_impl) { << rtc::ToHex(params.deadlock_region_start_address) << ", " << rtc::ToHex(params.deadlock_region_end_address) << "] not contained in: " << StackTraceToString(stack_trace); + + thread.Stop(); } class LookoutLogSink final : public rtc::LogSink { @@ -253,9 +259,13 @@ TEST(Stacktrace, TestRtcEventDeadlockDetection) { // Start a thread that waits for an event. rtc::Event ev; - auto thread = rtc::PlatformThread::SpawnJoinable( - [&ev] { ev.Wait(rtc::Event::kForever); }, - "TestRtcEventDeadlockDetection"); + rtc::PlatformThread thread( + [](void* arg) { + auto* ev = static_cast(arg); + ev->Wait(rtc::Event::kForever); + }, + &ev, "TestRtcEventDeadlockDetection"); + thread.Start(); // The message should appear after 3 sec. We'll wait up to 10 sec in an // attempt to not be flaky. @@ -263,7 +273,7 @@ TEST(Stacktrace, TestRtcEventDeadlockDetection) { // Unblock the thread and shut it down. ev.Set(); - thread.Finalize(); + thread.Stop(); rtc::LogMessage::RemoveLogToStream(&sink); } diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc index 53fb14e606..436b3ba1d2 100644 --- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc +++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc @@ -142,9 +142,12 @@ void DefaultVideoQualityAnalyzer::Start( int max_threads_count) { test_label_ = std::move(test_case_name); for (int i = 0; i < max_threads_count; i++) { - thread_pool_.push_back(rtc::PlatformThread::SpawnJoinable( - [this] { ProcessComparisons(); }, - "DefaultVideoQualityAnalyzerWorker-" + std::to_string(i))); + auto thread = std::make_unique( + &DefaultVideoQualityAnalyzer::ProcessComparisonsThread, this, + ("DefaultVideoQualityAnalyzerWorker-" + std::to_string(i)).data(), + rtc::ThreadAttributes().SetPriority(rtc::kNormalPriority)); + thread->Start(); + thread_pool_.push_back(std::move(thread)); } { MutexLock lock(&lock_); @@ -544,6 +547,10 @@ void DefaultVideoQualityAnalyzer::Stop() { } StopMeasuringCpuProcessTime(); comparison_available_event_.Set(); + for (auto& thread : thread_pool_) { + thread->Stop(); + } + // PlatformThread have to be deleted on the same thread, where it was created thread_pool_.clear(); // Perform final Metrics update. On this place analyzer is stopped and no one @@ -670,6 +677,10 @@ void DefaultVideoQualityAnalyzer::AddComparison( StopExcludingCpuThreadTime(); } +void DefaultVideoQualityAnalyzer::ProcessComparisonsThread(void* obj) { + static_cast(obj)->ProcessComparisons(); +} + void DefaultVideoQualityAnalyzer::ProcessComparisons() { while (true) { // Try to pick next comparison to perform from the queue. diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h index 626fa246e5..de9419dda9 100644 --- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h +++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h @@ -560,7 +560,7 @@ class DefaultVideoQualityAnalyzer : public VideoQualityAnalyzerInterface { std::deque comparisons_ RTC_GUARDED_BY(comparison_lock_); AnalyzerStats analyzer_stats_ RTC_GUARDED_BY(comparison_lock_); - std::vector thread_pool_; + std::vector> thread_pool_; rtc::Event comparison_available_event_; Mutex cpu_measurement_lock_; diff --git a/video/video_analyzer.cc b/video/video_analyzer.cc index 81dcf055b8..6698dadf42 100644 --- a/video/video_analyzer.cc +++ b/video/video_analyzer.cc @@ -137,12 +137,10 @@ VideoAnalyzer::VideoAnalyzer(test::LayerFilteringTransport* transport, } for (uint32_t i = 0; i < num_cores; ++i) { - comparison_thread_pool_.push_back(rtc::PlatformThread::SpawnJoinable( - [this] { - while (CompareFrames()) { - } - }, - "Analyzer")); + rtc::PlatformThread* thread = + new rtc::PlatformThread(&FrameComparisonThread, this, "Analyzer"); + thread->Start(); + comparison_thread_pool_.push_back(thread); } if (!rtp_dump_name.empty()) { @@ -157,8 +155,10 @@ VideoAnalyzer::~VideoAnalyzer() { MutexLock lock(&comparison_lock_); quit_ = true; } - // Joins all threads. - comparison_thread_pool_.clear(); + for (rtc::PlatformThread* thread : comparison_thread_pool_) { + thread->Stop(); + delete thread; + } } void VideoAnalyzer::SetReceiver(PacketReceiver* receiver) { @@ -533,6 +533,12 @@ void VideoAnalyzer::PollStats() { memory_usage_.AddSample(rtc::GetProcessResidentSizeBytes()); } +void VideoAnalyzer::FrameComparisonThread(void* obj) { + VideoAnalyzer* analyzer = static_cast(obj); + while (analyzer->CompareFrames()) { + } +} + bool VideoAnalyzer::CompareFrames() { if (AllFramesRecorded()) return false; diff --git a/video/video_analyzer.h b/video/video_analyzer.h index 68861d1b5f..18bacc16fc 100644 --- a/video/video_analyzer.h +++ b/video/video_analyzer.h @@ -302,7 +302,7 @@ class VideoAnalyzer : public PacketReceiver, const double avg_ssim_threshold_; bool is_quick_test_enabled_; - std::vector comparison_thread_pool_; + std::vector comparison_thread_pool_; rtc::Event comparison_available_event_; std::deque comparisons_ RTC_GUARDED_BY(comparison_lock_); bool quit_ RTC_GUARDED_BY(comparison_lock_);