From f554b3c577f69fa9ffad5c07155898c2d985ac76 Mon Sep 17 00:00:00 2001 From: Tomas Gunnarsson Date: Mon, 8 Feb 2021 16:00:10 +0100 Subject: [PATCH] Remove thread hops from events provided by JsepTransportController. Events associated with Subscribe* methods in JTC had trampolines that would use an async invoker to fire the events on the signaling thread. This was being done for the purposes of PeerConnection but the concept of a signaling thread is otherwise not applicable to JTC and use of JTC from PC is inconsistent across threads (as has been flagged in webrtc:9987). This change makes all CallbackList members only accessible from the network thread and moves the signaling thread related work over to PeerConnection, which makes hops there more visible as well as making that class easier to refactor for thread efficiency. This CL removes the AsyncInvoker from JTC (webrtc:12339) The signaling_thread_ variable is also removed from JTC and more thread checks added to catch errors. Bug: webrtc:12427, webrtc:11988, webrtc:12339 Change-Id: Id232aedd00dfd5403b2ba0ca147d3eca7c12c7c5 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/206062 Commit-Queue: Tommi Reviewed-by: Niels Moller Cr-Commit-Position: refs/heads/master@{#33195} --- pc/connection_context.h | 4 +- pc/jsep_transport_controller.cc | 65 +--- pc/jsep_transport_controller.h | 43 +-- pc/jsep_transport_controller_unittest.cc | 37 +-- pc/peer_connection.cc | 352 ++++++++++++---------- pc/peer_connection.h | 26 +- pc/sdp_offer_answer.cc | 2 +- test/peer_scenario/scenario_connection.cc | 3 +- 8 files changed, 263 insertions(+), 269 deletions(-) diff --git a/pc/connection_context.h b/pc/connection_context.h index 02d08a191e..71e2f1eeae 100644 --- a/pc/connection_context.h +++ b/pc/connection_context.h @@ -62,7 +62,6 @@ class ConnectionContext : public rtc::RefCountInterface { // Functions called from PeerConnection and friends SctpTransportFactoryInterface* sctp_transport_factory() const { - RTC_DCHECK_RUN_ON(signaling_thread_); return sctp_factory_.get(); } @@ -123,8 +122,7 @@ class ConnectionContext : public rtc::RefCountInterface { RTC_GUARDED_BY(signaling_thread_); std::unique_ptr media_engine_ RTC_GUARDED_BY(signaling_thread_); - std::unique_ptr const sctp_factory_ - RTC_GUARDED_BY(signaling_thread_); + std::unique_ptr const sctp_factory_; // Accessed both on signaling thread and worker thread. std::unique_ptr const trials_; }; diff --git a/pc/jsep_transport_controller.cc b/pc/jsep_transport_controller.cc index 0ded1de84f..28ba899cb3 100644 --- a/pc/jsep_transport_controller.cc +++ b/pc/jsep_transport_controller.cc @@ -84,13 +84,11 @@ webrtc::RTCError VerifyCandidates(const cricket::Candidates& candidates) { namespace webrtc { JsepTransportController::JsepTransportController( - rtc::Thread* signaling_thread, rtc::Thread* network_thread, cricket::PortAllocator* port_allocator, AsyncResolverFactory* async_resolver_factory, Config config) - : signaling_thread_(signaling_thread), - network_thread_(network_thread), + : network_thread_(network_thread), port_allocator_(port_allocator), async_resolver_factory_(async_resolver_factory), config_(config), @@ -222,12 +220,6 @@ void JsepTransportController::SetNeedsIceRestartFlag() { bool JsepTransportController::NeedsIceRestart( const std::string& transport_name) const { - if (!network_thread_->IsCurrent()) { - RTC_DCHECK_RUN_ON(signaling_thread_); - return network_thread_->Invoke( - RTC_FROM_HERE, [&] { return NeedsIceRestart(transport_name); }); - } - RTC_DCHECK_RUN_ON(network_thread_); const cricket::JsepTransport* transport = @@ -414,11 +406,6 @@ RTCError JsepTransportController::RemoveRemoteCandidates( bool JsepTransportController::GetStats(const std::string& transport_name, cricket::TransportStats* stats) { - if (!network_thread_->IsCurrent()) { - return network_thread_->Invoke( - RTC_FROM_HERE, [=] { return GetStats(transport_name, stats); }); - } - RTC_DCHECK_RUN_ON(network_thread_); cricket::JsepTransport* transport = GetJsepTransportByName(transport_name); @@ -1194,35 +1181,24 @@ void JsepTransportController::OnTransportCandidateGathered_n( RTC_NOTREACHED(); return; } - std::string transport_name = transport->transport_name(); - // TODO(bugs.webrtc.org/12427): See if we can get rid of this. We should be - // able to just call this directly here. - invoker_.AsyncInvoke( - RTC_FROM_HERE, signaling_thread_, [this, transport_name, candidate] { - signal_ice_candidates_gathered_.Send( - transport_name, std::vector{candidate}); - }); + + signal_ice_candidates_gathered_.Send( + transport->transport_name(), std::vector{candidate}); } void JsepTransportController::OnTransportCandidateError_n( cricket::IceTransportInternal* transport, const cricket::IceCandidateErrorEvent& event) { - invoker_.AsyncInvoke(RTC_FROM_HERE, signaling_thread_, [this, event] { - signal_ice_candidate_error_.Send(event); - }); + signal_ice_candidate_error_.Send(event); } void JsepTransportController::OnTransportCandidatesRemoved_n( cricket::IceTransportInternal* transport, const cricket::Candidates& candidates) { - invoker_.AsyncInvoke( - RTC_FROM_HERE, signaling_thread_, - [this, candidates] { signal_ice_candidates_removed_.Send(candidates); }); + signal_ice_candidates_removed_.Send(candidates); } void JsepTransportController::OnTransportCandidatePairChanged_n( const cricket::CandidatePairChangeEvent& event) { - invoker_.AsyncInvoke(RTC_FROM_HERE, signaling_thread_, [this, event] { - signal_ice_candidate_pair_changed_.Send(event); - }); + signal_ice_candidate_pair_changed_.Send(event); } void JsepTransportController::OnTransportRoleConflict_n( @@ -1298,10 +1274,7 @@ void JsepTransportController::UpdateAggregateStates_n() { if (ice_connection_state_ != new_connection_state) { ice_connection_state_ = new_connection_state; - invoker_.AsyncInvoke( - RTC_FROM_HERE, signaling_thread_, [this, new_connection_state] { - signal_ice_connection_state_.Send(new_connection_state); - }); + signal_ice_connection_state_.Send(new_connection_state); } // Compute the current RTCIceConnectionState as described in @@ -1357,17 +1330,11 @@ void JsepTransportController::UpdateAggregateStates_n() { new_ice_connection_state == PeerConnectionInterface::kIceConnectionCompleted) { // Ensure that we never skip over the "connected" state. - invoker_.AsyncInvoke(RTC_FROM_HERE, signaling_thread_, [this] { - signal_standardized_ice_connection_state_.Send( - PeerConnectionInterface::kIceConnectionConnected); - }); + signal_standardized_ice_connection_state_.Send( + PeerConnectionInterface::kIceConnectionConnected); } standardized_ice_connection_state_ = new_ice_connection_state; - invoker_.AsyncInvoke(RTC_FROM_HERE, signaling_thread_, - [this, new_ice_connection_state] { - signal_standardized_ice_connection_state_.Send( - new_ice_connection_state); - }); + signal_standardized_ice_connection_state_.Send(new_ice_connection_state); } // Compute the current RTCPeerConnectionState as described in @@ -1418,10 +1385,7 @@ void JsepTransportController::UpdateAggregateStates_n() { if (combined_connection_state_ != new_combined_state) { combined_connection_state_ = new_combined_state; - invoker_.AsyncInvoke( - RTC_FROM_HERE, signaling_thread_, [this, new_combined_state] { - signal_connection_state_.Send(new_combined_state); - }); + signal_connection_state_.Send(new_combined_state); } // Compute the gathering state. @@ -1434,10 +1398,7 @@ void JsepTransportController::UpdateAggregateStates_n() { } if (ice_gathering_state_ != new_gathering_state) { ice_gathering_state_ = new_gathering_state; - invoker_.AsyncInvoke( - RTC_FROM_HERE, signaling_thread_, [this, new_gathering_state] { - signal_ice_gathering_state_.Send(new_gathering_state); - }); + signal_ice_gathering_state_.Send(new_gathering_state); } } diff --git a/pc/jsep_transport_controller.h b/pc/jsep_transport_controller.h index 59d66a24f2..949c9ad1dc 100644 --- a/pc/jsep_transport_controller.h +++ b/pc/jsep_transport_controller.h @@ -54,7 +54,6 @@ #include "pc/session_description.h" #include "pc/srtp_transport.h" #include "pc/transport_stats.h" -#include "rtc_base/async_invoker.h" #include "rtc_base/callback_list.h" #include "rtc_base/constructor_magic.h" #include "rtc_base/copy_on_write_buffer.h" @@ -137,10 +136,11 @@ class JsepTransportController : public sigslot::has_slots<> { std::function on_dtls_handshake_error_; }; - // The ICE related events are signaled on the |signaling_thread|. - // All the transport related methods are called on the |network_thread|. - JsepTransportController(rtc::Thread* signaling_thread, - rtc::Thread* network_thread, + // The ICE related events are fired on the |network_thread|. + // All the transport related methods are called on the |network_thread| + // and destruction of the JsepTransportController must occur on the + // |network_thread|. + JsepTransportController(rtc::Thread* network_thread, cricket::PortAllocator* port_allocator, AsyncResolverFactory* async_resolver_factory, Config config); @@ -227,26 +227,28 @@ class JsepTransportController : public sigslot::has_slots<> { // F: void(const std::string&, const std::vector&) template void SubscribeIceCandidateGathered(F&& callback) { - // TODO(bugs.webrtc.org/12427): Post this subscription to the network - // thread. + RTC_DCHECK_RUN_ON(network_thread_); signal_ice_candidates_gathered_.AddReceiver(std::forward(callback)); } // F: void(cricket::IceConnectionState) template void SubscribeIceConnectionState(F&& callback) { + RTC_DCHECK_RUN_ON(network_thread_); signal_ice_connection_state_.AddReceiver(std::forward(callback)); } // F: void(PeerConnectionInterface::PeerConnectionState) template void SubscribeConnectionState(F&& callback) { + RTC_DCHECK_RUN_ON(network_thread_); signal_connection_state_.AddReceiver(std::forward(callback)); } // F: void(PeerConnectionInterface::IceConnectionState) template void SubscribeStandardizedIceConnectionState(F&& callback) { + RTC_DCHECK_RUN_ON(network_thread_); signal_standardized_ice_connection_state_.AddReceiver( std::forward(callback)); } @@ -254,60 +256,65 @@ class JsepTransportController : public sigslot::has_slots<> { // F: void(cricket::IceGatheringState) template void SubscribeIceGatheringState(F&& callback) { + RTC_DCHECK_RUN_ON(network_thread_); signal_ice_gathering_state_.AddReceiver(std::forward(callback)); } // F: void(const cricket::IceCandidateErrorEvent&) template void SubscribeIceCandidateError(F&& callback) { + RTC_DCHECK_RUN_ON(network_thread_); signal_ice_candidate_error_.AddReceiver(std::forward(callback)); } // F: void(const std::vector&) template void SubscribeIceCandidatesRemoved(F&& callback) { + RTC_DCHECK_RUN_ON(network_thread_); signal_ice_candidates_removed_.AddReceiver(std::forward(callback)); } // F: void(const cricket::CandidatePairChangeEvent&) template void SubscribeIceCandidatePairChanged(F&& callback) { + RTC_DCHECK_RUN_ON(network_thread_); signal_ice_candidate_pair_changed_.AddReceiver(std::forward(callback)); } private: - // All of these callbacks are fired on the signaling thread. + // All of these callbacks are fired on the network thread. // If any transport failed => failed, // Else if all completed => completed, // Else if all connected => connected, // Else => connecting - CallbackList signal_ice_connection_state_; + CallbackList signal_ice_connection_state_ + RTC_GUARDED_BY(network_thread_); CallbackList - signal_connection_state_; + signal_connection_state_ RTC_GUARDED_BY(network_thread_); CallbackList - signal_standardized_ice_connection_state_; + signal_standardized_ice_connection_state_ RTC_GUARDED_BY(network_thread_); // If all transports done gathering => complete, // Else if any are gathering => gathering, // Else => new - CallbackList signal_ice_gathering_state_; + CallbackList signal_ice_gathering_state_ + RTC_GUARDED_BY(network_thread_); // [mid, candidates] - // TODO(bugs.webrtc.org/12427): Protect this with network_thread_. CallbackList&> - signal_ice_candidates_gathered_; + signal_ice_candidates_gathered_ RTC_GUARDED_BY(network_thread_); CallbackList - signal_ice_candidate_error_; + signal_ice_candidate_error_ RTC_GUARDED_BY(network_thread_); CallbackList&> - signal_ice_candidates_removed_; + signal_ice_candidates_removed_ RTC_GUARDED_BY(network_thread_); CallbackList - signal_ice_candidate_pair_changed_; + signal_ice_candidate_pair_changed_ RTC_GUARDED_BY(network_thread_); RTCError ApplyDescription_n(bool local, SdpType type, @@ -452,7 +459,6 @@ class JsepTransportController : public sigslot::has_slots<> { void OnDtlsHandshakeError(rtc::SSLHandshakeError error); - rtc::Thread* const signaling_thread_ = nullptr; rtc::Thread* const network_thread_ = nullptr; cricket::PortAllocator* const port_allocator_ = nullptr; AsyncResolverFactory* const async_resolver_factory_ = nullptr; @@ -490,7 +496,6 @@ class JsepTransportController : public sigslot::has_slots<> { cricket::IceRole ice_role_ = cricket::ICEROLE_CONTROLLING; uint64_t ice_tiebreaker_ = rtc::CreateRandomId64(); rtc::scoped_refptr certificate_; - rtc::AsyncInvoker invoker_; RTC_DISALLOW_COPY_AND_ASSIGN(JsepTransportController); }; diff --git a/pc/jsep_transport_controller_unittest.cc b/pc/jsep_transport_controller_unittest.cc index 9efa205368..0424afe876 100644 --- a/pc/jsep_transport_controller_unittest.cc +++ b/pc/jsep_transport_controller_unittest.cc @@ -74,7 +74,6 @@ class JsepTransportControllerTest : public JsepTransportController::Observer, void CreateJsepTransportController( JsepTransportController::Config config, - rtc::Thread* signaling_thread = rtc::Thread::Current(), rtc::Thread* network_thread = rtc::Thread::Current(), cricket::PortAllocator* port_allocator = nullptr) { config.transport_observer = this; @@ -84,9 +83,10 @@ class JsepTransportControllerTest : public JsepTransportController::Observer, config.dtls_transport_factory = fake_dtls_transport_factory_.get(); config.on_dtls_handshake_error_ = [](rtc::SSLHandshakeError s) {}; transport_controller_ = std::make_unique( - signaling_thread, network_thread, port_allocator, - nullptr /* async_resolver_factory */, config); - ConnectTransportControllerSignals(); + network_thread, port_allocator, nullptr /* async_resolver_factory */, + config); + network_thread->Invoke(RTC_FROM_HERE, + [&] { ConnectTransportControllerSignals(); }); } void ConnectTransportControllerSignals() { @@ -276,18 +276,14 @@ class JsepTransportControllerTest : public JsepTransportController::Observer, protected: void OnConnectionState(cricket::IceConnectionState state) { - if (!signaling_thread_->IsCurrent()) { - signaled_on_non_signaling_thread_ = true; - } + ice_signaled_on_thread_ = rtc::Thread::Current(); connection_state_ = state; ++connection_state_signal_count_; } void OnStandardizedIceConnectionState( PeerConnectionInterface::IceConnectionState state) { - if (!signaling_thread_->IsCurrent()) { - signaled_on_non_signaling_thread_ = true; - } + ice_signaled_on_thread_ = rtc::Thread::Current(); ice_connection_state_ = state; ++ice_connection_state_signal_count_; } @@ -296,26 +292,20 @@ class JsepTransportControllerTest : public JsepTransportController::Observer, PeerConnectionInterface::PeerConnectionState state) { RTC_LOG(LS_INFO) << "OnCombinedConnectionState: " << static_cast(state); - if (!signaling_thread_->IsCurrent()) { - signaled_on_non_signaling_thread_ = true; - } + ice_signaled_on_thread_ = rtc::Thread::Current(); combined_connection_state_ = state; ++combined_connection_state_signal_count_; } void OnGatheringState(cricket::IceGatheringState state) { - if (!signaling_thread_->IsCurrent()) { - signaled_on_non_signaling_thread_ = true; - } + ice_signaled_on_thread_ = rtc::Thread::Current(); gathering_state_ = state; ++gathering_state_signal_count_; } void OnCandidatesGathered(const std::string& transport_name, const Candidates& candidates) { - if (!signaling_thread_->IsCurrent()) { - signaled_on_non_signaling_thread_ = true; - } + ice_signaled_on_thread_ = rtc::Thread::Current(); candidates_[transport_name].insert(candidates_[transport_name].end(), candidates.begin(), candidates.end()); ++candidates_signal_count_; @@ -360,7 +350,7 @@ class JsepTransportControllerTest : public JsepTransportController::Observer, std::unique_ptr fake_ice_transport_factory_; std::unique_ptr fake_dtls_transport_factory_; rtc::Thread* const signaling_thread_ = nullptr; - bool signaled_on_non_signaling_thread_ = false; + rtc::Thread* ice_signaled_on_thread_ = nullptr; // Used to verify the SignalRtpTransportChanged/SignalDtlsTransportChanged are // signaled correctly. std::map changed_rtp_transport_by_mid_; @@ -883,11 +873,12 @@ TEST_F(JsepTransportControllerTest, SignalCandidatesGathered) { EXPECT_EQ(1u, candidates_[kAudioMid1].size()); } -TEST_F(JsepTransportControllerTest, IceSignalingOccursOnSignalingThread) { +TEST_F(JsepTransportControllerTest, IceSignalingOccursOnNetworkThread) { network_thread_ = rtc::Thread::CreateWithSocketServer(); network_thread_->Start(); + EXPECT_EQ(ice_signaled_on_thread_, nullptr); CreateJsepTransportController(JsepTransportController::Config(), - signaling_thread_, network_thread_.get(), + network_thread_.get(), /*port_allocator=*/nullptr); CreateLocalDescriptionAndCompleteConnectionOnNetworkThread(); @@ -903,7 +894,7 @@ TEST_F(JsepTransportControllerTest, IceSignalingOccursOnSignalingThread) { EXPECT_EQ_WAIT(1u, candidates_[kVideoMid1].size(), kTimeout); EXPECT_EQ(2, candidates_signal_count_); - EXPECT_TRUE(!signaled_on_non_signaling_thread_); + EXPECT_EQ(ice_signaled_on_thread_, network_thread_.get()); network_thread_->Invoke(RTC_FROM_HERE, [&] { transport_controller_.reset(); }); diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc index f82fe35c6d..b4e4246766 100644 --- a/pc/peer_connection.cc +++ b/pc/peer_connection.cc @@ -88,7 +88,6 @@ const char kSimulcastNumberOfEncodings[] = static const int REPORT_USAGE_PATTERN_DELAY_MS = 60000; - uint32_t ConvertIceTransportTypeToCandidateFilter( PeerConnectionInterface::IceTransportsType type) { switch (type) { @@ -264,6 +263,20 @@ bool HasRtcpMuxEnabled(const cricket::ContentInfo* content) { return content->media_description()->rtcp_mux(); } +bool DtlsEnabled(const PeerConnectionInterface::RTCConfiguration& configuration, + const PeerConnectionFactoryInterface::Options& options, + const PeerConnectionDependencies& dependencies) { + if (options.disable_encryption) + return false; + + // Enable DTLS by default if we have an identity store or a certificate. + bool default_enabled = + (dependencies.cert_generator || !configuration.certificates.empty()); + + // The |configuration| can override the default value. + return configuration.enable_dtls_srtp.value_or(default_enabled); +} + } // namespace bool PeerConnectionInterface::RTCConfiguration::operator==( @@ -421,11 +434,12 @@ RTCErrorOr> PeerConnection::Create( bool is_unified_plan = configuration.sdp_semantics == SdpSemantics::kUnifiedPlan; + bool dtls_enabled = DtlsEnabled(configuration, options, dependencies); // The PeerConnection constructor consumes some, but not all, dependencies. rtc::scoped_refptr pc( new rtc::RefCountedObject( context, options, is_unified_plan, std::move(event_log), - std::move(call), dependencies)); + std::move(call), dependencies, dtls_enabled)); RTCError init_error = pc->Initialize(configuration, std::move(dependencies)); if (!init_error.ok()) { RTC_LOG(LS_ERROR) << "PeerConnection initialization failed"; @@ -440,7 +454,8 @@ PeerConnection::PeerConnection( bool is_unified_plan, std::unique_ptr event_log, std::unique_ptr call, - PeerConnectionDependencies& dependencies) + PeerConnectionDependencies& dependencies, + bool dtls_enabled) : context_(context), options_(options), observer_(dependencies.observer), @@ -453,9 +468,17 @@ PeerConnection::PeerConnection( tls_cert_verifier_(std::move(dependencies.tls_cert_verifier)), call_(std::move(call)), call_ptr_(call_.get()), + dtls_enabled_(dtls_enabled), data_channel_controller_(this), message_handler_(signaling_thread()), - weak_factory_(this) {} + weak_factory_(this) { + worker_thread()->Invoke(RTC_FROM_HERE, [this] { + RTC_DCHECK_RUN_ON(worker_thread()); + worker_thread_safety_ = PendingTaskSafetyFlag::Create(); + if (!call_) + worker_thread_safety_->SetNotAlive(); + }); +} PeerConnection::~PeerConnection() { TRACE_EVENT0("webrtc", "PeerConnection::~PeerConnection"); @@ -496,15 +519,13 @@ PeerConnection::~PeerConnection() { RTC_DCHECK_RUN_ON(network_thread()); transport_controller_.reset(); port_allocator_.reset(); - if (network_thread_safety_) { + if (network_thread_safety_) network_thread_safety_->SetNotAlive(); - network_thread_safety_ = nullptr; - } }); // call_ and event_log_ must be destroyed on the worker thread. worker_thread()->Invoke(RTC_FROM_HERE, [this] { RTC_DCHECK_RUN_ON(worker_thread()); - call_safety_.reset(); + worker_thread_safety_->SetNotAlive(); call_.reset(); // The event log must outlive call (and any other object that uses it). event_log_.reset(); @@ -531,20 +552,6 @@ RTCError PeerConnection::Initialize( turn_server.turn_logging_id = configuration.turn_logging_id; } - // The port allocator lives on the network thread and should be initialized - // there. Also set up the task safety flag for canceling pending tasks on - // the network thread when closing. - // TODO(bugs.webrtc.org/12427): See if we can piggyback on this call and - // initialize all the |transport_controller_->Subscribe*| calls below on the - // network thread via this invoke. - const auto pa_result = - network_thread()->Invoke( - RTC_FROM_HERE, [this, &stun_servers, &turn_servers, &configuration] { - network_thread_safety_ = PendingTaskSafetyFlag::Create(); - return InitializePortAllocator_n(stun_servers, turn_servers, - configuration); - }); - // Note if STUN or TURN servers were supplied. if (!stun_servers.empty()) { NoteUsageEvent(UsageEvent::STUN_SERVER_ADDED); @@ -553,52 +560,11 @@ RTCError PeerConnection::Initialize( NoteUsageEvent(UsageEvent::TURN_SERVER_ADDED); } - // Send information about IPv4/IPv6 status. - PeerConnectionAddressFamilyCounter address_family; - if (pa_result.enable_ipv6) { - address_family = kPeerConnection_IPv6; - } else { - address_family = kPeerConnection_IPv4; - } - RTC_HISTOGRAM_ENUMERATION("WebRTC.PeerConnection.IPMetrics", address_family, - kPeerConnectionAddressFamilyCounter_Max); - // RFC 3264: The numeric value of the session id and version in the // o line MUST be representable with a "64 bit signed integer". // Due to this constraint session id |session_id_| is max limited to // LLONG_MAX. session_id_ = rtc::ToString(rtc::CreateRandomId64() & LLONG_MAX); - JsepTransportController::Config config; - config.redetermine_role_on_ice_restart = - configuration.redetermine_role_on_ice_restart; - config.ssl_max_version = options_.ssl_max_version; - config.disable_encryption = options_.disable_encryption; - config.bundle_policy = configuration.bundle_policy; - config.rtcp_mux_policy = configuration.rtcp_mux_policy; - // TODO(bugs.webrtc.org/9891) - Remove options_.crypto_options then remove - // this stub. - config.crypto_options = configuration.crypto_options.has_value() - ? *configuration.crypto_options - : options_.crypto_options; - config.transport_observer = this; - config.rtcp_handler = InitializeRtcpCallback(); - config.event_log = event_log_ptr_; -#if defined(ENABLE_EXTERNAL_AUTH) - config.enable_external_auth = true; -#endif - config.active_reset_srtp_params = configuration.active_reset_srtp_params; - - if (options_.disable_encryption) { - dtls_enabled_ = false; - } else { - // Enable DTLS by default if we have an identity store or a certificate. - dtls_enabled_ = - (dependencies.cert_generator || !configuration.certificates.empty()); - // |configuration| can override the default |dtls_enabled_| value. - if (configuration.enable_dtls_srtp) { - dtls_enabled_ = *(configuration.enable_dtls_srtp); - } - } if (configuration.enable_rtp_data_channel) { // Enable creation of RTP data channels if the kEnableRtpDataChannels is @@ -609,77 +575,27 @@ RTCError PeerConnection::Initialize( // DTLS has to be enabled to use SCTP. if (!options_.disable_sctp_data_channels && dtls_enabled_) { data_channel_controller_.set_data_channel_type(cricket::DCT_SCTP); - config.sctp_factory = context_->sctp_transport_factory(); } } - config.ice_transport_factory = ice_transport_factory_.get(); - config.on_dtls_handshake_error_ = - [weak_ptr = weak_factory_.GetWeakPtr()](rtc::SSLHandshakeError s) { - if (weak_ptr) { - weak_ptr->OnTransportControllerDtlsHandshakeError(s); - } - }; - - transport_controller_.reset(new JsepTransportController( - signaling_thread(), network_thread(), port_allocator_.get(), - async_resolver_factory_.get(), config)); - - // The following RTC_DCHECKs are added by looking at the caller thread. - // If this is incorrect there might not be test failures - // due to lack of unit tests which trigger these scenarios. - // TODO(bugs.webrtc.org/12160): Remove above comments. - // callbacks for signaling_thread. - // TODO(bugs.webrtc.org/12427): If we can't piggyback on the above network - // Invoke(), then perhaps we could post these subscription calls to the - // network thread so that the transport controller doesn't have to do the - // signaling/network handling internally and use AsyncInvoker. - transport_controller_->SubscribeIceConnectionState( - [this](cricket::IceConnectionState s) { - RTC_DCHECK_RUN_ON(signaling_thread()); - OnTransportControllerConnectionState(s); - }); - transport_controller_->SubscribeConnectionState( - [this](PeerConnectionInterface::PeerConnectionState s) { - RTC_DCHECK_RUN_ON(signaling_thread()); - SetConnectionState(s); - }); - transport_controller_->SubscribeStandardizedIceConnectionState( - [this](PeerConnectionInterface::IceConnectionState s) { - RTC_DCHECK_RUN_ON(signaling_thread()); - SetStandardizedIceConnectionState(s); - }); - transport_controller_->SubscribeIceGatheringState( - [this](cricket::IceGatheringState s) { - RTC_DCHECK_RUN_ON(signaling_thread()); - OnTransportControllerGatheringState(s); - }); - transport_controller_->SubscribeIceCandidateGathered( - [this](const std::string& transport, - const std::vector& candidates) { - RTC_DCHECK_RUN_ON(signaling_thread()); - OnTransportControllerCandidatesGathered(transport, candidates); - }); - transport_controller_->SubscribeIceCandidateError( - [this](const cricket::IceCandidateErrorEvent& event) { - RTC_DCHECK_RUN_ON(signaling_thread()); - OnTransportControllerCandidateError(event); - }); - transport_controller_->SubscribeIceCandidatesRemoved( - [this](const std::vector& c) { - RTC_DCHECK_RUN_ON(signaling_thread()); - OnTransportControllerCandidatesRemoved(c); - }); - transport_controller_->SubscribeIceCandidatePairChanged( - [this](const cricket::CandidatePairChangeEvent& event) { - RTC_DCHECK_RUN_ON(signaling_thread()); - OnTransportControllerCandidateChanged(event); - }); + // Network thread initialization. + network_thread()->Invoke(RTC_FROM_HERE, [this, &stun_servers, + &turn_servers, &configuration, + &dependencies] { + RTC_DCHECK_RUN_ON(network_thread()); + network_thread_safety_ = PendingTaskSafetyFlag::Create(); + InitializePortAllocatorResult pa_result = + InitializePortAllocator_n(stun_servers, turn_servers, configuration); + // Send information about IPv4/IPv6 status. + PeerConnectionAddressFamilyCounter address_family = + pa_result.enable_ipv6 ? kPeerConnection_IPv6 : kPeerConnection_IPv4; + RTC_HISTOGRAM_ENUMERATION("WebRTC.PeerConnection.IPMetrics", address_family, + kPeerConnectionAddressFamilyCounter_Max); + InitializeTransportController_n(configuration, dependencies); + }); configuration_ = configuration; - transport_controller_->SetIceConfig(ParseIceConfig(configuration)); - stats_ = std::make_unique(this); stats_collector_ = RTCStatsCollector::Create(this); @@ -716,6 +632,125 @@ RTCError PeerConnection::Initialize( return RTCError::OK(); } +void PeerConnection::InitializeTransportController_n( + const RTCConfiguration& configuration, + const PeerConnectionDependencies& dependencies) { + JsepTransportController::Config config; + config.redetermine_role_on_ice_restart = + configuration.redetermine_role_on_ice_restart; + config.ssl_max_version = options_.ssl_max_version; + config.disable_encryption = options_.disable_encryption; + config.bundle_policy = configuration.bundle_policy; + config.rtcp_mux_policy = configuration.rtcp_mux_policy; + // TODO(bugs.webrtc.org/9891) - Remove options_.crypto_options then remove + // this stub. + config.crypto_options = configuration.crypto_options.has_value() + ? *configuration.crypto_options + : options_.crypto_options; + config.transport_observer = this; + config.rtcp_handler = InitializeRtcpCallback(); + config.event_log = event_log_ptr_; +#if defined(ENABLE_EXTERNAL_AUTH) + config.enable_external_auth = true; +#endif + config.active_reset_srtp_params = configuration.active_reset_srtp_params; + + // DTLS has to be enabled to use SCTP. + if (!configuration.enable_rtp_data_channel && + !options_.disable_sctp_data_channels && dtls_enabled_) { + config.sctp_factory = context_->sctp_transport_factory(); + } + + config.ice_transport_factory = ice_transport_factory_.get(); + config.on_dtls_handshake_error_ = + [weak_ptr = weak_factory_.GetWeakPtr()](rtc::SSLHandshakeError s) { + if (weak_ptr) { + weak_ptr->OnTransportControllerDtlsHandshakeError(s); + } + }; + + transport_controller_.reset( + new JsepTransportController(network_thread(), port_allocator_.get(), + async_resolver_factory_.get(), config)); + + transport_controller_->SubscribeIceConnectionState( + [this](cricket::IceConnectionState s) { + RTC_DCHECK_RUN_ON(network_thread()); + signaling_thread()->PostTask( + ToQueuedTask(signaling_thread_safety_.flag(), [this, s]() { + RTC_DCHECK_RUN_ON(signaling_thread()); + OnTransportControllerConnectionState(s); + })); + }); + transport_controller_->SubscribeConnectionState( + [this](PeerConnectionInterface::PeerConnectionState s) { + RTC_DCHECK_RUN_ON(network_thread()); + signaling_thread()->PostTask( + ToQueuedTask(signaling_thread_safety_.flag(), [this, s]() { + RTC_DCHECK_RUN_ON(signaling_thread()); + SetConnectionState(s); + })); + }); + transport_controller_->SubscribeStandardizedIceConnectionState( + [this](PeerConnectionInterface::IceConnectionState s) { + RTC_DCHECK_RUN_ON(network_thread()); + signaling_thread()->PostTask( + ToQueuedTask(signaling_thread_safety_.flag(), [this, s]() { + RTC_DCHECK_RUN_ON(signaling_thread()); + SetStandardizedIceConnectionState(s); + })); + }); + transport_controller_->SubscribeIceGatheringState( + [this](cricket::IceGatheringState s) { + RTC_DCHECK_RUN_ON(network_thread()); + signaling_thread()->PostTask( + ToQueuedTask(signaling_thread_safety_.flag(), [this, s]() { + RTC_DCHECK_RUN_ON(signaling_thread()); + OnTransportControllerGatheringState(s); + })); + }); + transport_controller_->SubscribeIceCandidateGathered( + [this](const std::string& transport, + const std::vector& candidates) { + RTC_DCHECK_RUN_ON(network_thread()); + signaling_thread()->PostTask( + ToQueuedTask(signaling_thread_safety_.flag(), + [this, t = transport, c = candidates]() { + RTC_DCHECK_RUN_ON(signaling_thread()); + OnTransportControllerCandidatesGathered(t, c); + })); + }); + transport_controller_->SubscribeIceCandidateError( + [this](const cricket::IceCandidateErrorEvent& event) { + RTC_DCHECK_RUN_ON(network_thread()); + signaling_thread()->PostTask(ToQueuedTask( + signaling_thread_safety_.flag(), [this, event = event]() { + RTC_DCHECK_RUN_ON(signaling_thread()); + OnTransportControllerCandidateError(event); + })); + }); + transport_controller_->SubscribeIceCandidatesRemoved( + [this](const std::vector& c) { + RTC_DCHECK_RUN_ON(network_thread()); + signaling_thread()->PostTask( + ToQueuedTask(signaling_thread_safety_.flag(), [this, c = c]() { + RTC_DCHECK_RUN_ON(signaling_thread()); + OnTransportControllerCandidatesRemoved(c); + })); + }); + transport_controller_->SubscribeIceCandidatePairChanged( + [this](const cricket::CandidatePairChangeEvent& event) { + RTC_DCHECK_RUN_ON(network_thread()); + signaling_thread()->PostTask(ToQueuedTask( + signaling_thread_safety_.flag(), [this, event = event]() { + RTC_DCHECK_RUN_ON(signaling_thread()); + OnTransportControllerCandidateChanged(event); + })); + }); + + transport_controller_->SetIceConfig(ParseIceConfig(configuration)); +} + rtc::scoped_refptr PeerConnection::local_streams() { RTC_DCHECK_RUN_ON(signaling_thread()); RTC_CHECK(!IsUnifiedPlan()) << "local_streams is not available with Unified " @@ -1440,6 +1475,7 @@ RTCError PeerConnection::SetConfiguration( if (configuration_.active_reset_srtp_params != modified_config.active_reset_srtp_params) { + // TODO(tommi): move to the network thread - this hides an invoke. transport_controller_->SetActiveResetSrtpParams( modified_config.active_reset_srtp_params); } @@ -1594,6 +1630,7 @@ void PeerConnection::StopRtcEventLog() { rtc::scoped_refptr PeerConnection::LookupDtlsTransportByMid(const std::string& mid) { RTC_DCHECK_RUN_ON(signaling_thread()); + // TODO(tommi): Move to the network thread - this hides an invoke. return transport_controller_->LookupDtlsTransportByMid(mid); } @@ -1697,13 +1734,12 @@ void PeerConnection::Close() { port_allocator_->DiscardCandidatePool(); if (network_thread_safety_) { network_thread_safety_->SetNotAlive(); - network_thread_safety_ = nullptr; } }); worker_thread()->Invoke(RTC_FROM_HERE, [this] { RTC_DCHECK_RUN_ON(worker_thread()); - call_safety_.reset(); + worker_thread_safety_->SetNotAlive(); call_.reset(); // The event log must outlive call (and any other object that uses it). event_log_.reset(); @@ -2144,7 +2180,10 @@ bool PeerConnection::IceRestartPending(const std::string& content_name) const { } bool PeerConnection::NeedsIceRestart(const std::string& content_name) const { - return transport_controller_->NeedsIceRestart(content_name); + return network_thread()->Invoke(RTC_FROM_HERE, [this, &content_name] { + RTC_DCHECK_RUN_ON(network_thread()); + return transport_controller_->NeedsIceRestart(content_name); + }); } void PeerConnection::OnTransportControllerConnectionState( @@ -2487,6 +2526,7 @@ void PeerConnection::OnTransportControllerGatheringState( } void PeerConnection::ReportTransportStats() { + rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls; std::map> media_types_by_transport_name; for (const auto& transceiver : rtp_manager()->transceivers()->List()) { @@ -2508,18 +2548,25 @@ void PeerConnection::ReportTransportStats() { cricket::MEDIA_TYPE_DATA); } - for (const auto& entry : media_types_by_transport_name) { - const std::string& transport_name = entry.first; - const std::set media_types = entry.second; - cricket::TransportStats stats; - if (transport_controller_->GetStats(transport_name, &stats)) { - ReportBestConnectionState(stats); - ReportNegotiatedCiphers(stats, media_types); - } - } + // Run the loop that reports the state on the network thread since the + // transport controller requires the stats to be read there (GetStats()). + network_thread()->PostTask(ToQueuedTask( + network_thread_safety_, [this, media_types_by_transport_name = std::move( + media_types_by_transport_name)] { + for (const auto& entry : media_types_by_transport_name) { + const std::string& transport_name = entry.first; + const std::set media_types = entry.second; + cricket::TransportStats stats; + if (transport_controller_->GetStats(transport_name, &stats)) { + ReportBestConnectionState(stats); + ReportNegotiatedCiphers(dtls_enabled_, stats, media_types); + } + } + })); } // Walk through the ConnectionInfos to gather best connection usage // for IPv4 and IPv6. +// static (no member state required) void PeerConnection::ReportBestConnectionState( const cricket::TransportStats& stats) { for (const cricket::TransportChannelStats& channel_stats : @@ -2567,10 +2614,12 @@ void PeerConnection::ReportBestConnectionState( } } +// static void PeerConnection::ReportNegotiatedCiphers( + bool dtls_enabled, const cricket::TransportStats& stats, const std::set& media_types) { - if (!dtls_enabled_ || stats.channel_stats.empty()) { + if (!dtls_enabled || stats.channel_stats.empty()) { return; } @@ -2721,24 +2770,9 @@ void PeerConnection::RequestUsagePatternReportForTesting() { std::function PeerConnection::InitializeRtcpCallback() { - RTC_DCHECK_RUN_ON(signaling_thread()); - - auto flag = - worker_thread()->Invoke>( - RTC_FROM_HERE, [this] { - RTC_DCHECK_RUN_ON(worker_thread()); - if (!call_) - return rtc::scoped_refptr(); - if (!call_safety_) - call_safety_.reset(new ScopedTaskSafety()); - return call_safety_->flag(); - }); - - if (!flag) - return [](const rtc::CopyOnWriteBuffer&, int64_t) {}; - - return [this, flag = std::move(flag)](const rtc::CopyOnWriteBuffer& packet, - int64_t packet_time_us) { + RTC_DCHECK_RUN_ON(network_thread()); + return [this, flag = worker_thread_safety_]( + const rtc::CopyOnWriteBuffer& packet, int64_t packet_time_us) { RTC_DCHECK_RUN_ON(network_thread()); // TODO(bugs.webrtc.org/11993): We should actually be delivering this call // directly to the Call class somehow directly on the network thread and not diff --git a/pc/peer_connection.h b/pc/peer_connection.h index 92e33d2858..75af0ae170 100644 --- a/pc/peer_connection.h +++ b/pc/peer_connection.h @@ -455,7 +455,8 @@ class PeerConnection : public PeerConnectionInternal, bool is_unified_plan, std::unique_ptr event_log, std::unique_ptr call, - PeerConnectionDependencies& dependencies); + PeerConnectionDependencies& dependencies, + bool dtls_enabled); ~PeerConnection() override; @@ -463,6 +464,10 @@ class PeerConnection : public PeerConnectionInternal, RTCError Initialize( const PeerConnectionInterface::RTCConfiguration& configuration, PeerConnectionDependencies dependencies); + void InitializeTransportController_n( + const RTCConfiguration& configuration, + const PeerConnectionDependencies& dependencies) + RTC_RUN_ON(network_thread()); rtc::scoped_refptr> FindTransceiverBySender(rtc::scoped_refptr sender) @@ -573,11 +578,12 @@ class PeerConnection : public PeerConnectionInternal, void ReportTransportStats() RTC_RUN_ON(signaling_thread()); // Gather the usage of IPv4/IPv6 as best connection. - void ReportBestConnectionState(const cricket::TransportStats& stats); + static void ReportBestConnectionState(const cricket::TransportStats& stats); - void ReportNegotiatedCiphers(const cricket::TransportStats& stats, - const std::set& media_types) - RTC_RUN_ON(signaling_thread()); + static void ReportNegotiatedCiphers( + bool dtls_enabled, + const cricket::TransportStats& stats, + const std::set& media_types); void ReportIceCandidateCollected(const cricket::Candidate& candidate) RTC_RUN_ON(signaling_thread()); @@ -627,8 +633,9 @@ class PeerConnection : public PeerConnectionInternal, // TODO(zstein): |async_resolver_factory_| can currently be nullptr if it // is not injected. It should be required once chromium supplies it. - const std::unique_ptr async_resolver_factory_ - RTC_GUARDED_BY(signaling_thread()); + // This member variable is only used by JsepTransportController so we should + // consider moving ownership to there. + const std::unique_ptr async_resolver_factory_; std::unique_ptr port_allocator_; // TODO(bugs.webrtc.org/9987): Accessed on both // signaling and network thread. @@ -646,8 +653,7 @@ class PeerConnection : public PeerConnectionInternal, std::unique_ptr call_ RTC_GUARDED_BY(worker_thread()); ScopedTaskSafety signaling_thread_safety_; rtc::scoped_refptr network_thread_safety_; - std::unique_ptr call_safety_ - RTC_GUARDED_BY(worker_thread()); + rtc::scoped_refptr worker_thread_safety_; // Points to the same thing as `call_`. Since it's const, we may read the // pointer from any thread. @@ -681,7 +687,7 @@ class PeerConnection : public PeerConnectionInternal, std::unique_ptr sdp_handler_ RTC_GUARDED_BY(signaling_thread()); - bool dtls_enabled_ RTC_GUARDED_BY(signaling_thread()) = false; + const bool dtls_enabled_; UsagePattern usage_pattern_ RTC_GUARDED_BY(signaling_thread()); bool return_histogram_very_quickly_ RTC_GUARDED_BY(signaling_thread()) = diff --git a/pc/sdp_offer_answer.cc b/pc/sdp_offer_answer.cc index 9fa4188e10..8588ca8dbf 100644 --- a/pc/sdp_offer_answer.cc +++ b/pc/sdp_offer_answer.cc @@ -2794,7 +2794,7 @@ bool SdpOfferAnswerHandler::IceRestartPending( bool SdpOfferAnswerHandler::NeedsIceRestart( const std::string& content_name) const { - return transport_controller()->NeedsIceRestart(content_name); + return pc_->NeedsIceRestart(content_name); } absl::optional SdpOfferAnswerHandler::GetDtlsRole( diff --git a/test/peer_scenario/scenario_connection.cc b/test/peer_scenario/scenario_connection.cc index 8e5b3162cb..fefaa00c72 100644 --- a/test/peer_scenario/scenario_connection.cc +++ b/test/peer_scenario/scenario_connection.cc @@ -97,8 +97,7 @@ ScenarioIceConnectionImpl::ScenarioIceConnectionImpl( port_allocator_( new cricket::BasicPortAllocator(manager_->network_manager())), jsep_controller_( - new JsepTransportController(signaling_thread_, - network_thread_, + new JsepTransportController(network_thread_, port_allocator_.get(), /*async_resolver_factory*/ nullptr, CreateJsepConfig())) {