From 93faab1b51a11cf997260c84a57565001cf9c9c6 Mon Sep 17 00:00:00 2001 From: Victor Boivie Date: Thu, 20 May 2021 13:08:21 +0200 Subject: [PATCH] dcsctp: Implement Round Robin scheduler Bug: webrtc:12793 Change-Id: I19adb292443def42ee54df67c4869b980db7b7c0 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/219682 Commit-Queue: Victor Boivie Reviewed-by: Harald Alvestrand Cr-Commit-Position: refs/heads/master@{#34093} --- net/dcsctp/tx/BUILD.gn | 1 + net/dcsctp/tx/rr_send_queue.cc | 288 +++++++++++++++++----------- net/dcsctp/tx/rr_send_queue.h | 112 +++++++---- net/dcsctp/tx/rr_send_queue_test.cc | 177 ++++++++++++++--- 4 files changed, 404 insertions(+), 174 deletions(-) diff --git a/net/dcsctp/tx/BUILD.gn b/net/dcsctp/tx/BUILD.gn index 641c8a6519..2f0b27afc6 100644 --- a/net/dcsctp/tx/BUILD.gn +++ b/net/dcsctp/tx/BUILD.gn @@ -127,6 +127,7 @@ if (rtc_include_tests) { "../public:socket", "../public:types", "../testing:data_generator", + "../testing:testing_macros", "../timer", ] absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc index f2d22c8576..7f913393c8 100644 --- a/net/dcsctp/tx/rr_send_queue.cc +++ b/net/dcsctp/tx/rr_send_queue.cc @@ -11,8 +11,7 @@ #include #include -#include -#include +#include #include #include @@ -22,52 +21,16 @@ #include "net/dcsctp/packet/data.h" #include "net/dcsctp/public/dcsctp_message.h" #include "net/dcsctp/public/dcsctp_socket.h" +#include "net/dcsctp/public/types.h" #include "net/dcsctp/tx/send_queue.h" #include "rtc_base/logging.h" namespace dcsctp { -void RRSendQueue::Add(TimeMs now, - DcSctpMessage message, - const SendOptions& send_options) { - RTC_DCHECK(!message.payload().empty()); - std::deque& queue = - IsPaused(message.stream_id()) ? paused_items_ : items_; - // Any limited lifetime should start counting from now - when the message - // has been added to the queue. - absl::optional expires_at = absl::nullopt; - if (send_options.lifetime.has_value()) { - // `expires_at` is the time when it expires. Which is slightly larger than - // the message's lifetime, as the message is alive during its entire - // lifetime (which may be zero). - expires_at = now + *send_options.lifetime + DurationMs(1); - } - queue.emplace_back(std::move(message), expires_at, send_options); -} -size_t RRSendQueue::total_bytes() const { - // TODO(boivie): Have the current size as a member variable, so that's it not - // calculated for every operation. - return absl::c_accumulate(items_, 0, - [](size_t size, const Item& item) { - return size + item.remaining_size; - }) + - absl::c_accumulate(paused_items_, 0, - [](size_t size, const Item& item) { - return size + item.remaining_size; - }); -} - -bool RRSendQueue::IsFull() const { - return total_bytes() >= buffer_size_; -} - -bool RRSendQueue::IsEmpty() const { - return items_.empty(); -} - -RRSendQueue::Item* RRSendQueue::GetFirstNonExpiredMessage(TimeMs now) { +RRSendQueue::OutgoingStream::Item* +RRSendQueue::OutgoingStream::GetFirstNonExpiredMessage(TimeMs now) { while (!items_.empty()) { - RRSendQueue::Item& item = items_.front(); + RRSendQueue::OutgoingStream::Item& item = items_.front(); // An entire item can be discarded iff: // 1) It hasn't been partially sent (has been allocated a message_id). // 2) It has a non-negative expiry time. @@ -75,9 +38,6 @@ RRSendQueue::Item* RRSendQueue::GetFirstNonExpiredMessage(TimeMs now) { if (!item.message_id.has_value() && item.expires_at.has_value() && *item.expires_at <= now) { // TODO(boivie): This should be reported to the client. - RTC_DLOG(LS_VERBOSE) - << log_prefix_ - << "Message is expired before even partially sent - discarding"; items_.pop_front(); continue; } @@ -87,35 +47,42 @@ RRSendQueue::Item* RRSendQueue::GetFirstNonExpiredMessage(TimeMs now) { return nullptr; } -absl::optional RRSendQueue::Produce(TimeMs now, - size_t max_size) { +void RRSendQueue::OutgoingStream::Add(DcSctpMessage message, + absl::optional expires_at, + const SendOptions& send_options) { + items_.emplace_back(std::move(message), expires_at, send_options); +} + +absl::optional RRSendQueue::OutgoingStream::Produce( + TimeMs now, + size_t max_size) { Item* item = GetFirstNonExpiredMessage(now); if (item == nullptr) { return absl::nullopt; } + // If a stream is paused, it will allow sending all partially sent messages + // but will not start sending new fragments of completely unsent messages. + if (is_paused_ && !item->message_id.has_value()) { + return absl::nullopt; + } + DcSctpMessage& message = item->message; - // Don't make too small fragments as that can result in increased risk of - // failure to assemble a message if a small fragment is missing. if (item->remaining_size > max_size && max_size < kMinimumFragmentedPayload) { - RTC_DLOG(LS_VERBOSE) << log_prefix_ << "tx-msg: Will not fragment " - << item->remaining_size << " bytes into buffer of " - << max_size << " bytes"; return absl::nullopt; } // Allocate Message ID and SSN when the first fragment is sent. if (!item->message_id.has_value()) { MID& mid = - mid_by_stream_id_[{item->send_options.unordered, message.stream_id()}]; + item->send_options.unordered ? next_unordered_mid_ : next_ordered_mid_; item->message_id = mid; mid = MID(*mid + 1); } if (!item->send_options.unordered && !item->ssn.has_value()) { - SSN& ssn = ssn_by_stream_id_[message.stream_id()]; - item->ssn = ssn; - ssn = SSN(*ssn + 1); + item->ssn = next_ssn_; + next_ssn_ = SSN(*next_ssn_ + 1); } // Grab the next `max_size` fragment from this message and calculate flags. @@ -157,38 +124,39 @@ absl::optional RRSendQueue::Produce(TimeMs now, item->message.payload().size()); RTC_DCHECK(item->remaining_size > 0); } - RTC_DLOG(LS_VERBOSE) << log_prefix_ << "tx-msg: Producing chunk of " - << chunk.data.size() << " bytes (max: " << max_size - << ")"; return chunk; } -void RRSendQueue::Discard(IsUnordered unordered, - StreamID stream_id, - MID message_id) { - // As this method will only discard partially sent messages, and as the queue - // is a FIFO queue, the only partially sent message would be the topmost - // message. +size_t RRSendQueue::OutgoingStream::buffered_amount() const { + size_t bytes = 0; + for (const auto& item : items_) { + bytes += item.remaining_size; + } + return bytes; +} + +void RRSendQueue::OutgoingStream::Discard(IsUnordered unordered, + MID message_id) { if (!items_.empty()) { Item& item = items_.front(); if (item.send_options.unordered == unordered && - item.message.stream_id() == stream_id && item.message_id.has_value() && - *item.message_id == message_id) { + item.message_id.has_value() && *item.message_id == message_id) { items_.pop_front(); } } } -void RRSendQueue::PrepareResetStreams(rtc::ArrayView streams) { - for (StreamID stream_id : streams) { - paused_streams_.insert(stream_id); - } +void RRSendQueue::OutgoingStream::Pause() { + is_paused_ = true; - // Will not discard partially sent messages - only whole messages. Partially - // delivered messages (at the time of receiving a Stream Reset command) will - // always deliver all the fragments before actually resetting the stream. + // A stream is pause when it's about to be reset. In this implementation, + // it will throw away all non-partially send messages. This is subject to + // change. It will however not discard any partially sent messages - only + // whole messages. Partially delivered messages (at the time of receiving a + // Stream Reset command) will always deliver all the fragments before actually + // resetting the stream. for (auto it = items_.begin(); it != items_.end();) { - if (IsPaused(it->message.stream_id()) && it->remaining_offset == 0) { + if (it->remaining_offset == 0) { it = items_.erase(it); } else { ++it; @@ -196,37 +164,7 @@ void RRSendQueue::PrepareResetStreams(rtc::ArrayView streams) { } } -bool RRSendQueue::CanResetStreams() const { - for (auto& item : items_) { - if (IsPaused(item.message.stream_id())) { - return false; - } - } - return true; -} - -void RRSendQueue::CommitResetStreams() { - for (StreamID stream_id : paused_streams_) { - ssn_by_stream_id_[stream_id] = SSN(0); - // https://tools.ietf.org/html/rfc8260#section-2.3.2 - // "When an association resets the SSN using the SCTP extension defined - // in [RFC6525], the two counters (one for the ordered messages, one for - // the unordered messages) used for the MIDs MUST be reset to 0." - mid_by_stream_id_[{IsUnordered(false), stream_id}] = MID(0); - mid_by_stream_id_[{IsUnordered(true), stream_id}] = MID(0); - } - RollbackResetStreams(); -} - -void RRSendQueue::RollbackResetStreams() { - while (!paused_items_.empty()) { - items_.push_back(std::move(paused_items_.front())); - paused_items_.pop_front(); - } - paused_streams_.clear(); -} - -void RRSendQueue::Reset() { +void RRSendQueue::OutgoingStream::Reset() { if (!items_.empty()) { // If this message has been partially sent, reset it so that it will be // re-sent. @@ -237,13 +175,141 @@ void RRSendQueue::Reset() { item.ssn = absl::nullopt; item.current_fsn = FSN(0); } - RollbackResetStreams(); - mid_by_stream_id_.clear(); - ssn_by_stream_id_.clear(); + is_paused_ = false; + next_ordered_mid_ = MID(0); + next_unordered_mid_ = MID(0); + next_ssn_ = SSN(0); } -bool RRSendQueue::IsPaused(StreamID stream_id) const { - return paused_streams_.find(stream_id) != paused_streams_.end(); +bool RRSendQueue::OutgoingStream::has_partially_sent_message() const { + if (items_.empty()) { + return false; + } + return items_.front().message_id.has_value(); } +void RRSendQueue::Add(TimeMs now, + DcSctpMessage message, + const SendOptions& send_options) { + RTC_DCHECK(!message.payload().empty()); + // Any limited lifetime should start counting from now - when the message + // has been added to the queue. + absl::optional expires_at = absl::nullopt; + if (send_options.lifetime.has_value()) { + // `expires_at` is the time when it expires. Which is slightly larger than + // the message's lifetime, as the message is alive during its entire + // lifetime (which may be zero). + expires_at = now + *send_options.lifetime + DurationMs(1); + } + GetOrCreateStreamInfo(message.stream_id()) + .Add(std::move(message), expires_at, send_options); +} + +size_t RRSendQueue::total_bytes() const { + // TODO(boivie): Have the current size as a member variable, so that's it not + // calculated for every operation. + size_t bytes = 0; + for (const auto& stream : streams_) { + bytes += stream.second.buffered_amount(); + } + + return bytes; +} + +bool RRSendQueue::IsFull() const { + return total_bytes() >= buffer_size_; +} + +bool RRSendQueue::IsEmpty() const { + return total_bytes() == 0; +} + +absl::optional RRSendQueue::Produce( + std::map::iterator it, + TimeMs now, + size_t max_size) { + absl::optional data = it->second.Produce(now, max_size); + if (data.has_value()) { + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "tx-msg: Producing chunk of " + << data->data.size() << " bytes (max: " << max_size + << ")"; + + if (data->data.is_end) { + // No more fragments. Continue with the next stream next time. + next_stream_id_ = StreamID(*it->first + 1); + } + } + + return data; +} + +absl::optional RRSendQueue::Produce(TimeMs now, + size_t max_size) { + auto start_it = streams_.lower_bound(next_stream_id_); + for (auto it = start_it; it != streams_.end(); ++it) { + absl::optional ret = Produce(it, now, max_size); + if (ret.has_value()) { + return ret; + } + } + + for (auto it = streams_.begin(); it != start_it; ++it) { + absl::optional ret = Produce(it, now, max_size); + if (ret.has_value()) { + return ret; + } + } + return absl::nullopt; +} + +void RRSendQueue::Discard(IsUnordered unordered, + StreamID stream_id, + MID message_id) { + GetOrCreateStreamInfo(stream_id).Discard(unordered, message_id); +} + +void RRSendQueue::PrepareResetStreams(rtc::ArrayView streams) { + for (StreamID stream_id : streams) { + GetOrCreateStreamInfo(stream_id).Pause(); + } +} + +bool RRSendQueue::CanResetStreams() const { + // Streams can be reset if those streams that are paused don't have any + // messages that are partially sent. + for (auto& stream : streams_) { + if (stream.second.is_paused() && + stream.second.has_partially_sent_message()) { + return false; + } + } + return true; +} + +void RRSendQueue::CommitResetStreams() { + Reset(); +} + +void RRSendQueue::RollbackResetStreams() { + for (auto& stream_entry : streams_) { + stream_entry.second.Resume(); + } +} + +void RRSendQueue::Reset() { + for (auto& stream_entry : streams_) { + OutgoingStream& stream = stream_entry.second; + stream.Reset(); + } +} + +RRSendQueue::OutgoingStream& RRSendQueue::GetOrCreateStreamInfo( + StreamID stream_id) { + auto it = streams_.find(stream_id); + if (it != streams_.end()) { + return it->second; + } + + return streams_.emplace(stream_id, OutgoingStream()).first->second; +} } // namespace dcsctp diff --git a/net/dcsctp/tx/rr_send_queue.h b/net/dcsctp/tx/rr_send_queue.h index c43dc91881..abbe70205d 100644 --- a/net/dcsctp/tx/rr_send_queue.h +++ b/net/dcsctp/tx/rr_send_queue.h @@ -12,9 +12,8 @@ #include #include +#include #include -#include -#include #include #include "absl/algorithm/container.h" @@ -78,44 +77,91 @@ class RRSendQueue : public SendQueue { size_t total_bytes() const; private: - // An enqueued message and metadata. - struct Item { - explicit Item(DcSctpMessage msg, - absl::optional expires_at, - const SendOptions& send_options) - : message(std::move(msg)), - expires_at(expires_at), - send_options(send_options), - remaining_offset(0), - remaining_size(message.payload().size()) {} - DcSctpMessage message; - absl::optional expires_at; - SendOptions send_options; - // The remaining payload (offset and size) to be sent, when it has been - // fragmented. - size_t remaining_offset; - size_t remaining_size; - // If set, an allocated Message ID and SSN. Will be allocated when the first - // fragment is sent. - absl::optional message_id = absl::nullopt; - absl::optional ssn = absl::nullopt; - // The current Fragment Sequence Number, incremented for each fragment. - FSN current_fsn = FSN(0); + // Per-stream information. + class OutgoingStream { + public: + // Enqueues a message to this stream. + void Add(DcSctpMessage message, + absl::optional expires_at, + const SendOptions& send_options); + + // Possibly produces a data chunk to send. + absl::optional Produce(TimeMs now, size_t max_size); + + // The amount of data enqueued on this stream. + size_t buffered_amount() const; + + // Discards a partially sent message, see `SendQueue::Discard`. + void Discard(IsUnordered unordered, MID message_id); + + // Pauses this stream, which is used before resetting it. + void Pause(); + + // Resumes a paused stream. + void Resume() { is_paused_ = false; } + + bool is_paused() const { return is_paused_; } + + // Resets this stream, meaning MIDs and SSNs are set to zero. + void Reset(); + + // Indicates if this stream has a partially sent message in it. + bool has_partially_sent_message() const; + + private: + // An enqueued message and metadata. + struct Item { + explicit Item(DcSctpMessage msg, + absl::optional expires_at, + const SendOptions& send_options) + : message(std::move(msg)), + expires_at(expires_at), + send_options(send_options), + remaining_offset(0), + remaining_size(message.payload().size()) {} + DcSctpMessage message; + absl::optional expires_at; + SendOptions send_options; + // The remaining payload (offset and size) to be sent, when it has been + // fragmented. + size_t remaining_offset; + size_t remaining_size; + // If set, an allocated Message ID and SSN. Will be allocated when the + // first fragment is sent. + absl::optional message_id = absl::nullopt; + absl::optional ssn = absl::nullopt; + // The current Fragment Sequence Number, incremented for each fragment. + FSN current_fsn = FSN(0); + }; + + // Returns the first non-expired message, or nullptr if there isn't one. + Item* GetFirstNonExpiredMessage(TimeMs now); + + // Streams are pause when they are about to be reset. + bool is_paused_ = false; + // MIDs are different for unordered and ordered messages sent on a stream. + MID next_unordered_mid_ = MID(0); + MID next_ordered_mid_ = MID(0); + + SSN next_ssn_ = SSN(0); + // Enqueued messages, and metadata. + std::deque items_; }; - Item* GetFirstNonExpiredMessage(TimeMs now); - bool IsPaused(StreamID stream_id) const; + OutgoingStream& GetOrCreateStreamInfo(StreamID stream_id); + absl::optional Produce( + std::map::iterator it, + TimeMs now, + size_t max_size); const std::string log_prefix_; const size_t buffer_size_; - std::deque items_; - std::unordered_set paused_streams_; - std::deque paused_items_; + // The next stream to send chunks from. + StreamID next_stream_id_ = StreamID(0); - std::unordered_map, MID, UnorderedStreamHash> - mid_by_stream_id_; - std::unordered_map ssn_by_stream_id_; + // All streams, and messages added to those. + std::map streams_; }; } // namespace dcsctp diff --git a/net/dcsctp/tx/rr_send_queue_test.cc b/net/dcsctp/tx/rr_send_queue_test.cc index 0f6fd2bd05..5e99bb4baf 100644 --- a/net/dcsctp/tx/rr_send_queue_test.cc +++ b/net/dcsctp/tx/rr_send_queue_test.cc @@ -18,20 +18,25 @@ #include "net/dcsctp/public/dcsctp_options.h" #include "net/dcsctp/public/dcsctp_socket.h" #include "net/dcsctp/public/types.h" +#include "net/dcsctp/testing/testing_macros.h" #include "net/dcsctp/tx/send_queue.h" #include "rtc_base/gunit.h" #include "test/gmock.h" namespace dcsctp { namespace { +using ::testing::SizeIs; constexpr TimeMs kNow = TimeMs(0); constexpr StreamID kStreamID(1); constexpr PPID kPPID(53); +constexpr size_t kMaxQueueSize = 1000; +constexpr size_t kOneFragmentPacketSize = 100; +constexpr size_t kTwoFragmentPacketSize = 101; class RRSendQueueTest : public testing::Test { protected: - RRSendQueueTest() : buf_("log: ", 100) {} + RRSendQueueTest() : buf_("log: ", kMaxQueueSize) {} const DcSctpOptions options_; RRSendQueue buf_; @@ -39,7 +44,7 @@ class RRSendQueueTest : public testing::Test { TEST_F(RRSendQueueTest, EmptyBuffer) { EXPECT_TRUE(buf_.IsEmpty()); - EXPECT_FALSE(buf_.Produce(kNow, 100).has_value()); + EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value()); EXPECT_FALSE(buf_.IsFull()); } @@ -48,7 +53,8 @@ TEST_F(RRSendQueueTest, AddAndGetSingleChunk) { EXPECT_FALSE(buf_.IsEmpty()); EXPECT_FALSE(buf_.IsFull()); - absl::optional chunk_opt = buf_.Produce(kNow, 100); + absl::optional chunk_opt = + buf_.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_opt.has_value()); EXPECT_TRUE(chunk_opt->data.is_beginning); EXPECT_TRUE(chunk_opt->data.is_end); @@ -76,7 +82,7 @@ TEST_F(RRSendQueueTest, CarveOutBeginningMiddleAndEnd) { EXPECT_FALSE(chunk_end->data.is_beginning); EXPECT_TRUE(chunk_end->data.is_end); - EXPECT_FALSE(buf_.Produce(kNow, 100).has_value()); + EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value()); } TEST_F(RRSendQueueTest, GetChunksFromTwoMessages) { @@ -84,14 +90,16 @@ TEST_F(RRSendQueueTest, GetChunksFromTwoMessages) { buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload)); - absl::optional chunk_one = buf_.Produce(kNow, 100); + absl::optional chunk_one = + buf_.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_one.has_value()); EXPECT_EQ(chunk_one->data.stream_id, kStreamID); EXPECT_EQ(chunk_one->data.ppid, kPPID); EXPECT_TRUE(chunk_one->data.is_beginning); EXPECT_TRUE(chunk_one->data.is_end); - absl::optional chunk_two = buf_.Produce(kNow, 100); + absl::optional chunk_two = + buf_.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_two.has_value()); EXPECT_EQ(chunk_two->data.stream_id, StreamID(3)); EXPECT_EQ(chunk_two->data.ppid, PPID(54)); @@ -100,7 +108,7 @@ TEST_F(RRSendQueueTest, GetChunksFromTwoMessages) { } TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) { - std::vector payload(60); + std::vector payload(600); EXPECT_FALSE(buf_.IsFull()); buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); EXPECT_FALSE(buf_.IsFull()); @@ -112,14 +120,14 @@ TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) { buf_.Add(kNow, DcSctpMessage(StreamID(5), PPID(55), payload)); EXPECT_TRUE(buf_.IsFull()); - absl::optional chunk_one = buf_.Produce(kNow, 100); + absl::optional chunk_one = buf_.Produce(kNow, 1000); ASSERT_TRUE(chunk_one.has_value()); EXPECT_EQ(chunk_one->data.stream_id, kStreamID); EXPECT_EQ(chunk_one->data.ppid, kPPID); EXPECT_TRUE(buf_.IsFull()); - absl::optional chunk_two = buf_.Produce(kNow, 100); + absl::optional chunk_two = buf_.Produce(kNow, 1000); ASSERT_TRUE(chunk_two.has_value()); EXPECT_EQ(chunk_two->data.stream_id, StreamID(3)); EXPECT_EQ(chunk_two->data.ppid, PPID(54)); @@ -127,7 +135,7 @@ TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) { EXPECT_FALSE(buf_.IsFull()); EXPECT_FALSE(buf_.IsEmpty()); - absl::optional chunk_three = buf_.Produce(kNow, 100); + absl::optional chunk_three = buf_.Produce(kNow, 1000); ASSERT_TRUE(chunk_three.has_value()); EXPECT_EQ(chunk_three->data.stream_id, StreamID(5)); EXPECT_EQ(chunk_three->data.ppid, PPID(55)); @@ -171,7 +179,7 @@ TEST_F(RRSendQueueTest, DefaultsToOrderedSend) { // Default is ordered buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); absl::optional chunk_one = - buf_.Produce(kNow, /*max_size=*/100); + buf_.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_one.has_value()); EXPECT_FALSE(chunk_one->data.is_unordered); @@ -180,7 +188,7 @@ TEST_F(RRSendQueueTest, DefaultsToOrderedSend) { opts.unordered = IsUnordered(true); buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload), opts); absl::optional chunk_two = - buf_.Produce(kNow, /*max_size=*/100); + buf_.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_two.has_value()); EXPECT_TRUE(chunk_two->data.is_unordered); } @@ -192,7 +200,7 @@ TEST_F(RRSendQueueTest, ProduceWithLifetimeExpiry) { TimeMs now = kNow; buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload)); now += DurationMs(1000000); - ASSERT_TRUE(buf_.Produce(now, 100)); + ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize)); SendOptions expires_2_seconds; expires_2_seconds.lifetime = DurationMs(2000); @@ -200,17 +208,17 @@ TEST_F(RRSendQueueTest, ProduceWithLifetimeExpiry) { // Add and consume within lifetime buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds); now += DurationMs(2000); - ASSERT_TRUE(buf_.Produce(now, 100)); + ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize)); // Add and consume just outside lifetime buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds); now += DurationMs(2001); - ASSERT_FALSE(buf_.Produce(now, 100)); + ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize)); // A long time after expiry buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds); now += DurationMs(1000000); - ASSERT_FALSE(buf_.Produce(now, 100)); + ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize)); // Expire one message, but produce the second that is not expired. buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds); @@ -221,8 +229,8 @@ TEST_F(RRSendQueueTest, ProduceWithLifetimeExpiry) { buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_4_seconds); now += DurationMs(2001); - ASSERT_TRUE(buf_.Produce(now, 100)); - ASSERT_FALSE(buf_.Produce(now, 100)); + ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize)); + ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize)); } TEST_F(RRSendQueueTest, DiscardPartialPackets) { @@ -231,28 +239,31 @@ TEST_F(RRSendQueueTest, DiscardPartialPackets) { buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); buf_.Add(kNow, DcSctpMessage(StreamID(2), PPID(54), payload)); - absl::optional chunk_one = buf_.Produce(kNow, 100); + absl::optional chunk_one = + buf_.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_one.has_value()); EXPECT_FALSE(chunk_one->data.is_end); EXPECT_EQ(chunk_one->data.stream_id, kStreamID); buf_.Discard(IsUnordered(false), chunk_one->data.stream_id, chunk_one->data.message_id); - absl::optional chunk_two = buf_.Produce(kNow, 100); + absl::optional chunk_two = + buf_.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_two.has_value()); EXPECT_FALSE(chunk_two->data.is_end); EXPECT_EQ(chunk_two->data.stream_id, StreamID(2)); - absl::optional chunk_three = buf_.Produce(kNow, 100); + absl::optional chunk_three = + buf_.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_three.has_value()); EXPECT_TRUE(chunk_three->data.is_end); EXPECT_EQ(chunk_three->data.stream_id, StreamID(2)); - ASSERT_FALSE(buf_.Produce(kNow, 100)); + ASSERT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize)); // Calling it again shouldn't cause issues. buf_.Discard(IsUnordered(false), chunk_one->data.stream_id, chunk_one->data.message_id); - ASSERT_FALSE(buf_.Produce(kNow, 100)); + ASSERT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize)); } TEST_F(RRSendQueueTest, PrepareResetStreamsDiscardsStream) { @@ -292,7 +303,7 @@ TEST_F(RRSendQueueTest, EnqueuedItemsArePausedDuringStreamReset) { buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); EXPECT_EQ(buf_.total_bytes(), payload.size()); - EXPECT_FALSE(buf_.Produce(kNow, 100).has_value()); + EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value()); buf_.CommitResetStreams(); EXPECT_EQ(buf_.total_bytes(), payload.size()); @@ -308,11 +319,13 @@ TEST_F(RRSendQueueTest, CommittingResetsSSN) { buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); - absl::optional chunk_one = buf_.Produce(kNow, 100); + absl::optional chunk_one = + buf_.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_one.has_value()); EXPECT_EQ(chunk_one->data.ssn, SSN(0)); - absl::optional chunk_two = buf_.Produce(kNow, 100); + absl::optional chunk_two = + buf_.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_two.has_value()); EXPECT_EQ(chunk_two->data.ssn, SSN(1)); @@ -325,7 +338,8 @@ TEST_F(RRSendQueueTest, CommittingResetsSSN) { EXPECT_TRUE(buf_.CanResetStreams()); buf_.CommitResetStreams(); - absl::optional chunk_three = buf_.Produce(kNow, 100); + absl::optional chunk_three = + buf_.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_three.has_value()); EXPECT_EQ(chunk_three->data.ssn, SSN(0)); } @@ -336,11 +350,13 @@ TEST_F(RRSendQueueTest, RollBackResumesSSN) { buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); - absl::optional chunk_one = buf_.Produce(kNow, 100); + absl::optional chunk_one = + buf_.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_one.has_value()); EXPECT_EQ(chunk_one->data.ssn, SSN(0)); - absl::optional chunk_two = buf_.Produce(kNow, 100); + absl::optional chunk_two = + buf_.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_two.has_value()); EXPECT_EQ(chunk_two->data.ssn, SSN(1)); @@ -352,10 +368,111 @@ TEST_F(RRSendQueueTest, RollBackResumesSSN) { EXPECT_TRUE(buf_.CanResetStreams()); buf_.RollbackResetStreams(); - absl::optional chunk_three = buf_.Produce(kNow, 100); + absl::optional chunk_three = + buf_.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_three.has_value()); EXPECT_EQ(chunk_three->data.ssn, SSN(2)); } +TEST_F(RRSendQueueTest, ReturnsFragmentsForOneMessageBeforeMovingToNext) { + std::vector payload(200); + buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload)); + buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, payload)); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk1.data.stream_id, StreamID(1)); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk2.data.stream_id, StreamID(1)); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk3.data.stream_id, StreamID(2)); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk4.data.stream_id, StreamID(2)); +} + +TEST_F(RRSendQueueTest, ReturnsAlsoSmallFragmentsBeforeMovingToNext) { + std::vector payload(kTwoFragmentPacketSize); + buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload)); + buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, payload)); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk1.data.stream_id, StreamID(1)); + EXPECT_THAT(chunk1.data.payload, SizeIs(kOneFragmentPacketSize)); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk2.data.stream_id, StreamID(1)); + EXPECT_THAT(chunk2.data.payload, + SizeIs(kTwoFragmentPacketSize - kOneFragmentPacketSize)); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk3.data.stream_id, StreamID(2)); + EXPECT_THAT(chunk3.data.payload, SizeIs(kOneFragmentPacketSize)); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk4.data.stream_id, StreamID(2)); + EXPECT_THAT(chunk4.data.payload, + SizeIs(kTwoFragmentPacketSize - kOneFragmentPacketSize)); +} + +TEST_F(RRSendQueueTest, WillCycleInRoundRobinFashionBetweenStreams) { + buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector(1))); + buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector(2))); + buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector(3))); + buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector(4))); + buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, std::vector(5))); + buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, std::vector(6))); + buf_.Add(kNow, DcSctpMessage(StreamID(4), kPPID, std::vector(7))); + buf_.Add(kNow, DcSctpMessage(StreamID(4), kPPID, std::vector(8))); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk1.data.stream_id, StreamID(1)); + EXPECT_THAT(chunk1.data.payload, SizeIs(1)); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk2.data.stream_id, StreamID(2)); + EXPECT_THAT(chunk2.data.payload, SizeIs(3)); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk3.data.stream_id, StreamID(3)); + EXPECT_THAT(chunk3.data.payload, SizeIs(5)); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk4.data.stream_id, StreamID(4)); + EXPECT_THAT(chunk4.data.payload, SizeIs(7)); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk5, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk5.data.stream_id, StreamID(1)); + EXPECT_THAT(chunk5.data.payload, SizeIs(2)); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk6, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk6.data.stream_id, StreamID(2)); + EXPECT_THAT(chunk6.data.payload, SizeIs(4)); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk7, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk7.data.stream_id, StreamID(3)); + EXPECT_THAT(chunk7.data.payload, SizeIs(6)); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk8, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk8.data.stream_id, StreamID(4)); + EXPECT_THAT(chunk8.data.payload, SizeIs(8)); +} } // namespace } // namespace dcsctp