Remove locks from BufferQueue (not needed).

Also remove test code that can cause leaks into production.
Add sequence checkers.

Bug: webrtc:11988
Change-Id: I67b4cec6ee77d73ccffbbc88c9081ebb3c3cc423
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/185503
Commit-Queue: Tommi <tommi@webrtc.org>
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#32228}
This commit is contained in:
Tomas Gunnarsson 2020-09-29 13:04:01 +02:00 committed by Commit Bot
parent f360506c7a
commit b6bc09b099
7 changed files with 63 additions and 49 deletions

View file

@ -99,6 +99,7 @@ rtc_library("rtc_p2p") {
"../rtc_base:checks",
"../rtc_base:rtc_numerics",
"../rtc_base/experiments:field_trial_parser",
"../rtc_base/synchronization:sequence_checker",
# Needed by pseudo_tcp, which should move to a separate target.
"../rtc_base:safe_minmax",

View file

@ -73,6 +73,8 @@ rtc::StreamResult StreamInterfaceChannel::Read(void* buffer,
size_t buffer_len,
size_t* read,
int* error) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
if (state_ == rtc::SS_CLOSED)
return rtc::SR_EOS;
if (state_ == rtc::SS_OPENING)
@ -89,6 +91,7 @@ rtc::StreamResult StreamInterfaceChannel::Write(const void* data,
size_t data_len,
size_t* written,
int* error) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
// Always succeeds, since this is an unreliable transport anyway.
// TODO(zhihuang): Should this block if ice_transport_'s temporarily
// unwritable?
@ -102,6 +105,7 @@ rtc::StreamResult StreamInterfaceChannel::Write(const void* data,
}
bool StreamInterfaceChannel::OnPacketReceived(const char* data, size_t size) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
if (packets_.size() > 0) {
RTC_LOG(LS_WARNING) << "Packet already in queue.";
}
@ -118,10 +122,12 @@ bool StreamInterfaceChannel::OnPacketReceived(const char* data, size_t size) {
}
rtc::StreamState StreamInterfaceChannel::GetState() const {
RTC_DCHECK_RUN_ON(&sequence_checker_);
return state_;
}
void StreamInterfaceChannel::Close() {
RTC_DCHECK_RUN_ON(&sequence_checker_);
packets_.Clear();
state_ = rtc::SS_CLOSED;
}

View file

@ -24,6 +24,7 @@
#include "rtc_base/ssl_stream_adapter.h"
#include "rtc_base/stream.h"
#include "rtc_base/strings/string_builder.h"
#include "rtc_base/synchronization/sequence_checker.h"
#include "rtc_base/thread_checker.h"
namespace rtc {
@ -54,9 +55,10 @@ class StreamInterfaceChannel : public rtc::StreamInterface {
int* error) override;
private:
IceTransportInternal* ice_transport_; // owned by DtlsTransport
rtc::StreamState state_;
rtc::BufferQueue packets_;
webrtc::SequenceChecker sequence_checker_;
IceTransportInternal* const ice_transport_; // owned by DtlsTransport
rtc::StreamState state_ RTC_GUARDED_BY(sequence_checker_);
rtc::BufferQueue packets_ RTC_GUARDED_BY(sequence_checker_);
RTC_DISALLOW_COPY_AND_ASSIGN(StreamInterfaceChannel);
};

View file

@ -151,6 +151,7 @@ rtc_library("rtc_base_approved") {
":stringutils",
":thread_checker",
":timeutils",
"synchronization:sequence_checker",
]
}

View file

