Move some methods from StreamInterface to FifoBuffer

Moved methods: GetReadData, ConsumeReadData, GetWriteBuffer,
ConsumeWriteBuffer, GetWriteRemaining.

These methods represented an optional interface for reading and
writing streams, intended to optimize certain use cases. However,
it was implemented only in the FifoBuffer subclass, and the few
users of that class all have a concrete FifoBuffer, and hence
don't need the methods on the abstract StreamInterface.

Bug: webrtc:6424
Change-Id: I6de74d1a9205fcb7037ad84e24679d4a27c1d219
Reviewed-on: https://webrtc-review.googlesource.com/c/108621
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Commit-Queue: Niels Moller <nisse@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#25446}
This commit is contained in:
Niels Möller 2018-10-31 10:19:50 +01:00 committed by Commit Bot
parent 21cddffd99
commit a8fa2d061f
3 changed files with 95 additions and 183 deletions

View file

@ -78,14 +78,6 @@ void StreamInterface::PostEvent(int events, int err) {
PostEvent(Thread::Current(), events, err);
}
const void* StreamInterface::GetReadData(size_t* data_len) {
return nullptr;
}
void* StreamInterface::GetWriteBuffer(size_t* buf_len) {
return nullptr;
}
bool StreamInterface::SetPosition(size_t position) {
return false;
}
@ -98,10 +90,6 @@ bool StreamInterface::GetSize(size_t* size) const {
return false;
}
bool StreamInterface::GetWriteRemaining(size_t* size) const {
return false;
}
bool StreamInterface::Flush() {
return false;
}
@ -162,10 +150,6 @@ bool StreamAdapterInterface::GetSize(size_t* size) const {
return stream_->GetSize(size);
}
bool StreamAdapterInterface::GetWriteRemaining(size_t* size) const {
return stream_->GetWriteRemaining(size);
}
bool StreamAdapterInterface::ReserveSize(size_t size) {
return stream_->ReserveSize(size);
}

View file

