/* * Copyright 2004 The WebRTC Project Authors. All rights reserved. * * Use of this source code is governed by a BSD-style license * that can be found in the LICENSE file in the root of the source * tree. An additional intellectual property rights grant can be found * in the file PATENTS. All contributing project authors may * be found in the AUTHORS file in the root of the source tree. */ #include #include #include #include #include "rtc_base/checks.h" #include "rtc_base/location.h" #include "rtc_base/message_queue.h" #include "rtc_base/stream.h" #include "rtc_base/thread.h" #if defined(WEBRTC_WIN) #include #define fileno _fileno #include "rtc_base/string_utils.h" #endif namespace rtc { /////////////////////////////////////////////////////////////////////////////// // StreamInterface /////////////////////////////////////////////////////////////////////////////// StreamInterface::~StreamInterface() {} StreamResult StreamInterface::WriteAll(const void* data, size_t data_len, size_t* written, int* error) { StreamResult result = SR_SUCCESS; size_t total_written = 0, current_written; while (total_written < data_len) { result = Write(static_cast(data) + total_written, data_len - total_written, ¤t_written, error); if (result != SR_SUCCESS) break; total_written += current_written; } if (written) *written = total_written; return result; } StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len, size_t* read, int* error) { StreamResult result = SR_SUCCESS; size_t total_read = 0, current_read; while (total_read < buffer_len) { result = Read(static_cast(buffer) + total_read, buffer_len - total_read, ¤t_read, error); if (result != SR_SUCCESS) break; total_read += current_read; } if (read) *read = total_read; return result; } void StreamInterface::PostEvent(Thread* t, int events, int err) { t->Post(RTC_FROM_HERE, this, MSG_POST_EVENT, new StreamEventData(events, err)); } void StreamInterface::PostEvent(int events, int err) { PostEvent(Thread::Current(), events, err); } bool StreamInterface::Flush() { return false; } StreamInterface::StreamInterface() {} void StreamInterface::OnMessage(Message* msg) { if (MSG_POST_EVENT == msg->message_id) { StreamEventData* pe = static_cast(msg->pdata); SignalEvent(this, pe->events, pe->error); delete msg->pdata; } } /////////////////////////////////////////////////////////////////////////////// // StreamAdapterInterface /////////////////////////////////////////////////////////////////////////////// StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream, bool owned) : stream_(stream), owned_(owned) { if (nullptr != stream_) stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent); } StreamState StreamAdapterInterface::GetState() const { return stream_->GetState(); } StreamResult StreamAdapterInterface::Read(void* buffer, size_t buffer_len, size_t* read, int* error) { return stream_->Read(buffer, buffer_len, read, error); } StreamResult StreamAdapterInterface::Write(const void* data, size_t data_len, size_t* written, int* error) { return stream_->Write(data, data_len, written, error); } void StreamAdapterInterface::Close() { stream_->Close(); } bool StreamAdapterInterface::Flush() { return stream_->Flush(); } void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) { if (nullptr != stream_) stream_->SignalEvent.disconnect(this); if (owned_) delete stream_; stream_ = stream; owned_ = owned; if (nullptr != stream_) stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent); } StreamInterface* StreamAdapterInterface::Detach() { if (nullptr != stream_) stream_->SignalEvent.disconnect(this); StreamInterface* stream = stream_; stream_ = nullptr; return stream; } StreamAdapterInterface::~StreamAdapterInterface() { if (owned_) delete stream_; } void StreamAdapterInterface::OnEvent(StreamInterface* stream, int events, int err) { SignalEvent(this, events, err); } /////////////////////////////////////////////////////////////////////////////// // FileStream /////////////////////////////////////////////////////////////////////////////// FileStream::FileStream() : file_(nullptr) {} FileStream::~FileStream() { FileStream::Close(); } bool FileStream::Open(const std::string& filename, const char* mode, int* error) { Close(); #if defined(WEBRTC_WIN) std::wstring wfilename; if (Utf8ToWindowsFilename(filename, &wfilename)) { file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str()); } else { if (error) { *error = -1; return false; } } #else file_ = fopen(filename.c_str(), mode); #endif if (!file_ && error) { *error = errno; } return (file_ != nullptr); } bool FileStream::OpenShare(const std::string& filename, const char* mode, int shflag, int* error) { Close(); #if defined(WEBRTC_WIN) std::wstring wfilename; if (Utf8ToWindowsFilename(filename, &wfilename)) { file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag); if (!file_ && error) { *error = errno; return false; } return file_ != nullptr; } else { if (error) { *error = -1; } return false; } #else return Open(filename, mode, error); #endif } bool FileStream::DisableBuffering() { if (!file_) return false; return (setvbuf(file_, nullptr, _IONBF, 0) == 0); } StreamState FileStream::GetState() const { return (file_ == nullptr) ? SS_CLOSED : SS_OPEN; } StreamResult FileStream::Read(void* buffer, size_t buffer_len, size_t* read, int* error) { if (!file_) return SR_EOS; size_t result = fread(buffer, 1, buffer_len, file_); if ((result == 0) && (buffer_len > 0)) { if (feof(file_)) return SR_EOS; if (error) *error = errno; return SR_ERROR; } if (read) *read = result; return SR_SUCCESS; } StreamResult FileStream::Write(const void* data, size_t data_len, size_t* written, int* error) { if (!file_) return SR_EOS; size_t result = fwrite(data, 1, data_len, file_); if ((result == 0) && (data_len > 0)) { if (error) *error = errno; return SR_ERROR; } if (written) *written = result; return SR_SUCCESS; } void FileStream::Close() { if (file_) { DoClose(); file_ = nullptr; } } bool FileStream::SetPosition(size_t position) { if (!file_) return false; return (fseek(file_, static_cast(position), SEEK_SET) == 0); } bool FileStream::Flush() { if (file_) { return (0 == fflush(file_)); } // try to flush empty file? RTC_NOTREACHED(); return false; } void FileStream::DoClose() { fclose(file_); } /////////////////////////////////////////////////////////////////////////////// // FifoBuffer /////////////////////////////////////////////////////////////////////////////// FifoBuffer::FifoBuffer(size_t size) : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size), data_length_(0), read_position_(0), owner_(Thread::Current()) { // all events are done on the owner_ thread } FifoBuffer::FifoBuffer(size_t size, Thread* owner) : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size), data_length_(0), read_position_(0), owner_(owner) { // all events are done on the owner_ thread } FifoBuffer::~FifoBuffer() {} bool FifoBuffer::GetBuffered(size_t* size) const { CritScope cs(&crit_); *size = data_length_; return true; } bool FifoBuffer::SetCapacity(size_t size) { CritScope cs(&crit_); if (data_length_ > size) { return false; } if (size != buffer_length_) { char* buffer = new char[size]; const size_t copy = data_length_; const size_t tail_copy = std::min(copy, buffer_length_ - read_position_); memcpy(buffer, &buffer_[read_position_], tail_copy); memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy); buffer_.reset(buffer); read_position_ = 0; buffer_length_ = size; } return true; } StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes, size_t offset, size_t* bytes_read) { CritScope cs(&crit_); return ReadOffsetLocked(buffer, bytes, offset, bytes_read); } StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes, size_t offset, size_t* bytes_written) { CritScope cs(&crit_); return WriteOffsetLocked(buffer, bytes, offset, bytes_written); } StreamState FifoBuffer::GetState() const { CritScope cs(&crit_); return state_; } StreamResult FifoBuffer::Read(void* buffer, size_t bytes, size_t* bytes_read, int* error) { CritScope cs(&crit_); const bool was_writable = data_length_ < buffer_length_; size_t copy = 0; StreamResult result = ReadOffsetLocked(buffer, bytes, 0, ©); if (result == SR_SUCCESS) { // If read was successful then adjust the read position and number of // bytes buffered. read_position_ = (read_position_ + copy) % buffer_length_; data_length_ -= copy; if (bytes_read) { *bytes_read = copy; } // if we were full before, and now we're not, post an event if (!was_writable && copy > 0) { PostEvent(owner_, SE_WRITE, 0); } } return result; } StreamResult FifoBuffer::Write(const void* buffer, size_t bytes, size_t* bytes_written, int* error) { CritScope cs(&crit_); const bool was_readable = (data_length_ > 0); size_t copy = 0; StreamResult result = WriteOffsetLocked(buffer, bytes, 0, ©); if (result == SR_SUCCESS) { // If write was successful then adjust the number of readable bytes. data_length_ += copy; if (bytes_written) { *bytes_written = copy; } // if we didn't have any data to read before, and now we do, post an event if (!was_readable && copy > 0) { PostEvent(owner_, SE_READ, 0); } } return result; } void FifoBuffer::Close() { CritScope cs(&crit_); state_ = SS_CLOSED; } const void* FifoBuffer::GetReadData(size_t* size) { CritScope cs(&crit_); *size = (read_position_ + data_length_ <= buffer_length_) ? data_length_ : buffer_length_ - read_position_; return &buffer_[read_position_]; } void FifoBuffer::ConsumeReadData(size_t size) { CritScope cs(&crit_); RTC_DCHECK(size <= data_length_); const bool was_writable = data_length_ < buffer_length_; read_position_ = (read_position_ + size) % buffer_length_; data_length_ -= size; if (!was_writable && size > 0) { PostEvent(owner_, SE_WRITE, 0); } } void* FifoBuffer::GetWriteBuffer(size_t* size) { CritScope cs(&crit_); if (state_ == SS_CLOSED) { return nullptr; } // if empty, reset the write position to the beginning, so we can get // the biggest possible block if (data_length_ == 0) { read_position_ = 0; } const size_t write_position = (read_position_ + data_length_) % buffer_length_; *size = (write_position > read_position_ || data_length_ == 0) ? buffer_length_ - write_position : read_position_ - write_position; return &buffer_[write_position]; } void FifoBuffer::ConsumeWriteBuffer(size_t size) { CritScope cs(&crit_); RTC_DCHECK(size <= buffer_length_ - data_length_); const bool was_readable = (data_length_ > 0); data_length_ += size; if (!was_readable && size > 0) { PostEvent(owner_, SE_READ, 0); } } bool FifoBuffer::GetWriteRemaining(size_t* size) const { CritScope cs(&crit_); *size = buffer_length_ - data_length_; return true; } StreamResult FifoBuffer::ReadOffsetLocked(void* buffer, size_t bytes, size_t offset, size_t* bytes_read) { if (offset >= data_length_) { return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS; } const size_t available = data_length_ - offset; const size_t read_position = (read_position_ + offset) % buffer_length_; const size_t copy = std::min(bytes, available); const size_t tail_copy = std::min(copy, buffer_length_ - read_position); char* const p = static_cast(buffer); memcpy(p, &buffer_[read_position], tail_copy); memcpy(p + tail_copy, &buffer_[0], copy - tail_copy); if (bytes_read) { *bytes_read = copy; } return SR_SUCCESS; } StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer, size_t bytes, size_t offset, size_t* bytes_written) { if (state_ == SS_CLOSED) { return SR_EOS; } if (data_length_ + offset >= buffer_length_) { return SR_BLOCK; } const size_t available = buffer_length_ - data_length_ - offset; const size_t write_position = (read_position_ + data_length_ + offset) % buffer_length_; const size_t copy = std::min(bytes, available); const size_t tail_copy = std::min(copy, buffer_length_ - write_position); const char* const p = static_cast(buffer); memcpy(&buffer_[write_position], p, tail_copy); memcpy(&buffer_[0], p + tail_copy, copy - tail_copy); if (bytes_written) { *bytes_written = copy; } return SR_SUCCESS; } } // namespace rtc