dcsctp: Use stream scheduler in send queue

This changes the scheduler that was embedded in the send queue and
implemented using a revolving pointer, to the parameterized stream
scheduler, which is implemented using a "virtual finish time" approach
(sketched below).

Also renamed StreamCallback to StreamProducer, per review comments.

Bug: webrtc:5696
Change-Id: I7719678776ddbe05b688ada1b52887e5ca2fb206
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/262160
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37170}
Victor Boivie 2022-05-11 12:46:16 +02:00 committed by WebRTC LUCI CQ
parent 5f308fdd89
commit d729d12454
6 changed files with 131 additions and 180 deletions
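
For context, the "virtual finish time" approach is a weighted-fair-queueing
style of scheduling: each active stream carries a virtual time, producing a
fragment advances that stream's next finish time by the fragment size divided
by its priority (weight), and the scheduler always serves the active stream
with the smallest next finish time. With equal priorities this degenerates to
plain round-robin. Below is a minimal, self-contained sketch of the idea only;
the types and names are simplified and are not the actual dcSCTP
StreamScheduler API.

#include <cstddef>
#include <iostream>
#include <vector>

// Simplified illustration of virtual-finish-time scheduling. The real
// StreamScheduler also tracks stream activation/pausing and message
// boundaries, which this sketch ignores.
struct SketchStream {
  int id;
  double priority;          // Higher priority => finish time advances slower.
  double virtual_time = 0;  // This stream's "current" virtual time.
};

// Picks the stream whose next finish time is smallest and advances its
// virtual time by bytes / priority. Assumes `streams` is non-empty.
int ProduceNext(std::vector<SketchStream>& streams, size_t bytes) {
  SketchStream* best = nullptr;
  double best_finish = 0;
  for (SketchStream& s : streams) {
    double finish = s.virtual_time + bytes / s.priority;
    if (best == nullptr || finish < best_finish) {
      best = &s;
      best_finish = finish;
    }
  }
  best->virtual_time = best_finish;
  return best->id;
}

int main() {
  // Stream 1 has twice the priority of stream 2, so for equally sized
  // fragments it is served roughly twice as often (1, 1, 2, 1, 1, 2).
  std::vector<SketchStream> streams = {{1, 2.0}, {2, 1.0}};
  for (int i = 0; i < 6; ++i) {
    std::cout << "produced from stream " << ProduceNext(streams, 100) << "\n";
  }
}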

net/dcsctp/tx/BUILD.gn

@@ -14,6 +14,7 @@ rtc_source_set("send_queue") {
     "../common:internal_types",
     "../packet:chunk",
     "../packet:data",
+    "../public:socket",
     "../public:types",
   ]
   sources = [ "send_queue.h" ]
@@ -23,9 +24,12 @@ rtc_source_set("send_queue") {
 rtc_library("rr_send_queue") {
   deps = [
     ":send_queue",
+    ":stream_scheduler",
     "../../../api:array_view",
     "../../../rtc_base:checks",
     "../../../rtc_base:logging",
+    "../../../rtc_base/containers:flat_map",
+    "../common:str_join",
     "../packet:data",
     "../public:socket",
     "../public:types",
@@ -180,6 +184,7 @@ if (rtc_include_tests) {
       "../common:sequence_numbers",
       "../packet:chunk",
       "../packet:data",
+      "../packet:sctp_packet",
       "../public:socket",
       "../public:types",
       "../testing:data_generator",

net/dcsctp/tx/rr_send_queue.cc

@@ -13,12 +13,14 @@
 #include <deque>
 #include <limits>
 #include <map>
+#include <set>
 #include <utility>
 #include <vector>

 #include "absl/algorithm/container.h"
 #include "absl/types/optional.h"
 #include "api/array_view.h"
+#include "net/dcsctp/common/str_join.h"
 #include "net/dcsctp/packet/data.h"
 #include "net/dcsctp/public/dcsctp_message.h"
 #include "net/dcsctp/public/dcsctp_socket.h"
@@ -42,18 +44,18 @@ RRSendQueue::RRSendQueue(absl::string_view log_prefix,
   total_buffered_amount_.SetLowThreshold(total_buffered_amount_low_threshold);
 }

-bool RRSendQueue::OutgoingStream::HasDataToSend() const {
+size_t RRSendQueue::OutgoingStream::bytes_to_send_in_next_message() const {
   if (pause_state_ == PauseState::kPaused ||
       pause_state_ == PauseState::kResetting) {
     // The stream has paused (and there is no partially sent message).
-    return false;
+    return 0;
   }

   if (items_.empty()) {
-    return false;
+    return 0;
   }

-  return true;
+  return items_.front().remaining_size;
 }

 void RRSendQueue::OutgoingStream::AddHandoverState(
@@ -61,29 +63,30 @@ void RRSendQueue::OutgoingStream::AddHandoverState(
   state.next_ssn = next_ssn_.value();
   state.next_ordered_mid = next_ordered_mid_.value();
   state.next_unordered_mid = next_unordered_mid_.value();
-  state.priority = *priority_;
+  state.priority = *scheduler_stream_->priority();
 }

 bool RRSendQueue::IsConsistent() const {
-  size_t total_buffered_amount = 0;
-  for (const auto& [unused, stream] : streams_) {
-    total_buffered_amount += stream.buffered_amount().value();
-  }
-
-  if (previous_message_has_ended_) {
-    auto it = streams_.find(current_stream_id_);
-    if (it != streams_.end() && it->second.has_partially_sent_message()) {
-      RTC_DLOG(LS_ERROR)
-          << "Previous message has ended, but still partial message in stream";
-      return false;
-    }
-  } else {
-    auto it = streams_.find(current_stream_id_);
-    if (it == streams_.end() || !it->second.has_partially_sent_message()) {
-      RTC_DLOG(LS_ERROR)
-          << "Previous message has NOT ended, but there is no partial message";
-      return false;
-    }
-  }
+  std::set<StreamID> expected_active_streams;
+  std::set<StreamID> actual_active_streams;
+
+  size_t total_buffered_amount = 0;
+  for (const auto& [stream_id, stream] : streams_) {
+    total_buffered_amount += stream.buffered_amount().value();
+    if (stream.bytes_to_send_in_next_message() > 0) {
+      expected_active_streams.emplace(stream_id);
+    }
+  }
+  for (const auto& stream : scheduler_.ActiveStreamsForTesting()) {
+    actual_active_streams.emplace(stream->stream_id());
+  }
+  if (expected_active_streams != actual_active_streams) {
+    auto fn = [&](rtc::StringBuilder& sb, const auto& p) { sb << *p; };
+    RTC_DLOG(LS_ERROR) << "Active streams mismatch, is=["
+                       << StrJoin(actual_active_streams, ",", fn)
+                       << "], expected=["
+                       << StrJoin(expected_active_streams, ",", fn) << "]";
+    return false;
+  }

   return total_buffered_amount == total_buffered_amount_.value();
@@ -118,10 +121,15 @@ void RRSendQueue::ThresholdWatcher::SetLowThreshold(size_t low_threshold) {

 void RRSendQueue::OutgoingStream::Add(DcSctpMessage message,
                                       TimeMs expires_at,
                                       const SendOptions& send_options) {
+  bool was_active = bytes_to_send_in_next_message() > 0;
   buffered_amount_.Increase(message.payload().size());
   total_buffered_amount_.Increase(message.payload().size());
   items_.emplace_back(std::move(message), expires_at, send_options);

+  if (!was_active) {
+    scheduler_stream_->MaybeMakeActive();
+  }
+
   RTC_DCHECK(IsConsistent());
 }
@@ -227,8 +235,15 @@ bool RRSendQueue::OutgoingStream::Discard(IsUnordered unordered,
   total_buffered_amount_.Decrease(item.remaining_size);
   items_.pop_front();

+  // Only partially sent messages are discarded, so if a message was
+  // discarded, then it was the currently sent message.
+  scheduler_stream_->ForceReschedule();
+
   if (pause_state_ == PauseState::kPending) {
     pause_state_ = PauseState::kPaused;
+    scheduler_stream_->MakeInactive();
+  } else if (bytes_to_send_in_next_message() == 0) {
+    scheduler_stream_->MakeInactive();
   }

   // As the item still existed, it had unsent data.
@@ -277,6 +292,7 @@ void RRSendQueue::OutgoingStream::Pause() {
   if (had_pending_items && pause_state_ == PauseState::kPaused) {
     RTC_DLOG(LS_VERBOSE) << "Stream " << *stream_id()
                          << " was previously active, but is now paused.";
+    scheduler_stream_->MakeInactive();
   }

   RTC_DCHECK(IsConsistent());
@@ -284,11 +300,8 @@ void RRSendQueue::OutgoingStream::Pause() {
 void RRSendQueue::OutgoingStream::Resume() {
   RTC_DCHECK(pause_state_ == PauseState::kResetting);

-  if (!items_.empty()) {
-    RTC_DLOG(LS_VERBOSE) << "Stream " << *stream_id()
-                         << " was previously paused, but is now active.";
-  }
-
   pause_state_ = PauseState::kNotPaused;
+  scheduler_stream_->MaybeMakeActive();
+
   RTC_DCHECK(IsConsistent());
 }
@@ -296,6 +309,11 @@ void RRSendQueue::OutgoingStream::Reset() {
   // This can be called both when an outgoing stream reset has been responded
   // to, or when the entire SendQueue is reset due to detecting the peer having
   // restarted. The stream may be in any state at this time.
+  PauseState old_pause_state = pause_state_;
+  pause_state_ = PauseState::kNotPaused;
+  next_ordered_mid_ = MID(0);
+  next_unordered_mid_ = MID(0);
+  next_ssn_ = SSN(0);
   if (!items_.empty()) {
     // If this message has been partially sent, reset it so that it will be
     // re-sent.
@@ -309,11 +327,11 @@ void RRSendQueue::OutgoingStream::Reset() {
     item.message_id = absl::nullopt;
     item.ssn = absl::nullopt;
     item.current_fsn = FSN(0);
+    if (old_pause_state == PauseState::kPaused ||
+        old_pause_state == PauseState::kResetting) {
+      scheduler_stream_->MaybeMakeActive();
+    }
   }

-  pause_state_ = PauseState::kNotPaused;
-  next_ordered_mid_ = MID(0);
-  next_unordered_mid_ = MID(0);
-  next_ssn_ = SSN(0);
   RTC_DCHECK(IsConsistent());
 }
@@ -350,67 +368,9 @@ bool RRSendQueue::IsEmpty() const {
   return total_buffered_amount() == 0;
 }

-std::map<StreamID, RRSendQueue::OutgoingStream>::iterator
-RRSendQueue::GetNextStream() {
-  auto start_it = streams_.lower_bound(StreamID(*current_stream_id_ + 1));
-
-  for (auto it = start_it; it != streams_.end(); ++it) {
-    if (it->second.HasDataToSend()) {
-      current_stream_id_ = it->first;
-      return it;
-    }
-  }
-
-  for (auto it = streams_.begin(); it != start_it; ++it) {
-    if (it->second.HasDataToSend()) {
-      current_stream_id_ = it->first;
-      return it;
-    }
-  }
-  return streams_.end();
-}
-
 absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(TimeMs now,
                                                            size_t max_size) {
-  std::map<StreamID, RRSendQueue::OutgoingStream>::iterator stream_it;
-
-  for (;;) {
-    if (previous_message_has_ended_) {
-      // Previous message has ended. Round-robin to a different stream, if there
-      // even is one with data to send.
-      stream_it = GetNextStream();
-      if (stream_it == streams_.end()) {
-        RTC_DLOG(LS_VERBOSE)
-            << log_prefix_
-            << "There is no stream with data; Can't produce any data.";
-        return absl::nullopt;
-      }
-    } else {
-      // The previous message has not ended; Continue from the current stream.
-      stream_it = streams_.find(current_stream_id_);
-      RTC_DCHECK(stream_it != streams_.end());
-    }
-
-    absl::optional<DataToSend> data = stream_it->second.Produce(now, max_size);
-    if (!data.has_value()) {
-      continue;
-    }
-
-    RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Producing DATA, type="
-                         << (data->data.is_unordered ? "unordered" : "ordered")
-                         << "::"
-                         << (*data->data.is_beginning && *data->data.is_end
-                                 ? "complete"
-                             : *data->data.is_beginning ? "first"
-                             : *data->data.is_end       ? "last"
-                                                        : "middle")
-                         << ", stream_id=" << *stream_it->first
-                         << ", ppid=" << *data->data.ppid
-                         << ", length=" << data->data.payload.size();
-
-    previous_message_has_ended_ = *data->data.is_end;
-
-    RTC_DCHECK(IsConsistent());
-    return data;
-  }
+  return scheduler_.Produce(now, max_size);
 }

 bool RRSendQueue::Discard(IsUnordered unordered,
@@ -418,12 +378,8 @@ bool RRSendQueue::Discard(IsUnordered unordered,
                           MID message_id) {
   bool has_discarded =
       GetOrCreateStreamInfo(stream_id).Discard(unordered, message_id);
-  if (has_discarded) {
-    // Only partially sent messages are discarded, so if a message was
-    // discarded, then it was the currently sent message.
-    previous_message_has_ended_ = true;
-  }
-
-  RTC_DCHECK(IsConsistent());
   return has_discarded;
 }
@@ -484,7 +440,7 @@ void RRSendQueue::Reset() {
   for (auto& [unused, stream] : streams_) {
     stream.Reset();
   }
-  previous_message_has_ended_ = true;
+  scheduler_.ForceReschedule();
 }
@@ -516,9 +472,9 @@ RRSendQueue::OutgoingStream& RRSendQueue::GetOrCreateStreamInfo(
   }

   return streams_
-      .emplace(stream_id,
-               OutgoingStream(
-                   stream_id, default_priority_,
+      .emplace(std::piecewise_construct, std::forward_as_tuple(stream_id),
+               std::forward_as_tuple(
+                   &scheduler_, stream_id, default_priority_,
                    [this, stream_id]() { on_buffered_amount_low_(stream_id); },
                    total_buffered_amount_))
       .first->second;
@@ -562,9 +518,9 @@ void RRSendQueue::RestoreFromState(const DcSctpSocketHandoverState& state) {
        state.tx.streams) {
     StreamID stream_id(state_stream.id);
     streams_.emplace(
-        stream_id,
-        OutgoingStream(
-            stream_id, StreamPriority(state_stream.priority),
+        std::piecewise_construct, std::forward_as_tuple(stream_id),
+        std::forward_as_tuple(
+            &scheduler_, stream_id, StreamPriority(state_stream.priority),
             [this, stream_id]() { on_buffered_amount_low_(stream_id); },
             total_buffered_amount_, &state_stream));
   }

net/dcsctp/tx/rr_send_queue.h

@@ -13,6 +13,7 @@
 #include <cstdint>
 #include <deque>
 #include <map>
+#include <memory>
 #include <string>
 #include <utility>
 #include <vector>
@@ -25,6 +26,7 @@
 #include "net/dcsctp/public/dcsctp_socket.h"
 #include "net/dcsctp/public/types.h"
 #include "net/dcsctp/tx/send_queue.h"
+#include "net/dcsctp/tx/stream_scheduler.h"

 namespace dcsctp {
@@ -111,32 +113,33 @@ class RRSendQueue : public SendQueue {
   };

   // Per-stream information.
-  class OutgoingStream {
+  class OutgoingStream : public StreamScheduler::StreamProducer {
    public:
     OutgoingStream(
+        StreamScheduler* scheduler,
         StreamID stream_id,
         StreamPriority priority,
         std::function<void()> on_buffered_amount_low,
         ThresholdWatcher& total_buffered_amount,
         const DcSctpSocketHandoverState::OutgoingStream* state = nullptr)
-        : stream_id_(stream_id),
-          priority_(priority),
+        : scheduler_stream_(scheduler->CreateStream(this, stream_id, priority)),
           next_unordered_mid_(MID(state ? state->next_unordered_mid : 0)),
           next_ordered_mid_(MID(state ? state->next_ordered_mid : 0)),
           next_ssn_(SSN(state ? state->next_ssn : 0)),
           buffered_amount_(std::move(on_buffered_amount_low)),
           total_buffered_amount_(total_buffered_amount) {}

-    StreamID stream_id() const { return stream_id_; }
+    StreamID stream_id() const { return scheduler_stream_->stream_id(); }

     // Enqueues a message to this stream.
     void Add(DcSctpMessage message,
              TimeMs expires_at,
              const SendOptions& send_options);

-    // Produces a data chunk to send, or `absl::nullopt` if nothing could be
-    // produced, e.g. if all messages have expired.
-    absl::optional<DataToSend> Produce(TimeMs now, size_t max_size);
+    // Implementing `StreamScheduler::StreamProducer`.
+    absl::optional<SendQueue::DataToSend> Produce(TimeMs now,
+                                                  size_t max_size) override;
+    size_t bytes_to_send_in_next_message() const override;

     const ThresholdWatcher& buffered_amount() const { return buffered_amount_; }
     ThresholdWatcher& buffered_amount() { return buffered_amount_; }
@@ -167,12 +170,10 @@ class RRSendQueue : public SendQueue {
     // Indicates if this stream has a partially sent message in it.
     bool has_partially_sent_message() const;

-    // Indicates if the stream possibly has data to send. Note that it may
-    // return `true` for streams that have enqueued, but expired, messages.
-    bool HasDataToSend() const;
-
-    void set_priority(StreamPriority priority) { priority_ = priority; }
-    StreamPriority priority() const { return priority_; }
+    StreamPriority priority() const { return scheduler_stream_->priority(); }
+    void set_priority(StreamPriority priority) {
+      scheduler_stream_->set_priority(priority);
+    }

     void AddHandoverState(
         DcSctpSocketHandoverState::OutgoingStream& state) const;
@@ -225,8 +226,8 @@ class RRSendQueue : public SendQueue {
     bool IsConsistent() const;

-    const StreamID stream_id_;
-    StreamPriority priority_;
+    const std::unique_ptr<StreamScheduler::Stream> scheduler_stream_;

     PauseState pause_state_ = PauseState::kNotPaused;
     // MIDs are different for unordered and ordered messages sent on a stream.
     MID next_unordered_mid_;
@@ -251,12 +252,10 @@ class RRSendQueue : public SendQueue {
       TimeMs now,
       size_t max_size);

-  // Return the next stream, in round-robin fashion.
-  std::map<StreamID, OutgoingStream>::iterator GetNextStream();
-
   const std::string log_prefix_;
   const size_t buffer_size_;
   const StreamPriority default_priority_;
+  StreamScheduler scheduler_;

   // Called when the buffered amount is below what has been set using
   // `SetBufferedAmountLowThreshold`.
@@ -269,15 +268,6 @@ class RRSendQueue : public SendQueue {
   // The total amount of buffer data, for all streams.
   ThresholdWatcher total_buffered_amount_;

-  // Indicates if the previous fragment sent was the end of a message. For
-  // non-interleaved sending, this means that the next message may come from a
-  // different stream. If not true, the next fragment must be produced from the
-  // same stream as last time.
-  bool previous_message_has_ended_ = true;
-
-  // The current stream to send chunks from. Modified by `GetNextStream`.
-  StreamID current_stream_id_ = StreamID(0);
-
   // All streams, and messages added to those.
   std::map<StreamID, OutgoingStream> streams_;
 };

net/dcsctp/tx/stream_scheduler.cc

@@ -114,7 +114,7 @@ StreamScheduler::VirtualTime StreamScheduler::Stream::GetNextFinishTime()
 absl::optional<SendQueue::DataToSend> StreamScheduler::Stream::Produce(
     TimeMs now,
     size_t max_size) {
-  absl::optional<SendQueue::DataToSend> data = callback_.Produce(now, max_size);
+  absl::optional<SendQueue::DataToSend> data = producer_.Produce(now, max_size);

   if (data.has_value()) {
     VirtualTime new_current = GetNextFinishTime();
net/dcsctp/tx/stream_scheduler.h

@@ -56,9 +56,9 @@ class StreamScheduler {
   };

  public:
-  class StreamCallback {
+  class StreamProducer {
    public:
-    virtual ~StreamCallback() = default;
+    virtual ~StreamProducer() = default;

     // Produces a fragment of data to send. The current wall time is specified
     // as `now` and should be used to skip chunks with expired limited lifetime.
@@ -99,11 +99,11 @@ class StreamScheduler {
     friend class StreamScheduler;

     Stream(StreamScheduler* parent,
-           StreamCallback* callback,
+           StreamProducer* producer,
            StreamID stream_id,
            StreamPriority priority)
         : parent_(*parent),
-          callback_(*callback),
+          producer_(*producer),
           stream_id_(stream_id),
           priority_(priority) {}
@@ -117,14 +117,14 @@ class StreamScheduler {
     VirtualTime current_time() const { return current_virtual_time_; }
     VirtualTime next_finish_time() const { return next_finish_time_; }
     size_t bytes_to_send_in_next_message() const {
-      return callback_.bytes_to_send_in_next_message();
+      return producer_.bytes_to_send_in_next_message();
     }

     // Returns the next virtual finish time for this stream.
     VirtualTime GetNextFinishTime() const;

     StreamScheduler& parent_;
-    StreamCallback& callback_;
+    StreamProducer& producer_;
     const StreamID stream_id_;
     StreamPriority priority_;
     // This outgoing stream's "current" virtual_time.
@@ -132,10 +132,10 @@ class StreamScheduler {
     VirtualTime next_finish_time_ = VirtualTime::Zero();
   };

-  std::unique_ptr<Stream> CreateStream(StreamCallback* callback,
+  std::unique_ptr<Stream> CreateStream(StreamProducer* producer,
                                        StreamID stream_id,
                                        StreamPriority priority) {
-    return absl::WrapUnique(new Stream(this, callback, stream_id, priority));
+    return absl::WrapUnique(new Stream(this, producer, stream_id, priority));
   }

   // Makes the scheduler stop producing message from the current stream and
net/dcsctp/tx/stream_scheduler_test.cc

@@ -59,7 +59,7 @@ std::map<StreamID, size_t> GetPacketCounts(StreamScheduler& scheduler,
   return packet_counts;
 }

-class MockStreamCallback : public StreamScheduler::StreamCallback {
+class MockStreamProducer : public StreamScheduler::StreamProducer {
  public:
   MOCK_METHOD(absl::optional<SendQueue::DataToSend>,
               Produce,
@@ -74,18 +74,18 @@ class TestStream {
              StreamID stream_id,
              StreamPriority priority,
              size_t packet_size = kPayloadSize) {
-    EXPECT_CALL(callback_, Produce)
+    EXPECT_CALL(producer_, Produce)
         .WillRepeatedly(CreateChunk(stream_id, MID(0), packet_size));
-    EXPECT_CALL(callback_, bytes_to_send_in_next_message)
+    EXPECT_CALL(producer_, bytes_to_send_in_next_message)
         .WillRepeatedly(Return(packet_size));
-    stream_ = scheduler.CreateStream(&callback_, stream_id, priority);
+    stream_ = scheduler.CreateStream(&producer_, stream_id, priority);
     stream_->MaybeMakeActive();
   }

   StreamScheduler::Stream& stream() { return *stream_; }

  private:
-  StrictMock<MockStreamCallback> callback_;
+  StrictMock<MockStreamProducer> producer_;
   std::unique_ptr<StreamScheduler::Stream> stream_;
 };
@@ -100,9 +100,9 @@ TEST(StreamSchedulerTest, HasNoActiveStreams) {
 TEST(StreamSchedulerTest, CanSetAndGetStreamProperties) {
   StreamScheduler scheduler;

-  StrictMock<MockStreamCallback> callback;
+  StrictMock<MockStreamProducer> producer;
   auto stream =
-      scheduler.CreateStream(&callback, StreamID(1), StreamPriority(2));
+      scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2));

   EXPECT_EQ(stream->stream_id(), StreamID(1));
   EXPECT_EQ(stream->priority(), StreamPriority(2));
@@ -115,13 +115,13 @@ TEST(StreamSchedulerTest, CanSetAndGetStreamProperties) {
 TEST(StreamSchedulerTest, CanProduceFromSingleStream) {
   StreamScheduler scheduler;

-  StrictMock<MockStreamCallback> callback;
-  EXPECT_CALL(callback, Produce).WillOnce(CreateChunk(StreamID(1), MID(0)));
-  EXPECT_CALL(callback, bytes_to_send_in_next_message)
+  StrictMock<MockStreamProducer> producer;
+  EXPECT_CALL(producer, Produce).WillOnce(CreateChunk(StreamID(1), MID(0)));
+  EXPECT_CALL(producer, bytes_to_send_in_next_message)
       .WillOnce(Return(kPayloadSize))  // When making active
       .WillOnce(Return(0));

   auto stream =
-      scheduler.CreateStream(&callback, StreamID(1), StreamPriority(2));
+      scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2));
   stream->MaybeMakeActive();

   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0)));
@@ -132,32 +132,32 @@ TEST(StreamSchedulerTest, CanProduceFromSingleStream) {
 TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) {
   StreamScheduler scheduler;

-  StrictMock<MockStreamCallback> callback1;
-  EXPECT_CALL(callback1, Produce)
+  StrictMock<MockStreamProducer> producer1;
+  EXPECT_CALL(producer1, Produce)
      .WillOnce(CreateChunk(StreamID(1), MID(100)))
      .WillOnce(CreateChunk(StreamID(1), MID(101)))
      .WillOnce(CreateChunk(StreamID(1), MID(102)));
-  EXPECT_CALL(callback1, bytes_to_send_in_next_message)
+  EXPECT_CALL(producer1, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(0));
   auto stream1 =
-      scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
+      scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
   stream1->MaybeMakeActive();

-  StrictMock<MockStreamCallback> callback2;
-  EXPECT_CALL(callback2, Produce)
+  StrictMock<MockStreamProducer> producer2;
+  EXPECT_CALL(producer2, Produce)
      .WillOnce(CreateChunk(StreamID(2), MID(200)))
      .WillOnce(CreateChunk(StreamID(2), MID(201)))
      .WillOnce(CreateChunk(StreamID(2), MID(202)));
-  EXPECT_CALL(callback2, bytes_to_send_in_next_message)
+  EXPECT_CALL(producer2, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(0));
   auto stream2 =
-      scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2));
+      scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
   stream2->MaybeMakeActive();

   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
@@ -174,8 +174,8 @@ TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) {
 TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) {
   StreamScheduler scheduler;

-  StrictMock<MockStreamCallback> callback1;
-  EXPECT_CALL(callback1, Produce)
+  StrictMock<MockStreamProducer> producer1;
+  EXPECT_CALL(producer1, Produce)
      .WillOnce(CreateChunk(StreamID(1), MID(100)))
      .WillOnce([](...) {
        return SendQueue::DataToSend(
@@ -196,7 +196,7 @@ TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) {
                            Data::IsEnd(true), IsUnordered(true)));
      })
      .WillOnce(CreateChunk(StreamID(1), MID(102)));
-  EXPECT_CALL(callback1, bytes_to_send_in_next_message)
+  EXPECT_CALL(producer1, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
@@ -204,21 +204,21 @@ TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) {
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(0));
   auto stream1 =
-      scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
+      scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
   stream1->MaybeMakeActive();

-  StrictMock<MockStreamCallback> callback2;
-  EXPECT_CALL(callback2, Produce)
+  StrictMock<MockStreamProducer> producer2;
+  EXPECT_CALL(producer2, Produce)
      .WillOnce(CreateChunk(StreamID(2), MID(200)))
      .WillOnce(CreateChunk(StreamID(2), MID(201)))
      .WillOnce(CreateChunk(StreamID(2), MID(202)));
-  EXPECT_CALL(callback2, bytes_to_send_in_next_message)
+  EXPECT_CALL(producer2, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(0));
   auto stream2 =
-      scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2));
+      scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
   stream2->MaybeMakeActive();

   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
@@ -236,16 +236,16 @@ TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) {
 TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) {
   StreamScheduler scheduler;

-  StrictMock<MockStreamCallback> callback1;
-  EXPECT_CALL(callback1, Produce)
+  StrictMock<MockStreamProducer> producer1;
+  EXPECT_CALL(producer1, Produce)
      .WillOnce(CreateChunk(StreamID(1), MID(100)))
      .WillOnce(CreateChunk(StreamID(1), MID(101)));
-  EXPECT_CALL(callback1, bytes_to_send_in_next_message)
+  EXPECT_CALL(producer1, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize));  // hints that there is a MID(2) coming.
   auto stream1 =
-      scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
+      scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
   stream1->MaybeMakeActive();

   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
@@ -260,20 +260,20 @@ TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) {
 TEST(StreamSchedulerTest, SingleStreamCanBeResumed) {
   StreamScheduler scheduler;

-  StrictMock<MockStreamCallback> callback1;
+  StrictMock<MockStreamProducer> producer1;
   // Callbacks are setup so that they hint that there is a MID(2) coming...
-  EXPECT_CALL(callback1, Produce)
+  EXPECT_CALL(producer1, Produce)
      .WillOnce(CreateChunk(StreamID(1), MID(100)))
      .WillOnce(CreateChunk(StreamID(1), MID(101)))
      .WillOnce(CreateChunk(StreamID(1), MID(102)));
-  EXPECT_CALL(callback1, bytes_to_send_in_next_message)
+  EXPECT_CALL(producer1, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))  // When making active again
      .WillOnce(Return(0));
   auto stream1 =
-      scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
+      scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
   stream1->MaybeMakeActive();

   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
@@ -290,33 +290,33 @@ TEST(StreamSchedulerTest, SingleStreamCanBeResumed) {
 TEST(StreamSchedulerTest, WillRoundRobinWithPausedStream) {
   StreamScheduler scheduler;

-  StrictMock<MockStreamCallback> callback1;
-  EXPECT_CALL(callback1, Produce)
+  StrictMock<MockStreamProducer> producer1;
+  EXPECT_CALL(producer1, Produce)
      .WillOnce(CreateChunk(StreamID(1), MID(100)))
      .WillOnce(CreateChunk(StreamID(1), MID(101)))
      .WillOnce(CreateChunk(StreamID(1), MID(102)));
-  EXPECT_CALL(callback1, bytes_to_send_in_next_message)
+  EXPECT_CALL(producer1, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(0));
   auto stream1 =
-      scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
+      scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
   stream1->MaybeMakeActive();

-  StrictMock<MockStreamCallback> callback2;
-  EXPECT_CALL(callback2, Produce)
+  StrictMock<MockStreamProducer> producer2;
+  EXPECT_CALL(producer2, Produce)
      .WillOnce(CreateChunk(StreamID(2), MID(200)))
      .WillOnce(CreateChunk(StreamID(2), MID(201)))
      .WillOnce(CreateChunk(StreamID(2), MID(202)));
-  EXPECT_CALL(callback2, bytes_to_send_in_next_message)
+  EXPECT_CALL(producer2, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(0));
   auto stream2 =
-      scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2));
+      scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
   stream2->MaybeMakeActive();

   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));