Introduce PacketReceiver::DeliverRtpPacket and PacketReceier::DeliverRtcpPacket

DeliverRtpPacket use a parsed RTP packet as argument where the RTP extensions are supposed to be known.
This method is implemented in webrt::Call and temporary used by the exising method  Call::DeliverRtp, but the idea is to instead avoid extra packet parsing by forwarding a RtpPacketReceived from RtpTransport::DemuxRtpPacket via  WebrtcVideoChannel::OnPacketReceived and WebrtcVoiceChannel.

DeliverRtcpPacket is also implemented in Call and is directly used in PeerConnection::InitializeRtcpCallback.

Bug: webrtc:14795, webrtc:7135
Change-Id: Ib6ffe8e1229ac07fa459ee2fc9a0af8455a23bac
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/290401
Commit-Queue: Per Kjellander <perkj@webrtc.org>
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#39015}
This commit is contained in:
Per K 2023-01-05 14:01:39 +01:00 committed by WebRTC LUCI CQ
parent 7f8680cf6f
commit cf439a04e5
7 changed files with 149 additions and 83 deletions

View file

@ -23,6 +23,7 @@
#include "absl/functional/bind_front.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "api/media_types.h"
#include "api/rtc_event_log/rtc_event_log.h"
#include "api/sequence_checker.h"
#include "api/task_queue/pending_task_safety_flag.h"
@ -33,6 +34,7 @@
#include "call/adaptation/broadcast_resource_listener.h"
#include "call/bitrate_allocator.h"
#include "call/flexfec_receive_stream_impl.h"
#include "call/packet_receiver.h"
#include "call/receive_time_calculator.h"
#include "call/rtp_stream_receiver_controller.h"
#include "call/rtp_transport_controller_send.h"
@ -244,6 +246,13 @@ class Call final : public webrtc::Call,
rtc::CopyOnWriteBuffer packet,
int64_t packet_time_us) override;
void DeliverRtcpPacket(rtc::CopyOnWriteBuffer packet) override;
void DeliverRtpPacket(
MediaType media_type,
RtpPacketReceived packet,
OnUndemuxablePacketHandler undemuxable_packet_handler) override;
void SignalChannelNetworkState(MediaType media, NetworkState state) override;
void OnAudioTransportOverheadChanged(
@ -1375,60 +1384,102 @@ void Call::ConfigureSync(absl::string_view sync_group) {
}
}
void Call::DeliverRtcp(MediaType media_type, rtc::CopyOnWriteBuffer packet) {
RTC_DCHECK_RUN_ON(network_thread_);
void Call::DeliverRtcpPacket(rtc::CopyOnWriteBuffer packet) {
RTC_DCHECK_RUN_ON(worker_thread_);
RTC_DCHECK(IsRtcpPacket(packet));
TRACE_EVENT0("webrtc", "Call::DeliverRtcp");
// TODO(bugs.webrtc.org/11993): This DCHECK is here just to maintain the
// invariant that currently the only call path to this function is via
// `PeerConnection::InitializeRtcpCallback()`. DeliverRtp on the other hand
// gets called via the channel classes and
// WebRtc[Audio|Video]Channel's `OnPacketReceived`. We'll remove the
// PeerConnection involvement as well as
// `JsepTransportController::OnRtcpPacketReceived_n` and `rtcp_handler`
// and make sure that the flow of packets is consistent from the
// `RtpTransport` class, via the *Channel and *Engine classes and into Call.
// This way we'll also know more about the context of the packet.
RTC_DCHECK_EQ(media_type, MediaType::ANY);
receive_stats_.AddReceivedRtcpBytes(static_cast<int>(packet.size()));
bool rtcp_delivered = false;
for (VideoReceiveStream2* stream : video_receive_streams_) {
if (stream->DeliverRtcp(packet.cdata(), packet.size()))
rtcp_delivered = true;
}
// TODO(bugs.webrtc.org/11993): This should execute directly on the network
// thread.
worker_thread_->PostTask(
SafeTask(task_safety_.flag(), [this, packet = std::move(packet)]() {
RTC_DCHECK_RUN_ON(worker_thread_);
for (AudioReceiveStreamImpl* stream : audio_receive_streams_) {
stream->DeliverRtcp(packet.cdata(), packet.size());
rtcp_delivered = true;
}
receive_stats_.AddReceivedRtcpBytes(static_cast<int>(packet.size()));
bool rtcp_delivered = false;
for (VideoReceiveStream2* stream : video_receive_streams_) {
if (stream->DeliverRtcp(packet.cdata(), packet.size()))
rtcp_delivered = true;
}
for (VideoSendStream* stream : video_send_streams_) {
stream->DeliverRtcp(packet.cdata(), packet.size());
rtcp_delivered = true;
}
for (AudioReceiveStreamImpl* stream : audio_receive_streams_) {
stream->DeliverRtcp(packet.cdata(), packet.size());
rtcp_delivered = true;
}
for (auto& kv : audio_send_ssrcs_) {
kv.second->DeliverRtcp(packet.cdata(), packet.size());
rtcp_delivered = true;
}
for (VideoSendStream* stream : video_send_streams_) {
stream->DeliverRtcp(packet.cdata(), packet.size());
rtcp_delivered = true;
}
if (rtcp_delivered) {
event_log_->Log(std::make_unique<RtcEventRtcpPacketIncoming>(packet));
}
}
for (auto& kv : audio_send_ssrcs_) {
kv.second->DeliverRtcp(packet.cdata(), packet.size());
rtcp_delivered = true;
}
void Call::DeliverRtpPacket(
MediaType media_type,
RtpPacketReceived packet,
OnUndemuxablePacketHandler undemuxable_packet_handler) {
RTC_DCHECK_RUN_ON(worker_thread_);
RTC_DCHECK(packet.arrival_time().IsFinite());
if (rtcp_delivered) {
event_log_->Log(std::make_unique<RtcEventRtcpPacketIncoming>(
rtc::MakeArrayView(packet.cdata(), packet.size())));
}
}));
if (receive_time_calculator_) {
int64_t packet_time_us = packet.arrival_time().us();
// Repair packet_time_us for clock resets by comparing a new read of
// the same clock (TimeUTCMicros) to a monotonic clock reading.
packet_time_us = receive_time_calculator_->ReconcileReceiveTimes(
packet_time_us, rtc::TimeUTCMicros(), clock_->TimeInMicroseconds());
packet.set_arrival_time(Timestamp::Micros(packet_time_us));
}
// We might get RTP keep-alive packets in accordance with RFC6263 section 4.6.
// These are empty (zero length payload) RTP packets with an unsignaled
// payload type.
const bool is_keep_alive_packet = packet.payload_size() == 0;
RTC_DCHECK(media_type == MediaType::AUDIO || media_type == MediaType::VIDEO ||
is_keep_alive_packet);
NotifyBweOfReceivedPacket(packet, media_type);
if (media_type != MediaType::AUDIO && media_type != MediaType::VIDEO) {
RTC_DCHECK(is_keep_alive_packet);
return;
}
RtpStreamReceiverController& receiver_controller =
media_type == MediaType::AUDIO ? audio_receiver_controller_
: video_receiver_controller_;
if (!receiver_controller.OnRtpPacket(packet)) {
// Demuxing failed. Allow the caller to create a
// receive stream in order to handle unsignalled SSRCs and try again.
// Note that we dont want to call NotifyBweOfReceivedPacket twice per
// packet.
if (!undemuxable_packet_handler(packet)) {
return;
}
if (!receiver_controller.OnRtpPacket(packet)) {
RTC_LOG(LS_INFO) << "Failed to demux packet " << packet.Ssrc();
return;
}
}
event_log_->Log(std::make_unique<RtcEventRtpPacketIncoming>(packet));
// RateCounters expect input parameter as int, save it as int,
// instead of converting each time it is passed to RateCounter::Add below.
int length = static_cast<int>(packet.size());
if (media_type == MediaType::AUDIO) {
receive_stats_.AddReceivedAudioBytes(length, packet.arrival_time());
}
if (media_type == MediaType::VIDEO) {
receive_stats_.AddReceivedVideoBytes(length, packet.arrival_time());
}
}
PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type,
rtc::CopyOnWriteBuffer packet,
int64_t packet_time_us) {
// TODO(perkj, https://bugs.webrtc.org/7135): Deprecate this method and
// direcly use DeliverRtpPacket.
TRACE_EVENT0("webrtc", "Call::DeliverRtp");
RTC_DCHECK_NE(media_type, MediaType::ANY);
@ -1437,52 +1488,24 @@ PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type,
return DELIVERY_PACKET_ERROR;
if (packet_time_us != -1) {
if (receive_time_calculator_) {
// Repair packet_time_us for clock resets by comparing a new read of
// the same clock (TimeUTCMicros) to a monotonic clock reading.
packet_time_us = receive_time_calculator_->ReconcileReceiveTimes(
packet_time_us, rtc::TimeUTCMicros(), clock_->TimeInMicroseconds());
}
parsed_packet.set_arrival_time(Timestamp::Micros(packet_time_us));
} else {
parsed_packet.set_arrival_time(clock_->CurrentTime());
}
// We might get RTP keep-alive packets in accordance with RFC6263 section 4.6.
// These are empty (zero length payload) RTP packets with an unsignaled
// payload type.
const bool is_keep_alive_packet = parsed_packet.payload_size() == 0;
RTC_DCHECK(media_type == MediaType::AUDIO || media_type == MediaType::VIDEO ||
is_keep_alive_packet);
if (!IdentifyReceivedPacket(parsed_packet))
return DELIVERY_UNKNOWN_SSRC;
NotifyBweOfReceivedPacket(parsed_packet, media_type);
// RateCounters expect input parameter as int, save it as int,
// instead of converting each time it is passed to RateCounter::Add below.
int length = static_cast<int>(parsed_packet.size());
if (media_type == MediaType::AUDIO) {
if (audio_receiver_controller_.OnRtpPacket(parsed_packet)) {
receive_stats_.AddReceivedAudioBytes(length,
parsed_packet.arrival_time());
event_log_->Log(
std::make_unique<RtcEventRtpPacketIncoming>(parsed_packet));
return DELIVERY_OK;
}
} else if (media_type == MediaType::VIDEO) {
if (media_type == MediaType::VIDEO) {
parsed_packet.set_payload_type_frequency(kVideoPayloadTypeFrequency);
if (video_receiver_controller_.OnRtpPacket(parsed_packet)) {
receive_stats_.AddReceivedVideoBytes(length,
parsed_packet.arrival_time());
event_log_->Log(
std::make_unique<RtcEventRtpPacketIncoming>(parsed_packet));
return DELIVERY_OK;
}
}
return DELIVERY_UNKNOWN_SSRC;
DeliverRtpPacket(media_type, std::move(parsed_packet),
[](const webrtc::RtpPacketReceived& packet) {
// If IdentifyReceivedPacket returns true, a packet is
// expected to be demuxable.
RTC_DCHECK_NOTREACHED();
return false;
});
return DELIVERY_OK;
}
PacketReceiver::DeliveryStatus Call::DeliverPacket(
@ -1491,7 +1514,11 @@ PacketReceiver::DeliveryStatus Call::DeliverPacket(
int64_t packet_time_us) {
if (IsRtcpPacket(packet)) {
RTC_DCHECK_RUN_ON(network_thread_);
DeliverRtcp(media_type, std::move(packet));
worker_thread_->PostTask(SafeTask(
task_safety_.flag(), [this, packet = std::move(packet)]() mutable {
RTC_DCHECK_RUN_ON(worker_thread_);
DeliverRtcpPacket(std::move(packet));
}));
return DELIVERY_OK;
}

View file

@ -414,6 +414,11 @@ PacketReceiver::DeliveryStatus DegradedCall::DeliverPacket(
return status;
}
void DegradedCall::DeliverRtcpPacket(rtc::CopyOnWriteBuffer packet) {
receive_pipe_->DeliverRtcpPacket(std::move(packet));
receive_pipe_->Process();
}
void DegradedCall::SetClientBitratePreferences(
const webrtc::BitrateSettings& preferences) {
call_->SetClientBitratePreferences(preferences);

View file

@ -116,6 +116,7 @@ class DegradedCall : public Call, private PacketReceiver {
DeliveryStatus DeliverPacket(MediaType media_type,
rtc::CopyOnWriteBuffer packet,
int64_t packet_time_us) override;
void DeliverRtcpPacket(rtc::CopyOnWriteBuffer packet) override;
private:
class FakeNetworkPipeOnTaskQueue {

View file

@ -184,6 +184,11 @@ PacketReceiver::DeliveryStatus FakeNetworkPipe::DeliverPacket(
: PacketReceiver::DELIVERY_PACKET_ERROR;
}
void FakeNetworkPipe::DeliverRtcpPacket(rtc::CopyOnWriteBuffer packet) {
EnqueuePacket(std::move(packet), absl::nullopt, true, MediaType::ANY,
absl::nullopt);
}
void FakeNetworkPipe::SetClockOffset(int64_t offset_ms) {
MutexLock lock(&config_lock_);
clock_offset_ms_ = offset_ms;

View file

@ -149,6 +149,8 @@ class FakeNetworkPipe : public SimulatedPacketReceiverInterface {
rtc::CopyOnWriteBuffer packet,
int64_t packet_time_us) override;
void DeliverRtcpPacket(rtc::CopyOnWriteBuffer packet) override;
// TODO(bugs.webrtc.org/9584): Needed to inherit the alternative signature for
// this method.
using PacketReceiver::DeliverPacket;

View file

@ -10,7 +10,10 @@
#ifndef CALL_PACKET_RECEIVER_H_
#define CALL_PACKET_RECEIVER_H_
#include "absl/functional/any_invocable.h"
#include "api/media_types.h"
#include "modules/rtp_rtcp/source/rtp_packet_received.h"
#include "rtc_base/checks.h"
#include "rtc_base/copy_on_write_buffer.h"
namespace webrtc {
@ -27,6 +30,28 @@ class PacketReceiver {
rtc::CopyOnWriteBuffer packet,
int64_t packet_time_us) = 0;
// Demux RTCP packets. Must be called on the worker thread.
virtual void DeliverRtcpPacket(rtc::CopyOnWriteBuffer packet) {
// TODO(perkj, https://bugs.webrtc.org/7135): Implement in FakeCall and
// FakeNetworkPipe.
RTC_CHECK_NOTREACHED();
}
// Invoked once when a packet packet is received that can not be demuxed.
// If the method returns true, a new attempt is made to demux the packet.
using OnUndemuxablePacketHandler =
absl::AnyInvocable<bool(const RtpPacketReceived& parsed_packet)>;
// Demux RTP packets. Must be called on the worker thread.
virtual void DeliverRtpPacket(
MediaType media_type,
RtpPacketReceived packet,
OnUndemuxablePacketHandler undemuxable_packet_handler) {
// TODO(perkj, https://bugs.webrtc.org/7135): Implement in FakeCall and
// FakeNetworkPipe.
RTC_CHECK_NOTREACHED();
}
protected:
virtual ~PacketReceiver() {}
};

View file

@ -2978,10 +2978,11 @@ std::function<void(const rtc::CopyOnWriteBuffer& packet,
int64_t packet_time_us)>
PeerConnection::InitializeRtcpCallback() {
RTC_DCHECK_RUN_ON(network_thread());
return [this](const rtc::CopyOnWriteBuffer& packet, int64_t packet_time_us) {
RTC_DCHECK_RUN_ON(network_thread());
call_ptr_->Receiver()->DeliverPacket(MediaType::ANY, packet,
packet_time_us);
return [this](const rtc::CopyOnWriteBuffer& packet,
int64_t /*packet_time_us*/) {
worker_thread()->PostTask(SafeTask(worker_thread_safety_, [this, packet]() {
call_ptr_->Receiver()->DeliverRtcpPacket(packet);
}));
};
}