dcsctp: Add public API for BufferedAmountLow

This adds native support for the RTCDataChannel properties:
https://developer.mozilla.org/en-US/docs/Web/API/RTCDataChannel/bufferedAmount
https://developer.mozilla.org/en-US/docs/Web/API/RTCDataChannel/bufferedAmountLowThreshold

And the RTCDataChannel event:
https://developer.mozilla.org/en-US/docs/Web/API/RTCDataChannel/onbufferedamountlow

The old callback, NotifyOutgoingMessageBufferEmpty, is deprecated as it
didn't work very well. It will not be triggered and will be removed
as soon as all users of it are gone. There is a new callback,
OnTotalBufferedAmountLow, that serves the same purpose but also allows
setting an arbitrary limit when it should be triggered (See
DcSctpOptions::total_buffered_amount_low_threshold).

Bug: webrtc:12794
Change-Id: Ic1c92f174eff8a1acda0b5fd3dcc45bd1cfa2704
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/219691
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34144}
This commit is contained in:
Victor Boivie 2021-05-20 19:34:18 +02:00 committed by WebRTC LUCI CQ
parent bd9031bf22
commit 236ac50628
15 changed files with 247 additions and 37 deletions

View file

@ -366,7 +366,7 @@ uint32_t DcSctpTransport::GetRandomInt(uint32_t low, uint32_t high) {
return random_.Rand(low, high);
}
void DcSctpTransport::NotifyOutgoingMessageBufferEmpty() {
void DcSctpTransport::OnTotalBufferedAmountLow() {
if (!ready_to_send_data_) {
ready_to_send_data_ = true;
SignalReadyToSendData();

View file

@ -63,7 +63,7 @@ class DcSctpTransport : public cricket::SctpTransportInternal,
std::unique_ptr<dcsctp::Timeout> CreateTimeout() override;
dcsctp::TimeMs TimeMillis() override;
uint32_t GetRandomInt(uint32_t low, uint32_t high) override;
void NotifyOutgoingMessageBufferEmpty() override;
void OnTotalBufferedAmountLow() override;
void OnMessageReceived(dcsctp::DcSctpMessage message) override;
void OnError(dcsctp::ErrorKind error, absl::string_view message) override;
void OnAborted(dcsctp::ErrorKind error, absl::string_view message) override;

View file

@ -77,7 +77,6 @@ class FuzzerCallbacks : public DcSctpSocketCallbacks {
rtc::ArrayView<const StreamID> outgoing_streams) override {}
void OnIncomingStreamsReset(
rtc::ArrayView<const StreamID> incoming_streams) override {}
void NotifyOutgoingMessageBufferEmpty() override {}
std::vector<uint8_t> ConsumeSentPacket() {
if (sent_packets_.empty()) {

View file

@ -81,7 +81,11 @@ struct DcSctpOptions {
// Maximum send buffer size. It will not be possible to queue more data than
// this before sending it.
size_t max_send_buffer_size = 2 * 1024 * 1024;
size_t max_send_buffer_size = 2'000'000;
// A threshold that, when the amount of data in the send buffer goes below
// this value, will trigger `DcSctpCallbacks::OnTotalBufferedAmountLow`.
size_t total_buffered_amount_low_threshold = 1'800'000;
// Max allowed RTT value. When the RTT is measured and it's found to be larger
// than this value, it will be discarded and not used for e.g. any RTO

View file

@ -197,12 +197,11 @@ class DcSctpSocketCallbacks {
// Triggered when the outgoing message buffer is empty, meaning that there are
// no more queued messages, but there can still be packets in-flight or to be
// retransmitted. (in contrast to SCTP_SENDER_DRY_EVENT).
// TODO(boivie): This is currently only used in benchmarks to have a steady
// flow of packets to send
//
// Note that it's NOT ALLOWED to call into this library from within this
// callback.
virtual void NotifyOutgoingMessageBufferEmpty() = 0;
ABSL_DEPRECATED("Use OnTotalBufferedAmountLow instead")
virtual void NotifyOutgoingMessageBufferEmpty() {}
// Called when the library has received an SCTP message in full and delivers
// it to the upper layer.
@ -263,6 +262,17 @@ class DcSctpSocketCallbacks {
// It is allowed to call into this library from within this callback.
virtual void OnIncomingStreamsReset(
rtc::ArrayView<const StreamID> incoming_streams) = 0;
// Will be called when the amount of data buffered to be sent falls to or
// below the threshold set when calling `SetBufferedAmountLowThreshold`.
//
// It is allowed to call into this library from within this callback.
virtual void OnBufferedAmountLow(StreamID stream_id) {}
// Will be called when the total amount of data buffered (in the entire send
// buffer, for all streams) falls to or below the threshold specified in
// `DcSctpOptions::total_buffered_amount_low_threshold`.
virtual void OnTotalBufferedAmountLow() {}
};
// The DcSctpSocket implementation implements the following interface.
@ -326,6 +336,20 @@ class DcSctpSocketInterface {
// or streams that don't support resetting will not perform any operation.
virtual ResetStreamsStatus ResetStreams(
rtc::ArrayView<const StreamID> outgoing_streams) = 0;
// Returns the number of bytes of data currently queued to be sent on a given
// stream.
virtual size_t buffered_amount(StreamID stream_id) const = 0;
// Returns the number of buffered outgoing bytes that is considered "low" for
// a given stream. See `SetBufferedAmountLowThreshold`.
virtual size_t buffered_amount_low_threshold(StreamID stream_id) const = 0;
// Used to specify the number of bytes of buffered outgoing data that is
// considered "low" for a given stream, which will trigger an
// OnBufferedAmountLow event. The default value is zero (0).
virtual void SetBufferedAmountLowThreshold(StreamID stream_id,
size_t bytes) = 0;
};
} // namespace dcsctp

View file

@ -79,11 +79,6 @@ class CallbackDeferrer : public DcSctpSocketCallbacks {
return underlying_.GetRandomInt(low, high);
}
void NotifyOutgoingMessageBufferEmpty() override {
// Will not be deferred - call directly.
underlying_.NotifyOutgoingMessageBufferEmpty();
}
void OnMessageReceived(DcSctpMessage message) override {
deferred_.emplace_back(
[deliverer = MessageDeliverer(std::move(message))](
@ -145,6 +140,17 @@ class CallbackDeferrer : public DcSctpSocketCallbacks {
DcSctpSocketCallbacks& cb) { cb.OnIncomingStreamsReset(streams); });
}
void OnBufferedAmountLow(StreamID stream_id) override {
deferred_.emplace_back([stream_id](DcSctpSocketCallbacks& cb) {
cb.OnBufferedAmountLow(stream_id);
});
}
void OnTotalBufferedAmountLow() override {
deferred_.emplace_back(
[](DcSctpSocketCallbacks& cb) { cb.OnTotalBufferedAmountLow(); });
}
private:
// A wrapper around the move-only DcSctpMessage, to let it be captured in a
// lambda.

View file

@ -170,9 +170,11 @@ DcSctpSocket::DcSctpSocket(absl::string_view log_prefix,
send_queue_(
log_prefix_,
options_.max_send_buffer_size,
[](StreamID stream_id) {},
/*total_buffered_amount_low_threshold=*/0,
[]() {}) {}
[this](StreamID stream_id) {
callbacks_.OnBufferedAmountLow(stream_id);
},
options_.total_buffered_amount_low_threshold,
[this]() { callbacks_.OnTotalBufferedAmountLow(); }) {}
std::string DcSctpSocket::log_prefix() const {
return log_prefix_ + "[" + std::string(ToString(state_)) + "] ";
@ -442,6 +444,19 @@ void DcSctpSocket::SetMaxMessageSize(size_t max_message_size) {
options_.max_message_size = max_message_size;
}
size_t DcSctpSocket::buffered_amount(StreamID stream_id) const {
return send_queue_.buffered_amount(stream_id);
}
size_t DcSctpSocket::buffered_amount_low_threshold(StreamID stream_id) const {
return send_queue_.buffered_amount_low_threshold(stream_id);
}
void DcSctpSocket::SetBufferedAmountLowThreshold(StreamID stream_id,
size_t bytes) {
send_queue_.SetBufferedAmountLowThreshold(stream_id, bytes);
}
void DcSctpSocket::MaybeSendShutdownOnPacketReceived(const SctpPacket& packet) {
if (state_ == State::kShutdownSent) {
bool has_data_chunk =

View file

@ -93,7 +93,9 @@ class DcSctpSocket : public DcSctpSocketInterface {
SocketState state() const override;
const DcSctpOptions& options() const override { return options_; }
void SetMaxMessageSize(size_t max_message_size) override;
size_t buffered_amount(StreamID stream_id) const override;
size_t buffered_amount_low_threshold(StreamID stream_id) const override;
void SetBufferedAmountLowThreshold(StreamID stream_id, size_t bytes) override;
// Returns this socket's verification tag, or zero if not yet connected.
VerificationTag verification_tag() const {
return tcb_ != nullptr ? tcb_->my_verification_tag() : VerificationTag(0);

View file

@ -58,6 +58,7 @@ using ::testing::SizeIs;
constexpr SendOptions kSendOptions;
constexpr size_t kLargeMessageSize = DcSctpOptions::kMaxSafeMTUSize * 20;
static constexpr size_t kSmallMessageSize = 10;
MATCHER_P(HasDataChunkWithSsn, ssn, "") {
absl::optional<SctpPacket> packet = SctpPacket::Parse(arg);
@ -1040,9 +1041,10 @@ TEST_F(DcSctpSocketTest, PassingHighWatermarkWillOnlyAcceptCumAckTsn) {
AllOf(HasSackWithCumAckTsn(AddTo(tsn, 1)), HasSackWithNoGapAckBlocks()));
// This DATA should be accepted, and it fills the reassembly queue.
sock_z2.ReceivePacket(SctpPacket::Builder(sock_z2.verification_tag(), options)
.Add(DataChunk(AddTo(tsn, 2), StreamID(1), SSN(0),
PPID(53), std::vector<uint8_t>(10),
sock_z2.ReceivePacket(
SctpPacket::Builder(sock_z2.verification_tag(), options)
.Add(DataChunk(AddTo(tsn, 2), StreamID(1), SSN(0), PPID(53),
std::vector<uint8_t>(kSmallMessageSize),
/*options=*/{}))
.Build());
@ -1058,9 +1060,10 @@ TEST_F(DcSctpSocketTest, PassingHighWatermarkWillOnlyAcceptCumAckTsn) {
EXPECT_CALL(cb_z2, OnClosed).Times(0);
// This DATA will make the connection close. It's too full now.
sock_z2.ReceivePacket(SctpPacket::Builder(sock_z2.verification_tag(), options)
.Add(DataChunk(AddTo(tsn, 3), StreamID(1), SSN(0),
PPID(53), std::vector<uint8_t>(10),
sock_z2.ReceivePacket(
SctpPacket::Builder(sock_z2.verification_tag(), options)
.Add(DataChunk(AddTo(tsn, 3), StreamID(1), SSN(0), PPID(53),
std::vector<uint8_t>(kSmallMessageSize),
/*options=*/{}))
.Build());
}
@ -1162,5 +1165,171 @@ TEST_F(DcSctpSocketTest, DiscardsMessagesWithLowLifetimeIfMustBuffer) {
EXPECT_FALSE(cb_z_.ConsumeReceivedMessage().has_value());
}
TEST_F(DcSctpSocketTest, HasReasonableBufferedAmountValues) {
ConnectSockets();
EXPECT_EQ(sock_a_.buffered_amount(StreamID(1)), 0u);
sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53),
std::vector<uint8_t>(kSmallMessageSize)),
kSendOptions);
// Sending a small message will directly send it as a single packet, so
// nothing is left in the queue.
EXPECT_EQ(sock_a_.buffered_amount(StreamID(1)), 0u);
sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53),
std::vector<uint8_t>(kLargeMessageSize)),
kSendOptions);
// Sending a message will directly start sending a few packets, so the
// buffered amount is not the full message size.
EXPECT_GT(sock_a_.buffered_amount(StreamID(1)), 0u);
EXPECT_LT(sock_a_.buffered_amount(StreamID(1)), kLargeMessageSize);
}
TEST_F(DcSctpSocketTest, HasDefaultOnBufferedAmountLowValueZero) {
EXPECT_EQ(sock_a_.buffered_amount_low_threshold(StreamID(1)), 0u);
}
TEST_F(DcSctpSocketTest, TriggersOnBufferedAmountLowWithDefaultValueZero) {
EXPECT_CALL(cb_a_, OnBufferedAmountLow).Times(0);
ConnectSockets();
EXPECT_CALL(cb_a_, OnBufferedAmountLow(StreamID(1)));
sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53),
std::vector<uint8_t>(kSmallMessageSize)),
kSendOptions);
ExchangeMessages(sock_a_, cb_a_, sock_z_, cb_z_);
}
TEST_F(DcSctpSocketTest, DoesntTriggerOnBufferedAmountLowIfBelowThreshold) {
static constexpr size_t kMessageSize = 1000;
static constexpr size_t kBufferedAmountLowThreshold = kMessageSize * 10;
sock_a_.SetBufferedAmountLowThreshold(StreamID(1),
kBufferedAmountLowThreshold);
EXPECT_CALL(cb_a_, OnBufferedAmountLow).Times(0);
ConnectSockets();
EXPECT_CALL(cb_a_, OnBufferedAmountLow(StreamID(1))).Times(0);
sock_a_.Send(
DcSctpMessage(StreamID(1), PPID(53), std::vector<uint8_t>(kMessageSize)),
kSendOptions);
ExchangeMessages(sock_a_, cb_a_, sock_z_, cb_z_);
sock_a_.Send(
DcSctpMessage(StreamID(1), PPID(53), std::vector<uint8_t>(kMessageSize)),
kSendOptions);
ExchangeMessages(sock_a_, cb_a_, sock_z_, cb_z_);
}
TEST_F(DcSctpSocketTest, TriggersOnBufferedAmountMultipleTimes) {
static constexpr size_t kMessageSize = 1000;
static constexpr size_t kBufferedAmountLowThreshold = kMessageSize / 2;
sock_a_.SetBufferedAmountLowThreshold(StreamID(1),
kBufferedAmountLowThreshold);
EXPECT_CALL(cb_a_, OnBufferedAmountLow).Times(0);
ConnectSockets();
EXPECT_CALL(cb_a_, OnBufferedAmountLow(StreamID(1))).Times(3);
EXPECT_CALL(cb_a_, OnBufferedAmountLow(StreamID(2))).Times(2);
sock_a_.Send(
DcSctpMessage(StreamID(1), PPID(53), std::vector<uint8_t>(kMessageSize)),
kSendOptions);
ExchangeMessages(sock_a_, cb_a_, sock_z_, cb_z_);
sock_a_.Send(
DcSctpMessage(StreamID(2), PPID(53), std::vector<uint8_t>(kMessageSize)),
kSendOptions);
ExchangeMessages(sock_a_, cb_a_, sock_z_, cb_z_);
sock_a_.Send(
DcSctpMessage(StreamID(1), PPID(53), std::vector<uint8_t>(kMessageSize)),
kSendOptions);
ExchangeMessages(sock_a_, cb_a_, sock_z_, cb_z_);
sock_a_.Send(
DcSctpMessage(StreamID(2), PPID(53), std::vector<uint8_t>(kMessageSize)),
kSendOptions);
ExchangeMessages(sock_a_, cb_a_, sock_z_, cb_z_);
sock_a_.Send(
DcSctpMessage(StreamID(1), PPID(53), std::vector<uint8_t>(kMessageSize)),
kSendOptions);
ExchangeMessages(sock_a_, cb_a_, sock_z_, cb_z_);
}
TEST_F(DcSctpSocketTest, TriggersOnBufferedAmountLowOnlyWhenCrossingThreshold) {
static constexpr size_t kMessageSize = 1000;
static constexpr size_t kBufferedAmountLowThreshold = kMessageSize * 1.5;
sock_a_.SetBufferedAmountLowThreshold(StreamID(1),
kBufferedAmountLowThreshold);
EXPECT_CALL(cb_a_, OnBufferedAmountLow).Times(0);
ConnectSockets();
EXPECT_CALL(cb_a_, OnBufferedAmountLow).Times(0);
// Add a few messages to fill up the congestion window. When that is full,
// messages will start to be fully buffered.
while (sock_a_.buffered_amount(StreamID(1)) == 0) {
sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53),
std::vector<uint8_t>(kMessageSize)),
kSendOptions);
}
size_t initial_buffered = sock_a_.buffered_amount(StreamID(1));
ASSERT_GE(initial_buffered, 0u);
ASSERT_LT(initial_buffered, kMessageSize);
// Up to kMessageSize (which is below the threshold)
sock_a_.Send(
DcSctpMessage(StreamID(1), PPID(53),
std::vector<uint8_t>(kMessageSize - initial_buffered)),
kSendOptions);
EXPECT_EQ(sock_a_.buffered_amount(StreamID(1)), kMessageSize);
// Up to 2*kMessageSize (which is above the threshold)
sock_a_.Send(
DcSctpMessage(StreamID(1), PPID(53), std::vector<uint8_t>(kMessageSize)),
kSendOptions);
EXPECT_EQ(sock_a_.buffered_amount(StreamID(1)), 2 * kMessageSize);
// Start ACKing packets, which will empty the send queue, and trigger the
// callback.
EXPECT_CALL(cb_a_, OnBufferedAmountLow(StreamID(1))).Times(1);
ExchangeMessages(sock_a_, cb_a_, sock_z_, cb_z_);
}
TEST_F(DcSctpSocketTest, DoesntTriggerOnTotalBufferAmountLowWhenBelow) {
ConnectSockets();
EXPECT_CALL(cb_a_, OnTotalBufferedAmountLow).Times(0);
sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53),
std::vector<uint8_t>(kLargeMessageSize)),
kSendOptions);
ExchangeMessages(sock_a_, cb_a_, sock_z_, cb_z_);
}
TEST_F(DcSctpSocketTest, TriggersOnTotalBufferAmountLowWhenCrossingThreshold) {
ConnectSockets();
EXPECT_CALL(cb_a_, OnTotalBufferedAmountLow).Times(0);
// Fill up the send queue completely.
for (;;) {
if (sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53),
std::vector<uint8_t>(kLargeMessageSize)),
kSendOptions) == SendStatus::kErrorResourceExhaustion) {
break;
}
}
EXPECT_CALL(cb_a_, OnTotalBufferedAmountLow).Times(1);
ExchangeMessages(sock_a_, cb_a_, sock_z_, cb_z_);
}
} // namespace
} // namespace dcsctp