@ -21,23 +21,20 @@ BufferQueue::BufferQueue(size_t capacity, size_t default_size)
: capacity_(capacity), default_size_(default_size) {}
BufferQueue::~BufferQueue() {
webrtc::MutexLock lock(&mutex_);
for (Buffer* buffer : queue_) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
for (Buffer* buffer : queue_)
delete buffer;
}
for (Buffer* buffer : free_list_) {
for (Buffer* buffer : free_list_)
delete buffer;
}
}
size_t BufferQueue::size() const {
webrtc::MutexLock lock(&mutex_);
RTC_DCHECK_RUN_ON(&sequence_checker_);
return queue_.size();
}
void BufferQueue::Clear() {
webrtc::MutexLock lock(&mutex_);
RTC_DCHECK_RUN_ON(&sequence_checker_);
while (!queue_.empty()) {
free_list_.push_back(queue_.front());
queue_.pop_front();
@ -45,36 +42,30 @@ void BufferQueue::Clear() {
}
bool BufferQueue::ReadFront(void* buffer, size_t bytes, size_t* bytes_read) {
webrtc::MutexLock lock(&mutex_);
if (queue_.empty()) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
if (queue_.empty())
return false;
}
bool was_writable = queue_.size() < capacity_;
Buffer* packet = queue_.front();
queue_.pop_front();
bytes = std::min(bytes, packet->size());
memcpy(buffer, packet->data(), bytes);
if (bytes_read) {
if (bytes_read)
*bytes_read = bytes;
}
free_list_.push_back(packet);
if (!was_writable) {
NotifyWritableForTest();
}
return true;
}
bool BufferQueue::WriteBack(const void* buffer,
size_t bytes,
size_t* bytes_written) {
webrtc::MutexLock lock(&mutex_);
if (queue_.size() == capacity_) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
if (queue_.size() == capacity_)
return false;
}
bool was_readable = !queue_.empty();
Buffer* packet;
if (!free_list_.empty()) {
packet = free_list_.back();
@ -84,13 +75,10 @@ bool BufferQueue::WriteBack(const void* buffer,
}
packet->SetData(static_cast<const uint8_t*>(buffer), bytes);
if (bytes_written) {
if (bytes_written)
*bytes_written = bytes;
}
queue_.push_back(packet);
if (!was_readable) {
NotifyReadableForTest();
}
return true;
}

View file

@ -18,16 +18,16 @@
#include "rtc_base/buffer.h"
#include "rtc_base/constructor_magic.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/synchronization/sequence_checker.h"
#include "rtc_base/thread_annotations.h"
namespace rtc {
class BufferQueue {
class BufferQueue final {
public:
// Creates a buffer queue with a given capacity and default buffer size.
BufferQueue(size_t capacity, size_t default_size);
virtual ~BufferQueue();
~BufferQueue();
// Return number of queued buffers.
size_t size() const;
@ -44,17 +44,22 @@ class BufferQueue {
// Returns true unless no data could be written.
bool WriteBack(const void* data, size_t bytes, size_t* bytes_written);
protected:
// These methods are called when the state of the queue changes.
virtual void NotifyReadableForTest() {}
virtual void NotifyWritableForTest() {}
bool is_writable() const {
RTC_DCHECK_RUN_ON(&sequence_checker_);
return queue_.size() < capacity_;
}
bool is_readable() const {
RTC_DCHECK_RUN_ON(&sequence_checker_);
return !queue_.empty();
}
private:
size_t capacity_;
size_t default_size_;
mutable webrtc::Mutex mutex_;
std::deque<Buffer*> queue_ RTC_GUARDED_BY(mutex_);
std::vector<Buffer*> free_list_ RTC_GUARDED_BY(mutex_);
webrtc::SequenceChecker sequence_checker_;
const size_t capacity_;
const size_t default_size_;
std::deque<Buffer*> queue_ RTC_GUARDED_BY(sequence_checker_);
std::vector<Buffer*> free_list_ RTC_GUARDED_BY(sequence_checker_);
RTC_DISALLOW_COPY_AND_ASSIGN(BufferQueue);
};

View file

@ -231,10 +231,10 @@ class SSLDummyStreamTLS : public SSLDummyStreamBase {
: SSLDummyStreamBase(test, side, in, out) {}
};
class BufferQueueStream : public rtc::BufferQueue, public rtc::StreamInterface {
class BufferQueueStream : public rtc::StreamInterface {
public:
BufferQueueStream(size_t capacity, size_t default_size)
: rtc::BufferQueue(capacity, default_size) {}
: buffer_(capacity, default_size) {}
// Implementation of abstract StreamInterface methods.
@ -246,9 +246,13 @@ class BufferQueueStream : public rtc::BufferQueue, public rtc::StreamInterface {
size_t buffer_len,
size_t* read,
int* error) override {
if (!ReadFront(buffer, buffer_len, read)) {
const bool was_writable = buffer_.is_writable();
if (!buffer_.ReadFront(buffer, buffer_len, read))
return rtc::SR_BLOCK;
}
if (!was_writable)
NotifyWritableForTest();
return rtc::SR_SUCCESS;
}
@ -257,9 +261,13 @@ class BufferQueueStream : public rtc::BufferQueue, public rtc::StreamInterface {
size_t data_len,
size_t* written,
int* error) override {
if (!WriteBack(data, data_len, written)) {
const bool was_readable = buffer_.is_readable();
if (!buffer_.WriteBack(data, data_len, written))
return rtc::SR_BLOCK;
}
if (!was_readable)
NotifyReadableForTest();
return rtc::SR_SUCCESS;
}
@ -267,9 +275,12 @@ class BufferQueueStream : public rtc::BufferQueue, public rtc::StreamInterface {
void Close() override {}
protected:
void NotifyReadableForTest() override { PostEvent(rtc::SE_READ, 0); }
void NotifyReadableForTest() { PostEvent(rtc::SE_READ, 0); }
void NotifyWritableForTest() override { PostEvent(rtc::SE_WRITE, 0); }
void NotifyWritableForTest() { PostEvent(rtc::SE_WRITE, 0); }
private:
rtc::BufferQueue buffer_;
};
class SSLDummyStreamDTLS : public SSLDummyStreamBase {