From 836fee1e1ad07378c0e48f61ddd7f2f0fdc02cf3 Mon Sep 17 00:00:00 2001 From: Sebastian Jansson Date: Fri, 8 Feb 2019 16:08:10 +0100 Subject: [PATCH] Calculate next process time in simulated network. Currently there's an implicit requirement that users of SimulatedNetwork should call it repeatedly, even if the return value of NextDeliveryTimeUs is unset. With this change, it will indicate that there might be a delivery in 5 ms at any time there are packets in queue. Which results in unchanged behavior compared to current usage but allows new users to expect robust behavior. Bug: webrtc:9510 Change-Id: I45b8b5f1f0d3d13a8ec9b163d4011c5f01a53069 Reviewed-on: https://webrtc-review.googlesource.com/c/120402 Commit-Queue: Sebastian Jansson Reviewed-by: Christoffer Rodbro Cr-Commit-Position: refs/heads/master@{#26617} --- call/BUILD.gn | 3 +- call/degraded_call.cc | 57 +++++++++++++++++++++++-- call/degraded_call.h | 27 +++++++++++- call/fake_network_pipe.cc | 16 +------ call/fake_network_pipe.h | 14 ++---- call/simulated_network.cc | 18 ++++++-- call/simulated_network.h | 2 + call/simulated_packet_receiver.h | 12 +++++- call/test/fake_network_pipe_unittest.cc | 4 +- test/direct_transport.cc | 37 +++++++++------- test/direct_transport.h | 9 ++-- 11 files changed, 141 insertions(+), 58 deletions(-) diff --git a/call/BUILD.gn b/call/BUILD.gn index 38b2f4145c..9011094ce7 100644 --- a/call/BUILD.gn +++ b/call/BUILD.gn @@ -228,6 +228,7 @@ rtc_static_library("call") { "../logging:rtc_event_rtp_rtcp", "../logging:rtc_event_video", "../logging:rtc_stream_config", + "../modules:module_api", "../modules/bitrate_controller", "../modules/congestion_controller", "../modules/pacing", @@ -300,7 +301,6 @@ rtc_source_set("simulated_packet_receiver") { deps = [ ":call_interfaces", "../api:simulated_network_api", - "../modules:module_api", ] } @@ -317,7 +317,6 @@ rtc_source_set("fake_network") { "../api:libjingle_peerconnection_api", "../api:simulated_network_api", "../api:transport_api", - "../modules:module_api", "../modules/utility", "../rtc_base:checks", "../rtc_base:rtc_base_approved", diff --git a/call/degraded_call.cc b/call/degraded_call.cc index c066d5d6d3..4b6d1af7dc 100644 --- a/call/degraded_call.cc +++ b/call/degraded_call.cc @@ -15,6 +15,55 @@ #include "rtc_base/location.h" namespace webrtc { + +namespace { +constexpr int64_t kDoNothingProcessIntervalMs = 5000; +} // namespace + +FakeNetworkPipeModule::~FakeNetworkPipeModule() = default; + +FakeNetworkPipeModule::FakeNetworkPipeModule( + Clock* clock, + std::unique_ptr network_behavior, + Transport* transport) + : pipe_(clock, std::move(network_behavior), transport) {} + +void FakeNetworkPipeModule::SendRtp(const uint8_t* packet, + size_t length, + const PacketOptions& options) { + pipe_.SendRtp(packet, length, options); + MaybeResumeProcess(); +} + +void FakeNetworkPipeModule::SendRtcp(const uint8_t* packet, size_t length) { + pipe_.SendRtcp(packet, length); + MaybeResumeProcess(); +} + +void FakeNetworkPipeModule::MaybeResumeProcess() { + rtc::CritScope cs(&process_thread_lock_); + if (!pending_process_ && pipe_.TimeUntilNextProcess() && process_thread_) { + process_thread_->WakeUp(nullptr); + } +} + +int64_t FakeNetworkPipeModule::TimeUntilNextProcess() { + auto delay = pipe_.TimeUntilNextProcess(); + rtc::CritScope cs(&process_thread_lock_); + pending_process_ = delay.has_value(); + return delay.value_or(kDoNothingProcessIntervalMs); +} + +void FakeNetworkPipeModule::ProcessThreadAttached( + ProcessThread* process_thread) { + rtc::CritScope cs(&process_thread_lock_); + process_thread_ = process_thread; +} + +void FakeNetworkPipeModule::Process() { + pipe_.Process(); +} + DegradedCall::DegradedCall( std::unique_ptr call, absl::optional send_config, @@ -72,8 +121,8 @@ VideoSendStream* DegradedCall::CreateVideoSendStream( if (send_config_ && !send_pipe_) { auto network = absl::make_unique(*send_config_); send_simulated_network_ = network.get(); - send_pipe_ = absl::make_unique(clock_, std::move(network), - config.send_transport); + send_pipe_ = absl::make_unique( + clock_, std::move(network), config.send_transport); config.send_transport = this; send_process_thread_->RegisterModule(send_pipe_.get(), RTC_FROM_HERE); } @@ -89,8 +138,8 @@ VideoSendStream* DegradedCall::CreateVideoSendStream( if (send_config_ && !send_pipe_) { auto network = absl::make_unique(*send_config_); send_simulated_network_ = network.get(); - send_pipe_ = absl::make_unique(clock_, std::move(network), - config.send_transport); + send_pipe_ = absl::make_unique( + clock_, std::move(network), config.send_transport); config.send_transport = this; send_process_thread_->RegisterModule(send_pipe_.get(), RTC_FROM_HERE); } diff --git a/call/degraded_call.h b/call/degraded_call.h index 8f062c3a7b..89eafdb9dc 100644 --- a/call/degraded_call.h +++ b/call/degraded_call.h @@ -32,6 +32,7 @@ #include "call/simulated_network.h" #include "call/video_receive_stream.h" #include "call/video_send_stream.h" +#include "modules/include/module.h" #include "modules/utility/include/process_thread.h" #include "rtc_base/bitrate_allocation_strategy.h" #include "rtc_base/copy_on_write_buffer.h" @@ -39,6 +40,30 @@ #include "system_wrappers/include/clock.h" namespace webrtc { +class FakeNetworkPipeModule : public Module { + public: + FakeNetworkPipeModule( + Clock* clock, + std::unique_ptr network_behavior, + Transport* transport); + ~FakeNetworkPipeModule() override; + void SendRtp(const uint8_t* packet, + size_t length, + const PacketOptions& options); + void SendRtcp(const uint8_t* packet, size_t length); + + // Implements Module interface + int64_t TimeUntilNextProcess() override; + void ProcessThreadAttached(ProcessThread* process_thread) override; + void Process() override; + + private: + void MaybeResumeProcess(); + FakeNetworkPipe pipe_; + rtc::CriticalSection process_thread_lock_; + ProcessThread* process_thread_ RTC_GUARDED_BY(process_thread_lock_) = nullptr; + bool pending_process_ RTC_GUARDED_BY(process_thread_lock_) = false; +}; class DegradedCall : public Call, private Transport, private PacketReceiver { public: @@ -111,7 +136,7 @@ class DegradedCall : public Call, private Transport, private PacketReceiver { const absl::optional send_config_; const std::unique_ptr send_process_thread_; SimulatedNetwork* send_simulated_network_; - std::unique_ptr send_pipe_; + std::unique_ptr send_pipe_; size_t num_send_streams_; const absl::optional receive_config_; diff --git a/call/fake_network_pipe.cc b/call/fake_network_pipe.cc index b5c0cb52e3..46adcb47a9 100644 --- a/call/fake_network_pipe.cc +++ b/call/fake_network_pipe.cc @@ -24,7 +24,6 @@ namespace webrtc { namespace { -constexpr int64_t kDefaultProcessIntervalMs = 5; constexpr int64_t kLogIntervalMs = 5000; } // namespace @@ -167,12 +166,6 @@ bool FakeNetworkPipe::EnqueuePacket(rtc::CopyOnWriteBuffer packet, packets_in_flight_.pop_back(); ++dropped_packets_; } - if (network_behavior_->NextDeliveryTimeUs()) { - rtc::CritScope crit(&process_thread_lock_); - if (process_thread_) - process_thread_->WakeUp(nullptr); - } - return sent; } @@ -292,19 +285,14 @@ void FakeNetworkPipe::DeliverNetworkPacket(NetworkPacket* packet) { } } -int64_t FakeNetworkPipe::TimeUntilNextProcess() { +absl::optional FakeNetworkPipe::TimeUntilNextProcess() { rtc::CritScope crit(&process_lock_); absl::optional delivery_us = network_behavior_->NextDeliveryTimeUs(); if (delivery_us) { int64_t delay_us = *delivery_us - clock_->TimeInMicroseconds(); return std::max((delay_us + 500) / 1000, 0); } - return kDefaultProcessIntervalMs; -} - -void FakeNetworkPipe::ProcessThreadAttached(ProcessThread* process_thread) { - rtc::CritScope cs(&process_thread_lock_); - process_thread_ = process_thread; + return absl::nullopt; } bool FakeNetworkPipe::HasTransport() const { diff --git a/call/fake_network_pipe.h b/call/fake_network_pipe.h index 2c41dbfd93..661815bf44 100644 --- a/call/fake_network_pipe.h +++ b/call/fake_network_pipe.h @@ -86,8 +86,7 @@ class NetworkPacket { // Class faking a network link, internally is uses an implementation of a // SimulatedNetworkInterface to simulate network behavior. -class FakeNetworkPipe : public webrtc::SimulatedPacketReceiverInterface, - public Transport { +class FakeNetworkPipe : public SimulatedPacketReceiverInterface { public: // Will keep |network_behavior| alive while pipe is alive itself. // Use these constructors if you plan to insert packets using DeliverPacket(). @@ -119,8 +118,8 @@ class FakeNetworkPipe : public webrtc::SimulatedPacketReceiverInterface, // constructor. bool SendRtp(const uint8_t* packet, size_t length, - const PacketOptions& options) override; - bool SendRtcp(const uint8_t* packet, size_t length) override; + const PacketOptions& options); + bool SendRtcp(const uint8_t* packet, size_t length); // Implements the PacketReceiver interface. When/if packets are delivered, // they will be passed directly to the receiver instance given in @@ -138,8 +137,7 @@ class FakeNetworkPipe : public webrtc::SimulatedPacketReceiverInterface, // Processes the network queues and trigger PacketReceiver::IncomingPacket for // packets ready to be delivered. void Process() override; - int64_t TimeUntilNextProcess() override; - void ProcessThreadAttached(ProcessThread* process_thread) override; + absl::optional TimeUntilNextProcess() override; // Get statistics. float PercentageLoss(); @@ -193,10 +191,6 @@ class FakeNetworkPipe : public webrtc::SimulatedPacketReceiverInterface, // |process_lock| guards the data structures involved in delay and loss // processes, such as the packet queues. rtc::CriticalSection process_lock_; - - rtc::CriticalSection process_thread_lock_; - ProcessThread* process_thread_ RTC_GUARDED_BY(process_thread_lock_) = nullptr; - // Packets are added at the back of the deque, this makes the deque ordered // by increasing send time. The common case when removing packets from the // deque is removing early packets, which will be close to the front of the diff --git a/call/simulated_network.cc b/call/simulated_network.cc index 0884b295f8..c80255f388 100644 --- a/call/simulated_network.cc +++ b/call/simulated_network.cc @@ -20,6 +20,9 @@ #include "rtc_base/checks.h" namespace webrtc { +namespace { +constexpr int64_t kDefaultProcessDelayUs = 5000; +} SimulatedNetwork::SimulatedNetwork(SimulatedNetwork::Config config, uint64_t random_seed) @@ -76,15 +79,16 @@ bool SimulatedNetwork::EnqueuePacket(PacketInFlightInfo packet) { // calculated in UpdateCapacityQueue. queue_size_bytes_ += packet.size; capacity_link_.push({packet, packet.send_time_us}); + if (!next_process_time_us_) { + next_process_time_us_ = packet.send_time_us + kDefaultProcessDelayUs; + } return true; } absl::optional SimulatedNetwork::NextDeliveryTimeUs() const { RTC_DCHECK_RUNS_SERIALIZED(&process_checker_); - if (!delay_link_.empty()) - return delay_link_.begin()->arrival_time_us; - return absl::nullopt; + return next_process_time_us_; } void SimulatedNetwork::UpdateCapacityQueue(ConfigState state, @@ -198,6 +202,14 @@ std::vector SimulatedNetwork::DequeueDeliverablePackets( PacketDeliveryInfo(packet_info.packet, packet_info.arrival_time_us)); delay_link_.pop_front(); } + + if (!delay_link_.empty()) { + next_process_time_us_ = delay_link_.front().arrival_time_us; + } else if (!capacity_link_.empty()) { + next_process_time_us_ = receive_time_us + kDefaultProcessDelayUs; + } else { + next_process_time_us_.reset(); + } return packets_to_deliver; } diff --git a/call/simulated_network.h b/call/simulated_network.h index 6adb412edf..632eb5dfed 100644 --- a/call/simulated_network.h +++ b/call/simulated_network.h @@ -87,6 +87,8 @@ class SimulatedNetwork : public NetworkBehaviorInterface { int64_t pending_drain_bits_ RTC_GUARDED_BY(process_checker_) = 0; absl::optional last_capacity_link_visit_us_ RTC_GUARDED_BY(process_checker_); + absl::optional next_process_time_us_ + RTC_GUARDED_BY(process_checker_); }; } // namespace webrtc diff --git a/call/simulated_packet_receiver.h b/call/simulated_packet_receiver.h index 03d7e96ea6..2db46e8c38 100644 --- a/call/simulated_packet_receiver.h +++ b/call/simulated_packet_receiver.h @@ -13,13 +13,12 @@ #include "api/test/simulated_network.h" #include "call/packet_receiver.h" -#include "modules/include/module.h" namespace webrtc { // Private API that is fixing surface between DirectTransport and underlying // network conditions simulation implementation. -class SimulatedPacketReceiverInterface : public PacketReceiver, public Module { +class SimulatedPacketReceiverInterface : public PacketReceiver { public: // Must not be called in parallel with DeliverPacket or Process. // Destination receiver will be injected with this method @@ -27,6 +26,15 @@ class SimulatedPacketReceiverInterface : public PacketReceiver, public Module { // Reports average packet delay. virtual int AverageDelay() = 0; + + // Process any pending tasks such as timeouts. + // Called on a worker thread. + virtual void Process() = 0; + + // Returns the time until next process or nullopt to indicate that the next + // process time is unknown. If the next process time is unknown, this should + // be checked again any time a packet is enqueued. + virtual absl::optional TimeUntilNextProcess() = 0; }; } // namespace webrtc diff --git a/call/test/fake_network_pipe_unittest.cc b/call/test/fake_network_pipe_unittest.cc index 9f2a663403..b8c7e56d17 100644 --- a/call/test/fake_network_pipe_unittest.cc +++ b/call/test/fake_network_pipe_unittest.cc @@ -259,7 +259,7 @@ TEST_F(FakeNetworkPipeTest, ChangingCapacityWithEmptyPipeTest) { // Check that all the packets were sent. EXPECT_EQ(static_cast(2 * kNumPackets), pipe->SentPackets()); - fake_clock_.AdvanceTimeMilliseconds(pipe->TimeUntilNextProcess()); + fake_clock_.AdvanceTimeMilliseconds(*pipe->TimeUntilNextProcess()); EXPECT_CALL(receiver, DeliverPacket(_, _, _)).Times(0); pipe->Process(); } @@ -307,7 +307,7 @@ TEST_F(FakeNetworkPipeTest, ChangingCapacityWithPacketsInPipeTest) { // Check that all the packets were sent. EXPECT_EQ(static_cast(kNumPackets), pipe->SentPackets()); - fake_clock_.AdvanceTimeMilliseconds(pipe->TimeUntilNextProcess()); + fake_clock_.AdvanceTimeMilliseconds(*pipe->TimeUntilNextProcess()); EXPECT_CALL(receiver, DeliverPacket(_, _, _)).Times(0); pipe->Process(); } diff --git a/test/direct_transport.cc b/test/direct_transport.cc index fd7369171a..b7554496c6 100644 --- a/test/direct_transport.cc +++ b/test/direct_transport.cc @@ -50,19 +50,18 @@ DirectTransport::DirectTransport( } DirectTransport::~DirectTransport() { - RTC_DCHECK_CALLED_SEQUENTIALLY(&sequence_checker_); - // Constructor updates |next_scheduled_task_|, so it's guaranteed to - // be initialized. - task_queue_->CancelTask(next_scheduled_task_); + if (next_process_task_) + task_queue_->CancelTask(*next_process_task_); } void DirectTransport::StopSending() { - RTC_DCHECK_CALLED_SEQUENTIALLY(&sequence_checker_); - task_queue_->CancelTask(next_scheduled_task_); + rtc::CritScope cs(&process_lock_); + if (next_process_task_) + task_queue_->CancelTask(*next_process_task_); } void DirectTransport::SetReceiver(PacketReceiver* receiver) { - RTC_DCHECK_CALLED_SEQUENTIALLY(&sequence_checker_); + rtc::CritScope cs(&process_lock_); fake_network_->SetReceiver(receiver); } @@ -92,6 +91,9 @@ void DirectTransport::SendPacket(const uint8_t* data, size_t length) { int64_t send_time = clock_->TimeInMicroseconds(); fake_network_->DeliverPacket(media_type, rtc::CopyOnWriteBuffer(data, length), send_time); + rtc::CritScope cs(&process_lock_); + if (!next_process_task_) + ProcessPackets(); } int DirectTransport::GetAverageDelayMs() { @@ -104,17 +106,20 @@ void DirectTransport::Start() { send_call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkUp); send_call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp); } - SendPackets(); } -void DirectTransport::SendPackets() { - RTC_DCHECK_CALLED_SEQUENTIALLY(&sequence_checker_); - - fake_network_->Process(); - - int64_t delay_ms = fake_network_->TimeUntilNextProcess(); - next_scheduled_task_ = - task_queue_->PostDelayedTask([this]() { SendPackets(); }, delay_ms); +void DirectTransport::ProcessPackets() { + next_process_task_.reset(); + auto delay_ms = fake_network_->TimeUntilNextProcess(); + if (delay_ms) { + next_process_task_ = task_queue_->PostDelayedTask( + [this]() { + fake_network_->Process(); + rtc::CritScope cs(&process_lock_); + ProcessPackets(); + }, + *delay_ms); + } } } // namespace test } // namespace webrtc diff --git a/test/direct_transport.h b/test/direct_transport.h index f926ec5b6a..d70748ffc6 100644 --- a/test/direct_transport.h +++ b/test/direct_transport.h @@ -60,7 +60,7 @@ class DirectTransport : public Transport { int GetAverageDelayMs(); private: - void SendPackets(); + void ProcessPackets() RTC_EXCLUSIVE_LOCKS_REQUIRED(&process_lock_); void SendPacket(const uint8_t* data, size_t length); void Start(); @@ -68,13 +68,14 @@ class DirectTransport : public Transport { Clock* const clock_; SingleThreadedTaskQueueForTesting* const task_queue_; - SingleThreadedTaskQueueForTesting::TaskId next_scheduled_task_ - RTC_GUARDED_BY(&sequence_checker_); + + rtc::CriticalSection process_lock_; + absl::optional next_process_task_ + RTC_GUARDED_BY(&process_lock_); const Demuxer demuxer_; const std::unique_ptr fake_network_; - rtc::SequencedTaskChecker sequence_checker_; }; } // namespace test } // namespace webrtc