User Timestamp and TimeDelta instead of raw ints in RtpSenderEgress

Bug: webrtc:13757
Change-Id: I5244ed1148f628df9482f934fdfb509e511a9856
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/291103
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Emil Lundmark <lndmrk@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#39151}
This commit is contained in:
Danil Chapovalov 2023-01-19 12:59:55 +01:00 committed by WebRTC LUCI CQ
parent bd1e5d5aa5
commit 4cb2ac0e30
2 changed files with 53 additions and 74 deletions

View file

@ -23,7 +23,7 @@
namespace webrtc { namespace webrtc {
namespace { namespace {
constexpr uint32_t kTimestampTicksPerMs = 90; constexpr uint32_t kTimestampTicksPerMs = 90;
constexpr int kSendSideDelayWindowMs = 1000; constexpr TimeDelta kSendSideDelayWindow = TimeDelta::Seconds(1);
constexpr int kBitrateStatisticsWindowMs = 1000; constexpr int kBitrateStatisticsWindowMs = 1000;
constexpr size_t kRtpSequenceNumberMapMaxEntries = 1 << 13; constexpr size_t kRtpSequenceNumberMapMaxEntries = 1 << 13;
constexpr TimeDelta kUpdateInterval = constexpr TimeDelta kUpdateInterval =
@ -91,7 +91,7 @@ RtpSenderEgress::RtpSenderEgress(const RtpRtcpInterface::Configuration& config,
force_part_of_allocation_(false), force_part_of_allocation_(false),
timestamp_offset_(0), timestamp_offset_(0),
max_delay_it_(send_delays_.end()), max_delay_it_(send_delays_.end()),
sum_delays_ms_(0), sum_delays_(TimeDelta::Zero()),
send_rates_(kNumMediaTypes, send_rates_(kNumMediaTypes,
{kBitrateStatisticsWindowMs, RateStatistics::kBpsScale}), {kBitrateStatisticsWindowMs, RateStatistics::kBpsScale}),
rtp_sequence_number_map_(need_rtp_packet_infos_ rtp_sequence_number_map_(need_rtp_packet_infos_
@ -144,10 +144,9 @@ void RtpSenderEgress::SendPacket(RtpPacketToSend* packet,
const Timestamp now = clock_->CurrentTime(); const Timestamp now = clock_->CurrentTime();
#if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE #if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
worker_queue_->PostTask( worker_queue_->PostTask(SafeTask(
SafeTask(task_safety_.flag(), [this, now, packet_ssrc]() { task_safety_.flag(),
BweTestLoggingPlot(now.ms(), packet_ssrc); [this, now, packet_ssrc]() { BweTestLoggingPlot(now, packet_ssrc); }));
}));
#endif #endif
if (need_rtp_packet_infos_ && if (need_rtp_packet_infos_ &&
@ -250,9 +249,8 @@ void RtpSenderEgress::SendPacket(RtpPacketToSend* packet,
if (packet->packet_type() != RtpPacketMediaType::kPadding && if (packet->packet_type() != RtpPacketMediaType::kPadding &&
packet->packet_type() != RtpPacketMediaType::kRetransmission) { packet->packet_type() != RtpPacketMediaType::kRetransmission) {
UpdateDelayStatistics(packet->capture_time().ms(), now.ms(), packet_ssrc); UpdateDelayStatistics(packet->capture_time(), now, packet_ssrc);
UpdateOnSendPacket(options.packet_id, packet->capture_time().ms(), UpdateOnSendPacket(options.packet_id, packet->capture_time(), packet_ssrc);
packet_ssrc);
} }
const bool send_success = SendPacketToNetwork(*packet, options, pacing_info); const bool send_success = SendPacketToNetwork(*packet, options, pacing_info);
@ -283,7 +281,7 @@ void RtpSenderEgress::SendPacket(RtpPacketToSend* packet,
SafeTask(task_safety_.flag(), [this, now, packet_ssrc, packet_type, SafeTask(task_safety_.flag(), [this, now, packet_ssrc, packet_type,
counter = std::move(counter), size]() { counter = std::move(counter), size]() {
RTC_DCHECK_RUN_ON(worker_queue_); RTC_DCHECK_RUN_ON(worker_queue_);
UpdateRtpStats(now.ms(), packet_ssrc, packet_type, std::move(counter), UpdateRtpStats(now, packet_ssrc, packet_type, std::move(counter),
size); size);
})); }));
} }
@ -291,16 +289,15 @@ void RtpSenderEgress::SendPacket(RtpPacketToSend* packet,
RtpSendRates RtpSenderEgress::GetSendRates() const { RtpSendRates RtpSenderEgress::GetSendRates() const {
MutexLock lock(&lock_); MutexLock lock(&lock_);
const int64_t now_ms = clock_->TimeInMilliseconds(); return GetSendRatesLocked(clock_->CurrentTime());
return GetSendRatesLocked(now_ms);
} }
RtpSendRates RtpSenderEgress::GetSendRatesLocked(int64_t now_ms) const { RtpSendRates RtpSenderEgress::GetSendRatesLocked(Timestamp now) const {
RtpSendRates current_rates; RtpSendRates current_rates;
for (size_t i = 0; i < kNumMediaTypes; ++i) { for (size_t i = 0; i < kNumMediaTypes; ++i) {
RtpPacketMediaType type = static_cast<RtpPacketMediaType>(i); RtpPacketMediaType type = static_cast<RtpPacketMediaType>(i);
current_rates[type] = current_rates[type] =
DataRate::BitsPerSec(send_rates_[i].Rate(now_ms).value_or(0)); DataRate::BitsPerSec(send_rates_[i].Rate(now.ms()).value_or(0));
} }
return current_rates; return current_rates;
} }
@ -441,14 +438,14 @@ void RtpSenderEgress::AddPacketToTransportFeedback(
} }
} }
void RtpSenderEgress::UpdateDelayStatistics(int64_t capture_time_ms, void RtpSenderEgress::UpdateDelayStatistics(Timestamp capture_time,
int64_t now_ms, Timestamp now,
uint32_t ssrc) { uint32_t ssrc) {
if (!send_side_delay_observer_ || capture_time_ms <= 0) if (!send_side_delay_observer_ || capture_time.IsInfinite())
return; return;
int avg_delay_ms = 0; TimeDelta avg_delay = TimeDelta::Zero();
int max_delay_ms = 0; TimeDelta max_delay = TimeDelta::Zero();
{ {
MutexLock lock(&lock_); MutexLock lock(&lock_);
// Compute the max and average of the recent capture-to-send delays. // Compute the max and average of the recent capture-to-send delays.
@ -456,13 +453,12 @@ void RtpSenderEgress::UpdateDelayStatistics(int64_t capture_time_ms,
// of the delay values. This could be done more efficiently. // of the delay values. This could be done more efficiently.
// Remove elements older than kSendSideDelayWindowMs. // Remove elements older than kSendSideDelayWindowMs.
auto lower_bound = auto lower_bound = send_delays_.lower_bound(now - kSendSideDelayWindow);
send_delays_.lower_bound(now_ms - kSendSideDelayWindowMs);
for (auto it = send_delays_.begin(); it != lower_bound; ++it) { for (auto it = send_delays_.begin(); it != lower_bound; ++it) {
if (max_delay_it_ == it) { if (max_delay_it_ == it) {
max_delay_it_ = send_delays_.end(); max_delay_it_ = send_delays_.end();
} }
sum_delays_ms_ -= it->second; sum_delays_ -= it->second;
} }
send_delays_.erase(send_delays_.begin(), lower_bound); send_delays_.erase(send_delays_.begin(), lower_bound);
if (max_delay_it_ == send_delays_.end()) { if (max_delay_it_ == send_delays_.end()) {
@ -471,24 +467,14 @@ void RtpSenderEgress::UpdateDelayStatistics(int64_t capture_time_ms,
} }
// Add the new element. // Add the new element.
RTC_DCHECK_GE(now_ms, 0); TimeDelta new_send_delay = now - capture_time;
RTC_DCHECK_LE(now_ms, std::numeric_limits<int64_t>::max() / 2); auto [it, inserted] = send_delays_.emplace(now, new_send_delay);
RTC_DCHECK_GE(capture_time_ms, 0);
RTC_DCHECK_LE(capture_time_ms, std::numeric_limits<int64_t>::max() / 2);
int64_t diff_ms = now_ms - capture_time_ms;
RTC_DCHECK_GE(diff_ms, static_cast<int64_t>(0));
RTC_DCHECK_LE(diff_ms, std::numeric_limits<int>::max());
int new_send_delay = rtc::dchecked_cast<int>(now_ms - capture_time_ms);
SendDelayMap::iterator it;
bool inserted;
std::tie(it, inserted) =
send_delays_.insert(std::make_pair(now_ms, new_send_delay));
if (!inserted) { if (!inserted) {
// TODO(terelius): If we have multiple delay measurements during the same // TODO(terelius): If we have multiple delay measurements during the same
// millisecond then we keep the most recent one. It is not clear that this // millisecond then we keep the most recent one. It is not clear that this
// is the right decision, but it preserves an earlier behavior. // is the right decision, but it preserves an earlier behavior.
int previous_send_delay = it->second; TimeDelta previous_send_delay = it->second;
sum_delays_ms_ -= previous_send_delay; sum_delays_ -= previous_send_delay;
it->second = new_send_delay; it->second = new_send_delay;
if (max_delay_it_ == it && new_send_delay < previous_send_delay) { if (max_delay_it_ == it && new_send_delay < previous_send_delay) {
RecomputeMaxSendDelay(); RecomputeMaxSendDelay();
@ -498,20 +484,15 @@ void RtpSenderEgress::UpdateDelayStatistics(int64_t capture_time_ms,
it->second >= max_delay_it_->second) { it->second >= max_delay_it_->second) {
max_delay_it_ = it; max_delay_it_ = it;
} }
sum_delays_ms_ += new_send_delay; sum_delays_ += new_send_delay;
size_t num_delays = send_delays_.size(); size_t num_delays = send_delays_.size();
RTC_DCHECK(max_delay_it_ != send_delays_.end()); RTC_DCHECK(max_delay_it_ != send_delays_.end());
max_delay_ms = rtc::dchecked_cast<int>(max_delay_it_->second); max_delay = max_delay_it_->second;
int64_t avg_ms = (sum_delays_ms_ + num_delays / 2) / num_delays; avg_delay = sum_delays_ / num_delays;
RTC_DCHECK_GE(avg_ms, static_cast<int64_t>(0));
RTC_DCHECK_LE(avg_ms,
static_cast<int64_t>(std::numeric_limits<int>::max()));
avg_delay_ms =
rtc::dchecked_cast<int>((sum_delays_ms_ + num_delays / 2) / num_delays);
} }
send_side_delay_observer_->SendSideDelayUpdated(avg_delay_ms, max_delay_ms, send_side_delay_observer_->SendSideDelayUpdated(avg_delay.ms(),
ssrc); max_delay.ms(), ssrc);
} }
void RtpSenderEgress::RecomputeMaxSendDelay() { void RtpSenderEgress::RecomputeMaxSendDelay() {
@ -524,13 +505,13 @@ void RtpSenderEgress::RecomputeMaxSendDelay() {
} }
void RtpSenderEgress::UpdateOnSendPacket(int packet_id, void RtpSenderEgress::UpdateOnSendPacket(int packet_id,
int64_t capture_time_ms, Timestamp capture_time,
uint32_t ssrc) { uint32_t ssrc) {
if (!send_packet_observer_ || capture_time_ms <= 0 || packet_id == -1) { if (!send_packet_observer_ || capture_time.IsInfinite() || packet_id == -1) {
return; return;
} }
send_packet_observer_->OnSendPacket(packet_id, capture_time_ms, ssrc); send_packet_observer_->OnSendPacket(packet_id, capture_time.ms(), ssrc);
} }
bool RtpSenderEgress::SendPacketToNetwork(const RtpPacketToSend& packet, bool RtpSenderEgress::SendPacketToNetwork(const RtpPacketToSend& packet,
@ -554,7 +535,7 @@ bool RtpSenderEgress::SendPacketToNetwork(const RtpPacketToSend& packet,
return true; return true;
} }
void RtpSenderEgress::UpdateRtpStats(int64_t now_ms, void RtpSenderEgress::UpdateRtpStats(Timestamp now,
uint32_t packet_ssrc, uint32_t packet_ssrc,
RtpPacketMediaType packet_type, RtpPacketMediaType packet_type,
RtpPacketCounter counter, RtpPacketCounter counter,
@ -573,7 +554,7 @@ void RtpSenderEgress::UpdateRtpStats(int64_t now_ms,
packet_ssrc == rtx_ssrc_ ? &rtx_rtp_stats_ : &rtp_stats_; packet_ssrc == rtx_ssrc_ ? &rtx_rtp_stats_ : &rtp_stats_;
if (counters->first_packet_time_ms == -1) { if (counters->first_packet_time_ms == -1) {
counters->first_packet_time_ms = now_ms; counters->first_packet_time_ms = now.ms();
} }
if (packet_type == RtpPacketMediaType::kForwardErrorCorrection) { if (packet_type == RtpPacketMediaType::kForwardErrorCorrection) {
@ -583,9 +564,9 @@ void RtpSenderEgress::UpdateRtpStats(int64_t now_ms,
} }
counters->transmitted.Add(counter); counters->transmitted.Add(counter);
send_rates_[static_cast<size_t>(packet_type)].Update(packet_size, now_ms); send_rates_[static_cast<size_t>(packet_type)].Update(packet_size, now.ms());
if (bitrate_callback_) { if (bitrate_callback_) {
send_rates = GetSendRatesLocked(now_ms); send_rates = GetSendRatesLocked(now);
} }
if (rtp_stats_callback_) { if (rtp_stats_callback_) {
@ -612,21 +593,21 @@ void RtpSenderEgress::PeriodicUpdate() {
} }
#if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE #if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
void RtpSenderEgress::BweTestLoggingPlot(int64_t now_ms, uint32_t packet_ssrc) { void RtpSenderEgress::BweTestLoggingPlot(Timestamp now, uint32_t packet_ssrc) {
RTC_DCHECK_RUN_ON(worker_queue_); RTC_DCHECK_RUN_ON(worker_queue_);
const auto rates = GetSendRates(); const auto rates = GetSendRates();
if (is_audio_) { if (is_audio_) {
BWE_TEST_LOGGING_PLOT_WITH_SSRC(1, "AudioTotBitrate_kbps", now_ms, BWE_TEST_LOGGING_PLOT_WITH_SSRC(1, "AudioTotBitrate_kbps", now.ms(),
rates.Sum().kbps(), packet_ssrc); rates.Sum().kbps(), packet_ssrc);
BWE_TEST_LOGGING_PLOT_WITH_SSRC( BWE_TEST_LOGGING_PLOT_WITH_SSRC(
1, "AudioNackBitrate_kbps", now_ms, 1, "AudioNackBitrate_kbps", now.ms(),
rates[RtpPacketMediaType::kRetransmission].kbps(), packet_ssrc); rates[RtpPacketMediaType::kRetransmission].kbps(), packet_ssrc);
} else { } else {
BWE_TEST_LOGGING_PLOT_WITH_SSRC(1, "VideoTotBitrate_kbps", now_ms, BWE_TEST_LOGGING_PLOT_WITH_SSRC(1, "VideoTotBitrate_kbps", now.ms(),
rates.Sum().kbps(), packet_ssrc); rates.Sum().kbps(), packet_ssrc);
BWE_TEST_LOGGING_PLOT_WITH_SSRC( BWE_TEST_LOGGING_PLOT_WITH_SSRC(
1, "VideoNackBitrate_kbps", now_ms, 1, "VideoNackBitrate_kbps", now.ms(),
rates[RtpPacketMediaType::kRetransmission].kbps(), packet_ssrc); rates[RtpPacketMediaType::kRetransmission].kbps(), packet_ssrc);
} }
} }

View file

@ -23,6 +23,8 @@
#include "api/task_queue/pending_task_safety_flag.h" #include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/task_queue_base.h" #include "api/task_queue/task_queue_base.h"
#include "api/units/data_rate.h" #include "api/units/data_rate.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "modules/remote_bitrate_estimator/test/bwe_test_logging.h" #include "modules/remote_bitrate_estimator/test/bwe_test_logging.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
#include "modules/rtp_rtcp/source/packet_sequencer.h" #include "modules/rtp_rtcp/source/packet_sequencer.h"
@ -98,36 +100,29 @@ class RtpSenderEgress {
rtc::ArrayView<const uint16_t> sequence_numbers); rtc::ArrayView<const uint16_t> sequence_numbers);
private: private:
// Maps capture time in milliseconds to send-side delay in milliseconds. RtpSendRates GetSendRatesLocked(Timestamp now) const
// Send-side delay is the difference between transmission time and capture
// time.
typedef std::map<int64_t, int> SendDelayMap;
RtpSendRates GetSendRatesLocked(int64_t now_ms) const
RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_); RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
bool HasCorrectSsrc(const RtpPacketToSend& packet) const; bool HasCorrectSsrc(const RtpPacketToSend& packet) const;
void AddPacketToTransportFeedback(uint16_t packet_id, void AddPacketToTransportFeedback(uint16_t packet_id,
const RtpPacketToSend& packet, const RtpPacketToSend& packet,
const PacedPacketInfo& pacing_info); const PacedPacketInfo& pacing_info);
void UpdateDelayStatistics(int64_t capture_time_ms, void UpdateDelayStatistics(Timestamp capture_time,
int64_t now_ms, Timestamp now,
uint32_t ssrc); uint32_t ssrc);
void RecomputeMaxSendDelay() RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_); void RecomputeMaxSendDelay() RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
void UpdateOnSendPacket(int packet_id, void UpdateOnSendPacket(int packet_id, Timestamp capture_time, uint32_t ssrc);
int64_t capture_time_ms,
uint32_t ssrc);
// Sends packet on to `transport_`, leaving the RTP module. // Sends packet on to `transport_`, leaving the RTP module.
bool SendPacketToNetwork(const RtpPacketToSend& packet, bool SendPacketToNetwork(const RtpPacketToSend& packet,
const PacketOptions& options, const PacketOptions& options,
const PacedPacketInfo& pacing_info); const PacedPacketInfo& pacing_info);
void UpdateRtpStats(int64_t now_ms, void UpdateRtpStats(Timestamp now,
uint32_t packet_ssrc, uint32_t packet_ssrc,
RtpPacketMediaType packet_type, RtpPacketMediaType packet_type,
RtpPacketCounter counter, RtpPacketCounter counter,
size_t packet_size); size_t packet_size);
#if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE #if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
void BweTestLoggingPlot(int64_t now_ms, uint32_t packet_ssrc); void BweTestLoggingPlot(Timestamp now, uint32_t packet_ssrc);
#endif #endif
// Called on a timer, once a second, on the worker_queue_. // Called on a timer, once a second, on the worker_queue_.
@ -162,10 +157,13 @@ class RtpSenderEgress {
bool force_part_of_allocation_ RTC_GUARDED_BY(lock_); bool force_part_of_allocation_ RTC_GUARDED_BY(lock_);
uint32_t timestamp_offset_ RTC_GUARDED_BY(worker_queue_); uint32_t timestamp_offset_ RTC_GUARDED_BY(worker_queue_);
SendDelayMap send_delays_ RTC_GUARDED_BY(lock_); // Maps capture time to send-side delay. Send-side delay is the difference
SendDelayMap::const_iterator max_delay_it_ RTC_GUARDED_BY(lock_); // between transmission time and capture time.
std::map<Timestamp, TimeDelta> send_delays_ RTC_GUARDED_BY(lock_);
std::map<Timestamp, TimeDelta>::const_iterator max_delay_it_
RTC_GUARDED_BY(lock_);
// The sum of delays over a kSendSideDelayWindowMs sliding window. // The sum of delays over a kSendSideDelayWindowMs sliding window.
int64_t sum_delays_ms_ RTC_GUARDED_BY(lock_); TimeDelta sum_delays_ RTC_GUARDED_BY(lock_);
StreamDataCounters rtp_stats_ RTC_GUARDED_BY(lock_); StreamDataCounters rtp_stats_ RTC_GUARDED_BY(lock_);
StreamDataCounters rtx_rtp_stats_ RTC_GUARDED_BY(lock_); StreamDataCounters rtx_rtp_stats_ RTC_GUARDED_BY(lock_);
// One element per value in RtpPacketMediaType, with index matching value. // One element per value in RtpPacketMediaType, with index matching value.