1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200
use std::collections::VecDeque;
use ahash::AHashMap;
use crate::header::PackageId;
/// Data buffer based on a ring-buffer.
///
/// The underlying data structures are optimized with the assumption that data
/// are inserted and removed in roughly FIFO manner.
pub(super) struct DataBuf {
/// Bytes of all slots currently held in `slots`, concatenated in slot
/// order (the front of this deque is the first byte of the front slot).
data: VecDeque<u8>,
/// Bookkeeping records of the stored datagrams, oldest at the front. New
/// slots are pushed to the back, pruned slots are popped from the front.
slots: VecDeque<Slot>,
/// Mapping from datagram ID to datagram ordinal. See [`Slot::ordinal`].
ordinals: AHashMap<PackageId, usize>,
}
impl DataBuf {
    /// Creates an empty buffer.
    pub(super) fn new() -> Self {
        Self {
            data: VecDeque::new(),
            slots: VecDeque::new(),
            ordinals: AHashMap::new(),
        }
    }

    /// Stores new data to the buffer.
    ///
    /// # Panics
    ///
    /// Panics if data with the same `id` is already stored.
    pub(super) fn push(&mut self, id: PackageId, data: &[u8]) {
        // The new slot continues right after the current back slot. When
        // there are no slots, numbering restarts from zero.
        let (ordinal, data_offset) = match self.slots.back() {
            Some(back) => (
                back.ordinal.wrapping_add(1),
                back.data_offset.wrapping_add(back.len),
            ),
            None => (0, 0),
        };

        let prev = self.ordinals.insert(id, ordinal);
        assert!(prev.is_none(), "data with the same ID is already stored");

        self.slots.push_back(Slot {
            used: true,
            ordinal,
            data_offset,
            len: data.len(),
        });
        self.data.extend(data);
    }

    /// See [`Self::get`] and [`Self::remove`].
    ///
    /// # Panics
    ///
    /// Panics if `buf` len is smaller than length of found data.
    pub(super) fn get_and_remove(&mut self, id: PackageId, buf: &mut [u8]) -> Option<usize> {
        let len = self.get(id, buf)?;
        self.remove(id);
        Some(len)
    }

    /// Searches for data stored under ID `id`, and if found, writes the data
    /// to `buf` and returns length of the data.
    ///
    /// Returns `None` if no data are stored under `id`, i.e. the ID was never
    /// pushed or it has already been removed.
    ///
    /// # Panics
    ///
    /// Panics if `buf` len is smaller than length of found data.
    pub(super) fn get(&self, id: PackageId, buf: &mut [u8]) -> Option<usize> {
        let slot_index = self.slot_index(id)?;
        let front = self
            .slots
            .front()
            .expect("slots are non-empty when an ordinal is known");
        let slot = self
            .slots
            .get(slot_index)
            .expect("every known ordinal refers to an existing slot");

        // Data offsets are modular, only their wrapping difference is
        // meaningful as a position inside `self.data`.
        let data_index = slot.data_offset.wrapping_sub(front.data_offset);

        assert!(buf.len() >= slot.len);

        let source = self.data.range(data_index..data_index + slot.len);
        for (target, byte) in buf.iter_mut().zip(source) {
            *target = *byte;
        }

        Some(slot.len)
    }

    /// Removes data stored with ID `id` or does nothing if such data do not
    /// exist.
    ///
    /// After this call, `id` is no longer known to the buffer: [`Self::get`]
    /// returns `None` for it and it may be used with [`Self::push`] again.
    /// The bytes themselves are released once all older slots are removed
    /// as well.
    pub(super) fn remove(&mut self, id: PackageId) {
        let Some(slot_index) = self.slot_index(id) else {
            return;
        };

        // Forget the ID right away. Keeping it would leak map entries and
        // leave behind a stale ordinal which no longer maps to a valid slot
        // once the slot is pruned.
        self.ordinals.remove(&id);

        self.slots
            .get_mut(slot_index)
            .expect("every known ordinal refers to an existing slot")
            .used = false;

        // Prune unused slots (and their bytes) from the front of the buffer.
        while let Some(front) = self.slots.front() {
            if front.used {
                break;
            }

            self.data.drain(..front.len);
            self.slots.pop_front();
        }
    }

    /// Get index (within slots deque) of the slot with ID `id`.
    ///
    /// Returns `None` if the ID is not stored in the buffer.
    fn slot_index(&self, id: PackageId) -> Option<usize> {
        let ordinal = *self.ordinals.get(&id)?;
        // Slots can't be empty since the ordinal was found and ordinals of
        // removed slots are dropped from the map.
        let front = self
            .slots
            .front()
            .expect("slots are non-empty when an ordinal is known");
        Some(ordinal.wrapping_sub(front.ordinal))
    }
}
/// A single slot in a bytes ring-buffer.
///
/// A slot corresponds to a single (ring-like) contiguous part of the data
/// buffer. Every byte (position) in the data buffer belongs to exactly one
/// slot (i.e. no gaps and no overlaps).
///
/// A slot may be used or unused. Unused slots are pruned once they reach the
/// front of the buffer.
struct Slot {
/// True while the slot holds stored data. Once set to false, the slot is
/// no longer used and may be pruned.
used: bool,
/// Unique number of the slot. Each new slot is assigned an ordinal, which
/// is a wrapping increment of the ordinal of the previous slot, or 0 if
/// there are no slots in the buffer at the time of slot creation.
///
/// Given a slot `a` and the first slot in the slots buffer `f`, index of
/// slot `a` is `a.ordinal.wrapping_sub(f.ordinal)`.
ordinal: usize,
/// Bytes offset of the slot since the last time the buffer was empty.
///
/// Due to the fact that slots are pruned, this number may not correspond
/// to the actual offset from the beginning of the data buffer. Offsets are
/// modular over the maximum of `usize` (wrapping versions of arithmetic
/// operations should be used).
///
/// Wrapping difference between data offsets of two (nearby) slots
/// corresponds to the distance of the slots inside the data buffer.
data_offset: usize,
/// Size (number of bytes) of the slot in the data buffer.
len: usize,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for building a [`PackageId`] from a raw number which is
    /// expected to be a valid ID.
    fn pid(raw: u32) -> PackageId {
        PackageId::try_from(raw).unwrap()
    }

    #[test]
    fn test_data_buf() {
        let mut out = [0u8; 512];
        let mut store = DataBuf::new();

        // Nothing can be found in an empty buffer.
        assert!(store.get(pid(1), &mut out).is_none());

        store.push(pid(12), &[1, 2, 3, 4, 5, 6]);
        store.push(pid(8), &[21, 22, 23]);

        // Unknown ID is still not found once other data are stored.
        assert!(store.get(pid(1), &mut out).is_none());

        // Stored data may be read in an arbitrary order.
        assert_eq!(store.get(pid(8), &mut out), Some(3));
        assert_eq!(out[..3], [21, 22, 23]);

        assert_eq!(store.get(pid(12), &mut out), Some(6));
        assert_eq!(out[..6], [1, 2, 3, 4, 5, 6]);

        // Repeatedly insert batches in descending ID order and then read and
        // remove them in ascending ID order.
        for major in 100..150u8 {
            let raw_id = |minor: u8| u32::from(major) * 100 + u32::from(minor);

            for minor in (0..20u8).rev() {
                store.push(pid(raw_id(minor)), &[major, minor, 23]);
            }

            for minor in 0..20u8 {
                assert_eq!(store.get(pid(raw_id(minor)), &mut out), Some(3));
                assert_eq!(out[..3], [major, minor, 23]);
                store.remove(pid(raw_id(minor)));
            }
        }
    }
}