dcsctp: Use stream scheduler in send queue
Changing the currently embedded scheduler, which was implemented using a revolving pointer, to the parameterized stream scheduler, which is implemented using a "virtual finish time" approach. Also renamed StreamCallback to StreamProducer, per review comments.

Bug: webrtc:5696
Change-Id: I7719678776ddbe05b688ada1b52887e5ca2fb206
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/262160
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37170}
Parent: 5f308fdd89 · Commit: d729d12454
6 changed files with 131 additions and 180 deletions
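For readers unfamiliar with the "virtual finish time" approach mentioned above: instead of a revolving pointer that simply advances to the next stream with queued data, each active stream is assigned a virtual time at which its next message would finish, computed from the message size and the stream's priority (acting as a weight), and the scheduler always serves the stream with the smallest finish time. The following is a minimal illustrative sketch of that idea in C++. It is not the dcsctp StreamScheduler API; every name in it is made up for illustration.

#include <cstddef>
#include <vector>

// Illustrative only: each active stream finishes its next message at a
// "virtual time" proportional to message size divided by priority (weight).
// Serving the smallest finish time gives higher-priority streams a
// proportionally larger share of the produced chunks.
struct FakeStream {
  int id;
  double priority;                 // Acts as a weight.
  double current_virtual_time = 0;
  std::size_t next_message_bytes = 0;  // 0 means the stream is inactive.
};

// Picks the next stream to send from, and advances its virtual time as if
// its whole next message had been sent.
FakeStream* PickNext(std::vector<FakeStream>& streams) {
  FakeStream* best = nullptr;
  double best_finish = 0;
  for (FakeStream& s : streams) {
    if (s.next_message_bytes == 0) continue;  // Not active.
    double finish = s.current_virtual_time + s.next_message_bytes / s.priority;
    if (best == nullptr || finish < best_finish) {
      best = &s;
      best_finish = finish;
    }
  }
  if (best != nullptr) {
    best->current_virtual_time = best_finish;
  }
  return best;
}

With equal priorities and equal-sized messages this degenerates to plain round-robin, which is why it can replace the old revolving-pointer scheduler without changing default behavior, while also supporting weighted prioritization between streams.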
net/dcsctp/tx/BUILD.gn

@@ -14,6 +14,7 @@ rtc_source_set("send_queue") {
    "../common:internal_types",
    "../packet:chunk",
    "../packet:data",
    "../public:socket",
    "../public:types",
  ]
  sources = [ "send_queue.h" ]

@@ -23,9 +24,12 @@ rtc_source_set("send_queue") {
rtc_library("rr_send_queue") {
  deps = [
    ":send_queue",
    ":stream_scheduler",
    "../../../api:array_view",
    "../../../rtc_base:checks",
    "../../../rtc_base:logging",
    "../../../rtc_base/containers:flat_map",
    "../common:str_join",
    "../packet:data",
    "../public:socket",
    "../public:types",

@@ -180,6 +184,7 @@ if (rtc_include_tests) {
      "../common:sequence_numbers",
      "../packet:chunk",
      "../packet:data",
      "../packet:sctp_packet",
      "../public:socket",
      "../public:types",
      "../testing:data_generator",
net/dcsctp/tx/rr_send_queue.cc

@@ -13,12 +13,14 @@
#include <deque>
#include <limits>
#include <map>
#include <set>
#include <utility>
#include <vector>

#include "absl/algorithm/container.h"
#include "absl/types/optional.h"
#include "api/array_view.h"
#include "net/dcsctp/common/str_join.h"
#include "net/dcsctp/packet/data.h"
#include "net/dcsctp/public/dcsctp_message.h"
#include "net/dcsctp/public/dcsctp_socket.h"

@@ -42,18 +44,18 @@ RRSendQueue::RRSendQueue(absl::string_view log_prefix,
  total_buffered_amount_.SetLowThreshold(total_buffered_amount_low_threshold);
}

bool RRSendQueue::OutgoingStream::HasDataToSend() const {
size_t RRSendQueue::OutgoingStream::bytes_to_send_in_next_message() const {
  if (pause_state_ == PauseState::kPaused ||
      pause_state_ == PauseState::kResetting) {
    // The stream has paused (and there is no partially sent message).
    return false;
    return 0;
  }

  if (items_.empty()) {
    return false;
    return 0;
  }

  return true;
  return items_.front().remaining_size;
}

void RRSendQueue::OutgoingStream::AddHandoverState(

@@ -61,30 +63,31 @@ void RRSendQueue::OutgoingStream::AddHandoverState(
  state.next_ssn = next_ssn_.value();
  state.next_ordered_mid = next_ordered_mid_.value();
  state.next_unordered_mid = next_unordered_mid_.value();
  state.priority = *priority_;
  state.priority = *scheduler_stream_->priority();
}

bool RRSendQueue::IsConsistent() const {
  size_t total_buffered_amount = 0;
  for (const auto& [unused, stream] : streams_) {
    total_buffered_amount += stream.buffered_amount().value();
  }
  std::set<StreamID> expected_active_streams;
  std::set<StreamID> actual_active_streams;

  if (previous_message_has_ended_) {
    auto it = streams_.find(current_stream_id_);
    if (it != streams_.end() && it->second.has_partially_sent_message()) {
      RTC_DLOG(LS_ERROR)
          << "Previous message has ended, but still partial message in stream";
      return false;
    }
  } else {
    auto it = streams_.find(current_stream_id_);
    if (it == streams_.end() || !it->second.has_partially_sent_message()) {
      RTC_DLOG(LS_ERROR)
          << "Previous message has NOT ended, but there is no partial message";
      return false;
  size_t total_buffered_amount = 0;
  for (const auto& [stream_id, stream] : streams_) {
    total_buffered_amount += stream.buffered_amount().value();
    if (stream.bytes_to_send_in_next_message() > 0) {
      expected_active_streams.emplace(stream_id);
    }
  }
  for (const auto& stream : scheduler_.ActiveStreamsForTesting()) {
    actual_active_streams.emplace(stream->stream_id());
  }
  if (expected_active_streams != actual_active_streams) {
    auto fn = [&](rtc::StringBuilder& sb, const auto& p) { sb << *p; };
    RTC_DLOG(LS_ERROR) << "Active streams mismatch, is=["
                       << StrJoin(actual_active_streams, ",", fn)
                       << "], expected=["
                       << StrJoin(expected_active_streams, ",", fn) << "]";
    return false;
  }

  return total_buffered_amount == total_buffered_amount_.value();
}

@@ -118,10 +121,15 @@ void RRSendQueue::ThresholdWatcher::SetLowThreshold(size_t low_threshold) {
void RRSendQueue::OutgoingStream::Add(DcSctpMessage message,
                                      TimeMs expires_at,
                                      const SendOptions& send_options) {
  bool was_active = bytes_to_send_in_next_message() > 0;
  buffered_amount_.Increase(message.payload().size());
  total_buffered_amount_.Increase(message.payload().size());
  items_.emplace_back(std::move(message), expires_at, send_options);

  if (!was_active) {
    scheduler_stream_->MaybeMakeActive();
  }

  RTC_DCHECK(IsConsistent());
}

@@ -227,8 +235,15 @@ bool RRSendQueue::OutgoingStream::Discard(IsUnordered unordered,
  total_buffered_amount_.Decrease(item.remaining_size);
  items_.pop_front();

  // Only partially sent messages are discarded, so if a message was
  // discarded, then it was the currently sent message.
  scheduler_stream_->ForceReschedule();

  if (pause_state_ == PauseState::kPending) {
    pause_state_ = PauseState::kPaused;
    scheduler_stream_->MakeInactive();
  } else if (bytes_to_send_in_next_message() == 0) {
    scheduler_stream_->MakeInactive();
  }

  // As the item still existed, it had unsent data.

@@ -277,6 +292,7 @@ void RRSendQueue::OutgoingStream::Pause() {
  if (had_pending_items && pause_state_ == PauseState::kPaused) {
    RTC_DLOG(LS_VERBOSE) << "Stream " << *stream_id()
                         << " was previously active, but is now paused.";
    scheduler_stream_->MakeInactive();
  }

  RTC_DCHECK(IsConsistent());

@@ -284,11 +300,8 @@ void RRSendQueue::OutgoingStream::Pause() {

void RRSendQueue::OutgoingStream::Resume() {
  RTC_DCHECK(pause_state_ == PauseState::kResetting);
  if (!items_.empty()) {
    RTC_DLOG(LS_VERBOSE) << "Stream " << *stream_id()
                         << " was previously paused, but is now active.";
  }
  pause_state_ = PauseState::kNotPaused;
  scheduler_stream_->MaybeMakeActive();
  RTC_DCHECK(IsConsistent());
}

@@ -296,6 +309,11 @@ void RRSendQueue::OutgoingStream::Reset() {
  // This can be called both when an outgoing stream reset has been responded
  // to, or when the entire SendQueue is reset due to detecting the peer having
  // restarted. The stream may be in any state at this time.
  PauseState old_pause_state = pause_state_;
  pause_state_ = PauseState::kNotPaused;
  next_ordered_mid_ = MID(0);
  next_unordered_mid_ = MID(0);
  next_ssn_ = SSN(0);
  if (!items_.empty()) {
    // If this message has been partially sent, reset it so that it will be
    // re-sent.

@@ -309,11 +327,11 @@ void RRSendQueue::OutgoingStream::Reset() {
    item.message_id = absl::nullopt;
    item.ssn = absl::nullopt;
    item.current_fsn = FSN(0);
    if (old_pause_state == PauseState::kPaused ||
        old_pause_state == PauseState::kResetting) {
      scheduler_stream_->MaybeMakeActive();
    }
  }
  pause_state_ = PauseState::kNotPaused;
  next_ordered_mid_ = MID(0);
  next_unordered_mid_ = MID(0);
  next_ssn_ = SSN(0);
  RTC_DCHECK(IsConsistent());
}

@@ -350,67 +368,9 @@ bool RRSendQueue::IsEmpty() const {
  return total_buffered_amount() == 0;
}

std::map<StreamID, RRSendQueue::OutgoingStream>::iterator
RRSendQueue::GetNextStream() {
  auto start_it = streams_.lower_bound(StreamID(*current_stream_id_ + 1));

  for (auto it = start_it; it != streams_.end(); ++it) {
    if (it->second.HasDataToSend()) {
      current_stream_id_ = it->first;
      return it;
    }
  }

  for (auto it = streams_.begin(); it != start_it; ++it) {
    if (it->second.HasDataToSend()) {
      current_stream_id_ = it->first;
      return it;
    }
  }
  return streams_.end();
}

absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(TimeMs now,
                                                           size_t max_size) {
  std::map<StreamID, RRSendQueue::OutgoingStream>::iterator stream_it;

  for (;;) {
    if (previous_message_has_ended_) {
      // Previous message has ended. Round-robin to a different stream, if there
      // even is one with data to send.
      stream_it = GetNextStream();
      if (stream_it == streams_.end()) {
        RTC_DLOG(LS_VERBOSE)
            << log_prefix_
            << "There is no stream with data; Can't produce any data.";
        return absl::nullopt;
      }
    } else {
      // The previous message has not ended; Continue from the current stream.
      stream_it = streams_.find(current_stream_id_);
      RTC_DCHECK(stream_it != streams_.end());
    }

    absl::optional<DataToSend> data = stream_it->second.Produce(now, max_size);
    if (!data.has_value()) {
      continue;
    }
    RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Producing DATA, type="
                         << (data->data.is_unordered ? "unordered" : "ordered")
                         << "::"
                         << (*data->data.is_beginning && *data->data.is_end
                                 ? "complete"
                                 : *data->data.is_beginning ? "first"
                                 : *data->data.is_end ? "last"
                                                      : "middle")
                         << ", stream_id=" << *stream_it->first
                         << ", ppid=" << *data->data.ppid
                         << ", length=" << data->data.payload.size();

    previous_message_has_ended_ = *data->data.is_end;
    RTC_DCHECK(IsConsistent());
    return data;
  }
  return scheduler_.Produce(now, max_size);
}

bool RRSendQueue::Discard(IsUnordered unordered,

@@ -418,12 +378,8 @@ bool RRSendQueue::Discard(IsUnordered unordered,
                          MID message_id) {
  bool has_discarded =
      GetOrCreateStreamInfo(stream_id).Discard(unordered, message_id);
  if (has_discarded) {
    // Only partially sent messages are discarded, so if a message was
    // discarded, then it was the currently sent message.
    previous_message_has_ended_ = true;
  }

  RTC_DCHECK(IsConsistent());
  return has_discarded;
}

@@ -484,7 +440,7 @@ void RRSendQueue::Reset() {
  for (auto& [unused, stream] : streams_) {
    stream.Reset();
  }
  previous_message_has_ended_ = true;
  scheduler_.ForceReschedule();
}

size_t RRSendQueue::buffered_amount(StreamID stream_id) const {

@@ -516,9 +472,9 @@ RRSendQueue::OutgoingStream& RRSendQueue::GetOrCreateStreamInfo(
  }

  return streams_
      .emplace(stream_id,
               OutgoingStream(
                   stream_id, default_priority_,
      .emplace(std::piecewise_construct, std::forward_as_tuple(stream_id),
               std::forward_as_tuple(
                   &scheduler_, stream_id, default_priority_,
                   [this, stream_id]() { on_buffered_amount_low_(stream_id); },
                   total_buffered_amount_))
      .first->second;

@@ -562,9 +518,9 @@ void RRSendQueue::RestoreFromState(const DcSctpSocketHandoverState& state) {
       state.tx.streams) {
    StreamID stream_id(state_stream.id);
    streams_.emplace(
        stream_id,
        OutgoingStream(
            stream_id, StreamPriority(state_stream.priority),
        std::piecewise_construct, std::forward_as_tuple(stream_id),
        std::forward_as_tuple(
            &scheduler_, stream_id, StreamPriority(state_stream.priority),
            [this, stream_id]() { on_buffered_amount_low_(stream_id); },
            total_buffered_amount_, &state_stream));
  }
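A note on the GetOrCreateStreamInfo and RestoreFromState changes above: because OutgoingStream now passes `this` to StreamScheduler::CreateStream in its constructor and stores the returned std::unique_ptr, it can no longer be built as a temporary and copied or moved into the streams_ map, so the emplace calls switch to std::piecewise_construct. A small stand-alone illustration of that pattern, using hypothetical types rather than dcsctp code:

#include <map>
#include <string>
#include <tuple>
#include <utility>

// A value type that is neither copyable nor movable, similar in spirit to the
// new OutgoingStream (deleting the copy operations also suppresses moves).
class Registered {
 public:
  explicit Registered(std::string name) : name_(std::move(name)) {}
  Registered(const Registered&) = delete;
  Registered& operator=(const Registered&) = delete;

 private:
  std::string name_;
};

int main() {
  std::map<int, Registered> by_id;

  // by_id.emplace(42, Registered("answer")) would not compile, because the
  // temporary would have to be moved into the map node. Piecewise
  // construction builds the value directly in its final location instead.
  by_id.emplace(std::piecewise_construct,
                std::forward_as_tuple(42),
                std::forward_as_tuple("answer"));
  return 0;
}

std::forward_as_tuple forwards the constructor arguments so the value is constructed exactly once, inside the map node, which is what the changed emplace calls in rr_send_queue.cc rely on.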
net/dcsctp/tx/rr_send_queue.h

@@ -13,6 +13,7 @@
#include <cstdint>
#include <deque>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

@@ -25,6 +26,7 @@
#include "net/dcsctp/public/dcsctp_socket.h"
#include "net/dcsctp/public/types.h"
#include "net/dcsctp/tx/send_queue.h"
#include "net/dcsctp/tx/stream_scheduler.h"

namespace dcsctp {

@@ -111,32 +113,33 @@ class RRSendQueue : public SendQueue {
  };

  // Per-stream information.
  class OutgoingStream {
  class OutgoingStream : public StreamScheduler::StreamProducer {
   public:
    OutgoingStream(
        StreamScheduler* scheduler,
        StreamID stream_id,
        StreamPriority priority,
        std::function<void()> on_buffered_amount_low,
        ThresholdWatcher& total_buffered_amount,
        const DcSctpSocketHandoverState::OutgoingStream* state = nullptr)
        : stream_id_(stream_id),
          priority_(priority),
        : scheduler_stream_(scheduler->CreateStream(this, stream_id, priority)),
          next_unordered_mid_(MID(state ? state->next_unordered_mid : 0)),
          next_ordered_mid_(MID(state ? state->next_ordered_mid : 0)),
          next_ssn_(SSN(state ? state->next_ssn : 0)),
          buffered_amount_(std::move(on_buffered_amount_low)),
          total_buffered_amount_(total_buffered_amount) {}

    StreamID stream_id() const { return stream_id_; }
    StreamID stream_id() const { return scheduler_stream_->stream_id(); }

    // Enqueues a message to this stream.
    void Add(DcSctpMessage message,
             TimeMs expires_at,
             const SendOptions& send_options);

    // Produces a data chunk to send, or `absl::nullopt` if nothing could be
    // produced, e.g. if all messages have expired.
    absl::optional<DataToSend> Produce(TimeMs now, size_t max_size);
    // Implementing `StreamScheduler::StreamProducer`.
    absl::optional<SendQueue::DataToSend> Produce(TimeMs now,
                                                  size_t max_size) override;
    size_t bytes_to_send_in_next_message() const override;

    const ThresholdWatcher& buffered_amount() const { return buffered_amount_; }
    ThresholdWatcher& buffered_amount() { return buffered_amount_; }

@@ -167,12 +170,10 @@ class RRSendQueue : public SendQueue {
    // Indicates if this stream has a partially sent message in it.
    bool has_partially_sent_message() const;

    // Indicates if the stream possibly has data to send. Note that it may
    // return `true` for streams that have enqueued, but expired, messages.
    bool HasDataToSend() const;

    void set_priority(StreamPriority priority) { priority_ = priority; }
    StreamPriority priority() const { return priority_; }
    StreamPriority priority() const { return scheduler_stream_->priority(); }
    void set_priority(StreamPriority priority) {
      scheduler_stream_->set_priority(priority);
    }

    void AddHandoverState(
        DcSctpSocketHandoverState::OutgoingStream& state) const;

@@ -225,8 +226,8 @@ class RRSendQueue : public SendQueue {

    bool IsConsistent() const;

    const StreamID stream_id_;
    StreamPriority priority_;
    const std::unique_ptr<StreamScheduler::Stream> scheduler_stream_;

    PauseState pause_state_ = PauseState::kNotPaused;
    // MIDs are different for unordered and ordered messages sent on a stream.
    MID next_unordered_mid_;

@@ -251,12 +252,10 @@ class RRSendQueue : public SendQueue {
      TimeMs now,
      size_t max_size);

  // Return the next stream, in round-robin fashion.
  std::map<StreamID, OutgoingStream>::iterator GetNextStream();

  const std::string log_prefix_;
  const size_t buffer_size_;
  const StreamPriority default_priority_;
  StreamScheduler scheduler_;

  // Called when the buffered amount is below what has been set using
  // `SetBufferedAmountLowThreshold`.

@@ -269,15 +268,6 @@ class RRSendQueue : public SendQueue {
  // The total amount of buffer data, for all streams.
  ThresholdWatcher total_buffered_amount_;

  // Indicates if the previous fragment sent was the end of a message. For
  // non-interleaved sending, this means that the next message may come from a
  // different stream. If not true, the next fragment must be produced from the
  // same stream as last time.
  bool previous_message_has_ended_ = true;

  // The current stream to send chunks from. Modified by `GetNextStream`.
  StreamID current_stream_id_ = StreamID(0);

  // All streams, and messages added to those.
  std::map<StreamID, OutgoingStream> streams_;
};
net/dcsctp/tx/stream_scheduler.cc

@@ -114,7 +114,7 @@ StreamScheduler::VirtualTime StreamScheduler::Stream::GetNextFinishTime()
absl::optional<SendQueue::DataToSend> StreamScheduler::Stream::Produce(
    TimeMs now,
    size_t max_size) {
  absl::optional<SendQueue::DataToSend> data = callback_.Produce(now, max_size);
  absl::optional<SendQueue::DataToSend> data = producer_.Produce(now, max_size);

  if (data.has_value()) {
    VirtualTime new_current = GetNextFinishTime();
net/dcsctp/tx/stream_scheduler.h

@@ -56,9 +56,9 @@ class StreamScheduler {
  };

 public:
  class StreamCallback {
  class StreamProducer {
   public:
    virtual ~StreamCallback() = default;
    virtual ~StreamProducer() = default;

    // Produces a fragment of data to send. The current wall time is specified
    // as `now` and should be used to skip chunks with expired limited lifetime.

@@ -99,11 +99,11 @@ class StreamScheduler {
    friend class StreamScheduler;

    Stream(StreamScheduler* parent,
           StreamCallback* callback,
           StreamProducer* producer,
           StreamID stream_id,
           StreamPriority priority)
        : parent_(*parent),
          callback_(*callback),
          producer_(*producer),
          stream_id_(stream_id),
          priority_(priority) {}

@@ -117,14 +117,14 @@ class StreamScheduler {
    VirtualTime current_time() const { return current_virtual_time_; }
    VirtualTime next_finish_time() const { return next_finish_time_; }
    size_t bytes_to_send_in_next_message() const {
      return callback_.bytes_to_send_in_next_message();
      return producer_.bytes_to_send_in_next_message();
    }

    // Returns the next virtual finish time for this stream.
    VirtualTime GetNextFinishTime() const;

    StreamScheduler& parent_;
    StreamCallback& callback_;
    StreamProducer& producer_;
    const StreamID stream_id_;
    StreamPriority priority_;
    // This outgoing stream's "current" virtual_time.

@@ -132,10 +132,10 @@ class StreamScheduler {
    VirtualTime next_finish_time_ = VirtualTime::Zero();
  };

  std::unique_ptr<Stream> CreateStream(StreamCallback* callback,
  std::unique_ptr<Stream> CreateStream(StreamProducer* producer,
                                       StreamID stream_id,
                                       StreamPriority priority) {
    return absl::WrapUnique(new Stream(this, callback, stream_id, priority));
    return absl::WrapUnique(new Stream(this, producer, stream_id, priority));
  }

  // Makes the scheduler stop producing message from the current stream and
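To make the renamed interface concrete, here is a rough sketch of a StreamProducer implementation and how it would be registered with the scheduler. Only the pieces visible in this diff (Produce(), bytes_to_send_in_next_message(), CreateStream(), MaybeMakeActive(), and the scheduler's own Produce()) are taken from the real API; the queueing logic and helper names are invented for illustration, and a real producer such as RRSendQueue::OutgoingStream also handles fragmentation to max_size, message expiry, and pause/reset handling.

#include <cstddef>
#include <deque>
#include <utility>

#include "absl/types/optional.h"
#include "net/dcsctp/public/types.h"
#include "net/dcsctp/tx/send_queue.h"
#include "net/dcsctp/tx/stream_scheduler.h"

namespace dcsctp {

// Hypothetical producer that owns a FIFO of already fragmented chunks.
class FifoProducer : public StreamScheduler::StreamProducer {
 public:
  absl::optional<SendQueue::DataToSend> Produce(TimeMs /*now*/,
                                                size_t /*max_size*/) override {
    // Note: ignores `max_size` and expiry, unlike the real producers.
    if (chunks_.empty()) {
      return absl::nullopt;
    }
    SendQueue::DataToSend chunk = std::move(chunks_.front());
    chunks_.pop_front();
    return chunk;
  }

  size_t bytes_to_send_in_next_message() const override {
    return chunks_.empty() ? 0 : chunks_.front().data.payload.size();
  }

  void Enqueue(SendQueue::DataToSend chunk) {
    chunks_.push_back(std::move(chunk));
  }

 private:
  std::deque<SendQueue::DataToSend> chunks_;
};

}  // namespace dcsctp

// Usage, mirroring what RRSendQueue::OutgoingStream now does internally:
//   dcsctp::StreamScheduler scheduler;
//   dcsctp::FifoProducer producer;
//   auto stream = scheduler.CreateStream(&producer, dcsctp::StreamID(1),
//                                        dcsctp::StreamPriority(2));
//   stream->MaybeMakeActive();  // Announce that this stream has data.
//   auto chunk = scheduler.Produce(dcsctp::TimeMs(0), /*max_size=*/1200);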
net/dcsctp/tx/stream_scheduler_test.cc

@@ -59,7 +59,7 @@ std::map<StreamID, size_t> GetPacketCounts(StreamScheduler& scheduler,
  return packet_counts;
}

class MockStreamCallback : public StreamScheduler::StreamCallback {
class MockStreamProducer : public StreamScheduler::StreamProducer {
 public:
  MOCK_METHOD(absl::optional<SendQueue::DataToSend>,
              Produce,

@@ -74,18 +74,18 @@ class TestStream {
             StreamID stream_id,
             StreamPriority priority,
             size_t packet_size = kPayloadSize) {
    EXPECT_CALL(callback_, Produce)
    EXPECT_CALL(producer_, Produce)
        .WillRepeatedly(CreateChunk(stream_id, MID(0), packet_size));
    EXPECT_CALL(callback_, bytes_to_send_in_next_message)
    EXPECT_CALL(producer_, bytes_to_send_in_next_message)
        .WillRepeatedly(Return(packet_size));
    stream_ = scheduler.CreateStream(&callback_, stream_id, priority);
    stream_ = scheduler.CreateStream(&producer_, stream_id, priority);
    stream_->MaybeMakeActive();
  }

  StreamScheduler::Stream& stream() { return *stream_; }

 private:
  StrictMock<MockStreamCallback> callback_;
  StrictMock<MockStreamProducer> producer_;
  std::unique_ptr<StreamScheduler::Stream> stream_;
};

@@ -100,9 +100,9 @@ TEST(StreamSchedulerTest, HasNoActiveStreams) {
TEST(StreamSchedulerTest, CanSetAndGetStreamProperties) {
  StreamScheduler scheduler;

  StrictMock<MockStreamCallback> callback;
  StrictMock<MockStreamProducer> producer;
  auto stream =
      scheduler.CreateStream(&callback, StreamID(1), StreamPriority(2));
      scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2));

  EXPECT_EQ(stream->stream_id(), StreamID(1));
  EXPECT_EQ(stream->priority(), StreamPriority(2));

@@ -115,13 +115,13 @@ TEST(StreamSchedulerTest, CanSetAndGetStreamProperties) {
TEST(StreamSchedulerTest, CanProduceFromSingleStream) {
  StreamScheduler scheduler;

  StrictMock<MockStreamCallback> callback;
  EXPECT_CALL(callback, Produce).WillOnce(CreateChunk(StreamID(1), MID(0)));
  EXPECT_CALL(callback, bytes_to_send_in_next_message)
  StrictMock<MockStreamProducer> producer;
  EXPECT_CALL(producer, Produce).WillOnce(CreateChunk(StreamID(1), MID(0)));
  EXPECT_CALL(producer, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(0));
  auto stream =
      scheduler.CreateStream(&callback, StreamID(1), StreamPriority(2));
      scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2));
  stream->MaybeMakeActive();

  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0)));

@@ -132,32 +132,32 @@ TEST(StreamSchedulerTest, CanProduceFromSingleStream) {
TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) {
  StreamScheduler scheduler;

  StrictMock<MockStreamCallback> callback1;
  EXPECT_CALL(callback1, Produce)
  StrictMock<MockStreamProducer> producer1;
  EXPECT_CALL(producer1, Produce)
      .WillOnce(CreateChunk(StreamID(1), MID(100)))
      .WillOnce(CreateChunk(StreamID(1), MID(101)))
      .WillOnce(CreateChunk(StreamID(1), MID(102)));
  EXPECT_CALL(callback1, bytes_to_send_in_next_message)
  EXPECT_CALL(producer1, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(0));
  auto stream1 =
      scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
      scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
  stream1->MaybeMakeActive();

  StrictMock<MockStreamCallback> callback2;
  EXPECT_CALL(callback2, Produce)
  StrictMock<MockStreamProducer> producer2;
  EXPECT_CALL(producer2, Produce)
      .WillOnce(CreateChunk(StreamID(2), MID(200)))
      .WillOnce(CreateChunk(StreamID(2), MID(201)))
      .WillOnce(CreateChunk(StreamID(2), MID(202)));
  EXPECT_CALL(callback2, bytes_to_send_in_next_message)
  EXPECT_CALL(producer2, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(0));
  auto stream2 =
      scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2));
      scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
  stream2->MaybeMakeActive();

  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));

@@ -174,8 +174,8 @@ TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) {
TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) {
  StreamScheduler scheduler;

  StrictMock<MockStreamCallback> callback1;
  EXPECT_CALL(callback1, Produce)
  StrictMock<MockStreamProducer> producer1;
  EXPECT_CALL(producer1, Produce)
      .WillOnce(CreateChunk(StreamID(1), MID(100)))
      .WillOnce([](...) {
        return SendQueue::DataToSend(

@@ -196,7 +196,7 @@ TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) {
                 Data::IsEnd(true), IsUnordered(true)));
      })
      .WillOnce(CreateChunk(StreamID(1), MID(102)));
  EXPECT_CALL(callback1, bytes_to_send_in_next_message)
  EXPECT_CALL(producer1, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))

@@ -204,21 +204,21 @@
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(0));
  auto stream1 =
      scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
      scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
  stream1->MaybeMakeActive();

  StrictMock<MockStreamCallback> callback2;
  EXPECT_CALL(callback2, Produce)
  StrictMock<MockStreamProducer> producer2;
  EXPECT_CALL(producer2, Produce)
      .WillOnce(CreateChunk(StreamID(2), MID(200)))
      .WillOnce(CreateChunk(StreamID(2), MID(201)))
      .WillOnce(CreateChunk(StreamID(2), MID(202)));
  EXPECT_CALL(callback2, bytes_to_send_in_next_message)
  EXPECT_CALL(producer2, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(0));
  auto stream2 =
      scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2));
      scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
  stream2->MaybeMakeActive();

  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));

@@ -236,16 +236,16 @@ TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) {
TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) {
  StreamScheduler scheduler;

  StrictMock<MockStreamCallback> callback1;
  EXPECT_CALL(callback1, Produce)
  StrictMock<MockStreamProducer> producer1;
  EXPECT_CALL(producer1, Produce)
      .WillOnce(CreateChunk(StreamID(1), MID(100)))
      .WillOnce(CreateChunk(StreamID(1), MID(101)));
  EXPECT_CALL(callback1, bytes_to_send_in_next_message)
  EXPECT_CALL(producer1, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize));  // hints that there is a MID(2) coming.
  auto stream1 =
      scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
      scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
  stream1->MaybeMakeActive();

  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));

@@ -260,20 +260,20 @@ TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) {
TEST(StreamSchedulerTest, SingleStreamCanBeResumed) {
  StreamScheduler scheduler;

  StrictMock<MockStreamCallback> callback1;
  StrictMock<MockStreamProducer> producer1;
  // Callbacks are setup so that they hint that there is a MID(2) coming...
  EXPECT_CALL(callback1, Produce)
  EXPECT_CALL(producer1, Produce)
      .WillOnce(CreateChunk(StreamID(1), MID(100)))
      .WillOnce(CreateChunk(StreamID(1), MID(101)))
      .WillOnce(CreateChunk(StreamID(1), MID(102)));
  EXPECT_CALL(callback1, bytes_to_send_in_next_message)
  EXPECT_CALL(producer1, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))  // When making active again
      .WillOnce(Return(0));
  auto stream1 =
      scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
      scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
  stream1->MaybeMakeActive();

  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));

@@ -290,33 +290,33 @@ TEST(StreamSchedulerTest, SingleStreamCanBeResumed) {
TEST(StreamSchedulerTest, WillRoundRobinWithPausedStream) {
  StreamScheduler scheduler;

  StrictMock<MockStreamCallback> callback1;
  EXPECT_CALL(callback1, Produce)
  StrictMock<MockStreamProducer> producer1;
  EXPECT_CALL(producer1, Produce)
      .WillOnce(CreateChunk(StreamID(1), MID(100)))
      .WillOnce(CreateChunk(StreamID(1), MID(101)))
      .WillOnce(CreateChunk(StreamID(1), MID(102)));
  EXPECT_CALL(callback1, bytes_to_send_in_next_message)
  EXPECT_CALL(producer1, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(0));
  auto stream1 =
      scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
      scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
  stream1->MaybeMakeActive();

  StrictMock<MockStreamCallback> callback2;
  EXPECT_CALL(callback2, Produce)
  StrictMock<MockStreamProducer> producer2;
  EXPECT_CALL(producer2, Produce)
      .WillOnce(CreateChunk(StreamID(2), MID(200)))
      .WillOnce(CreateChunk(StreamID(2), MID(201)))
      .WillOnce(CreateChunk(StreamID(2), MID(202)));
  EXPECT_CALL(callback2, bytes_to_send_in_next_message)
  EXPECT_CALL(producer2, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(0));
  auto stream2 =
      scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2));
      scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
  stream2->MaybeMakeActive();

  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));