in rtcp::TransportFeedback do not memorise all described packet

Instead generate such info on request

Bug: None
Change-Id: I8c3b54c8acdd0e3df822ecbc313ab8c232de5812
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/269251
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Björn Terelius <terelius@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#39207}
This commit is contained in:
Danil Chapovalov 2023-01-26 11:19:09 +01:00 committed by WebRTC LUCI CQ
parent dcb09ff218
commit 6c032cb835
7 changed files with 79 additions and 88 deletions

View file

@ -214,8 +214,7 @@ struct LoggedRtcpPacketPli {
struct LoggedRtcpPacketTransportFeedback {
LoggedRtcpPacketTransportFeedback()
: transport_feedback(/*include_timestamps=*/true, /*include_lost*/ true) {
}
: transport_feedback(/*include_timestamps=*/true) {}
LoggedRtcpPacketTransportFeedback(
Timestamp timestamp,
const rtcp::TransportFeedback& transport_feedback)

View file

@ -2370,28 +2370,26 @@ std::vector<LoggedPacketInfo> ParsedRtcEventLog::GetPacketInfos(
last_feedback_base_time = feedback.BaseTime();
std::vector<LoggedPacketInfo*> packet_feedbacks;
packet_feedbacks.reserve(feedback.GetAllPackets().size());
Timestamp receive_timestamp = feedback_base_time;
packet_feedbacks.reserve(feedback.GetPacketStatusCount());
std::vector<int64_t> unknown_seq_nums;
for (const auto& packet : feedback.GetAllPackets()) {
int64_t unwrapped_seq_num =
seq_num_unwrapper.Unwrap(packet.sequence_number());
feedback.ForAllPackets([&](uint16_t sequence_number,
TimeDelta delta_since_base) {
int64_t unwrapped_seq_num = seq_num_unwrapper.Unwrap(sequence_number);
auto it = indices.find(unwrapped_seq_num);
if (it == indices.end()) {
unknown_seq_nums.push_back(unwrapped_seq_num);
continue;
return;
}
LoggedPacketInfo* sent = &packets[it->second];
if (log_feedback_time - sent->log_packet_time >
TimeDelta::Seconds(60)) {
RTC_LOG(LS_WARNING)
<< "Received very late feedback, possibly due to wraparound.";
continue;
return;
}
if (packet.received()) {
receive_timestamp += packet.delta();
if (delta_since_base.IsFinite()) {
if (sent->reported_recv_time.IsInfinite()) {
sent->reported_recv_time = receive_timestamp;
sent->reported_recv_time = feedback_base_time + delta_since_base;
sent->log_feedback_time = log_feedback_time;
}
} else {
@ -2402,7 +2400,7 @@ std::vector<LoggedPacketInfo> ParsedRtcEventLog::GetPacketInfos(
}
}
packet_feedbacks.push_back(sent);
}
});
if (!unknown_seq_nums.empty()) {
RTC_LOG(LS_WARNING)
<< "Received feedback for unknown packets: "

View file

@ -212,9 +212,10 @@ TransportFeedbackAdapter::ProcessTransportFeedbackInner(
size_t failed_lookups = 0;
size_t ignored = 0;
TimeDelta packet_offset = TimeDelta::Zero();
for (const auto& packet : feedback.GetAllPackets()) {
int64_t seq_num = seq_num_unwrapper_.Unwrap(packet.sequence_number());
feedback.ForAllPackets([&](uint16_t sequence_number,
TimeDelta delta_since_base) {
int64_t seq_num = seq_num_unwrapper_.Unwrap(sequence_number);
if (seq_num > last_ack_seq_num_) {
// Starts at history_.begin() if last_ack_seq_num_ < 0, since any valid
@ -229,7 +230,7 @@ TransportFeedbackAdapter::ProcessTransportFeedbackInner(
auto it = history_.find(seq_num);
if (it == history_.end()) {
++failed_lookups;
continue;
return;
}
if (it->second.sent.send_time.IsInfinite()) {
@ -237,14 +238,13 @@ TransportFeedbackAdapter::ProcessTransportFeedbackInner(
// DCHECK.
RTC_DLOG(LS_ERROR)
<< "Received feedback before packet was indicated as sent";
continue;
return;
}
PacketFeedback packet_feedback = it->second;
if (packet.received()) {
packet_offset += packet.delta();
if (delta_since_base.IsFinite()) {
packet_feedback.receive_time =
current_offset_ + packet_offset.RoundDownTo(TimeDelta::Millis(1));
current_offset_ + delta_since_base.RoundDownTo(TimeDelta::Millis(1));
// Note: Lost packets are not removed from history because they might be
// reported as received by a later feedback.
history_.erase(it);
@ -257,7 +257,7 @@ TransportFeedbackAdapter::ProcessTransportFeedbackInner(
} else {
++ignored;
}
}
});
if (failed_lookups > 0) {
RTC_LOG(LS_WARNING) << "Failed to lookup send time for " << failed_lookups

View file

@ -65,17 +65,18 @@ void TransportFeedbackDemuxer::OnTransportFeedback(
RTC_DCHECK_RUN_ON(&observer_checker_);
std::vector<StreamFeedbackObserver::StreamPacketInfo> stream_feedbacks;
for (const auto& packet : feedback.GetAllPackets()) {
int64_t seq_num = seq_num_unwrapper_.PeekUnwrap(packet.sequence_number());
auto it = history_.find(seq_num);
if (it != history_.end()) {
auto packet_info = it->second;
packet_info.received = packet.received();
stream_feedbacks.push_back(std::move(packet_info));
if (packet.received())
history_.erase(it);
}
}
feedback.ForAllPackets(
[&](uint16_t sequence_number, TimeDelta delta_since_base) {
RTC_DCHECK_RUN_ON(&observer_checker_);
auto it = history_.find(seq_num_unwrapper_.PeekUnwrap(sequence_number));
if (it != history_.end()) {
auto packet_info = it->second;
packet_info.received = delta_since_base.IsFinite();
stream_feedbacks.push_back(std::move(packet_info));
if (delta_since_base.IsFinite())
history_.erase(it);
}
});
for (auto& observer : observers_) {
std::vector<StreamFeedbackObserver::StreamPacketInfo> selected_feedback;

View file

@ -273,11 +273,10 @@ void TransportFeedback::LastChunk::DecodeRunLength(uint16_t chunk,
}
TransportFeedback::TransportFeedback()
: TransportFeedback(/*include_timestamps=*/true, /*include_lost=*/true) {}
: TransportFeedback(/*include_timestamps=*/true) {}
TransportFeedback::TransportFeedback(bool include_timestamps, bool include_lost)
: include_lost_(include_lost),
base_seq_no_(0),
TransportFeedback::TransportFeedback(bool include_timestamps)
: base_seq_no_(0),
num_seq_no_(0),
base_time_ticks_(0),
feedback_seq_(0),
@ -288,8 +287,7 @@ TransportFeedback::TransportFeedback(bool include_timestamps, bool include_lost)
TransportFeedback::TransportFeedback(const TransportFeedback&) = default;
TransportFeedback::TransportFeedback(TransportFeedback&& other)
: include_lost_(other.include_lost_),
base_seq_no_(other.base_seq_no_),
: base_seq_no_(other.base_seq_no_),
num_seq_no_(other.num_seq_no_),
base_time_ticks_(other.base_time_ticks_),
feedback_seq_(other.feedback_seq_),
@ -355,11 +353,6 @@ bool TransportFeedback::AddReceivedPacket(uint16_t sequence_number,
uint16_t num_missing_packets = sequence_number - next_seq_no;
if (!AddMissingPackets(num_missing_packets))
return false;
if (include_lost_) {
for (; next_seq_no != sequence_number; ++next_seq_no) {
all_packets_.emplace_back(next_seq_no);
}
}
}
DeltaSize delta_size = (delta >= 0 && delta <= 0xff) ? 1 : 2;
@ -367,8 +360,6 @@ bool TransportFeedback::AddReceivedPacket(uint16_t sequence_number,
return false;
received_packets_.emplace_back(sequence_number, delta);
if (include_lost_)
all_packets_.emplace_back(sequence_number, delta);
last_timestamp_ += delta * kDeltaTick;
if (include_timestamps_) {
size_bytes_ += delta_size;
@ -381,10 +372,22 @@ TransportFeedback::GetReceivedPackets() const {
return received_packets_;
}
const std::vector<TransportFeedback::ReceivedPacket>&
TransportFeedback::GetAllPackets() const {
RTC_DCHECK(include_lost_);
return all_packets_;
void TransportFeedback::ForAllPackets(
rtc::FunctionView<void(uint16_t, TimeDelta)> handler) const {
TimeDelta delta_since_base = TimeDelta::Zero();
auto received_it = received_packets_.begin();
const uint16_t last_seq_num = base_seq_no_ + num_seq_no_;
for (uint16_t seq_num = base_seq_no_; seq_num != last_seq_num; ++seq_num) {
if (received_it != received_packets_.end() &&
received_it->sequence_number() == seq_num) {
delta_since_base += received_it->delta();
handler(seq_num, delta_since_base);
++received_it;
} else {
handler(seq_num, TimeDelta::PlusInfinity());
}
}
RTC_DCHECK(received_it == received_packets_.end());
}
uint16_t TransportFeedback::GetBaseSequence() const {
@ -469,14 +472,10 @@ bool TransportFeedback::Parse(const CommonHeader& packet) {
RTC_DCHECK_LE(index + delta_size, end_index);
switch (delta_size) {
case 0:
if (include_lost_)
all_packets_.emplace_back(seq_no);
break;
case 1: {
int16_t delta = payload[index];
received_packets_.emplace_back(seq_no, delta);
if (include_lost_)
all_packets_.emplace_back(seq_no, delta);
last_timestamp_ += delta * kDeltaTick;
index += delta_size;
break;
@ -484,8 +483,6 @@ bool TransportFeedback::Parse(const CommonHeader& packet) {
case 2: {
int16_t delta = ByteReader<int16_t>::ReadBigEndian(&payload[index]);
received_packets_.emplace_back(seq_no, delta);
if (include_lost_)
all_packets_.emplace_back(seq_no, delta);
last_timestamp_ += delta * kDeltaTick;
index += delta_size;
break;
@ -509,13 +506,6 @@ bool TransportFeedback::Parse(const CommonHeader& packet) {
if (delta_size > 0) {
received_packets_.emplace_back(seq_no, 0);
}
if (include_lost_) {
if (delta_size > 0) {
all_packets_.emplace_back(seq_no, 0);
} else {
all_packets_.emplace_back(seq_no);
}
}
++seq_no;
}
}

View file

@ -16,6 +16,7 @@
#include <vector>
#include "absl/base/attributes.h"
#include "api/function_view.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "modules/rtp_rtcp/source/rtcp_packet/rtpfb.h"
@ -29,23 +30,17 @@ class TransportFeedback : public Rtpfb {
class ReceivedPacket {
public:
ReceivedPacket(uint16_t sequence_number, int16_t delta_ticks)
: sequence_number_(sequence_number),
delta_ticks_(delta_ticks),
received_(true) {}
explicit ReceivedPacket(uint16_t sequence_number)
: sequence_number_(sequence_number), received_(false) {}
: sequence_number_(sequence_number), delta_ticks_(delta_ticks) {}
ReceivedPacket(const ReceivedPacket&) = default;
ReceivedPacket& operator=(const ReceivedPacket&) = default;
uint16_t sequence_number() const { return sequence_number_; }
int16_t delta_ticks() const { return delta_ticks_; }
TimeDelta delta() const { return delta_ticks_ * kDeltaTick; }
bool received() const { return received_; }
private:
uint16_t sequence_number_;
int16_t delta_ticks_;
bool received_;
};
// TODO(sprang): IANA reg?
static constexpr uint8_t kFeedbackMessageType = 15;
@ -58,8 +53,7 @@ class TransportFeedback : public Rtpfb {
// If `include_timestamps` is set to false, the created packet will not
// contain the receive delta block.
explicit TransportFeedback(bool include_timestamps,
bool include_lost = false);
explicit TransportFeedback(bool include_timestamps);
TransportFeedback(const TransportFeedback&);
TransportFeedback(TransportFeedback&&);
@ -72,7 +66,14 @@ class TransportFeedback : public Rtpfb {
// NOTE: This method requires increasing sequence numbers (excepting wraps).
bool AddReceivedPacket(uint16_t sequence_number, Timestamp timestamp);
const std::vector<ReceivedPacket>& GetReceivedPackets() const;
const std::vector<ReceivedPacket>& GetAllPackets() const;
// Calls `handler` for all packets this feedback describes.
// For received packets pass receieve time as `delta_since_base` since the
// `BaseTime()`. For missed packets calls `handler` with `delta_since_base =
// PlusInfinity()`.
void ForAllPackets(
rtc::FunctionView<void(uint16_t sequence_number,
TimeDelta delta_since_base)> handler) const;
uint16_t GetBaseSequence() const;
@ -164,7 +165,6 @@ class TransportFeedback : public Rtpfb {
// Adds `num_missing_packets` deltas of size 0.
bool AddMissingPackets(size_t num_missing_packets);
const bool include_lost_;
uint16_t base_seq_no_;
uint16_t num_seq_no_;
uint32_t base_time_ticks_;

View file

@ -30,6 +30,9 @@ using ::testing::AllOf;
using ::testing::Each;
using ::testing::ElementsAreArray;
using ::testing::Eq;
using ::testing::InSequence;
using ::testing::MockFunction;
using ::testing::Ne;
using ::testing::Property;
using ::testing::SizeIs;
@ -633,13 +636,13 @@ TEST(TransportFeedbackTest, ReportsMissingPackets) {
feedback_builder.AddReceivedPacket(kBaseSeqNo + 3,
kBaseTimestamp + TimeDelta::Millis(2));
EXPECT_THAT(
Parse(feedback_builder.Build()).GetAllPackets(),
ElementsAre(
Property(&TransportFeedback::ReceivedPacket::received, true),
Property(&TransportFeedback::ReceivedPacket::received, false),
Property(&TransportFeedback::ReceivedPacket::received, false),
Property(&TransportFeedback::ReceivedPacket::received, true)));
MockFunction<void(uint16_t, TimeDelta)> handler;
InSequence s;
EXPECT_CALL(handler, Call(kBaseSeqNo + 0, Ne(TimeDelta::PlusInfinity())));
EXPECT_CALL(handler, Call(kBaseSeqNo + 1, TimeDelta::PlusInfinity()));
EXPECT_CALL(handler, Call(kBaseSeqNo + 2, TimeDelta::PlusInfinity()));
EXPECT_CALL(handler, Call(kBaseSeqNo + 3, Ne(TimeDelta::PlusInfinity())));
Parse(feedback_builder.Build()).ForAllPackets(handler.AsStdFunction());
}
TEST(TransportFeedbackTest, ReportsMissingPacketsWithoutTimestamps) {
@ -652,13 +655,13 @@ TEST(TransportFeedbackTest, ReportsMissingPacketsWithoutTimestamps) {
// Packet losses indicated by jump in sequence number.
feedback_builder.AddReceivedPacket(kBaseSeqNo + 3, Timestamp::Zero());
EXPECT_THAT(
Parse(feedback_builder.Build()).GetAllPackets(),
ElementsAre(
Property(&TransportFeedback::ReceivedPacket::received, true),
Property(&TransportFeedback::ReceivedPacket::received, false),
Property(&TransportFeedback::ReceivedPacket::received, false),
Property(&TransportFeedback::ReceivedPacket::received, true)));
MockFunction<void(uint16_t, TimeDelta)> handler;
InSequence s;
EXPECT_CALL(handler, Call(kBaseSeqNo + 0, Ne(TimeDelta::PlusInfinity())));
EXPECT_CALL(handler, Call(kBaseSeqNo + 1, TimeDelta::PlusInfinity()));
EXPECT_CALL(handler, Call(kBaseSeqNo + 2, TimeDelta::PlusInfinity()));
EXPECT_CALL(handler, Call(kBaseSeqNo + 3, Ne(TimeDelta::PlusInfinity())));
Parse(feedback_builder.Build()).ForAllPackets(handler.AsStdFunction());
}
} // namespace
} // namespace webrtc