From c03a187391be0e802b27dbb51b25e69a796b84d4 Mon Sep 17 00:00:00 2001 From: Taylor Brandstetter Date: Wed, 2 Sep 2020 13:25:31 -0700 Subject: [PATCH] Default streams: don't block media even if on different transceiver. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This fixes some edge cases where early media could cause default stream that block the actual signaled media from beind delivered. Bug: webrtc:11477 Change-Id: I8b26df63a690861bd19f083102d1395e882f8733 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/183120 Commit-Queue: Taylor Reviewed-by: Erik Språng Reviewed-by: Rasmus Brandt Cr-Commit-Position: refs/heads/master@{#32030} --- media/base/media_channel.h | 3 +- media/engine/webrtc_voice_engine.cc | 5 + media/engine/webrtc_voice_engine_unittest.cc | 24 +++ pc/BUILD.gn | 1 + pc/channel.cc | 62 +++++-- pc/channel.h | 19 ++- pc/channel_interface.h | 1 + pc/channel_manager.h | 19 +-- pc/peer_connection.cc | 83 +++++++++ pc/peer_connection.h | 4 + pc/test/mock_channel_interface.h | 1 + test/peer_scenario/peer_scenario_client.cc | 10 +- test/peer_scenario/peer_scenario_client.h | 11 +- test/peer_scenario/signaling_route.cc | 16 +- test/peer_scenario/signaling_route.h | 12 ++ test/peer_scenario/tests/BUILD.gn | 4 + .../tests/unsignaled_stream_test.cc | 157 ++++++++++++++++++ 17 files changed, 393 insertions(+), 39 deletions(-) create mode 100644 test/peer_scenario/tests/unsignaled_stream_test.cc diff --git a/media/base/media_channel.h b/media/base/media_channel.h index e8400a58a9..812d4929e1 100644 --- a/media/base/media_channel.h +++ b/media/base/media_channel.h @@ -204,7 +204,8 @@ class MediaChannel : public sigslot::has_slots<> { // ssrc must be the first SSRC of the media stream if the stream uses // multiple SSRCs. virtual bool RemoveRecvStream(uint32_t ssrc) = 0; - // Resets any cached StreamParams for an unsignaled RecvStream. + // Resets any cached StreamParams for an unsignaled RecvStream, and removes + // any existing unsignaled streams. virtual void ResetUnsignaledRecvStream() = 0; // Returns the absoulte sendtime extension id value from media channel. virtual int GetRtpSendTimeExtnId() const; diff --git a/media/engine/webrtc_voice_engine.cc b/media/engine/webrtc_voice_engine.cc index 41308bafa6..d0d4b15e55 100644 --- a/media/engine/webrtc_voice_engine.cc +++ b/media/engine/webrtc_voice_engine.cc @@ -2028,6 +2028,11 @@ void WebRtcVoiceMediaChannel::ResetUnsignaledRecvStream() { RTC_DCHECK(worker_thread_checker_.IsCurrent()); RTC_LOG(LS_INFO) << "ResetUnsignaledRecvStream."; unsignaled_stream_params_ = StreamParams(); + // Create a copy since RemoveRecvStream will modify |unsignaled_recv_ssrcs_|. + std::vector to_remove = unsignaled_recv_ssrcs_; + for (uint32_t ssrc : to_remove) { + RemoveRecvStream(ssrc); + } } bool WebRtcVoiceMediaChannel::SetLocalSource(uint32_t ssrc, diff --git a/media/engine/webrtc_voice_engine_unittest.cc b/media/engine/webrtc_voice_engine_unittest.cc index 30109748bb..f844b50d00 100644 --- a/media/engine/webrtc_voice_engine_unittest.cc +++ b/media/engine/webrtc_voice_engine_unittest.cc @@ -2722,6 +2722,30 @@ TEST_P(WebRtcVoiceEngineTestFake, RecvUnsignaledSsrcWithSignaledStreamId) { EXPECT_TRUE(GetRecvStream(kSsrc1).GetConfig().sync_group.empty()); } +TEST_P(WebRtcVoiceEngineTestFake, + ResetUnsignaledRecvStreamDeletesAllDefaultStreams) { + ASSERT_TRUE(SetupChannel()); + // No receive streams to start with. + ASSERT_TRUE(call_.GetAudioReceiveStreams().empty()); + + // Deliver a couple packets with unsignaled SSRCs. + unsigned char packet[sizeof(kPcmuFrame)]; + memcpy(packet, kPcmuFrame, sizeof(kPcmuFrame)); + rtc::SetBE32(&packet[8], 0x1234); + DeliverPacket(packet, sizeof(packet)); + rtc::SetBE32(&packet[8], 0x5678); + DeliverPacket(packet, sizeof(packet)); + + // Verify that the receive streams were created. + const auto& receivers1 = call_.GetAudioReceiveStreams(); + ASSERT_EQ(receivers1.size(), 2u); + + // Should remove all default streams. + channel_->ResetUnsignaledRecvStream(); + const auto& receivers2 = call_.GetAudioReceiveStreams(); + EXPECT_EQ(0u, receivers2.size()); +} + // Test that receiving N unsignaled stream works (streams will be created), and // that packets are forwarded to them all. TEST_P(WebRtcVoiceEngineTestFake, RecvMultipleUnsignaled) { diff --git a/pc/BUILD.gn b/pc/BUILD.gn index 712449f1c2..dd35de0051 100644 --- a/pc/BUILD.gn +++ b/pc/BUILD.gn @@ -115,6 +115,7 @@ rtc_library("rtc_pc_base") { "../rtc_base:rtc_task_queue", "../rtc_base:stringutils", "../rtc_base/synchronization:mutex", + "../rtc_base/synchronization:sequence_checker", "../rtc_base/system:file_wrapper", "../rtc_base/system:rtc_export", "../rtc_base/third_party/base64", diff --git a/pc/channel.cc b/pc/channel.cc index eeba19b2c1..accc233aa1 100644 --- a/pc/channel.cc +++ b/pc/channel.cc @@ -30,6 +30,7 @@ #include "rtc_base/logging.h" #include "rtc_base/network_route.h" #include "rtc_base/strings/string_builder.h" +#include "rtc_base/synchronization/sequence_checker.h" #include "rtc_base/trace_event.h" namespace cricket { @@ -206,7 +207,7 @@ void BaseChannel::Init_w(webrtc::RtpTransportInternal* rtp_transport) { } void BaseChannel::Deinit() { - RTC_DCHECK(worker_thread_->IsCurrent()); + RTC_DCHECK_RUN_ON(worker_thread()); media_channel_->SetInterface(/*iface=*/nullptr); // Packets arrive on the network thread, processing packets calls virtual // functions, so need to stop this process in Deinit that is called in @@ -289,6 +290,13 @@ bool BaseChannel::SetRemoteContent(const MediaContentDescription* content, Bind(&BaseChannel::SetRemoteContent_w, this, content, type, error_desc)); } +void BaseChannel::SetPayloadTypeDemuxingEnabled(bool enabled) { + TRACE_EVENT0("webrtc", "BaseChannel::SetPayloadTypeDemuxingEnabled"); + InvokeOnWorker( + RTC_FROM_HERE, + Bind(&BaseChannel::SetPayloadTypeDemuxingEnabled_w, this, enabled)); +} + bool BaseChannel::IsReadyToReceiveMedia_w() const { // Receive data if we are enabled and have local content, return enabled() && @@ -330,7 +338,7 @@ int BaseChannel::SetOption(SocketType type, int BaseChannel::SetOption_n(SocketType type, rtc::Socket::Option opt, int value) { - RTC_DCHECK(network_thread_->IsCurrent()); + RTC_DCHECK_RUN_ON(network_thread()); RTC_DCHECK(rtp_transport_); switch (type) { case ST_RTP: @@ -346,7 +354,7 @@ int BaseChannel::SetOption_n(SocketType type, } void BaseChannel::OnWritableState(bool writable) { - RTC_DCHECK(network_thread_->IsCurrent()); + RTC_DCHECK_RUN_ON(network_thread()); if (writable) { ChannelWritable_n(); } else { @@ -358,7 +366,7 @@ void BaseChannel::OnNetworkRouteChanged( absl::optional network_route) { RTC_LOG(LS_INFO) << "Network route for " << ToString() << " was changed."; - RTC_DCHECK(network_thread_->IsCurrent()); + RTC_DCHECK_RUN_ON(network_thread()); rtc::NetworkRoute new_route; if (network_route) { new_route = *(network_route); @@ -479,7 +487,7 @@ void BaseChannel::OnRtpPacket(const webrtc::RtpPacketReceived& parsed_packet) { invoker_.AsyncInvoke( RTC_FROM_HERE, worker_thread_, [this, packet_buffer, packet_time_us] { - RTC_DCHECK(worker_thread_->IsCurrent()); + RTC_DCHECK_RUN_ON(worker_thread()); media_channel_->OnPacketReceived(packet_buffer, packet_time_us); }); } @@ -537,7 +545,7 @@ void BaseChannel::UpdateWritableState_n() { } void BaseChannel::ChannelWritable_n() { - RTC_DCHECK(network_thread_->IsCurrent()); + RTC_DCHECK_RUN_ON(network_thread()); if (writable_) { return; } @@ -551,7 +559,7 @@ void BaseChannel::ChannelWritable_n() { } void BaseChannel::ChannelNotWritable_n() { - RTC_DCHECK(network_thread_->IsCurrent()); + RTC_DCHECK_RUN_ON(network_thread()); if (!writable_) return; @@ -575,6 +583,24 @@ void BaseChannel::ResetUnsignaledRecvStream_w() { media_channel()->ResetUnsignaledRecvStream(); } +void BaseChannel::SetPayloadTypeDemuxingEnabled_w(bool enabled) { + RTC_DCHECK_RUN_ON(worker_thread()); + if (enabled == payload_type_demuxing_enabled_) { + return; + } + if (!enabled) { + // TODO(crbug.com/11477): This will remove *all* unsignaled streams (those + // without an explicitly signaled SSRC), which may include streams that + // were matched to this channel by MID or RID. Ideally we'd remove only the + // streams that were matched based on payload type alone, but currently + // there is no straightforward way to identify those streams. + media_channel()->ResetUnsignaledRecvStream(); + ClearHandledPayloadTypes(); + RegisterRtpDemuxerSink(); + } + payload_type_demuxing_enabled_ = enabled; +} + bool BaseChannel::UpdateLocalStreams_w(const std::vector& streams, SdpType type, std::string* error_desc) { @@ -741,7 +767,7 @@ void BaseChannel::OnMessage(rtc::Message* pmsg) { switch (pmsg->message_id) { case MSG_SEND_RTP_PACKET: case MSG_SEND_RTCP_PACKET: { - RTC_DCHECK(network_thread_->IsCurrent()); + RTC_DCHECK_RUN_ON(network_thread()); SendPacketMessageData* data = static_cast(pmsg->pdata); bool rtcp = pmsg->message_id == MSG_SEND_RTCP_PACKET; @@ -756,8 +782,10 @@ void BaseChannel::OnMessage(rtc::Message* pmsg) { } } -void BaseChannel::AddHandledPayloadType(int payload_type) { - demuxer_criteria_.payload_types.insert(static_cast(payload_type)); +void BaseChannel::MaybeAddHandledPayloadType(int payload_type) { + if (payload_type_demuxing_enabled_) { + demuxer_criteria_.payload_types.insert(static_cast(payload_type)); + } } void BaseChannel::ClearHandledPayloadTypes() { @@ -767,7 +795,7 @@ void BaseChannel::ClearHandledPayloadTypes() { void BaseChannel::FlushRtcpMessages_n() { // Flush all remaining RTCP messages. This should only be called in // destructor. - RTC_DCHECK(network_thread_->IsCurrent()); + RTC_DCHECK_RUN_ON(network_thread()); rtc::MessageList rtcp_messages; network_thread_->Clear(this, MSG_SEND_RTCP_PACKET, &rtcp_messages); for (const auto& message : rtcp_messages) { @@ -777,10 +805,10 @@ void BaseChannel::FlushRtcpMessages_n() { } void BaseChannel::SignalSentPacket_n(const rtc::SentPacket& sent_packet) { - RTC_DCHECK(network_thread_->IsCurrent()); + RTC_DCHECK_RUN_ON(network_thread()); invoker_.AsyncInvoke(RTC_FROM_HERE, worker_thread_, [this, sent_packet] { - RTC_DCHECK(worker_thread_->IsCurrent()); + RTC_DCHECK_RUN_ON(worker_thread()); SignalSentPacket(sent_packet); }); } @@ -810,7 +838,7 @@ VoiceChannel::~VoiceChannel() { } void BaseChannel::UpdateMediaSendRecvState() { - RTC_DCHECK(network_thread_->IsCurrent()); + RTC_DCHECK_RUN_ON(network_thread()); invoker_.AsyncInvoke(RTC_FROM_HERE, worker_thread_, [this] { UpdateMediaSendRecvState_w(); }); } @@ -869,7 +897,7 @@ bool VoiceChannel::SetLocalContent_w(const MediaContentDescription* content, if (webrtc::RtpTransceiverDirectionHasRecv(audio->direction())) { for (const AudioCodec& codec : audio->codecs()) { - AddHandledPayloadType(codec.id); + MaybeAddHandledPayloadType(codec.id); } // Need to re-register the sink to update the handled payload. if (!RegisterRtpDemuxerSink()) { @@ -1062,7 +1090,7 @@ bool VideoChannel::SetLocalContent_w(const MediaContentDescription* content, if (webrtc::RtpTransceiverDirectionHasRecv(video->direction())) { for (const VideoCodec& codec : video->codecs()) { - AddHandledPayloadType(codec.id); + MaybeAddHandledPayloadType(codec.id); } // Need to re-register the sink to update the handled payload. if (!RegisterRtpDemuxerSink()) { @@ -1287,7 +1315,7 @@ bool RtpDataChannel::SetLocalContent_w(const MediaContentDescription* content, return false; } for (const DataCodec& codec : data->codecs()) { - AddHandledPayloadType(codec.id); + MaybeAddHandledPayloadType(codec.id); } // Need to re-register the sink to update the handled payload. if (!RegisterRtpDemuxerSink()) { diff --git a/pc/channel.h b/pc/channel.h index 44374b176b..bda2da4ba1 100644 --- a/pc/channel.h +++ b/pc/channel.h @@ -39,7 +39,9 @@ #include "rtc_base/async_invoker.h" #include "rtc_base/async_udp_socket.h" #include "rtc_base/network.h" +#include "rtc_base/synchronization/sequence_checker.h" #include "rtc_base/third_party/sigslot/sigslot.h" +#include "rtc_base/thread_annotations.h" #include "rtc_base/unique_id_generator.h" namespace webrtc { @@ -124,6 +126,15 @@ class BaseChannel : public ChannelInterface, bool SetRemoteContent(const MediaContentDescription* content, webrtc::SdpType type, std::string* error_desc) override; + // Controls whether this channel will receive packets on the basis of + // matching payload type alone. This is needed for legacy endpoints that + // don't signal SSRCs or use MID/RID, but doesn't make sense if there is + // more than channel of specific media type, As that creates an ambiguity. + // + // This method will also remove any existing streams that were bound to this + // channel on the basis of payload type, since one of these streams might + // actually belong to a new channel. See: crbug.com/webrtc/11477 + void SetPayloadTypeDemuxingEnabled(bool enabled) override; bool Enable(bool enable) override; @@ -224,6 +235,7 @@ class BaseChannel : public ChannelInterface, bool AddRecvStream_w(const StreamParams& sp); bool RemoveRecvStream_w(uint32_t ssrc); void ResetUnsignaledRecvStream_w(); + void SetPayloadTypeDemuxingEnabled_w(bool enabled); bool AddSendStream_w(const StreamParams& sp); bool RemoveSendStream_w(uint32_t ssrc); @@ -261,9 +273,11 @@ class BaseChannel : public ChannelInterface, return worker_thread_->Invoke(posted_from, functor); } - void AddHandledPayloadType(int payload_type); + // Add |payload_type| to |demuxer_criteria_| if payload type demuxing is + // enabled. + void MaybeAddHandledPayloadType(int payload_type) RTC_RUN_ON(worker_thread()); - void ClearHandledPayloadTypes(); + void ClearHandledPayloadTypes() RTC_RUN_ON(worker_thread()); void UpdateRtpHeaderExtensionMap( const RtpHeaderExtensions& header_extensions); @@ -308,6 +322,7 @@ class BaseChannel : public ChannelInterface, // well, but it can be changed only when signaling thread does a synchronous // call to the worker thread, so it should be safe. bool enabled_ = false; + bool payload_type_demuxing_enabled_ RTC_GUARDED_BY(worker_thread()) = true; std::vector local_streams_; std::vector remote_streams_; webrtc::RtpTransceiverDirection local_content_direction_ = diff --git a/pc/channel_interface.h b/pc/channel_interface.h index cd29ed4f84..f510c9442d 100644 --- a/pc/channel_interface.h +++ b/pc/channel_interface.h @@ -52,6 +52,7 @@ class ChannelInterface { virtual bool SetRemoteContent(const MediaContentDescription* content, webrtc::SdpType type, std::string* error_desc) = 0; + virtual void SetPayloadTypeDemuxingEnabled(bool enabled) = 0; // Access to the local and remote streams that were set on the channel. virtual const std::vector& local_streams() const = 0; diff --git a/pc/channel_manager.h b/pc/channel_manager.h index 8d5fc0aa5b..ba2c260099 100644 --- a/pc/channel_manager.h +++ b/pc/channel_manager.h @@ -96,16 +96,15 @@ class ChannelManager final { // call the appropriate Destroy*Channel method when done. // Creates a voice channel, to be associated with the specified session. - VoiceChannel* CreateVoiceChannel( - webrtc::Call* call, - const cricket::MediaConfig& media_config, - webrtc::RtpTransportInternal* rtp_transport, - rtc::Thread* signaling_thread, - const std::string& content_name, - bool srtp_required, - const webrtc::CryptoOptions& crypto_options, - rtc::UniqueRandomIdGenerator* ssrc_generator, - const AudioOptions& options); + VoiceChannel* CreateVoiceChannel(webrtc::Call* call, + const cricket::MediaConfig& media_config, + webrtc::RtpTransportInternal* rtp_transport, + rtc::Thread* signaling_thread, + const std::string& content_name, + bool srtp_required, + const webrtc::CryptoOptions& crypto_options, + rtc::UniqueRandomIdGenerator* ssrc_generator, + const AudioOptions& options); // Destroys a voice channel created by CreateVoiceChannel. void DestroyVoiceChannel(VoiceChannel* voice_channel); diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc index a39d3c6c42..71c9de452d 100644 --- a/pc/peer_connection.cc +++ b/pc/peer_connection.cc @@ -5982,6 +5982,87 @@ RTCError PeerConnection::UpdateSessionState( return RTCError::OK(); } +void PeerConnection::UpdatePayloadTypeDemuxingState( + cricket::ContentSource source) { + // We may need to delete any created default streams and disable creation of + // new ones on the basis of payload type. This is needed to avoid SSRC + // collisions in Call's RtpDemuxer, in the case that a transceiver has + // created a default stream, and then some other channel gets the SSRC + // signaled in the corresponding Unified Plan "m=" section. For more context + // see https://bugs.chromium.org/p/webrtc/issues/detail?id=11477 + const SessionDescriptionInterface* sdesc = + (source == cricket::CS_LOCAL ? local_description() + : remote_description()); + size_t num_receiving_video_transceivers = 0; + size_t num_receiving_audio_transceivers = 0; + for (auto& content_info : sdesc->description()->contents()) { + if (content_info.rejected || + (source == cricket::ContentSource::CS_LOCAL && + !RtpTransceiverDirectionHasRecv( + content_info.media_description()->direction())) || + (source == cricket::ContentSource::CS_REMOTE && + !RtpTransceiverDirectionHasSend( + content_info.media_description()->direction()))) { + // Ignore transceivers that are not receiving. + continue; + } + switch (content_info.media_description()->type()) { + case cricket::MediaType::MEDIA_TYPE_AUDIO: + ++num_receiving_audio_transceivers; + break; + case cricket::MediaType::MEDIA_TYPE_VIDEO: + ++num_receiving_video_transceivers; + break; + default: + // Ignore data channels. + continue; + } + } + bool pt_demuxing_enabled_video = num_receiving_video_transceivers <= 1; + bool pt_demuxing_enabled_audio = num_receiving_audio_transceivers <= 1; + + // Gather all updates ahead of time so that all channels can be updated in a + // single Invoke; necessary due to thread guards. + std::vector> + channels_to_update; + for (const auto& transceiver : transceivers_) { + cricket::ChannelInterface* channel = transceiver->internal()->channel(); + const ContentInfo* content = + FindMediaSectionForTransceiver(transceiver, sdesc); + if (!channel || !content) { + continue; + } + RtpTransceiverDirection local_direction = + content->media_description()->direction(); + if (source == cricket::CS_REMOTE) { + local_direction = RtpTransceiverDirectionReversed(local_direction); + } + channels_to_update.emplace_back(local_direction, + transceiver->internal()->channel()); + } + + if (!channels_to_update.empty()) { + worker_thread()->Invoke( + RTC_FROM_HERE, [&channels_to_update, pt_demuxing_enabled_audio, + pt_demuxing_enabled_video]() { + for (const auto& it : channels_to_update) { + RtpTransceiverDirection local_direction = it.first; + cricket::ChannelInterface* channel = it.second; + cricket::MediaType media_type = channel->media_type(); + if (media_type == cricket::MediaType::MEDIA_TYPE_AUDIO) { + channel->SetPayloadTypeDemuxingEnabled( + pt_demuxing_enabled_audio && + RtpTransceiverDirectionHasRecv(local_direction)); + } else if (media_type == cricket::MediaType::MEDIA_TYPE_VIDEO) { + channel->SetPayloadTypeDemuxingEnabled( + pt_demuxing_enabled_video && + RtpTransceiverDirectionHasRecv(local_direction)); + } + } + }); + } +} + RTCError PeerConnection::PushdownMediaDescription( SdpType type, cricket::ContentSource source) { @@ -5990,6 +6071,8 @@ RTCError PeerConnection::PushdownMediaDescription( : remote_description()); RTC_DCHECK(sdesc); + UpdatePayloadTypeDemuxingState(source); + // Push down the new SDP media section for each audio/video transceiver. for (const auto& transceiver : transceivers_) { const ContentInfo* content_info = diff --git a/pc/peer_connection.h b/pc/peer_connection.h index 74c4ebee3c..8e32608f7d 100644 --- a/pc/peer_connection.h +++ b/pc/peer_connection.h @@ -936,6 +936,10 @@ class PeerConnection : public PeerConnectionInternal, RTCError UpdateSessionState(SdpType type, cricket::ContentSource source, const cricket::SessionDescription* description); + // Based on number of transceivers per media type, enabled or disable + // payload type based demuxing in the affected channels. + void UpdatePayloadTypeDemuxingState(cricket::ContentSource source) + RTC_RUN_ON(signaling_thread()); // Push the media parts of the local or remote session description // down to all of the channels. RTCError PushdownMediaDescription(SdpType type, cricket::ContentSource source) diff --git a/pc/test/mock_channel_interface.h b/pc/test/mock_channel_interface.h index 2df3baee47..3c3a4ee67e 100644 --- a/pc/test/mock_channel_interface.h +++ b/pc/test/mock_channel_interface.h @@ -46,6 +46,7 @@ class MockChannelInterface : public cricket::ChannelInterface { webrtc::SdpType, std::string*), (override)); + MOCK_METHOD(void, SetPayloadTypeDemuxingEnabled, (bool), (override)); MOCK_METHOD(const std::vector&, local_streams, (), diff --git a/test/peer_scenario/peer_scenario_client.cc b/test/peer_scenario/peer_scenario_client.cc index 465ee45adb..1ced030f34 100644 --- a/test/peer_scenario/peer_scenario_client.cc +++ b/test/peer_scenario/peer_scenario_client.cc @@ -228,6 +228,9 @@ PeerScenarioClient::PeerScenarioClient( pcf_deps.network_state_predictor_factory = nullptr; pc_factory_ = CreateModularPeerConnectionFactory(std::move(pcf_deps)); + PeerConnectionFactoryInterface::Options pc_options; + pc_options.disable_encryption = config.disable_encryption; + pc_factory_->SetOptions(pc_options); PeerConnectionDependencies pc_deps(observer_.get()); pc_deps.allocator = @@ -285,14 +288,17 @@ void PeerScenarioClient::AddVideoReceiveSink( } void PeerScenarioClient::CreateAndSetSdp( + std::function munge_offer, std::function offer_handler) { RTC_DCHECK_RUN_ON(signaling_thread_); peer_connection_->CreateOffer( SdpCreateObserver([=](SessionDescriptionInterface* offer) { RTC_DCHECK_RUN_ON(signaling_thread_); + if (munge_offer) { + munge_offer(offer); + } std::string sdp_offer; - offer->ToString(&sdp_offer); - RTC_LOG(LS_INFO) << sdp_offer; + RTC_CHECK(offer->ToString(&sdp_offer)); peer_connection_->SetLocalDescription( SdpSetObserver( [sdp_offer, offer_handler]() { offer_handler(sdp_offer); }), diff --git a/test/peer_scenario/peer_scenario_client.h b/test/peer_scenario/peer_scenario_client.h index d939d7f3a7..65ad0734db 100644 --- a/test/peer_scenario/peer_scenario_client.h +++ b/test/peer_scenario/peer_scenario_client.h @@ -89,6 +89,7 @@ class PeerScenarioClient { {0, EmulatedEndpointConfig()}}; CallbackHandlers handlers; PeerConnectionInterface::RTCConfiguration rtc_config; + bool disable_encryption = false; Config() { rtc_config.sdp_semantics = SdpSemantics::kUnifiedPlan; } }; @@ -136,9 +137,13 @@ class PeerScenarioClient { CallbackHandlers* handlers() { return &handlers_; } - // Note that there's no provision for munging SDP as that is deprecated - // behavior. - void CreateAndSetSdp(std::function offer_handler); + // The |munge_offer| function can be used to munge the SDP, i.e. modify a + // local description afer creating it but before setting it. Note that this is + // legacy behavior. It's added here only to be able to have test coverage for + // scenarios even if they are not spec compliant. + void CreateAndSetSdp( + std::function munge_offer, + std::function offer_handler); void SetSdpOfferAndGetAnswer(std::string remote_offer, std::function answer_handler); void SetSdpAnswer( diff --git a/test/peer_scenario/signaling_route.cc b/test/peer_scenario/signaling_route.cc index 1e5b9aad9a..2e0213df16 100644 --- a/test/peer_scenario/signaling_route.cc +++ b/test/peer_scenario/signaling_route.cc @@ -58,9 +58,10 @@ void StartSdpNegotiation( PeerScenarioClient* callee, TrafficRoute* send_route, TrafficRoute* ret_route, + std::function munge_offer, std::function modify_offer, std::function exchange_finished) { - caller->CreateAndSetSdp([=](std::string sdp_offer) { + caller->CreateAndSetSdp(munge_offer, [=](std::string sdp_offer) { if (modify_offer) { auto offer = CreateSessionDescription(SdpType::kOffer, sdp_offer); modify_offer(offer.get()); @@ -92,15 +93,22 @@ void SignalingRoute::StartIceSignaling() { } void SignalingRoute::NegotiateSdp( + std::function munge_offer, std::function modify_offer, std::function exchange_finished) { - StartSdpNegotiation(caller_, callee_, send_route_, ret_route_, modify_offer, - exchange_finished); + StartSdpNegotiation(caller_, callee_, send_route_, ret_route_, munge_offer, + modify_offer, exchange_finished); +} + +void SignalingRoute::NegotiateSdp( + std::function modify_offer, + std::function exchange_finished) { + NegotiateSdp({}, modify_offer, exchange_finished); } void SignalingRoute::NegotiateSdp( std::function exchange_finished) { - NegotiateSdp({}, exchange_finished); + NegotiateSdp({}, {}, exchange_finished); } } // namespace test diff --git a/test/peer_scenario/signaling_route.h b/test/peer_scenario/signaling_route.h index 189c4b6f3f..7434551d3f 100644 --- a/test/peer_scenario/signaling_route.h +++ b/test/peer_scenario/signaling_route.h @@ -30,7 +30,19 @@ class SignalingRoute { void StartIceSignaling(); + // The |modify_offer| callback is used to modify an offer after the local + // description has been set. This is legal (but odd) behavior. + // The |munge_offer| callback is used to modify an offer between its creation + // and set local description. This behavior is forbidden according to the spec + // but available here in order to allow test coverage on corner cases. + // The |exchange_finished| callback is called with the answer produced after + // SDP negotations has completed. // TODO(srte): Handle lossy links. + void NegotiateSdp( + std::function munge_offer, + std::function modify_offer, + std::function + exchange_finished); void NegotiateSdp( std::function modify_offer, std::function diff --git a/test/peer_scenario/tests/BUILD.gn b/test/peer_scenario/tests/BUILD.gn index 35528626f8..0cf7cf3472 100644 --- a/test/peer_scenario/tests/BUILD.gn +++ b/test/peer_scenario/tests/BUILD.gn @@ -14,12 +14,16 @@ if (rtc_include_tests) { sources = [ "peer_scenario_quality_test.cc", "remote_estimate_test.cc", + "unsignaled_stream_test.cc", ] deps = [ "..:peer_scenario", "../../:field_trial", + "../../:rtp_test_utils", "../../:test_support", + "../../../media:rtc_media_base", "../../../modules/rtp_rtcp:rtp_rtcp", + "../../../modules/rtp_rtcp:rtp_rtcp_format", "../../../pc:rtc_pc_base", ] } diff --git a/test/peer_scenario/tests/unsignaled_stream_test.cc b/test/peer_scenario/tests/unsignaled_stream_test.cc new file mode 100644 index 0000000000..5f470a833b --- /dev/null +++ b/test/peer_scenario/tests/unsignaled_stream_test.cc @@ -0,0 +1,157 @@ +/* + * Copyright (c) 2020 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "media/base/stream_params.h" +#include "modules/rtp_rtcp/source/byte_io.h" + +#include "pc/media_session.h" +#include "pc/session_description.h" +#include "test/field_trial.h" +#include "test/peer_scenario/peer_scenario.h" +#include "test/rtp_header_parser.h" + +#include "test/gmock.h" +#include "test/gtest.h" + +namespace webrtc { +namespace test { +namespace { + +class FrameObserver : public rtc::VideoSinkInterface { + public: + FrameObserver() : frame_observed_(false) {} + void OnFrame(const VideoFrame&) override { frame_observed_ = true; } + + std::atomic frame_observed_; +}; + +uint32_t get_ssrc(SessionDescriptionInterface* offer, size_t track_index) { + EXPECT_LT(track_index, offer->description()->contents().size()); + return offer->description() + ->contents()[track_index] + .media_description() + ->streams()[0] + .ssrcs[0]; +} + +void set_ssrc(SessionDescriptionInterface* offer, size_t index, uint32_t ssrc) { + EXPECT_LT(index, offer->description()->contents().size()); + cricket::StreamParams& new_stream_params = offer->description() + ->contents()[index] + .media_description() + ->mutable_streams()[0]; + new_stream_params.ssrcs[0] = ssrc; + new_stream_params.ssrc_groups[0].ssrcs[0] = ssrc; +} + +} // namespace + +TEST(UnsignaledStreamTest, ReplacesUnsignaledStreamOnCompletedSignaling) { + // This test covers a scenario that might occur if a remote client starts + // sending media packets before negotiation has completed. These packets will + // trigger an unsignalled default stream to be created, and connects that to + // a default video sink. + // In some edge cases using unified plan, the default stream is create in a + // different transceiver to where the media SSRC will actually be used. + // This test verifies that the default stream is removed properly, and that + // packets are demuxed and video frames reach the desired sink. + + // Defined before PeerScenario so it gets destructed after, to avoid use after + // free. + PeerScenario s(*test_info_); + + PeerScenarioClient::Config config = PeerScenarioClient::Config(); + // Disable encryption so that we can inject a fake early media packet without + // triggering srtp failures. + config.disable_encryption = true; + auto* caller = s.CreateClient(config); + auto* callee = s.CreateClient(config); + + auto send_node = s.net()->NodeBuilder().Build().node; + auto ret_node = s.net()->NodeBuilder().Build().node; + + s.net()->CreateRoute(caller->endpoint(), {send_node}, callee->endpoint()); + s.net()->CreateRoute(callee->endpoint(), {ret_node}, caller->endpoint()); + + auto signaling = s.ConnectSignaling(caller, callee, {send_node}, {ret_node}); + PeerScenarioClient::VideoSendTrackConfig video_conf; + video_conf.generator.squares_video->framerate = 15; + + auto first_track = caller->CreateVideo("VIDEO", video_conf); + FrameObserver first_sink; + callee->AddVideoReceiveSink(first_track.track->id(), &first_sink); + + signaling.StartIceSignaling(); + std::atomic offer_exchange_done(false); + std::atomic got_unsignaled_packet(false); + + // We will capture the media ssrc of the first added stream, and preemptively + // inject a new media packet using a different ssrc. + // This will create "default stream" for the second ssrc and connected it to + // the default video sink (not set in this test). + uint32_t first_ssrc = 0; + uint32_t second_ssrc = 0; + + signaling.NegotiateSdp( + /* munge_sdp = */ {}, + /* modify_sdp = */ + [&](SessionDescriptionInterface* offer) { + first_ssrc = get_ssrc(offer, 0); + second_ssrc = first_ssrc + 1; + + send_node->router()->SetWatcher([&](const EmulatedIpPacket& packet) { + if (packet.size() > 1 && packet.cdata()[0] >> 6 == 2 && + !RtpHeaderParser::IsRtcp(packet.data.cdata(), + packet.data.size())) { + if (ByteReader::ReadBigEndian(&(packet.cdata()[8])) == + first_ssrc && + !got_unsignaled_packet) { + rtc::CopyOnWriteBuffer updated_buffer = packet.data; + ByteWriter::WriteBigEndian(&updated_buffer.data()[8], + second_ssrc); + EmulatedIpPacket updated_packet( + packet.from, packet.to, updated_buffer, packet.arrival_time); + send_node->OnPacketReceived(std::move(updated_packet)); + got_unsignaled_packet = true; + } + } + }); + }, + [&](const SessionDescriptionInterface& answer) { + EXPECT_EQ(answer.description()->contents().size(), 1u); + offer_exchange_done = true; + }); + EXPECT_TRUE(s.WaitAndProcess(&offer_exchange_done)); + EXPECT_TRUE(s.WaitAndProcess(&got_unsignaled_packet)); + EXPECT_TRUE(s.WaitAndProcess(&first_sink.frame_observed_)); + + auto second_track = caller->CreateVideo("VIDEO2", video_conf); + FrameObserver second_sink; + callee->AddVideoReceiveSink(second_track.track->id(), &second_sink); + + // Create a second video stream, munge the sdp to force it to use our fake + // early media ssrc. + offer_exchange_done = false; + signaling.NegotiateSdp( + /* munge_sdp = */ + [&](SessionDescriptionInterface* offer) { + set_ssrc(offer, 1, second_ssrc); + }, + /* modify_sdp = */ {}, + [&](const SessionDescriptionInterface& answer) { + EXPECT_EQ(answer.description()->contents().size(), 2u); + offer_exchange_done = true; + }); + EXPECT_TRUE(s.WaitAndProcess(&offer_exchange_done)); + EXPECT_TRUE(s.WaitAndProcess(&second_sink.frame_observed_)); +} + +} // namespace test +} // namespace webrtc