diff --git a/net/dcsctp/rx/BUILD.gn b/net/dcsctp/rx/BUILD.gn index d66fd6ba72..f5f5b7ed81 100644 --- a/net/dcsctp/rx/BUILD.gn +++ b/net/dcsctp/rx/BUILD.gn @@ -95,6 +95,7 @@ rtc_library("reassembly_queue") { "../../../api:array_view", "../../../rtc_base:checks", "../../../rtc_base:logging", + "../../../rtc_base/containers:flat_set", "../common:internal_types", "../common:sequence_numbers", "../common:str_join", @@ -109,6 +110,7 @@ rtc_library("reassembly_queue") { "reassembly_queue.h", ] absl_deps = [ + "//third_party/abseil-cpp/absl/functional:any_invocable", "//third_party/abseil-cpp/absl/strings", "//third_party/abseil-cpp/absl/types:optional", ] diff --git a/net/dcsctp/rx/data_tracker.h b/net/dcsctp/rx/data_tracker.h index 62a12320ad..9991ee6139 100644 --- a/net/dcsctp/rx/data_tracker.h +++ b/net/dcsctp/rx/data_tracker.h @@ -93,6 +93,10 @@ class DataTracker { return TSN(last_cumulative_acked_tsn_.Wrap()); } + bool IsLaterThanCumulativeAckedTsn(TSN tsn) const { + return tsn_unwrapper_.PeekUnwrap(tsn) > last_cumulative_acked_tsn_; + } + // Returns true if the received `tsn` would increase the cumulative ack TSN. bool will_increase_cum_ack_tsn(TSN tsn) const; diff --git a/net/dcsctp/rx/reassembly_queue.cc b/net/dcsctp/rx/reassembly_queue.cc index 0b0d8e7f80..573443635c 100644 --- a/net/dcsctp/rx/reassembly_queue.cc +++ b/net/dcsctp/rx/reassembly_queue.cc @@ -29,6 +29,7 @@ #include "net/dcsctp/packet/parameter/outgoing_ssn_reset_request_parameter.h" #include "net/dcsctp/packet/parameter/reconfiguration_response_parameter.h" #include "net/dcsctp/public/dcsctp_message.h" +#include "net/dcsctp/public/types.h" #include "net/dcsctp/rx/interleaved_reassembly_streams.h" #include "net/dcsctp/rx/reassembly_streams.h" #include "net/dcsctp/rx/traditional_reassembly_streams.h" @@ -68,7 +69,6 @@ ReassemblyQueue::ReassemblyQueue(absl::string_view log_prefix, use_message_interleaving)) {} void ReassemblyQueue::Add(TSN tsn, Data data) { - RTC_DCHECK(IsConsistent()); RTC_DLOG(LS_VERBOSE) << log_prefix_ << "added tsn=" << *tsn << ", stream=" << *data.stream_id << ":" << *data.mid << ":" << *data.fsn << ", type=" @@ -83,21 +83,23 @@ void ReassemblyQueue::Add(TSN tsn, Data data) { // the future, the socket is in "deferred reset processing" mode and must // buffer chunks until it's exited. if (deferred_reset_streams_.has_value() && - unwrapped_tsn > - tsn_unwrapper_.Unwrap( - deferred_reset_streams_->req.sender_last_assigned_tsn())) { + unwrapped_tsn > deferred_reset_streams_->sender_last_assigned_tsn && + deferred_reset_streams_->streams.contains(data.stream_id)) { RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Deferring chunk with tsn=" << *tsn - << " until cum_ack_tsn=" - << *deferred_reset_streams_->req.sender_last_assigned_tsn(); + << ", sid=" << *data.stream_id << " until tsn=" + << *deferred_reset_streams_->sender_last_assigned_tsn.Wrap(); // https://tools.ietf.org/html/rfc6525#section-5.2.2 // "In this mode, any data arriving with a TSN larger than the // Sender's Last Assigned TSN for the affected stream(s) MUST be queued // locally and held until the cumulative acknowledgment point reaches the // Sender's Last Assigned TSN." queued_bytes_ += data.size(); - deferred_reset_streams_->deferred_chunks.emplace_back( - std::make_pair(tsn, std::move(data))); + deferred_reset_streams_->deferred_actions.push_back( + [this, tsn, data = std::move(data)]() mutable { + queued_bytes_ -= data.size(); + Add(tsn, std::move(data)); + }); } else { queued_bytes_ += streams_->Add(unwrapped_tsn, std::move(data)); } @@ -113,83 +115,51 @@ void ReassemblyQueue::Add(TSN tsn, Data data) { RTC_DCHECK(IsConsistent()); } -ReconfigurationResponseParameter::Result ReassemblyQueue::ResetStreams( - const OutgoingSSNResetRequestParameter& req, - TSN cum_tsn_ack) { - RTC_DCHECK(IsConsistent()); - if (deferred_reset_streams_.has_value()) { - // In deferred mode already. - return ReconfigurationResponseParameter::Result::kInProgress; - } else if (req.request_sequence_number() <= - last_completed_reset_req_seq_nbr_) { - // Already performed at some time previously. - return ReconfigurationResponseParameter::Result::kSuccessPerformed; - } - - UnwrappedTSN sla_tsn = tsn_unwrapper_.Unwrap(req.sender_last_assigned_tsn()); - UnwrappedTSN unwrapped_cum_tsn_ack = tsn_unwrapper_.Unwrap(cum_tsn_ack); - - // https://tools.ietf.org/html/rfc6525#section-5.2.2 - // "If the Sender's Last Assigned TSN is greater than the - // cumulative acknowledgment point, then the endpoint MUST enter "deferred - // reset processing"." - if (sla_tsn > unwrapped_cum_tsn_ack) { - RTC_DLOG(LS_VERBOSE) - << log_prefix_ - << "Entering deferred reset processing mode until cum_tsn_ack=" - << *req.sender_last_assigned_tsn(); - deferred_reset_streams_ = absl::make_optional(req); - return ReconfigurationResponseParameter::Result::kInProgress; - } +void ReassemblyQueue::ResetStreamsAndLeaveDeferredReset( + rtc::ArrayView stream_ids) { + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Resetting streams: [" + << StrJoin(stream_ids, ",", + [](rtc::StringBuilder& sb, StreamID sid) { + sb << *sid; + }) + << "]"; // https://tools.ietf.org/html/rfc6525#section-5.2.2 // "... streams MUST be reset to 0 as the next expected SSN." - streams_->ResetStreams(req.stream_ids()); - last_completed_reset_req_seq_nbr_ = req.request_sequence_number(); - RTC_DCHECK(IsConsistent()); - return ReconfigurationResponseParameter::Result::kSuccessPerformed; -} + streams_->ResetStreams(stream_ids); -bool ReassemblyQueue::MaybeResetStreamsDeferred(TSN cum_ack_tsn) { - RTC_DCHECK(IsConsistent()); if (deferred_reset_streams_.has_value()) { - UnwrappedTSN unwrapped_cum_ack_tsn = tsn_unwrapper_.Unwrap(cum_ack_tsn); - UnwrappedTSN unwrapped_sla_tsn = tsn_unwrapper_.Unwrap( - deferred_reset_streams_->req.sender_last_assigned_tsn()); - if (unwrapped_cum_ack_tsn >= unwrapped_sla_tsn) { - RTC_DLOG(LS_VERBOSE) << log_prefix_ - << "Leaving deferred reset processing with tsn=" - << *cum_ack_tsn << ", feeding back " - << deferred_reset_streams_->deferred_chunks.size() - << " chunks"; - // https://tools.ietf.org/html/rfc6525#section-5.2.2 - // "... streams MUST be reset to 0 as the next expected SSN." - streams_->ResetStreams(deferred_reset_streams_->req.stream_ids()); - std::vector> deferred_chunks = - std::move(deferred_reset_streams_->deferred_chunks); - // The response will not be sent now, but as a reply to the retried - // request, which will come as "in progress" has been sent prior. - last_completed_reset_req_seq_nbr_ = - deferred_reset_streams_->req.request_sequence_number(); - deferred_reset_streams_ = absl::nullopt; + RTC_DLOG(LS_VERBOSE) << log_prefix_ + << "Leaving deferred reset processing, feeding back " + << deferred_reset_streams_->deferred_actions.size() + << " actions"; + // https://tools.ietf.org/html/rfc6525#section-5.2.2 + // "Any queued TSNs (queued at step E2) MUST now be released and processed + // normally." + auto deferred_actions = + std::move(deferred_reset_streams_->deferred_actions); + deferred_reset_streams_ = absl::nullopt; - // https://tools.ietf.org/html/rfc6525#section-5.2.2 - // "Any queued TSNs (queued at step E2) MUST now be released and processed - // normally." - for (auto& [tsn, data] : deferred_chunks) { - queued_bytes_ -= data.size(); - Add(tsn, std::move(data)); - } - - RTC_DCHECK(IsConsistent()); - return true; - } else { - RTC_DLOG(LS_VERBOSE) << "Staying in deferred reset processing. tsn=" - << *cum_ack_tsn; + for (auto& action : deferred_actions) { + action(); } } - return false; + RTC_DCHECK(IsConsistent()); +} + +void ReassemblyQueue::EnterDeferredReset( + TSN sender_last_assigned_tsn, + rtc::ArrayView streams) { + if (!deferred_reset_streams_.has_value()) { + RTC_DLOG(LS_VERBOSE) << log_prefix_ + << "Entering deferred reset; sender_last_assigned_tsn=" + << *sender_last_assigned_tsn; + deferred_reset_streams_ = absl::make_optional( + tsn_unwrapper_.Unwrap(sender_last_assigned_tsn), + webrtc::flat_set(streams.begin(), streams.end())); + } + RTC_DCHECK(IsConsistent()); } std::vector ReassemblyQueue::FlushMessages() { @@ -237,18 +207,32 @@ void ReassemblyQueue::MaybeMoveLastAssembledWatermarkFurther() { } } -void ReassemblyQueue::Handle(const AnyForwardTsnChunk& forward_tsn) { - RTC_DCHECK(IsConsistent()); - UnwrappedTSN tsn = tsn_unwrapper_.Unwrap(forward_tsn.new_cumulative_tsn()); +void ReassemblyQueue::HandleForwardTsn( + TSN new_cumulative_tsn, + rtc::ArrayView skipped_streams) { + UnwrappedTSN tsn = tsn_unwrapper_.Unwrap(new_cumulative_tsn); + if (deferred_reset_streams_.has_value() && + tsn > deferred_reset_streams_->sender_last_assigned_tsn) { + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "ForwardTSN to " << *tsn.Wrap() + << "- deferring."; + deferred_reset_streams_->deferred_actions.emplace_back( + [this, new_cumulative_tsn, + streams = std::vector( + skipped_streams.begin(), skipped_streams.end())] { + HandleForwardTsn(new_cumulative_tsn, streams); + }); + RTC_DCHECK(IsConsistent()); + return; + } + + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "ForwardTSN to " << *tsn.Wrap() + << " - performing."; last_assembled_tsn_watermark_ = std::max(last_assembled_tsn_watermark_, tsn); delivered_tsns_.erase(delivered_tsns_.begin(), delivered_tsns_.upper_bound(tsn)); - MaybeMoveLastAssembledWatermarkFurther(); - - queued_bytes_ -= - streams_->HandleForwardTsn(tsn, forward_tsn.skipped_streams()); + queued_bytes_ -= streams_->HandleForwardTsn(tsn, skipped_streams); RTC_DCHECK(IsConsistent()); } diff --git a/net/dcsctp/rx/reassembly_queue.h b/net/dcsctp/rx/reassembly_queue.h index e1f231e2a3..761ec3556c 100644 --- a/net/dcsctp/rx/reassembly_queue.h +++ b/net/dcsctp/rx/reassembly_queue.h @@ -19,6 +19,7 @@ #include #include +#include "absl/functional/any_invocable.h" #include "absl/strings/string_view.h" #include "api/array_view.h" #include "net/dcsctp/common/internal_types.h" @@ -30,6 +31,7 @@ #include "net/dcsctp/public/dcsctp_handover_state.h" #include "net/dcsctp/public/dcsctp_message.h" #include "net/dcsctp/rx/reassembly_streams.h" +#include "rtc_base/containers/flat_set.h" namespace dcsctp { @@ -88,18 +90,18 @@ class ReassemblyQueue { // Handle a ForwardTSN chunk, when the sender has indicated that the received // (this class) should forget about some chunks. This is used to implement // partial reliability. - void Handle(const AnyForwardTsnChunk& forward_tsn); + void HandleForwardTsn( + TSN new_cumulative_tsn, + rtc::ArrayView skipped_streams); - // Given the reset stream request and the current cum_tsn_ack, might either - // reset the streams directly (returns kSuccessPerformed), or at a later time, - // by entering the "deferred reset processing" mode (returns kInProgress). - ReconfigurationResponseParameter::Result ResetStreams( - const OutgoingSSNResetRequestParameter& req, - TSN cum_tsn_ack); + // Resets the provided streams and leaves deferred reset processing, if + // enabled. + void ResetStreamsAndLeaveDeferredReset( + rtc::ArrayView stream_ids); - // Given the current (updated) cum_tsn_ack, might leave "defererred reset - // processing" mode and reset streams. Returns true if so. - bool MaybeResetStreamsDeferred(TSN cum_ack_tsn); + // Enters deferred reset processing. + void EnterDeferredReset(TSN sender_last_assigned_tsn, + rtc::ArrayView streams); // The number of payload bytes that have been queued. Note that the actual // memory usage is higher due to additional overhead of tracking received @@ -126,18 +128,22 @@ class ReassemblyQueue { void RestoreFromState(const DcSctpSocketHandoverState& state); private: + struct DeferredResetStreams { + DeferredResetStreams(UnwrappedTSN sender_last_assigned_tsn, + webrtc::flat_set streams) + : sender_last_assigned_tsn(sender_last_assigned_tsn), + streams(std::move(streams)) {} + + UnwrappedTSN sender_last_assigned_tsn; + webrtc::flat_set streams; + std::vector> deferred_actions; + }; + bool IsConsistent() const; void AddReassembledMessage(rtc::ArrayView tsns, DcSctpMessage message); void MaybeMoveLastAssembledWatermarkFurther(); - struct DeferredResetStreams { - explicit DeferredResetStreams(OutgoingSSNResetRequestParameter req) - : req(std::move(req)) {} - OutgoingSSNResetRequestParameter req; - std::vector> deferred_chunks; - }; - const absl::string_view log_prefix_; const size_t max_size_bytes_; const size_t watermark_bytes_; diff --git a/net/dcsctp/rx/reassembly_queue_test.cc b/net/dcsctp/rx/reassembly_queue_test.cc index 0cd26954a5..fd8c423a5f 100644 --- a/net/dcsctp/rx/reassembly_queue_test.cc +++ b/net/dcsctp/rx/reassembly_queue_test.cc @@ -34,6 +34,7 @@ namespace { using ::testing::ElementsAre; using ::testing::SizeIs; using ::testing::UnorderedElementsAre; +using SkippedStream = AnyForwardTsnChunk::SkippedStream; // The default maximum size of the Reassembly Queue. static constexpr size_t kBufferSize = 10000; @@ -194,7 +195,7 @@ TEST_F(ReassemblyQueueTest, ForwardTSNRemoveUnordered) { EXPECT_FALSE(reasm.HasMessages()); - reasm.Handle(ForwardTsnChunk(TSN(13), {})); + reasm.HandleForwardTsn(TSN(13), std::vector()); EXPECT_EQ(reasm.queued_bytes(), 3u); // The second lost chunk comes, message is assembled. @@ -217,8 +218,8 @@ TEST_F(ReassemblyQueueTest, ForwardTSNRemoveOrdered) { EXPECT_FALSE(reasm.HasMessages()); - reasm.Handle(ForwardTsnChunk( - TSN(13), {ForwardTsnChunk::SkippedStream(kStreamID, kSSN)})); + reasm.HandleForwardTsn( + TSN(13), std::vector({SkippedStream(kStreamID, kSSN)})); EXPECT_EQ(reasm.queued_bytes(), 0u); // The lost chunk comes, but too late. @@ -241,8 +242,8 @@ TEST_F(ReassemblyQueueTest, ForwardTSNRemoveALotOrdered) { EXPECT_FALSE(reasm.HasMessages()); - reasm.Handle(ForwardTsnChunk( - TSN(13), {ForwardTsnChunk::SkippedStream(kStreamID, kSSN)})); + reasm.HandleForwardTsn( + TSN(13), std::vector({SkippedStream(kStreamID, kSSN)})); EXPECT_EQ(reasm.queued_bytes(), 0u); // The lost chunk comes, but too late. @@ -274,46 +275,24 @@ TEST_F(ReassemblyQueueTest, NotReadyForHandoverWhenDeliveredTsnsHaveGap) { .Add( HandoverUnreadinessReason::kUnorderedStreamHasUnassembledChunks)); - reasm.Handle(ForwardTsnChunk(TSN(13), {})); + reasm.HandleForwardTsn(TSN(13), std::vector()); EXPECT_EQ(reasm.GetHandoverReadiness(), HandoverReadinessStatus()); } TEST_F(ReassemblyQueueTest, NotReadyForHandoverWhenResetStreamIsDeferred) { ReassemblyQueue reasm("log: ", TSN(10), kBufferSize); - DataGeneratorOptions opts; - opts.mid = MID(0); - reasm.Add(TSN(10), gen_.Ordered({1, 2, 3, 4}, "BE", opts)); - opts.mid = MID(1); - reasm.Add(TSN(11), gen_.Ordered({1, 2, 3, 4}, "BE", opts)); + reasm.Add(TSN(10), gen_.Ordered({1, 2, 3, 4}, "BE", {.mid = MID(0)})); + reasm.Add(TSN(11), gen_.Ordered({1, 2, 3, 4}, "BE", {.mid = MID(1)})); EXPECT_THAT(reasm.FlushMessages(), SizeIs(2)); - reasm.ResetStreams( - OutgoingSSNResetRequestParameter( - ReconfigRequestSN(10), ReconfigRequestSN(3), TSN(13), {StreamID(1)}), - TSN(11)); + reasm.EnterDeferredReset(TSN(12), std::vector({StreamID(1)})); EXPECT_EQ(reasm.GetHandoverReadiness(), HandoverReadinessStatus().Add( HandoverUnreadinessReason::kStreamResetDeferred)); - opts.mid = MID(3); - opts.ppid = PPID(3); - reasm.Add(TSN(13), gen_.Ordered({1, 2, 3, 4}, "BE", opts)); - reasm.MaybeResetStreamsDeferred(TSN(11)); + reasm.Add(TSN(12), gen_.Ordered({1, 2, 3, 4}, "BE", {.mid = MID(2)})); - opts.mid = MID(2); - opts.ppid = PPID(2); - reasm.Add(TSN(13), gen_.Ordered({1, 2, 3, 4}, "BE", opts)); - reasm.MaybeResetStreamsDeferred(TSN(15)); - EXPECT_EQ(reasm.GetHandoverReadiness(), - HandoverReadinessStatus().Add( - HandoverUnreadinessReason::kReassemblyQueueDeliveredTSNsGap)); - - EXPECT_THAT(reasm.FlushMessages(), SizeIs(2)); - EXPECT_EQ(reasm.GetHandoverReadiness(), - HandoverReadinessStatus().Add( - HandoverUnreadinessReason::kReassemblyQueueDeliveredTSNsGap)); - - reasm.Handle(ForwardTsnChunk(TSN(15), {})); + reasm.ResetStreamsAndLeaveDeferredReset(std::vector({StreamID(1)})); EXPECT_EQ(reasm.GetHandoverReadiness(), HandoverReadinessStatus()); } @@ -427,9 +406,8 @@ TEST_F(ReassemblyQueueTest, IForwardTSNRemoveALotOrdered) { ASSERT_FALSE(reasm.HasMessages()); EXPECT_EQ(reasm.queued_bytes(), 7u); - reasm.Handle( - IForwardTsnChunk(TSN(13), {IForwardTsnChunk::SkippedStream( - IsUnordered(false), kStreamID, MID(0))})); + reasm.HandleForwardTsn(TSN(13), std::vector({SkippedStream( + IsUnordered(false), kStreamID, MID(0))})); EXPECT_EQ(reasm.queued_bytes(), 0u); // The lost chunk comes, but too late. diff --git a/net/dcsctp/socket/dcsctp_socket.cc b/net/dcsctp/socket/dcsctp_socket.cc index 2e29a5a9e0..32bcdaaacf 100644 --- a/net/dcsctp/socket/dcsctp_socket.cc +++ b/net/dcsctp/socket/dcsctp_socket.cc @@ -1102,7 +1102,7 @@ void DcSctpSocket::HandleDataCommon(AnyDataChunk& chunk) { if (tcb_->data_tracker().Observe(tsn, immediate_ack)) { tcb_->reassembly_queue().Add(tsn, std::move(data)); - MaybeResetStreamsDeferredAndDeliverMessages(); + MaybeDeliverMessages(); } } @@ -1453,12 +1453,7 @@ void DcSctpSocket::HandleCookieAck( callbacks_.OnConnected(); } -void DcSctpSocket::MaybeResetStreamsDeferredAndDeliverMessages() { - // As new data has been received, see if paused streams can be resumed, which - // results in even more data added to the reassembly queue. - tcb_->reassembly_queue().MaybeResetStreamsDeferred( - tcb_->data_tracker().last_cumulative_acked_tsn()); - +void DcSctpSocket::MaybeDeliverMessages() { for (auto& message : tcb_->reassembly_queue().FlushMessages()) { ++metrics_.rx_messages_count; callbacks_.OnMessageReceived(std::move(message)); @@ -1571,6 +1566,10 @@ void DcSctpSocket::HandleReconfig( // If a response was processed, pending to-be-reset streams may now have // become unpaused. Try to send more DATA chunks. tcb_->SendBufferedPackets(now); + + // If it leaves "deferred reset processing", there may be chunks to deliver + // that were queued while waiting for the stream to reset. + MaybeDeliverMessages(); } } @@ -1710,12 +1709,12 @@ void DcSctpSocket::HandleForwardTsnCommon(const AnyForwardTsnChunk& chunk) { return; } if (tcb_->data_tracker().HandleForwardTsn(chunk.new_cumulative_tsn())) { - tcb_->reassembly_queue().Handle(chunk); + tcb_->reassembly_queue().HandleForwardTsn(chunk.new_cumulative_tsn(), + chunk.skipped_streams()); } - // A forward TSN - for ordered streams - may allow messages to be - // delivered. - MaybeResetStreamsDeferredAndDeliverMessages(); + // A forward TSN - for ordered streams - may allow messages to be delivered. + MaybeDeliverMessages(); } void DcSctpSocket::MaybeSendShutdownOrAck() { diff --git a/net/dcsctp/socket/dcsctp_socket.h b/net/dcsctp/socket/dcsctp_socket.h index 4f7d1787a5..f91eb3ead4 100644 --- a/net/dcsctp/socket/dcsctp_socket.h +++ b/net/dcsctp/socket/dcsctp_socket.h @@ -180,9 +180,8 @@ class DcSctpSocket : public DcSctpSocketInterface { // sent and prints all chunks. void DebugPrintOutgoing(rtc::ArrayView payload); // Called whenever data has been received, or the cumulative acknowledgment - // TSN has moved, that may result in performing deferred stream resetting and - // delivering messages. - void MaybeResetStreamsDeferredAndDeliverMessages(); + // TSN has moved, that may result in delivering messages. + void MaybeDeliverMessages(); // Returns true if there is a TCB, and false otherwise (and reports an error). bool ValidateHasTCB(); diff --git a/net/dcsctp/socket/stream_reset_handler.cc b/net/dcsctp/socket/stream_reset_handler.cc index f9201eadf0..2094309afe 100644 --- a/net/dcsctp/socket/stream_reset_handler.cc +++ b/net/dcsctp/socket/stream_reset_handler.cc @@ -131,7 +131,7 @@ void StreamResetHandler::HandleReConfig(ReConfigChunk chunk) { } bool StreamResetHandler::ValidateReqSeqNbr( - ReconfigRequestSN req_seq_nbr, + UnwrappedReconfigRequestSn req_seq_nbr, std::vector& responses) { if (req_seq_nbr == last_processed_req_seq_nbr_) { // https://www.rfc-editor.org/rfc/rfc6525.html#section-5.2.1 "If the @@ -143,11 +143,11 @@ bool StreamResetHandler::ValidateReqSeqNbr( << " already processed, returning result=" << ToString(last_processed_req_result_); responses.push_back(ReconfigurationResponseParameter( - req_seq_nbr, last_processed_req_result_)); + req_seq_nbr.Wrap(), last_processed_req_result_)); return false; } - if (req_seq_nbr != ReconfigRequestSN(*last_processed_req_seq_nbr_ + 1)) { + if (req_seq_nbr != last_processed_req_seq_nbr_.next_value()) { // Too old, too new, from wrong association etc. // This is expected to happen when handing over a RTCPeerConnection from one // server to another. The client will notice this and may decide to close @@ -156,7 +156,7 @@ bool StreamResetHandler::ValidateReqSeqNbr( RTC_DLOG(LS_VERBOSE) << log_prefix_ << "req=" << *req_seq_nbr << " bad seq_nbr"; responses.push_back(ReconfigurationResponseParameter( - req_seq_nbr, ResponseResult::kErrorBadSequenceNumber)); + req_seq_nbr.Wrap(), ResponseResult::kErrorBadSequenceNumber)); return false; } @@ -174,16 +174,43 @@ void StreamResetHandler::HandleResetOutgoing( return; } - if (ValidateReqSeqNbr(req->request_sequence_number(), responses)) { - RTC_DLOG(LS_VERBOSE) << log_prefix_ - << "Reset outgoing streams with req_seq_nbr=" - << *req->request_sequence_number(); + UnwrappedReconfigRequestSn request_sn = + incoming_reconfig_request_sn_unwrapper_.Unwrap( + req->request_sequence_number()); - last_processed_req_seq_nbr_ = req->request_sequence_number(); - last_processed_req_result_ = reassembly_queue_->ResetStreams( - *req, data_tracker_->last_cumulative_acked_tsn()); - if (last_processed_req_result_ == ResponseResult::kSuccessPerformed) { + if (ValidateReqSeqNbr(request_sn, responses)) { + last_processed_req_seq_nbr_ = request_sn; + if (data_tracker_->IsLaterThanCumulativeAckedTsn( + req->sender_last_assigned_tsn())) { + // https://datatracker.ietf.org/doc/html/rfc6525#section-5.2.2 + // E2) "If the Sender's Last Assigned TSN is greater than the cumulative + // acknowledgment point, then the endpoint MUST enter 'deferred reset + // processing'." + reassembly_queue_->EnterDeferredReset(req->sender_last_assigned_tsn(), + req->stream_ids()); + // "If the endpoint enters 'deferred reset processing', it MUST put a + // Re-configuration Response Parameter into a RE-CONFIG chunk indicating + // 'In progress' and MUST send the RE-CONFIG chunk. + last_processed_req_result_ = ResponseResult::kInProgress; + RTC_DLOG(LS_VERBOSE) << log_prefix_ + << "Reset outgoing; Sender last_assigned=" + << *req->sender_last_assigned_tsn() + << " - not yet reached -> InProgress"; + } else { + // https://datatracker.ietf.org/doc/html/rfc6525#section-5.2.2 + // E3) If no stream numbers are listed in the parameter, then all incoming + // streams MUST be reset to 0 as the next expected SSN. If specific stream + // numbers are listed, then only these specific streams MUST be reset to + // 0, and all other non-listed SSNs remain unchanged. E4: Any queued TSNs + // (queued at step E2) MUST now be released and processed normally. + reassembly_queue_->ResetStreamsAndLeaveDeferredReset(req->stream_ids()); ctx_->callbacks().OnIncomingStreamsReset(req->stream_ids()); + last_processed_req_result_ = ResponseResult::kSuccessPerformed; + + RTC_DLOG(LS_VERBOSE) << log_prefix_ + << "Reset outgoing; Sender last_assigned=" + << *req->sender_last_assigned_tsn() + << " - reached -> SuccessPerformed"; } responses.push_back(ReconfigurationResponseParameter( req->request_sequence_number(), last_processed_req_result_)); @@ -200,10 +227,15 @@ void StreamResetHandler::HandleResetIncoming( "Failed to parse Incoming Reset command"); return; } - if (ValidateReqSeqNbr(req->request_sequence_number(), responses)) { + + UnwrappedReconfigRequestSn request_sn = + incoming_reconfig_request_sn_unwrapper_.Unwrap( + req->request_sequence_number()); + + if (ValidateReqSeqNbr(request_sn, responses)) { responses.push_back(ReconfigurationResponseParameter( req->request_sequence_number(), ResponseResult::kSuccessNothingToDo)); - last_processed_req_seq_nbr_ = req->request_sequence_number(); + last_processed_req_seq_nbr_ = request_sn; } } @@ -345,7 +377,8 @@ HandoverReadinessStatus StreamResetHandler::GetHandoverReadiness() const { } void StreamResetHandler::AddHandoverState(DcSctpSocketHandoverState& state) { - state.rx.last_completed_reset_req_sn = last_processed_req_seq_nbr_.value(); + state.rx.last_completed_reset_req_sn = + last_processed_req_seq_nbr_.Wrap().value(); state.tx.next_reset_req_sn = next_outgoing_req_seq_nbr_.value(); } diff --git a/net/dcsctp/socket/stream_reset_handler.h b/net/dcsctp/socket/stream_reset_handler.h index 8140903c49..c335130175 100644 --- a/net/dcsctp/socket/stream_reset_handler.h +++ b/net/dcsctp/socket/stream_reset_handler.h @@ -86,9 +86,11 @@ class StreamResetHandler { ? ReconfigRequestSN(handover_state->tx.next_reset_req_sn) : ReconfigRequestSN(*ctx_->my_initial_tsn())), last_processed_req_seq_nbr_( - handover_state ? ReconfigRequestSN( - handover_state->rx.last_completed_reset_req_sn) - : ReconfigRequestSN(*ctx_->peer_initial_tsn() - 1)), + incoming_reconfig_request_sn_unwrapper_.Unwrap( + handover_state + ? ReconfigRequestSN( + handover_state->rx.last_completed_reset_req_sn) + : ReconfigRequestSN(*ctx_->peer_initial_tsn() - 1))), last_processed_req_result_( ReconfigurationResponseParameter::Result::kSuccessNothingToDo) {} @@ -113,6 +115,7 @@ class StreamResetHandler { void AddHandoverState(DcSctpSocketHandoverState& state); private: + using UnwrappedReconfigRequestSn = UnwrappedSequenceNumber; // Represents a stream request operation. There can only be one ongoing at // any time, and a sent request may either succeed, fail or result in the // receiver signaling that it can't process it right now, and then it will be @@ -185,7 +188,7 @@ class StreamResetHandler { // fails to validate, and returns false, it will also add a response to // `responses`. bool ValidateReqSeqNbr( - ReconfigRequestSN req_seq_nbr, + UnwrappedReconfigRequestSn req_seq_nbr, std::vector& responses); // Called when this socket receives an outgoing stream reset request. It might @@ -215,6 +218,7 @@ class StreamResetHandler { DataTracker* data_tracker_; ReassemblyQueue* reassembly_queue_; RetransmissionQueue* retransmission_queue_; + UnwrappedReconfigRequestSn::Unwrapper incoming_reconfig_request_sn_unwrapper_; const std::unique_ptr reconfig_timer_; // The next sequence number for outgoing stream requests. @@ -224,7 +228,7 @@ class StreamResetHandler { absl::optional current_request_; // For incoming requests - last processed request sequence number. - ReconfigRequestSN last_processed_req_seq_nbr_; + UnwrappedReconfigRequestSn last_processed_req_seq_nbr_; // The result from last processed incoming request ReconfigurationResponseParameter::Result last_processed_req_result_; }; diff --git a/net/dcsctp/socket/stream_reset_handler_test.cc b/net/dcsctp/socket/stream_reset_handler_test.cc index 991f182101..091d717f8a 100644 --- a/net/dcsctp/socket/stream_reset_handler_test.cc +++ b/net/dcsctp/socket/stream_reset_handler_test.cc @@ -20,12 +20,14 @@ #include "api/task_queue/task_queue_base.h" #include "net/dcsctp/common/handover_testing.h" #include "net/dcsctp/common/internal_types.h" +#include "net/dcsctp/packet/chunk/forward_tsn_common.h" #include "net/dcsctp/packet/chunk/reconfig_chunk.h" #include "net/dcsctp/packet/parameter/incoming_ssn_reset_request_parameter.h" #include "net/dcsctp/packet/parameter/outgoing_ssn_reset_request_parameter.h" #include "net/dcsctp/packet/parameter/parameter.h" #include "net/dcsctp/packet/parameter/reconfiguration_response_parameter.h" #include "net/dcsctp/public/dcsctp_message.h" +#include "net/dcsctp/public/types.h" #include "net/dcsctp/rx/data_tracker.h" #include "net/dcsctp/rx/reassembly_queue.h" #include "net/dcsctp/socket/mock_context.h" @@ -42,10 +44,12 @@ namespace dcsctp { namespace { using ::testing::IsEmpty; using ::testing::NiceMock; +using ::testing::Property; using ::testing::Return; using ::testing::SizeIs; using ::testing::UnorderedElementsAre; using ResponseResult = ReconfigurationResponseParameter::Result; +using SkippedStream = AnyForwardTsnChunk::SkippedStream; constexpr TSN kMyInitialTsn = MockContext::MyInitialTsn(); constexpr ReconfigRequestSN kMyInitialReqSn = ReconfigRequestSN(*kMyInitialTsn); @@ -289,61 +293,187 @@ TEST_F(StreamResetHandlerTest, ResetStreamsNotDeferred) { } TEST_F(StreamResetHandlerTest, ResetStreamsDeferred) { - DataGeneratorOptions opts; - opts.mid = MID(0); - reasm_->Add(kPeerInitialTsn, gen_.Ordered({1, 2, 3, 4}, "BE", opts)); + constexpr StreamID kStreamId = StreamID(1); + data_tracker_->Observe(TSN(10)); + reasm_->Add(TSN(10), gen_.Ordered({1, 2, 3, 4}, "BE", {.mid = MID(0)})); - opts.mid = MID(1); - reasm_->Add(AddTo(kPeerInitialTsn, 1), - gen_.Ordered({1, 2, 3, 4}, "BE", opts)); - - data_tracker_->Observe(kPeerInitialTsn); - data_tracker_->Observe(AddTo(kPeerInitialTsn, 1)); - EXPECT_THAT(reasm_->FlushMessages(), - UnorderedElementsAre( - SctpMessageIs(StreamID(1), PPID(53), kShortPayload), - SctpMessageIs(StreamID(1), PPID(53), kShortPayload))); - - Parameters::Builder builder; - builder.Add(OutgoingSSNResetRequestParameter( - kPeerInitialReqSn, ReconfigRequestSN(3), AddTo(kPeerInitialTsn, 3), - {StreamID(1)})); - - std::vector responses = - HandleAndCatchResponse(ReConfigChunk(builder.Build())); - EXPECT_THAT(responses, SizeIs(1)); - EXPECT_EQ(responses[0].result(), ResponseResult::kInProgress); - - opts.mid = MID(1); - opts.ppid = PPID(5); - reasm_->Add(AddTo(kPeerInitialTsn, 5), - gen_.Ordered({1, 2, 3, 4}, "BE", opts)); - reasm_->MaybeResetStreamsDeferred(AddTo(kPeerInitialTsn, 1)); - - opts.mid = MID(0); - opts.ppid = PPID(4); - reasm_->Add(AddTo(kPeerInitialTsn, 4), - gen_.Ordered({1, 2, 3, 4}, "BE", opts)); - reasm_->MaybeResetStreamsDeferred(AddTo(kPeerInitialTsn, 1)); - - opts.mid = MID(3); - opts.ppid = PPID(3); - reasm_->Add(AddTo(kPeerInitialTsn, 3), - gen_.Ordered({1, 2, 3, 4}, "BE", opts)); - reasm_->MaybeResetStreamsDeferred(AddTo(kPeerInitialTsn, 1)); - - opts.mid = MID(2); - opts.ppid = PPID(2); - reasm_->Add(AddTo(kPeerInitialTsn, 2), - gen_.Ordered({1, 2, 3, 4}, "BE", opts)); - reasm_->MaybeResetStreamsDeferred(AddTo(kPeerInitialTsn, 5)); + data_tracker_->Observe(TSN(11)); + reasm_->Add(TSN(11), gen_.Ordered({1, 2, 3, 4}, "BE", {.mid = MID(1)})); EXPECT_THAT( reasm_->FlushMessages(), - UnorderedElementsAre(SctpMessageIs(StreamID(1), PPID(2), kShortPayload), - SctpMessageIs(StreamID(1), PPID(3), kShortPayload), - SctpMessageIs(StreamID(1), PPID(4), kShortPayload), - SctpMessageIs(StreamID(1), PPID(5), kShortPayload))); + UnorderedElementsAre(SctpMessageIs(kStreamId, PPID(53), kShortPayload), + SctpMessageIs(kStreamId, PPID(53), kShortPayload))); + + Parameters::Builder builder; + builder.Add(OutgoingSSNResetRequestParameter( + ReconfigRequestSN(10), ReconfigRequestSN(3), TSN(13), {kStreamId})); + EXPECT_THAT(HandleAndCatchResponse(ReConfigChunk(builder.Build())), + ElementsAre(Property(&ReconfigurationResponseParameter::result, + ResponseResult::kInProgress))); + + data_tracker_->Observe(TSN(15)); + reasm_->Add(TSN(15), gen_.Ordered({1, 2, 3, 4}, "BE", + {.mid = MID(1), .ppid = PPID(5)})); + + data_tracker_->Observe(TSN(14)); + reasm_->Add(TSN(14), gen_.Ordered({1, 2, 3, 4}, "BE", + {.mid = MID(0), .ppid = PPID(4)})); + + data_tracker_->Observe(TSN(13)); + reasm_->Add(TSN(13), gen_.Ordered({1, 2, 3, 4}, "BE", + {.mid = MID(3), .ppid = PPID(3)})); + + data_tracker_->Observe(TSN(12)); + reasm_->Add(TSN(12), gen_.Ordered({1, 2, 3, 4}, "BE", + {.mid = MID(2), .ppid = PPID(2)})); + + builder.Add(OutgoingSSNResetRequestParameter( + ReconfigRequestSN(11), ReconfigRequestSN(4), TSN(13), {kStreamId})); + EXPECT_THAT(HandleAndCatchResponse(ReConfigChunk(builder.Build())), + ElementsAre(Property(&ReconfigurationResponseParameter::result, + ResponseResult::kSuccessPerformed))); + + EXPECT_THAT( + reasm_->FlushMessages(), + UnorderedElementsAre(SctpMessageIs(kStreamId, PPID(2), kShortPayload), + SctpMessageIs(kStreamId, PPID(3), kShortPayload), + SctpMessageIs(kStreamId, PPID(4), kShortPayload), + SctpMessageIs(kStreamId, PPID(5), kShortPayload))); +} + +TEST_F(StreamResetHandlerTest, ResetStreamsDeferredOnlySelectedStreams) { + // This test verifies the receiving behavior of receiving messages on + // streams 1, 2 and 3, and receiving a reset request on stream 1, 2, causing + // deferred reset processing. + + // Reset stream 1,2 with "last assigned TSN=12" + Parameters::Builder builder; + builder.Add(OutgoingSSNResetRequestParameter(ReconfigRequestSN(10), + ReconfigRequestSN(3), TSN(12), + {StreamID(1), StreamID(2)})); + EXPECT_THAT(HandleAndCatchResponse(ReConfigChunk(builder.Build())), + ElementsAre(Property(&ReconfigurationResponseParameter::result, + ResponseResult::kInProgress))); + + // TSN 10, SID 1 - before TSN 12 -> deliver + data_tracker_->Observe(TSN(10)); + reasm_->Add(TSN(10), gen_.Ordered({1, 2, 3, 4}, "BE", + {.stream_id = StreamID(1), + .mid = MID(0), + .ppid = PPID(1001)})); + + // TSN 11, SID 2 - before TSN 12 -> deliver + data_tracker_->Observe(TSN(11)); + reasm_->Add(TSN(11), gen_.Ordered({1, 2, 3, 4}, "BE", + {.stream_id = StreamID(2), + .mid = MID(0), + .ppid = PPID(1002)})); + + // TSN 12, SID 3 - at TSN 12 -> deliver + data_tracker_->Observe(TSN(12)); + reasm_->Add(TSN(12), gen_.Ordered({1, 2, 3, 4}, "BE", + {.stream_id = StreamID(3), + .mid = MID(0), + .ppid = PPID(1003)})); + + // TSN 13, SID 1 - after TSN 12 and SID=1 -> defer + data_tracker_->Observe(TSN(13)); + reasm_->Add(TSN(13), gen_.Ordered({1, 2, 3, 4}, "BE", + {.stream_id = StreamID(1), + .mid = MID(0), + .ppid = PPID(1004)})); + + // TSN 14, SID 2 - after TSN 12 and SID=2 -> defer + data_tracker_->Observe(TSN(14)); + reasm_->Add(TSN(14), gen_.Ordered({1, 2, 3, 4}, "BE", + {.stream_id = StreamID(2), + .mid = MID(0), + .ppid = PPID(1005)})); + + // TSN 15, SID 3 - after TSN 12, but SID 3 is not reset -> deliver + data_tracker_->Observe(TSN(15)); + reasm_->Add(TSN(15), gen_.Ordered({1, 2, 3, 4}, "BE", + {.stream_id = StreamID(3), + .mid = MID(1), + .ppid = PPID(1006)})); + + EXPECT_THAT(reasm_->FlushMessages(), + UnorderedElementsAre( + SctpMessageIs(StreamID(1), PPID(1001), kShortPayload), + SctpMessageIs(StreamID(2), PPID(1002), kShortPayload), + SctpMessageIs(StreamID(3), PPID(1003), kShortPayload), + SctpMessageIs(StreamID(3), PPID(1006), kShortPayload))); + + builder.Add(OutgoingSSNResetRequestParameter(ReconfigRequestSN(11), + ReconfigRequestSN(3), TSN(13), + {StreamID(1), StreamID(2)})); + EXPECT_THAT(HandleAndCatchResponse(ReConfigChunk(builder.Build())), + ElementsAre(Property(&ReconfigurationResponseParameter::result, + ResponseResult::kSuccessPerformed))); + + EXPECT_THAT(reasm_->FlushMessages(), + UnorderedElementsAre( + SctpMessageIs(StreamID(1), PPID(1004), kShortPayload), + SctpMessageIs(StreamID(2), PPID(1005), kShortPayload))); +} + +TEST_F(StreamResetHandlerTest, ResetStreamsDefersForwardTsn) { + // This test verifies that FORWARD-TSNs are deferred if they want to move + // the cumulative ack TSN point past sender's last assigned TSN. + static constexpr StreamID kStreamId = StreamID(42); + + // Simulate sender sends: + // * TSN 10 (SSN=0, BE, lost), + // * TSN 11 (SSN=1, BE, lost), + // * TSN 12 (SSN=2, BE, lost) + // * RESET THE STREAM + // * TSN 13 (SSN=0, B, received) + // * TSN 14 (SSN=0, E, lost), + // * TSN 15 (SSN=1, BE, received) + Parameters::Builder builder; + builder.Add(OutgoingSSNResetRequestParameter( + ReconfigRequestSN(10), ReconfigRequestSN(3), TSN(12), {kStreamId})); + EXPECT_THAT(HandleAndCatchResponse(ReConfigChunk(builder.Build())), + ElementsAre(Property(&ReconfigurationResponseParameter::result, + ResponseResult::kInProgress))); + + // TSN 13, B, after TSN=12 -> defer + data_tracker_->Observe(TSN(13)); + reasm_->Add(TSN(13), + gen_.Ordered( + {1, 2, 3, 4}, "B", + {.stream_id = kStreamId, .mid = MID(0), .ppid = PPID(1004)})); + + // TSN 15, BE, after TSN=12 -> defer + data_tracker_->Observe(TSN(15)); + reasm_->Add(TSN(15), + gen_.Ordered( + {1, 2, 3, 4}, "BE", + {.stream_id = kStreamId, .mid = MID(1), .ppid = PPID(1005)})); + + // Time passes, sender decides to send FORWARD-TSN up to the RESET. + data_tracker_->HandleForwardTsn(TSN(12)); + reasm_->HandleForwardTsn( + TSN(12), std::vector({SkippedStream(kStreamId, SSN(2))})); + + // The receiver sends a SACK in response to that. The stream hasn't been + // reset yet, but the sender now decides that TSN=13-14 is to be skipped. + // As this has a TSN 14, after TSN=12 -> defer it. + data_tracker_->HandleForwardTsn(TSN(14)); + reasm_->HandleForwardTsn( + TSN(14), std::vector({SkippedStream(kStreamId, SSN(0))})); + + // Reset the stream -> deferred TSNs should be re-added. + builder.Add(OutgoingSSNResetRequestParameter( + ReconfigRequestSN(11), ReconfigRequestSN(3), TSN(12), {kStreamId})); + EXPECT_THAT(HandleAndCatchResponse(ReConfigChunk(builder.Build())), + ElementsAre(Property(&ReconfigurationResponseParameter::result, + ResponseResult::kSuccessPerformed))); + + EXPECT_THAT(reasm_->FlushMessages(), + UnorderedElementsAre( + SctpMessageIs(kStreamId, PPID(1005), kShortPayload))); } TEST_F(StreamResetHandlerTest, SendOutgoingRequestDirectly) { @@ -767,7 +897,6 @@ TEST_F(StreamResetHandlerTest, PerformCloseAfterOneFirstFailing) { DataGeneratorOptions opts; opts.mid = MID(0); reasm_->Add(kPeerInitialTsn, gen_.Ordered({1, 2, 3, 4}, "BE", opts)); - reasm_->MaybeResetStreamsDeferred(kPeerInitialTsn); data_tracker_->Observe(kPeerInitialTsn); // And emulate that time has passed, and the peer retries the stream reset,