From 658f1814da9bb7fa2f303a588fabff3b98a40abf Mon Sep 17 00:00:00 2001 From: Sebastian Jansson Date: Thu, 16 Jan 2020 10:59:28 +0100 Subject: [PATCH] Reland "Moves TransportFeedbackAdapter to TaskQueue." MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is a reland of 62d01cde6f6ec1fa91b1e5234a7922ad1a4ce036 Original change's description: > Moves TransportFeedbackAdapter to TaskQueue. > > Bug: webrtc:9883 > Change-Id: Id87e281751d98043f4470df5a71d458f4cd654c1 > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/158793 > Commit-Queue: Sebastian Jansson > Reviewed-by: Erik Språng > Cr-Commit-Position: refs/heads/master@{#30037} Bug: webrtc:9883 Change-Id: Icc63883903b283d490e9d4ed455e0eca69ed2074 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/162000 Commit-Queue: Sebastian Jansson Reviewed-by: Erik Språng Cr-Commit-Position: refs/heads/master@{#30285} --- call/rtp_transport_controller_send.cc | 72 +++++++++---------- call/rtp_transport_controller_send.h | 13 ++-- .../bbr/bbr_network_controller_unittest.cc | 4 +- .../rtp/transport_feedback_adapter.cc | 39 +++++----- .../rtp/transport_feedback_adapter.h | 26 ++++--- 5 files changed, 71 insertions(+), 83 deletions(-) diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc index 76dbc459ed..62b7008396 100644 --- a/call/rtp_transport_controller_send.cc +++ b/call/rtp_transport_controller_send.cc @@ -22,6 +22,7 @@ #include "call/rtp_video_sender.h" #include "logging/rtc_event_log/events/rtc_event_remote_estimate.h" #include "logging/rtc_event_log/events/rtc_event_route_change.h" +#include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h" #include "rtc_base/checks.h" #include "rtc_base/logging.h" #include "rtc_base/rate_limiter.h" @@ -278,11 +279,6 @@ void RtpTransportControllerSend::OnNetworkRouteChanged( << " bps."; RTC_DCHECK_GT(bitrate_config.start_bitrate_bps, 0); - if (reset_feedback_on_route_change_) - transport_feedback_adapter_.SetNetworkIds( - network_route.local_network_id, network_route.remote_network_id); - transport_overhead_bytes_per_packet_ = network_route.packet_overhead; - if (event_log_) { event_log_->Log(std::make_unique( network_route.connected, network_route.packet_overhead)); @@ -290,8 +286,13 @@ void RtpTransportControllerSend::OnNetworkRouteChanged( NetworkRouteChange msg; msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds()); msg.constraints = ConvertConstraints(bitrate_config, clock_); - task_queue_.PostTask([this, msg] { + task_queue_.PostTask([this, msg, network_route] { RTC_DCHECK_RUN_ON(&task_queue_); + transport_overhead_bytes_per_packet_ = network_route.packet_overhead; + if (reset_feedback_on_route_change_) { + transport_feedback_adapter_.SetNetworkIds( + network_route.local_network_id, network_route.remote_network_id); + } if (controller_) { PostUpdates(controller_->OnNetworkRouteChange(msg)); } else { @@ -351,17 +352,15 @@ void RtpTransportControllerSend::EnablePeriodicAlrProbing(bool enable) { } void RtpTransportControllerSend::OnSentPacket( const rtc::SentPacket& sent_packet) { - absl::optional packet_msg = - transport_feedback_adapter_.ProcessSentPacket(sent_packet); - if (packet_msg) { - task_queue_.PostTask([this, packet_msg]() { - RTC_DCHECK_RUN_ON(&task_queue_); - if (controller_) - PostUpdates(controller_->OnSentPacket(*packet_msg)); - }); - } - pacer()->UpdateOutstandingData( - transport_feedback_adapter_.GetOutstandingData()); + task_queue_.PostTask([this, sent_packet]() { + RTC_DCHECK_RUN_ON(&task_queue_); + absl::optional packet_msg = + transport_feedback_adapter_.ProcessSentPacket(sent_packet); + pacer()->UpdateOutstandingData( + transport_feedback_adapter_.GetOutstandingData()); + if (packet_msg && controller_) + PostUpdates(controller_->OnSentPacket(*packet_msg)); + }); } void RtpTransportControllerSend::OnReceivedPacket( @@ -470,30 +469,31 @@ void RtpTransportControllerSend::OnAddPacket( const RtpPacketSendInfo& packet_info) { feedback_demuxer_.AddPacket(packet_info); - transport_feedback_adapter_.AddPacket( - packet_info, - send_side_bwe_with_overhead_ ? transport_overhead_bytes_per_packet_.load() - : 0, - Timestamp::ms(clock_->TimeInMilliseconds())); + Timestamp creation_time = Timestamp::ms(clock_->TimeInMilliseconds()); + task_queue_.PostTask([this, packet_info, creation_time]() { + RTC_DCHECK_RUN_ON(&task_queue_); + transport_feedback_adapter_.AddPacket( + packet_info, + send_side_bwe_with_overhead_ ? transport_overhead_bytes_per_packet_ : 0, + creation_time); + }); } void RtpTransportControllerSend::OnTransportFeedback( const rtcp::TransportFeedback& feedback) { - RTC_DCHECK_RUNS_SERIALIZED(&worker_race_); feedback_demuxer_.OnTransportFeedback(feedback); - - absl::optional feedback_msg = - transport_feedback_adapter_.ProcessTransportFeedback( - feedback, Timestamp::ms(clock_->TimeInMilliseconds())); - if (feedback_msg) { - task_queue_.PostTask([this, feedback_msg]() { - RTC_DCHECK_RUN_ON(&task_queue_); - if (controller_) - PostUpdates(controller_->OnTransportPacketsFeedback(*feedback_msg)); - }); - } - pacer()->UpdateOutstandingData( - transport_feedback_adapter_.GetOutstandingData()); + auto feedback_time = Timestamp::ms(clock_->TimeInMilliseconds()); + task_queue_.PostTask([this, feedback, feedback_time]() { + RTC_DCHECK_RUN_ON(&task_queue_); + absl::optional feedback_msg = + transport_feedback_adapter_.ProcessTransportFeedback(feedback, + feedback_time); + if (feedback_msg && controller_) { + PostUpdates(controller_->OnTransportPacketsFeedback(*feedback_msg)); + } + pacer()->UpdateOutstandingData( + transport_feedback_adapter_.GetOutstandingData()); + }); } void RtpTransportControllerSend::OnRemoteNetworkEstimate( diff --git a/call/rtp_transport_controller_send.h b/call/rtp_transport_controller_send.h index 4e9ff4de4e..f74c4e598f 100644 --- a/call/rtp_transport_controller_send.h +++ b/call/rtp_transport_controller_send.h @@ -152,8 +152,8 @@ class RtpTransportControllerSend final TargetTransferRateObserver* observer_ RTC_GUARDED_BY(task_queue_); TransportFeedbackDemuxer feedback_demuxer_; - // TODO(srte): Move all access to feedback adapter to task queue. - TransportFeedbackAdapter transport_feedback_adapter_; + TransportFeedbackAdapter transport_feedback_adapter_ + RTC_GUARDED_BY(task_queue_); NetworkControllerFactoryInterface* const controller_factory_override_ RTC_PT_GUARDED_BY(task_queue_); @@ -178,16 +178,13 @@ class RtpTransportControllerSend final const bool reset_feedback_on_route_change_; const bool send_side_bwe_with_overhead_; const bool add_pacing_to_cwin_; - // Transport overhead is written by OnNetworkRouteChanged and read by - // AddPacket. - // TODO(srte): Remove atomic when feedback adapter runs on task queue. - std::atomic transport_overhead_bytes_per_packet_; + + size_t transport_overhead_bytes_per_packet_ RTC_GUARDED_BY(task_queue_); bool network_available_ RTC_GUARDED_BY(task_queue_); RepeatingTaskHandle pacer_queue_update_task_ RTC_GUARDED_BY(task_queue_); RepeatingTaskHandle controller_task_ RTC_GUARDED_BY(task_queue_); - // TODO(srte): Remove this checker when feedback adapter runs on task queue. - rtc::RaceChecker worker_race_; + // Protected by internal locks. RateLimiter retransmission_rate_limiter_; // TODO(perkj): |task_queue_| is supposed to replace |process_thread_|. diff --git a/modules/congestion_controller/bbr/bbr_network_controller_unittest.cc b/modules/congestion_controller/bbr/bbr_network_controller_unittest.cc index 2a8a224a81..8cf4d17a9f 100644 --- a/modules/congestion_controller/bbr/bbr_network_controller_unittest.cc +++ b/modules/congestion_controller/bbr/bbr_network_controller_unittest.cc @@ -155,8 +155,8 @@ TEST_F(BbrNetworkControllerTest, UpdatesTargetSendRate) { ret_net->UpdateConfig( [](NetworkSimulationConfig* c) { c->delay = TimeDelta::ms(200); }); - s.RunFor(TimeDelta::seconds(40)); - EXPECT_NEAR(client->send_bandwidth().kbps(), 200, 40); + s.RunFor(TimeDelta::seconds(35)); + EXPECT_NEAR(client->send_bandwidth().kbps(), 180, 50); } } // namespace test diff --git a/modules/congestion_controller/rtp/transport_feedback_adapter.cc b/modules/congestion_controller/rtp/transport_feedback_adapter.cc index efb88d21a9..b1de93559c 100644 --- a/modules/congestion_controller/rtp/transport_feedback_adapter.cc +++ b/modules/congestion_controller/rtp/transport_feedback_adapter.cc @@ -70,31 +70,27 @@ TransportFeedbackAdapter::TransportFeedbackAdapter() = default; void TransportFeedbackAdapter::AddPacket(const RtpPacketSendInfo& packet_info, size_t overhead_bytes, Timestamp creation_time) { - { - rtc::CritScope cs(&lock_); - PacketFeedback packet; - packet.creation_time = creation_time; - packet.sent.sequence_number = - seq_num_unwrapper_.Unwrap(packet_info.transport_sequence_number); - packet.sent.size = DataSize::bytes(packet_info.length + overhead_bytes); - packet.local_net_id = local_net_id_; - packet.remote_net_id = remote_net_id_; - packet.sent.pacing_info = packet_info.pacing_info; + PacketFeedback packet; + packet.creation_time = creation_time; + packet.sent.sequence_number = + seq_num_unwrapper_.Unwrap(packet_info.transport_sequence_number); + packet.sent.size = DataSize::bytes(packet_info.length + overhead_bytes); + packet.local_net_id = local_net_id_; + packet.remote_net_id = remote_net_id_; + packet.sent.pacing_info = packet_info.pacing_info; - while (!history_.empty() && - creation_time - history_.begin()->second.creation_time > - kSendTimeHistoryWindow) { - // TODO(sprang): Warn if erasing (too many) old items? - if (history_.begin()->second.sent.sequence_number > last_ack_seq_num_) - in_flight_.RemoveInFlightPacketBytes(history_.begin()->second); - history_.erase(history_.begin()); - } - history_.insert(std::make_pair(packet.sent.sequence_number, packet)); + while (!history_.empty() && + creation_time - history_.begin()->second.creation_time > + kSendTimeHistoryWindow) { + // TODO(sprang): Warn if erasing (too many) old items? + if (history_.begin()->second.sent.sequence_number > last_ack_seq_num_) + in_flight_.RemoveInFlightPacketBytes(history_.begin()->second); + history_.erase(history_.begin()); } + history_.insert(std::make_pair(packet.sent.sequence_number, packet)); } absl::optional TransportFeedbackAdapter::ProcessSentPacket( const rtc::SentPacket& sent_packet) { - rtc::CritScope cs(&lock_); auto send_time = Timestamp::ms(sent_packet.send_time_ms); // TODO(srte): Only use one way to indicate that packet feedback is used. if (sent_packet.info.included_in_feedback || sent_packet.packet_id != -1) { @@ -141,7 +137,6 @@ TransportFeedbackAdapter::ProcessTransportFeedback( return absl::nullopt; } - rtc::CritScope cs(&lock_); TransportPacketsFeedback msg; msg.feedback_time = feedback_receive_time; @@ -164,13 +159,11 @@ TransportFeedbackAdapter::ProcessTransportFeedback( void TransportFeedbackAdapter::SetNetworkIds(uint16_t local_id, uint16_t remote_id) { - rtc::CritScope cs(&lock_); local_net_id_ = local_id; remote_net_id_ = remote_id; } DataSize TransportFeedbackAdapter::GetOutstandingData() const { - rtc::CritScope cs(&lock_); return in_flight_.GetOutstandingData(local_net_id_, remote_net_id_); } diff --git a/modules/congestion_controller/rtp/transport_feedback_adapter.h b/modules/congestion_controller/rtp/transport_feedback_adapter.h index b6bed96711..c8ff9b9db5 100644 --- a/modules/congestion_controller/rtp/transport_feedback_adapter.h +++ b/modules/congestion_controller/rtp/transport_feedback_adapter.h @@ -75,26 +75,24 @@ class TransportFeedbackAdapter { std::vector ProcessTransportFeedbackInner( const rtcp::TransportFeedback& feedback, - Timestamp feedback_time) RTC_RUN_ON(&lock_); + Timestamp feedback_time); - rtc::CriticalSection lock_; - DataSize pending_untracked_size_ RTC_GUARDED_BY(&lock_) = DataSize::Zero(); - Timestamp last_send_time_ RTC_GUARDED_BY(&lock_) = Timestamp::MinusInfinity(); - Timestamp last_untracked_send_time_ RTC_GUARDED_BY(&lock_) = - Timestamp::MinusInfinity(); - SequenceNumberUnwrapper seq_num_unwrapper_ RTC_GUARDED_BY(&lock_); - std::map history_ RTC_GUARDED_BY(&lock_); + DataSize pending_untracked_size_ = DataSize::Zero(); + Timestamp last_send_time_ = Timestamp::MinusInfinity(); + Timestamp last_untracked_send_time_ = Timestamp::MinusInfinity(); + SequenceNumberUnwrapper seq_num_unwrapper_; + std::map history_; // Sequence numbers are never negative, using -1 as it always < a real // sequence number. - int64_t last_ack_seq_num_ RTC_GUARDED_BY(&lock_) = -1; - InFlightBytesTracker in_flight_ RTC_GUARDED_BY(&lock_); + int64_t last_ack_seq_num_ = -1; + InFlightBytesTracker in_flight_; - Timestamp current_offset_ RTC_GUARDED_BY(&lock_) = Timestamp::MinusInfinity(); - TimeDelta last_timestamp_ RTC_GUARDED_BY(&lock_) = TimeDelta::MinusInfinity(); + Timestamp current_offset_ = Timestamp::MinusInfinity(); + TimeDelta last_timestamp_ = TimeDelta::MinusInfinity(); - uint16_t local_net_id_ RTC_GUARDED_BY(&lock_) = 0; - uint16_t remote_net_id_ RTC_GUARDED_BY(&lock_) = 0; + uint16_t local_net_id_ = 0; + uint16_t remote_net_id_ = 0; }; } // namespace webrtc