diff --git a/call/call.cc b/call/call.cc index 3b2283d8da..218505cdea 100644 --- a/call/call.cc +++ b/call/call.cc @@ -23,6 +23,7 @@ #include "absl/functional/bind_front.h" #include "absl/strings/string_view.h" #include "absl/types/optional.h" +#include "api/media_types.h" #include "api/rtc_event_log/rtc_event_log.h" #include "api/sequence_checker.h" #include "api/task_queue/pending_task_safety_flag.h" @@ -33,6 +34,7 @@ #include "call/adaptation/broadcast_resource_listener.h" #include "call/bitrate_allocator.h" #include "call/flexfec_receive_stream_impl.h" +#include "call/packet_receiver.h" #include "call/receive_time_calculator.h" #include "call/rtp_stream_receiver_controller.h" #include "call/rtp_transport_controller_send.h" @@ -244,6 +246,13 @@ class Call final : public webrtc::Call, rtc::CopyOnWriteBuffer packet, int64_t packet_time_us) override; + void DeliverRtcpPacket(rtc::CopyOnWriteBuffer packet) override; + + void DeliverRtpPacket( + MediaType media_type, + RtpPacketReceived packet, + OnUndemuxablePacketHandler undemuxable_packet_handler) override; + void SignalChannelNetworkState(MediaType media, NetworkState state) override; void OnAudioTransportOverheadChanged( @@ -1375,60 +1384,102 @@ void Call::ConfigureSync(absl::string_view sync_group) { } } -void Call::DeliverRtcp(MediaType media_type, rtc::CopyOnWriteBuffer packet) { - RTC_DCHECK_RUN_ON(network_thread_); +void Call::DeliverRtcpPacket(rtc::CopyOnWriteBuffer packet) { + RTC_DCHECK_RUN_ON(worker_thread_); + RTC_DCHECK(IsRtcpPacket(packet)); TRACE_EVENT0("webrtc", "Call::DeliverRtcp"); - // TODO(bugs.webrtc.org/11993): This DCHECK is here just to maintain the - // invariant that currently the only call path to this function is via - // `PeerConnection::InitializeRtcpCallback()`. DeliverRtp on the other hand - // gets called via the channel classes and - // WebRtc[Audio|Video]Channel's `OnPacketReceived`. We'll remove the - // PeerConnection involvement as well as - // `JsepTransportController::OnRtcpPacketReceived_n` and `rtcp_handler` - // and make sure that the flow of packets is consistent from the - // `RtpTransport` class, via the *Channel and *Engine classes and into Call. - // This way we'll also know more about the context of the packet. - RTC_DCHECK_EQ(media_type, MediaType::ANY); + receive_stats_.AddReceivedRtcpBytes(static_cast(packet.size())); + bool rtcp_delivered = false; + for (VideoReceiveStream2* stream : video_receive_streams_) { + if (stream->DeliverRtcp(packet.cdata(), packet.size())) + rtcp_delivered = true; + } - // TODO(bugs.webrtc.org/11993): This should execute directly on the network - // thread. - worker_thread_->PostTask( - SafeTask(task_safety_.flag(), [this, packet = std::move(packet)]() { - RTC_DCHECK_RUN_ON(worker_thread_); + for (AudioReceiveStreamImpl* stream : audio_receive_streams_) { + stream->DeliverRtcp(packet.cdata(), packet.size()); + rtcp_delivered = true; + } - receive_stats_.AddReceivedRtcpBytes(static_cast(packet.size())); - bool rtcp_delivered = false; - for (VideoReceiveStream2* stream : video_receive_streams_) { - if (stream->DeliverRtcp(packet.cdata(), packet.size())) - rtcp_delivered = true; - } + for (VideoSendStream* stream : video_send_streams_) { + stream->DeliverRtcp(packet.cdata(), packet.size()); + rtcp_delivered = true; + } - for (AudioReceiveStreamImpl* stream : audio_receive_streams_) { - stream->DeliverRtcp(packet.cdata(), packet.size()); - rtcp_delivered = true; - } + for (auto& kv : audio_send_ssrcs_) { + kv.second->DeliverRtcp(packet.cdata(), packet.size()); + rtcp_delivered = true; + } - for (VideoSendStream* stream : video_send_streams_) { - stream->DeliverRtcp(packet.cdata(), packet.size()); - rtcp_delivered = true; - } + if (rtcp_delivered) { + event_log_->Log(std::make_unique(packet)); + } +} - for (auto& kv : audio_send_ssrcs_) { - kv.second->DeliverRtcp(packet.cdata(), packet.size()); - rtcp_delivered = true; - } +void Call::DeliverRtpPacket( + MediaType media_type, + RtpPacketReceived packet, + OnUndemuxablePacketHandler undemuxable_packet_handler) { + RTC_DCHECK_RUN_ON(worker_thread_); + RTC_DCHECK(packet.arrival_time().IsFinite()); - if (rtcp_delivered) { - event_log_->Log(std::make_unique( - rtc::MakeArrayView(packet.cdata(), packet.size()))); - } - })); + if (receive_time_calculator_) { + int64_t packet_time_us = packet.arrival_time().us(); + // Repair packet_time_us for clock resets by comparing a new read of + // the same clock (TimeUTCMicros) to a monotonic clock reading. + packet_time_us = receive_time_calculator_->ReconcileReceiveTimes( + packet_time_us, rtc::TimeUTCMicros(), clock_->TimeInMicroseconds()); + packet.set_arrival_time(Timestamp::Micros(packet_time_us)); + } + + // We might get RTP keep-alive packets in accordance with RFC6263 section 4.6. + // These are empty (zero length payload) RTP packets with an unsignaled + // payload type. + const bool is_keep_alive_packet = packet.payload_size() == 0; + RTC_DCHECK(media_type == MediaType::AUDIO || media_type == MediaType::VIDEO || + is_keep_alive_packet); + NotifyBweOfReceivedPacket(packet, media_type); + + if (media_type != MediaType::AUDIO && media_type != MediaType::VIDEO) { + RTC_DCHECK(is_keep_alive_packet); + return; + } + + RtpStreamReceiverController& receiver_controller = + media_type == MediaType::AUDIO ? audio_receiver_controller_ + : video_receiver_controller_; + + if (!receiver_controller.OnRtpPacket(packet)) { + // Demuxing failed. Allow the caller to create a + // receive stream in order to handle unsignalled SSRCs and try again. + // Note that we dont want to call NotifyBweOfReceivedPacket twice per + // packet. + if (!undemuxable_packet_handler(packet)) { + return; + } + if (!receiver_controller.OnRtpPacket(packet)) { + RTC_LOG(LS_INFO) << "Failed to demux packet " << packet.Ssrc(); + return; + } + } + event_log_->Log(std::make_unique(packet)); + + // RateCounters expect input parameter as int, save it as int, + // instead of converting each time it is passed to RateCounter::Add below. + int length = static_cast(packet.size()); + if (media_type == MediaType::AUDIO) { + receive_stats_.AddReceivedAudioBytes(length, packet.arrival_time()); + } + if (media_type == MediaType::VIDEO) { + receive_stats_.AddReceivedVideoBytes(length, packet.arrival_time()); + } } PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type, rtc::CopyOnWriteBuffer packet, int64_t packet_time_us) { + // TODO(perkj, https://bugs.webrtc.org/7135): Deprecate this method and + // direcly use DeliverRtpPacket. TRACE_EVENT0("webrtc", "Call::DeliverRtp"); RTC_DCHECK_NE(media_type, MediaType::ANY); @@ -1437,52 +1488,24 @@ PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type, return DELIVERY_PACKET_ERROR; if (packet_time_us != -1) { - if (receive_time_calculator_) { - // Repair packet_time_us for clock resets by comparing a new read of - // the same clock (TimeUTCMicros) to a monotonic clock reading. - packet_time_us = receive_time_calculator_->ReconcileReceiveTimes( - packet_time_us, rtc::TimeUTCMicros(), clock_->TimeInMicroseconds()); - } parsed_packet.set_arrival_time(Timestamp::Micros(packet_time_us)); } else { parsed_packet.set_arrival_time(clock_->CurrentTime()); } - // We might get RTP keep-alive packets in accordance with RFC6263 section 4.6. - // These are empty (zero length payload) RTP packets with an unsignaled - // payload type. - const bool is_keep_alive_packet = parsed_packet.payload_size() == 0; - - RTC_DCHECK(media_type == MediaType::AUDIO || media_type == MediaType::VIDEO || - is_keep_alive_packet); - if (!IdentifyReceivedPacket(parsed_packet)) return DELIVERY_UNKNOWN_SSRC; - - NotifyBweOfReceivedPacket(parsed_packet, media_type); - - // RateCounters expect input parameter as int, save it as int, - // instead of converting each time it is passed to RateCounter::Add below. - int length = static_cast(parsed_packet.size()); - if (media_type == MediaType::AUDIO) { - if (audio_receiver_controller_.OnRtpPacket(parsed_packet)) { - receive_stats_.AddReceivedAudioBytes(length, - parsed_packet.arrival_time()); - event_log_->Log( - std::make_unique(parsed_packet)); - return DELIVERY_OK; - } - } else if (media_type == MediaType::VIDEO) { + if (media_type == MediaType::VIDEO) { parsed_packet.set_payload_type_frequency(kVideoPayloadTypeFrequency); - if (video_receiver_controller_.OnRtpPacket(parsed_packet)) { - receive_stats_.AddReceivedVideoBytes(length, - parsed_packet.arrival_time()); - event_log_->Log( - std::make_unique(parsed_packet)); - return DELIVERY_OK; - } } - return DELIVERY_UNKNOWN_SSRC; + DeliverRtpPacket(media_type, std::move(parsed_packet), + [](const webrtc::RtpPacketReceived& packet) { + // If IdentifyReceivedPacket returns true, a packet is + // expected to be demuxable. + RTC_DCHECK_NOTREACHED(); + return false; + }); + return DELIVERY_OK; } PacketReceiver::DeliveryStatus Call::DeliverPacket( @@ -1491,7 +1514,11 @@ PacketReceiver::DeliveryStatus Call::DeliverPacket( int64_t packet_time_us) { if (IsRtcpPacket(packet)) { RTC_DCHECK_RUN_ON(network_thread_); - DeliverRtcp(media_type, std::move(packet)); + worker_thread_->PostTask(SafeTask( + task_safety_.flag(), [this, packet = std::move(packet)]() mutable { + RTC_DCHECK_RUN_ON(worker_thread_); + DeliverRtcpPacket(std::move(packet)); + })); return DELIVERY_OK; } diff --git a/call/degraded_call.cc b/call/degraded_call.cc index c59a63ba69..445a1e90c2 100644 --- a/call/degraded_call.cc +++ b/call/degraded_call.cc @@ -414,6 +414,11 @@ PacketReceiver::DeliveryStatus DegradedCall::DeliverPacket( return status; } +void DegradedCall::DeliverRtcpPacket(rtc::CopyOnWriteBuffer packet) { + receive_pipe_->DeliverRtcpPacket(std::move(packet)); + receive_pipe_->Process(); +} + void DegradedCall::SetClientBitratePreferences( const webrtc::BitrateSettings& preferences) { call_->SetClientBitratePreferences(preferences); diff --git a/call/degraded_call.h b/call/degraded_call.h index 5906e557f1..bff93e20ce 100644 --- a/call/degraded_call.h +++ b/call/degraded_call.h @@ -116,6 +116,7 @@ class DegradedCall : public Call, private PacketReceiver { DeliveryStatus DeliverPacket(MediaType media_type, rtc::CopyOnWriteBuffer packet, int64_t packet_time_us) override; + void DeliverRtcpPacket(rtc::CopyOnWriteBuffer packet) override; private: class FakeNetworkPipeOnTaskQueue { diff --git a/call/fake_network_pipe.cc b/call/fake_network_pipe.cc index 8a03e0ce7a..60650640c8 100644 --- a/call/fake_network_pipe.cc +++ b/call/fake_network_pipe.cc @@ -184,6 +184,11 @@ PacketReceiver::DeliveryStatus FakeNetworkPipe::DeliverPacket( : PacketReceiver::DELIVERY_PACKET_ERROR; } +void FakeNetworkPipe::DeliverRtcpPacket(rtc::CopyOnWriteBuffer packet) { + EnqueuePacket(std::move(packet), absl::nullopt, true, MediaType::ANY, + absl::nullopt); +} + void FakeNetworkPipe::SetClockOffset(int64_t offset_ms) { MutexLock lock(&config_lock_); clock_offset_ms_ = offset_ms; diff --git a/call/fake_network_pipe.h b/call/fake_network_pipe.h index be72e91637..3a6cb05643 100644 --- a/call/fake_network_pipe.h +++ b/call/fake_network_pipe.h @@ -149,6 +149,8 @@ class FakeNetworkPipe : public SimulatedPacketReceiverInterface { rtc::CopyOnWriteBuffer packet, int64_t packet_time_us) override; + void DeliverRtcpPacket(rtc::CopyOnWriteBuffer packet) override; + // TODO(bugs.webrtc.org/9584): Needed to inherit the alternative signature for // this method. using PacketReceiver::DeliverPacket; diff --git a/call/packet_receiver.h b/call/packet_receiver.h index 13d3b84c90..a97bb965ff 100644 --- a/call/packet_receiver.h +++ b/call/packet_receiver.h @@ -10,7 +10,10 @@ #ifndef CALL_PACKET_RECEIVER_H_ #define CALL_PACKET_RECEIVER_H_ +#include "absl/functional/any_invocable.h" #include "api/media_types.h" +#include "modules/rtp_rtcp/source/rtp_packet_received.h" +#include "rtc_base/checks.h" #include "rtc_base/copy_on_write_buffer.h" namespace webrtc { @@ -27,6 +30,28 @@ class PacketReceiver { rtc::CopyOnWriteBuffer packet, int64_t packet_time_us) = 0; + // Demux RTCP packets. Must be called on the worker thread. + virtual void DeliverRtcpPacket(rtc::CopyOnWriteBuffer packet) { + // TODO(perkj, https://bugs.webrtc.org/7135): Implement in FakeCall and + // FakeNetworkPipe. + RTC_CHECK_NOTREACHED(); + } + + // Invoked once when a packet packet is received that can not be demuxed. + // If the method returns true, a new attempt is made to demux the packet. + using OnUndemuxablePacketHandler = + absl::AnyInvocable; + + // Demux RTP packets. Must be called on the worker thread. + virtual void DeliverRtpPacket( + MediaType media_type, + RtpPacketReceived packet, + OnUndemuxablePacketHandler undemuxable_packet_handler) { + // TODO(perkj, https://bugs.webrtc.org/7135): Implement in FakeCall and + // FakeNetworkPipe. + RTC_CHECK_NOTREACHED(); + } + protected: virtual ~PacketReceiver() {} }; diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc index 5de77fee9d..622870cfca 100644 --- a/pc/peer_connection.cc +++ b/pc/peer_connection.cc @@ -2978,10 +2978,11 @@ std::function PeerConnection::InitializeRtcpCallback() { RTC_DCHECK_RUN_ON(network_thread()); - return [this](const rtc::CopyOnWriteBuffer& packet, int64_t packet_time_us) { - RTC_DCHECK_RUN_ON(network_thread()); - call_ptr_->Receiver()->DeliverPacket(MediaType::ANY, packet, - packet_time_us); + return [this](const rtc::CopyOnWriteBuffer& packet, + int64_t /*packet_time_us*/) { + worker_thread()->PostTask(SafeTask(worker_thread_safety_, [this, packet]() { + call_ptr_->Receiver()->DeliverRtcpPacket(packet); + })); }; }