Calculate next process time in simulated network.

Currently there's an implicit requirement that users of
SimulatedNetwork should call it repeatedly, even if the return value
of NextDeliveryTimeUs is unset.

With this change, it will indicate that there might be a delivery in
5 ms at any time there are packets in queue. Which results in unchanged
behavior compared to current usage but allows new users to expect
robust behavior.

Bug: webrtc:9510
Change-Id: I45b8b5f1f0d3d13a8ec9b163d4011c5f01a53069
Reviewed-on: https://webrtc-review.googlesource.com/c/120402
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Reviewed-by: Christoffer Rodbro <crodbro@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#26617}
This commit is contained in:
Sebastian Jansson 2019-02-08 16:08:10 +01:00 committed by Commit Bot
parent f6adac87b4
commit 836fee1e1a
11 changed files with 141 additions and 58 deletions

View file

@ -228,6 +228,7 @@ rtc_static_library("call") {
"../logging:rtc_event_rtp_rtcp", "../logging:rtc_event_rtp_rtcp",
"../logging:rtc_event_video", "../logging:rtc_event_video",
"../logging:rtc_stream_config", "../logging:rtc_stream_config",
"../modules:module_api",
"../modules/bitrate_controller", "../modules/bitrate_controller",
"../modules/congestion_controller", "../modules/congestion_controller",
"../modules/pacing", "../modules/pacing",
@ -300,7 +301,6 @@ rtc_source_set("simulated_packet_receiver") {
deps = [ deps = [
":call_interfaces", ":call_interfaces",
"../api:simulated_network_api", "../api:simulated_network_api",
"../modules:module_api",
] ]
} }
@ -317,7 +317,6 @@ rtc_source_set("fake_network") {
"../api:libjingle_peerconnection_api", "../api:libjingle_peerconnection_api",
"../api:simulated_network_api", "../api:simulated_network_api",
"../api:transport_api", "../api:transport_api",
"../modules:module_api",
"../modules/utility", "../modules/utility",
"../rtc_base:checks", "../rtc_base:checks",
"../rtc_base:rtc_base_approved", "../rtc_base:rtc_base_approved",

View file

@ -15,6 +15,55 @@
#include "rtc_base/location.h" #include "rtc_base/location.h"
namespace webrtc { namespace webrtc {
namespace {
constexpr int64_t kDoNothingProcessIntervalMs = 5000;
} // namespace
FakeNetworkPipeModule::~FakeNetworkPipeModule() = default;
FakeNetworkPipeModule::FakeNetworkPipeModule(
Clock* clock,
std::unique_ptr<NetworkBehaviorInterface> network_behavior,
Transport* transport)
: pipe_(clock, std::move(network_behavior), transport) {}
void FakeNetworkPipeModule::SendRtp(const uint8_t* packet,
size_t length,
const PacketOptions& options) {
pipe_.SendRtp(packet, length, options);
MaybeResumeProcess();
}
void FakeNetworkPipeModule::SendRtcp(const uint8_t* packet, size_t length) {
pipe_.SendRtcp(packet, length);
MaybeResumeProcess();
}
void FakeNetworkPipeModule::MaybeResumeProcess() {
rtc::CritScope cs(&process_thread_lock_);
if (!pending_process_ && pipe_.TimeUntilNextProcess() && process_thread_) {
process_thread_->WakeUp(nullptr);
}
}
int64_t FakeNetworkPipeModule::TimeUntilNextProcess() {
auto delay = pipe_.TimeUntilNextProcess();
rtc::CritScope cs(&process_thread_lock_);
pending_process_ = delay.has_value();
return delay.value_or(kDoNothingProcessIntervalMs);
}
void FakeNetworkPipeModule::ProcessThreadAttached(
ProcessThread* process_thread) {
rtc::CritScope cs(&process_thread_lock_);
process_thread_ = process_thread;
}
void FakeNetworkPipeModule::Process() {
pipe_.Process();
}
DegradedCall::DegradedCall( DegradedCall::DegradedCall(
std::unique_ptr<Call> call, std::unique_ptr<Call> call,
absl::optional<BuiltInNetworkBehaviorConfig> send_config, absl::optional<BuiltInNetworkBehaviorConfig> send_config,
@ -72,8 +121,8 @@ VideoSendStream* DegradedCall::CreateVideoSendStream(
if (send_config_ && !send_pipe_) { if (send_config_ && !send_pipe_) {
auto network = absl::make_unique<SimulatedNetwork>(*send_config_); auto network = absl::make_unique<SimulatedNetwork>(*send_config_);
send_simulated_network_ = network.get(); send_simulated_network_ = network.get();
send_pipe_ = absl::make_unique<FakeNetworkPipe>(clock_, std::move(network), send_pipe_ = absl::make_unique<FakeNetworkPipeModule>(
config.send_transport); clock_, std::move(network), config.send_transport);
config.send_transport = this; config.send_transport = this;
send_process_thread_->RegisterModule(send_pipe_.get(), RTC_FROM_HERE); send_process_thread_->RegisterModule(send_pipe_.get(), RTC_FROM_HERE);
} }
@ -89,8 +138,8 @@ VideoSendStream* DegradedCall::CreateVideoSendStream(
if (send_config_ && !send_pipe_) { if (send_config_ && !send_pipe_) {
auto network = absl::make_unique<SimulatedNetwork>(*send_config_); auto network = absl::make_unique<SimulatedNetwork>(*send_config_);
send_simulated_network_ = network.get(); send_simulated_network_ = network.get();
send_pipe_ = absl::make_unique<FakeNetworkPipe>(clock_, std::move(network), send_pipe_ = absl::make_unique<FakeNetworkPipeModule>(
config.send_transport); clock_, std::move(network), config.send_transport);
config.send_transport = this; config.send_transport = this;
send_process_thread_->RegisterModule(send_pipe_.get(), RTC_FROM_HERE); send_process_thread_->RegisterModule(send_pipe_.get(), RTC_FROM_HERE);
} }

View file

@ -32,6 +32,7 @@
#include "call/simulated_network.h" #include "call/simulated_network.h"
#include "call/video_receive_stream.h" #include "call/video_receive_stream.h"
#include "call/video_send_stream.h" #include "call/video_send_stream.h"
#include "modules/include/module.h"
#include "modules/utility/include/process_thread.h" #include "modules/utility/include/process_thread.h"
#include "rtc_base/bitrate_allocation_strategy.h" #include "rtc_base/bitrate_allocation_strategy.h"
#include "rtc_base/copy_on_write_buffer.h" #include "rtc_base/copy_on_write_buffer.h"
@ -39,6 +40,30 @@
#include "system_wrappers/include/clock.h" #include "system_wrappers/include/clock.h"
namespace webrtc { namespace webrtc {
class FakeNetworkPipeModule : public Module {
public:
FakeNetworkPipeModule(
Clock* clock,
std::unique_ptr<NetworkBehaviorInterface> network_behavior,
Transport* transport);
~FakeNetworkPipeModule() override;
void SendRtp(const uint8_t* packet,
size_t length,
const PacketOptions& options);
void SendRtcp(const uint8_t* packet, size_t length);
// Implements Module interface
int64_t TimeUntilNextProcess() override;
void ProcessThreadAttached(ProcessThread* process_thread) override;
void Process() override;
private:
void MaybeResumeProcess();
FakeNetworkPipe pipe_;
rtc::CriticalSection process_thread_lock_;
ProcessThread* process_thread_ RTC_GUARDED_BY(process_thread_lock_) = nullptr;
bool pending_process_ RTC_GUARDED_BY(process_thread_lock_) = false;
};
class DegradedCall : public Call, private Transport, private PacketReceiver { class DegradedCall : public Call, private Transport, private PacketReceiver {
public: public:
@ -111,7 +136,7 @@ class DegradedCall : public Call, private Transport, private PacketReceiver {
const absl::optional<BuiltInNetworkBehaviorConfig> send_config_; const absl::optional<BuiltInNetworkBehaviorConfig> send_config_;
const std::unique_ptr<ProcessThread> send_process_thread_; const std::unique_ptr<ProcessThread> send_process_thread_;
SimulatedNetwork* send_simulated_network_; SimulatedNetwork* send_simulated_network_;
std::unique_ptr<FakeNetworkPipe> send_pipe_; std::unique_ptr<FakeNetworkPipeModule> send_pipe_;
size_t num_send_streams_; size_t num_send_streams_;
const absl::optional<BuiltInNetworkBehaviorConfig> receive_config_; const absl::optional<BuiltInNetworkBehaviorConfig> receive_config_;

View file

@ -24,7 +24,6 @@
namespace webrtc { namespace webrtc {
namespace { namespace {
constexpr int64_t kDefaultProcessIntervalMs = 5;
constexpr int64_t kLogIntervalMs = 5000; constexpr int64_t kLogIntervalMs = 5000;
} // namespace } // namespace
@ -167,12 +166,6 @@ bool FakeNetworkPipe::EnqueuePacket(rtc::CopyOnWriteBuffer packet,
packets_in_flight_.pop_back(); packets_in_flight_.pop_back();
++dropped_packets_; ++dropped_packets_;
} }
if (network_behavior_->NextDeliveryTimeUs()) {
rtc::CritScope crit(&process_thread_lock_);
if (process_thread_)
process_thread_->WakeUp(nullptr);
}
return sent; return sent;
} }
@ -292,19 +285,14 @@ void FakeNetworkPipe::DeliverNetworkPacket(NetworkPacket* packet) {
} }
} }
int64_t FakeNetworkPipe::TimeUntilNextProcess() { absl::optional<int64_t> FakeNetworkPipe::TimeUntilNextProcess() {
rtc::CritScope crit(&process_lock_); rtc::CritScope crit(&process_lock_);
absl::optional<int64_t> delivery_us = network_behavior_->NextDeliveryTimeUs(); absl::optional<int64_t> delivery_us = network_behavior_->NextDeliveryTimeUs();
if (delivery_us) { if (delivery_us) {
int64_t delay_us = *delivery_us - clock_->TimeInMicroseconds(); int64_t delay_us = *delivery_us - clock_->TimeInMicroseconds();
return std::max<int64_t>((delay_us + 500) / 1000, 0); return std::max<int64_t>((delay_us + 500) / 1000, 0);
} }
return kDefaultProcessIntervalMs; return absl::nullopt;
}
void FakeNetworkPipe::ProcessThreadAttached(ProcessThread* process_thread) {
rtc::CritScope cs(&process_thread_lock_);
process_thread_ = process_thread;
} }
bool FakeNetworkPipe::HasTransport() const { bool FakeNetworkPipe::HasTransport() const {

View file

@ -86,8 +86,7 @@ class NetworkPacket {
// Class faking a network link, internally is uses an implementation of a // Class faking a network link, internally is uses an implementation of a
// SimulatedNetworkInterface to simulate network behavior. // SimulatedNetworkInterface to simulate network behavior.
class FakeNetworkPipe : public webrtc::SimulatedPacketReceiverInterface, class FakeNetworkPipe : public SimulatedPacketReceiverInterface {
public Transport {
public: public:
// Will keep |network_behavior| alive while pipe is alive itself. // Will keep |network_behavior| alive while pipe is alive itself.
// Use these constructors if you plan to insert packets using DeliverPacket(). // Use these constructors if you plan to insert packets using DeliverPacket().
@ -119,8 +118,8 @@ class FakeNetworkPipe : public webrtc::SimulatedPacketReceiverInterface,
// constructor. // constructor.
bool SendRtp(const uint8_t* packet, bool SendRtp(const uint8_t* packet,
size_t length, size_t length,
const PacketOptions& options) override; const PacketOptions& options);
bool SendRtcp(const uint8_t* packet, size_t length) override; bool SendRtcp(const uint8_t* packet, size_t length);
// Implements the PacketReceiver interface. When/if packets are delivered, // Implements the PacketReceiver interface. When/if packets are delivered,
// they will be passed directly to the receiver instance given in // they will be passed directly to the receiver instance given in
@ -138,8 +137,7 @@ class FakeNetworkPipe : public webrtc::SimulatedPacketReceiverInterface,
// Processes the network queues and trigger PacketReceiver::IncomingPacket for // Processes the network queues and trigger PacketReceiver::IncomingPacket for
// packets ready to be delivered. // packets ready to be delivered.
void Process() override; void Process() override;
int64_t TimeUntilNextProcess() override; absl::optional<int64_t> TimeUntilNextProcess() override;
void ProcessThreadAttached(ProcessThread* process_thread) override;
// Get statistics. // Get statistics.
float PercentageLoss(); float PercentageLoss();
@ -193,10 +191,6 @@ class FakeNetworkPipe : public webrtc::SimulatedPacketReceiverInterface,
// |process_lock| guards the data structures involved in delay and loss // |process_lock| guards the data structures involved in delay and loss
// processes, such as the packet queues. // processes, such as the packet queues.
rtc::CriticalSection process_lock_; rtc::CriticalSection process_lock_;
rtc::CriticalSection process_thread_lock_;
ProcessThread* process_thread_ RTC_GUARDED_BY(process_thread_lock_) = nullptr;
// Packets are added at the back of the deque, this makes the deque ordered // Packets are added at the back of the deque, this makes the deque ordered
// by increasing send time. The common case when removing packets from the // by increasing send time. The common case when removing packets from the
// deque is removing early packets, which will be close to the front of the // deque is removing early packets, which will be close to the front of the

View file

@ -20,6 +20,9 @@
#include "rtc_base/checks.h" #include "rtc_base/checks.h"
namespace webrtc { namespace webrtc {
namespace {
constexpr int64_t kDefaultProcessDelayUs = 5000;
}
SimulatedNetwork::SimulatedNetwork(SimulatedNetwork::Config config, SimulatedNetwork::SimulatedNetwork(SimulatedNetwork::Config config,
uint64_t random_seed) uint64_t random_seed)
@ -76,15 +79,16 @@ bool SimulatedNetwork::EnqueuePacket(PacketInFlightInfo packet) {
// calculated in UpdateCapacityQueue. // calculated in UpdateCapacityQueue.
queue_size_bytes_ += packet.size; queue_size_bytes_ += packet.size;
capacity_link_.push({packet, packet.send_time_us}); capacity_link_.push({packet, packet.send_time_us});
if (!next_process_time_us_) {
next_process_time_us_ = packet.send_time_us + kDefaultProcessDelayUs;
}
return true; return true;
} }
absl::optional<int64_t> SimulatedNetwork::NextDeliveryTimeUs() const { absl::optional<int64_t> SimulatedNetwork::NextDeliveryTimeUs() const {
RTC_DCHECK_RUNS_SERIALIZED(&process_checker_); RTC_DCHECK_RUNS_SERIALIZED(&process_checker_);
if (!delay_link_.empty()) return next_process_time_us_;
return delay_link_.begin()->arrival_time_us;
return absl::nullopt;
} }
void SimulatedNetwork::UpdateCapacityQueue(ConfigState state, void SimulatedNetwork::UpdateCapacityQueue(ConfigState state,
@ -198,6 +202,14 @@ std::vector<PacketDeliveryInfo> SimulatedNetwork::DequeueDeliverablePackets(
PacketDeliveryInfo(packet_info.packet, packet_info.arrival_time_us)); PacketDeliveryInfo(packet_info.packet, packet_info.arrival_time_us));
delay_link_.pop_front(); delay_link_.pop_front();
} }
if (!delay_link_.empty()) {
next_process_time_us_ = delay_link_.front().arrival_time_us;
} else if (!capacity_link_.empty()) {
next_process_time_us_ = receive_time_us + kDefaultProcessDelayUs;
} else {
next_process_time_us_.reset();
}
return packets_to_deliver; return packets_to_deliver;
} }

View file

@ -87,6 +87,8 @@ class SimulatedNetwork : public NetworkBehaviorInterface {
int64_t pending_drain_bits_ RTC_GUARDED_BY(process_checker_) = 0; int64_t pending_drain_bits_ RTC_GUARDED_BY(process_checker_) = 0;
absl::optional<int64_t> last_capacity_link_visit_us_ absl::optional<int64_t> last_capacity_link_visit_us_
RTC_GUARDED_BY(process_checker_); RTC_GUARDED_BY(process_checker_);
absl::optional<int64_t> next_process_time_us_
RTC_GUARDED_BY(process_checker_);
}; };
} // namespace webrtc } // namespace webrtc

View file

@ -13,13 +13,12 @@
#include "api/test/simulated_network.h" #include "api/test/simulated_network.h"
#include "call/packet_receiver.h" #include "call/packet_receiver.h"
#include "modules/include/module.h"
namespace webrtc { namespace webrtc {
// Private API that is fixing surface between DirectTransport and underlying // Private API that is fixing surface between DirectTransport and underlying
// network conditions simulation implementation. // network conditions simulation implementation.
class SimulatedPacketReceiverInterface : public PacketReceiver, public Module { class SimulatedPacketReceiverInterface : public PacketReceiver {
public: public:
// Must not be called in parallel with DeliverPacket or Process. // Must not be called in parallel with DeliverPacket or Process.
// Destination receiver will be injected with this method // Destination receiver will be injected with this method
@ -27,6 +26,15 @@ class SimulatedPacketReceiverInterface : public PacketReceiver, public Module {
// Reports average packet delay. // Reports average packet delay.
virtual int AverageDelay() = 0; virtual int AverageDelay() = 0;
// Process any pending tasks such as timeouts.
// Called on a worker thread.
virtual void Process() = 0;
// Returns the time until next process or nullopt to indicate that the next
// process time is unknown. If the next process time is unknown, this should
// be checked again any time a packet is enqueued.
virtual absl::optional<int64_t> TimeUntilNextProcess() = 0;
}; };
} // namespace webrtc } // namespace webrtc

View file

@ -259,7 +259,7 @@ TEST_F(FakeNetworkPipeTest, ChangingCapacityWithEmptyPipeTest) {
// Check that all the packets were sent. // Check that all the packets were sent.
EXPECT_EQ(static_cast<size_t>(2 * kNumPackets), pipe->SentPackets()); EXPECT_EQ(static_cast<size_t>(2 * kNumPackets), pipe->SentPackets());
fake_clock_.AdvanceTimeMilliseconds(pipe->TimeUntilNextProcess()); fake_clock_.AdvanceTimeMilliseconds(*pipe->TimeUntilNextProcess());
EXPECT_CALL(receiver, DeliverPacket(_, _, _)).Times(0); EXPECT_CALL(receiver, DeliverPacket(_, _, _)).Times(0);
pipe->Process(); pipe->Process();
} }
@ -307,7 +307,7 @@ TEST_F(FakeNetworkPipeTest, ChangingCapacityWithPacketsInPipeTest) {
// Check that all the packets were sent. // Check that all the packets were sent.
EXPECT_EQ(static_cast<size_t>(kNumPackets), pipe->SentPackets()); EXPECT_EQ(static_cast<size_t>(kNumPackets), pipe->SentPackets());
fake_clock_.AdvanceTimeMilliseconds(pipe->TimeUntilNextProcess()); fake_clock_.AdvanceTimeMilliseconds(*pipe->TimeUntilNextProcess());
EXPECT_CALL(receiver, DeliverPacket(_, _, _)).Times(0); EXPECT_CALL(receiver, DeliverPacket(_, _, _)).Times(0);
pipe->Process(); pipe->Process();
} }

View file

@ -50,19 +50,18 @@ DirectTransport::DirectTransport(
} }
DirectTransport::~DirectTransport() { DirectTransport::~DirectTransport() {
RTC_DCHECK_CALLED_SEQUENTIALLY(&sequence_checker_); if (next_process_task_)
// Constructor updates |next_scheduled_task_|, so it's guaranteed to task_queue_->CancelTask(*next_process_task_);
// be initialized.
task_queue_->CancelTask(next_scheduled_task_);
} }
void DirectTransport::StopSending() { void DirectTransport::StopSending() {
RTC_DCHECK_CALLED_SEQUENTIALLY(&sequence_checker_); rtc::CritScope cs(&process_lock_);
task_queue_->CancelTask(next_scheduled_task_); if (next_process_task_)
task_queue_->CancelTask(*next_process_task_);
} }
void DirectTransport::SetReceiver(PacketReceiver* receiver) { void DirectTransport::SetReceiver(PacketReceiver* receiver) {
RTC_DCHECK_CALLED_SEQUENTIALLY(&sequence_checker_); rtc::CritScope cs(&process_lock_);
fake_network_->SetReceiver(receiver); fake_network_->SetReceiver(receiver);
} }
@ -92,6 +91,9 @@ void DirectTransport::SendPacket(const uint8_t* data, size_t length) {
int64_t send_time = clock_->TimeInMicroseconds(); int64_t send_time = clock_->TimeInMicroseconds();
fake_network_->DeliverPacket(media_type, rtc::CopyOnWriteBuffer(data, length), fake_network_->DeliverPacket(media_type, rtc::CopyOnWriteBuffer(data, length),
send_time); send_time);
rtc::CritScope cs(&process_lock_);
if (!next_process_task_)
ProcessPackets();
} }
int DirectTransport::GetAverageDelayMs() { int DirectTransport::GetAverageDelayMs() {
@ -104,17 +106,20 @@ void DirectTransport::Start() {
send_call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkUp); send_call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkUp);
send_call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp); send_call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp);
} }
SendPackets();
} }
void DirectTransport::SendPackets() { void DirectTransport::ProcessPackets() {
RTC_DCHECK_CALLED_SEQUENTIALLY(&sequence_checker_); next_process_task_.reset();
auto delay_ms = fake_network_->TimeUntilNextProcess();
if (delay_ms) {
next_process_task_ = task_queue_->PostDelayedTask(
[this]() {
fake_network_->Process(); fake_network_->Process();
rtc::CritScope cs(&process_lock_);
int64_t delay_ms = fake_network_->TimeUntilNextProcess(); ProcessPackets();
next_scheduled_task_ = },
task_queue_->PostDelayedTask([this]() { SendPackets(); }, delay_ms); *delay_ms);
}
} }
} // namespace test } // namespace test
} // namespace webrtc } // namespace webrtc

View file

@ -60,7 +60,7 @@ class DirectTransport : public Transport {
int GetAverageDelayMs(); int GetAverageDelayMs();
private: private:
void SendPackets(); void ProcessPackets() RTC_EXCLUSIVE_LOCKS_REQUIRED(&process_lock_);
void SendPacket(const uint8_t* data, size_t length); void SendPacket(const uint8_t* data, size_t length);
void Start(); void Start();
@ -68,13 +68,14 @@ class DirectTransport : public Transport {
Clock* const clock_; Clock* const clock_;
SingleThreadedTaskQueueForTesting* const task_queue_; SingleThreadedTaskQueueForTesting* const task_queue_;
SingleThreadedTaskQueueForTesting::TaskId next_scheduled_task_
RTC_GUARDED_BY(&sequence_checker_); rtc::CriticalSection process_lock_;
absl::optional<SingleThreadedTaskQueueForTesting::TaskId> next_process_task_
RTC_GUARDED_BY(&process_lock_);
const Demuxer demuxer_; const Demuxer demuxer_;
const std::unique_ptr<SimulatedPacketReceiverInterface> fake_network_; const std::unique_ptr<SimulatedPacketReceiverInterface> fake_network_;
rtc::SequencedTaskChecker sequence_checker_;
}; };
} // namespace test } // namespace test
} // namespace webrtc } // namespace webrtc