mirror of
https://github.com/mollyim/webrtc.git
synced 2025-05-14 06:10:40 +01:00
dcsctp: Implement Round Robin scheduler
Bug: webrtc:12793 Change-Id: I19adb292443def42ee54df67c4869b980db7b7c0 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/219682 Commit-Queue: Victor Boivie <boivie@webrtc.org> Reviewed-by: Harald Alvestrand <hta@webrtc.org> Cr-Commit-Position: refs/heads/master@{#34093}
This commit is contained in:
parent
70cd086644
commit
93faab1b51
4 changed files with 404 additions and 174 deletions
|
@ -127,6 +127,7 @@ if (rtc_include_tests) {
|
||||||
"../public:socket",
|
"../public:socket",
|
||||||
"../public:types",
|
"../public:types",
|
||||||
"../testing:data_generator",
|
"../testing:data_generator",
|
||||||
|
"../testing:testing_macros",
|
||||||
"../timer",
|
"../timer",
|
||||||
]
|
]
|
||||||
absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
|
absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
|
||||||
|
|
|
@ -11,8 +11,7 @@
|
||||||
|
|
||||||
#include <cstdint>
|
#include <cstdint>
|
||||||
#include <deque>
|
#include <deque>
|
||||||
#include <unordered_map>
|
#include <map>
|
||||||
#include <unordered_set>
|
|
||||||
#include <utility>
|
#include <utility>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
|
@ -22,52 +21,16 @@
|
||||||
#include "net/dcsctp/packet/data.h"
|
#include "net/dcsctp/packet/data.h"
|
||||||
#include "net/dcsctp/public/dcsctp_message.h"
|
#include "net/dcsctp/public/dcsctp_message.h"
|
||||||
#include "net/dcsctp/public/dcsctp_socket.h"
|
#include "net/dcsctp/public/dcsctp_socket.h"
|
||||||
|
#include "net/dcsctp/public/types.h"
|
||||||
#include "net/dcsctp/tx/send_queue.h"
|
#include "net/dcsctp/tx/send_queue.h"
|
||||||
#include "rtc_base/logging.h"
|
#include "rtc_base/logging.h"
|
||||||
|
|
||||||
namespace dcsctp {
|
namespace dcsctp {
|
||||||
void RRSendQueue::Add(TimeMs now,
|
|
||||||
DcSctpMessage message,
|
|
||||||
const SendOptions& send_options) {
|
|
||||||
RTC_DCHECK(!message.payload().empty());
|
|
||||||
std::deque<Item>& queue =
|
|
||||||
IsPaused(message.stream_id()) ? paused_items_ : items_;
|
|
||||||
// Any limited lifetime should start counting from now - when the message
|
|
||||||
// has been added to the queue.
|
|
||||||
absl::optional<TimeMs> expires_at = absl::nullopt;
|
|
||||||
if (send_options.lifetime.has_value()) {
|
|
||||||
// `expires_at` is the time when it expires. Which is slightly larger than
|
|
||||||
// the message's lifetime, as the message is alive during its entire
|
|
||||||
// lifetime (which may be zero).
|
|
||||||
expires_at = now + *send_options.lifetime + DurationMs(1);
|
|
||||||
}
|
|
||||||
queue.emplace_back(std::move(message), expires_at, send_options);
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t RRSendQueue::total_bytes() const {
|
RRSendQueue::OutgoingStream::Item*
|
||||||
// TODO(boivie): Have the current size as a member variable, so that's it not
|
RRSendQueue::OutgoingStream::GetFirstNonExpiredMessage(TimeMs now) {
|
||||||
// calculated for every operation.
|
|
||||||
return absl::c_accumulate(items_, 0,
|
|
||||||
[](size_t size, const Item& item) {
|
|
||||||
return size + item.remaining_size;
|
|
||||||
}) +
|
|
||||||
absl::c_accumulate(paused_items_, 0,
|
|
||||||
[](size_t size, const Item& item) {
|
|
||||||
return size + item.remaining_size;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
bool RRSendQueue::IsFull() const {
|
|
||||||
return total_bytes() >= buffer_size_;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool RRSendQueue::IsEmpty() const {
|
|
||||||
return items_.empty();
|
|
||||||
}
|
|
||||||
|
|
||||||
RRSendQueue::Item* RRSendQueue::GetFirstNonExpiredMessage(TimeMs now) {
|
|
||||||
while (!items_.empty()) {
|
while (!items_.empty()) {
|
||||||
RRSendQueue::Item& item = items_.front();
|
RRSendQueue::OutgoingStream::Item& item = items_.front();
|
||||||
// An entire item can be discarded iff:
|
// An entire item can be discarded iff:
|
||||||
// 1) It hasn't been partially sent (has been allocated a message_id).
|
// 1) It hasn't been partially sent (has been allocated a message_id).
|
||||||
// 2) It has a non-negative expiry time.
|
// 2) It has a non-negative expiry time.
|
||||||
|
@ -75,9 +38,6 @@ RRSendQueue::Item* RRSendQueue::GetFirstNonExpiredMessage(TimeMs now) {
|
||||||
if (!item.message_id.has_value() && item.expires_at.has_value() &&
|
if (!item.message_id.has_value() && item.expires_at.has_value() &&
|
||||||
*item.expires_at <= now) {
|
*item.expires_at <= now) {
|
||||||
// TODO(boivie): This should be reported to the client.
|
// TODO(boivie): This should be reported to the client.
|
||||||
RTC_DLOG(LS_VERBOSE)
|
|
||||||
<< log_prefix_
|
|
||||||
<< "Message is expired before even partially sent - discarding";
|
|
||||||
items_.pop_front();
|
items_.pop_front();
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -87,35 +47,42 @@ RRSendQueue::Item* RRSendQueue::GetFirstNonExpiredMessage(TimeMs now) {
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(TimeMs now,
|
void RRSendQueue::OutgoingStream::Add(DcSctpMessage message,
|
||||||
|
absl::optional<TimeMs> expires_at,
|
||||||
|
const SendOptions& send_options) {
|
||||||
|
items_.emplace_back(std::move(message), expires_at, send_options);
|
||||||
|
}
|
||||||
|
|
||||||
|
absl::optional<SendQueue::DataToSend> RRSendQueue::OutgoingStream::Produce(
|
||||||
|
TimeMs now,
|
||||||
size_t max_size) {
|
size_t max_size) {
|
||||||
Item* item = GetFirstNonExpiredMessage(now);
|
Item* item = GetFirstNonExpiredMessage(now);
|
||||||
if (item == nullptr) {
|
if (item == nullptr) {
|
||||||
return absl::nullopt;
|
return absl::nullopt;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If a stream is paused, it will allow sending all partially sent messages
|
||||||
|
// but will not start sending new fragments of completely unsent messages.
|
||||||
|
if (is_paused_ && !item->message_id.has_value()) {
|
||||||
|
return absl::nullopt;
|
||||||
|
}
|
||||||
|
|
||||||
DcSctpMessage& message = item->message;
|
DcSctpMessage& message = item->message;
|
||||||
|
|
||||||
// Don't make too small fragments as that can result in increased risk of
|
|
||||||
// failure to assemble a message if a small fragment is missing.
|
|
||||||
if (item->remaining_size > max_size && max_size < kMinimumFragmentedPayload) {
|
if (item->remaining_size > max_size && max_size < kMinimumFragmentedPayload) {
|
||||||
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "tx-msg: Will not fragment "
|
|
||||||
<< item->remaining_size << " bytes into buffer of "
|
|
||||||
<< max_size << " bytes";
|
|
||||||
return absl::nullopt;
|
return absl::nullopt;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Allocate Message ID and SSN when the first fragment is sent.
|
// Allocate Message ID and SSN when the first fragment is sent.
|
||||||
if (!item->message_id.has_value()) {
|
if (!item->message_id.has_value()) {
|
||||||
MID& mid =
|
MID& mid =
|
||||||
mid_by_stream_id_[{item->send_options.unordered, message.stream_id()}];
|
item->send_options.unordered ? next_unordered_mid_ : next_ordered_mid_;
|
||||||
item->message_id = mid;
|
item->message_id = mid;
|
||||||
mid = MID(*mid + 1);
|
mid = MID(*mid + 1);
|
||||||
}
|
}
|
||||||
if (!item->send_options.unordered && !item->ssn.has_value()) {
|
if (!item->send_options.unordered && !item->ssn.has_value()) {
|
||||||
SSN& ssn = ssn_by_stream_id_[message.stream_id()];
|
item->ssn = next_ssn_;
|
||||||
item->ssn = ssn;
|
next_ssn_ = SSN(*next_ssn_ + 1);
|
||||||
ssn = SSN(*ssn + 1);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Grab the next `max_size` fragment from this message and calculate flags.
|
// Grab the next `max_size` fragment from this message and calculate flags.
|
||||||
|
@ -157,38 +124,39 @@ absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(TimeMs now,
|
||||||
item->message.payload().size());
|
item->message.payload().size());
|
||||||
RTC_DCHECK(item->remaining_size > 0);
|
RTC_DCHECK(item->remaining_size > 0);
|
||||||
}
|
}
|
||||||
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "tx-msg: Producing chunk of "
|
|
||||||
<< chunk.data.size() << " bytes (max: " << max_size
|
|
||||||
<< ")";
|
|
||||||
return chunk;
|
return chunk;
|
||||||
}
|
}
|
||||||
|
|
||||||
void RRSendQueue::Discard(IsUnordered unordered,
|
size_t RRSendQueue::OutgoingStream::buffered_amount() const {
|
||||||
StreamID stream_id,
|
size_t bytes = 0;
|
||||||
|
for (const auto& item : items_) {
|
||||||
|
bytes += item.remaining_size;
|
||||||
|
}
|
||||||
|
return bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
void RRSendQueue::OutgoingStream::Discard(IsUnordered unordered,
|
||||||
MID message_id) {
|
MID message_id) {
|
||||||
// As this method will only discard partially sent messages, and as the queue
|
|
||||||
// is a FIFO queue, the only partially sent message would be the topmost
|
|
||||||
// message.
|
|
||||||
if (!items_.empty()) {
|
if (!items_.empty()) {
|
||||||
Item& item = items_.front();
|
Item& item = items_.front();
|
||||||
if (item.send_options.unordered == unordered &&
|
if (item.send_options.unordered == unordered &&
|
||||||
item.message.stream_id() == stream_id && item.message_id.has_value() &&
|
item.message_id.has_value() && *item.message_id == message_id) {
|
||||||
*item.message_id == message_id) {
|
|
||||||
items_.pop_front();
|
items_.pop_front();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void RRSendQueue::PrepareResetStreams(rtc::ArrayView<const StreamID> streams) {
|
void RRSendQueue::OutgoingStream::Pause() {
|
||||||
for (StreamID stream_id : streams) {
|
is_paused_ = true;
|
||||||
paused_streams_.insert(stream_id);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Will not discard partially sent messages - only whole messages. Partially
|
// A stream is pause when it's about to be reset. In this implementation,
|
||||||
// delivered messages (at the time of receiving a Stream Reset command) will
|
// it will throw away all non-partially send messages. This is subject to
|
||||||
// always deliver all the fragments before actually resetting the stream.
|
// change. It will however not discard any partially sent messages - only
|
||||||
|
// whole messages. Partially delivered messages (at the time of receiving a
|
||||||
|
// Stream Reset command) will always deliver all the fragments before actually
|
||||||
|
// resetting the stream.
|
||||||
for (auto it = items_.begin(); it != items_.end();) {
|
for (auto it = items_.begin(); it != items_.end();) {
|
||||||
if (IsPaused(it->message.stream_id()) && it->remaining_offset == 0) {
|
if (it->remaining_offset == 0) {
|
||||||
it = items_.erase(it);
|
it = items_.erase(it);
|
||||||
} else {
|
} else {
|
||||||
++it;
|
++it;
|
||||||
|
@ -196,37 +164,7 @@ void RRSendQueue::PrepareResetStreams(rtc::ArrayView<const StreamID> streams) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool RRSendQueue::CanResetStreams() const {
|
void RRSendQueue::OutgoingStream::Reset() {
|
||||||
for (auto& item : items_) {
|
|
||||||
if (IsPaused(item.message.stream_id())) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
void RRSendQueue::CommitResetStreams() {
|
|
||||||
for (StreamID stream_id : paused_streams_) {
|
|
||||||
ssn_by_stream_id_[stream_id] = SSN(0);
|
|
||||||
// https://tools.ietf.org/html/rfc8260#section-2.3.2
|
|
||||||
// "When an association resets the SSN using the SCTP extension defined
|
|
||||||
// in [RFC6525], the two counters (one for the ordered messages, one for
|
|
||||||
// the unordered messages) used for the MIDs MUST be reset to 0."
|
|
||||||
mid_by_stream_id_[{IsUnordered(false), stream_id}] = MID(0);
|
|
||||||
mid_by_stream_id_[{IsUnordered(true), stream_id}] = MID(0);
|
|
||||||
}
|
|
||||||
RollbackResetStreams();
|
|
||||||
}
|
|
||||||
|
|
||||||
void RRSendQueue::RollbackResetStreams() {
|
|
||||||
while (!paused_items_.empty()) {
|
|
||||||
items_.push_back(std::move(paused_items_.front()));
|
|
||||||
paused_items_.pop_front();
|
|
||||||
}
|
|
||||||
paused_streams_.clear();
|
|
||||||
}
|
|
||||||
|
|
||||||
void RRSendQueue::Reset() {
|
|
||||||
if (!items_.empty()) {
|
if (!items_.empty()) {
|
||||||
// If this message has been partially sent, reset it so that it will be
|
// If this message has been partially sent, reset it so that it will be
|
||||||
// re-sent.
|
// re-sent.
|
||||||
|
@ -237,13 +175,141 @@ void RRSendQueue::Reset() {
|
||||||
item.ssn = absl::nullopt;
|
item.ssn = absl::nullopt;
|
||||||
item.current_fsn = FSN(0);
|
item.current_fsn = FSN(0);
|
||||||
}
|
}
|
||||||
RollbackResetStreams();
|
is_paused_ = false;
|
||||||
mid_by_stream_id_.clear();
|
next_ordered_mid_ = MID(0);
|
||||||
ssn_by_stream_id_.clear();
|
next_unordered_mid_ = MID(0);
|
||||||
|
next_ssn_ = SSN(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool RRSendQueue::IsPaused(StreamID stream_id) const {
|
bool RRSendQueue::OutgoingStream::has_partially_sent_message() const {
|
||||||
return paused_streams_.find(stream_id) != paused_streams_.end();
|
if (items_.empty()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return items_.front().message_id.has_value();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void RRSendQueue::Add(TimeMs now,
|
||||||
|
DcSctpMessage message,
|
||||||
|
const SendOptions& send_options) {
|
||||||
|
RTC_DCHECK(!message.payload().empty());
|
||||||
|
// Any limited lifetime should start counting from now - when the message
|
||||||
|
// has been added to the queue.
|
||||||
|
absl::optional<TimeMs> expires_at = absl::nullopt;
|
||||||
|
if (send_options.lifetime.has_value()) {
|
||||||
|
// `expires_at` is the time when it expires. Which is slightly larger than
|
||||||
|
// the message's lifetime, as the message is alive during its entire
|
||||||
|
// lifetime (which may be zero).
|
||||||
|
expires_at = now + *send_options.lifetime + DurationMs(1);
|
||||||
|
}
|
||||||
|
GetOrCreateStreamInfo(message.stream_id())
|
||||||
|
.Add(std::move(message), expires_at, send_options);
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t RRSendQueue::total_bytes() const {
|
||||||
|
// TODO(boivie): Have the current size as a member variable, so that's it not
|
||||||
|
// calculated for every operation.
|
||||||
|
size_t bytes = 0;
|
||||||
|
for (const auto& stream : streams_) {
|
||||||
|
bytes += stream.second.buffered_amount();
|
||||||
|
}
|
||||||
|
|
||||||
|
return bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool RRSendQueue::IsFull() const {
|
||||||
|
return total_bytes() >= buffer_size_;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool RRSendQueue::IsEmpty() const {
|
||||||
|
return total_bytes() == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(
|
||||||
|
std::map<StreamID, RRSendQueue::OutgoingStream>::iterator it,
|
||||||
|
TimeMs now,
|
||||||
|
size_t max_size) {
|
||||||
|
absl::optional<DataToSend> data = it->second.Produce(now, max_size);
|
||||||
|
if (data.has_value()) {
|
||||||
|
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "tx-msg: Producing chunk of "
|
||||||
|
<< data->data.size() << " bytes (max: " << max_size
|
||||||
|
<< ")";
|
||||||
|
|
||||||
|
if (data->data.is_end) {
|
||||||
|
// No more fragments. Continue with the next stream next time.
|
||||||
|
next_stream_id_ = StreamID(*it->first + 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return data;
|
||||||
|
}
|
||||||
|
|
||||||
|
absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(TimeMs now,
|
||||||
|
size_t max_size) {
|
||||||
|
auto start_it = streams_.lower_bound(next_stream_id_);
|
||||||
|
for (auto it = start_it; it != streams_.end(); ++it) {
|
||||||
|
absl::optional<DataToSend> ret = Produce(it, now, max_size);
|
||||||
|
if (ret.has_value()) {
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (auto it = streams_.begin(); it != start_it; ++it) {
|
||||||
|
absl::optional<DataToSend> ret = Produce(it, now, max_size);
|
||||||
|
if (ret.has_value()) {
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return absl::nullopt;
|
||||||
|
}
|
||||||
|
|
||||||
|
void RRSendQueue::Discard(IsUnordered unordered,
|
||||||
|
StreamID stream_id,
|
||||||
|
MID message_id) {
|
||||||
|
GetOrCreateStreamInfo(stream_id).Discard(unordered, message_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
void RRSendQueue::PrepareResetStreams(rtc::ArrayView<const StreamID> streams) {
|
||||||
|
for (StreamID stream_id : streams) {
|
||||||
|
GetOrCreateStreamInfo(stream_id).Pause();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool RRSendQueue::CanResetStreams() const {
|
||||||
|
// Streams can be reset if those streams that are paused don't have any
|
||||||
|
// messages that are partially sent.
|
||||||
|
for (auto& stream : streams_) {
|
||||||
|
if (stream.second.is_paused() &&
|
||||||
|
stream.second.has_partially_sent_message()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void RRSendQueue::CommitResetStreams() {
|
||||||
|
Reset();
|
||||||
|
}
|
||||||
|
|
||||||
|
void RRSendQueue::RollbackResetStreams() {
|
||||||
|
for (auto& stream_entry : streams_) {
|
||||||
|
stream_entry.second.Resume();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void RRSendQueue::Reset() {
|
||||||
|
for (auto& stream_entry : streams_) {
|
||||||
|
OutgoingStream& stream = stream_entry.second;
|
||||||
|
stream.Reset();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
RRSendQueue::OutgoingStream& RRSendQueue::GetOrCreateStreamInfo(
|
||||||
|
StreamID stream_id) {
|
||||||
|
auto it = streams_.find(stream_id);
|
||||||
|
if (it != streams_.end()) {
|
||||||
|
return it->second;
|
||||||
|
}
|
||||||
|
|
||||||
|
return streams_.emplace(stream_id, OutgoingStream()).first->second;
|
||||||
|
}
|
||||||
} // namespace dcsctp
|
} // namespace dcsctp
|
||||||
|
|
|
@ -12,9 +12,8 @@
|
||||||
|
|
||||||
#include <cstdint>
|
#include <cstdint>
|
||||||
#include <deque>
|
#include <deque>
|
||||||
|
#include <map>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <unordered_map>
|
|
||||||
#include <unordered_set>
|
|
||||||
#include <utility>
|
#include <utility>
|
||||||
|
|
||||||
#include "absl/algorithm/container.h"
|
#include "absl/algorithm/container.h"
|
||||||
|
@ -77,6 +76,38 @@ class RRSendQueue : public SendQueue {
|
||||||
// The size of the buffer, in "payload bytes".
|
// The size of the buffer, in "payload bytes".
|
||||||
size_t total_bytes() const;
|
size_t total_bytes() const;
|
||||||
|
|
||||||
|
private:
|
||||||
|
// Per-stream information.
|
||||||
|
class OutgoingStream {
|
||||||
|
public:
|
||||||
|
// Enqueues a message to this stream.
|
||||||
|
void Add(DcSctpMessage message,
|
||||||
|
absl::optional<TimeMs> expires_at,
|
||||||
|
const SendOptions& send_options);
|
||||||
|
|
||||||
|
// Possibly produces a data chunk to send.
|
||||||
|
absl::optional<DataToSend> Produce(TimeMs now, size_t max_size);
|
||||||
|
|
||||||
|
// The amount of data enqueued on this stream.
|
||||||
|
size_t buffered_amount() const;
|
||||||
|
|
||||||
|
// Discards a partially sent message, see `SendQueue::Discard`.
|
||||||
|
void Discard(IsUnordered unordered, MID message_id);
|
||||||
|
|
||||||
|
// Pauses this stream, which is used before resetting it.
|
||||||
|
void Pause();
|
||||||
|
|
||||||
|
// Resumes a paused stream.
|
||||||
|
void Resume() { is_paused_ = false; }
|
||||||
|
|
||||||
|
bool is_paused() const { return is_paused_; }
|
||||||
|
|
||||||
|
// Resets this stream, meaning MIDs and SSNs are set to zero.
|
||||||
|
void Reset();
|
||||||
|
|
||||||
|
// Indicates if this stream has a partially sent message in it.
|
||||||
|
bool has_partially_sent_message() const;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
// An enqueued message and metadata.
|
// An enqueued message and metadata.
|
||||||
struct Item {
|
struct Item {
|
||||||
|
@ -95,27 +126,42 @@ class RRSendQueue : public SendQueue {
|
||||||
// fragmented.
|
// fragmented.
|
||||||
size_t remaining_offset;
|
size_t remaining_offset;
|
||||||
size_t remaining_size;
|
size_t remaining_size;
|
||||||
// If set, an allocated Message ID and SSN. Will be allocated when the first
|
// If set, an allocated Message ID and SSN. Will be allocated when the
|
||||||
// fragment is sent.
|
// first fragment is sent.
|
||||||
absl::optional<MID> message_id = absl::nullopt;
|
absl::optional<MID> message_id = absl::nullopt;
|
||||||
absl::optional<SSN> ssn = absl::nullopt;
|
absl::optional<SSN> ssn = absl::nullopt;
|
||||||
// The current Fragment Sequence Number, incremented for each fragment.
|
// The current Fragment Sequence Number, incremented for each fragment.
|
||||||
FSN current_fsn = FSN(0);
|
FSN current_fsn = FSN(0);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Returns the first non-expired message, or nullptr if there isn't one.
|
||||||
Item* GetFirstNonExpiredMessage(TimeMs now);
|
Item* GetFirstNonExpiredMessage(TimeMs now);
|
||||||
bool IsPaused(StreamID stream_id) const;
|
|
||||||
|
// Streams are pause when they are about to be reset.
|
||||||
|
bool is_paused_ = false;
|
||||||
|
// MIDs are different for unordered and ordered messages sent on a stream.
|
||||||
|
MID next_unordered_mid_ = MID(0);
|
||||||
|
MID next_ordered_mid_ = MID(0);
|
||||||
|
|
||||||
|
SSN next_ssn_ = SSN(0);
|
||||||
|
// Enqueued messages, and metadata.
|
||||||
|
std::deque<Item> items_;
|
||||||
|
};
|
||||||
|
|
||||||
|
OutgoingStream& GetOrCreateStreamInfo(StreamID stream_id);
|
||||||
|
absl::optional<DataToSend> Produce(
|
||||||
|
std::map<StreamID, OutgoingStream>::iterator it,
|
||||||
|
TimeMs now,
|
||||||
|
size_t max_size);
|
||||||
|
|
||||||
const std::string log_prefix_;
|
const std::string log_prefix_;
|
||||||
const size_t buffer_size_;
|
const size_t buffer_size_;
|
||||||
std::deque<Item> items_;
|
|
||||||
|
|
||||||
std::unordered_set<StreamID, StreamID::Hasher> paused_streams_;
|
// The next stream to send chunks from.
|
||||||
std::deque<Item> paused_items_;
|
StreamID next_stream_id_ = StreamID(0);
|
||||||
|
|
||||||
std::unordered_map<std::pair<IsUnordered, StreamID>, MID, UnorderedStreamHash>
|
// All streams, and messages added to those.
|
||||||
mid_by_stream_id_;
|
std::map<StreamID, OutgoingStream> streams_;
|
||||||
std::unordered_map<StreamID, SSN, StreamID::Hasher> ssn_by_stream_id_;
|
|
||||||
};
|
};
|
||||||
} // namespace dcsctp
|
} // namespace dcsctp
|
||||||
|
|
||||||
|
|
|
@ -18,20 +18,25 @@
|
||||||
#include "net/dcsctp/public/dcsctp_options.h"
|
#include "net/dcsctp/public/dcsctp_options.h"
|
||||||
#include "net/dcsctp/public/dcsctp_socket.h"
|
#include "net/dcsctp/public/dcsctp_socket.h"
|
||||||
#include "net/dcsctp/public/types.h"
|
#include "net/dcsctp/public/types.h"
|
||||||
|
#include "net/dcsctp/testing/testing_macros.h"
|
||||||
#include "net/dcsctp/tx/send_queue.h"
|
#include "net/dcsctp/tx/send_queue.h"
|
||||||
#include "rtc_base/gunit.h"
|
#include "rtc_base/gunit.h"
|
||||||
#include "test/gmock.h"
|
#include "test/gmock.h"
|
||||||
|
|
||||||
namespace dcsctp {
|
namespace dcsctp {
|
||||||
namespace {
|
namespace {
|
||||||
|
using ::testing::SizeIs;
|
||||||
|
|
||||||
constexpr TimeMs kNow = TimeMs(0);
|
constexpr TimeMs kNow = TimeMs(0);
|
||||||
constexpr StreamID kStreamID(1);
|
constexpr StreamID kStreamID(1);
|
||||||
constexpr PPID kPPID(53);
|
constexpr PPID kPPID(53);
|
||||||
|
constexpr size_t kMaxQueueSize = 1000;
|
||||||
|
constexpr size_t kOneFragmentPacketSize = 100;
|
||||||
|
constexpr size_t kTwoFragmentPacketSize = 101;
|
||||||
|
|
||||||
class RRSendQueueTest : public testing::Test {
|
class RRSendQueueTest : public testing::Test {
|
||||||
protected:
|
protected:
|
||||||
RRSendQueueTest() : buf_("log: ", 100) {}
|
RRSendQueueTest() : buf_("log: ", kMaxQueueSize) {}
|
||||||
|
|
||||||
const DcSctpOptions options_;
|
const DcSctpOptions options_;
|
||||||
RRSendQueue buf_;
|
RRSendQueue buf_;
|
||||||
|
@ -39,7 +44,7 @@ class RRSendQueueTest : public testing::Test {
|
||||||
|
|
||||||
TEST_F(RRSendQueueTest, EmptyBuffer) {
|
TEST_F(RRSendQueueTest, EmptyBuffer) {
|
||||||
EXPECT_TRUE(buf_.IsEmpty());
|
EXPECT_TRUE(buf_.IsEmpty());
|
||||||
EXPECT_FALSE(buf_.Produce(kNow, 100).has_value());
|
EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
|
||||||
EXPECT_FALSE(buf_.IsFull());
|
EXPECT_FALSE(buf_.IsFull());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -48,7 +53,8 @@ TEST_F(RRSendQueueTest, AddAndGetSingleChunk) {
|
||||||
|
|
||||||
EXPECT_FALSE(buf_.IsEmpty());
|
EXPECT_FALSE(buf_.IsEmpty());
|
||||||
EXPECT_FALSE(buf_.IsFull());
|
EXPECT_FALSE(buf_.IsFull());
|
||||||
absl::optional<SendQueue::DataToSend> chunk_opt = buf_.Produce(kNow, 100);
|
absl::optional<SendQueue::DataToSend> chunk_opt =
|
||||||
|
buf_.Produce(kNow, kOneFragmentPacketSize);
|
||||||
ASSERT_TRUE(chunk_opt.has_value());
|
ASSERT_TRUE(chunk_opt.has_value());
|
||||||
EXPECT_TRUE(chunk_opt->data.is_beginning);
|
EXPECT_TRUE(chunk_opt->data.is_beginning);
|
||||||
EXPECT_TRUE(chunk_opt->data.is_end);
|
EXPECT_TRUE(chunk_opt->data.is_end);
|
||||||
|
@ -76,7 +82,7 @@ TEST_F(RRSendQueueTest, CarveOutBeginningMiddleAndEnd) {
|
||||||
EXPECT_FALSE(chunk_end->data.is_beginning);
|
EXPECT_FALSE(chunk_end->data.is_beginning);
|
||||||
EXPECT_TRUE(chunk_end->data.is_end);
|
EXPECT_TRUE(chunk_end->data.is_end);
|
||||||
|
|
||||||
EXPECT_FALSE(buf_.Produce(kNow, 100).has_value());
|
EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(RRSendQueueTest, GetChunksFromTwoMessages) {
|
TEST_F(RRSendQueueTest, GetChunksFromTwoMessages) {
|
||||||
|
@ -84,14 +90,16 @@ TEST_F(RRSendQueueTest, GetChunksFromTwoMessages) {
|
||||||
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
|
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
|
||||||
buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload));
|
buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload));
|
||||||
|
|
||||||
absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 100);
|
absl::optional<SendQueue::DataToSend> chunk_one =
|
||||||
|
buf_.Produce(kNow, kOneFragmentPacketSize);
|
||||||
ASSERT_TRUE(chunk_one.has_value());
|
ASSERT_TRUE(chunk_one.has_value());
|
||||||
EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
|
EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
|
||||||
EXPECT_EQ(chunk_one->data.ppid, kPPID);
|
EXPECT_EQ(chunk_one->data.ppid, kPPID);
|
||||||
EXPECT_TRUE(chunk_one->data.is_beginning);
|
EXPECT_TRUE(chunk_one->data.is_beginning);
|
||||||
EXPECT_TRUE(chunk_one->data.is_end);
|
EXPECT_TRUE(chunk_one->data.is_end);
|
||||||
|
|
||||||
absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 100);
|
absl::optional<SendQueue::DataToSend> chunk_two =
|
||||||
|
buf_.Produce(kNow, kOneFragmentPacketSize);
|
||||||
ASSERT_TRUE(chunk_two.has_value());
|
ASSERT_TRUE(chunk_two.has_value());
|
||||||
EXPECT_EQ(chunk_two->data.stream_id, StreamID(3));
|
EXPECT_EQ(chunk_two->data.stream_id, StreamID(3));
|
||||||
EXPECT_EQ(chunk_two->data.ppid, PPID(54));
|
EXPECT_EQ(chunk_two->data.ppid, PPID(54));
|
||||||
|
@ -100,7 +108,7 @@ TEST_F(RRSendQueueTest, GetChunksFromTwoMessages) {
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) {
|
TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) {
|
||||||
std::vector<uint8_t> payload(60);
|
std::vector<uint8_t> payload(600);
|
||||||
EXPECT_FALSE(buf_.IsFull());
|
EXPECT_FALSE(buf_.IsFull());
|
||||||
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
|
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
|
||||||
EXPECT_FALSE(buf_.IsFull());
|
EXPECT_FALSE(buf_.IsFull());
|
||||||
|
@ -112,14 +120,14 @@ TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) {
|
||||||
buf_.Add(kNow, DcSctpMessage(StreamID(5), PPID(55), payload));
|
buf_.Add(kNow, DcSctpMessage(StreamID(5), PPID(55), payload));
|
||||||
EXPECT_TRUE(buf_.IsFull());
|
EXPECT_TRUE(buf_.IsFull());
|
||||||
|
|
||||||
absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 100);
|
absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 1000);
|
||||||
ASSERT_TRUE(chunk_one.has_value());
|
ASSERT_TRUE(chunk_one.has_value());
|
||||||
EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
|
EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
|
||||||
EXPECT_EQ(chunk_one->data.ppid, kPPID);
|
EXPECT_EQ(chunk_one->data.ppid, kPPID);
|
||||||
|
|
||||||
EXPECT_TRUE(buf_.IsFull());
|
EXPECT_TRUE(buf_.IsFull());
|
||||||
|
|
||||||
absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 100);
|
absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 1000);
|
||||||
ASSERT_TRUE(chunk_two.has_value());
|
ASSERT_TRUE(chunk_two.has_value());
|
||||||
EXPECT_EQ(chunk_two->data.stream_id, StreamID(3));
|
EXPECT_EQ(chunk_two->data.stream_id, StreamID(3));
|
||||||
EXPECT_EQ(chunk_two->data.ppid, PPID(54));
|
EXPECT_EQ(chunk_two->data.ppid, PPID(54));
|
||||||
|
@ -127,7 +135,7 @@ TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) {
|
||||||
EXPECT_FALSE(buf_.IsFull());
|
EXPECT_FALSE(buf_.IsFull());
|
||||||
EXPECT_FALSE(buf_.IsEmpty());
|
EXPECT_FALSE(buf_.IsEmpty());
|
||||||
|
|
||||||
absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 100);
|
absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 1000);
|
||||||
ASSERT_TRUE(chunk_three.has_value());
|
ASSERT_TRUE(chunk_three.has_value());
|
||||||
EXPECT_EQ(chunk_three->data.stream_id, StreamID(5));
|
EXPECT_EQ(chunk_three->data.stream_id, StreamID(5));
|
||||||
EXPECT_EQ(chunk_three->data.ppid, PPID(55));
|
EXPECT_EQ(chunk_three->data.ppid, PPID(55));
|
||||||
|
@ -171,7 +179,7 @@ TEST_F(RRSendQueueTest, DefaultsToOrderedSend) {
|
||||||
// Default is ordered
|
// Default is ordered
|
||||||
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
|
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
|
||||||
absl::optional<SendQueue::DataToSend> chunk_one =
|
absl::optional<SendQueue::DataToSend> chunk_one =
|
||||||
buf_.Produce(kNow, /*max_size=*/100);
|
buf_.Produce(kNow, kOneFragmentPacketSize);
|
||||||
ASSERT_TRUE(chunk_one.has_value());
|
ASSERT_TRUE(chunk_one.has_value());
|
||||||
EXPECT_FALSE(chunk_one->data.is_unordered);
|
EXPECT_FALSE(chunk_one->data.is_unordered);
|
||||||
|
|
||||||
|
@ -180,7 +188,7 @@ TEST_F(RRSendQueueTest, DefaultsToOrderedSend) {
|
||||||
opts.unordered = IsUnordered(true);
|
opts.unordered = IsUnordered(true);
|
||||||
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload), opts);
|
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload), opts);
|
||||||
absl::optional<SendQueue::DataToSend> chunk_two =
|
absl::optional<SendQueue::DataToSend> chunk_two =
|
||||||
buf_.Produce(kNow, /*max_size=*/100);
|
buf_.Produce(kNow, kOneFragmentPacketSize);
|
||||||
ASSERT_TRUE(chunk_two.has_value());
|
ASSERT_TRUE(chunk_two.has_value());
|
||||||
EXPECT_TRUE(chunk_two->data.is_unordered);
|
EXPECT_TRUE(chunk_two->data.is_unordered);
|
||||||
}
|
}
|
||||||
|
@ -192,7 +200,7 @@ TEST_F(RRSendQueueTest, ProduceWithLifetimeExpiry) {
|
||||||
TimeMs now = kNow;
|
TimeMs now = kNow;
|
||||||
buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload));
|
buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload));
|
||||||
now += DurationMs(1000000);
|
now += DurationMs(1000000);
|
||||||
ASSERT_TRUE(buf_.Produce(now, 100));
|
ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize));
|
||||||
|
|
||||||
SendOptions expires_2_seconds;
|
SendOptions expires_2_seconds;
|
||||||
expires_2_seconds.lifetime = DurationMs(2000);
|
expires_2_seconds.lifetime = DurationMs(2000);
|
||||||
|
@ -200,17 +208,17 @@ TEST_F(RRSendQueueTest, ProduceWithLifetimeExpiry) {
|
||||||
// Add and consume within lifetime
|
// Add and consume within lifetime
|
||||||
buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
|
buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
|
||||||
now += DurationMs(2000);
|
now += DurationMs(2000);
|
||||||
ASSERT_TRUE(buf_.Produce(now, 100));
|
ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize));
|
||||||
|
|
||||||
// Add and consume just outside lifetime
|
// Add and consume just outside lifetime
|
||||||
buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
|
buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
|
||||||
now += DurationMs(2001);
|
now += DurationMs(2001);
|
||||||
ASSERT_FALSE(buf_.Produce(now, 100));
|
ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize));
|
||||||
|
|
||||||
// A long time after expiry
|
// A long time after expiry
|
||||||
buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
|
buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
|
||||||
now += DurationMs(1000000);
|
now += DurationMs(1000000);
|
||||||
ASSERT_FALSE(buf_.Produce(now, 100));
|
ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize));
|
||||||
|
|
||||||
// Expire one message, but produce the second that is not expired.
|
// Expire one message, but produce the second that is not expired.
|
||||||
buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
|
buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
|
||||||
|
@ -221,8 +229,8 @@ TEST_F(RRSendQueueTest, ProduceWithLifetimeExpiry) {
|
||||||
buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_4_seconds);
|
buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_4_seconds);
|
||||||
now += DurationMs(2001);
|
now += DurationMs(2001);
|
||||||
|
|
||||||
ASSERT_TRUE(buf_.Produce(now, 100));
|
ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize));
|
||||||
ASSERT_FALSE(buf_.Produce(now, 100));
|
ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize));
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(RRSendQueueTest, DiscardPartialPackets) {
|
TEST_F(RRSendQueueTest, DiscardPartialPackets) {
|
||||||
|
@ -231,28 +239,31 @@ TEST_F(RRSendQueueTest, DiscardPartialPackets) {
|
||||||
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
|
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
|
||||||
buf_.Add(kNow, DcSctpMessage(StreamID(2), PPID(54), payload));
|
buf_.Add(kNow, DcSctpMessage(StreamID(2), PPID(54), payload));
|
||||||
|
|
||||||
absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 100);
|
absl::optional<SendQueue::DataToSend> chunk_one =
|
||||||
|
buf_.Produce(kNow, kOneFragmentPacketSize);
|
||||||
ASSERT_TRUE(chunk_one.has_value());
|
ASSERT_TRUE(chunk_one.has_value());
|
||||||
EXPECT_FALSE(chunk_one->data.is_end);
|
EXPECT_FALSE(chunk_one->data.is_end);
|
||||||
EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
|
EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
|
||||||
buf_.Discard(IsUnordered(false), chunk_one->data.stream_id,
|
buf_.Discard(IsUnordered(false), chunk_one->data.stream_id,
|
||||||
chunk_one->data.message_id);
|
chunk_one->data.message_id);
|
||||||
|
|
||||||
absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 100);
|
absl::optional<SendQueue::DataToSend> chunk_two =
|
||||||
|
buf_.Produce(kNow, kOneFragmentPacketSize);
|
||||||
ASSERT_TRUE(chunk_two.has_value());
|
ASSERT_TRUE(chunk_two.has_value());
|
||||||
EXPECT_FALSE(chunk_two->data.is_end);
|
EXPECT_FALSE(chunk_two->data.is_end);
|
||||||
EXPECT_EQ(chunk_two->data.stream_id, StreamID(2));
|
EXPECT_EQ(chunk_two->data.stream_id, StreamID(2));
|
||||||
|
|
||||||
absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 100);
|
absl::optional<SendQueue::DataToSend> chunk_three =
|
||||||
|
buf_.Produce(kNow, kOneFragmentPacketSize);
|
||||||
ASSERT_TRUE(chunk_three.has_value());
|
ASSERT_TRUE(chunk_three.has_value());
|
||||||
EXPECT_TRUE(chunk_three->data.is_end);
|
EXPECT_TRUE(chunk_three->data.is_end);
|
||||||
EXPECT_EQ(chunk_three->data.stream_id, StreamID(2));
|
EXPECT_EQ(chunk_three->data.stream_id, StreamID(2));
|
||||||
ASSERT_FALSE(buf_.Produce(kNow, 100));
|
ASSERT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize));
|
||||||
|
|
||||||
// Calling it again shouldn't cause issues.
|
// Calling it again shouldn't cause issues.
|
||||||
buf_.Discard(IsUnordered(false), chunk_one->data.stream_id,
|
buf_.Discard(IsUnordered(false), chunk_one->data.stream_id,
|
||||||
chunk_one->data.message_id);
|
chunk_one->data.message_id);
|
||||||
ASSERT_FALSE(buf_.Produce(kNow, 100));
|
ASSERT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize));
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(RRSendQueueTest, PrepareResetStreamsDiscardsStream) {
|
TEST_F(RRSendQueueTest, PrepareResetStreamsDiscardsStream) {
|
||||||
|
@ -292,7 +303,7 @@ TEST_F(RRSendQueueTest, EnqueuedItemsArePausedDuringStreamReset) {
|
||||||
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
|
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
|
||||||
EXPECT_EQ(buf_.total_bytes(), payload.size());
|
EXPECT_EQ(buf_.total_bytes(), payload.size());
|
||||||
|
|
||||||
EXPECT_FALSE(buf_.Produce(kNow, 100).has_value());
|
EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
|
||||||
buf_.CommitResetStreams();
|
buf_.CommitResetStreams();
|
||||||
EXPECT_EQ(buf_.total_bytes(), payload.size());
|
EXPECT_EQ(buf_.total_bytes(), payload.size());
|
||||||
|
|
||||||
|
@ -308,11 +319,13 @@ TEST_F(RRSendQueueTest, CommittingResetsSSN) {
|
||||||
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
|
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
|
||||||
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
|
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
|
||||||
|
|
||||||
absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 100);
|
absl::optional<SendQueue::DataToSend> chunk_one =
|
||||||
|
buf_.Produce(kNow, kOneFragmentPacketSize);
|
||||||
ASSERT_TRUE(chunk_one.has_value());
|
ASSERT_TRUE(chunk_one.has_value());
|
||||||
EXPECT_EQ(chunk_one->data.ssn, SSN(0));
|
EXPECT_EQ(chunk_one->data.ssn, SSN(0));
|
||||||
|
|
||||||
absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 100);
|
absl::optional<SendQueue::DataToSend> chunk_two =
|
||||||
|
buf_.Produce(kNow, kOneFragmentPacketSize);
|
||||||
ASSERT_TRUE(chunk_two.has_value());
|
ASSERT_TRUE(chunk_two.has_value());
|
||||||
EXPECT_EQ(chunk_two->data.ssn, SSN(1));
|
EXPECT_EQ(chunk_two->data.ssn, SSN(1));
|
||||||
|
|
||||||
|
@ -325,7 +338,8 @@ TEST_F(RRSendQueueTest, CommittingResetsSSN) {
|
||||||
EXPECT_TRUE(buf_.CanResetStreams());
|
EXPECT_TRUE(buf_.CanResetStreams());
|
||||||
buf_.CommitResetStreams();
|
buf_.CommitResetStreams();
|
||||||
|
|
||||||
absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 100);
|
absl::optional<SendQueue::DataToSend> chunk_three =
|
||||||
|
buf_.Produce(kNow, kOneFragmentPacketSize);
|
||||||
ASSERT_TRUE(chunk_three.has_value());
|
ASSERT_TRUE(chunk_three.has_value());
|
||||||
EXPECT_EQ(chunk_three->data.ssn, SSN(0));
|
EXPECT_EQ(chunk_three->data.ssn, SSN(0));
|
||||||
}
|
}
|
||||||
|
@ -336,11 +350,13 @@ TEST_F(RRSendQueueTest, RollBackResumesSSN) {
|
||||||
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
|
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
|
||||||
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
|
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
|
||||||
|
|
||||||
absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 100);
|
absl::optional<SendQueue::DataToSend> chunk_one =
|
||||||
|
buf_.Produce(kNow, kOneFragmentPacketSize);
|
||||||
ASSERT_TRUE(chunk_one.has_value());
|
ASSERT_TRUE(chunk_one.has_value());
|
||||||
EXPECT_EQ(chunk_one->data.ssn, SSN(0));
|
EXPECT_EQ(chunk_one->data.ssn, SSN(0));
|
||||||
|
|
||||||
absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 100);
|
absl::optional<SendQueue::DataToSend> chunk_two =
|
||||||
|
buf_.Produce(kNow, kOneFragmentPacketSize);
|
||||||
ASSERT_TRUE(chunk_two.has_value());
|
ASSERT_TRUE(chunk_two.has_value());
|
||||||
EXPECT_EQ(chunk_two->data.ssn, SSN(1));
|
EXPECT_EQ(chunk_two->data.ssn, SSN(1));
|
||||||
|
|
||||||
|
@ -352,10 +368,111 @@ TEST_F(RRSendQueueTest, RollBackResumesSSN) {
|
||||||
EXPECT_TRUE(buf_.CanResetStreams());
|
EXPECT_TRUE(buf_.CanResetStreams());
|
||||||
buf_.RollbackResetStreams();
|
buf_.RollbackResetStreams();
|
||||||
|
|
||||||
absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 100);
|
absl::optional<SendQueue::DataToSend> chunk_three =
|
||||||
|
buf_.Produce(kNow, kOneFragmentPacketSize);
|
||||||
ASSERT_TRUE(chunk_three.has_value());
|
ASSERT_TRUE(chunk_three.has_value());
|
||||||
EXPECT_EQ(chunk_three->data.ssn, SSN(2));
|
EXPECT_EQ(chunk_three->data.ssn, SSN(2));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(RRSendQueueTest, ReturnsFragmentsForOneMessageBeforeMovingToNext) {
|
||||||
|
std::vector<uint8_t> payload(200);
|
||||||
|
buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload));
|
||||||
|
buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, payload));
|
||||||
|
|
||||||
|
ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
|
||||||
|
buf_.Produce(kNow, kOneFragmentPacketSize));
|
||||||
|
EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
|
||||||
|
|
||||||
|
ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
|
||||||
|
buf_.Produce(kNow, kOneFragmentPacketSize));
|
||||||
|
EXPECT_EQ(chunk2.data.stream_id, StreamID(1));
|
||||||
|
|
||||||
|
ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3,
|
||||||
|
buf_.Produce(kNow, kOneFragmentPacketSize));
|
||||||
|
EXPECT_EQ(chunk3.data.stream_id, StreamID(2));
|
||||||
|
|
||||||
|
ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4,
|
||||||
|
buf_.Produce(kNow, kOneFragmentPacketSize));
|
||||||
|
EXPECT_EQ(chunk4.data.stream_id, StreamID(2));
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(RRSendQueueTest, ReturnsAlsoSmallFragmentsBeforeMovingToNext) {
|
||||||
|
std::vector<uint8_t> payload(kTwoFragmentPacketSize);
|
||||||
|
buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload));
|
||||||
|
buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, payload));
|
||||||
|
|
||||||
|
ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
|
||||||
|
buf_.Produce(kNow, kOneFragmentPacketSize));
|
||||||
|
EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
|
||||||
|
EXPECT_THAT(chunk1.data.payload, SizeIs(kOneFragmentPacketSize));
|
||||||
|
|
||||||
|
ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
|
||||||
|
buf_.Produce(kNow, kOneFragmentPacketSize));
|
||||||
|
EXPECT_EQ(chunk2.data.stream_id, StreamID(1));
|
||||||
|
EXPECT_THAT(chunk2.data.payload,
|
||||||
|
SizeIs(kTwoFragmentPacketSize - kOneFragmentPacketSize));
|
||||||
|
|
||||||
|
ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3,
|
||||||
|
buf_.Produce(kNow, kOneFragmentPacketSize));
|
||||||
|
EXPECT_EQ(chunk3.data.stream_id, StreamID(2));
|
||||||
|
EXPECT_THAT(chunk3.data.payload, SizeIs(kOneFragmentPacketSize));
|
||||||
|
|
||||||
|
ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4,
|
||||||
|
buf_.Produce(kNow, kOneFragmentPacketSize));
|
||||||
|
EXPECT_EQ(chunk4.data.stream_id, StreamID(2));
|
||||||
|
EXPECT_THAT(chunk4.data.payload,
|
||||||
|
SizeIs(kTwoFragmentPacketSize - kOneFragmentPacketSize));
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(RRSendQueueTest, WillCycleInRoundRobinFashionBetweenStreams) {
|
||||||
|
buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(1)));
|
||||||
|
buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(2)));
|
||||||
|
buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector<uint8_t>(3)));
|
||||||
|
buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector<uint8_t>(4)));
|
||||||
|
buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, std::vector<uint8_t>(5)));
|
||||||
|
buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, std::vector<uint8_t>(6)));
|
||||||
|
buf_.Add(kNow, DcSctpMessage(StreamID(4), kPPID, std::vector<uint8_t>(7)));
|
||||||
|
buf_.Add(kNow, DcSctpMessage(StreamID(4), kPPID, std::vector<uint8_t>(8)));
|
||||||
|
|
||||||
|
ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
|
||||||
|
buf_.Produce(kNow, kOneFragmentPacketSize));
|
||||||
|
EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
|
||||||
|
EXPECT_THAT(chunk1.data.payload, SizeIs(1));
|
||||||
|
|
||||||
|
ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
|
||||||
|
buf_.Produce(kNow, kOneFragmentPacketSize));
|
||||||
|
EXPECT_EQ(chunk2.data.stream_id, StreamID(2));
|
||||||
|
EXPECT_THAT(chunk2.data.payload, SizeIs(3));
|
||||||
|
|
||||||
|
ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3,
|
||||||
|
buf_.Produce(kNow, kOneFragmentPacketSize));
|
||||||
|
EXPECT_EQ(chunk3.data.stream_id, StreamID(3));
|
||||||
|
EXPECT_THAT(chunk3.data.payload, SizeIs(5));
|
||||||
|
|
||||||
|
ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4,
|
||||||
|
buf_.Produce(kNow, kOneFragmentPacketSize));
|
||||||
|
EXPECT_EQ(chunk4.data.stream_id, StreamID(4));
|
||||||
|
EXPECT_THAT(chunk4.data.payload, SizeIs(7));
|
||||||
|
|
||||||
|
ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk5,
|
||||||
|
buf_.Produce(kNow, kOneFragmentPacketSize));
|
||||||
|
EXPECT_EQ(chunk5.data.stream_id, StreamID(1));
|
||||||
|
EXPECT_THAT(chunk5.data.payload, SizeIs(2));
|
||||||
|
|
||||||
|
ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk6,
|
||||||
|
buf_.Produce(kNow, kOneFragmentPacketSize));
|
||||||
|
EXPECT_EQ(chunk6.data.stream_id, StreamID(2));
|
||||||
|
EXPECT_THAT(chunk6.data.payload, SizeIs(4));
|
||||||
|
|
||||||
|
ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk7,
|
||||||
|
buf_.Produce(kNow, kOneFragmentPacketSize));
|
||||||
|
EXPECT_EQ(chunk7.data.stream_id, StreamID(3));
|
||||||
|
EXPECT_THAT(chunk7.data.payload, SizeIs(6));
|
||||||
|
|
||||||
|
ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk8,
|
||||||
|
buf_.Produce(kNow, kOneFragmentPacketSize));
|
||||||
|
EXPECT_EQ(chunk8.data.stream_id, StreamID(4));
|
||||||
|
EXPECT_THAT(chunk8.data.payload, SizeIs(8));
|
||||||
|
}
|
||||||
} // namespace
|
} // namespace
|
||||||
} // namespace dcsctp
|
} // namespace dcsctp
|
||||||
|
|
Loading…
Reference in a new issue