Process events with the same timestamp in a defined order.

As before, events are processed primarily in timestamp order.
This CL adds a heuristic to break ties for events with the same timestamp.

- Roughly speaking, configs and connectivity events are processed first, followed by incoming packets, then BWE updates, then other (general) events and finally outgoing packets and ALR events.

- Among RTP packets, transport sequence number is used to break ties.

- The insertion order (into the EventProcessor) is used as a last resort.

Bug: b/282153758
Change-Id: I914e4500ca63e1a8754766d1833a7b32f6a38176
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/308140
Commit-Queue: Björn Terelius <terelius@webrtc.org>
Reviewed-by: Per Kjellander <perkj@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#40318}
This commit is contained in:
Björn Terelius 2023-06-20 14:08:04 +02:00 committed by WebRTC LUCI CQ
parent 64d384ff29
commit ff58aed678
8 changed files with 696 additions and 40 deletions

View file

@ -513,6 +513,7 @@ if (rtc_enable_protobuf) {
"rtc_event_log/rtc_event_log_parser.h",
"rtc_event_log/rtc_event_processor.cc",
"rtc_event_log/rtc_event_processor.h",
"rtc_event_log/rtc_event_processor_order.h",
]
deps = [

View file

@ -2405,12 +2405,14 @@ std::vector<LoggedPacketInfo> ParsedRtcEventLog::GetPacketInfos(
RtcEventProcessor process;
for (const auto& rtp_packets : rtp_packets_by_ssrc(direction)) {
process.AddEvents(rtp_packets.packet_view, rtp_handler);
process.AddEvents(rtp_packets.packet_view, rtp_handler, direction);
}
if (direction == PacketDirection::kOutgoingPacket) {
process.AddEvents(incoming_transport_feedback_, feedback_handler);
process.AddEvents(incoming_transport_feedback_, feedback_handler,
PacketDirection::kIncomingPacket);
} else {
process.AddEvents(outgoing_transport_feedback_, feedback_handler);
process.AddEvents(outgoing_transport_feedback_, feedback_handler,
PacketDirection::kOutgoingPacket);
}
process.ProcessEventsInOrder();
return packets;

View file

@ -9,16 +9,20 @@
*/
#include "logging/rtc_event_log/rtc_event_processor.h"
#include "rtc_base/numerics/sequence_number_util.h"
namespace webrtc {
RtcEventProcessor::RtcEventProcessor() = default;
RtcEventProcessor::~RtcEventProcessor() = default;
void RtcEventProcessor::ProcessEventsInOrder() {
// `event_lists_` is a min-heap of lists ordered by the timestamp of the
// first element in the list. We therefore process the first element of the
// first list, then reinsert the remainder of that list into the heap
// if the list still contains unprocessed elements.
std::make_heap(event_lists_.begin(), event_lists_.end(), Cmp);
while (!event_lists_.empty()) {
event_lists_.front()->ProcessNext();
std::pop_heap(event_lists_.begin(), event_lists_.end(), Cmp);
@ -33,9 +37,21 @@ void RtcEventProcessor::ProcessEventsInOrder() {
bool RtcEventProcessor::Cmp(const RtcEventProcessor::ListPtrType& a,
const RtcEventProcessor::ListPtrType& b) {
int64_t time_diff = a->GetNextTime() - b->GetNextTime();
if (time_diff == 0)
return a->GetTieBreaker() > b->GetTieBreaker();
return time_diff > 0;
if (time_diff != 0)
return time_diff > 0;
if (a->GetTypeOrder() != b->GetTypeOrder())
return a->GetTypeOrder() > b->GetTypeOrder();
absl::optional<uint16_t> wrapped_seq_num_a = a->GetTransportSeqNum();
absl::optional<uint16_t> wrapped_seq_num_b = b->GetTransportSeqNum();
if (wrapped_seq_num_a && wrapped_seq_num_b) {
return AheadOf<uint16_t>(*wrapped_seq_num_a, *wrapped_seq_num_b);
} else if (wrapped_seq_num_a.has_value() != wrapped_seq_num_b.has_value()) {
return wrapped_seq_num_a.has_value();
}
return a->GetInsertionOrder() > b->GetInsertionOrder();
}
} // namespace webrtc

View file

@ -19,7 +19,7 @@
#include <vector>
#include "api/function_view.h"
#include "logging/rtc_event_log/rtc_event_log_parser.h"
#include "logging/rtc_event_log/rtc_event_processor_order.h"
#include "rtc_base/checks.h"
namespace webrtc {
@ -31,7 +31,7 @@ namespace webrtc {
namespace event_processor_impl {
// Interface to allow "merging" lists of different types. ProcessNext()
// processes the next unprocesses element in the list. IsEmpty() checks if all
// processes the next unprocessed element in the list. IsEmpty() checks if all
// elements have been processed. GetNextTime returns the timestamp of the next
// unprocessed element.
class ProcessableEventListInterface {
@ -40,7 +40,9 @@ class ProcessableEventListInterface {
virtual void ProcessNext() = 0;
virtual bool IsEmpty() const = 0;
virtual int64_t GetNextTime() const = 0;
virtual int GetTieBreaker() const = 0;
virtual int GetTypeOrder() const = 0;
virtual absl::optional<uint16_t> GetTransportSeqNum() const = 0;
virtual int GetInsertionOrder() const = 0;
};
// ProcessableEventList encapsulates a list of events and a function that will
@ -51,8 +53,16 @@ class ProcessableEventList : public ProcessableEventListInterface {
ProcessableEventList(Iterator begin,
Iterator end,
std::function<void(const T&)> f,
int tie_breaker)
: begin_(begin), end_(end), f_(f), tie_breaker_(tie_breaker) {}
int type_order,
std::function<absl::optional<uint16_t>(const T&)>
transport_seq_num_accessor,
int insertion_order)
: begin_(begin),
end_(end),
f_(f),
type_order_(type_order),
transport_seq_num_accessor_(transport_seq_num_accessor),
insertion_order_(insertion_order) {}
void ProcessNext() override {
RTC_DCHECK(!IsEmpty());
@ -66,14 +76,25 @@ class ProcessableEventList : public ProcessableEventListInterface {
RTC_DCHECK(!IsEmpty());
return begin_->log_time_us();
}
int GetTieBreaker() const override { return tie_breaker_; }
int GetTypeOrder() const override { return type_order_; }
absl::optional<uint16_t> GetTransportSeqNum() const override {
RTC_DCHECK(!IsEmpty());
return transport_seq_num_accessor_(*begin_);
}
int GetInsertionOrder() const override { return insertion_order_; }
private:
Iterator begin_;
Iterator end_;
std::function<void(const T&)> f_;
int tie_breaker_;
int type_order_;
std::function<absl::optional<uint16_t>(const T&)> transport_seq_num_accessor_;
int insertion_order_;
};
} // namespace event_processor_impl
// Helper class used to "merge" two or more lists of ordered RtcEventLog events
@ -106,22 +127,41 @@ class RtcEventProcessor {
void AddEvents(
const Iterable& iterable,
std::function<void(const typename Iterable::value_type&)> handler) {
if (iterable.begin() == iterable.end())
return;
event_lists_.push_back(
std::make_unique<event_processor_impl::ProcessableEventList<
typename Iterable::const_iterator, typename Iterable::value_type>>(
iterable.begin(), iterable.end(), handler,
insertion_order_index_++));
std::push_heap(event_lists_.begin(), event_lists_.end(), Cmp);
using ValueType =
typename std::remove_const<typename Iterable::value_type>::type;
AddEvents(iterable, handler, TieBreaker<ValueType>::type_order,
TieBreaker<ValueType>::transport_seq_num_accessor,
num_insertions_);
}
template <typename Iterable>
void AddEvents(
const Iterable& iterable,
std::function<void(const typename Iterable::value_type&)> handler,
PacketDirection /*not used*/) {
AddEvents(iterable, handler);
PacketDirection direction) {
using ValueType =
typename std::remove_const<typename Iterable::value_type>::type;
AddEvents(iterable, handler, TieBreaker<ValueType>::type_order(direction),
TieBreaker<ValueType>::transport_seq_num_accessor,
num_insertions_);
}
template <typename Iterable>
void AddEvents(
const Iterable& iterable,
std::function<void(const typename Iterable::value_type&)> handler,
int type_order,
std::function<absl::optional<uint16_t>(
const typename Iterable::value_type&)> transport_seq_num_accessor,
int insertion_order) {
if (iterable.begin() == iterable.end())
return;
num_insertions_++;
event_lists_.push_back(
std::make_unique<event_processor_impl::ProcessableEventList<
typename Iterable::const_iterator, typename Iterable::value_type>>(
iterable.begin(), iterable.end(), handler, type_order,
transport_seq_num_accessor, insertion_order));
}
void ProcessEventsInOrder();
@ -129,10 +169,11 @@ class RtcEventProcessor {
private:
using ListPtrType =
std::unique_ptr<event_processor_impl::ProcessableEventListInterface>;
int insertion_order_index_ = 0;
std::vector<ListPtrType> event_lists_;
// Comparison function to make `event_lists_` into a min heap.
static bool Cmp(const ListPtrType& a, const ListPtrType& b);
std::vector<ListPtrType> event_lists_;
int num_insertions_ = 0;
};
} // namespace webrtc

View file

@ -0,0 +1,442 @@
/*
* Copyright (c) 2023 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef LOGGING_RTC_EVENT_LOG_RTC_EVENT_PROCESSOR_ORDER_H_
#define LOGGING_RTC_EVENT_LOG_RTC_EVENT_PROCESSOR_ORDER_H_
#include <stdint.h>
#include "absl/types/optional.h"
#include "api/function_view.h"
#include "logging/rtc_event_log/events/logged_rtp_rtcp.h"
#include "logging/rtc_event_log/rtc_event_log_parser.h"
namespace webrtc {
// The RTC event log only uses millisecond precision timestamps
// and doesn't preserve order between events in different batches.
// This is a heuristic to order events in a way that preserves
// "typical" dependencies, e.g. we receive packets before we
// send feedback about them, and RTP packets sent or received
// during the same millisecond are in sequence number order.
enum class TypeOrder {
Start,
// Connectivity and stream configurations before incoming packets
StreamConfig,
IceCondidateConfig,
IceCandidateEvent,
DtlsTransportState,
DtlsWritable,
RouteChange,
// Incoming packets
RtpIn,
RtcpIn,
GenericPacketIn,
GenericAckIn,
// BWE depends on incoming feedback (send side estimation)
// or incoming media packets (receive side estimation).
// Delay-based BWE depends on probe results.
// Loss-based BWE depends on delay-based BWE.
// Loss-based BWE may trigger new probes.
BweRemoteEstimate,
BweProbeFailure,
BweProbeSuccess,
BweDelayBased,
BweLossBased,
BweProbeCreated,
// General processing events. No obvious order.
AudioNetworkAdaptation,
NetEqSetMinDelay,
AudioPlayout,
FrameDecoded,
// Outgoing packets and feedback depends on BWE and might depend on
// processing.
RtpOut,
RtcpOut,
GenericPacketOut,
// Alr is updated after a packet is sent.
AlrState,
Stop,
};
template <typename T>
class TieBreaker {
static_assert(sizeof(T) != sizeof(T),
"Specialize TieBreaker to define an order for the event type.");
};
template <>
class TieBreaker<LoggedStartEvent> {
public:
static constexpr int type_order = static_cast<int>(TypeOrder::Start);
static absl::optional<uint16_t> transport_seq_num_accessor(
const LoggedStartEvent&) {
return absl::optional<uint16_t>();
}
};
template <>
class TieBreaker<LoggedStopEvent> {
public:
static constexpr int type_order = static_cast<int>(TypeOrder::Stop);
static absl::optional<uint16_t> transport_seq_num_accessor(
const LoggedStopEvent&) {
return absl::optional<uint16_t>();
}
};
template <>
class TieBreaker<LoggedAudioRecvConfig> {
public:
static constexpr int type_order = static_cast<int>(TypeOrder::StreamConfig);
static absl::optional<uint16_t> transport_seq_num_accessor(
const LoggedAudioRecvConfig&) {
return absl::optional<uint16_t>();
}
};
template <>
class TieBreaker<LoggedAudioSendConfig> {
public:
static constexpr int type_order = static_cast<int>(TypeOrder::StreamConfig);
static absl::optional<uint16_t> transport_seq_num_accessor(
const LoggedAudioSendConfig&) {
return absl::optional<uint16_t>();
}
};
template <>
class TieBreaker<LoggedVideoRecvConfig> {
public:
static constexpr int type_order = static_cast<int>(TypeOrder::StreamConfig);
static absl::optional<uint16_t> transport_seq_num_accessor(
const LoggedVideoRecvConfig&) {
return absl::optional<uint16_t>();
}
};
template <>
class TieBreaker<LoggedVideoSendConfig> {
public:
static constexpr int type_order = static_cast<int>(TypeOrder::StreamConfig);
static absl::optional<uint16_t> transport_seq_num_accessor(
const LoggedVideoSendConfig&) {
return absl::optional<uint16_t>();
}
};
template <>
class TieBreaker<LoggedIceCandidatePairConfig> {
public:
static constexpr int type_order =
static_cast<int>(TypeOrder::IceCondidateConfig);
static absl::optional<uint16_t> transport_seq_num_accessor(
const LoggedIceCandidatePairConfig&) {
return absl::optional<uint16_t>();
}
};
template <>
class TieBreaker<LoggedIceCandidatePairEvent> {
public:
static constexpr int type_order =
static_cast<int>(TypeOrder::IceCandidateEvent);
static absl::optional<uint16_t> transport_seq_num_accessor(
const LoggedIceCandidatePairEvent&) {
return absl::optional<uint16_t>();
}
};
template <>
class TieBreaker<LoggedDtlsTransportState> {
public:
static constexpr int type_order =
static_cast<int>(TypeOrder::DtlsTransportState);
static absl::optional<uint16_t> transport_seq_num_accessor(
const LoggedDtlsTransportState&) {
return absl::optional<uint16_t>();
}
};
template <>
class TieBreaker<LoggedDtlsWritableState> {
public:
static constexpr int type_order = static_cast<int>(TypeOrder::DtlsWritable);
static absl::optional<uint16_t> transport_seq_num_accessor(
const LoggedDtlsWritableState&) {
return absl::optional<uint16_t>();
}
};
template <>
class TieBreaker<LoggedRouteChangeEvent> {
public:
static constexpr int type_order = static_cast<int>(TypeOrder::RouteChange);
static absl::optional<uint16_t> transport_seq_num_accessor(
const LoggedRouteChangeEvent&) {
return absl::optional<uint16_t>();
}
};
template <>
class TieBreaker<LoggedRemoteEstimateEvent> {
public:
static constexpr int type_order =
static_cast<int>(TypeOrder::BweRemoteEstimate);
static absl::optional<uint16_t> transport_seq_num_accessor(
const LoggedRemoteEstimateEvent&) {
return absl::optional<uint16_t>();
}
};
template <>
class TieBreaker<LoggedBweProbeFailureEvent> {
public:
static constexpr int type_order =
static_cast<int>(TypeOrder::BweProbeFailure);
static absl::optional<uint16_t> transport_seq_num_accessor(
const LoggedBweProbeFailureEvent&) {
return absl::optional<uint16_t>();
}
};
template <>
class TieBreaker<LoggedBweProbeSuccessEvent> {
public:
static constexpr int type_order =
static_cast<int>(TypeOrder::BweProbeSuccess);
static absl::optional<uint16_t> transport_seq_num_accessor(
const LoggedBweProbeSuccessEvent&) {
return absl::optional<uint16_t>();
}
};
template <>
class TieBreaker<LoggedBweDelayBasedUpdate> {
public:
static constexpr int type_order = static_cast<int>(TypeOrder::BweDelayBased);
static absl::optional<uint16_t> transport_seq_num_accessor(
const LoggedBweDelayBasedUpdate&) {
return absl::optional<uint16_t>();
}
};
template <>
class TieBreaker<LoggedBweLossBasedUpdate> {
public:
static constexpr int type_order = static_cast<int>(TypeOrder::BweLossBased);
static absl::optional<uint16_t> transport_seq_num_accessor(
const LoggedBweLossBasedUpdate&) {
return absl::optional<uint16_t>();
}
};
template <>
class TieBreaker<LoggedBweProbeClusterCreatedEvent> {
public:
static constexpr int type_order =
static_cast<int>(TypeOrder::BweProbeCreated);
static absl::optional<uint16_t> transport_seq_num_accessor(
const LoggedBweProbeClusterCreatedEvent&) {
return absl::optional<uint16_t>();
}
};
template <>
class TieBreaker<LoggedAudioNetworkAdaptationEvent> {
public:
static constexpr int type_order =
static_cast<int>(TypeOrder::AudioNetworkAdaptation);
static absl::optional<uint16_t> transport_seq_num_accessor(
const LoggedAudioNetworkAdaptationEvent&) {
return absl::optional<uint16_t>();
}
};
template <>
class TieBreaker<LoggedNetEqSetMinimumDelayEvent> {
public:
static constexpr int type_order =
static_cast<int>(TypeOrder::NetEqSetMinDelay);
static absl::optional<uint16_t> transport_seq_num_accessor(
const LoggedNetEqSetMinimumDelayEvent&) {
return absl::optional<uint16_t>();
}
};
template <>
class TieBreaker<LoggedAudioPlayoutEvent> {
public:
static constexpr int type_order = static_cast<int>(TypeOrder::AudioPlayout);
static absl::optional<uint16_t> transport_seq_num_accessor(
const LoggedAudioPlayoutEvent&) {
return absl::optional<uint16_t>();
}
};
template <>
class TieBreaker<LoggedFrameDecoded> {
public:
static constexpr int type_order = static_cast<int>(TypeOrder::FrameDecoded);
static absl::optional<uint16_t> transport_seq_num_accessor(
const LoggedFrameDecoded&) {
return absl::optional<uint16_t>();
}
};
template <>
class TieBreaker<LoggedGenericPacketReceived> {
public:
static constexpr int type_order =
static_cast<int>(TypeOrder::GenericPacketIn);
static absl::optional<uint16_t> transport_seq_num_accessor(
const LoggedGenericPacketReceived&) {
return absl::optional<uint16_t>();
}
};
template <>
class TieBreaker<LoggedGenericAckReceived> {
public:
static constexpr int type_order = static_cast<int>(TypeOrder::GenericAckIn);
static absl::optional<uint16_t> transport_seq_num_accessor(
const LoggedGenericAckReceived&) {
return absl::optional<uint16_t>();
}
};
template <>
class TieBreaker<LoggedGenericPacketSent> {
public:
static constexpr int type_order =
static_cast<int>(TypeOrder::GenericPacketOut);
static absl::optional<uint16_t> transport_seq_num_accessor(
const LoggedGenericPacketSent&) {
return absl::optional<uint16_t>();
}
};
template <>
class TieBreaker<LoggedRtpPacket> {
public:
static constexpr int type_order(PacketDirection direction) {
return static_cast<int>(direction == PacketDirection::kIncomingPacket
? TypeOrder::RtpIn
: TypeOrder::RtpOut);
}
static absl::optional<uint16_t> transport_seq_num_accessor(
const LoggedRtpPacket& p) {
return p.header.extension.hasTransportSequenceNumber
? p.header.extension.transportSequenceNumber
: absl::optional<uint16_t>();
}
};
template <>
class TieBreaker<LoggedPacketInfo> {
public:
static constexpr int type_order(PacketDirection direction) {
return static_cast<int>(direction == PacketDirection::kIncomingPacket
? TypeOrder::RtpIn
: TypeOrder::RtpOut);
}
static absl::optional<uint16_t> transport_seq_num_accessor(
const LoggedPacketInfo& p) {
return p.has_transport_seq_no ? p.transport_seq_no
: absl::optional<uint16_t>();
}
};
template <>
class TieBreaker<LoggedRtpPacketIncoming> {
public:
static constexpr int type_order = static_cast<int>(TypeOrder::RtpIn);
static absl::optional<uint16_t> transport_seq_num_accessor(
const LoggedRtpPacketIncoming& p) {
return p.rtp.header.extension.hasTransportSequenceNumber
? p.rtp.header.extension.transportSequenceNumber
: absl::optional<uint16_t>();
}
};
template <>
class TieBreaker<LoggedRtpPacketOutgoing> {
public:
static constexpr int type_order = static_cast<int>(TypeOrder::RtpOut);
static absl::optional<uint16_t> transport_seq_num_accessor(
const LoggedRtpPacketOutgoing& p) {
return p.rtp.header.extension.hasTransportSequenceNumber
? p.rtp.header.extension.transportSequenceNumber
: absl::optional<uint16_t>();
}
};
template <>
class TieBreaker<LoggedRtcpPacketIncoming> {
public:
static constexpr int type_order = static_cast<int>(TypeOrder::RtcpIn);
static absl::optional<uint16_t> transport_seq_num_accessor(
const LoggedRtcpPacketIncoming&) {
return absl::optional<uint16_t>();
}
};
template <>
class TieBreaker<LoggedRtcpPacketOutgoing> {
public:
static constexpr int type_order = static_cast<int>(TypeOrder::RtcpOut);
static absl::optional<uint16_t> transport_seq_num_accessor(
const LoggedRtcpPacketOutgoing&) {
return absl::optional<uint16_t>();
}
};
template <>
class TieBreaker<LoggedRtcpPacketTransportFeedback> {
public:
static constexpr int type_order(PacketDirection direction) {
return static_cast<int>(direction == PacketDirection::kIncomingPacket
? TypeOrder::RtcpIn
: TypeOrder::RtcpOut);
}
static absl::optional<uint16_t> transport_seq_num_accessor(
const LoggedRtcpPacketTransportFeedback&) {
return absl::optional<uint16_t>();
}
};
template <>
class TieBreaker<LoggedRtcpPacketReceiverReport> {
public:
static constexpr int type_order(PacketDirection direction) {
return static_cast<int>(direction == PacketDirection::kIncomingPacket
? TypeOrder::RtcpIn
: TypeOrder::RtcpOut);
}
static absl::optional<uint16_t> transport_seq_num_accessor(
const LoggedRtcpPacketReceiverReport&) {
return absl::optional<uint16_t>();
}
};
template <>
class TieBreaker<LoggedAlrStateEvent> {
public:
static constexpr int type_order = static_cast<int>(TypeOrder::AlrState);
static absl::optional<uint16_t> transport_seq_num_accessor(
const LoggedAlrStateEvent&) {
return absl::optional<uint16_t>();
}
};
} // namespace webrtc
#endif // LOGGING_RTC_EVENT_LOG_RTC_EVENT_PROCESSOR_ORDER_H_

View file

@ -14,17 +14,23 @@
#include <cstdint>
#include <initializer_list>
#include <numeric>
#include <limits>
#include "absl/memory/memory.h"
#include "logging/rtc_event_log/rtc_event_log_parser.h"
#include "rtc_base/checks.h"
#include "rtc_base/random.h"
#include "test/gmock.h"
#include "test/gtest.h"
namespace webrtc {
namespace {
LoggedStartEvent CreateEvent(int64_t time_ms, int64_t utc_time_ms) {
return LoggedStartEvent(Timestamp::Millis(time_ms),
Timestamp::Millis(utc_time_ms));
}
std::vector<LoggedStartEvent> CreateEventList(
std::initializer_list<int64_t> timestamp_list) {
std::vector<LoggedStartEvent> v;
@ -45,6 +51,22 @@ CreateRandomEventLists(size_t num_lists, size_t num_elements, uint64_t seed) {
}
return lists;
}
LoggedRtpPacket CreateRtpPacket(int64_t time_ms,
uint32_t ssrc,
absl::optional<uint16_t> transport_seq_num) {
RTPHeader header;
header.ssrc = ssrc;
header.timestamp = static_cast<uint32_t>(time_ms);
header.paddingLength = 0;
header.headerLength = 20;
header.extension.hasTransportSequenceNumber = transport_seq_num.has_value();
if (transport_seq_num.has_value()) {
header.extension.transportSequenceNumber = transport_seq_num.value();
}
return LoggedRtpPacket(Timestamp::Millis(time_ms), header, 20, 1000);
}
} // namespace
TEST(RtcEventProcessor, NoList) {
@ -160,4 +182,131 @@ TEST(RtcEventProcessor, DifferentTypes) {
}
}
TEST(RtcEventProcessor, IncomingPacketBeforeOutgoingFeedback) {
EXPECT_LT(TieBreaker<LoggedRtpPacketIncoming>::type_order,
TieBreaker<LoggedRtcpPacketOutgoing>::type_order);
}
TEST(RtcEventProcessor, PacketWrapperTypesOrderedAsRtp) {
EXPECT_EQ(TieBreaker<LoggedRtpPacketIncoming>::type_order,
TieBreaker<LoggedRtpPacket>::type_order(
PacketDirection::kIncomingPacket));
EXPECT_EQ(TieBreaker<LoggedRtpPacketOutgoing>::type_order,
TieBreaker<LoggedRtpPacket>::type_order(
PacketDirection::kOutgoingPacket));
EXPECT_EQ(TieBreaker<LoggedRtpPacketIncoming>::type_order,
TieBreaker<LoggedPacketInfo>::type_order(
PacketDirection::kIncomingPacket));
EXPECT_EQ(TieBreaker<LoggedRtpPacketOutgoing>::type_order,
TieBreaker<LoggedPacketInfo>::type_order(
PacketDirection::kOutgoingPacket));
}
TEST(RtcEventProcessor, IncomingFeedbackBeforeBwe) {
EXPECT_LT(TieBreaker<LoggedRtcpPacketIncoming>::type_order,
TieBreaker<LoggedBweProbeSuccessEvent>::type_order);
EXPECT_LT(TieBreaker<LoggedRtcpPacketIncoming>::type_order,
TieBreaker<LoggedRemoteEstimateEvent>::type_order);
EXPECT_LT(TieBreaker<LoggedRtcpPacketIncoming>::type_order,
TieBreaker<LoggedBweProbeSuccessEvent>::type_order);
EXPECT_LT(TieBreaker<LoggedRtcpPacketIncoming>::type_order,
TieBreaker<LoggedBweProbeFailureEvent>::type_order);
EXPECT_LT(TieBreaker<LoggedRtcpPacketIncoming>::type_order,
TieBreaker<LoggedBweDelayBasedUpdate>::type_order);
EXPECT_LT(TieBreaker<LoggedRtcpPacketIncoming>::type_order,
TieBreaker<LoggedBweLossBasedUpdate>::type_order);
EXPECT_LT(TieBreaker<LoggedRtcpPacketIncoming>::type_order,
TieBreaker<LoggedBweProbeClusterCreatedEvent>::type_order);
}
TEST(RtcEventProcessor, RtpPacketsInTransportSeqNumOrder) {
std::vector<LoggedRtpPacket> ssrc_1234{
CreateRtpPacket(1, 1234, absl::nullopt),
CreateRtpPacket(1, 1234, absl::nullopt)};
std::vector<LoggedRtpPacket> ssrc_2345{CreateRtpPacket(1, 2345, 2),
CreateRtpPacket(1, 2345, 3),
CreateRtpPacket(1, 2345, 6)};
std::vector<LoggedRtpPacket> ssrc_3456{CreateRtpPacket(1, 3456, 1),
CreateRtpPacket(1, 3456, 4),
CreateRtpPacket(1, 3456, 5)};
// Store SSRC and transport sequence number for each processed packet.
std::vector<std::pair<uint32_t, absl::optional<uint16_t>>> results;
auto get_packet = [&results](const LoggedRtpPacket& packet) {
absl::optional<uint16_t> transport_seq_num;
if (packet.header.extension.hasTransportSequenceNumber)
transport_seq_num = packet.header.extension.transportSequenceNumber;
results.emplace_back(packet.header.ssrc, transport_seq_num);
};
RtcEventProcessor processor;
processor.AddEvents(ssrc_1234, get_packet, PacketDirection::kIncomingPacket);
processor.AddEvents(ssrc_2345, get_packet, PacketDirection::kIncomingPacket);
processor.AddEvents(ssrc_3456, get_packet, PacketDirection::kIncomingPacket);
processor.ProcessEventsInOrder();
std::vector<std::pair<uint32_t, absl::optional<uint16_t>>> expected{
{1234, absl::nullopt},
{1234, absl::nullopt},
{3456, 1},
{2345, 2},
{2345, 3},
{3456, 4},
{3456, 5},
{2345, 6}};
EXPECT_THAT(results, testing::ElementsAreArray(expected));
}
TEST(RtcEventProcessor, TransportSeqNumOrderHandlesWrapAround) {
std::vector<LoggedRtpPacket> ssrc_1234{
CreateRtpPacket(0, 1234, std::numeric_limits<uint16_t>::max() - 1),
CreateRtpPacket(1, 1234, 1), CreateRtpPacket(1, 1234, 2)};
std::vector<LoggedRtpPacket> ssrc_2345{
CreateRtpPacket(1, 2345, std::numeric_limits<uint16_t>::max()),
CreateRtpPacket(1, 2345, 0), CreateRtpPacket(1, 2345, 3)};
// Store SSRC and transport sequence number for each processed packet.
std::vector<std::pair<uint32_t, absl::optional<uint16_t>>> results;
auto get_packet = [&results](const LoggedRtpPacket& packet) {
absl::optional<uint16_t> transport_seq_num;
if (packet.header.extension.hasTransportSequenceNumber)
transport_seq_num = packet.header.extension.transportSequenceNumber;
results.emplace_back(packet.header.ssrc, transport_seq_num);
};
RtcEventProcessor processor;
processor.AddEvents(ssrc_1234, get_packet, PacketDirection::kOutgoingPacket);
processor.AddEvents(ssrc_2345, get_packet, PacketDirection::kOutgoingPacket);
processor.ProcessEventsInOrder();
std::vector<std::pair<uint32_t, absl::optional<uint16_t>>> expected{
{1234, std::numeric_limits<uint16_t>::max() - 1},
{2345, std::numeric_limits<uint16_t>::max()},
{2345, 0},
{1234, 1},
{1234, 2},
{2345, 3}};
EXPECT_THAT(results, testing::ElementsAreArray(expected));
}
TEST(RtcEventProcessor, InsertionOrderIfNoTransportSeqNum) {
std::vector<LoggedStartEvent> events1{CreateEvent(1, 222)};
std::vector<LoggedStartEvent> events2{CreateEvent(1, 111)};
std::vector<LoggedStartEvent> events3{CreateEvent(1, 333)};
std::vector<int64_t> results;
auto get_utc_time = [&results](const LoggedStartEvent& elem) {
results.push_back(elem.utc_time().ms());
};
RtcEventProcessor processor;
processor.AddEvents(events1, get_utc_time);
processor.AddEvents(events2, get_utc_time);
processor.AddEvents(events3, get_utc_time);
processor.ProcessEventsInOrder();
EXPECT_THAT(results, testing::ElementsAreArray({222, 111, 333}));
}
} // namespace webrtc

View file

@ -621,23 +621,25 @@ void EventLogAnalyzer::CreateTotalPacketRateGraph(PacketDirection direction,
Timestamp log_time_;
};
std::vector<LogTime> packet_times;
auto handle_rtp = [&](const LoggedRtpPacket& packet) {
auto handle_rtp = [&packet_times](const LoggedRtpPacket& packet) {
packet_times.emplace_back(packet.log_time());
};
RtcEventProcessor process;
for (const auto& stream : parsed_log_.rtp_packets_by_ssrc(direction)) {
process.AddEvents(stream.packet_view, handle_rtp);
process.AddEvents(stream.packet_view, handle_rtp, direction);
}
if (direction == kIncomingPacket) {
auto handle_incoming_rtcp = [&](const LoggedRtcpPacketIncoming& packet) {
packet_times.emplace_back(packet.log_time());
};
auto handle_incoming_rtcp =
[&packet_times](const LoggedRtcpPacketIncoming& packet) {
packet_times.emplace_back(packet.log_time());
};
process.AddEvents(parsed_log_.incoming_rtcp_packets(),
handle_incoming_rtcp);
} else {
auto handle_outgoing_rtcp = [&](const LoggedRtcpPacketOutgoing& packet) {
packet_times.emplace_back(packet.log_time());
};
auto handle_outgoing_rtcp =
[&packet_times](const LoggedRtcpPacketOutgoing& packet) {
packet_times.emplace_back(packet.log_time());
};
process.AddEvents(parsed_log_.outgoing_rtcp_packets(),
handle_outgoing_rtcp);
}

View file

@ -186,19 +186,22 @@ void LogBasedNetworkControllerSimulation::ProcessEventsInLog(
[this](const LoggedBweProbeClusterCreatedEvent& probe_cluster) {
OnProbeCreated(probe_cluster);
});
processor.AddEvents(packet_infos, [this](const LoggedPacketInfo& packet) {
OnPacketSent(packet);
});
processor.AddEvents(
packet_infos,
[this](const LoggedPacketInfo& packet) { OnPacketSent(packet); },
PacketDirection::kOutgoingPacket);
processor.AddEvents(
parsed_log_.transport_feedbacks(PacketDirection::kIncomingPacket),
[this](const LoggedRtcpPacketTransportFeedback& feedback) {
OnFeedback(feedback);
});
},
PacketDirection::kIncomingPacket);
processor.AddEvents(
parsed_log_.receiver_reports(PacketDirection::kIncomingPacket),
[this](const LoggedRtcpPacketReceiverReport& report) {
OnReceiverReport(report);
});
},
PacketDirection::kIncomingPacket);
processor.AddEvents(parsed_log_.ice_candidate_pair_configs(),
[this](const LoggedIceCandidatePairConfig& candidate) {
OnIceConfig(candidate);