Add thread checks to FifoBuffer (test-only class)

These checks replace the need for a mutex as the usage of the
StreamInterface methods is consistently on the same thread as
the callbacks.

Bug: none
Change-Id: I0c5aaddcbdaa4a6a84c3bc73306563a9f8a8821d
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/347902
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#42148}
This commit is contained in:
Tommi 2024-04-23 05:15:13 +02:00 committed by WebRTC LUCI CQ
parent 5bfcc873d9
commit 3e7d35c55d
3 changed files with 19 additions and 21 deletions

View file

@ -36,7 +36,6 @@ rtc_library("fifo_buffer") {
"..:stream",
"..:threading",
"../../api/task_queue:pending_task_safety_flag",
"../synchronization:mutex",
]
}

View file

@ -39,20 +39,20 @@ FifoBuffer::FifoBuffer(size_t size, Thread* owner)
FifoBuffer::~FifoBuffer() {}
bool FifoBuffer::GetBuffered(size_t* size) const {
webrtc::MutexLock lock(&mutex_);
RTC_DCHECK_RUN_ON(&callback_sequence_);
*size = data_length_;
return true;
}
StreamState FifoBuffer::GetState() const {
webrtc::MutexLock lock(&mutex_);
RTC_DCHECK_RUN_ON(&callback_sequence_);
return state_;
}
StreamResult FifoBuffer::Read(rtc::ArrayView<uint8_t> buffer,
size_t& bytes_read,
int& error) {
webrtc::MutexLock lock(&mutex_);
RTC_DCHECK_RUN_ON(&callback_sequence_);
const bool was_writable = data_length_ < buffer_length_;
size_t copy = 0;
StreamResult result = ReadLocked(buffer.data(), buffer.size(), &copy);
@ -75,7 +75,7 @@ StreamResult FifoBuffer::Read(rtc::ArrayView<uint8_t> buffer,
StreamResult FifoBuffer::Write(rtc::ArrayView<const uint8_t> buffer,
size_t& bytes_written,
int& error) {
webrtc::MutexLock lock(&mutex_);
RTC_DCHECK_RUN_ON(&callback_sequence_);
const bool was_readable = (data_length_ > 0);
size_t copy = 0;
@ -94,12 +94,12 @@ StreamResult FifoBuffer::Write(rtc::ArrayView<const uint8_t> buffer,
}
void FifoBuffer::Close() {
webrtc::MutexLock lock(&mutex_);
RTC_DCHECK_RUN_ON(&callback_sequence_);
state_ = SS_CLOSED;
}
const void* FifoBuffer::GetReadData(size_t* size) {
webrtc::MutexLock lock(&mutex_);
RTC_DCHECK_RUN_ON(&callback_sequence_);
*size = (read_position_ + data_length_ <= buffer_length_)
? data_length_
: buffer_length_ - read_position_;
@ -107,8 +107,8 @@ const void* FifoBuffer::GetReadData(size_t* size) {
}
void FifoBuffer::ConsumeReadData(size_t size) {
webrtc::MutexLock lock(&mutex_);
RTC_DCHECK(size <= data_length_);
RTC_DCHECK_RUN_ON(&callback_sequence_);
RTC_DCHECK_LE(size, data_length_);
const bool was_writable = data_length_ < buffer_length_;
read_position_ = (read_position_ + size) % buffer_length_;
data_length_ -= size;
@ -118,7 +118,8 @@ void FifoBuffer::ConsumeReadData(size_t size) {
}
void* FifoBuffer::GetWriteBuffer(size_t* size) {
webrtc::MutexLock lock(&mutex_);
RTC_DCHECK_RUN_ON(&callback_sequence_);
if (state_ == SS_CLOSED) {
return nullptr;
}
@ -138,8 +139,8 @@ void* FifoBuffer::GetWriteBuffer(size_t* size) {
}
void FifoBuffer::ConsumeWriteBuffer(size_t size) {
webrtc::MutexLock lock(&mutex_);
RTC_DCHECK(size <= buffer_length_ - data_length_);
RTC_DCHECK_RUN_ON(&callback_sequence_);
RTC_DCHECK_LE(size, buffer_length_ - data_length_);
const bool was_readable = (data_length_ > 0);
data_length_ += size;
if (!was_readable && size > 0) {

View file

@ -15,7 +15,6 @@
#include "api/task_queue/pending_task_safety_flag.h"
#include "rtc_base/stream.h"
#include "rtc_base/synchronization/mutex.h"
namespace rtc {
@ -78,6 +77,7 @@ class FifoBuffer final : public StreamInterface {
private:
void PostEvent(int events, int err) {
RTC_DCHECK_RUN_ON(owner_);
owner_->PostTask(
webrtc::SafeTask(task_safety_.flag(), [this, events, err]() {
RTC_DCHECK_RUN_ON(&callback_sequence_);
@ -88,31 +88,29 @@ class FifoBuffer final : public StreamInterface {
// Helper method that implements Read. Caller must acquire a lock
// when calling this method.
StreamResult ReadLocked(void* buffer, size_t bytes, size_t* bytes_read)
RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
RTC_EXCLUSIVE_LOCKS_REQUIRED(callback_sequence_);
// Helper method that implements Write. Caller must acquire a lock
// when calling this method.
StreamResult WriteLocked(const void* buffer,
size_t bytes,
size_t* bytes_written)
RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
RTC_EXCLUSIVE_LOCKS_REQUIRED(callback_sequence_);
webrtc::ScopedTaskSafety task_safety_;
// keeps the opened/closed state of the stream
StreamState state_ RTC_GUARDED_BY(mutex_);
StreamState state_ RTC_GUARDED_BY(callback_sequence_);
// the allocated buffer
std::unique_ptr<char[]> buffer_ RTC_GUARDED_BY(mutex_);
std::unique_ptr<char[]> buffer_ RTC_GUARDED_BY(callback_sequence_);
// size of the allocated buffer
const size_t buffer_length_;
// amount of readable data in the buffer
size_t data_length_ RTC_GUARDED_BY(mutex_);
size_t data_length_ RTC_GUARDED_BY(callback_sequence_);
// offset to the readable data
size_t read_position_ RTC_GUARDED_BY(mutex_);
size_t read_position_ RTC_GUARDED_BY(callback_sequence_);
// stream callbacks are dispatched on this thread
Thread* const owner_;
// object lock
mutable webrtc::Mutex mutex_;
};
} // namespace rtc