diff --git a/call/BUILD.gn b/call/BUILD.gn index efcb59107e..c56c557ecc 100644 --- a/call/BUILD.gn +++ b/call/BUILD.gn @@ -320,6 +320,7 @@ rtc_library("call") { "../rtc_base:logging", "../rtc_base:macromagic", "../rtc_base:rate_limiter", + "../rtc_base:rtc_event", "../rtc_base:rtc_task_queue", "../rtc_base:safe_minmax", "../rtc_base:stringutils", diff --git a/call/degraded_call.cc b/call/degraded_call.cc index 8c3da5730f..0090d3a081 100644 --- a/call/degraded_call.cc +++ b/call/degraded_call.cc @@ -14,17 +14,19 @@ #include #include "absl/strings/string_view.h" +#include "modules/rtp_rtcp/source/rtp_util.h" +#include "rtc_base/event.h" namespace webrtc { DegradedCall::FakeNetworkPipeOnTaskQueue::FakeNetworkPipeOnTaskQueue( TaskQueueBase* task_queue, - const ScopedTaskSafety& task_safety, + rtc::scoped_refptr call_alive, Clock* clock, std::unique_ptr network_behavior) : clock_(clock), task_queue_(task_queue), - task_safety_(task_safety), + call_alive_(std::move(call_alive)), pipe_(clock, std::move(network_behavior)) {} void DegradedCall::FakeNetworkPipeOnTaskQueue::SendRtp( @@ -61,13 +63,13 @@ bool DegradedCall::FakeNetworkPipeOnTaskQueue::Process() { return false; } - task_queue_->PostTask(SafeTask(task_safety_.flag(), [this, time_to_next] { + task_queue_->PostTask(SafeTask(call_alive_, [this, time_to_next] { RTC_DCHECK_RUN_ON(task_queue_); int64_t next_process_time = *time_to_next + clock_->TimeInMilliseconds(); if (!next_process_ms_ || next_process_time < *next_process_ms_) { next_process_ms_ = next_process_time; task_queue_->PostDelayedHighPrecisionTask( - SafeTask(task_safety_.flag(), + SafeTask(call_alive_, [this] { RTC_DCHECK_RUN_ON(task_queue_); if (!Process()) { @@ -126,12 +128,61 @@ bool DegradedCall::FakeNetworkPipeTransportAdapter::SendRtcp( return true; } +DegradedCall::ThreadedPacketReceiver::ThreadedPacketReceiver( + webrtc::TaskQueueBase* worker_thread, + webrtc::TaskQueueBase* network_thread, + rtc::scoped_refptr call_alive, + webrtc::PacketReceiver* receiver) + : worker_thread_(worker_thread), + network_thread_(network_thread), + call_alive_(std::move(call_alive)), + receiver_(receiver) {} + +DegradedCall::ThreadedPacketReceiver::~ThreadedPacketReceiver() = default; + +PacketReceiver::DeliveryStatus +DegradedCall::ThreadedPacketReceiver::DeliverPacket( + MediaType media_type, + rtc::CopyOnWriteBuffer packet, + int64_t packet_time_us) { + // `Call::DeliverPacket` expects RTCP packets to be delivered from the + // network thread and RTP packets to be delivered from the worker thread. + // Because `FakeNetworkPipe` queues packets, the thread used when this packet + // is delivered to `DegradedCall::DeliverPacket` may differ from the thread + // used when this packet is delivered to + // `ThreadedPacketReceiver::DeliverPacket`. To solve this problem, always + // make sure that packets are sent in the correct thread. + if (IsRtcpPacket(packet)) { + if (!network_thread_->IsCurrent()) { + network_thread_->PostTask( + SafeTask(call_alive_, [receiver = receiver_, media_type, + packet = std::move(packet), packet_time_us]() { + receiver->DeliverPacket(media_type, std::move(packet), + packet_time_us); + })); + return DELIVERY_OK; + } + } else { + if (!worker_thread_->IsCurrent()) { + worker_thread_->PostTask([receiver = receiver_, media_type, + packet = std::move(packet), packet_time_us]() { + receiver->DeliverPacket(media_type, std::move(packet), packet_time_us); + }); + return DELIVERY_OK; + } + } + + return receiver_->DeliverPacket(media_type, std::move(packet), + packet_time_us); +} + DegradedCall::DegradedCall( std::unique_ptr call, const std::vector& send_configs, const std::vector& receive_configs) : clock_(Clock::GetRealTimeClock()), call_(std::move(call)), + call_alive_(PendingTaskSafetyFlag::CreateDetached()), send_config_index_(0), send_configs_(send_configs), send_simulated_network_(nullptr), @@ -142,11 +193,13 @@ DegradedCall::DegradedCall( receive_simulated_network_ = network.get(); receive_pipe_ = std::make_unique(clock_, std::move(network)); - receive_pipe_->SetReceiver(call_->Receiver()); + packet_receiver_ = std::make_unique( + call_->worker_thread(), call_->network_thread(), call_alive_, + call_->Receiver()); + receive_pipe_->SetReceiver(packet_receiver_.get()); if (receive_configs_.size() > 1) { call_->network_thread()->PostDelayedTask( - SafeTask(task_safety_.flag(), - [this] { UpdateReceiveNetworkConfig(); }), + SafeTask(call_alive_, [this] { UpdateReceiveNetworkConfig(); }), receive_configs_[0].duration); } } @@ -154,16 +207,29 @@ DegradedCall::DegradedCall( auto network = std::make_unique(send_configs_[0]); send_simulated_network_ = network.get(); send_pipe_ = std::make_unique( - call_->network_thread(), task_safety_, clock_, std::move(network)); + call_->network_thread(), call_alive_, clock_, std::move(network)); if (send_configs_.size() > 1) { call_->network_thread()->PostDelayedTask( - SafeTask(task_safety_.flag(), [this] { UpdateSendNetworkConfig(); }), + SafeTask(call_alive_, [this] { UpdateSendNetworkConfig(); }), send_configs_[0].duration); } } } -DegradedCall::~DegradedCall() = default; +DegradedCall::~DegradedCall() { + RTC_DCHECK_RUN_ON(call_->worker_thread()); + // Thread synchronization is required to call `SetNotAlive`. + // Otherwise, when the `DegradedCall` object is destroyed but + // `SetNotAlive` has not yet been called, + // another Closure guarded by `call_alive_` may be called. + rtc::Event event; + call_->network_thread()->PostTask( + [flag = std::move(call_alive_), &event]() mutable { + flag->SetNotAlive(); + event.Set(); + }); + event.Wait(rtc::Event::kForever); +} AudioSendStream* DegradedCall::CreateAudioSendStream( const AudioSendStream::Config& config) { @@ -352,7 +418,7 @@ void DegradedCall::UpdateSendNetworkConfig() { send_config_index_ = (send_config_index_ + 1) % send_configs_.size(); send_simulated_network_->SetConfig(send_configs_[send_config_index_]); call_->network_thread()->PostDelayedTask( - SafeTask(task_safety_.flag(), [this] { UpdateSendNetworkConfig(); }), + SafeTask(call_alive_, [this] { UpdateSendNetworkConfig(); }), send_configs_[send_config_index_].duration); } @@ -361,7 +427,7 @@ void DegradedCall::UpdateReceiveNetworkConfig() { receive_simulated_network_->SetConfig( receive_configs_[receive_config_index_]); call_->network_thread()->PostDelayedTask( - SafeTask(task_safety_.flag(), [this] { UpdateReceiveNetworkConfig(); }), + SafeTask(call_alive_, [this] { UpdateReceiveNetworkConfig(); }), receive_configs_[receive_config_index_].duration); } } // namespace webrtc diff --git a/call/degraded_call.h b/call/degraded_call.h index fe5fd7c46c..522302283a 100644 --- a/call/degraded_call.h +++ b/call/degraded_call.h @@ -122,7 +122,7 @@ class DegradedCall : public Call, private PacketReceiver { public: FakeNetworkPipeOnTaskQueue( TaskQueueBase* task_queue, - const ScopedTaskSafety& task_safety, + rtc::scoped_refptr call_alive, Clock* clock, std::unique_ptr network_behavior); @@ -142,11 +142,30 @@ class DegradedCall : public Call, private PacketReceiver { Clock* const clock_; TaskQueueBase* const task_queue_; - const ScopedTaskSafety& task_safety_; + rtc::scoped_refptr call_alive_; FakeNetworkPipe pipe_; absl::optional next_process_ms_ RTC_GUARDED_BY(&task_queue_); }; + class ThreadedPacketReceiver : public PacketReceiver { + public: + ThreadedPacketReceiver(webrtc::TaskQueueBase* worker_thread, + webrtc::TaskQueueBase* network_thread, + rtc::scoped_refptr call_alive, + PacketReceiver* receiver); + ~ThreadedPacketReceiver() override; + + DeliveryStatus DeliverPacket(MediaType media_type, + rtc::CopyOnWriteBuffer packet, + int64_t packet_time_us) override; + + private: + webrtc::TaskQueueBase* const worker_thread_; + webrtc::TaskQueueBase* const network_thread_; + rtc::scoped_refptr call_alive_; + webrtc::PacketReceiver* const receiver_; + }; + // For audio/video send stream, a TransportAdapter instance is used to // intercept packets to be sent, and put them into a common FakeNetworkPipe // in such as way that they will eventually (unless dropped) be forwarded to @@ -178,7 +197,8 @@ class DegradedCall : public Call, private PacketReceiver { Clock* const clock_; const std::unique_ptr call_; - ScopedTaskSafety task_safety_; + // For cancelling tasks on the network thread when DegradedCall is destroyed + rtc::scoped_refptr call_alive_; size_t send_config_index_; const std::vector send_configs_; SimulatedNetwork* send_simulated_network_; @@ -192,6 +212,7 @@ class DegradedCall : public Call, private PacketReceiver { const std::vector receive_configs_; SimulatedNetwork* receive_simulated_network_; std::unique_ptr receive_pipe_; + std::unique_ptr packet_receiver_; }; } // namespace webrtc diff --git a/pc/peer_connection_field_trial_tests.cc b/pc/peer_connection_field_trial_tests.cc index 528b6ba2be..0e6e451a9a 100644 --- a/pc/peer_connection_field_trial_tests.cc +++ b/pc/peer_connection_field_trial_tests.cc @@ -25,15 +25,35 @@ #include "pc/peer_connection_wrapper.h" #include "pc/session_description.h" #include "pc/test/fake_audio_capture_module.h" +#include "pc/test/frame_generator_capturer_video_track_source.h" #include "pc/test/peer_connection_test_wrapper.h" +#include "rtc_base/gunit.h" #include "rtc_base/internal/default_socket_server.h" #include "rtc_base/physical_socket_server.h" #include "rtc_base/thread.h" #include "test/gtest.h" #include "test/scoped_key_value_config.h" +#ifdef WEBRTC_ANDROID +#include "pc/test/android_test_initializer.h" +#endif + namespace webrtc { +namespace { +static const int kDefaultTimeoutMs = 5000; + +bool AddIceCandidates(PeerConnectionWrapper* peer, + std::vector candidates) { + for (const auto candidate : candidates) { + if (!peer->pc()->AddIceCandidate(candidate)) { + return false; + } + } + return true; +} +} // namespace + using RTCConfiguration = PeerConnectionInterface::RTCConfiguration; class PeerConnectionFieldTrialTest : public ::testing::Test { @@ -41,8 +61,12 @@ class PeerConnectionFieldTrialTest : public ::testing::Test { typedef std::unique_ptr WrapperPtr; PeerConnectionFieldTrialTest() - : socket_server_(rtc::CreateDefaultSocketServer()), + : clock_(Clock::GetRealTimeClock()), + socket_server_(rtc::CreateDefaultSocketServer()), main_thread_(socket_server_.get()) { +#ifdef WEBRTC_ANDROID + InitializeAndroidObjects(); +#endif webrtc::PeerConnectionInterface::IceServer ice_server; ice_server.uri = "stun:stun.l.google.com:19302"; config_.servers.push_back(ice_server); @@ -54,8 +78,6 @@ class PeerConnectionFieldTrialTest : public ::testing::Test { void CreatePCFactory(std::unique_ptr field_trials) { PeerConnectionFactoryDependencies pcf_deps; pcf_deps.signaling_thread = rtc::Thread::Current(); - pcf_deps.worker_thread = rtc::Thread::Current(); - pcf_deps.network_thread = rtc::Thread::Current(); pcf_deps.trials = std::move(field_trials); pcf_deps.task_queue_factory = CreateDefaultTaskQueueFactory(); pcf_deps.call_factory = webrtc::CreateCallFactory(); @@ -66,6 +88,13 @@ class PeerConnectionFieldTrialTest : public ::testing::Test { webrtc::SetMediaEngineDefaults(&media_deps); pcf_deps.media_engine = cricket::CreateMediaEngine(std::move(media_deps)); pc_factory_ = CreateModularPeerConnectionFactory(std::move(pcf_deps)); + + // Allow ADAPTER_TYPE_LOOPBACK to create PeerConnections with loopback in + // this test. + RTC_DCHECK(pc_factory_); + PeerConnectionFactoryInterface::Options options; + options.network_ignore_mask = 0; + pc_factory_->SetOptions(options); } WrapperPtr CreatePeerConnection() { @@ -79,6 +108,7 @@ class PeerConnectionFieldTrialTest : public ::testing::Test { pc_factory_, result.MoveValue(), std::move(observer)); } + Clock* const clock_; std::unique_ptr socket_server_; rtc::AutoSocketServerThread main_thread_; rtc::scoped_refptr pc_factory_ = nullptr; @@ -188,4 +218,50 @@ TEST_F(PeerConnectionFieldTrialTest, InjectDependencyDescriptor) { EXPECT_TRUE(found2); } +// Test that the ability to emulate degraded networks works without crashing. +TEST_F(PeerConnectionFieldTrialTest, ApplyFakeNetworkConfig) { + std::unique_ptr field_trials = + std::make_unique( + "WebRTC-FakeNetworkSendConfig/link_capacity_kbps:500/" + "WebRTC-FakeNetworkReceiveConfig/loss_percent:1/"); + + CreatePCFactory(std::move(field_trials)); + + WrapperPtr caller = CreatePeerConnection(); + FrameGeneratorCapturerVideoTrackSource::Config config; + auto video_track_source = + rtc::make_ref_counted( + config, clock_, /*is_screencast=*/false); + caller->AddTrack( + pc_factory_->CreateVideoTrack("v", video_track_source.get())); + WrapperPtr callee = CreatePeerConnection(); + + ASSERT_TRUE(callee->SetRemoteDescription(caller->CreateOfferAndSetAsLocal())); + ASSERT_TRUE( + caller->SetRemoteDescription(callee->CreateAnswerAndSetAsLocal())); + + // Do the SDP negotiation, and also exchange ice candidates. + ASSERT_TRUE(caller->ExchangeOfferAnswerWith(callee.get())); + ASSERT_TRUE_WAIT( + caller->signaling_state() == PeerConnectionInterface::kStable, + kDefaultTimeoutMs); + ASSERT_TRUE_WAIT(caller->IsIceGatheringDone(), kDefaultTimeoutMs); + ASSERT_TRUE_WAIT(callee->IsIceGatheringDone(), kDefaultTimeoutMs); + + // Connect an ICE candidate pairs. + ASSERT_TRUE( + AddIceCandidates(callee.get(), caller->observer()->GetAllCandidates())); + ASSERT_TRUE( + AddIceCandidates(caller.get(), callee->observer()->GetAllCandidates())); + + // This means that ICE and DTLS are connected. + ASSERT_TRUE_WAIT(callee->IsIceConnected(), kDefaultTimeoutMs); + ASSERT_TRUE_WAIT(caller->IsIceConnected(), kDefaultTimeoutMs); + + // Send packets for kDefaultTimeoutMs + // For now, whether this field trial works or not is checked by + // whether a crash occurs. Additional validation can be added later. + WAIT(false, kDefaultTimeoutMs); +} + } // namespace webrtc