dcsctp: Use stream scheduler in send queue

Changing the currently embedded scheduler that was implemented using a
revolving pointer, to the parameterized stream scheduler that is
implemented using a "virtual finish time" approach.

Also renamed StreamCallback to StreamProducer, per review comments.

Bug: webrtc:5696
Change-Id: I7719678776ddbe05b688ada1b52887e5ca2fb206
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/262160
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37170}
This commit is contained in:
Victor Boivie 2022-05-11 12:46:16 +02:00 committed by WebRTC LUCI CQ
parent 5f308fdd89
commit d729d12454
6 changed files with 131 additions and 180 deletions

View file

@ -14,6 +14,7 @@ rtc_source_set("send_queue") {
"../common:internal_types",
"../packet:chunk",
"../packet:data",
"../public:socket",
"../public:types",
]
sources = [ "send_queue.h" ]
@ -23,9 +24,12 @@ rtc_source_set("send_queue") {
rtc_library("rr_send_queue") {
deps = [
":send_queue",
":stream_scheduler",
"../../../api:array_view",
"../../../rtc_base:checks",
"../../../rtc_base:logging",
"../../../rtc_base/containers:flat_map",
"../common:str_join",
"../packet:data",
"../public:socket",
"../public:types",
@ -180,6 +184,7 @@ if (rtc_include_tests) {
"../common:sequence_numbers",
"../packet:chunk",
"../packet:data",
"../packet:sctp_packet",
"../public:socket",
"../public:types",
"../testing:data_generator",

View file

@ -13,12 +13,14 @@
#include <deque>
#include <limits>
#include <map>
#include <set>
#include <utility>
#include <vector>
#include "absl/algorithm/container.h"
#include "absl/types/optional.h"
#include "api/array_view.h"
#include "net/dcsctp/common/str_join.h"
#include "net/dcsctp/packet/data.h"
#include "net/dcsctp/public/dcsctp_message.h"
#include "net/dcsctp/public/dcsctp_socket.h"
@ -42,18 +44,18 @@ RRSendQueue::RRSendQueue(absl::string_view log_prefix,
total_buffered_amount_.SetLowThreshold(total_buffered_amount_low_threshold);
}
bool RRSendQueue::OutgoingStream::HasDataToSend() const {
size_t RRSendQueue::OutgoingStream::bytes_to_send_in_next_message() const {
if (pause_state_ == PauseState::kPaused ||
pause_state_ == PauseState::kResetting) {
// The stream has paused (and there is no partially sent message).
return false;
return 0;
}
if (items_.empty()) {
return false;
return 0;
}
return true;
return items_.front().remaining_size;
}
void RRSendQueue::OutgoingStream::AddHandoverState(
@ -61,30 +63,31 @@ void RRSendQueue::OutgoingStream::AddHandoverState(
state.next_ssn = next_ssn_.value();
state.next_ordered_mid = next_ordered_mid_.value();
state.next_unordered_mid = next_unordered_mid_.value();
state.priority = *priority_;
state.priority = *scheduler_stream_->priority();
}
bool RRSendQueue::IsConsistent() const {
size_t total_buffered_amount = 0;
for (const auto& [unused, stream] : streams_) {
total_buffered_amount += stream.buffered_amount().value();
}
std::set<StreamID> expected_active_streams;
std::set<StreamID> actual_active_streams;
if (previous_message_has_ended_) {
auto it = streams_.find(current_stream_id_);
if (it != streams_.end() && it->second.has_partially_sent_message()) {
RTC_DLOG(LS_ERROR)
<< "Previous message has ended, but still partial message in stream";
return false;
}
} else {
auto it = streams_.find(current_stream_id_);
if (it == streams_.end() || !it->second.has_partially_sent_message()) {
RTC_DLOG(LS_ERROR)
<< "Previous message has NOT ended, but there is no partial message";
return false;
size_t total_buffered_amount = 0;
for (const auto& [stream_id, stream] : streams_) {
total_buffered_amount += stream.buffered_amount().value();
if (stream.bytes_to_send_in_next_message() > 0) {
expected_active_streams.emplace(stream_id);
}
}
for (const auto& stream : scheduler_.ActiveStreamsForTesting()) {
actual_active_streams.emplace(stream->stream_id());
}
if (expected_active_streams != actual_active_streams) {
auto fn = [&](rtc::StringBuilder& sb, const auto& p) { sb << *p; };
RTC_DLOG(LS_ERROR) << "Active streams mismatch, is=["
<< StrJoin(actual_active_streams, ",", fn)
<< "], expected=["
<< StrJoin(expected_active_streams, ",", fn) << "]";
return false;
}
return total_buffered_amount == total_buffered_amount_.value();
}
@ -118,10 +121,15 @@ void RRSendQueue::ThresholdWatcher::SetLowThreshold(size_t low_threshold) {
void RRSendQueue::OutgoingStream::Add(DcSctpMessage message,
TimeMs expires_at,
const SendOptions& send_options) {
bool was_active = bytes_to_send_in_next_message() > 0;
buffered_amount_.Increase(message.payload().size());
total_buffered_amount_.Increase(message.payload().size());
items_.emplace_back(std::move(message), expires_at, send_options);
if (!was_active) {
scheduler_stream_->MaybeMakeActive();
}
RTC_DCHECK(IsConsistent());
}
@ -227,8 +235,15 @@ bool RRSendQueue::OutgoingStream::Discard(IsUnordered unordered,
total_buffered_amount_.Decrease(item.remaining_size);
items_.pop_front();
// Only partially sent messages are discarded, so if a message was
// discarded, then it was the currently sent message.
scheduler_stream_->ForceReschedule();
if (pause_state_ == PauseState::kPending) {
pause_state_ = PauseState::kPaused;
scheduler_stream_->MakeInactive();
} else if (bytes_to_send_in_next_message() == 0) {
scheduler_stream_->MakeInactive();
}
// As the item still existed, it had unsent data.
@ -277,6 +292,7 @@ void RRSendQueue::OutgoingStream::Pause() {
if (had_pending_items && pause_state_ == PauseState::kPaused) {
RTC_DLOG(LS_VERBOSE) << "Stream " << *stream_id()
<< " was previously active, but is now paused.";
scheduler_stream_->MakeInactive();
}
RTC_DCHECK(IsConsistent());
@ -284,11 +300,8 @@ void RRSendQueue::OutgoingStream::Pause() {
void RRSendQueue::OutgoingStream::Resume() {
RTC_DCHECK(pause_state_ == PauseState::kResetting);
if (!items_.empty()) {
RTC_DLOG(LS_VERBOSE) << "Stream " << *stream_id()
<< " was previously paused, but is now active.";
}
pause_state_ = PauseState::kNotPaused;
scheduler_stream_->MaybeMakeActive();
RTC_DCHECK(IsConsistent());
}
@ -296,6 +309,11 @@ void RRSendQueue::OutgoingStream::Reset() {
// This can be called both when an outgoing stream reset has been responded
// to, or when the entire SendQueue is reset due to detecting the peer having
// restarted. The stream may be in any state at this time.
PauseState old_pause_state = pause_state_;
pause_state_ = PauseState::kNotPaused;
next_ordered_mid_ = MID(0);
next_unordered_mid_ = MID(0);
next_ssn_ = SSN(0);
if (!items_.empty()) {
// If this message has been partially sent, reset it so that it will be
// re-sent.
@ -309,11 +327,11 @@ void RRSendQueue::OutgoingStream::Reset() {
item.message_id = absl::nullopt;
item.ssn = absl::nullopt;
item.current_fsn = FSN(0);
if (old_pause_state == PauseState::kPaused ||
old_pause_state == PauseState::kResetting) {
scheduler_stream_->MaybeMakeActive();
}
}
pause_state_ = PauseState::kNotPaused;
next_ordered_mid_ = MID(0);
next_unordered_mid_ = MID(0);
next_ssn_ = SSN(0);
RTC_DCHECK(IsConsistent());
}
@ -350,67 +368,9 @@ bool RRSendQueue::IsEmpty() const {
return total_buffered_amount() == 0;
}
std::map<StreamID, RRSendQueue::OutgoingStream>::iterator
RRSendQueue::GetNextStream() {
auto start_it = streams_.lower_bound(StreamID(*current_stream_id_ + 1));
for (auto it = start_it; it != streams_.end(); ++it) {
if (it->second.HasDataToSend()) {
current_stream_id_ = it->first;
return it;
}
}
for (auto it = streams_.begin(); it != start_it; ++it) {
if (it->second.HasDataToSend()) {
current_stream_id_ = it->first;
return it;
}
}
return streams_.end();
}
absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(TimeMs now,
size_t max_size) {
std::map<StreamID, RRSendQueue::OutgoingStream>::iterator stream_it;
for (;;) {
if (previous_message_has_ended_) {
// Previous message has ended. Round-robin to a different stream, if there
// even is one with data to send.
stream_it = GetNextStream();
if (stream_it == streams_.end()) {
RTC_DLOG(LS_VERBOSE)
<< log_prefix_
<< "There is no stream with data; Can't produce any data.";
return absl::nullopt;
}
} else {
// The previous message has not ended; Continue from the current stream.
stream_it = streams_.find(current_stream_id_);
RTC_DCHECK(stream_it != streams_.end());
}
absl::optional<DataToSend> data = stream_it->second.Produce(now, max_size);
if (!data.has_value()) {
continue;
}
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Producing DATA, type="
<< (data->data.is_unordered ? "unordered" : "ordered")
<< "::"
<< (*data->data.is_beginning && *data->data.is_end
? "complete"
: *data->data.is_beginning ? "first"
: *data->data.is_end ? "last"
: "middle")
<< ", stream_id=" << *stream_it->first
<< ", ppid=" << *data->data.ppid
<< ", length=" << data->data.payload.size();
previous_message_has_ended_ = *data->data.is_end;
RTC_DCHECK(IsConsistent());
return data;
}
return scheduler_.Produce(now, max_size);
}
bool RRSendQueue::Discard(IsUnordered unordered,
@ -418,12 +378,8 @@ bool RRSendQueue::Discard(IsUnordered unordered,
MID message_id) {
bool has_discarded =
GetOrCreateStreamInfo(stream_id).Discard(unordered, message_id);
if (has_discarded) {
// Only partially sent messages are discarded, so if a message was
// discarded, then it was the currently sent message.
previous_message_has_ended_ = true;
}
RTC_DCHECK(IsConsistent());
return has_discarded;
}
@ -484,7 +440,7 @@ void RRSendQueue::Reset() {
for (auto& [unused, stream] : streams_) {
stream.Reset();
}
previous_message_has_ended_ = true;
scheduler_.ForceReschedule();
}
size_t RRSendQueue::buffered_amount(StreamID stream_id) const {
@ -516,9 +472,9 @@ RRSendQueue::OutgoingStream& RRSendQueue::GetOrCreateStreamInfo(
}
return streams_
.emplace(stream_id,
OutgoingStream(
stream_id, default_priority_,
.emplace(std::piecewise_construct, std::forward_as_tuple(stream_id),
std::forward_as_tuple(
&scheduler_, stream_id, default_priority_,
[this, stream_id]() { on_buffered_amount_low_(stream_id); },
total_buffered_amount_))
.first->second;
@ -562,9 +518,9 @@ void RRSendQueue::RestoreFromState(const DcSctpSocketHandoverState& state) {
state.tx.streams) {
StreamID stream_id(state_stream.id);
streams_.emplace(
stream_id,
OutgoingStream(
stream_id, StreamPriority(state_stream.priority),
std::piecewise_construct, std::forward_as_tuple(stream_id),
std::forward_as_tuple(
&scheduler_, stream_id, StreamPriority(state_stream.priority),
[this, stream_id]() { on_buffered_amount_low_(stream_id); },
total_buffered_amount_, &state_stream));
}

View file

@ -13,6 +13,7 @@
#include <cstdint>
#include <deque>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
@ -25,6 +26,7 @@
#include "net/dcsctp/public/dcsctp_socket.h"
#include "net/dcsctp/public/types.h"
#include "net/dcsctp/tx/send_queue.h"
#include "net/dcsctp/tx/stream_scheduler.h"
namespace dcsctp {
@ -111,32 +113,33 @@ class RRSendQueue : public SendQueue {
};
// Per-stream information.
class OutgoingStream {
class OutgoingStream : public StreamScheduler::StreamProducer {
public:
OutgoingStream(
StreamScheduler* scheduler,
StreamID stream_id,
StreamPriority priority,
std::function<void()> on_buffered_amount_low,
ThresholdWatcher& total_buffered_amount,
const DcSctpSocketHandoverState::OutgoingStream* state = nullptr)
: stream_id_(stream_id),
priority_(priority),
: scheduler_stream_(scheduler->CreateStream(this, stream_id, priority)),
next_unordered_mid_(MID(state ? state->next_unordered_mid : 0)),
next_ordered_mid_(MID(state ? state->next_ordered_mid : 0)),
next_ssn_(SSN(state ? state->next_ssn : 0)),
buffered_amount_(std::move(on_buffered_amount_low)),
total_buffered_amount_(total_buffered_amount) {}
StreamID stream_id() const { return stream_id_; }
StreamID stream_id() const { return scheduler_stream_->stream_id(); }
// Enqueues a message to this stream.
void Add(DcSctpMessage message,
TimeMs expires_at,
const SendOptions& send_options);
// Produces a data chunk to send, or `absl::nullopt` if nothing could be
// produced, e.g. if all messages have expired.
absl::optional<DataToSend> Produce(TimeMs now, size_t max_size);
// Implementing `StreamScheduler::StreamProducer`.
absl::optional<SendQueue::DataToSend> Produce(TimeMs now,
size_t max_size) override;
size_t bytes_to_send_in_next_message() const override;
const ThresholdWatcher& buffered_amount() const { return buffered_amount_; }
ThresholdWatcher& buffered_amount() { return buffered_amount_; }
@ -167,12 +170,10 @@ class RRSendQueue : public SendQueue {
// Indicates if this stream has a partially sent message in it.
bool has_partially_sent_message() const;
// Indicates if the stream possibly has data to send. Note that it may
// return `true` for streams that have enqueued, but expired, messages.
bool HasDataToSend() const;
void set_priority(StreamPriority priority) { priority_ = priority; }
StreamPriority priority() const { return priority_; }
StreamPriority priority() const { return scheduler_stream_->priority(); }
void set_priority(StreamPriority priority) {
scheduler_stream_->set_priority(priority);
}
void AddHandoverState(
DcSctpSocketHandoverState::OutgoingStream& state) const;
@ -225,8 +226,8 @@ class RRSendQueue : public SendQueue {
bool IsConsistent() const;
const StreamID stream_id_;
StreamPriority priority_;
const std::unique_ptr<StreamScheduler::Stream> scheduler_stream_;
PauseState pause_state_ = PauseState::kNotPaused;
// MIDs are different for unordered and ordered messages sent on a stream.
MID next_unordered_mid_;
@ -251,12 +252,10 @@ class RRSendQueue : public SendQueue {
TimeMs now,
size_t max_size);
// Return the next stream, in round-robin fashion.
std::map<StreamID, OutgoingStream>::iterator GetNextStream();
const std::string log_prefix_;
const size_t buffer_size_;
const StreamPriority default_priority_;
StreamScheduler scheduler_;
// Called when the buffered amount is below what has been set using
// `SetBufferedAmountLowThreshold`.
@ -269,15 +268,6 @@ class RRSendQueue : public SendQueue {
// The total amount of buffer data, for all streams.
ThresholdWatcher total_buffered_amount_;
// Indicates if the previous fragment sent was the end of a message. For
// non-interleaved sending, this means that the next message may come from a
// different stream. If not true, the next fragment must be produced from the
// same stream as last time.
bool previous_message_has_ended_ = true;
// The current stream to send chunks from. Modified by `GetNextStream`.
StreamID current_stream_id_ = StreamID(0);
// All streams, and messages added to those.
std::map<StreamID, OutgoingStream> streams_;
};

View file

@ -114,7 +114,7 @@ StreamScheduler::VirtualTime StreamScheduler::Stream::GetNextFinishTime()
absl::optional<SendQueue::DataToSend> StreamScheduler::Stream::Produce(
TimeMs now,
size_t max_size) {
absl::optional<SendQueue::DataToSend> data = callback_.Produce(now, max_size);
absl::optional<SendQueue::DataToSend> data = producer_.Produce(now, max_size);
if (data.has_value()) {
VirtualTime new_current = GetNextFinishTime();

View file

@ -56,9 +56,9 @@ class StreamScheduler {
};
public:
class StreamCallback {
class StreamProducer {
public:
virtual ~StreamCallback() = default;
virtual ~StreamProducer() = default;
// Produces a fragment of data to send. The current wall time is specified
// as `now` and should be used to skip chunks with expired limited lifetime.
@ -99,11 +99,11 @@ class StreamScheduler {
friend class StreamScheduler;
Stream(StreamScheduler* parent,
StreamCallback* callback,
StreamProducer* producer,
StreamID stream_id,
StreamPriority priority)
: parent_(*parent),
callback_(*callback),
producer_(*producer),
stream_id_(stream_id),
priority_(priority) {}
@ -117,14 +117,14 @@ class StreamScheduler {
VirtualTime current_time() const { return current_virtual_time_; }
VirtualTime next_finish_time() const { return next_finish_time_; }
size_t bytes_to_send_in_next_message() const {
return callback_.bytes_to_send_in_next_message();
return producer_.bytes_to_send_in_next_message();
}
// Returns the next virtual finish time for this stream.
VirtualTime GetNextFinishTime() const;
StreamScheduler& parent_;
StreamCallback& callback_;
StreamProducer& producer_;
const StreamID stream_id_;
StreamPriority priority_;
// This outgoing stream's "current" virtual_time.
@ -132,10 +132,10 @@ class StreamScheduler {
VirtualTime next_finish_time_ = VirtualTime::Zero();
};
std::unique_ptr<Stream> CreateStream(StreamCallback* callback,
std::unique_ptr<Stream> CreateStream(StreamProducer* producer,
StreamID stream_id,
StreamPriority priority) {
return absl::WrapUnique(new Stream(this, callback, stream_id, priority));
return absl::WrapUnique(new Stream(this, producer, stream_id, priority));
}
// Makes the scheduler stop producing message from the current stream and

View file

@ -59,7 +59,7 @@ std::map<StreamID, size_t> GetPacketCounts(StreamScheduler& scheduler,
return packet_counts;
}
class MockStreamCallback : public StreamScheduler::StreamCallback {
class MockStreamProducer : public StreamScheduler::StreamProducer {
public:
MOCK_METHOD(absl::optional<SendQueue::DataToSend>,
Produce,
@ -74,18 +74,18 @@ class TestStream {
StreamID stream_id,
StreamPriority priority,
size_t packet_size = kPayloadSize) {
EXPECT_CALL(callback_, Produce)
EXPECT_CALL(producer_, Produce)
.WillRepeatedly(CreateChunk(stream_id, MID(0), packet_size));
EXPECT_CALL(callback_, bytes_to_send_in_next_message)
EXPECT_CALL(producer_, bytes_to_send_in_next_message)
.WillRepeatedly(Return(packet_size));
stream_ = scheduler.CreateStream(&callback_, stream_id, priority);
stream_ = scheduler.CreateStream(&producer_, stream_id, priority);
stream_->MaybeMakeActive();
}
StreamScheduler::Stream& stream() { return *stream_; }
private:
StrictMock<MockStreamCallback> callback_;
StrictMock<MockStreamProducer> producer_;
std::unique_ptr<StreamScheduler::Stream> stream_;
};
@ -100,9 +100,9 @@ TEST(StreamSchedulerTest, HasNoActiveStreams) {
TEST(StreamSchedulerTest, CanSetAndGetStreamProperties) {
StreamScheduler scheduler;
StrictMock<MockStreamCallback> callback;
StrictMock<MockStreamProducer> producer;
auto stream =
scheduler.CreateStream(&callback, StreamID(1), StreamPriority(2));
scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2));
EXPECT_EQ(stream->stream_id(), StreamID(1));
EXPECT_EQ(stream->priority(), StreamPriority(2));
@ -115,13 +115,13 @@ TEST(StreamSchedulerTest, CanSetAndGetStreamProperties) {
TEST(StreamSchedulerTest, CanProduceFromSingleStream) {
StreamScheduler scheduler;
StrictMock<MockStreamCallback> callback;
EXPECT_CALL(callback, Produce).WillOnce(CreateChunk(StreamID(1), MID(0)));
EXPECT_CALL(callback, bytes_to_send_in_next_message)
StrictMock<MockStreamProducer> producer;
EXPECT_CALL(producer, Produce).WillOnce(CreateChunk(StreamID(1), MID(0)));
EXPECT_CALL(producer, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(0));
auto stream =
scheduler.CreateStream(&callback, StreamID(1), StreamPriority(2));
scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2));
stream->MaybeMakeActive();
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0)));
@ -132,32 +132,32 @@ TEST(StreamSchedulerTest, CanProduceFromSingleStream) {
TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) {
StreamScheduler scheduler;
StrictMock<MockStreamCallback> callback1;
EXPECT_CALL(callback1, Produce)
StrictMock<MockStreamProducer> producer1;
EXPECT_CALL(producer1, Produce)
.WillOnce(CreateChunk(StreamID(1), MID(100)))
.WillOnce(CreateChunk(StreamID(1), MID(101)))
.WillOnce(CreateChunk(StreamID(1), MID(102)));
EXPECT_CALL(callback1, bytes_to_send_in_next_message)
EXPECT_CALL(producer1, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(0));
auto stream1 =
scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
stream1->MaybeMakeActive();
StrictMock<MockStreamCallback> callback2;
EXPECT_CALL(callback2, Produce)
StrictMock<MockStreamProducer> producer2;
EXPECT_CALL(producer2, Produce)
.WillOnce(CreateChunk(StreamID(2), MID(200)))
.WillOnce(CreateChunk(StreamID(2), MID(201)))
.WillOnce(CreateChunk(StreamID(2), MID(202)));
EXPECT_CALL(callback2, bytes_to_send_in_next_message)
EXPECT_CALL(producer2, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(0));
auto stream2 =
scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2));
scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
stream2->MaybeMakeActive();
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
@ -174,8 +174,8 @@ TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) {
TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) {
StreamScheduler scheduler;
StrictMock<MockStreamCallback> callback1;
EXPECT_CALL(callback1, Produce)
StrictMock<MockStreamProducer> producer1;
EXPECT_CALL(producer1, Produce)
.WillOnce(CreateChunk(StreamID(1), MID(100)))
.WillOnce([](...) {
return SendQueue::DataToSend(
@ -196,7 +196,7 @@ TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) {
Data::IsEnd(true), IsUnordered(true)));
})
.WillOnce(CreateChunk(StreamID(1), MID(102)));
EXPECT_CALL(callback1, bytes_to_send_in_next_message)
EXPECT_CALL(producer1, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize))
@ -204,21 +204,21 @@ TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) {
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(0));
auto stream1 =
scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
stream1->MaybeMakeActive();
StrictMock<MockStreamCallback> callback2;
EXPECT_CALL(callback2, Produce)
StrictMock<MockStreamProducer> producer2;
EXPECT_CALL(producer2, Produce)
.WillOnce(CreateChunk(StreamID(2), MID(200)))
.WillOnce(CreateChunk(StreamID(2), MID(201)))
.WillOnce(CreateChunk(StreamID(2), MID(202)));
EXPECT_CALL(callback2, bytes_to_send_in_next_message)
EXPECT_CALL(producer2, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(0));
auto stream2 =
scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2));
scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
stream2->MaybeMakeActive();
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
@ -236,16 +236,16 @@ TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) {
TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) {
StreamScheduler scheduler;
StrictMock<MockStreamCallback> callback1;
EXPECT_CALL(callback1, Produce)
StrictMock<MockStreamProducer> producer1;
EXPECT_CALL(producer1, Produce)
.WillOnce(CreateChunk(StreamID(1), MID(100)))
.WillOnce(CreateChunk(StreamID(1), MID(101)));
EXPECT_CALL(callback1, bytes_to_send_in_next_message)
EXPECT_CALL(producer1, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize)); // hints that there is a MID(2) coming.
auto stream1 =
scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
stream1->MaybeMakeActive();
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
@ -260,20 +260,20 @@ TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) {
TEST(StreamSchedulerTest, SingleStreamCanBeResumed) {
StreamScheduler scheduler;
StrictMock<MockStreamCallback> callback1;
StrictMock<MockStreamProducer> producer1;
// Callbacks are setup so that they hint that there is a MID(2) coming...
EXPECT_CALL(callback1, Produce)
EXPECT_CALL(producer1, Produce)
.WillOnce(CreateChunk(StreamID(1), MID(100)))
.WillOnce(CreateChunk(StreamID(1), MID(101)))
.WillOnce(CreateChunk(StreamID(1), MID(102)));
EXPECT_CALL(callback1, bytes_to_send_in_next_message)
EXPECT_CALL(producer1, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize)) // When making active again
.WillOnce(Return(0));
auto stream1 =
scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
stream1->MaybeMakeActive();
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
@ -290,33 +290,33 @@ TEST(StreamSchedulerTest, SingleStreamCanBeResumed) {
TEST(StreamSchedulerTest, WillRoundRobinWithPausedStream) {
StreamScheduler scheduler;
StrictMock<MockStreamCallback> callback1;
EXPECT_CALL(callback1, Produce)
StrictMock<MockStreamProducer> producer1;
EXPECT_CALL(producer1, Produce)
.WillOnce(CreateChunk(StreamID(1), MID(100)))
.WillOnce(CreateChunk(StreamID(1), MID(101)))
.WillOnce(CreateChunk(StreamID(1), MID(102)));
EXPECT_CALL(callback1, bytes_to_send_in_next_message)
EXPECT_CALL(producer1, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(0));
auto stream1 =
scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
stream1->MaybeMakeActive();
StrictMock<MockStreamCallback> callback2;
EXPECT_CALL(callback2, Produce)
StrictMock<MockStreamProducer> producer2;
EXPECT_CALL(producer2, Produce)
.WillOnce(CreateChunk(StreamID(2), MID(200)))
.WillOnce(CreateChunk(StreamID(2), MID(201)))
.WillOnce(CreateChunk(StreamID(2), MID(202)));
EXPECT_CALL(callback2, bytes_to_send_in_next_message)
EXPECT_CALL(producer2, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(0));
auto stream2 =
scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2));
scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
stream2->MaybeMakeActive();
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));