Add growing buffer support to MRP

This commit is contained in:
adel-signal 2025-02-03 21:24:58 -08:00 committed by GitHub
parent c2024aedc3
commit 8abf69ae13
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 135 additions and 39 deletions

View file

@ -84,6 +84,21 @@ pub enum MrpSendError {
InnerSendFailed(anyhow::Error), InnerSendFailed(anyhow::Error),
} }
impl<SendData, ReceiveData> Default for MrpStream<SendData, ReceiveData>
where
SendData: Clone + Debug,
ReceiveData: Clone + Debug,
{
/// allows for unlimited buffers
fn default() -> Self {
Self {
should_ack: false,
send_buffer: BufferWindow::<PendingPacket<SendData>>::new(Self::INITIAL_SEQNUM),
receive_buffer: BufferWindow::<ReceiveData>::new(Self::INITIAL_ACKNUM),
}
}
}
impl<SendData, ReceiveData> MrpStream<SendData, ReceiveData> impl<SendData, ReceiveData> MrpStream<SendData, ReceiveData>
where where
SendData: Clone + Debug, SendData: Clone + Debug,
@ -92,14 +107,17 @@ where
const INITIAL_SEQNUM: u64 = 1; const INITIAL_SEQNUM: u64 = 1;
const INITIAL_ACKNUM: u64 = 1; const INITIAL_ACKNUM: u64 = 1;
pub fn new(max_window_size: usize) -> Self { pub fn with_capacity_limit(max_window_size: usize) -> Self {
Self { Self {
should_ack: false, should_ack: false,
send_buffer: BufferWindow::<PendingPacket<SendData>>::new( send_buffer: BufferWindow::<PendingPacket<SendData>>::with_capacity_limit(
max_window_size, max_window_size,
Self::INITIAL_SEQNUM, Self::INITIAL_SEQNUM,
), ),
receive_buffer: BufferWindow::<ReceiveData>::new(max_window_size, Self::INITIAL_ACKNUM), receive_buffer: BufferWindow::<ReceiveData>::with_capacity_limit(
max_window_size,
Self::INITIAL_ACKNUM,
),
} }
} }
@ -126,7 +144,7 @@ where
/// # use mrp::*; /// # use mrp::*;
/// # use std::time::{Duration, Instant}; /// # use std::time::{Duration, Instant};
/// type Packet = PacketWrapper<i32>; /// type Packet = PacketWrapper<i32>;
/// let mut stream = MrpStream::<Packet, Packet>::new(8); /// let mut stream = MrpStream::<Packet, Packet>::with_capacity_limit(8);
/// let mut inbox = Vec::with_capacity(8); /// let mut inbox = Vec::with_capacity(8);
/// ///
/// for i in 1..=9 { /// for i in 1..=9 {
@ -195,8 +213,8 @@ where
/// let ack = || PacketWrapper(MrpHeader::default(), "".to_string()); /// let ack = || PacketWrapper(MrpHeader::default(), "".to_string());
/// let (to_alice, alice_inbox) : (Sender<Packet>, Receiver<Packet>) = mpsc::channel(); /// let (to_alice, alice_inbox) : (Sender<Packet>, Receiver<Packet>) = mpsc::channel();
/// let (to_bob, bob_inbox) : (Sender<Packet>, Receiver<Packet>) = mpsc::channel(); /// let (to_bob, bob_inbox) : (Sender<Packet>, Receiver<Packet>) = mpsc::channel();
/// let mut alice = MrpStream::<Packet, Packet>::new(8); /// let mut alice = MrpStream::<Packet, Packet>::with_capacity_limit(8);
/// let mut bob = MrpStream::<Packet, Packet>::new(8); /// let mut bob = MrpStream::<Packet, Packet>::with_capacity_limit(8);
/// let tick = Duration::from_millis(10); /// let tick = Duration::from_millis(10);
/// ///
/// thread::spawn(move || { /// thread::spawn(move || {
@ -290,6 +308,14 @@ where
} }
} }
pub fn send_len(&self) -> usize {
self.send_buffer.len()
}
pub fn receive_len(&self) -> usize {
self.receive_buffer.len()
}
fn update_send_window( fn update_send_window(
&mut self, &mut self,
received_ack_num: u64, received_ack_num: u64,
@ -430,13 +456,13 @@ mod tests {
impl TestCase { impl TestCase {
fn new( fn new(
buffer_size: usize, buffer_size: Option<usize>,
alice_schedule: PacketSchedule, alice_schedule: PacketSchedule,
bob_schedule: PacketSchedule, bob_schedule: PacketSchedule,
) -> Self { ) -> Self {
TestCase { TestCase {
alice: MrpStream::new(buffer_size), alice: buffer_size.map_or_else(MrpStream::default, MrpStream::with_capacity_limit),
bob: MrpStream::new(buffer_size), bob: buffer_size.map_or_else(MrpStream::default, MrpStream::with_capacity_limit),
alice_inbox: RefCell::new(vec![]), alice_inbox: RefCell::new(vec![]),
bob_inbox: RefCell::new(vec![]), bob_inbox: RefCell::new(vec![]),
alice_schedule, alice_schedule,
@ -584,12 +610,40 @@ mod tests {
static NEVER_TIMEOUT: u64 = 10000000; static NEVER_TIMEOUT: u64 = 10000000;
static NO_RECEIVES: Vec<u64> = vec![]; static NO_RECEIVES: Vec<u64> = vec![];
#[test]
fn test_unlimited_buffers() {
// we send a large number at once so both send and receive buffers grow
let num_to_send = 512;
let mut tc = TestCase::new(
None,
Event::schedule_of((0..num_to_send).map(|_| (1, 5)).collect()),
Event::schedule_of((0..num_to_send).map(|_| (1, 5)).collect()),
);
tc.run_to(1);
assert_sent(tc.send_from_alice(NEVER_TIMEOUT), num_to_send);
assert_sent(tc.send_from_bob(NEVER_TIMEOUT), num_to_send);
assert_eq!(tc.recv_for_alice(), NO_RECEIVES);
assert_eq!(tc.recv_for_bob(), NO_RECEIVES);
assert_eq!(tc.updates_from_alice(), NO_UPDATES);
assert_eq!(tc.updates_from_bob(), NO_UPDATES);
let expected_recv = (1..=num_to_send as u64).collect::<Vec<_>>();
tc.run_to(5);
assert_sent(tc.send_from_alice(NEVER_TIMEOUT), 0);
assert_sent(tc.send_from_bob(NEVER_TIMEOUT), 0);
assert_eq!(tc.recv_for_alice(), expected_recv);
assert_eq!(tc.recv_for_bob(), expected_recv);
assert_eq!(tc.updates_from_alice(), acked(num_to_send as u64 + 1));
assert_eq!(tc.updates_from_bob(), acked(num_to_send as u64 + 1));
}
#[test] #[test]
fn test_ping_pong_one_direction() { fn test_ping_pong_one_direction() {
// Every tick, Alice sends a packet, Bob receives it and acks it // Every tick, Alice sends a packet, Bob receives it and acks it
// and Alice receives the ack // and Alice receives the ack
let mut tc = TestCase::new( let mut tc = TestCase::new(
16, Some(16),
Event::schedule_of((1..50).map(|i| (i, i)).collect()), Event::schedule_of((1..50).map(|i| (i, i)).collect()),
Event::schedule_of(vec![]), Event::schedule_of(vec![]),
); );
@ -609,7 +663,7 @@ mod tests {
fn test_ping_pong_two_directions() { fn test_ping_pong_two_directions() {
// Both Bob and Alice send, receive, ack, and receive ack in the same tick // Both Bob and Alice send, receive, ack, and receive ack in the same tick
let mut tc = TestCase::new( let mut tc = TestCase::new(
16, Some(16),
Event::schedule_of((1..50).map(|i| (i, i)).collect()), Event::schedule_of((1..50).map(|i| (i, i)).collect()),
Event::schedule_of((1..50).map(|i| (i, i)).collect()), Event::schedule_of((1..50).map(|i| (i, i)).collect()),
); );
@ -642,7 +696,7 @@ mod tests {
(ts, ts + delay) (ts, ts + delay)
}; };
let mut tc = TestCase::new( let mut tc = TestCase::new(
16, Some(16),
Event::schedule_of((10..=60).map(event).collect()), Event::schedule_of((10..=60).map(event).collect()),
Event::schedule_of((10..=60).map(event).collect()), Event::schedule_of((10..=60).map(event).collect()),
); );
@ -675,7 +729,7 @@ mod tests {
// Bob sends packets with similar pattern to // Bob sends packets with similar pattern to
// [test_out_of_order_buffering], receiving 9 every 10th tick // [test_out_of_order_buffering], receiving 9 every 10th tick
let mut tc = TestCase::new( let mut tc = TestCase::new(
16, Some(16),
Event::schedule_of(vec![ Event::schedule_of(vec![
(1, 1), (1, 1),
(2, 2), (2, 2),
@ -807,7 +861,7 @@ mod tests {
// Alice sends packets with timeouts that cause retransmissions. // Alice sends packets with timeouts that cause retransmissions.
// Retransmissions will instantly succeed (same tick). // Retransmissions will instantly succeed (same tick).
let mut tc = TestCase::new( let mut tc = TestCase::new(
16, Some(16),
Event::schedule_of(vec![ Event::schedule_of(vec![
// Packets 1-7: Test head of line blocking. Packet 4 is resent at t=10, // Packets 1-7: Test head of line blocking. Packet 4 is resent at t=10,
// so Packets 4-6 are returned at t=10 resulting in ack(7) at t=10, ack(8) at t=11 // so Packets 4-6 are returned at t=10 resulting in ack(7) at t=10, ack(8) at t=11
@ -956,8 +1010,8 @@ mod tests {
delay_min: Duration, delay_min: Duration,
delay_max: Duration, delay_max: Duration,
) { ) {
let alice = MrpStream::new(64); let alice = MrpStream::with_capacity_limit(64);
let bob = MrpStream::new(64); let bob = MrpStream::with_capacity_limit(64);
let (to_alice, alice_inbox) = mpsc::channel(); let (to_alice, alice_inbox) = mpsc::channel();
let (to_bob, bob_inbox) = mpsc::channel(); let (to_bob, bob_inbox) = mpsc::channel();
let alice_receiver = DelayReceiver::new( let alice_receiver = DelayReceiver::new(

View file

@ -3,6 +3,7 @@
// SPDX-License-Identifier: AGPL-3.0-only // SPDX-License-Identifier: AGPL-3.0-only
// //
use std::cmp::min;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::fmt::Debug; use std::fmt::Debug;
use std::result::Result; use std::result::Result;
@ -20,18 +21,31 @@ pub enum WindowError {
pub struct BufferWindow<T: Debug> { pub struct BufferWindow<T: Debug> {
left: u64, left: u64,
data: VecDeque<Option<T>>, data: VecDeque<Option<T>>,
capacity_limit: usize,
} }
impl<T: Debug> BufferWindow<T> { impl<T: Debug> BufferWindow<T> {
const DEFAULT_SIZE: usize = 16;
/// left_bounds must be greater than 0 /// left_bounds must be greater than 0
pub fn new(max_size: usize, left_bounds: u64) -> Self { /// capacity_limit must be greater than 0
pub fn with_capacity_limit(capacity_limit: usize, left_bounds: u64) -> Self {
assert_ne!(left_bounds, 0, "Left bounds must be greater than 0"); assert_ne!(left_bounds, 0, "Left bounds must be greater than 0");
assert_ne!(capacity_limit, 0, "Capacity limit must be greater than 0");
let initial_capacity = min(capacity_limit, Self::DEFAULT_SIZE);
Self { Self {
left: left_bounds, left: left_bounds,
data: VecDeque::with_capacity(max_size), data: VecDeque::with_capacity(initial_capacity),
capacity_limit,
} }
} }
pub fn new(left_bounds: u64) -> Self {
assert_ne!(left_bounds, 0, "Left bounds must be greater than 0");
Self::with_capacity_limit(usize::MAX, left_bounds)
}
fn get_pos(&self, seqnum: u64) -> Result<usize, WindowError> { fn get_pos(&self, seqnum: u64) -> Result<usize, WindowError> {
if seqnum < self.left_bounds() { if seqnum < self.left_bounds() {
return Err(WindowError::BeforeWindow); return Err(WindowError::BeforeWindow);
@ -43,13 +57,17 @@ impl<T: Debug> BufferWindow<T> {
Ok((seqnum - self.left) as usize) Ok((seqnum - self.left) as usize)
} }
/// Max size of the window /// Max capacity of the window. Is not the current capacity
fn capacity(&self) -> usize { fn capacity_limit(&self) -> usize {
self.data.capacity() self.capacity_limit
}
pub fn len(&self) -> usize {
self.data.len()
} }
pub fn is_full(&self) -> bool { pub fn is_full(&self) -> bool {
self.data.len() == self.capacity() self.data.len() == self.capacity_limit()
} }
/// the highest seqnum of an element in the window or previously processed /// the highest seqnum of an element in the window or previously processed
@ -65,7 +83,7 @@ impl<T: Debug> BufferWindow<T> {
/// Current highest valid seqnum /// Current highest valid seqnum
pub fn right_bounds(&self) -> u64 { pub fn right_bounds(&self) -> u64 {
self.left + (self.capacity() as u64) - 1 self.left.saturating_add(self.capacity_limit() as u64 - 1)
} }
#[cfg(test)] #[cfg(test)]
@ -187,9 +205,9 @@ mod tests {
fn window_basics() { fn window_basics() {
let max_size = 4; let max_size = 4;
let mut base = 1000; let mut base = 1000;
let mut w = BufferWindow::new(max_size, 1); let mut w = BufferWindow::with_capacity_limit(max_size, 1);
assert_eq!(w.capacity(), max_size); assert_eq!(w.capacity_limit(), max_size);
// initializes to correct bounds // initializes to correct bounds
assert_eq!(w.left_bounds(), 1); assert_eq!(w.left_bounds(), 1);
@ -198,14 +216,14 @@ mod tests {
// Past bounds checking // Past bounds checking
assert_eq!(w.put(0, 0), Err(WindowError::BeforeWindow)); assert_eq!(w.put(0, 0), Err(WindowError::BeforeWindow));
assert_eq!(w.put(5, 0), Err(WindowError::AfterWindow)); assert_eq!(w.put(5, 0), Err(WindowError::AfterWindow));
assert_eq!(w.capacity(), max_size); assert_eq!(w.capacity_limit(), max_size);
// Fill up initial window without changing bounds // Fill up initial window without changing bounds
for s in 1..=max_size as u64 { for s in 1..=max_size as u64 {
assert_eq!(w.put(s, base + s), Ok(())); assert_eq!(w.put(s, base + s), Ok(()));
assert_eq!(w.left_bounds(), 1); assert_eq!(w.left_bounds(), 1);
assert_eq!(w.right_bounds(), 4); assert_eq!(w.right_bounds(), 4);
assert_eq!(w.capacity(), max_size); assert_eq!(w.capacity_limit(), max_size);
} }
assert!(w.is_full()); assert!(w.is_full());
@ -216,14 +234,14 @@ mod tests {
assert_eq!(w.put(s, base + s), Ok(())); assert_eq!(w.put(s, base + s), Ok(()));
assert_eq!(w.left_bounds(), 1); assert_eq!(w.left_bounds(), 1);
assert_eq!(w.right_bounds(), 4); assert_eq!(w.right_bounds(), 4);
assert_eq!(w.capacity(), max_size); assert_eq!(w.capacity_limit(), max_size);
} }
// drains contiguous front and updates bounds // drains contiguous front and updates bounds
assert_eq!(w.drain_front(), Some((5, vec![2001, 2002, 2003, 2004]))); assert_eq!(w.drain_front(), Some((5, vec![2001, 2002, 2003, 2004])));
assert_eq!(w.left_bounds(), 5); assert_eq!(w.left_bounds(), 5);
assert_eq!(w.right_bounds(), 8); assert_eq!(w.right_bounds(), 8);
assert_eq!(w.capacity(), max_size); assert_eq!(w.capacity_limit(), max_size);
// Past bounds checking // Past bounds checking
assert_eq!(w.put(1, 0), Err(WindowError::BeforeWindow)); assert_eq!(w.put(1, 0), Err(WindowError::BeforeWindow));
@ -235,20 +253,20 @@ mod tests {
assert_eq!(w.put(6, 3002), Ok(())); assert_eq!(w.put(6, 3002), Ok(()));
assert_eq!(w.put(8, 3004), Ok(())); assert_eq!(w.put(8, 3004), Ok(()));
assert!(w.is_full()); assert!(w.is_full());
assert_eq!(w.capacity(), max_size); assert_eq!(w.capacity_limit(), max_size);
// no drain, window does not move // no drain, window does not move
assert_eq!(w.drain_front(), None); assert_eq!(w.drain_front(), None);
assert_eq!(w.left_bounds(), 5); assert_eq!(w.left_bounds(), 5);
assert_eq!(w.right_bounds(), 8); assert_eq!(w.right_bounds(), 8);
assert_eq!(w.capacity(), max_size); assert_eq!(w.capacity_limit(), max_size);
// partial drain, window moves // partial drain, window moves
assert_eq!(w.put(5, 3001), Ok(())); assert_eq!(w.put(5, 3001), Ok(()));
assert_eq!(w.drain_front(), Some((7, vec![3001, 3002]))); assert_eq!(w.drain_front(), Some((7, vec![3001, 3002])));
assert_eq!(w.left_bounds(), 7); assert_eq!(w.left_bounds(), 7);
assert_eq!(w.right_bounds(), 10); assert_eq!(w.right_bounds(), 10);
assert_eq!(w.capacity(), max_size); assert_eq!(w.capacity_limit(), max_size);
// Past bounds checking // Past bounds checking
assert_eq!(w.put(6, 0), Err(WindowError::BeforeWindow)); assert_eq!(w.put(6, 0), Err(WindowError::BeforeWindow));
@ -261,22 +279,22 @@ mod tests {
assert_eq!(w.drop_front(1), 8); assert_eq!(w.drop_front(1), 8);
assert_eq!(w.left_bounds(), 8); assert_eq!(w.left_bounds(), 8);
assert_eq!(w.right_bounds(), 11); assert_eq!(w.right_bounds(), 11);
assert_eq!(w.capacity(), max_size); assert_eq!(w.capacity_limit(), max_size);
// can now drain, advance window // can now drain, advance window
assert_eq!(w.drain_front(), Some((9, vec![3004]))); assert_eq!(w.drain_front(), Some((9, vec![3004])));
assert_eq!(w.left_bounds(), 9); assert_eq!(w.left_bounds(), 9);
assert_eq!(w.right_bounds(), 12); assert_eq!(w.right_bounds(), 12);
assert_eq!(w.capacity(), max_size); assert_eq!(w.capacity_limit(), max_size);
// clear removes all data, and updates window // clear removes all data, and updates window
assert_eq!(w.put(9, 5001), Ok(())); assert_eq!(w.put(9, 5001), Ok(()));
assert_eq!(w.put(10, 5002), Ok(())); assert_eq!(w.put(10, 5002), Ok(()));
assert_eq!(w.put(11, 5003), Ok(())); assert_eq!(w.put(11, 5003), Ok(()));
assert_eq!(w.put(12, 5004), Ok(())); assert_eq!(w.put(12, 5004), Ok(()));
assert_eq!(w.capacity(), max_size); assert_eq!(w.capacity_limit(), max_size);
w.clear(100); w.clear(100);
assert_eq!(w.capacity(), max_size); assert_eq!(w.capacity_limit(), max_size);
assert_eq!(w.left_bounds(), 100); assert_eq!(w.left_bounds(), 100);
assert_eq!(w.right_bounds(), 103); assert_eq!(w.right_bounds(), 103);
assert_eq!(w.drain_front(), None); assert_eq!(w.drain_front(), None);
@ -290,7 +308,7 @@ mod tests {
assert_eq!(w.drop_front(9_800), 10_000); assert_eq!(w.drop_front(9_800), 10_000);
assert_eq!(w.left_bounds(), 10_000); assert_eq!(w.left_bounds(), 10_000);
assert_eq!(w.right_bounds(), 10_003); assert_eq!(w.right_bounds(), 10_003);
assert_eq!(w.capacity(), max_size); assert_eq!(w.capacity_limit(), max_size);
// drop non-contiguous works // drop non-contiguous works
assert_eq!(w.put(10_003, 4001), Ok(())); assert_eq!(w.put(10_003, 4001), Ok(()));
@ -302,7 +320,7 @@ mod tests {
#[test] #[test]
fn window_iter() { fn window_iter() {
let mut w: BufferWindow<u64> = BufferWindow::new(4, 1); let mut w: BufferWindow<u64> = BufferWindow::with_capacity_limit(4, 1);
// empty window test // empty window test
let mut iter = w.iter(); let mut iter = w.iter();
@ -353,4 +371,28 @@ mod tests {
assert_eq!(iter.next(), Some(Some(&2003))); assert_eq!(iter.next(), Some(Some(&2003)));
assert_eq!(iter.next(), None); assert_eq!(iter.next(), None);
} }
#[test]
fn test_growing_window() {
fn test_window(mut w: BufferWindow<bool>) {
for i in 1..(2 * w.len() as u64) {
assert_eq!(
Ok(()),
w.put(i, true),
"Should grow with contiguous appends"
);
}
for i in (w.max_seen_seqnum()..(4 * w.max_seen_seqnum())).step_by(4) {
assert_eq!(
Ok(()),
w.put(i, true),
"Should grow with non-contiguous appends"
);
}
}
test_window(BufferWindow::new(1));
test_window(BufferWindow::with_capacity_limit(1024, 1));
}
} }

View file

@ -1354,7 +1354,7 @@ impl Client {
raised_hands: Vec::new(), raised_hands: Vec::new(),
raise_hand_state: RaiseHandState::default(), raise_hand_state: RaiseHandState::default(),
sfu_reliable_stream: MrpStream::new(RELIABLE_RTP_BUFFER_SIZE), sfu_reliable_stream: MrpStream::with_capacity_limit(RELIABLE_RTP_BUFFER_SIZE),
actor, actor,
}) })