@ -107,59 +107,6 @@ class StreamInterface : public MessageHandler {
// Like the aforementioned method, but posts to the current thread.
void PostEvent(int events, int err);
//
// OPTIONAL OPERATIONS
//
// Not all implementations will support the following operations. In general,
// a stream will only support an operation if it reasonably efficient to do
// so. For example, while a socket could buffer incoming data to support
// seeking, it will not do so. Instead, a buffering stream adapter should
// be used.
//
// Even though several of these operations are related, you should
// always use whichever operation is most relevant.
//
// The following four methods are used to avoid copying data multiple times.
// GetReadData returns a pointer to a buffer which is owned by the stream.
// The buffer contains data_len bytes. null is returned if no data is
// available, or if the method fails. If the caller processes the data, it
// must call ConsumeReadData with the number of processed bytes. GetReadData
// does not require a matching call to ConsumeReadData if the data is not
// processed. Read and ConsumeReadData invalidate the buffer returned by
// GetReadData.
virtual const void* GetReadData(size_t* data_len);
virtual void ConsumeReadData(size_t used) {}
// GetWriteBuffer returns a pointer to a buffer which is owned by the stream.
// The buffer has a capacity of buf_len bytes. null is returned if there is
// no buffer available, or if the method fails. The call may write data to
// the buffer, and then call ConsumeWriteBuffer with the number of bytes
// written. GetWriteBuffer does not require a matching call to
// ConsumeWriteData if no data is written. Write, ForceWrite, and
// ConsumeWriteData invalidate the buffer returned by GetWriteBuffer.
// TODO: Allow the caller to specify a minimum buffer size. If the specified
// amount of buffer is not yet available, return null and Signal SE_WRITE
// when it is available. If the requested amount is too large, return an
// error.
virtual void* GetWriteBuffer(size_t* buf_len);
virtual void ConsumeWriteBuffer(size_t used) {}
// Write data_len bytes found in data, circumventing any throttling which
// would could cause SR_BLOCK to be returned. Returns true if all the data
// was written. Otherwise, the method is unsupported, or an unrecoverable
// error occurred, and the error value is set. This method should be used
// sparingly to write critical data which should not be throttled. A stream
// which cannot circumvent its blocking constraints should not implement this
// method.
// NOTE: This interface is being considered experimentally at the moment. It
// would be used by JUDP and BandwidthStream as a way to circumvent certain
// soft limits in writing.
// virtual bool ForceWrite(const void* data, size_t data_len, int* error) {
// if (error) *error = -1;
// return false;
//}
// Seek to a byte offset from the beginning of the stream. Returns false if
// the stream does not support seeking, or cannot seek to the specified
// position.
@ -173,10 +120,6 @@ class StreamInterface : public MessageHandler {
// is not known.
virtual bool GetSize(size_t* size) const;
// Return the number of Write()-able bytes remaining before end-of-stream.
// Returns false if not known.
virtual bool GetWriteRemaining(size_t* size) const;
// Return true if flush is successful.
virtual bool Flush();
@ -248,37 +191,9 @@ class StreamAdapterInterface : public StreamInterface,
int* error) override;
void Close() override;
// Optional Stream Interface
/* Note: Many stream adapters were implemented prior to this Read/Write
interface. Therefore, a simple pass through of data in those cases may
be broken. At a later time, we should do a once-over pass of all
adapters, and make them compliant with these interfaces, after which this
code can be uncommented.
virtual const void* GetReadData(size_t* data_len) {
return stream_->GetReadData(data_len);
}
virtual void ConsumeReadData(size_t used) {
stream_->ConsumeReadData(used);
}
virtual void* GetWriteBuffer(size_t* buf_len) {
return stream_->GetWriteBuffer(buf_len);
}
virtual void ConsumeWriteBuffer(size_t used) {
stream_->ConsumeWriteBuffer(used);
}
*/
/* Note: This interface is currently undergoing evaluation.
virtual bool ForceWrite(const void* data, size_t data_len, int* error) {
return stream_->ForceWrite(data, data_len, error);
}
*/
bool SetPosition(size_t position) override;
bool GetPosition(size_t* position) const override;
bool GetSize(size_t* size) const override;
bool GetWriteRemaining(size_t* size) const override;
bool ReserveSize(size_t size) override;
bool Flush() override;
@ -407,7 +322,7 @@ class MemoryStream : public MemoryStreamBase {
// writer and reader. As the data can wrap around the end of the buffer,
// MemoryStreamBase can't help us here.
class FifoBuffer : public StreamInterface {
class FifoBuffer final : public StreamInterface {
public:
// Creates a FIFO buffer with the specified capacity.
explicit FifoBuffer(size_t length);
@ -448,11 +363,28 @@ class FifoBuffer : public StreamInterface {
size_t* bytes_written,
int* error) override;
void Close() override;
const void* GetReadData(size_t* data_len) override;
void ConsumeReadData(size_t used) override;
void* GetWriteBuffer(size_t* buf_len) override;
void ConsumeWriteBuffer(size_t used) override;
bool GetWriteRemaining(size_t* size) const override;
// GetReadData returns a pointer to a buffer which is owned by the stream.
// The buffer contains data_len bytes. null is returned if no data is
// available, or if the method fails. If the caller processes the data, it
// must call ConsumeReadData with the number of processed bytes. GetReadData
// does not require a matching call to ConsumeReadData if the data is not
// processed. Read and ConsumeReadData invalidate the buffer returned by
// GetReadData.
const void* GetReadData(size_t* data_len);
void ConsumeReadData(size_t used);
// GetWriteBuffer returns a pointer to a buffer which is owned by the stream.
// The buffer has a capacity of buf_len bytes. null is returned if there is
// no buffer available, or if the method fails. The call may write data to
// the buffer, and then call ConsumeWriteBuffer with the number of bytes
// written. GetWriteBuffer does not require a matching call to
// ConsumeWriteData if no data is written. Write and
// ConsumeWriteData invalidate the buffer returned by GetWriteBuffer.
void* GetWriteBuffer(size_t* buf_len);
void ConsumeWriteBuffer(size_t used);
// Return the number of Write()-able bytes remaining before end-of-stream.
// Returns false if not known.
bool GetWriteRemaining(size_t* size) const;
private:
// Helper method that implements ReadOffset. Caller must acquire a lock

View file

@ -105,56 +105,52 @@ TEST(FifoBufferTest, TestAll) {
const void* q;
size_t bytes;
FifoBuffer buf(kSize);
StreamInterface* stream = &buf;
// Test assumptions about base state
EXPECT_EQ(SS_OPEN, stream->GetState());
EXPECT_EQ(SR_BLOCK, stream->Read(out, kSize, &bytes, nullptr));
EXPECT_TRUE(nullptr != stream->GetReadData(&bytes));
EXPECT_EQ((size_t)0, bytes);
stream->ConsumeReadData(0);
EXPECT_TRUE(nullptr != stream->GetWriteBuffer(&bytes));
EXPECT_EQ(SS_OPEN, buf.GetState());
EXPECT_EQ(SR_BLOCK, buf.Read(out, kSize, &bytes, nullptr));
EXPECT_TRUE(nullptr != buf.GetWriteBuffer(&bytes));
EXPECT_EQ(kSize, bytes);
stream->ConsumeWriteBuffer(0);
buf.ConsumeWriteBuffer(0);
// Try a full write
EXPECT_EQ(SR_SUCCESS, stream->Write(in, kSize, &bytes, nullptr));
EXPECT_EQ(SR_SUCCESS, buf.Write(in, kSize, &bytes, nullptr));
EXPECT_EQ(kSize, bytes);
// Try a write that should block
EXPECT_EQ(SR_BLOCK, stream->Write(in, kSize, &bytes, nullptr));
EXPECT_EQ(SR_BLOCK, buf.Write(in, kSize, &bytes, nullptr));
// Try a full read
EXPECT_EQ(SR_SUCCESS, stream->Read(out, kSize, &bytes, nullptr));
EXPECT_EQ(SR_SUCCESS, buf.Read(out, kSize, &bytes, nullptr));
EXPECT_EQ(kSize, bytes);
EXPECT_EQ(0, memcmp(in, out, kSize));
// Try a read that should block
EXPECT_EQ(SR_BLOCK, stream->Read(out, kSize, &bytes, nullptr));
EXPECT_EQ(SR_BLOCK, buf.Read(out, kSize, &bytes, nullptr));
// Try a too-big write
EXPECT_EQ(SR_SUCCESS, stream->Write(in, kSize * 2, &bytes, nullptr));
EXPECT_EQ(SR_SUCCESS, buf.Write(in, kSize * 2, &bytes, nullptr));
EXPECT_EQ(bytes, kSize);
// Try a too-big read
EXPECT_EQ(SR_SUCCESS, stream->Read(out, kSize * 2, &bytes, nullptr));
EXPECT_EQ(SR_SUCCESS, buf.Read(out, kSize * 2, &bytes, nullptr));
EXPECT_EQ(kSize, bytes);
EXPECT_EQ(0, memcmp(in, out, kSize));
// Try some small writes and reads
EXPECT_EQ(SR_SUCCESS, stream->Write(in, kSize / 2, &bytes, nullptr));
EXPECT_EQ(SR_SUCCESS, buf.Write(in, kSize / 2, &bytes, nullptr));
EXPECT_EQ(kSize / 2, bytes);
EXPECT_EQ(SR_SUCCESS, stream->Read(out, kSize / 2, &bytes, nullptr));
EXPECT_EQ(SR_SUCCESS, buf.Read(out, kSize / 2, &bytes, nullptr));
EXPECT_EQ(kSize / 2, bytes);
EXPECT_EQ(0, memcmp(in, out, kSize / 2));
EXPECT_EQ(SR_SUCCESS, stream->Write(in, kSize / 2, &bytes, nullptr));
EXPECT_EQ(SR_SUCCESS, buf.Write(in, kSize / 2, &bytes, nullptr));
EXPECT_EQ(kSize / 2, bytes);
EXPECT_EQ(SR_SUCCESS, stream->Write(in, kSize / 2, &bytes, nullptr));
EXPECT_EQ(SR_SUCCESS, buf.Write(in, kSize / 2, &bytes, nullptr));
EXPECT_EQ(kSize / 2, bytes);
EXPECT_EQ(SR_SUCCESS, stream->Read(out, kSize / 2, &bytes, nullptr));
EXPECT_EQ(SR_SUCCESS, buf.Read(out, kSize / 2, &bytes, nullptr));
EXPECT_EQ(kSize / 2, bytes);
EXPECT_EQ(0, memcmp(in, out, kSize / 2));
EXPECT_EQ(SR_SUCCESS, stream->Read(out, kSize / 2, &bytes, nullptr));
EXPECT_EQ(SR_SUCCESS, buf.Read(out, kSize / 2, &bytes, nullptr));
EXPECT_EQ(kSize / 2, bytes);
EXPECT_EQ(0, memcmp(in, out, kSize / 2));
@ -166,51 +162,51 @@ TEST(FifoBufferTest, TestAll) {
// XXXXWWWWWWWWXXXX 4567012345670123
// RRRRXXXXXXXXRRRR ....01234567....
// ....RRRRRRRR.... ................
EXPECT_EQ(SR_SUCCESS, stream->Write(in, kSize * 3 / 4, &bytes, nullptr));
EXPECT_EQ(SR_SUCCESS, buf.Write(in, kSize * 3 / 4, &bytes, nullptr));
EXPECT_EQ(kSize * 3 / 4, bytes);
EXPECT_EQ(SR_SUCCESS, stream->Read(out, kSize / 2, &bytes, nullptr));
EXPECT_EQ(SR_SUCCESS, buf.Read(out, kSize / 2, &bytes, nullptr));
EXPECT_EQ(kSize / 2, bytes);
EXPECT_EQ(0, memcmp(in, out, kSize / 2));
EXPECT_EQ(SR_SUCCESS, stream->Write(in, kSize / 2, &bytes, nullptr));
EXPECT_EQ(SR_SUCCESS, buf.Write(in, kSize / 2, &bytes, nullptr));
EXPECT_EQ(kSize / 2, bytes);
EXPECT_EQ(SR_SUCCESS, stream->Read(out, kSize / 4, &bytes, nullptr));
EXPECT_EQ(SR_SUCCESS, buf.Read(out, kSize / 4, &bytes, nullptr));
EXPECT_EQ(kSize / 4, bytes);
EXPECT_EQ(0, memcmp(in + kSize / 2, out, kSize / 4));
EXPECT_EQ(SR_SUCCESS, stream->Write(in, kSize / 2, &bytes, nullptr));
EXPECT_EQ(SR_SUCCESS, buf.Write(in, kSize / 2, &bytes, nullptr));
EXPECT_EQ(kSize / 2, bytes);
EXPECT_EQ(SR_SUCCESS, stream->Read(out, kSize / 2, &bytes, nullptr));
EXPECT_EQ(SR_SUCCESS, buf.Read(out, kSize / 2, &bytes, nullptr));
EXPECT_EQ(kSize / 2, bytes);
EXPECT_EQ(0, memcmp(in, out, kSize / 2));
EXPECT_EQ(SR_SUCCESS, stream->Read(out, kSize / 2, &bytes, nullptr));
EXPECT_EQ(SR_SUCCESS, buf.Read(out, kSize / 2, &bytes, nullptr));
EXPECT_EQ(kSize / 2, bytes);
EXPECT_EQ(0, memcmp(in, out, kSize / 2));
// Use GetWriteBuffer to reset the read_position for the next tests
stream->GetWriteBuffer(&bytes);
stream->ConsumeWriteBuffer(0);
buf.GetWriteBuffer(&bytes);
buf.ConsumeWriteBuffer(0);
// Try using GetReadData to do a full read
EXPECT_EQ(SR_SUCCESS, stream->Write(in, kSize, &bytes, nullptr));
q = stream->GetReadData(&bytes);
EXPECT_EQ(SR_SUCCESS, buf.Write(in, kSize, &bytes, nullptr));
q = buf.GetReadData(&bytes);
EXPECT_TRUE(nullptr != q);
EXPECT_EQ(kSize, bytes);
EXPECT_EQ(0, memcmp(q, in, kSize));
stream->ConsumeReadData(kSize);
EXPECT_EQ(SR_BLOCK, stream->Read(out, kSize, &bytes, nullptr));
buf.ConsumeReadData(kSize);
EXPECT_EQ(SR_BLOCK, buf.Read(out, kSize, &bytes, nullptr));
// Try using GetReadData to do some small reads
EXPECT_EQ(SR_SUCCESS, stream->Write(in, kSize, &bytes, nullptr));
q = stream->GetReadData(&bytes);
EXPECT_EQ(SR_SUCCESS, buf.Write(in, kSize, &bytes, nullptr));
q = buf.GetReadData(&bytes);
EXPECT_TRUE(nullptr != q);
EXPECT_EQ(kSize, bytes);
EXPECT_EQ(0, memcmp(q, in, kSize / 2));
stream->ConsumeReadData(kSize / 2);
q = stream->GetReadData(&bytes);
buf.ConsumeReadData(kSize / 2);
q = buf.GetReadData(&bytes);
EXPECT_TRUE(nullptr != q);
EXPECT_EQ(kSize / 2, bytes);
EXPECT_EQ(0, memcmp(q, in + kSize / 2, kSize / 2));
stream->ConsumeReadData(kSize / 2);
EXPECT_EQ(SR_BLOCK, stream->Read(out, kSize, &bytes, nullptr));
buf.ConsumeReadData(kSize / 2);
EXPECT_EQ(SR_BLOCK, buf.Read(out, kSize, &bytes, nullptr));
// Try using GetReadData in a wraparound case
// WWWWWWWWWWWWWWWW 0123456789ABCDEF
@ -218,46 +214,46 @@ TEST(FifoBufferTest, TestAll) {
// WWWWWWWW....XXXX 01234567....CDEF
// ............RRRR 01234567........
// RRRRRRRR........ ................
EXPECT_EQ(SR_SUCCESS, stream->Write(in, kSize, &bytes, nullptr));
EXPECT_EQ(SR_SUCCESS, stream->Read(out, kSize * 3 / 4, &bytes, nullptr));
EXPECT_EQ(SR_SUCCESS, stream->Write(in, kSize / 2, &bytes, nullptr));
q = stream->GetReadData(&bytes);
EXPECT_EQ(SR_SUCCESS, buf.Write(in, kSize, &bytes, nullptr));
EXPECT_EQ(SR_SUCCESS, buf.Read(out, kSize * 3 / 4, &bytes, nullptr));
EXPECT_EQ(SR_SUCCESS, buf.Write(in, kSize / 2, &bytes, nullptr));
q = buf.GetReadData(&bytes);
EXPECT_TRUE(nullptr != q);
EXPECT_EQ(kSize / 4, bytes);
EXPECT_EQ(0, memcmp(q, in + kSize * 3 / 4, kSize / 4));
stream->ConsumeReadData(kSize / 4);
q = stream->GetReadData(&bytes);
buf.ConsumeReadData(kSize / 4);
q = buf.GetReadData(&bytes);
EXPECT_TRUE(nullptr != q);
EXPECT_EQ(kSize / 2, bytes);
EXPECT_EQ(0, memcmp(q, in, kSize / 2));
stream->ConsumeReadData(kSize / 2);
buf.ConsumeReadData(kSize / 2);
// Use GetWriteBuffer to reset the read_position for the next tests
stream->GetWriteBuffer(&bytes);
stream->ConsumeWriteBuffer(0);
buf.GetWriteBuffer(&bytes);
buf.ConsumeWriteBuffer(0);
// Try using GetWriteBuffer to do a full write
p = stream->GetWriteBuffer(&bytes);
p = buf.GetWriteBuffer(&bytes);
EXPECT_TRUE(nullptr != p);
EXPECT_EQ(kSize, bytes);
memcpy(p, in, kSize);
stream->ConsumeWriteBuffer(kSize);
EXPECT_EQ(SR_SUCCESS, stream->Read(out, kSize, &bytes, nullptr));
buf.ConsumeWriteBuffer(kSize);
EXPECT_EQ(SR_SUCCESS, buf.Read(out, kSize, &bytes, nullptr));
EXPECT_EQ(kSize, bytes);
EXPECT_EQ(0, memcmp(in, out, kSize));
// Try using GetWriteBuffer to do some small writes
p = stream->GetWriteBuffer(&bytes);
p = buf.GetWriteBuffer(&bytes);
EXPECT_TRUE(nullptr != p);
EXPECT_EQ(kSize, bytes);
memcpy(p, in, kSize / 2);
stream->ConsumeWriteBuffer(kSize / 2);
p = stream->GetWriteBuffer(&bytes);
buf.ConsumeWriteBuffer(kSize / 2);
p = buf.GetWriteBuffer(&bytes);
EXPECT_TRUE(nullptr != p);
EXPECT_EQ(kSize / 2, bytes);
memcpy(p, in + kSize / 2, kSize / 2);
stream->ConsumeWriteBuffer(kSize / 2);
EXPECT_EQ(SR_SUCCESS, stream->Read(out, kSize, &bytes, nullptr));
buf.ConsumeWriteBuffer(kSize / 2);
EXPECT_EQ(SR_SUCCESS, buf.Read(out, kSize, &bytes, nullptr));
EXPECT_EQ(kSize, bytes);
EXPECT_EQ(0, memcmp(in, out, kSize));
@ -267,53 +263,53 @@ TEST(FifoBufferTest, TestAll) {
// ........XXXXWWWW ........89AB0123
// WWWW....XXXXXXXX 4567....89AB0123
// RRRR....RRRRRRRR ................
EXPECT_EQ(SR_SUCCESS, stream->Write(in, kSize * 3 / 4, &bytes, nullptr));
EXPECT_EQ(SR_SUCCESS, stream->Read(out, kSize / 2, &bytes, nullptr));
p = stream->GetWriteBuffer(&bytes);
EXPECT_EQ(SR_SUCCESS, buf.Write(in, kSize * 3 / 4, &bytes, nullptr));
EXPECT_EQ(SR_SUCCESS, buf.Read(out, kSize / 2, &bytes, nullptr));
p = buf.GetWriteBuffer(&bytes);
EXPECT_TRUE(nullptr != p);
EXPECT_EQ(kSize / 4, bytes);
memcpy(p, in, kSize / 4);
stream->ConsumeWriteBuffer(kSize / 4);
p = stream->GetWriteBuffer(&bytes);
buf.ConsumeWriteBuffer(kSize / 4);
p = buf.GetWriteBuffer(&bytes);
EXPECT_TRUE(nullptr != p);
EXPECT_EQ(kSize / 2, bytes);
memcpy(p, in + kSize / 4, kSize / 4);
stream->ConsumeWriteBuffer(kSize / 4);
EXPECT_EQ(SR_SUCCESS, stream->Read(out, kSize * 3 / 4, &bytes, nullptr));
buf.ConsumeWriteBuffer(kSize / 4);
EXPECT_EQ(SR_SUCCESS, buf.Read(out, kSize * 3 / 4, &bytes, nullptr));
EXPECT_EQ(kSize * 3 / 4, bytes);
EXPECT_EQ(0, memcmp(in + kSize / 2, out, kSize / 4));
EXPECT_EQ(0, memcmp(in, out + kSize / 4, kSize / 4));
// Check that the stream is now empty
EXPECT_EQ(SR_BLOCK, stream->Read(out, kSize, &bytes, nullptr));
EXPECT_EQ(SR_BLOCK, buf.Read(out, kSize, &bytes, nullptr));
// Try growing the buffer
EXPECT_EQ(SR_SUCCESS, stream->Write(in, kSize, &bytes, nullptr));
EXPECT_EQ(SR_SUCCESS, buf.Write(in, kSize, &bytes, nullptr));
EXPECT_EQ(kSize, bytes);
EXPECT_TRUE(buf.SetCapacity(kSize * 2));
EXPECT_EQ(SR_SUCCESS, stream->Write(in + kSize, kSize, &bytes, nullptr));
EXPECT_EQ(SR_SUCCESS, buf.Write(in + kSize, kSize, &bytes, nullptr));
EXPECT_EQ(kSize, bytes);
EXPECT_EQ(SR_SUCCESS, stream->Read(out, kSize * 2, &bytes, nullptr));
EXPECT_EQ(SR_SUCCESS, buf.Read(out, kSize * 2, &bytes, nullptr));
EXPECT_EQ(kSize * 2, bytes);
EXPECT_EQ(0, memcmp(in, out, kSize * 2));
// Try shrinking the buffer
EXPECT_EQ(SR_SUCCESS, stream->Write(in, kSize, &bytes, nullptr));
EXPECT_EQ(SR_SUCCESS, buf.Write(in, kSize, &bytes, nullptr));
EXPECT_EQ(kSize, bytes);
EXPECT_TRUE(buf.SetCapacity(kSize));
EXPECT_EQ(SR_BLOCK, stream->Write(in, kSize, &bytes, nullptr));
EXPECT_EQ(SR_SUCCESS, stream->Read(out, kSize, &bytes, nullptr));
EXPECT_EQ(SR_BLOCK, buf.Write(in, kSize, &bytes, nullptr));
EXPECT_EQ(SR_SUCCESS, buf.Read(out, kSize, &bytes, nullptr));
EXPECT_EQ(kSize, bytes);
EXPECT_EQ(0, memcmp(in, out, kSize));
// Write to the stream, close it, read the remaining bytes
EXPECT_EQ(SR_SUCCESS, stream->Write(in, kSize / 2, &bytes, nullptr));
stream->Close();
EXPECT_EQ(SS_CLOSED, stream->GetState());
EXPECT_EQ(SR_EOS, stream->Write(in, kSize / 2, &bytes, nullptr));
EXPECT_EQ(SR_SUCCESS, stream->Read(out, kSize / 2, &bytes, nullptr));
EXPECT_EQ(SR_SUCCESS, buf.Write(in, kSize / 2, &bytes, nullptr));
buf.Close();
EXPECT_EQ(SS_CLOSED, buf.GetState());
EXPECT_EQ(SR_EOS, buf.Write(in, kSize / 2, &bytes, nullptr));
EXPECT_EQ(SR_SUCCESS, buf.Read(out, kSize / 2, &bytes, nullptr));
EXPECT_EQ(0, memcmp(in, out, kSize / 2));
EXPECT_EQ(SR_EOS, stream->Read(out, kSize / 2, &bytes, nullptr));
EXPECT_EQ(SR_EOS, buf.Read(out, kSize / 2, &bytes, nullptr));
}
TEST(FifoBufferTest, FullBufferCheck) {