View file

@ -93,7 +93,6 @@ class MockDcSctpSocketCallbacks : public DcSctpSocketCallbacks {
uint32_t GetRandomInt(uint32_t low, uint32_t high) override {
return random_.Rand(low, high);
}
MOCK_METHOD(void, NotifyOutgoingMessageBufferEmpty, (), (override));
MOCK_METHOD(void, OnMessageReceived, (DcSctpMessage message), (override));
MOCK_METHOD(void,
@ -120,6 +119,8 @@ class MockDcSctpSocketCallbacks : public DcSctpSocketCallbacks {
OnIncomingStreamsReset,
(rtc::ArrayView<const StreamID> incoming_streams),
(override));
MOCK_METHOD(void, OnBufferedAmountLow, (StreamID stream_id), (override));
MOCK_METHOD(void, OnTotalBufferedAmountLow, (), (override));
bool HasPacket() const { return !sent_packets_.empty(); }

View file

@ -105,7 +105,6 @@ class StreamResetHandlerTest : public testing::Test {
producer_,
[](DurationMs rtt_ms) {},
[]() {},
[]() {},
*t3_rtx_timer_,
/*options=*/{}),
handler_("log: ",

View file

@ -89,7 +89,6 @@ class TransmissionControlBlock : public Context {
a_rwnd,
send_queue,
[this](DurationMs rtt) { return ObserveRTT(rtt); },
[this]() { callbacks_.NotifyOutgoingMessageBufferEmpty(); },
[this]() { tx_error_counter_.Clear(); },
*t3_rtx_,
options,

View file

@ -56,7 +56,6 @@ RetransmissionQueue::RetransmissionQueue(
size_t a_rwnd,
SendQueue& send_queue,
std::function<void(DurationMs rtt)> on_new_rtt,
std::function<void()> on_send_queue_empty,
std::function<void()> on_clear_retransmission_counter,
Timer& t3_rtx,
const DcSctpOptions& options,
@ -69,7 +68,6 @@ RetransmissionQueue::RetransmissionQueue(
? IDataChunk::kHeaderSize
: DataChunk::kHeaderSize),
on_new_rtt_(std::move(on_new_rtt)),
on_send_queue_empty_(std::move(on_send_queue_empty)),
on_clear_retransmission_counter_(
std::move(on_clear_retransmission_counter)),
t3_rtx_(t3_rtx),
@ -592,7 +590,6 @@ std::vector<std::pair<TSN, Data>> RetransmissionQueue::GetChunksToSend(
absl::optional<SendQueue::DataToSend> chunk_opt =
send_queue_.Produce(now, max_bytes - data_chunk_header_size_);
if (!chunk_opt.has_value()) {
on_send_queue_empty_();
break;
}

View file

@ -72,7 +72,6 @@ class RetransmissionQueue {
size_t a_rwnd,
SendQueue& send_queue,
std::function<void(DurationMs rtt)> on_new_rtt,
std::function<void()> on_send_queue_empty,
std::function<void()> on_clear_retransmission_counter,
Timer& t3_rtx,
const DcSctpOptions& options,
@ -330,8 +329,6 @@ class RetransmissionQueue {
const size_t data_chunk_header_size_;
// Called when a new RTT measurement has been done
const std::function<void(DurationMs rtt)> on_new_rtt_;
// Called when the send queue is empty.
const std::function<void()> on_send_queue_empty_;
// Called when a SACK has been seen that cleared the retransmission counter.
const std::function<void()> on_clear_retransmission_counter_;
// The retransmission counter.

View file

@ -80,7 +80,6 @@ class RetransmissionQueueTest : public testing::Test {
options.mtu = kMaxMtu;
return RetransmissionQueue(
"", TSN(10), kArwnd, producer_, on_rtt_.AsStdFunction(),
on_outgoing_message_buffer_empty_.AsStdFunction(),
on_clear_retransmission_counter_.AsStdFunction(), *timer_, options,
supports_partial_reliability, use_message_interleaving);
}
@ -90,7 +89,6 @@ class RetransmissionQueueTest : public testing::Test {
FakeTimeoutManager timeout_manager_;
TimerManager timer_manager_;
NiceMock<MockFunction<void(DurationMs rtt_ms)>> on_rtt_;
NiceMock<MockFunction<void()>> on_outgoing_message_buffer_empty_;
NiceMock<MockFunction<void()>> on_clear_retransmission_counter_;
NiceMock<MockSendQueue> producer_;
std::unique_ptr<Timer> timer_;