mirror of
https://github.com/mollyim/webrtc.git
synced 2025-05-19 08:37:54 +01:00
Reland "Moves TransportFeedbackAdapter to TaskQueue."
This is a reland of 62d01cde6f
Original change's description:
> Moves TransportFeedbackAdapter to TaskQueue.
>
> Bug: webrtc:9883
> Change-Id: Id87e281751d98043f4470df5a71d458f4cd654c1
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/158793
> Commit-Queue: Sebastian Jansson <srte@webrtc.org>
> Reviewed-by: Erik Språng <sprang@webrtc.org>
> Cr-Commit-Position: refs/heads/master@{#30037}
Bug: webrtc:9883
Change-Id: Icc63883903b283d490e9d4ed455e0eca69ed2074
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/162000
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Reviewed-by: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#30285}
This commit is contained in:
parent
c0f25cf762
commit
658f1814da
5 changed files with 71 additions and 83 deletions
|
@ -22,6 +22,7 @@
|
|||
#include "call/rtp_video_sender.h"
|
||||
#include "logging/rtc_event_log/events/rtc_event_remote_estimate.h"
|
||||
#include "logging/rtc_event_log/events/rtc_event_route_change.h"
|
||||
#include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h"
|
||||
#include "rtc_base/checks.h"
|
||||
#include "rtc_base/logging.h"
|
||||
#include "rtc_base/rate_limiter.h"
|
||||
|
@ -278,11 +279,6 @@ void RtpTransportControllerSend::OnNetworkRouteChanged(
|
|||
<< " bps.";
|
||||
RTC_DCHECK_GT(bitrate_config.start_bitrate_bps, 0);
|
||||
|
||||
if (reset_feedback_on_route_change_)
|
||||
transport_feedback_adapter_.SetNetworkIds(
|
||||
network_route.local_network_id, network_route.remote_network_id);
|
||||
transport_overhead_bytes_per_packet_ = network_route.packet_overhead;
|
||||
|
||||
if (event_log_) {
|
||||
event_log_->Log(std::make_unique<RtcEventRouteChange>(
|
||||
network_route.connected, network_route.packet_overhead));
|
||||
|
@ -290,8 +286,13 @@ void RtpTransportControllerSend::OnNetworkRouteChanged(
|
|||
NetworkRouteChange msg;
|
||||
msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds());
|
||||
msg.constraints = ConvertConstraints(bitrate_config, clock_);
|
||||
task_queue_.PostTask([this, msg] {
|
||||
task_queue_.PostTask([this, msg, network_route] {
|
||||
RTC_DCHECK_RUN_ON(&task_queue_);
|
||||
transport_overhead_bytes_per_packet_ = network_route.packet_overhead;
|
||||
if (reset_feedback_on_route_change_) {
|
||||
transport_feedback_adapter_.SetNetworkIds(
|
||||
network_route.local_network_id, network_route.remote_network_id);
|
||||
}
|
||||
if (controller_) {
|
||||
PostUpdates(controller_->OnNetworkRouteChange(msg));
|
||||
} else {
|
||||
|
@ -351,17 +352,15 @@ void RtpTransportControllerSend::EnablePeriodicAlrProbing(bool enable) {
|
|||
}
|
||||
void RtpTransportControllerSend::OnSentPacket(
|
||||
const rtc::SentPacket& sent_packet) {
|
||||
task_queue_.PostTask([this, sent_packet]() {
|
||||
RTC_DCHECK_RUN_ON(&task_queue_);
|
||||
absl::optional<SentPacket> packet_msg =
|
||||
transport_feedback_adapter_.ProcessSentPacket(sent_packet);
|
||||
if (packet_msg) {
|
||||
task_queue_.PostTask([this, packet_msg]() {
|
||||
RTC_DCHECK_RUN_ON(&task_queue_);
|
||||
if (controller_)
|
||||
PostUpdates(controller_->OnSentPacket(*packet_msg));
|
||||
});
|
||||
}
|
||||
pacer()->UpdateOutstandingData(
|
||||
transport_feedback_adapter_.GetOutstandingData());
|
||||
if (packet_msg && controller_)
|
||||
PostUpdates(controller_->OnSentPacket(*packet_msg));
|
||||
});
|
||||
}
|
||||
|
||||
void RtpTransportControllerSend::OnReceivedPacket(
|
||||
|
@ -470,30 +469,31 @@ void RtpTransportControllerSend::OnAddPacket(
|
|||
const RtpPacketSendInfo& packet_info) {
|
||||
feedback_demuxer_.AddPacket(packet_info);
|
||||
|
||||
Timestamp creation_time = Timestamp::ms(clock_->TimeInMilliseconds());
|
||||
task_queue_.PostTask([this, packet_info, creation_time]() {
|
||||
RTC_DCHECK_RUN_ON(&task_queue_);
|
||||
transport_feedback_adapter_.AddPacket(
|
||||
packet_info,
|
||||
send_side_bwe_with_overhead_ ? transport_overhead_bytes_per_packet_.load()
|
||||
: 0,
|
||||
Timestamp::ms(clock_->TimeInMilliseconds()));
|
||||
send_side_bwe_with_overhead_ ? transport_overhead_bytes_per_packet_ : 0,
|
||||
creation_time);
|
||||
});
|
||||
}
|
||||
|
||||
void RtpTransportControllerSend::OnTransportFeedback(
|
||||
const rtcp::TransportFeedback& feedback) {
|
||||
RTC_DCHECK_RUNS_SERIALIZED(&worker_race_);
|
||||
feedback_demuxer_.OnTransportFeedback(feedback);
|
||||
|
||||
absl::optional<TransportPacketsFeedback> feedback_msg =
|
||||
transport_feedback_adapter_.ProcessTransportFeedback(
|
||||
feedback, Timestamp::ms(clock_->TimeInMilliseconds()));
|
||||
if (feedback_msg) {
|
||||
task_queue_.PostTask([this, feedback_msg]() {
|
||||
auto feedback_time = Timestamp::ms(clock_->TimeInMilliseconds());
|
||||
task_queue_.PostTask([this, feedback, feedback_time]() {
|
||||
RTC_DCHECK_RUN_ON(&task_queue_);
|
||||
if (controller_)
|
||||
absl::optional<TransportPacketsFeedback> feedback_msg =
|
||||
transport_feedback_adapter_.ProcessTransportFeedback(feedback,
|
||||
feedback_time);
|
||||
if (feedback_msg && controller_) {
|
||||
PostUpdates(controller_->OnTransportPacketsFeedback(*feedback_msg));
|
||||
});
|
||||
}
|
||||
pacer()->UpdateOutstandingData(
|
||||
transport_feedback_adapter_.GetOutstandingData());
|
||||
});
|
||||
}
|
||||
|
||||
void RtpTransportControllerSend::OnRemoteNetworkEstimate(
|
||||
|
|
|
@ -152,8 +152,8 @@ class RtpTransportControllerSend final
|
|||
TargetTransferRateObserver* observer_ RTC_GUARDED_BY(task_queue_);
|
||||
TransportFeedbackDemuxer feedback_demuxer_;
|
||||
|
||||
// TODO(srte): Move all access to feedback adapter to task queue.
|
||||
TransportFeedbackAdapter transport_feedback_adapter_;
|
||||
TransportFeedbackAdapter transport_feedback_adapter_
|
||||
RTC_GUARDED_BY(task_queue_);
|
||||
|
||||
NetworkControllerFactoryInterface* const controller_factory_override_
|
||||
RTC_PT_GUARDED_BY(task_queue_);
|
||||
|
@ -178,16 +178,13 @@ class RtpTransportControllerSend final
|
|||
const bool reset_feedback_on_route_change_;
|
||||
const bool send_side_bwe_with_overhead_;
|
||||
const bool add_pacing_to_cwin_;
|
||||
// Transport overhead is written by OnNetworkRouteChanged and read by
|
||||
// AddPacket.
|
||||
// TODO(srte): Remove atomic when feedback adapter runs on task queue.
|
||||
std::atomic<size_t> transport_overhead_bytes_per_packet_;
|
||||
|
||||
size_t transport_overhead_bytes_per_packet_ RTC_GUARDED_BY(task_queue_);
|
||||
bool network_available_ RTC_GUARDED_BY(task_queue_);
|
||||
RepeatingTaskHandle pacer_queue_update_task_ RTC_GUARDED_BY(task_queue_);
|
||||
RepeatingTaskHandle controller_task_ RTC_GUARDED_BY(task_queue_);
|
||||
// TODO(srte): Remove this checker when feedback adapter runs on task queue.
|
||||
rtc::RaceChecker worker_race_;
|
||||
|
||||
// Protected by internal locks.
|
||||
RateLimiter retransmission_rate_limiter_;
|
||||
|
||||
// TODO(perkj): |task_queue_| is supposed to replace |process_thread_|.
|
||||
|
|
|
@ -155,8 +155,8 @@ TEST_F(BbrNetworkControllerTest, UpdatesTargetSendRate) {
|
|||
ret_net->UpdateConfig(
|
||||
[](NetworkSimulationConfig* c) { c->delay = TimeDelta::ms(200); });
|
||||
|
||||
s.RunFor(TimeDelta::seconds(40));
|
||||
EXPECT_NEAR(client->send_bandwidth().kbps(), 200, 40);
|
||||
s.RunFor(TimeDelta::seconds(35));
|
||||
EXPECT_NEAR(client->send_bandwidth().kbps(), 180, 50);
|
||||
}
|
||||
|
||||
} // namespace test
|
||||
|
|
|
@ -70,8 +70,6 @@ TransportFeedbackAdapter::TransportFeedbackAdapter() = default;
|
|||
void TransportFeedbackAdapter::AddPacket(const RtpPacketSendInfo& packet_info,
|
||||
size_t overhead_bytes,
|
||||
Timestamp creation_time) {
|
||||
{
|
||||
rtc::CritScope cs(&lock_);
|
||||
PacketFeedback packet;
|
||||
packet.creation_time = creation_time;
|
||||
packet.sent.sequence_number =
|
||||
|
@ -91,10 +89,8 @@ void TransportFeedbackAdapter::AddPacket(const RtpPacketSendInfo& packet_info,
|
|||
}
|
||||
history_.insert(std::make_pair(packet.sent.sequence_number, packet));
|
||||
}
|
||||
}
|
||||
absl::optional<SentPacket> TransportFeedbackAdapter::ProcessSentPacket(
|
||||
const rtc::SentPacket& sent_packet) {
|
||||
rtc::CritScope cs(&lock_);
|
||||
auto send_time = Timestamp::ms(sent_packet.send_time_ms);
|
||||
// TODO(srte): Only use one way to indicate that packet feedback is used.
|
||||
if (sent_packet.info.included_in_feedback || sent_packet.packet_id != -1) {
|
||||
|
@ -141,7 +137,6 @@ TransportFeedbackAdapter::ProcessTransportFeedback(
|
|||
return absl::nullopt;
|
||||
}
|
||||
|
||||
rtc::CritScope cs(&lock_);
|
||||
TransportPacketsFeedback msg;
|
||||
msg.feedback_time = feedback_receive_time;
|
||||
|
||||
|
@ -164,13 +159,11 @@ TransportFeedbackAdapter::ProcessTransportFeedback(
|
|||
|
||||
void TransportFeedbackAdapter::SetNetworkIds(uint16_t local_id,
|
||||
uint16_t remote_id) {
|
||||
rtc::CritScope cs(&lock_);
|
||||
local_net_id_ = local_id;
|
||||
remote_net_id_ = remote_id;
|
||||
}
|
||||
|
||||
DataSize TransportFeedbackAdapter::GetOutstandingData() const {
|
||||
rtc::CritScope cs(&lock_);
|
||||
return in_flight_.GetOutstandingData(local_net_id_, remote_net_id_);
|
||||
}
|
||||
|
||||
|
|
|
@ -75,26 +75,24 @@ class TransportFeedbackAdapter {
|
|||
|
||||
std::vector<PacketResult> ProcessTransportFeedbackInner(
|
||||
const rtcp::TransportFeedback& feedback,
|
||||
Timestamp feedback_time) RTC_RUN_ON(&lock_);
|
||||
Timestamp feedback_time);
|
||||
|
||||
rtc::CriticalSection lock_;
|
||||
DataSize pending_untracked_size_ RTC_GUARDED_BY(&lock_) = DataSize::Zero();
|
||||
Timestamp last_send_time_ RTC_GUARDED_BY(&lock_) = Timestamp::MinusInfinity();
|
||||
Timestamp last_untracked_send_time_ RTC_GUARDED_BY(&lock_) =
|
||||
Timestamp::MinusInfinity();
|
||||
SequenceNumberUnwrapper seq_num_unwrapper_ RTC_GUARDED_BY(&lock_);
|
||||
std::map<int64_t, PacketFeedback> history_ RTC_GUARDED_BY(&lock_);
|
||||
DataSize pending_untracked_size_ = DataSize::Zero();
|
||||
Timestamp last_send_time_ = Timestamp::MinusInfinity();
|
||||
Timestamp last_untracked_send_time_ = Timestamp::MinusInfinity();
|
||||
SequenceNumberUnwrapper seq_num_unwrapper_;
|
||||
std::map<int64_t, PacketFeedback> history_;
|
||||
|
||||
// Sequence numbers are never negative, using -1 as it always < a real
|
||||
// sequence number.
|
||||
int64_t last_ack_seq_num_ RTC_GUARDED_BY(&lock_) = -1;
|
||||
InFlightBytesTracker in_flight_ RTC_GUARDED_BY(&lock_);
|
||||
int64_t last_ack_seq_num_ = -1;
|
||||
InFlightBytesTracker in_flight_;
|
||||
|
||||
Timestamp current_offset_ RTC_GUARDED_BY(&lock_) = Timestamp::MinusInfinity();
|
||||
TimeDelta last_timestamp_ RTC_GUARDED_BY(&lock_) = TimeDelta::MinusInfinity();
|
||||
Timestamp current_offset_ = Timestamp::MinusInfinity();
|
||||
TimeDelta last_timestamp_ = TimeDelta::MinusInfinity();
|
||||
|
||||
uint16_t local_net_id_ RTC_GUARDED_BY(&lock_) = 0;
|
||||
uint16_t remote_net_id_ RTC_GUARDED_BY(&lock_) = 0;
|
||||
uint16_t local_net_id_ = 0;
|
||||
uint16_t remote_net_id_ = 0;
|
||||
};
|
||||
|
||||
} // namespace webrtc
|
||||
|
|
Loading…
Reference in a new issue