diff --git a/net/dcsctp/fuzzers/dcsctp_fuzzers.h b/net/dcsctp/fuzzers/dcsctp_fuzzers.h index 90cfa35099..49aa7f0430 100644 --- a/net/dcsctp/fuzzers/dcsctp_fuzzers.h +++ b/net/dcsctp/fuzzers/dcsctp_fuzzers.h @@ -64,7 +64,7 @@ class FuzzerCallbacks : public DcSctpSocketCallbacks { // The fuzzer timeouts don't implement |precision|. return std::make_unique(active_timeouts_); } - TimeMs TimeMillis() override { return TimeMs(42); } + webrtc::Timestamp Now() override { return webrtc::Timestamp::Millis(42); } uint32_t GetRandomInt(uint32_t low, uint32_t high) override { return kRandomValue; } diff --git a/net/dcsctp/public/dcsctp_socket.h b/net/dcsctp/public/dcsctp_socket.h index 34a12ef81d..d0a81eaeb2 100644 --- a/net/dcsctp/public/dcsctp_socket.h +++ b/net/dcsctp/public/dcsctp_socket.h @@ -324,9 +324,11 @@ class DcSctpSocketCallbacks { // Returns the current time in milliseconds (from any epoch). // + // TODO(bugs.webrtc.org/15593): This method is deprecated, see `Now`. + // // Note that it's NOT ALLOWED to call into this library from within this // callback. - virtual TimeMs TimeMillis() = 0; + virtual TimeMs TimeMillis() { return TimeMs(0); } // Returns the current time (from any epoch). // diff --git a/net/dcsctp/rx/data_tracker_test.cc b/net/dcsctp/rx/data_tracker_test.cc index 2a5c6a203d..f4e1c64444 100644 --- a/net/dcsctp/rx/data_tracker_test.cc +++ b/net/dcsctp/rx/data_tracker_test.cc @@ -25,11 +25,12 @@ namespace dcsctp { namespace { -using ::webrtc::TimeDelta; using ::testing::ElementsAre; using ::testing::IsEmpty; using ::testing::SizeIs; using ::testing::UnorderedElementsAre; +using ::webrtc::TimeDelta; +using ::webrtc::Timestamp; constexpr size_t kArwnd = 10000; constexpr TSN kInitialTSN(11); @@ -72,7 +73,7 @@ class DataTrackerTest : public testing::Test { tracker_->RestoreFromState(state); } - TimeMs now_ = TimeMs(0); + Timestamp now_ = Timestamp::Zero(); FakeTimeoutManager timeout_manager_; TimerManager timer_manager_; std::unique_ptr timer_; diff --git a/net/dcsctp/socket/callback_deferrer.cc b/net/dcsctp/socket/callback_deferrer.cc index 1b4ba00658..0a24020167 100644 --- a/net/dcsctp/socket/callback_deferrer.cc +++ b/net/dcsctp/socket/callback_deferrer.cc @@ -70,6 +70,8 @@ std::unique_ptr CallbackDeferrer::CreateTimeout( } TimeMs CallbackDeferrer::TimeMillis() { + // This should not be called by the library - it's migrated to `Now()`. + RTC_DCHECK(false); // Will not be deferred - call directly. return underlying_.TimeMillis(); } diff --git a/net/dcsctp/socket/dcsctp_socket.cc b/net/dcsctp/socket/dcsctp_socket.cc index c9ecb71ee2..21730276d2 100644 --- a/net/dcsctp/socket/dcsctp_socket.cc +++ b/net/dcsctp/socket/dcsctp_socket.cc @@ -83,6 +83,7 @@ namespace dcsctp { namespace { using ::webrtc::TimeDelta; +using ::webrtc::Timestamp; // https://tools.ietf.org/html/rfc4960#section-5.1 constexpr uint32_t kMinVerificationTag = 1; @@ -519,7 +520,7 @@ SendStatus DcSctpSocket::Send(DcSctpMessage message, return SendStatus::kErrorResourceExhaustion; } - TimeMs now = callbacks_.TimeMillis(); + Timestamp now = callbacks_.Now(); ++metrics_.tx_messages_count; send_queue_.Add(now, std::move(message), send_options); if (tcb_ != nullptr) { @@ -769,7 +770,7 @@ void DcSctpSocket::ReceivePacket(rtc::ArrayView data) { ++metrics_.rx_packets_count; if (packet_observer_ != nullptr) { - packet_observer_->OnReceivedPacket(callbacks_.TimeMillis(), data); + packet_observer_->OnReceivedPacket(TimeMs(callbacks_.Now().ms()), data); } absl::optional packet = SctpPacket::Parse(data, options_); @@ -952,7 +953,7 @@ TimeDelta DcSctpSocket::OnCookieTimerExpiry() { RTC_DCHECK(state_ == State::kCookieEchoed); if (t1_cookie_->is_running()) { - tcb_->SendBufferedPackets(callbacks_.TimeMillis()); + tcb_->SendBufferedPackets(callbacks_.Now()); } else { InternalClose(ErrorKind::kTooManyRetries, "No COOKIE_ACK received"); } @@ -997,7 +998,7 @@ void DcSctpSocket::OnSentPacket(rtc::ArrayView packet, // The packet observer is invoked even if the packet was failed to be sent, to // indicate an attempt was made. if (packet_observer_ != nullptr) { - packet_observer_->OnSentPacket(callbacks_.TimeMillis(), packet); + packet_observer_->OnSentPacket(TimeMs(callbacks_.Now().ms()), packet); } if (status == SendPacketStatus::kSuccess) { @@ -1283,7 +1284,7 @@ void DcSctpSocket::HandleInitAck( // The connection isn't fully established just yet. tcb_->SetCookieEchoChunk(CookieEchoChunk(cookie->data())); - tcb_->SendBufferedPackets(callbacks_.TimeMillis()); + tcb_->SendBufferedPackets(callbacks_.Now()); t1_cookie_->Start(); } @@ -1352,7 +1353,7 @@ void DcSctpSocket::HandleCookieEcho( // "A COOKIE ACK chunk may be bundled with any pending DATA chunks (and/or // SACK chunks), but the COOKIE ACK chunk MUST be the first chunk in the // packet." - tcb_->SendBufferedPackets(b, callbacks_.TimeMillis()); + tcb_->SendBufferedPackets(b, callbacks_.Now()); } bool DcSctpSocket::HandleCookieEchoWithTCB(const CommonHeader& header, @@ -1450,7 +1451,7 @@ void DcSctpSocket::HandleCookieAck( t1_cookie_->Stop(); tcb_->ClearCookieEchoChunk(); SetState(State::kEstablished, "COOKIE_ACK received"); - tcb_->SendBufferedPackets(callbacks_.TimeMillis()); + tcb_->SendBufferedPackets(callbacks_.Now()); callbacks_.OnConnected(); } @@ -1466,7 +1467,7 @@ void DcSctpSocket::HandleSack(const CommonHeader& header, absl::optional chunk = SackChunk::Parse(descriptor.data); if (ValidateParseSuccess(chunk) && ValidateHasTCB()) { - TimeMs now = callbacks_.TimeMillis(); + Timestamp now = callbacks_.Now(); SackChunk sack = ChunkValidators::Clean(*std::move(chunk)); if (tcb_->retransmission_queue().HandleSack(now, sack)) { @@ -1555,7 +1556,7 @@ void DcSctpSocket::HandleError(const CommonHeader& header, void DcSctpSocket::HandleReconfig( const CommonHeader& header, const SctpPacket::ChunkDescriptor& descriptor) { - TimeMs now = callbacks_.TimeMillis(); + Timestamp now = callbacks_.Now(); absl::optional chunk = ReConfigChunk::Parse(descriptor.data); if (ValidateParseSuccess(chunk) && ValidateHasTCB()) { tcb_->stream_reset_handler().HandleReConfig(*std::move(chunk)); diff --git a/net/dcsctp/socket/dcsctp_socket_network_test.cc b/net/dcsctp/socket/dcsctp_socket_network_test.cc index f097bfa095..f73ecce445 100644 --- a/net/dcsctp/socket/dcsctp_socket_network_test.cc +++ b/net/dcsctp/socket/dcsctp_socket_network_test.cc @@ -55,6 +55,8 @@ using ::testing::AllOf; using ::testing::Ge; using ::testing::Le; using ::testing::SizeIs; +using ::webrtc::TimeDelta; +using ::webrtc::Timestamp; constexpr StreamID kStreamId(1); constexpr PPID kPpid(53); @@ -142,13 +144,13 @@ class SctpActor : public DcSctpSocketCallbacks { emulated_socket_(emulated_socket), timeout_factory_( *thread_, - [this]() { return TimeMillis(); }, + [this]() { return TimeMs(Now().ms()); }, [this](dcsctp::TimeoutID timeout_id) { sctp_socket_.HandleTimeout(timeout_id); }), random_(GetUniqueSeed()), sctp_socket_(name, *this, nullptr, sctp_options), - last_bandwidth_printout_(TimeMs(TimeMillis())) { + last_bandwidth_printout_(Now()) { emulated_socket.SetReceiver([this](rtc::CopyOnWriteBuffer buf) { // The receiver will be executed on the NetworkEmulation task queue, but // the dcSCTP socket is owned by `thread_` and is not thread-safe. @@ -157,11 +159,11 @@ class SctpActor : public DcSctpSocketCallbacks { } void PrintBandwidth() { - TimeMs now = TimeMillis(); - DurationMs duration = now - last_bandwidth_printout_; + Timestamp now = Now(); + TimeDelta duration = now - last_bandwidth_printout_; double bitrate_mbps = - static_cast(received_bytes_ * 8) / *duration / 1000; + static_cast(received_bytes_ * 8) / duration.ms() / 1000; RTC_LOG(LS_INFO) << log_prefix() << rtc::StringFormat("Received %0.2f Mbps", bitrate_mbps); @@ -185,7 +187,7 @@ class SctpActor : public DcSctpSocketCallbacks { return timeout_factory_.CreateTimeout(precision); } - TimeMs TimeMillis() override { return TimeMs(rtc::TimeMillis()); } + Timestamp Now() override { return Timestamp::Millis(rtc::TimeMillis()); } uint32_t GetRandomInt(uint32_t low, uint32_t high) override { return random_.Rand(low, high); @@ -314,7 +316,7 @@ class SctpActor : public DcSctpSocketCallbacks { DcSctpSocket sctp_socket_; size_t received_bytes_ = 0; absl::optional last_received_message_; - TimeMs last_bandwidth_printout_; + Timestamp last_bandwidth_printout_; // Per-second received bitrates, in Mbps std::vector received_bitrate_mbps_; webrtc::ScopedTaskSafety safety_; diff --git a/net/dcsctp/socket/dcsctp_socket_test.cc b/net/dcsctp/socket/dcsctp_socket_test.cc index 13202846ac..dc76b80a37 100644 --- a/net/dcsctp/socket/dcsctp_socket_test.cc +++ b/net/dcsctp/socket/dcsctp_socket_test.cc @@ -73,6 +73,8 @@ using ::testing::Not; using ::testing::Property; using ::testing::SizeIs; using ::testing::UnorderedElementsAre; +using ::webrtc::TimeDelta; +using ::webrtc::Timestamp; constexpr SendOptions kSendOptions; constexpr size_t kLargeMessageSize = DcSctpOptions::kMaxSafeMTUSize * 20; @@ -269,7 +271,7 @@ void RunTimers(SocketUnderTest& s) { } } -void AdvanceTime(SocketUnderTest& a, SocketUnderTest& z, DurationMs duration) { +void AdvanceTime(SocketUnderTest& a, SocketUnderTest& z, TimeDelta duration) { a.cb.AdvanceTime(duration); z.cb.AdvanceTime(duration); @@ -282,14 +284,14 @@ void AdvanceTime(SocketUnderTest& a, SocketUnderTest& z, DurationMs duration) { void ExchangeMessagesAndAdvanceTime( SocketUnderTest& a, SocketUnderTest& z, - DurationMs max_timeout = DurationMs(10000)) { - TimeMs time_started = a.cb.TimeMillis(); - while (a.cb.TimeMillis() - time_started < max_timeout) { + TimeDelta max_timeout = TimeDelta::Seconds(10)) { + Timestamp time_started = a.cb.Now(); + while (a.cb.Now() - time_started < max_timeout) { ExchangeMessages(a, z); - DurationMs time_to_next_timeout = + TimeDelta time_to_next_timeout = std::min(a.cb.GetTimeToNextTimeout(), z.cb.GetTimeToNextTimeout()); - if (time_to_next_timeout == DurationMs::InfiniteDuration()) { + if (time_to_next_timeout.IsPlusInfinity()) { // No more pending timer. return; } @@ -535,7 +537,7 @@ TEST(DcSctpSocketTest, EstablishConnectionLostCookieAck) { EXPECT_EQ(z.socket.state(), SocketState::kConnected); // This will make A re-send the COOKIE_ECHO - AdvanceTime(a, z, DurationMs(a.options.t1_cookie_timeout)); + AdvanceTime(a, z, a.options.t1_cookie_timeout.ToTimeDelta()); // Z reads COOKIE_ECHO, produces COOKIE_ACK z.socket.ReceivePacket(a.cb.ConsumeSentPacket()); @@ -555,7 +557,7 @@ TEST(DcSctpSocketTest, ResendInitAndEstablishConnection) { EXPECT_THAT(a.cb.ConsumeSentPacket(), HasChunks(ElementsAre(IsChunkType(InitChunk::kType)))); - AdvanceTime(a, z, a.options.t1_init_timeout); + AdvanceTime(a, z, a.options.t1_init_timeout.ToTimeDelta()); // Z reads INIT, produces INIT_ACK z.socket.ReceivePacket(a.cb.ConsumeSentPacket()); @@ -581,7 +583,7 @@ TEST(DcSctpSocketTest, ResendingInitTooManyTimesAborts) { HasChunks(ElementsAre(IsChunkType(InitChunk::kType)))); for (int i = 0; i < *a.options.max_init_retransmits; ++i) { - AdvanceTime(a, z, a.options.t1_init_timeout * (1 << i)); + AdvanceTime(a, z, a.options.t1_init_timeout.ToTimeDelta() * (1 << i)); // INIT is resent EXPECT_THAT(a.cb.ConsumeSentPacket(), @@ -590,8 +592,9 @@ TEST(DcSctpSocketTest, ResendingInitTooManyTimesAborts) { // Another timeout, after the max init retransmits. EXPECT_CALL(a.cb, OnAborted).Times(1); - AdvanceTime( - a, z, a.options.t1_init_timeout * (1 << *a.options.max_init_retransmits)); + AdvanceTime(a, z, + a.options.t1_init_timeout.ToTimeDelta() * + (1 << *a.options.max_init_retransmits)); EXPECT_EQ(a.socket.state(), SocketState::kClosed); } @@ -611,7 +614,7 @@ TEST(DcSctpSocketTest, ResendCookieEchoAndEstablishConnection) { EXPECT_THAT(a.cb.ConsumeSentPacket(), HasChunks(ElementsAre(IsChunkType(CookieEchoChunk::kType)))); - AdvanceTime(a, z, a.options.t1_init_timeout); + AdvanceTime(a, z, a.options.t1_init_timeout.ToTimeDelta()); // Z reads COOKIE_ECHO, produces COOKIE_ACK z.socket.ReceivePacket(a.cb.ConsumeSentPacket()); @@ -638,7 +641,7 @@ TEST(DcSctpSocketTest, ResendingCookieEchoTooManyTimesAborts) { HasChunks(ElementsAre(IsChunkType(CookieEchoChunk::kType)))); for (int i = 0; i < *a.options.max_init_retransmits; ++i) { - AdvanceTime(a, z, a.options.t1_cookie_timeout * (1 << i)); + AdvanceTime(a, z, a.options.t1_cookie_timeout.ToTimeDelta() * (1 << i)); // COOKIE_ECHO is resent EXPECT_THAT(a.cb.ConsumeSentPacket(), @@ -647,9 +650,9 @@ TEST(DcSctpSocketTest, ResendingCookieEchoTooManyTimesAborts) { // Another timeout, after the max init retransmits. EXPECT_CALL(a.cb, OnAborted).Times(1); - AdvanceTime( - a, z, - a.options.t1_cookie_timeout * (1 << *a.options.max_init_retransmits)); + AdvanceTime(a, z, + a.options.t1_cookie_timeout.ToTimeDelta() * + (1 << *a.options.max_init_retransmits)); EXPECT_EQ(a.socket.state(), SocketState::kClosed); } @@ -680,11 +683,13 @@ TEST(DcSctpSocketTest, DoesntSendMorePacketsUntilCookieAckHasBeenReceived) { // will be T1-COOKIE that drives retransmissions, so when the T3-RTX expires, // nothing should be retransmitted. ASSERT_TRUE(a.options.rto_initial < a.options.t1_cookie_timeout); - AdvanceTime(a, z, a.options.rto_initial); + AdvanceTime(a, z, a.options.rto_initial.ToTimeDelta()); EXPECT_THAT(a.cb.ConsumeSentPacket(), IsEmpty()); // When T1-COOKIE expires, both the COOKIE-ECHO and DATA should be present. - AdvanceTime(a, z, a.options.t1_cookie_timeout - a.options.rto_initial); + AdvanceTime(a, z, + a.options.t1_cookie_timeout.ToTimeDelta() - + a.options.rto_initial.ToTimeDelta()); // And this COOKIE-ECHO and DATA is also lost - never received by Z. EXPECT_THAT(a.cb.ConsumeSentPacket(), @@ -694,7 +699,7 @@ TEST(DcSctpSocketTest, DoesntSendMorePacketsUntilCookieAckHasBeenReceived) { EXPECT_THAT(a.cb.ConsumeSentPacket(), IsEmpty()); // COOKIE_ECHO has exponential backoff. - AdvanceTime(a, z, a.options.t1_cookie_timeout * 2); + AdvanceTime(a, z, a.options.t1_cookie_timeout.ToTimeDelta() * 2); // Z reads COOKIE_ECHO, produces COOKIE_ACK z.socket.ReceivePacket(a.cb.ConsumeSentPacket()); @@ -747,7 +752,7 @@ TEST(DcSctpSocketTest, ShutdownTimerExpiresTooManyTimeClosesConnection) { EXPECT_EQ(a.socket.state(), SocketState::kShuttingDown); for (int i = 0; i < *a.options.max_retransmissions; ++i) { - AdvanceTime(a, z, DurationMs(a.options.rto_initial * (1 << i))); + AdvanceTime(a, z, a.options.rto_initial.ToTimeDelta() * (1 << i)); // Dropping every shutdown chunk. EXPECT_THAT(a.cb.ConsumeSentPacket(), @@ -757,7 +762,8 @@ TEST(DcSctpSocketTest, ShutdownTimerExpiresTooManyTimeClosesConnection) { // The last expiry, makes it abort the connection. EXPECT_CALL(a.cb, OnAborted).Times(1); AdvanceTime(a, z, - a.options.rto_initial * (1 << *a.options.max_retransmissions)); + a.options.rto_initial.ToTimeDelta() * + (1 << *a.options.max_retransmissions)); EXPECT_EQ(a.socket.state(), SocketState::kClosed); EXPECT_THAT(a.cb.ConsumeSentPacket(), @@ -815,7 +821,7 @@ TEST_P(DcSctpSocketParametrizedTest, TimeoutResendsPacket) { a.cb.ConsumeSentPacket(); RTC_LOG(LS_INFO) << "Advancing time"; - AdvanceTime(a, *z, a.options.rto_initial); + AdvanceTime(a, *z, a.options.rto_initial.ToTimeDelta()); z->socket.ReceivePacket(a.cb.ConsumeSentPacket()); @@ -886,7 +892,7 @@ TEST_P(DcSctpSocketParametrizedTest, ExpectHeartbeatToBeSent) { EXPECT_THAT(a.cb.ConsumeSentPacket(), IsEmpty()); - AdvanceTime(a, *z, a.options.heartbeat_interval); + AdvanceTime(a, *z, a.options.heartbeat_interval.ToTimeDelta()); std::vector packet = a.cb.ConsumeSentPacket(); // The info is a single 64-bit number. @@ -920,7 +926,7 @@ TEST_P(DcSctpSocketParametrizedTest, for (int i = 0; i < *a.options.max_retransmissions; ++i) { RTC_LOG(LS_INFO) << "Letting HEARTBEAT interval timer expire - sending..."; - AdvanceTime(a, *z, time_to_next_hearbeat); + AdvanceTime(a, *z, time_to_next_hearbeat.ToTimeDelta()); // Dropping every heartbeat. ASSERT_HAS_VALUE_AND_ASSIGN( @@ -929,20 +935,20 @@ TEST_P(DcSctpSocketParametrizedTest, EXPECT_EQ(hb_packet.descriptors()[0].type, HeartbeatRequestChunk::kType); RTC_LOG(LS_INFO) << "Letting the heartbeat expire."; - AdvanceTime(a, *z, DurationMs(1000)); + AdvanceTime(a, *z, TimeDelta::Millis(1000)); time_to_next_hearbeat = a.options.heartbeat_interval - DurationMs(1000); } RTC_LOG(LS_INFO) << "Letting HEARTBEAT interval timer expire - sending..."; - AdvanceTime(a, *z, time_to_next_hearbeat); + AdvanceTime(a, *z, time_to_next_hearbeat.ToTimeDelta()); // Last heartbeat EXPECT_THAT(a.cb.ConsumeSentPacket(), Not(IsEmpty())); EXPECT_CALL(a.cb, OnAborted).Times(1); // Should suffice as exceeding RTO - AdvanceTime(a, *z, DurationMs(1000)); + AdvanceTime(a, *z, TimeDelta::Millis(1000)); z = MaybeHandoverSocket(std::move(z)); } @@ -959,7 +965,7 @@ TEST_P(DcSctpSocketParametrizedTest, RecoversAfterASuccessfulAck) { // Force-close socket Z so that it doesn't interfere from now on. z->socket.Close(); - DurationMs time_to_next_hearbeat = a.options.heartbeat_interval; + TimeDelta time_to_next_hearbeat = a.options.heartbeat_interval.ToTimeDelta(); for (int i = 0; i < *a.options.max_retransmissions; ++i) { AdvanceTime(a, *z, time_to_next_hearbeat); @@ -968,9 +974,10 @@ TEST_P(DcSctpSocketParametrizedTest, RecoversAfterASuccessfulAck) { a.cb.ConsumeSentPacket(); RTC_LOG(LS_INFO) << "Letting the heartbeat expire."; - AdvanceTime(a, *z, DurationMs(1000)); + AdvanceTime(a, *z, TimeDelta::Seconds(1)); - time_to_next_hearbeat = a.options.heartbeat_interval - DurationMs(1000); + time_to_next_hearbeat = + a.options.heartbeat_interval.ToTimeDelta() - TimeDelta::Seconds(1); } RTC_LOG(LS_INFO) << "Getting the last heartbeat - and acking it"; @@ -990,7 +997,7 @@ TEST_P(DcSctpSocketParametrizedTest, RecoversAfterASuccessfulAck) { // Should suffice as exceeding RTO - which will not fire. EXPECT_CALL(a.cb, OnAborted).Times(0); - AdvanceTime(a, *z, DurationMs(1000)); + AdvanceTime(a, *z, TimeDelta::Seconds(1)); EXPECT_THAT(a.cb.ConsumeSentPacket(), IsEmpty()); @@ -1245,7 +1252,7 @@ TEST_P(DcSctpSocketParametrizedTest, SendMessageWithLimitedRtx) { a.socket.ReceivePacket(z->cb.ConsumeSentPacket()); // Handle delayed SACK for third DATA - AdvanceTime(a, *z, a.options.delayed_ack_max_timeout); + AdvanceTime(a, *z, a.options.delayed_ack_max_timeout.ToTimeDelta()); // Handle SACK for second DATA a.socket.ReceivePacket(z->cb.ConsumeSentPacket()); @@ -1254,7 +1261,7 @@ TEST_P(DcSctpSocketParametrizedTest, SendMessageWithLimitedRtx) { // in-flight and the reported gap could be due to out-of-order delivery. So // the RetransmissionQueue will not mark it as "to be retransmitted" until // after the t3-rtx timer has expired. - AdvanceTime(a, *z, a.options.rto_initial); + AdvanceTime(a, *z, a.options.rto_initial.ToTimeDelta()); // The chunk will be marked as retransmitted, and then as abandoned, which // will trigger a FORWARD-TSN to be sent. @@ -1352,7 +1359,7 @@ TEST_P(DcSctpSocketParametrizedTest, SendManyFragmentedMessagesWithLimitedRtx) { ExchangeMessages(a, *z); // Let the RTX timer expire, and exchange FORWARD-TSN/SACKs - AdvanceTime(a, *z, a.options.rto_initial); + AdvanceTime(a, *z, a.options.rto_initial.ToTimeDelta()); ExchangeMessages(a, *z); @@ -1484,7 +1491,7 @@ TEST(DcSctpSocketTest, PassingHighWatermarkWillOnlyAcceptCumAckTsn) { .Build()); // The receiver might have moved into delayed ack mode. - AdvanceTime(a, z, z.options.rto_initial); + AdvanceTime(a, z, z.options.rto_initial.ToTimeDelta()); EXPECT_THAT(z.cb.ConsumeSentPacket(), HasChunks(ElementsAre(IsSack( @@ -1528,7 +1535,7 @@ TEST(DcSctpSocketTest, PassingHighWatermarkWillOnlyAcceptCumAckTsn) { .Build()); // The receiver might have moved into delayed ack mode. - AdvanceTime(a, z, z.options.rto_initial); + AdvanceTime(a, z, z.options.rto_initial.ToTimeDelta()); EXPECT_THAT(z.cb.ConsumeSentPacket(), HasChunks(ElementsAre(IsSack( @@ -1562,13 +1569,13 @@ TEST_P(DcSctpSocketParametrizedTest, SendsMessagesWithLowLifetime) { z = MaybeHandoverSocket(std::move(z)); // Mock that the time always goes forward. - TimeMs now(0); - EXPECT_CALL(a.cb, TimeMillis).WillRepeatedly([&]() { - now += DurationMs(3); + Timestamp now = Timestamp::Zero(); + EXPECT_CALL(a.cb, Now).WillRepeatedly([&]() { + now += TimeDelta::Millis(3); return now; }); - EXPECT_CALL(z->cb, TimeMillis).WillRepeatedly([&]() { - now += DurationMs(3); + EXPECT_CALL(z->cb, Now).WillRepeatedly([&]() { + now += TimeDelta::Millis(3); return now; }); @@ -1592,7 +1599,7 @@ TEST_P(DcSctpSocketParametrizedTest, SendsMessagesWithLowLifetime) { EXPECT_FALSE(z->cb.ConsumeReceivedMessage().has_value()); // Validate that the sockets really make the time move forward. - EXPECT_GE(*now, kIterations * 2); + EXPECT_GE(now.ms(), kIterations * 2); MaybeHandoverSocketAndSendMessage(a, std::move(z)); } @@ -1614,13 +1621,13 @@ TEST_P(DcSctpSocketParametrizedTest, lifetime_1.lifetime = DurationMs(1); // Mock that the time always goes forward. - TimeMs now(0); - EXPECT_CALL(a.cb, TimeMillis).WillRepeatedly([&]() { - now += DurationMs(3); + Timestamp now = Timestamp::Zero(); + EXPECT_CALL(a.cb, Now).WillRepeatedly([&]() { + now += TimeDelta::Millis(3); return now; }); - EXPECT_CALL(z->cb, TimeMillis).WillRepeatedly([&]() { - now += DurationMs(3); + EXPECT_CALL(z->cb, Now).WillRepeatedly([&]() { + now += TimeDelta::Millis(3); return now; }); @@ -1944,7 +1951,7 @@ TEST(DcSctpSocketTest, RxAndTxPacketMetricsIncrease) { EXPECT_EQ(z.socket.GetMetrics()->rx_messages_count, 2u); // Delayed sack - AdvanceTime(a, z, a.options.delayed_ack_max_timeout); + AdvanceTime(a, z, a.options.delayed_ack_max_timeout.ToTimeDelta()); a.socket.ReceivePacket(z.cb.ConsumeSentPacket()); // SACK EXPECT_EQ(a.socket.GetMetrics()->unack_data_count, 0u); @@ -1981,7 +1988,7 @@ TEST(DcSctpSocketTest, RetransmissionMetricsAreSetForNormalRetransmit) { a.socket.Send(DcSctpMessage(StreamID(1), PPID(53), payload), kSendOptions); a.cb.ConsumeSentPacket(); - AdvanceTime(a, z, a.options.rto_initial); + AdvanceTime(a, z, a.options.rto_initial.ToTimeDelta()); ExchangeMessages(a, z); EXPECT_EQ(a.socket.GetMetrics()->rtx_packets_count, 1u); @@ -2185,7 +2192,7 @@ TEST_P(DcSctpSocketParametrizedTest, CanLoseFirstOrderedMessage) { // First DATA is lost, and retransmission timer will delete it. a.cb.ConsumeSentPacket(); - AdvanceTime(a, *z, a.options.rto_initial); + AdvanceTime(a, *z, a.options.rto_initial.ToTimeDelta()); ExchangeMessages(a, *z); // Send a second message (SID=0, SSN=1). @@ -2574,7 +2581,7 @@ TEST(DcSctpSocketTest, LifecycleEventsAreGeneratedForAckedMessages) { EXPECT_CALL(a.cb, OnLifecycleEnd(LifecycleId(42))); ExchangeMessages(a, z); // In case of delayed ack. - AdvanceTime(a, z, a.options.delayed_ack_max_timeout); + AdvanceTime(a, z, a.options.delayed_ack_max_timeout.ToTimeDelta()); ExchangeMessages(a, z); EXPECT_THAT(GetReceivedMessagePpids(z), ElementsAre(101, 102, 103)); @@ -2617,15 +2624,15 @@ TEST(DcSctpSocketTest, LifecycleEventsForFailMaxRetransmissions) { ExchangeMessages(a, z); // Handle delayed SACK. - AdvanceTime(a, z, a.options.delayed_ack_max_timeout); + AdvanceTime(a, z, a.options.delayed_ack_max_timeout.ToTimeDelta()); ExchangeMessages(a, z); // The chunk is now NACKed. Let the RTO expire, to discard the message. - AdvanceTime(a, z, a.options.rto_initial); + AdvanceTime(a, z, a.options.rto_initial.ToTimeDelta()); ExchangeMessages(a, z); // Handle delayed SACK. - AdvanceTime(a, z, a.options.delayed_ack_max_timeout); + AdvanceTime(a, z, a.options.delayed_ack_max_timeout.ToTimeDelta()); ExchangeMessages(a, z); EXPECT_THAT(GetReceivedMessagePpids(z), ElementsAre(51, 53)); @@ -2672,7 +2679,7 @@ TEST(DcSctpSocketTest, LifecycleEventsForExpiredMessageWithLifetimeLimit) { .lifecycle_id = LifecycleId(1), }); - AdvanceTime(a, z, DurationMs(200)); + AdvanceTime(a, z, TimeDelta::Millis(200)); EXPECT_CALL(a.cb, OnLifecycleMessageExpired(LifecycleId(1), /*maybe_delivered=*/false)); @@ -2769,7 +2776,7 @@ TEST(DcSctpSocketTest, ResetStreamsDeferred) { // Z sent "in progress", which will make A buffer packets until it's sure // that the reconfiguration has been applied. A will retry - wait for that. - AdvanceTime(a, z, a.options.rto_initial); + AdvanceTime(a, z, a.options.rto_initial.ToTimeDelta()); auto reconfig2 = a.cb.ConsumeSentPacket(); EXPECT_THAT(reconfig2, HasChunks(ElementsAre(IsReConfig(HasParameters( @@ -3018,7 +3025,7 @@ TEST(DcSctpSocketTest, HandlesForwardTsnOutOfOrderWithStreamResetting) { HasChunks(ElementsAre( IsDataChunk(AllOf(Property(&DataChunk::ssn, SSN(0)), Property(&DataChunk::ppid, PPID(51))))))); - AdvanceTime(a, z, a.options.rto_initial); + AdvanceTime(a, z, a.options.rto_initial.ToTimeDelta()); auto fwd_tsn_packet = a.cb.ConsumeSentPacket(); EXPECT_THAT(fwd_tsn_packet, diff --git a/net/dcsctp/socket/heartbeat_handler.cc b/net/dcsctp/socket/heartbeat_handler.cc index d7c71f511b..31211e0d13 100644 --- a/net/dcsctp/socket/heartbeat_handler.cc +++ b/net/dcsctp/socket/heartbeat_handler.cc @@ -37,6 +37,7 @@ namespace dcsctp { using ::webrtc::TimeDelta; +using ::webrtc::Timestamp; // This is stored (in serialized form) as HeartbeatInfoParameter sent in // HeartbeatRequestChunk and received back in HeartbeatAckChunk. It should be @@ -51,11 +52,11 @@ class HeartbeatInfo { static constexpr size_t kBufferSize = sizeof(uint64_t); static_assert(kBufferSize == 8, "Unexpected buffer size"); - explicit HeartbeatInfo(TimeMs created_at) : created_at_(created_at) {} + explicit HeartbeatInfo(Timestamp created_at) : created_at_(created_at) {} std::vector Serialize() { - uint32_t high_bits = static_cast(*created_at_ >> 32); - uint32_t low_bits = static_cast(*created_at_); + uint32_t high_bits = static_cast(created_at_.ms() >> 32); + uint32_t low_bits = static_cast(created_at_.ms()); std::vector data(kBufferSize); BoundedByteWriter writer(data); @@ -77,13 +78,13 @@ class HeartbeatInfo { uint32_t low_bits = reader.Load32<4>(); uint64_t created_at = static_cast(high_bits) << 32 | low_bits; - return HeartbeatInfo(TimeMs(created_at)); + return HeartbeatInfo(Timestamp::Millis(created_at)); } - TimeMs created_at() const { return created_at_; } + Timestamp created_at() const { return created_at_; } private: - const TimeMs created_at_; + const Timestamp created_at_; }; HeartbeatHandler::HeartbeatHandler(absl::string_view log_prefix, @@ -157,9 +158,9 @@ void HeartbeatHandler::HandleHeartbeatAck(HeartbeatAckChunk chunk) { return; } - TimeMs now = ctx_->callbacks().TimeMillis(); - if (info->created_at() > TimeMs(0) && info->created_at() <= now) { - ctx_->ObserveRTT((now - info->created_at()).ToTimeDelta()); + Timestamp now = ctx_->callbacks().Now(); + if (info->created_at() > Timestamp::Zero() && info->created_at() <= now) { + ctx_->ObserveRTT(now - info->created_at()); } // https://tools.ietf.org/html/rfc4960#section-8.1 @@ -170,7 +171,7 @@ void HeartbeatHandler::HandleHeartbeatAck(HeartbeatAckChunk chunk) { TimeDelta HeartbeatHandler::OnIntervalTimerExpiry() { if (ctx_->is_connection_established()) { - HeartbeatInfo info(ctx_->callbacks().TimeMillis()); + HeartbeatInfo info(ctx_->callbacks().Now()); timeout_timer_->set_duration(ctx_->current_rto()); timeout_timer_->Start(); RTC_DLOG(LS_INFO) << log_prefix_ << "Sending HEARTBEAT with timeout " diff --git a/net/dcsctp/socket/heartbeat_handler_test.cc b/net/dcsctp/socket/heartbeat_handler_test.cc index 919af66bca..4475527322 100644 --- a/net/dcsctp/socket/heartbeat_handler_test.cc +++ b/net/dcsctp/socket/heartbeat_handler_test.cc @@ -53,7 +53,7 @@ class HeartbeatHandlerTestBase : public testing::Test { handler_("log: ", options_, &context_, &timer_manager_) {} void AdvanceTime(webrtc::TimeDelta duration) { - callbacks_.AdvanceTime(DurationMs(duration)); + callbacks_.AdvanceTime(duration); for (;;) { absl::optional timeout_id = callbacks_.GetNextExpiredTimeout(); if (!timeout_id.has_value()) { @@ -135,9 +135,9 @@ TEST_F(HeartbeatHandlerTest, SendsHeartbeatRequestsOnIdleChannel) { HeartbeatAckChunk ack(std::move(req).extract_parameters()); // Respond a while later. This RTT will be measured by the handler - constexpr DurationMs rtt(313); + constexpr TimeDelta rtt = TimeDelta::Millis(313); - EXPECT_CALL(context_, ObserveRTT(rtt.ToTimeDelta())).Times(1); + EXPECT_CALL(context_, ObserveRTT(rtt)).Times(1); callbacks_.AdvanceTime(rtt); handler_.HandleHeartbeatAck(std::move(ack)); @@ -162,7 +162,7 @@ TEST_F(HeartbeatHandlerTest, DoesntObserveInvalidHeartbeats) { // Go backwards in time - which make the HEARTBEAT-ACK have an invalid // timestamp in it, as it will be in the future. - callbacks_.AdvanceTime(DurationMs(-100)); + callbacks_.AdvanceTime(TimeDelta::Millis(-100)); handler_.HandleHeartbeatAck(std::move(ack)); } diff --git a/net/dcsctp/socket/mock_dcsctp_socket_callbacks.h b/net/dcsctp/socket/mock_dcsctp_socket_callbacks.h index bcf5bc4019..972e547b12 100644 --- a/net/dcsctp/socket/mock_dcsctp_socket_callbacks.h +++ b/net/dcsctp/socket/mock_dcsctp_socket_callbacks.h @@ -80,10 +80,7 @@ class MockDcSctpSocketCallbacks : public DcSctpSocketCallbacks { << log_prefix_ << "Socket abort: " << ToString(error) << "; " << message; }); - ON_CALL(*this, TimeMillis).WillByDefault([this]() { return now_; }); - ON_CALL(*this, Now).WillByDefault([this]() { - return webrtc::Timestamp::Millis(*now_); - }); + ON_CALL(*this, Now).WillByDefault([this]() { return now_; }); } MOCK_METHOD(SendPacketStatus, @@ -97,7 +94,6 @@ class MockDcSctpSocketCallbacks : public DcSctpSocketCallbacks { return timeout_manager_.CreateTimeout(); } - MOCK_METHOD(TimeMs, TimeMillis, (), (override)); MOCK_METHOD(webrtc::Timestamp, Now, (), (override)); uint32_t GetRandomInt(uint32_t low, uint32_t high) override { return random_.Rand(low, high); @@ -163,20 +159,20 @@ class MockDcSctpSocketCallbacks : public DcSctpSocketCallbacks { return ret; } - void AdvanceTime(DurationMs duration_ms) { now_ = now_ + duration_ms; } - void SetTime(TimeMs now) { now_ = now; } + void AdvanceTime(webrtc::TimeDelta duration) { now_ = now_ + duration; } + void SetTime(webrtc::Timestamp now) { now_ = now; } absl::optional GetNextExpiredTimeout() { return timeout_manager_.GetNextExpiredTimeout(); } - DurationMs GetTimeToNextTimeout() const { + webrtc::TimeDelta GetTimeToNextTimeout() const { return timeout_manager_.GetTimeToNextTimeout(); } private: const std::string log_prefix_; - TimeMs now_ = TimeMs(0); + webrtc::Timestamp now_ = webrtc::Timestamp::Zero(); webrtc::Random random_; FakeTimeoutManager timeout_manager_; std::deque> sent_packets_; diff --git a/net/dcsctp/socket/stream_reset_handler_test.cc b/net/dcsctp/socket/stream_reset_handler_test.cc index fa0a6a89dc..e675c9bcef 100644 --- a/net/dcsctp/socket/stream_reset_handler_test.cc +++ b/net/dcsctp/socket/stream_reset_handler_test.cc @@ -130,8 +130,8 @@ class StreamResetHandlerTest : public testing::Test { EXPECT_CALL(ctx_, current_rto).WillRepeatedly(Return(kRto)); } - void AdvanceTime(DurationMs duration) { - callbacks_.AdvanceTime(DurationMs(kRto)); + void AdvanceTime(TimeDelta duration) { + callbacks_.AdvanceTime(duration); for (;;) { absl::optional timeout_id = callbacks_.GetNextExpiredTimeout(); if (!timeout_id.has_value()) { @@ -630,7 +630,7 @@ TEST_F(StreamResetHandlerTest, SendOutgoingResetRetransmitOnInProgress) { // Let some time pass, so that the reconfig timer expires, and retries the // same request. EXPECT_CALL(callbacks_, SendPacketWithStatus).Times(1); - AdvanceTime(DurationMs(kRto)); + AdvanceTime(kRto); std::vector payload = callbacks_.ConsumeSentPacket(); ASSERT_FALSE(payload.empty()); diff --git a/net/dcsctp/socket/transmission_control_block.cc b/net/dcsctp/socket/transmission_control_block.cc index 2bbb721e81..c6c8861e1f 100644 --- a/net/dcsctp/socket/transmission_control_block.cc +++ b/net/dcsctp/socket/transmission_control_block.cc @@ -39,6 +39,7 @@ namespace dcsctp { using ::webrtc::TimeDelta; +using ::webrtc::Timestamp; TransmissionControlBlock::TransmissionControlBlock( TimerManager& timer_manager, @@ -129,7 +130,7 @@ void TransmissionControlBlock::ObserveRTT(TimeDelta rtt) { } TimeDelta TransmissionControlBlock::OnRtxTimerExpiry() { - TimeMs now = callbacks_.TimeMillis(); + Timestamp now = callbacks_.Now(); RTC_DLOG(LS_INFO) << log_prefix_ << "Timer " << t3_rtx_->name() << " has expired"; if (cookie_echo_chunk_.has_value()) { @@ -161,7 +162,7 @@ void TransmissionControlBlock::MaybeSendSack() { } void TransmissionControlBlock::MaybeSendForwardTsn(SctpPacket::Builder& builder, - TimeMs now) { + Timestamp now) { if (now >= limit_forward_tsn_until_ && retransmission_queue_.ShouldSendForwardTsn(now)) { if (capabilities_.message_interleaving) { @@ -177,7 +178,7 @@ void TransmissionControlBlock::MaybeSendForwardTsn(SctpPacket::Builder& builder, // "Any delay applied to the sending of FORWARD TSN chunk SHOULD NOT exceed // 200ms and MUST NOT exceed 500ms". limit_forward_tsn_until_ = - now + std::min(DurationMs(200), DurationMs(rto_.srtt())); + now + std::min(TimeDelta::Millis(200), rto_.srtt()); } } @@ -209,7 +210,7 @@ void TransmissionControlBlock::MaybeSendFastRetransmit() { } void TransmissionControlBlock::SendBufferedPackets(SctpPacket::Builder& builder, - TimeMs now) { + Timestamp now) { for (int packet_idx = 0; packet_idx < options_.max_burst && retransmission_queue_.can_send_data(); ++packet_idx) { diff --git a/net/dcsctp/socket/transmission_control_block.h b/net/dcsctp/socket/transmission_control_block.h index c4c48af048..f8b2445525 100644 --- a/net/dcsctp/socket/transmission_control_block.h +++ b/net/dcsctp/socket/transmission_control_block.h @@ -108,7 +108,7 @@ class TransmissionControlBlock : public Context { void MaybeSendSack(); // Sends a FORWARD-TSN, if it is needed and allowed (rate-limited). - void MaybeSendForwardTsn(SctpPacket::Builder& builder, TimeMs now); + void MaybeSendForwardTsn(SctpPacket::Builder& builder, webrtc::Timestamp now); // Will be set while the socket is in kCookieEcho state. In this state, there // can only be a single packet outstanding, and it must contain the COOKIE @@ -129,12 +129,12 @@ class TransmissionControlBlock : public Context { // Fills `builder` (which may already be filled with control chunks) with // other control and data chunks, and sends packets as much as can be // allowed by the congestion control algorithm. - void SendBufferedPackets(SctpPacket::Builder& builder, TimeMs now); + void SendBufferedPackets(SctpPacket::Builder& builder, webrtc::Timestamp now); // As above, but without passing in a builder. If `cookie_echo_chunk_` is // present, then only one packet will be sent, with this chunk as the first // chunk. - void SendBufferedPackets(TimeMs now) { + void SendBufferedPackets(webrtc::Timestamp now) { SctpPacket::Builder builder(peer_verification_tag_, options_); SendBufferedPackets(builder, now); } @@ -172,7 +172,7 @@ class TransmissionControlBlock : public Context { const std::function is_connection_established_; PacketSender& packet_sender_; // Rate limiting of FORWARD-TSN. Next can be sent at or after this timestamp. - TimeMs limit_forward_tsn_until_ = TimeMs(0); + webrtc::Timestamp limit_forward_tsn_until_ = webrtc::Timestamp::Zero(); RetransmissionTimeout rto_; RetransmissionErrorCounter tx_error_counter_; diff --git a/net/dcsctp/timer/BUILD.gn b/net/dcsctp/timer/BUILD.gn index 00a18a4fa5..9dbe11b3ba 100644 --- a/net/dcsctp/timer/BUILD.gn +++ b/net/dcsctp/timer/BUILD.gn @@ -13,6 +13,7 @@ rtc_library("timer") { "../../../api:array_view", "../../../api/task_queue:task_queue", "../../../api/units:time_delta", + "../../../api/units:timestamp", "../../../rtc_base:checks", "../../../rtc_base:strong_alias", "../../../rtc_base/containers:flat_map", @@ -38,6 +39,7 @@ rtc_library("task_queue_timeout") { "../../../api/task_queue:pending_task_safety_flag", "../../../api/task_queue:task_queue", "../../../api/units:time_delta", + "../../../api/units:timestamp", "../../../rtc_base:checks", "../../../rtc_base:logging", "../public:socket", @@ -58,9 +60,9 @@ if (rtc_include_tests) { ":task_queue_timeout", ":timer", "../../../api:array_view", - "../../../api/units:time_delta", "../../../api/task_queue:task_queue", "../../../api/task_queue/test:mock_task_queue_base", + "../../../api/units:time_delta", "../../../rtc_base:checks", "../../../rtc_base:gunit_helpers", "../../../test:test_support", diff --git a/net/dcsctp/timer/fake_timeout.h b/net/dcsctp/timer/fake_timeout.h index 4621b2ce83..cac49287d4 100644 --- a/net/dcsctp/timer/fake_timeout.h +++ b/net/dcsctp/timer/fake_timeout.h @@ -19,6 +19,7 @@ #include "absl/types/optional.h" #include "api/task_queue/task_queue_base.h" +#include "api/units/timestamp.h" #include "net/dcsctp/public/timeout.h" #include "net/dcsctp/public/types.h" #include "rtc_base/checks.h" @@ -29,46 +30,46 @@ namespace dcsctp { // A timeout used in tests. class FakeTimeout : public Timeout { public: - FakeTimeout(std::function get_time, + FakeTimeout(std::function get_time, std::function on_delete) : get_time_(std::move(get_time)), on_delete_(std::move(on_delete)) {} ~FakeTimeout() override { on_delete_(this); } void Start(DurationMs duration_ms, TimeoutID timeout_id) override { - RTC_DCHECK(expiry_ == TimeMs::InfiniteFuture()); + RTC_DCHECK(expiry_.IsPlusInfinity()); timeout_id_ = timeout_id; - expiry_ = get_time_() + duration_ms; + expiry_ = get_time_() + duration_ms.ToTimeDelta(); } void Stop() override { - RTC_DCHECK(expiry_ != TimeMs::InfiniteFuture()); - expiry_ = TimeMs::InfiniteFuture(); + RTC_DCHECK(!expiry_.IsPlusInfinity()); + expiry_ = webrtc::Timestamp::PlusInfinity(); } - bool EvaluateHasExpired(TimeMs now) { + bool EvaluateHasExpired(webrtc::Timestamp now) { if (now >= expiry_) { - expiry_ = TimeMs::InfiniteFuture(); + expiry_ = webrtc::Timestamp::PlusInfinity(); return true; } return false; } TimeoutID timeout_id() const { return timeout_id_; } - TimeMs expiry() const { return expiry_; } + webrtc::Timestamp expiry() const { return expiry_; } private: - const std::function get_time_; + const std::function get_time_; const std::function on_delete_; TimeoutID timeout_id_ = TimeoutID(0); - TimeMs expiry_ = TimeMs::InfiniteFuture(); + webrtc::Timestamp expiry_ = webrtc::Timestamp::PlusInfinity(); }; class FakeTimeoutManager { public: // The `get_time` function must return the current time, relative to any // epoch. - explicit FakeTimeoutManager(std::function get_time) + explicit FakeTimeoutManager(std::function get_time) : get_time_(std::move(get_time)) {} std::unique_ptr CreateTimeout() { @@ -89,7 +90,7 @@ class FakeTimeoutManager { // Timer::is_running_ to false before you operate on the Timer or Timeout // again. absl::optional GetNextExpiredTimeout() { - TimeMs now = get_time_(); + webrtc::Timestamp now = get_time_(); std::vector expired_timers; for (auto& timer : timers_) { if (timer->EvaluateHasExpired(now)) { @@ -99,21 +100,21 @@ class FakeTimeoutManager { return absl::nullopt; } - DurationMs GetTimeToNextTimeout() const { - TimeMs next_expiry = TimeMs::InfiniteFuture(); + webrtc::TimeDelta GetTimeToNextTimeout() const { + webrtc::Timestamp next_expiry = webrtc::Timestamp::PlusInfinity(); for (const FakeTimeout* timer : timers_) { if (timer->expiry() < next_expiry) { next_expiry = timer->expiry(); } } - TimeMs now = get_time_(); - return next_expiry != TimeMs::InfiniteFuture() && next_expiry >= now + webrtc::Timestamp now = get_time_(); + return !next_expiry.IsPlusInfinity() && next_expiry >= now ? next_expiry - now - : DurationMs::InfiniteDuration(); + : webrtc::TimeDelta::PlusInfinity(); } private: - const std::function get_time_; + const std::function get_time_; webrtc::flat_set timers_; }; diff --git a/net/dcsctp/timer/task_queue_timeout.cc b/net/dcsctp/timer/task_queue_timeout.cc index 6c43640d39..7612f98f3a 100644 --- a/net/dcsctp/timer/task_queue_timeout.cc +++ b/net/dcsctp/timer/task_queue_timeout.cc @@ -14,6 +14,8 @@ #include "rtc_base/logging.h" namespace dcsctp { +using ::webrtc::TimeDelta; +using ::webrtc::Timestamp; TaskQueueTimeoutFactory::TaskQueueTimeout::TaskQueueTimeout( TaskQueueTimeoutFactory& parent, @@ -30,8 +32,8 @@ TaskQueueTimeoutFactory::TaskQueueTimeout::~TaskQueueTimeout() { void TaskQueueTimeoutFactory::TaskQueueTimeout::Start(DurationMs duration_ms, TimeoutID timeout_id) { RTC_DCHECK_RUN_ON(&parent_.thread_checker_); - RTC_DCHECK(timeout_expiration_ == TimeMs::InfiniteFuture()); - timeout_expiration_ = parent_.get_time_() + duration_ms; + RTC_DCHECK(timeout_expiration_.IsPlusInfinity()); + timeout_expiration_ = parent_.Now() + duration_ms.ToTimeDelta(); timeout_id_ = timeout_id; if (timeout_expiration_ >= posted_task_expiration_) { @@ -43,7 +45,7 @@ void TaskQueueTimeoutFactory::TaskQueueTimeout::Start(DurationMs duration_ms, return; } - if (posted_task_expiration_ != TimeMs::InfiniteFuture()) { + if (!posted_task_expiration_.IsPlusInfinity()) { RTC_DLOG(LS_VERBOSE) << "New timeout duration is less than scheduled - " "ghosting old delayed task."; // There is already a scheduled delayed task, but its expiration time is @@ -63,10 +65,10 @@ void TaskQueueTimeoutFactory::TaskQueueTimeout::Start(DurationMs duration_ms, [timeout_id, this]() { RTC_DLOG(LS_VERBOSE) << "Timout expired: " << timeout_id.value(); RTC_DCHECK_RUN_ON(&parent_.thread_checker_); - RTC_DCHECK(posted_task_expiration_ != TimeMs::InfiniteFuture()); - posted_task_expiration_ = TimeMs::InfiniteFuture(); + RTC_DCHECK(!posted_task_expiration_.IsPlusInfinity()); + posted_task_expiration_ = Timestamp::PlusInfinity(); - if (timeout_expiration_ == TimeMs::InfiniteFuture()) { + if (timeout_expiration_.IsPlusInfinity()) { // The timeout was stopped before it expired. Very common. } else { // Note that the timeout might have been restarted, which updated @@ -74,10 +76,10 @@ void TaskQueueTimeoutFactory::TaskQueueTimeout::Start(DurationMs duration_ms, // if it's not quite time to trigger the timeout yet, schedule a // new delayed task with what's remaining and retry at that point // in time. - DurationMs remaining = timeout_expiration_ - parent_.get_time_(); - timeout_expiration_ = TimeMs::InfiniteFuture(); - if (*remaining > 0) { - Start(remaining, timeout_id_); + TimeDelta remaining = timeout_expiration_ - parent_.Now(); + timeout_expiration_ = Timestamp::PlusInfinity(); + if (remaining > TimeDelta::Zero()) { + Start(DurationMs(remaining.ms()), timeout_id_); } else { // It has actually triggered. RTC_DLOG(LS_VERBOSE) @@ -93,7 +95,7 @@ void TaskQueueTimeoutFactory::TaskQueueTimeout::Stop() { // As the TaskQueue doesn't support deleting a posted task, just mark the // timeout as not running. RTC_DCHECK_RUN_ON(&parent_.thread_checker_); - timeout_expiration_ = TimeMs::InfiniteFuture(); + timeout_expiration_ = Timestamp::PlusInfinity(); } } // namespace dcsctp diff --git a/net/dcsctp/timer/task_queue_timeout.h b/net/dcsctp/timer/task_queue_timeout.h index faae14464f..4b40309f83 100644 --- a/net/dcsctp/timer/task_queue_timeout.h +++ b/net/dcsctp/timer/task_queue_timeout.h @@ -15,6 +15,7 @@ #include "api/task_queue/pending_task_safety_flag.h" #include "api/task_queue/task_queue_base.h" +#include "api/units/timestamp.h" #include "net/dcsctp/public/timeout.h" namespace dcsctp { @@ -74,14 +75,17 @@ class TaskQueueTimeoutFactory { rtc::scoped_refptr pending_task_safety_flag_; // The time when the posted delayed task is set to expire. Will be set to // the infinite future if there is no such task running. - TimeMs posted_task_expiration_ = TimeMs::InfiniteFuture(); + webrtc::Timestamp posted_task_expiration_ = + webrtc::Timestamp::PlusInfinity(); // The time when the timeout expires. It will be set to the infinite future // if the timeout is not running/not started. - TimeMs timeout_expiration_ = TimeMs::InfiniteFuture(); + webrtc::Timestamp timeout_expiration_ = webrtc::Timestamp::PlusInfinity(); // The current timeout ID that will be reported when expired. TimeoutID timeout_id_ = TimeoutID(0); }; + webrtc::Timestamp Now() { return webrtc::Timestamp::Millis(*get_time_()); } + RTC_NO_UNIQUE_ADDRESS webrtc::SequenceChecker thread_checker_; webrtc::TaskQueueBase& task_queue_; const std::function get_time_; diff --git a/net/dcsctp/timer/timer_test.cc b/net/dcsctp/timer/timer_test.cc index 5584b50468..9a7c029ec5 100644 --- a/net/dcsctp/timer/timer_test.cc +++ b/net/dcsctp/timer/timer_test.cc @@ -23,6 +23,7 @@ namespace dcsctp { namespace { using ::testing::Return; using ::webrtc::TimeDelta; +using ::webrtc::Timestamp; class TimerTest : public testing::Test { protected: @@ -35,7 +36,7 @@ class TimerTest : public testing::Test { } void AdvanceTimeAndRunTimers(TimeDelta duration) { - now_ = now_ + DurationMs(duration); + now_ = now_ + duration; for (;;) { absl::optional timeout_id = @@ -47,7 +48,7 @@ class TimerTest : public testing::Test { } } - TimeMs now_ = TimeMs(0); + Timestamp now_ = Timestamp::Zero(); FakeTimeoutManager timeout_manager_; TimerManager manager_; testing::MockFunction on_expired_; @@ -429,7 +430,7 @@ TEST_F(TimerTest, DurationStaysWithinMaxTimerBackOffDuration) { } TEST(TimerManagerTest, TimerManagerPassesPrecisionToCreateTimeoutMethod) { - FakeTimeoutManager timeout_manager([&]() { return TimeMs(0); }); + FakeTimeoutManager timeout_manager([&]() { return Timestamp::Zero(); }); absl::optional create_timer_precison; TimerManager manager([&](webrtc::TaskQueueBase::DelayPrecision precision) { create_timer_precison = precision; diff --git a/net/dcsctp/tx/BUILD.gn b/net/dcsctp/tx/BUILD.gn index 4e3a959b27..3183be36d8 100644 --- a/net/dcsctp/tx/BUILD.gn +++ b/net/dcsctp/tx/BUILD.gn @@ -11,6 +11,7 @@ import("../../../webrtc.gni") rtc_source_set("send_queue") { deps = [ "../../../api:array_view", + "../../../api/units:timestamp", "../common:internal_types", "../packet:chunk", "../packet:data", @@ -105,6 +106,7 @@ rtc_library("outstanding_data") { ":send_queue", "../../../api:array_view", "../../../api/units:time_delta", + "../../../api/units:timestamp", "../../../rtc_base:checks", "../../../rtc_base:logging", "../../../rtc_base/containers:flat_set", @@ -164,6 +166,7 @@ if (rtc_include_tests) { deps = [ ":send_queue", "../../../api:array_view", + "../../../api/units:timestamp", "../../../test:test_support", ] absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] diff --git a/net/dcsctp/tx/mock_send_queue.h b/net/dcsctp/tx/mock_send_queue.h index 04921866ae..3511403eab 100644 --- a/net/dcsctp/tx/mock_send_queue.h +++ b/net/dcsctp/tx/mock_send_queue.h @@ -15,6 +15,7 @@ #include "absl/types/optional.h" #include "api/array_view.h" +#include "api/units/timestamp.h" #include "net/dcsctp/tx/send_queue.h" #include "test/gmock.h" @@ -23,14 +24,15 @@ namespace dcsctp { class MockSendQueue : public SendQueue { public: MockSendQueue() { - ON_CALL(*this, Produce).WillByDefault([](TimeMs now, size_t max_size) { - return absl::nullopt; - }); + ON_CALL(*this, Produce) + .WillByDefault([](webrtc::Timestamp now, size_t max_size) { + return absl::nullopt; + }); } MOCK_METHOD(absl::optional, Produce, - (TimeMs now, size_t max_size), + (webrtc::Timestamp now, size_t max_size), (override)); MOCK_METHOD(bool, Discard, diff --git a/net/dcsctp/tx/outstanding_data.cc b/net/dcsctp/tx/outstanding_data.cc index 2a92f14218..30d83870e3 100644 --- a/net/dcsctp/tx/outstanding_data.cc +++ b/net/dcsctp/tx/outstanding_data.cc @@ -15,12 +15,14 @@ #include #include "api/units/time_delta.h" +#include "api/units/timestamp.h" #include "net/dcsctp/common/math.h" #include "net/dcsctp/common/sequence_numbers.h" #include "net/dcsctp/public/types.h" #include "rtc_base/logging.h" namespace dcsctp { +using ::webrtc::Timestamp; // The number of times a packet must be NACKed before it's retransmitted. // See https://tools.ietf.org/html/rfc4960#section-7.2.4 @@ -64,12 +66,12 @@ void OutstandingData::Item::MarkAsRetransmitted() { } void OutstandingData::Item::Abandon() { - RTC_DCHECK(expires_at_ != TimeMs::InfiniteFuture() || + RTC_DCHECK(!expires_at_.IsPlusInfinity() || max_retransmissions_ != MaxRetransmits::NoLimit()); lifecycle_ = Lifecycle::kAbandoned; } -bool OutstandingData::Item::has_expired(TimeMs now) const { +bool OutstandingData::Item::has_expired(Timestamp now) const { return expires_at_ <= now; } @@ -283,9 +285,9 @@ void OutstandingData::AbandonAllFor(const Item& item) { outstanding_data_ .emplace(std::piecewise_construct, std::forward_as_tuple(tsn), std::forward_as_tuple( - item.message_id(), std::move(message_end), TimeMs(0), - MaxRetransmits(0), TimeMs::InfiniteFuture(), - LifecycleId::NotSet())) + item.message_id(), std::move(message_end), + Timestamp::Zero(), MaxRetransmits(0), + Timestamp::PlusInfinity(), LifecycleId::NotSet())) .first->second; // The added chunk shouldn't be included in `outstanding_bytes`, so set it // as acked. @@ -371,7 +373,7 @@ std::vector> OutstandingData::GetChunksToBeRetransmitted( return ExtractChunksThatCanFit(to_be_retransmitted_, max_size); } -void OutstandingData::ExpireOutstandingChunks(TimeMs now) { +void OutstandingData::ExpireOutstandingChunks(Timestamp now) { for (const auto& [tsn, item] : outstanding_data_) { // Chunks that are nacked can be expired. Care should be taken not to expire // unacked (in-flight) chunks as they might have been received, but the SACK @@ -399,9 +401,9 @@ UnwrappedTSN OutstandingData::highest_outstanding_tsn() const { absl::optional OutstandingData::Insert( OutgoingMessageId message_id, const Data& data, - TimeMs time_sent, + Timestamp time_sent, MaxRetransmits max_retransmissions, - TimeMs expires_at, + Timestamp expires_at, LifecycleId lifecycle_id) { UnwrappedTSN tsn = next_tsn_; next_tsn_.Increment(); @@ -442,7 +444,7 @@ void OutstandingData::NackAll() { RTC_DCHECK(IsConsistent()); } -webrtc::TimeDelta OutstandingData::MeasureRTT(TimeMs now, +webrtc::TimeDelta OutstandingData::MeasureRTT(Timestamp now, UnwrappedTSN tsn) const { auto it = outstanding_data_.find(tsn); if (it != outstanding_data_.end() && !it->second.has_been_retransmitted()) { @@ -451,7 +453,7 @@ webrtc::TimeDelta OutstandingData::MeasureRTT(TimeMs now, // packets that were retransmitted (and thus for which it is ambiguous // whether the reply was for the first instance of the chunk or for a // later instance)" - return (now - it->second.time_sent()).ToTimeDelta(); + return now - it->second.time_sent(); } return webrtc::TimeDelta::PlusInfinity(); } diff --git a/net/dcsctp/tx/outstanding_data.h b/net/dcsctp/tx/outstanding_data.h index 323f2de349..3cad8069c1 100644 --- a/net/dcsctp/tx/outstanding_data.h +++ b/net/dcsctp/tx/outstanding_data.h @@ -16,6 +16,7 @@ #include #include "absl/types/optional.h" +#include "api/units/timestamp.h" #include "net/dcsctp/common/internal_types.h" #include "net/dcsctp/common/sequence_numbers.h" #include "net/dcsctp/packet/chunk/forward_tsn_chunk.h" @@ -105,7 +106,7 @@ class OutstandingData { // Given the current time `now_ms`, expire and abandon outstanding (sent at // least once) chunks that have a limited lifetime. - void ExpireOutstandingChunks(TimeMs now); + void ExpireOutstandingChunks(webrtc::Timestamp now); bool empty() const { return outstanding_data_.empty(); } @@ -131,9 +132,9 @@ class OutstandingData { absl::optional Insert( OutgoingMessageId message_id, const Data& data, - TimeMs time_sent, + webrtc::Timestamp time_sent, MaxRetransmits max_retransmissions = MaxRetransmits::NoLimit(), - TimeMs expires_at = TimeMs::InfiniteFuture(), + webrtc::Timestamp expires_at = webrtc::Timestamp::PlusInfinity(), LifecycleId lifecycle_id = LifecycleId::NotSet()); // Nacks all outstanding data. @@ -148,7 +149,7 @@ class OutstandingData { // Given the current time and a TSN, it returns the measured RTT between when // the chunk was sent and now. It takes into acccount Karn's algorithm, so if // the chunk has ever been retransmitted, it will return `PlusInfinity()`. - webrtc::TimeDelta MeasureRTT(TimeMs now, UnwrappedTSN tsn) const; + webrtc::TimeDelta MeasureRTT(webrtc::Timestamp now, UnwrappedTSN tsn) const; // Returns the internal state of all queued chunks. This is only used in // unit-tests. @@ -179,9 +180,9 @@ class OutstandingData { Item(OutgoingMessageId message_id, Data data, - TimeMs time_sent, + webrtc::Timestamp time_sent, MaxRetransmits max_retransmissions, - TimeMs expires_at, + webrtc::Timestamp expires_at, LifecycleId lifecycle_id) : message_id_(message_id), time_sent_(time_sent), @@ -195,7 +196,7 @@ class OutstandingData { OutgoingMessageId message_id() const { return message_id_; } - TimeMs time_sent() const { return time_sent_; } + webrtc::Timestamp time_sent() const { return time_sent_; } const Data& data() const { return data_; } @@ -229,7 +230,7 @@ class OutstandingData { // Given the current time, and the current state of this DATA chunk, it will // indicate if it has expired (SCTP Partial Reliability Extension). - bool has_expired(TimeMs now) const; + bool has_expired(webrtc::Timestamp now) const; LifecycleId lifecycle_id() const { return lifecycle_id_; } @@ -258,7 +259,7 @@ class OutstandingData { const OutgoingMessageId message_id_; // When the packet was sent, and placed in this queue. - const TimeMs time_sent_; + const webrtc::Timestamp time_sent_; // If the message was sent with a maximum number of retransmissions, this is // set to that number. The value zero (0) means that it will never be // retransmitted. @@ -278,7 +279,7 @@ class OutstandingData { // At this exact millisecond, the item is considered expired. If the message // is not to be expired, this is set to the infinite future. - const TimeMs expires_at_; + const webrtc::Timestamp expires_at_; // An optional lifecycle id, which may only be set for the last fragment. const LifecycleId lifecycle_id_; diff --git a/net/dcsctp/tx/outstanding_data_test.cc b/net/dcsctp/tx/outstanding_data_test.cc index e119a56607..33fc51acf1 100644 --- a/net/dcsctp/tx/outstanding_data_test.cc +++ b/net/dcsctp/tx/outstanding_data_test.cc @@ -38,8 +38,9 @@ using ::testing::Return; using ::testing::StrictMock; using ::testing::UnorderedElementsAre; using ::webrtc::TimeDelta; +using ::webrtc::Timestamp; -constexpr TimeMs kNow(42); +constexpr Timestamp kNow = Timestamp::Millis(42); constexpr OutgoingMessageId kMessageId = OutgoingMessageId(17); class OutstandingDataTest : public testing::Test { @@ -278,20 +279,20 @@ TEST_F(OutstandingDataTest, NacksThreeTimesResultsInAbandoningWithPlaceholder) { } TEST_F(OutstandingDataTest, ExpiresChunkBeforeItIsInserted) { - static constexpr TimeMs kExpiresAt = kNow + DurationMs(1); + static constexpr Timestamp kExpiresAt = kNow + TimeDelta::Millis(1); EXPECT_TRUE(buf_.Insert(kMessageId, gen_.Ordered({1}, "B"), kNow, MaxRetransmits::NoLimit(), kExpiresAt) .has_value()); EXPECT_TRUE(buf_.Insert(kMessageId, gen_.Ordered({1}, ""), - kNow + DurationMs(0), MaxRetransmits::NoLimit(), - kExpiresAt) + kNow + TimeDelta::Millis(0), + MaxRetransmits::NoLimit(), kExpiresAt) .has_value()); EXPECT_CALL(on_discard_, Call(StreamID(1), kMessageId)) .WillOnce(Return(false)); EXPECT_FALSE(buf_.Insert(kMessageId, gen_.Ordered({1}, "E"), - kNow + DurationMs(1), MaxRetransmits::NoLimit(), - kExpiresAt) + kNow + TimeDelta::Millis(1), + MaxRetransmits::NoLimit(), kExpiresAt) .has_value()); EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); @@ -363,12 +364,12 @@ TEST_F(OutstandingDataTest, AckWithGapBlocksFromRFC4960Section334) { TEST_F(OutstandingDataTest, MeasureRTT) { buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow); - buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow + DurationMs(1)); - buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow + DurationMs(2)); + buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow + TimeDelta::Millis(1)); + buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow + TimeDelta::Millis(2)); static constexpr TimeDelta kDuration = TimeDelta::Millis(123); TimeDelta duration = - buf_.MeasureRTT(kNow + DurationMs(kDuration), unwrapper_.Unwrap(TSN(11))); + buf_.MeasureRTT(kNow + kDuration, unwrapper_.Unwrap(TSN(11))); EXPECT_EQ(duration, kDuration - TimeDelta::Millis(1)); } @@ -453,13 +454,13 @@ TEST_F(OutstandingDataTest, MustRetransmitBeforeGettingNackedAgain) { TEST_F(OutstandingDataTest, LifecyleReturnsAckedItemsInAckInfo) { buf_.Insert(OutgoingMessageId(1), gen_.Ordered({1}, "BE"), kNow, - MaxRetransmits::NoLimit(), TimeMs::InfiniteFuture(), + MaxRetransmits::NoLimit(), Timestamp::PlusInfinity(), LifecycleId(42)); buf_.Insert(OutgoingMessageId(2), gen_.Ordered({1}, "BE"), kNow, - MaxRetransmits::NoLimit(), TimeMs::InfiniteFuture(), + MaxRetransmits::NoLimit(), Timestamp::PlusInfinity(), LifecycleId(43)); buf_.Insert(OutgoingMessageId(3), gen_.Ordered({1}, "BE"), kNow, - MaxRetransmits::NoLimit(), TimeMs::InfiniteFuture(), + MaxRetransmits::NoLimit(), Timestamp::PlusInfinity(), LifecycleId(44)); OutstandingData::AckInfo ack1 = @@ -479,7 +480,7 @@ TEST_F(OutstandingDataTest, LifecycleReturnsAbandonedNackedThreeTimes) { buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, MaxRetransmits(0)); buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, MaxRetransmits(0)); buf_.Insert(kMessageId, gen_.Ordered({1}, "E"), kNow, MaxRetransmits(0), - TimeMs::InfiniteFuture(), LifecycleId(42)); + Timestamp::PlusInfinity(), LifecycleId(42)); std::vector gab1 = {SackChunk::GapAckBlock(2, 2)}; EXPECT_FALSE( @@ -515,7 +516,7 @@ TEST_F(OutstandingDataTest, LifecycleReturnsAbandonedAfterT3rtxExpired) { buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, MaxRetransmits(0)); buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, MaxRetransmits(0)); buf_.Insert(kMessageId, gen_.Ordered({1}, "E"), kNow, MaxRetransmits(0), - TimeMs::InfiniteFuture(), LifecycleId(42)); + Timestamp::PlusInfinity(), LifecycleId(42)); EXPECT_THAT(buf_.GetChunkStatesForTesting(), testing::ElementsAre(Pair(TSN(9), State::kAcked), // diff --git a/net/dcsctp/tx/retransmission_queue.cc b/net/dcsctp/tx/retransmission_queue.cc index 2b166f1fde..da06ddcafd 100644 --- a/net/dcsctp/tx/retransmission_queue.cc +++ b/net/dcsctp/tx/retransmission_queue.cc @@ -45,6 +45,7 @@ namespace dcsctp { namespace { using ::webrtc::TimeDelta; +using ::webrtc::Timestamp; // Allow sending only slightly less than an MTU, to account for headers. constexpr float kMinBytesRequiredToSendFactor = 0.9; @@ -258,7 +259,7 @@ bool RetransmissionQueue::IsSackValid(const SackChunk& sack) const { return true; } -bool RetransmissionQueue::HandleSack(TimeMs now, const SackChunk& sack) { +bool RetransmissionQueue::HandleSack(Timestamp now, const SackChunk& sack) { if (!IsSackValid(sack)) { return false; } @@ -336,7 +337,7 @@ bool RetransmissionQueue::HandleSack(TimeMs now, const SackChunk& sack) { return true; } -void RetransmissionQueue::UpdateRTT(TimeMs now, +void RetransmissionQueue::UpdateRTT(Timestamp now, UnwrappedTSN cumulative_tsn_ack) { // RTT updating is flawed in SCTP, as explained in e.g. Pedersen J, Griwodz C, // Halvorsen P (2006) Considerations of SCTP retransmission delays for thin @@ -449,7 +450,7 @@ RetransmissionQueue::GetChunksForFastRetransmit(size_t bytes_in_packet) { } std::vector> RetransmissionQueue::GetChunksToSend( - TimeMs now, + Timestamp now, size_t bytes_remaining_in_packet) { // Chunks are always padded to even divisible by four. RTC_DCHECK(IsDivisibleBy4(bytes_remaining_in_packet)); @@ -494,7 +495,8 @@ std::vector> RetransmissionQueue::GetChunksToSend( chunk_opt->message_id, chunk_opt->data, now, partial_reliability_ ? chunk_opt->max_retransmissions : MaxRetransmits::NoLimit(), - partial_reliability_ ? chunk_opt->expires_at : TimeMs::InfiniteFuture(), + partial_reliability_ ? chunk_opt->expires_at + : Timestamp::PlusInfinity(), chunk_opt->lifecycle_id); if (tsn.has_value()) { @@ -539,7 +541,7 @@ bool RetransmissionQueue::can_send_data() const { max_bytes_to_send() >= min_bytes_required_to_send_; } -bool RetransmissionQueue::ShouldSendForwardTsn(TimeMs now) { +bool RetransmissionQueue::ShouldSendForwardTsn(Timestamp now) { if (!partial_reliability_) { return false; } diff --git a/net/dcsctp/tx/retransmission_queue.h b/net/dcsctp/tx/retransmission_queue.h index d4c6edf7fb..690aeeef91 100644 --- a/net/dcsctp/tx/retransmission_queue.h +++ b/net/dcsctp/tx/retransmission_queue.h @@ -69,7 +69,7 @@ class RetransmissionQueue { // Handles a received SACK. Returns true if the `sack` was processed and // false if it was discarded due to received out-of-order and not relevant. - bool HandleSack(TimeMs now, const SackChunk& sack); + bool HandleSack(webrtc::Timestamp now, const SackChunk& sack); // Handles an expired retransmission timer. void HandleT3RtxTimerExpiry(); @@ -90,7 +90,7 @@ class RetransmissionQueue { // called prior to this method, to abandon expired chunks, as this method will // not expire any chunks. std::vector> GetChunksToSend( - TimeMs now, + webrtc::Timestamp now, size_t bytes_remaining_in_packet); // Returns the internal state of all queued chunks. This is only used in @@ -136,7 +136,7 @@ class RetransmissionQueue { // Given the current time `now`, it will evaluate if there are chunks that // have expired and that need to be discarded. It returns true if a // FORWARD-TSN should be sent. - bool ShouldSendForwardTsn(TimeMs now); + bool ShouldSendForwardTsn(webrtc::Timestamp now); // Creates a FORWARD-TSN chunk. ForwardTsnChunk CreateForwardTsn() const { @@ -185,7 +185,7 @@ class RetransmissionQueue { // When a SACK chunk is received, this method will be called which _may_ call // into the `RetransmissionTimeout` to update the RTO. - void UpdateRTT(TimeMs now, UnwrappedTSN cumulative_tsn_ack); + void UpdateRTT(webrtc::Timestamp now, UnwrappedTSN cumulative_tsn_ack); // If the congestion control is in "fast recovery mode", this may be exited // now. diff --git a/net/dcsctp/tx/retransmission_queue_test.cc b/net/dcsctp/tx/retransmission_queue_test.cc index 4cbdea14b1..3b4a2323c3 100644 --- a/net/dcsctp/tx/retransmission_queue_test.cc +++ b/net/dcsctp/tx/retransmission_queue_test.cc @@ -53,6 +53,7 @@ using ::testing::Return; using ::testing::SizeIs; using ::testing::UnorderedElementsAre; using ::webrtc::TimeDelta; +using ::webrtc::Timestamp; constexpr uint32_t kArwnd = 100000; constexpr uint32_t kMaxMtu = 1191; @@ -78,9 +79,9 @@ class RetransmissionQueueTest : public testing::Test { []() { return TimeDelta::Zero(); }, TimerOptions(options_.rto_initial.ToTimeDelta()))) {} - std::function CreateChunk( + std::function CreateChunk( OutgoingMessageId message_id) { - return [this, message_id](TimeMs now, size_t max_size) { + return [this, message_id](Timestamp now, size_t max_size) { return SendQueue::DataToSend(message_id, gen_.Ordered({1, 2, 3, 4}, "BE")); }; @@ -128,7 +129,7 @@ class RetransmissionQueueTest : public testing::Test { MockDcSctpSocketCallbacks callbacks_; DcSctpOptions options_; DataGenerator gen_; - TimeMs now_ = TimeMs(0); + Timestamp now_ = Timestamp::Zero(); FakeTimeoutManager timeout_manager_; TimerManager timer_manager_; NiceMock> on_rtt_; @@ -147,7 +148,7 @@ TEST_F(RetransmissionQueueTest, SendOneChunk) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(0))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10))); @@ -160,7 +161,7 @@ TEST_F(RetransmissionQueueTest, SendOneChunkAndAck) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(0))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10))); @@ -176,7 +177,7 @@ TEST_F(RetransmissionQueueTest, SendThreeChunksAndAckTwo) { .WillOnce(CreateChunk(OutgoingMessageId(0))) .WillOnce(CreateChunk(OutgoingMessageId(1))) .WillOnce(CreateChunk(OutgoingMessageId(2))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10), TSN(11), TSN(12))); @@ -199,7 +200,7 @@ TEST_F(RetransmissionQueueTest, AckWithGapBlocksFromRFC4960Section334) { .WillOnce(CreateChunk(OutgoingMessageId(5))) .WillOnce(CreateChunk(OutgoingMessageId(6))) .WillOnce(CreateChunk(OutgoingMessageId(7))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14), @@ -230,7 +231,7 @@ TEST_F(RetransmissionQueueTest, ResendPacketsWhenNackedThreeTimes) { .WillOnce(CreateChunk(OutgoingMessageId(5))) .WillOnce(CreateChunk(OutgoingMessageId(6))) .WillOnce(CreateChunk(OutgoingMessageId(7))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14), @@ -242,7 +243,7 @@ TEST_F(RetransmissionQueueTest, ResendPacketsWhenNackedThreeTimes) { // Send 18 EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(8))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(18))); // Ack 12, 14-15, 17-18 @@ -263,7 +264,7 @@ TEST_F(RetransmissionQueueTest, ResendPacketsWhenNackedThreeTimes) { // Send 19 EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(9))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(19))); // Ack 12, 14-15, 17-19 @@ -275,7 +276,7 @@ TEST_F(RetransmissionQueueTest, ResendPacketsWhenNackedThreeTimes) { // Send 20 EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(10))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(20))); // Ack 12, 14-15, 17-20 @@ -322,16 +323,16 @@ TEST_F(RetransmissionQueueTest, RestartsT3RtxOnRetransmitFirstOutstandingTSN) { .WillOnce(CreateChunk(OutgoingMessageId(0))) .WillOnce(CreateChunk(OutgoingMessageId(1))) .WillOnce(CreateChunk(OutgoingMessageId(2))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); - static constexpr TimeMs kStartTime(100000); + static constexpr Timestamp kStartTime = Timestamp::Seconds(100); now_ = kStartTime; EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10), TSN(11), TSN(12))); // Ack 10, 12, after 100ms. - now_ += DurationMs(100); + now_ += TimeDelta::Millis(100); queue.HandleSack( now_, SackChunk(TSN(10), kArwnd, {SackChunk::GapAckBlock(2, 2)}, {})); @@ -343,22 +344,22 @@ TEST_F(RetransmissionQueueTest, RestartsT3RtxOnRetransmitFirstOutstandingTSN) { // Send 13 EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(3))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(13))); // Ack 10, 12-13, after 100ms. - now_ += DurationMs(100); + now_ += TimeDelta::Millis(100); queue.HandleSack( now_, SackChunk(TSN(10), kArwnd, {SackChunk::GapAckBlock(2, 3)}, {})); // Send 14 EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(4))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(14))); // Ack 10, 12-14, after 100 ms. - now_ += DurationMs(100); + now_ += TimeDelta::Millis(100); queue.HandleSack( now_, SackChunk(TSN(10), kArwnd, {SackChunk::GapAckBlock(2, 4)}, {})); @@ -384,11 +385,11 @@ TEST_F(RetransmissionQueueTest, RestartsT3RtxOnRetransmitFirstOutstandingTSN) { // Verify that the timer was really restarted when fast-retransmitting. The // timeout is `options_.rto_initial`, so advance the time just before that. - now_ += options_.rto_initial - DurationMs(1); + now_ += options_.rto_initial.ToTimeDelta() - TimeDelta::Millis(1); EXPECT_FALSE(timeout_manager_.GetNextExpiredTimeout().has_value()); // And ensure it really is running. - now_ += DurationMs(1); + now_ += TimeDelta::Millis(1); ASSERT_HAS_VALUE_AND_ASSIGN(TimeoutID timeout, timeout_manager_.GetNextExpiredTimeout()); // An expired timeout has to be handled (asserts validate this). @@ -398,15 +399,15 @@ TEST_F(RetransmissionQueueTest, RestartsT3RtxOnRetransmitFirstOutstandingTSN) { TEST_F(RetransmissionQueueTest, CanOnlyProduceTwoPacketsButWantsToSendThree) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered({1, 2, 3, 4}, "BE")); }) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(1), gen_.Ordered({1, 2, 3, 4}, "BE")); }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); std::vector> chunks_to_send = queue.GetChunksToSend(now_, 1000); @@ -421,11 +422,11 @@ TEST_F(RetransmissionQueueTest, CanOnlyProduceTwoPacketsButWantsToSendThree) { TEST_F(RetransmissionQueueTest, RetransmitsOnT3Expiry) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered({1, 2, 3, 4}, "BE")); }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); std::vector> chunks_to_send = @@ -460,12 +461,12 @@ TEST_F(RetransmissionQueueTest, LimitedRetransmissionOnlyWithRfc3758Support) { RetransmissionQueue queue = CreateQueue(/*supports_partial_reliability=*/false); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); std::vector> chunks_to_send = @@ -489,12 +490,12 @@ TEST_F(RetransmissionQueueTest, LimitedRetransmissionOnlyWithRfc3758Support) { TEST_F(RetransmissionQueueTest, LimitsRetransmissionsAsUdp) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); std::vector> chunks_to_send = @@ -530,12 +531,12 @@ TEST_F(RetransmissionQueueTest, LimitsRetransmissionsAsUdp) { TEST_F(RetransmissionQueueTest, LimitsRetransmissionsToThreeSends) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE")); dts.max_retransmissions = MaxRetransmits(3); return dts; }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); std::vector> chunks_to_send = @@ -583,11 +584,11 @@ TEST_F(RetransmissionQueueTest, RetransmitsWhenSendBufferIsFullT3Expiry) { std::vector payload(1000); EXPECT_CALL(producer_, Produce) - .WillOnce([this, payload](TimeMs, size_t) { + .WillOnce([this, payload](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered(payload, "BE")); }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); std::vector> chunks_to_send = queue.GetChunksToSend(now_, 1500); @@ -620,23 +621,23 @@ TEST_F(RetransmissionQueueTest, RetransmitsWhenSendBufferIsFullT3Expiry) { TEST_F(RetransmissionQueueTest, ProducesValidForwardTsn) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "B")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({5, 6, 7, 8}, "")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({9, 10, 11, 12}, "")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); // Send and ack first chunk (TSN 10) std::vector> chunks_to_send = @@ -676,23 +677,23 @@ TEST_F(RetransmissionQueueTest, ProducesValidForwardTsn) { TEST_F(RetransmissionQueueTest, ProducesValidForwardTsnWhenFullySent) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "B")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({5, 6, 7, 8}, "")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({9, 10, 11, 12}, "E")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); // Send and ack first chunk (TSN 10) std::vector> chunks_to_send = @@ -730,7 +731,7 @@ TEST_F(RetransmissionQueueTest, ProducesValidForwardTsnWhenFullySent) { TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) { RetransmissionQueue queue = CreateQueue(/*use_message_interleaving=*/true); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { DataGeneratorOptions opts; opts.stream_id = StreamID(1); SendQueue::DataToSend dts(OutgoingMessageId(42), @@ -738,7 +739,7 @@ TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) { dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { DataGeneratorOptions opts; opts.stream_id = StreamID(2); SendQueue::DataToSend dts(OutgoingMessageId(43), @@ -746,7 +747,7 @@ TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) { dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { DataGeneratorOptions opts; opts.stream_id = StreamID(3); SendQueue::DataToSend dts(OutgoingMessageId(44), @@ -754,7 +755,7 @@ TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) { dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { DataGeneratorOptions opts; opts.stream_id = StreamID(4); SendQueue::DataToSend dts(OutgoingMessageId(45), @@ -762,7 +763,7 @@ TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) { dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); std::vector> chunks_to_send = queue.GetChunksToSend(now_, 1000); @@ -851,19 +852,19 @@ TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) { TEST_F(RetransmissionQueueTest, MeasureRTT) { RetransmissionQueue queue = CreateQueue(/*use_message_interleaving=*/true); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(OutgoingMessageId(0), gen_.Ordered({1, 2, 3, 4}, "B")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); std::vector> chunks_to_send = queue.GetChunksToSend(now_, 1000); EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _))); - now_ = now_ + DurationMs(123); + now_ = now_ + TimeDelta::Millis(123); EXPECT_CALL(on_rtt_, Call(TimeDelta::Millis(123))).Times(1); queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); @@ -889,7 +890,7 @@ TEST_F(RetransmissionQueueTest, ValidateCumTsnAckOnInflightData) { .WillOnce(CreateChunk(OutgoingMessageId(5))) .WillOnce(CreateChunk(OutgoingMessageId(6))) .WillOnce(CreateChunk(OutgoingMessageId(7))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14), @@ -919,7 +920,7 @@ TEST_F(RetransmissionQueueTest, HandleGapAckBlocksMatchingNoInflightData) { .WillOnce(CreateChunk(OutgoingMessageId(5))) .WillOnce(CreateChunk(OutgoingMessageId(6))) .WillOnce(CreateChunk(OutgoingMessageId(7))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14), @@ -966,7 +967,7 @@ TEST_F(RetransmissionQueueTest, GapAckBlocksDoNotMoveCumTsnAck) { .WillOnce(CreateChunk(OutgoingMessageId(5))) .WillOnce(CreateChunk(OutgoingMessageId(6))) .WillOnce(CreateChunk(OutgoingMessageId(7))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14), @@ -996,14 +997,14 @@ TEST_F(RetransmissionQueueTest, StaysWithinAvailableSize) { // See SctpPacketTest::ReturnsCorrectSpaceAvailableToStayWithinMTU for the // magic numbers in this test. EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t size) { + .WillOnce([this](Timestamp, size_t size) { EXPECT_EQ(size, 1176 - DataChunk::kHeaderSize); std::vector payload(183); return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered(payload, "BE")); }) - .WillOnce([this](TimeMs, size_t size) { + .WillOnce([this](Timestamp, size_t size) { EXPECT_EQ(size, 976 - DataChunk::kHeaderSize); std::vector payload(957); @@ -1019,23 +1020,23 @@ TEST_F(RetransmissionQueueTest, StaysWithinAvailableSize) { TEST_F(RetransmissionQueueTest, AccountsNackedAbandonedChunksAsNotOutstanding) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "B")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({5, 6, 7, 8}, "")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({9, 10, 11, 12}, "")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); // Send and ack first chunk (TSN 10) std::vector> chunks_to_send = @@ -1083,21 +1084,21 @@ TEST_F(RetransmissionQueueTest, ExpireFromSendQueueWhenPartiallySent) { DataGeneratorOptions options; options.stream_id = StreamID(17); options.mid = MID(42); - TimeMs test_start = now_; + Timestamp test_start = now_; EXPECT_CALL(producer_, Produce) - .WillOnce([&](TimeMs, size_t) { + .WillOnce([&](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "B", options)); - dts.expires_at = TimeMs(test_start + DurationMs(10)); + dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10)); return dts; }) - .WillOnce([&](TimeMs, size_t) { + .WillOnce([&](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({5, 6, 7, 8}, "", options)); - dts.expires_at = TimeMs(test_start + DurationMs(10)); + dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10)); return dts; }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); std::vector> chunks_to_send = queue.GetChunksToSend(now_, 24); @@ -1105,7 +1106,7 @@ TEST_F(RetransmissionQueueTest, ExpireFromSendQueueWhenPartiallySent) { EXPECT_CALL(producer_, Discard(StreamID(17), kMessageId)) .WillOnce(Return(true)); - now_ += DurationMs(100); + now_ += TimeDelta::Millis(100); EXPECT_THAT(queue.GetChunksToSend(now_, 24), IsEmpty()); @@ -1119,38 +1120,38 @@ TEST_F(RetransmissionQueueTest, ExpireFromSendQueueWhenPartiallySent) { TEST_F(RetransmissionQueueTest, ExpireCorrectMessageFromSendQueue) { RetransmissionQueue queue = CreateQueue(); - TimeMs test_start = now_; + Timestamp test_start = now_; EXPECT_CALL(producer_, Produce) - .WillOnce([&](TimeMs, size_t) { + .WillOnce([&](Timestamp, size_t) { SendQueue::DataToSend dts( OutgoingMessageId(42), gen_.Ordered({1, 2, 3, 4}, "BE", {.mid = MID(0)})); - dts.expires_at = TimeMs(test_start + DurationMs(10)); + dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10)); return dts; }) - .WillOnce([&](TimeMs, size_t) { + .WillOnce([&](Timestamp, size_t) { SendQueue::DataToSend dts( OutgoingMessageId(43), gen_.Ordered({1, 2, 3, 4}, "BE", {.mid = MID(1)})); - dts.expires_at = TimeMs(test_start + DurationMs(10)); + dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10)); return dts; }) // Stream reset - MID reset to zero again. - .WillOnce([&](TimeMs, size_t) { + .WillOnce([&](Timestamp, size_t) { SendQueue::DataToSend dts( OutgoingMessageId(44), gen_.Ordered({1, 2, 3, 4}, "B", {.mid = MID(0)})); - dts.expires_at = TimeMs(test_start + DurationMs(10)); + dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10)); return dts; }) - .WillOnce([&](TimeMs, size_t) { + .WillOnce([&](Timestamp, size_t) { SendQueue::DataToSend dts( OutgoingMessageId(44), gen_.Ordered({5, 6, 7, 8}, "", {.mid = MID(0)})); - dts.expires_at = TimeMs(test_start + DurationMs(10)); + dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10)); return dts; }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_CALL(producer_, Discard(StreamID(1), OutgoingMessageId(44))) .WillOnce(Return(true)); @@ -1161,7 +1162,7 @@ TEST_F(RetransmissionQueueTest, ExpireCorrectMessageFromSendQueue) { EXPECT_THAT(queue.GetChunksToSend(now_, 24), ElementsAre(Pair(TSN(12), Field(&Data::mid, MID(0))))); - now_ += DurationMs(100); + now_ += TimeDelta::Millis(100); EXPECT_THAT(queue.GetChunksToSend(now_, 24), IsEmpty()); EXPECT_THAT( @@ -1177,7 +1178,7 @@ TEST_F(RetransmissionQueueTest, ExpireCorrectMessageFromSendQueue) { TEST_F(RetransmissionQueueTest, LimitsRetransmissionsOnlyWhenNackedThreeTimes) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE")); dts.max_retransmissions = MaxRetransmits(0); return dts; @@ -1185,7 +1186,7 @@ TEST_F(RetransmissionQueueTest, LimitsRetransmissionsOnlyWhenNackedThreeTimes) { .WillOnce(CreateChunk(OutgoingMessageId(0))) .WillOnce(CreateChunk(OutgoingMessageId(1))) .WillOnce(CreateChunk(OutgoingMessageId(2))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); @@ -1247,7 +1248,7 @@ TEST_F(RetransmissionQueueTest, AbandonsRtxLimit2WhenNackedNineTimes) { // This is a fairly long test. RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE")); dts.max_retransmissions = MaxRetransmits(2); return dts; @@ -1261,7 +1262,7 @@ TEST_F(RetransmissionQueueTest, AbandonsRtxLimit2WhenNackedNineTimes) { .WillOnce(CreateChunk(OutgoingMessageId(6))) .WillOnce(CreateChunk(OutgoingMessageId(7))) .WillOnce(CreateChunk(OutgoingMessageId(8))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); @@ -1387,11 +1388,11 @@ TEST_F(RetransmissionQueueTest, CwndRecoversWhenAcking) { std::vector payload(1000); EXPECT_CALL(producer_, Produce) - .WillOnce([this, payload](TimeMs, size_t) { + .WillOnce([this, payload](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered(payload, "BE")); }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); std::vector> chunks_to_send = queue.GetChunksToSend(now_, 1500); @@ -1415,12 +1416,12 @@ TEST_F(RetransmissionQueueTest, OnlySendsLargePacketsOnLargeCongestionWindow) { // Fill the congestion window almost - leaving 500 bytes. size_t chunk_size = intial_cwnd - 500; EXPECT_CALL(producer_, Produce) - .WillOnce([chunk_size, this](TimeMs, size_t) { + .WillOnce([chunk_size, this](Timestamp, size_t) { return SendQueue::DataToSend( OutgoingMessageId(0), gen_.Ordered(std::vector(chunk_size), "BE")); }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_TRUE(queue.can_send_data()); std::vector> chunks_to_send = @@ -1448,12 +1449,12 @@ TEST_F(RetransmissionQueueTest, AllowsSmallFragmentsOnSmallCongestionWindow) { // Fill the congestion window almost - leaving 500 bytes. size_t chunk_size = intial_cwnd - 500; EXPECT_CALL(producer_, Produce) - .WillOnce([chunk_size, this](TimeMs, size_t) { + .WillOnce([chunk_size, this](Timestamp, size_t) { return SendQueue::DataToSend( OutgoingMessageId(0), gen_.Ordered(std::vector(chunk_size), "BE")); }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_TRUE(queue.can_send_data()); std::vector> chunks_to_send = @@ -1468,7 +1469,7 @@ TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenHasNoOutstandingData) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(0))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(1)); EXPECT_EQ( @@ -1491,7 +1492,7 @@ TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenNothingToRetransmit) { .WillOnce(CreateChunk(OutgoingMessageId(5))) .WillOnce(CreateChunk(OutgoingMessageId(6))) .WillOnce(CreateChunk(OutgoingMessageId(7))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(8)); EXPECT_EQ( queue.GetHandoverReadiness(), @@ -1504,7 +1505,7 @@ TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenNothingToRetransmit) { // Send 18 EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(8))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(1)); // Ack 12, 14-15, 17-18 @@ -1516,7 +1517,7 @@ TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenNothingToRetransmit) { // Send 19 EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(9))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(1)); // Ack 12, 14-15, 17-19 @@ -1528,7 +1529,7 @@ TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenNothingToRetransmit) { // Send 20 EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(10))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(1)); // Ack 12, 14-15, 17-20 @@ -1564,7 +1565,7 @@ TEST_F(RetransmissionQueueTest, HandoverTest) { EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(0))) .WillOnce(CreateChunk(OutgoingMessageId(1))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(2)); queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {})); @@ -1575,7 +1576,7 @@ TEST_F(RetransmissionQueueTest, HandoverTest) { .WillOnce(CreateChunk(OutgoingMessageId(2))) .WillOnce(CreateChunk(OutgoingMessageId(3))) .WillOnce(CreateChunk(OutgoingMessageId(4))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(*handedover_queue), testing::ElementsAre(TSN(12), TSN(13), TSN(14))); @@ -1593,27 +1594,27 @@ TEST_F(RetransmissionQueueTest, CanAlwaysSendOnePacket) { std::vector payload(mtu - 100); EXPECT_CALL(producer_, Produce) - .WillOnce([this, payload](TimeMs, size_t) { + .WillOnce([this, payload](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered(payload, "B")); }) - .WillOnce([this, payload](TimeMs, size_t) { + .WillOnce([this, payload](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered(payload, "")); }) - .WillOnce([this, payload](TimeMs, size_t) { + .WillOnce([this, payload](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered(payload, "")); }) - .WillOnce([this, payload](TimeMs, size_t) { + .WillOnce([this, payload](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered(payload, "")); }) - .WillOnce([this, payload](TimeMs, size_t) { + .WillOnce([this, payload](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered(payload, "E")); }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); // Produce all chunks and put them in the retransmission queue. std::vector> chunks_to_send = diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc index facb432c59..b889c063d7 100644 --- a/net/dcsctp/tx/rr_send_queue.cc +++ b/net/dcsctp/tx/rr_send_queue.cc @@ -30,6 +30,8 @@ #include "rtc_base/logging.h" namespace dcsctp { +using ::webrtc::TimeDelta; +using ::webrtc::Timestamp; RRSendQueue::RRSendQueue(absl::string_view log_prefix, DcSctpSocketCallbacks* callbacks, @@ -137,7 +139,7 @@ void RRSendQueue::OutgoingStream::Add(DcSctpMessage message, } absl::optional RRSendQueue::OutgoingStream::Produce( - TimeMs now, + Timestamp now, size_t max_size) { RTC_DCHECK(pause_state_ != PauseState::kPaused && pause_state_ != PauseState::kResetting); @@ -349,7 +351,7 @@ bool RRSendQueue::OutgoingStream::has_partially_sent_message() const { return items_.front().mid.has_value(); } -void RRSendQueue::Add(TimeMs now, +void RRSendQueue::Add(Timestamp now, DcSctpMessage message, const SendOptions& send_options) { RTC_DCHECK(!message.payload().empty()); @@ -366,8 +368,9 @@ void RRSendQueue::Add(TimeMs now, ? MaxRetransmits(send_options.max_retransmissions.value()) : MaxRetransmits::NoLimit(), .expires_at = send_options.lifetime.has_value() - ? now + *send_options.lifetime + DurationMs(1) - : TimeMs::InfiniteFuture(), + ? now + send_options.lifetime->ToTimeDelta() + + TimeDelta::Millis(1) + : Timestamp::PlusInfinity(), .lifecycle_id = send_options.lifecycle_id, }; GetOrCreateStreamInfo(message.stream_id()) @@ -383,7 +386,7 @@ bool RRSendQueue::IsEmpty() const { return total_buffered_amount() == 0; } -absl::optional RRSendQueue::Produce(TimeMs now, +absl::optional RRSendQueue::Produce(Timestamp now, size_t max_size) { return scheduler_.Produce(now, max_size); } diff --git a/net/dcsctp/tx/rr_send_queue.h b/net/dcsctp/tx/rr_send_queue.h index bef5fe437d..b6c359dc1e 100644 --- a/net/dcsctp/tx/rr_send_queue.h +++ b/net/dcsctp/tx/rr_send_queue.h @@ -71,12 +71,13 @@ class RRSendQueue : public SendQueue { // time should be in `now`. Note that it's the responsibility of the caller to // ensure that the buffer is not full (by calling `IsFull`) before adding // messages to it. - void Add(TimeMs now, + void Add(webrtc::Timestamp now, DcSctpMessage message, const SendOptions& send_options = {}); // Implementation of `SendQueue`. - absl::optional Produce(TimeMs now, size_t max_size) override; + absl::optional Produce(webrtc::Timestamp now, + size_t max_size) override; bool Discard(StreamID stream_id, OutgoingMessageId message_id) override; void PrepareResetStream(StreamID streams) override; bool HasStreamsReadyToBeReset() const override; @@ -104,7 +105,7 @@ class RRSendQueue : public SendQueue { struct MessageAttributes { IsUnordered unordered; MaxRetransmits max_retransmissions; - TimeMs expires_at; + webrtc::Timestamp expires_at; LifecycleId lifecycle_id; }; @@ -154,7 +155,7 @@ class RRSendQueue : public SendQueue { void Add(DcSctpMessage message, MessageAttributes attributes); // Implementing `StreamScheduler::StreamProducer`. - absl::optional Produce(TimeMs now, + absl::optional Produce(webrtc::Timestamp now, size_t max_size) override; size_t bytes_to_send_in_next_message() const override; @@ -265,7 +266,7 @@ class RRSendQueue : public SendQueue { OutgoingStream& GetOrCreateStreamInfo(StreamID stream_id); absl::optional Produce( std::map::iterator it, - TimeMs now, + webrtc::Timestamp now, size_t max_size); const absl::string_view log_prefix_; diff --git a/net/dcsctp/tx/rr_send_queue_test.cc b/net/dcsctp/tx/rr_send_queue_test.cc index 9d6da7bdff..632cd8fc19 100644 --- a/net/dcsctp/tx/rr_send_queue_test.cc +++ b/net/dcsctp/tx/rr_send_queue_test.cc @@ -29,8 +29,10 @@ namespace dcsctp { namespace { using ::testing::SizeIs; using ::testing::UnorderedElementsAre; +using ::webrtc::TimeDelta; +using ::webrtc::Timestamp; -constexpr TimeMs kNow = TimeMs(0); +constexpr Timestamp kNow = Timestamp::Zero(); constexpr StreamID kStreamID(1); constexpr PPID kPPID(53); constexpr size_t kMaxQueueSize = 1000; @@ -181,9 +183,9 @@ TEST_F(RRSendQueueTest, ProduceWithLifetimeExpiry) { std::vector payload(20); // Default is no expiry - TimeMs now = kNow; + Timestamp now = kNow; buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload)); - now += DurationMs(1000000); + now += TimeDelta::Seconds(1000); ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize)); SendOptions expires_2_seconds; @@ -191,17 +193,17 @@ TEST_F(RRSendQueueTest, ProduceWithLifetimeExpiry) { // Add and consume within lifetime buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds); - now += DurationMs(2000); + now += TimeDelta::Millis(2000); ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize)); // Add and consume just outside lifetime buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds); - now += DurationMs(2001); + now += TimeDelta::Millis(2001); ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize)); // A long time after expiry buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds); - now += DurationMs(1000000); + now += TimeDelta::Seconds(1000); ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize)); // Expire one message, but produce the second that is not expired. @@ -211,7 +213,7 @@ TEST_F(RRSendQueueTest, ProduceWithLifetimeExpiry) { expires_4_seconds.lifetime = DurationMs(4000); buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_4_seconds); - now += DurationMs(2001); + now += TimeDelta::Millis(2001); ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize)); ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize)); @@ -846,8 +848,9 @@ TEST_F(RRSendQueueTest, WillSendLifecycleExpireWhenExpiredInSendQueue) { EXPECT_CALL(callbacks_, OnLifecycleMessageExpired(LifecycleId(1), /*maybe_delivered=*/false)); EXPECT_CALL(callbacks_, OnLifecycleEnd(LifecycleId(1))); - EXPECT_FALSE(buf_.Produce(kNow + DurationMs(1001), kOneFragmentPacketSize) - .has_value()); + EXPECT_FALSE( + buf_.Produce(kNow + TimeDelta::Millis(1001), kOneFragmentPacketSize) + .has_value()); } TEST_F(RRSendQueueTest, WillSendLifecycleExpireWhenDiscardingDuringPause) { diff --git a/net/dcsctp/tx/send_queue.h b/net/dcsctp/tx/send_queue.h index 48eaefaf6a..d0d834c901 100644 --- a/net/dcsctp/tx/send_queue.h +++ b/net/dcsctp/tx/send_queue.h @@ -17,6 +17,7 @@ #include "absl/types/optional.h" #include "api/array_view.h" +#include "api/units/timestamp.h" #include "net/dcsctp/common/internal_types.h" #include "net/dcsctp/packet/data.h" #include "net/dcsctp/public/types.h" @@ -37,7 +38,7 @@ class SendQueue { // Partial reliability - RFC3758 MaxRetransmits max_retransmissions = MaxRetransmits::NoLimit(); - TimeMs expires_at = TimeMs::InfiniteFuture(); + webrtc::Timestamp expires_at = webrtc::Timestamp::PlusInfinity(); // Lifecycle - set for the last fragment, and `LifecycleId::NotSet()` for // all other fragments. @@ -55,7 +56,8 @@ class SendQueue { // // `max_size` refers to how many payload bytes that may be produced, not // including any headers. - virtual absl::optional Produce(TimeMs now, size_t max_size) = 0; + virtual absl::optional Produce(webrtc::Timestamp now, + size_t max_size) = 0; // Discards a partially sent message identified by the parameters // `stream_id` and `message_id`. The `message_id` comes from the returned diff --git a/net/dcsctp/tx/stream_scheduler.cc b/net/dcsctp/tx/stream_scheduler.cc index c1d220aaa2..6c51e1e553 100644 --- a/net/dcsctp/tx/stream_scheduler.cc +++ b/net/dcsctp/tx/stream_scheduler.cc @@ -31,7 +31,7 @@ void StreamScheduler::Stream::SetPriority(StreamPriority priority) { } absl::optional StreamScheduler::Produce( - TimeMs now, + webrtc::Timestamp now, size_t max_size) { // For non-interleaved streams, avoid rescheduling while still sending a // message as it needs to be sent in full. For interleaved messaging, @@ -127,7 +127,7 @@ StreamScheduler::VirtualTime StreamScheduler::Stream::CalculateFinishTime( } absl::optional StreamScheduler::Stream::Produce( - TimeMs now, + webrtc::Timestamp now, size_t max_size) { absl::optional data = producer_.Produce(now, max_size); diff --git a/net/dcsctp/tx/stream_scheduler.h b/net/dcsctp/tx/stream_scheduler.h index ce836a5826..9d76fc6f56 100644 --- a/net/dcsctp/tx/stream_scheduler.h +++ b/net/dcsctp/tx/stream_scheduler.h @@ -87,7 +87,7 @@ class StreamScheduler { // The parameter `max_size` specifies the maximum amount of actual payload // that may be returned. If these constraints prevents the stream from // sending some data, `absl::nullopt` should be returned. - virtual absl::optional Produce(TimeMs now, + virtual absl::optional Produce(webrtc::Timestamp now, size_t max_size) = 0; // Returns the number of payload bytes that is scheduled to be sent in the @@ -132,7 +132,8 @@ class StreamScheduler { // Produces a message from this stream. This will only be called on streams // that have data. - absl::optional Produce(TimeMs now, size_t max_size); + absl::optional Produce(webrtc::Timestamp now, + size_t max_size); void MakeActive(size_t bytes_to_send_next); void ForceMarkInactive(); @@ -180,7 +181,8 @@ class StreamScheduler { // `now` and will be used to skip chunks with expired limited lifetime. The // parameter `max_size` specifies the maximum amount of actual payload that // may be returned. If no data can be produced, `absl::nullopt` is returned. - absl::optional Produce(TimeMs now, size_t max_size); + absl::optional Produce(webrtc::Timestamp now, + size_t max_size); std::set ActiveStreamsForTesting() const; diff --git a/net/dcsctp/tx/stream_scheduler_test.cc b/net/dcsctp/tx/stream_scheduler_test.cc index 4f5fb0fb84..42d0b3cd35 100644 --- a/net/dcsctp/tx/stream_scheduler_test.cc +++ b/net/dcsctp/tx/stream_scheduler_test.cc @@ -19,9 +19,11 @@ namespace dcsctp { namespace { using ::testing::Return; using ::testing::StrictMock; +using ::webrtc::Timestamp; constexpr size_t kMtu = 1000; constexpr size_t kPayloadSize = 4; +constexpr Timestamp kNow = Timestamp::Zero(); MATCHER_P(HasDataWithMid, mid, "") { if (!arg.has_value()) { @@ -38,12 +40,12 @@ MATCHER_P(HasDataWithMid, mid, "") { return true; } -std::function(TimeMs, size_t)> +std::function(Timestamp, size_t)> CreateChunk(OutgoingMessageId message_id, StreamID sid, MID mid, size_t payload_size = kPayloadSize) { - return [sid, mid, payload_size, message_id](TimeMs now, size_t max_size) { + return [sid, mid, payload_size, message_id](Timestamp now, size_t max_size) { return SendQueue::DataToSend( message_id, Data(sid, SSN(0), mid, FSN(0), PPID(42), @@ -56,8 +58,7 @@ std::map GetPacketCounts(StreamScheduler& scheduler, size_t packets_to_generate) { std::map packet_counts; for (size_t i = 0; i < packets_to_generate; ++i) { - absl::optional data = - scheduler.Produce(TimeMs(0), kMtu); + absl::optional data = scheduler.Produce(kNow, kMtu); if (data.has_value()) { ++packet_counts[data->data.stream_id]; } @@ -69,7 +70,7 @@ class MockStreamProducer : public StreamScheduler::StreamProducer { public: MOCK_METHOD(absl::optional, Produce, - (TimeMs, size_t), + (Timestamp, size_t), (override)); MOCK_METHOD(size_t, bytes_to_send_in_next_message, (), (const, override)); }; @@ -100,7 +101,7 @@ class TestStream { TEST(StreamSchedulerTest, HasNoActiveStreams) { StreamScheduler scheduler("", kMtu); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } // Stream properties can be set and retrieved @@ -132,8 +133,8 @@ TEST(StreamSchedulerTest, CanProduceFromSingleStream) { scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2)); stream->MaybeMakeActive(); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0))); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(0))); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } // Switches between two streams after every packet. @@ -168,13 +169,13 @@ TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) { scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2)); stream2->MaybeMakeActive(); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202))); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } // Switches between two streams after every packet, but keeps producing from the @@ -232,15 +233,15 @@ TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) { scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2)); stream2->MaybeMakeActive(); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202))); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } // Deactivates a stream before it has finished producing all packets. @@ -259,12 +260,12 @@ TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) { scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2)); stream1->MaybeMakeActive(); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101))); // ... but the stream is made inactive before it can be produced. stream1->MakeInactive(); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } // Resumes a paused stream - makes a stream active after inactivating it. @@ -287,14 +288,14 @@ TEST(StreamSchedulerTest, SingleStreamCanBeResumed) { scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2)); stream1->MaybeMakeActive(); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101))); stream1->MakeInactive(); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); stream1->MaybeMakeActive(); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102))); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } // Iterates between streams, where one is suddenly paused and later resumed. @@ -330,15 +331,15 @@ TEST(StreamSchedulerTest, WillRoundRobinWithPausedStream) { scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2)); stream2->MaybeMakeActive(); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200))); stream1->MakeInactive(); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202))); stream1->MaybeMakeActive(); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102))); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } // Verifies that packet counts are evenly distributed in round robin scheduling. @@ -427,18 +428,18 @@ TEST(StreamSchedulerTest, WillDoFairQueuingWithSamePriority) { stream2->MaybeMakeActive(); // t = 30 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100))); // t = 60 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101))); // t = 70 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200))); // t = 90 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102))); // t = 140 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201))); // t = 210 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202))); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } // Will do weighted fair queuing with three streams having different priority. @@ -492,24 +493,24 @@ TEST(StreamSchedulerTest, WillDoWeightedFairQueuingSameSizeDifferentPriority) { stream3->MaybeMakeActive(); // t ~= 20 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(300))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(300))); // t ~= 40 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(301))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(301))); // t ~= 50 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200))); // t ~= 60 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(302))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(302))); // t ~= 80 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100))); // t ~= 100 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201))); // t ~= 150 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202))); // t ~= 160 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101))); // t ~= 240 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102))); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } // Will do weighted fair queuing with three streams having different priority @@ -586,24 +587,24 @@ TEST(StreamSchedulerTest, WillDoWeightedFairQueuingDifferentSizeAndPriority) { stream3->MaybeMakeActive(); // t ~= 400 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(300))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(300))); // t ~= 1400 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(301))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(301))); // t ~= 2500 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200))); // t ~= 2800 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(302))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(302))); // t ~= 4000 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100))); // t ~= 5600 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101))); // t ~= 6000 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201))); // t ~= 7000 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202))); // t ~= 11200 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102))); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } TEST(StreamSchedulerTest, WillDistributeWFQPacketsInTwoStreamsByPriority) { // A simple test with two streams of different priority, but sending packets @@ -723,11 +724,11 @@ TEST(StreamSchedulerTest, SendLargeMessageWithSmallMtu) { auto stream2 = scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(1)); stream2->MaybeMakeActive(); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0))); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(0))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(1))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(1))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(0))); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } // Sending large messages with large MTU will not fragment messages and will @@ -756,9 +757,9 @@ TEST(StreamSchedulerTest, SendLargeMessageWithLargeMtu) { auto stream2 = scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(1)); stream2->MaybeMakeActive(); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0))); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(1))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(0))); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } } // namespace