From b6bbdeb24d0125b797bbf70898d58ca7b45c2b9c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Spr=C3=A5ng?= Date: Fri, 13 Aug 2021 16:12:41 +0200 Subject: [PATCH] Allow RTP module thread checking to know PacketRouter status. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Since https://webrtc-review.googlesource.com/c/src/+/228433 both audio and video now only call Get/SetRtpState while not registered to the packet router. We can thus remove the lock around packet sequencer and just use a thread checker. Bug: webrtc:11340 Change-Id: Ie6865cc96c36208700c31a75747ff4dd992ce68d Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/228435 Commit-Queue: Erik Språng Reviewed-by: Danil Chapovalov Cr-Commit-Position: refs/heads/master@{#34755} --- call/rtp_video_sender.cc | 9 +++ modules/pacing/packet_router.cc | 5 ++ modules/rtp_rtcp/mocks/mock_rtp_rtcp.h | 1 + modules/rtp_rtcp/source/rtp_rtcp_impl.cc | 2 + modules/rtp_rtcp/source/rtp_rtcp_impl.h | 2 + modules/rtp_rtcp/source/rtp_rtcp_impl2.cc | 79 +++++++++----------- modules/rtp_rtcp/source/rtp_rtcp_impl2.h | 12 ++- modules/rtp_rtcp/source/rtp_rtcp_interface.h | 9 +++ 8 files changed, 70 insertions(+), 49 deletions(-) diff --git a/call/rtp_video_sender.cc b/call/rtp_video_sender.cc index 7f0e9dacab..f95c74337a 100644 --- a/call/rtp_video_sender.cc +++ b/call/rtp_video_sender.cc @@ -451,6 +451,15 @@ RtpVideoSender::RtpVideoSender( // Signal congestion controller this object is ready for OnPacket* callbacks. transport_->GetStreamFeedbackProvider()->RegisterStreamFeedbackObserver( rtp_config_.ssrcs, this); + + // Construction happens on the worker thread (see Call::CreateVideoSendStream) + // but subseqeuent calls to the RTP state will happen on one of two threads: + // * The pacer thread for actually sending packets. + // * The transport thread when tearing down and quering GetRtpState(). + // Detach thread checkers. + for (const RtpStreamSender& stream : rtp_streams_) { + stream.rtp_rtcp->OnPacketSendingThreadSwitched(); + } } RtpVideoSender::~RtpVideoSender() { diff --git a/modules/pacing/packet_router.cc b/modules/pacing/packet_router.cc index 3b1278e504..fcc7ee3449 100644 --- a/modules/pacing/packet_router.cc +++ b/modules/pacing/packet_router.cc @@ -68,6 +68,10 @@ void PacketRouter::AddSendRtpModule(RtpRtcpInterface* rtp_module, void PacketRouter::AddSendRtpModuleToMap(RtpRtcpInterface* rtp_module, uint32_t ssrc) { RTC_DCHECK(send_modules_map_.find(ssrc) == send_modules_map_.end()); + + // Signal to module that the pacer thread is attached and can send packets. + rtp_module->OnPacketSendingThreadSwitched(); + // Always keep the audio modules at the back of the list, so that when we // iterate over the modules in order to find one that can send padding we // will prioritize video. This is important to make sure they are counted @@ -102,6 +106,7 @@ void PacketRouter::RemoveSendRtpModule(RtpRtcpInterface* rtp_module) { if (last_send_module_ == rtp_module) { last_send_module_ = nullptr; } + rtp_module->OnPacketSendingThreadSwitched(); } void PacketRouter::AddReceiveRtpModule(RtcpFeedbackSenderInterface* rtcp_sender, diff --git a/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h b/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h index 59b8cf19b8..33c5a9bcf9 100644 --- a/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h +++ b/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h @@ -115,6 +115,7 @@ class MockRtpRtcpInterface : public RtpRtcpInterface { (rtc::ArrayView sequence_numbers), (const, override)); MOCK_METHOD(size_t, ExpectedPerPacketOverhead, (), (const, override)); + MOCK_METHOD(void, OnPacketSendingThreadSwitched, (), (override)); MOCK_METHOD(RtcpMode, RTCP, (), (const, override)); MOCK_METHOD(void, SetRTCPStatus, (RtcpMode method), (override)); MOCK_METHOD(int32_t, diff --git a/modules/rtp_rtcp/source/rtp_rtcp_impl.cc b/modules/rtp_rtcp/source/rtp_rtcp_impl.cc index f6f9afb427..34e8429f87 100644 --- a/modules/rtp_rtcp/source/rtp_rtcp_impl.cc +++ b/modules/rtp_rtcp/source/rtp_rtcp_impl.cc @@ -490,6 +490,8 @@ size_t ModuleRtpRtcpImpl::ExpectedPerPacketOverhead() const { return rtp_sender_->packet_generator.ExpectedPerPacketOverhead(); } +void ModuleRtpRtcpImpl::OnPacketSendingThreadSwitched() {} + size_t ModuleRtpRtcpImpl::MaxRtpPacketSize() const { RTC_DCHECK(rtp_sender_); return rtp_sender_->packet_generator.MaxRtpPacketSize(); diff --git a/modules/rtp_rtcp/source/rtp_rtcp_impl.h b/modules/rtp_rtcp/source/rtp_rtcp_impl.h index 81b1170b13..2ffe013483 100644 --- a/modules/rtp_rtcp/source/rtp_rtcp_impl.h +++ b/modules/rtp_rtcp/source/rtp_rtcp_impl.h @@ -157,6 +157,8 @@ class ModuleRtpRtcpImpl : public RtpRtcp, public RTCPReceiver::ModuleRtpRtcp { size_t ExpectedPerPacketOverhead() const override; + void OnPacketSendingThreadSwitched() override; + // RTCP part. // Get RTCP status. diff --git a/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc b/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc index ec42f6132a..7d33ca08b9 100644 --- a/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc +++ b/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc @@ -64,22 +64,22 @@ ModuleRtpRtcpImpl2::RtpSenderContext::RtpSenderContext( const RtpRtcpInterface::Configuration& config) : packet_history(config.clock, config.enable_rtx_padding_prioritization), deferred_sequencing_(config.use_deferred_sequencing), - sequencer_(config.local_media_ssrc, - config.rtx_send_ssrc, - /*require_marker_before_media_padding=*/!config.audio, - config.clock), + sequencer(config.local_media_ssrc, + config.rtx_send_ssrc, + /*require_marker_before_media_padding=*/!config.audio, + config.clock), packet_sender(config, &packet_history), non_paced_sender(&packet_sender, this, config.use_deferred_sequencing), packet_generator( config, &packet_history, config.paced_sender ? config.paced_sender : &non_paced_sender, - config.use_deferred_sequencing ? nullptr : &sequencer_) {} + config.use_deferred_sequencing ? nullptr : &sequencer) {} void ModuleRtpRtcpImpl2::RtpSenderContext::AssignSequenceNumber( RtpPacketToSend* packet) { + RTC_DCHECK_RUN_ON(&sequencing_checker); if (deferred_sequencing_) { - MutexLock lock(&mutex_sequencer_); - sequencer_.Sequence(*packet); + sequencer.Sequence(*packet); } else { packet_generator.AssignSequenceNumber(packet); } @@ -101,10 +101,10 @@ ModuleRtpRtcpImpl2::ModuleRtpRtcpImpl2(const Configuration& configuration) rtt_stats_(configuration.rtt_stats), rtt_ms_(0) { RTC_DCHECK(worker_queue_); - packet_sequence_checker_.Detach(); - pacer_thread_checker_.Detach(); + rtcp_thread_checker_.Detach(); if (!configuration.receiver_only) { rtp_sender_ = std::make_unique(configuration); + rtp_sender_->sequencing_checker.Detach(); // Make sure rtcp sender use same timestamp offset as rtp sender. rtcp_sender_.SetTimestampOffset( rtp_sender_->packet_generator.TimestampOffset()); @@ -162,7 +162,7 @@ absl::optional ModuleRtpRtcpImpl2::FlexfecSsrc() const { void ModuleRtpRtcpImpl2::IncomingRtcpPacket(const uint8_t* rtcp_packet, const size_t length) { - RTC_DCHECK_RUN_ON(&packet_sequence_checker_); + RTC_DCHECK_RUN_ON(&rtcp_thread_checker_); rtcp_receiver_.IncomingPacket(rtcp_packet, length); } @@ -187,20 +187,19 @@ void ModuleRtpRtcpImpl2::SetStartTimestamp(const uint32_t timestamp) { } uint16_t ModuleRtpRtcpImpl2::SequenceNumber() const { + RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker); if (rtp_sender_->deferred_sequencing_) { - MutexLock lock(&rtp_sender_->mutex_sequencer_); - return rtp_sender_->sequencer_.media_sequence_number(); + return rtp_sender_->sequencer.media_sequence_number(); } return rtp_sender_->packet_generator.SequenceNumber(); } // Set SequenceNumber, default is a random number. void ModuleRtpRtcpImpl2::SetSequenceNumber(const uint16_t seq_num) { + RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker); if (rtp_sender_->deferred_sequencing_) { - RTC_DCHECK_RUN_ON(&pacer_thread_checker_); - MutexLock lock(&rtp_sender_->mutex_sequencer_); - if (rtp_sender_->sequencer_.media_sequence_number() != seq_num) { - rtp_sender_->sequencer_.set_media_sequence_number(seq_num); + if (rtp_sender_->sequencer.media_sequence_number() != seq_num) { + rtp_sender_->sequencer.set_media_sequence_number(seq_num); rtp_sender_->packet_history.Clear(); } } else { @@ -209,36 +208,36 @@ void ModuleRtpRtcpImpl2::SetSequenceNumber(const uint16_t seq_num) { } void ModuleRtpRtcpImpl2::SetRtpState(const RtpState& rtp_state) { + RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker); rtp_sender_->packet_generator.SetRtpState(rtp_state); if (rtp_sender_->deferred_sequencing_) { - MutexLock lock(&rtp_sender_->mutex_sequencer_); - rtp_sender_->sequencer_.SetRtpState(rtp_state); + rtp_sender_->sequencer.SetRtpState(rtp_state); } rtcp_sender_.SetTimestampOffset(rtp_state.start_timestamp); } void ModuleRtpRtcpImpl2::SetRtxState(const RtpState& rtp_state) { + RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker); rtp_sender_->packet_generator.SetRtxRtpState(rtp_state); if (rtp_sender_->deferred_sequencing_) { - MutexLock lock(&rtp_sender_->mutex_sequencer_); - rtp_sender_->sequencer_.set_rtx_sequence_number(rtp_state.sequence_number); + rtp_sender_->sequencer.set_rtx_sequence_number(rtp_state.sequence_number); } } RtpState ModuleRtpRtcpImpl2::GetRtpState() const { + RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker); RtpState state = rtp_sender_->packet_generator.GetRtpState(); if (rtp_sender_->deferred_sequencing_) { - MutexLock lock(&rtp_sender_->mutex_sequencer_); - rtp_sender_->sequencer_.PopulateRtpState(state); + rtp_sender_->sequencer.PopulateRtpState(state); } return state; } RtpState ModuleRtpRtcpImpl2::GetRtxState() const { + RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker); RtpState state = rtp_sender_->packet_generator.GetRtxRtpState(); if (rtp_sender_->deferred_sequencing_) { - MutexLock lock(&rtp_sender_->mutex_sequencer_); - state.sequence_number = rtp_sender_->sequencer_.rtx_sequence_number(); + state.sequence_number = rtp_sender_->sequencer.rtx_sequence_number(); } return state; } @@ -249,7 +248,7 @@ void ModuleRtpRtcpImpl2::SetNonSenderRttMeasurement(bool enabled) { } uint32_t ModuleRtpRtcpImpl2::local_media_ssrc() const { - RTC_DCHECK_RUN_ON(&packet_sequence_checker_); + RTC_DCHECK_RUN_ON(&rtcp_thread_checker_); RTC_DCHECK_EQ(rtcp_receiver_.local_media_ssrc(), rtcp_sender_.SSRC()); return rtcp_receiver_.local_media_ssrc(); } @@ -335,12 +334,6 @@ bool ModuleRtpRtcpImpl2::Sending() const { // updated. void ModuleRtpRtcpImpl2::SetSendingMediaStatus(const bool sending) { if (rtp_sender_) { - // Turning on or off sending status indicates module being set - // up or torn down, detach thread checker since subsequent calls - // may be from a different thread. - if (rtp_sender_->packet_generator.SendingMedia() != sending) { - pacer_thread_checker_.Detach(); - } rtp_sender_->packet_generator.SetSendingMediaStatus(sending); } else { RTC_DCHECK(!sending); @@ -389,15 +382,14 @@ bool ModuleRtpRtcpImpl2::OnSendingRtpFrame(uint32_t timestamp, bool ModuleRtpRtcpImpl2::TrySendPacket(RtpPacketToSend* packet, const PacedPacketInfo& pacing_info) { RTC_DCHECK(rtp_sender_); - RTC_DCHECK_RUN_ON(&pacer_thread_checker_); + RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker); if (rtp_sender_->deferred_sequencing_) { if (!rtp_sender_->packet_generator.SendingMedia()) { return false; } - MutexLock lock(&rtp_sender_->mutex_sequencer_); if (packet->packet_type() == RtpPacketMediaType::kPadding && packet->Ssrc() == rtp_sender_->packet_generator.SSRC() && - !rtp_sender_->sequencer_.CanSendPaddingOnMediaSsrc()) { + !rtp_sender_->sequencer.CanSendPaddingOnMediaSsrc()) { // New media packet preempted this generated padding packet, discard it. return false; } @@ -405,7 +397,7 @@ bool ModuleRtpRtcpImpl2::TrySendPacket(RtpPacketToSend* packet, packet->packet_type() == RtpPacketMediaType::kForwardErrorCorrection && packet->Ssrc() == rtp_sender_->packet_generator.FlexfecSsrc(); if (!is_flexfec) { - rtp_sender_->sequencer_.Sequence(*packet); + rtp_sender_->sequencer.Sequence(*packet); } } else if (!rtp_sender_->packet_generator.SendingMedia()) { return false; @@ -425,7 +417,7 @@ void ModuleRtpRtcpImpl2::SetFecProtectionParams( std::vector> ModuleRtpRtcpImpl2::FetchFecPackets() { RTC_DCHECK(rtp_sender_); - RTC_DCHECK_RUN_ON(&pacer_thread_checker_); + RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker); auto fec_packets = rtp_sender_->packet_sender.FetchFecPackets(); if (!fec_packets.empty() && !rtp_sender_->deferred_sequencing_) { // Only assign sequence numbers for FEC packets in non-deferred mode, and @@ -460,17 +452,15 @@ bool ModuleRtpRtcpImpl2::SupportsRtxPayloadPadding() const { std::vector> ModuleRtpRtcpImpl2::GeneratePadding(size_t target_size_bytes) { RTC_DCHECK(rtp_sender_); - RTC_DCHECK_RUN_ON(&pacer_thread_checker_); - MutexLock lock(&rtp_sender_->mutex_sequencer_); + RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker); // `can_send_padding_on_media_ssrc` set to false when deferred sequencing // is off. It will be ignored in that case, RTPSender will internally query - // `sequencer_` while holding the send lock instead. + // `sequencer` while holding the send lock instead. return rtp_sender_->packet_generator.GeneratePadding( target_size_bytes, rtp_sender_->packet_sender.MediaHasBeenSent(), - rtp_sender_->deferred_sequencing_ - ? rtp_sender_->sequencer_.CanSendPaddingOnMediaSsrc() + ? rtp_sender_->sequencer.CanSendPaddingOnMediaSsrc() : false); } @@ -488,6 +478,11 @@ size_t ModuleRtpRtcpImpl2::ExpectedPerPacketOverhead() const { return rtp_sender_->packet_generator.ExpectedPerPacketOverhead(); } +void ModuleRtpRtcpImpl2::OnPacketSendingThreadSwitched() { + // Ownership of sequencing is being transferred to another thread. + rtp_sender_->sequencing_checker.Detach(); +} + size_t ModuleRtpRtcpImpl2::MaxRtpPacketSize() const { RTC_DCHECK(rtp_sender_); return rtp_sender_->packet_generator.MaxRtpPacketSize(); @@ -730,7 +725,7 @@ void ModuleRtpRtcpImpl2::SetRemoteSSRC(const uint32_t ssrc) { } void ModuleRtpRtcpImpl2::SetLocalSsrc(uint32_t local_ssrc) { - RTC_DCHECK_RUN_ON(&packet_sequence_checker_); + RTC_DCHECK_RUN_ON(&rtcp_thread_checker_); rtcp_receiver_.set_local_media_ssrc(local_ssrc); rtcp_sender_.SetSsrc(local_ssrc); } diff --git a/modules/rtp_rtcp/source/rtp_rtcp_impl2.h b/modules/rtp_rtcp/source/rtp_rtcp_impl2.h index b468bb72cf..f01c0c066b 100644 --- a/modules/rtp_rtcp/source/rtp_rtcp_impl2.h +++ b/modules/rtp_rtcp/source/rtp_rtcp_impl2.h @@ -169,6 +169,8 @@ class ModuleRtpRtcpImpl2 final : public RtpRtcpInterface, size_t ExpectedPerPacketOverhead() const override; + void OnPacketSendingThreadSwitched() override; + // RTCP part. // Get RTCP status. @@ -269,12 +271,9 @@ class ModuleRtpRtcpImpl2 final : public RtpRtcpInterface, // If false, sequencing is owned by `packet_generator` and can happen on // several threads. If true, sequencing always happens on the pacer thread. const bool deferred_sequencing_; - // TODO(bugs.webrtc.org/11340): Remove lock one we can guarantee that - // setting/getting rtp state only happens after removal from packet sending - // code path. - mutable Mutex mutex_sequencer_; + SequenceChecker sequencing_checker; // Handles sequence number assignment and padding timestamp generation. - PacketSequencer sequencer_ RTC_GUARDED_BY(mutex_sequencer_); + PacketSequencer sequencer RTC_GUARDED_BY(sequencing_checker); // Handles final time timestamping/stats/etc and handover to Transport. RtpSenderEgress packet_sender; // If no paced sender configured, this class will be used to pass packets @@ -314,8 +313,7 @@ class ModuleRtpRtcpImpl2 final : public RtpRtcpInterface, TimeDelta duration); TaskQueueBase* const worker_queue_; - RTC_NO_UNIQUE_ADDRESS SequenceChecker packet_sequence_checker_; - RTC_NO_UNIQUE_ADDRESS SequenceChecker pacer_thread_checker_; + RTC_NO_UNIQUE_ADDRESS SequenceChecker rtcp_thread_checker_; std::unique_ptr rtp_sender_; RTCPSender rtcp_sender_; diff --git a/modules/rtp_rtcp/source/rtp_rtcp_interface.h b/modules/rtp_rtcp/source/rtp_rtcp_interface.h index e90d866188..c1a44fa4df 100644 --- a/modules/rtp_rtcp/source/rtp_rtcp_interface.h +++ b/modules/rtp_rtcp/source/rtp_rtcp_interface.h @@ -347,6 +347,15 @@ class RtpRtcpInterface : public RtcpFeedbackSenderInterface { // when we expect to send them). virtual size_t ExpectedPerPacketOverhead() const = 0; + // Access to packet state (e.g. sequence numbering) must only be access by + // one thread at a time. It may be only one thread, or a construction thread + // that calls SetRtpState() - handing over to a pacer thread that calls + // TrySendPacket() - and at teardown ownership is handed to a destruciton + // thread that calls GetRtpState(). + // This method is used to signal that "ownership" of the rtp state is being + // transferred to another thread. + virtual void OnPacketSendingThreadSwitched() = 0; + // ************************************************************************** // RTCP // **************************************************************************