/* * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved. * * Use of this source code is governed by a BSD-style license * that can be found in the LICENSE file in the root of the source * tree. An additional intellectual property rights grant can be found * in the file PATENTS. All contributing project authors may * be found in the AUTHORS file in the root of the source tree. */ #include "modules/pacing/packet_queue.h" #include #include #include #include "modules/include/module_common_types.h" #include "modules/pacing/bitrate_prober.h" #include "modules/pacing/interval_budget.h" #include "modules/utility/include/process_thread.h" #include "rtc_base/checks.h" #include "rtc_base/logging.h" #include "system_wrappers/include/clock.h" #include "system_wrappers/include/field_trial.h" namespace webrtc { PacketQueue::PacketQueue(const Clock* clock) : bytes_(0), clock_(clock), queue_time_sum_(0), time_last_updated_(clock_->TimeInMilliseconds()), paused_(false) {} PacketQueue::~PacketQueue() {} void PacketQueue::Push(const Packet& packet) { UpdateQueueTime(packet.enqueue_time_ms); // Store packet in list, use pointers in priority queue for cheaper moves. // Packets have a handle to its own iterator in the list, for easy removal // when popping from queue. packet_list_.push_front(packet); std::list::iterator it = packet_list_.begin(); it->this_it = it; // Handle for direct removal from list. prio_queue_.push(&(*it)); // Pointer into list. bytes_ += packet.bytes; } const PacketQueueInterface::Packet& PacketQueue::BeginPop() { const Packet& packet = *prio_queue_.top(); prio_queue_.pop(); return packet; } void PacketQueue::CancelPop(const Packet& packet) { prio_queue_.push(&(*packet.this_it)); } void PacketQueue::FinalizePop(const Packet& packet) { bytes_ -= packet.bytes; int64_t packet_queue_time_ms = time_last_updated_ - packet.enqueue_time_ms; RTC_DCHECK_LE(packet.sum_paused_ms, packet_queue_time_ms); packet_queue_time_ms -= packet.sum_paused_ms; RTC_DCHECK_LE(packet_queue_time_ms, queue_time_sum_); queue_time_sum_ -= packet_queue_time_ms; packet_list_.erase(packet.this_it); RTC_DCHECK_EQ(packet_list_.size(), prio_queue_.size()); if (packet_list_.empty()) RTC_DCHECK_EQ(0, queue_time_sum_); } bool PacketQueue::Empty() const { return prio_queue_.empty(); } size_t PacketQueue::SizeInPackets() const { return prio_queue_.size(); } uint64_t PacketQueue::SizeInBytes() const { return bytes_; } int64_t PacketQueue::OldestEnqueueTimeMs() const { auto it = packet_list_.rbegin(); if (it == packet_list_.rend()) return 0; return it->enqueue_time_ms; } void PacketQueue::UpdateQueueTime(int64_t timestamp_ms) { RTC_DCHECK_GE(timestamp_ms, time_last_updated_); if (timestamp_ms == time_last_updated_) return; int64_t delta_ms = timestamp_ms - time_last_updated_; if (paused_) { // Increase per-packet accumulators of time spent in queue while paused, // so that we can disregard that when subtracting main accumulator when // popping packet from the queue. for (auto& it : packet_list_) { it.sum_paused_ms += delta_ms; } } else { // Use packet packet_list_.size() not prio_queue_.size() here, as there // might be an outstanding element popped from prio_queue_ currently in // the SendPacket() call, while packet_list_ will always be correct. queue_time_sum_ += delta_ms * packet_list_.size(); } time_last_updated_ = timestamp_ms; } void PacketQueue::SetPauseState(bool paused, int64_t timestamp_ms) { if (paused_ == paused) return; UpdateQueueTime(timestamp_ms); paused_ = paused; } int64_t PacketQueue::AverageQueueTimeMs() const { if (prio_queue_.empty()) return 0; return queue_time_sum_ / packet_list_.size(); } } // namespace webrtc