1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
use std::collections::VecDeque;

use ahash::AHashMap;

use crate::header::PackageId;

/// Data buffer based on a ring-buffer.
///
/// The underling data structures are optimized with the assumption that data
/// are inserted and removed in roughly FIFO manner.
pub(super) struct DataBuf {
    data: VecDeque<u8>,
    slots: VecDeque<Slot>,
    /// Mapping from datagram ID to datagram ordinal. See [`Slot::ordinal`].
    ordinals: AHashMap<PackageId, usize>,
}

impl DataBuf {
    pub(super) fn new() -> Self {
        Self {
            data: VecDeque::new(),
            slots: VecDeque::new(),
            ordinals: AHashMap::new(),
        }
    }

    /// Stores new data to the buffer.
    ///
    /// # Panics
    ///
    /// Panics if data with the same `id` is already stored.
    pub(super) fn push(&mut self, id: PackageId, data: &[u8]) {
        let (ordinal, data_offset) = match self.slots.back() {
            Some(back) => (
                back.ordinal.wrapping_add(1),
                back.data_offset.wrapping_add(back.len),
            ),
            None => (0, 0),
        };

        let prev = self.ordinals.insert(id, ordinal);
        assert!(prev.is_none());

        self.slots.push_back(Slot {
            used: true,
            ordinal,
            data_offset,
            len: data.len(),
        });

        self.data.extend(data);
    }

    /// See [`Self::get`] and [`Self::remove`].
    ///
    /// # Panics
    ///
    /// Panics if `buf` len is smaller than length of found data.
    pub(super) fn get_and_remove(&mut self, id: PackageId, buf: &mut [u8]) -> Option<usize> {
        let result = self.get(id, buf);
        if result.is_some() {
            self.remove(id);
        }
        result
    }

    /// Searches for data stored under ID `id`, and if found, writes the data
    /// to `buf` and returns length of the data.
    ///
    /// # Panics
    ///
    /// Panics if `buf` len is smaller than length of found data.
    pub(super) fn get(&self, id: PackageId, buf: &mut [u8]) -> Option<usize> {
        let slot_index = self.slot_index(id)?;

        let front = self.slots.front().unwrap();
        let slot = self.slots.get(slot_index).unwrap();

        let data_index = slot.data_offset.wrapping_sub(front.data_offset);

        assert!(buf.len() >= slot.len);

        for (source, target) in self.data.range(data_index..data_index + slot.len).zip(buf) {
            *target = *source;
        }

        Some(slot.len)
    }

    /// Removes data stored with ID `id` or does nothing if such data do not
    /// exist.
    pub(super) fn remove(&mut self, id: PackageId) {
        let Some(slot_index) = self.slot_index(id) else {
            return;
        };
        self.slots.get_mut(slot_index).unwrap().used = false;

        while let Some(front) = self.slots.front() {
            if front.used {
                break;
            }

            for _ in 0..front.len {
                self.data.pop_front();
            }
            self.slots.pop_front().unwrap();
        }
    }

    /// Get index (withing slots deque) of the slot with ID `id`.
    fn slot_index(&self, id: PackageId) -> Option<usize> {
        let ordinal = self.ordinals.get(&id)?;

        // Slots can't be empty since the ordinal was found.
        let front = self.slots.front().unwrap();
        Some(ordinal.wrapping_sub(front.ordinal))
    }
}

/// A single slot in a bytes ring-buffer.
///
/// A slot corresponds to a single (ring-like) contiguous part of the data
/// buffer. Every byte (position) in the data buffer belongs to exactly one
/// slot (i.e. no gaps and no overlaps).
///
/// A slot may be used or unused. Unused slots are pruned once they reach end
/// of the buffer.
struct Slot {
    /// True if the slot is no longer used and may be pruned.
    used: bool,
    /// Unique number of the slot. Each new slot is assigned an ordinal, which
    /// is a wrapping increment of the ordinal of the previous slot, or 0 if
    /// there is no slots in the buffer at the time of slot creation.
    ///
    /// Given a slot `a` and the first slot in the slots buffer `f`, index of
    /// slot `a` is `a.ordinal.wrapping_sub(f.ordinal)`.
    ordinal: usize,
    /// Bytes offset of the slot since the last time the buffer was empty.
    ///
    /// Due to the fact that slots are pruned, this number may not correspond
    /// to the actual offset from the beginning of the data buffer. Offsets are
    /// modular over the maximum of `usize` (wrapping versions of arithmetic
    /// operations should be used).
    ///
    /// Wrapping difference between data offsets of two (nearby) slots
    /// corresponds to the distance of the slots inside the data buffer.
    data_offset: usize,
    /// Size (number of bytes) of the slot in the data buffer.
    len: usize,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_data_buf() {
        let mut buf = [0u8; 512];
        let mut data = DataBuf::new();

        assert!(data
            .get(PackageId::try_from(1).unwrap(), &mut buf)
            .is_none());

        data.push(PackageId::try_from(12).unwrap(), &[1, 2, 3, 4, 5, 6]);
        data.push(PackageId::try_from(8).unwrap(), &[21, 22, 23]);
        assert!(data
            .get(PackageId::try_from(1).unwrap(), &mut buf)
            .is_none());
        assert_eq!(
            data.get(PackageId::try_from(8).unwrap(), &mut buf).unwrap(),
            3
        );
        assert_eq!(&buf[..3], &[21, 22, 23]);
        assert_eq!(
            data.get(PackageId::try_from(12).unwrap(), &mut buf)
                .unwrap(),
            6
        );
        assert_eq!(&buf[..6], &[1, 2, 3, 4, 5, 6]);

        for i in 100..150 {
            for j in (0..20).rev() {
                let id = PackageId::try_from(i as u32 * 100 + j as u32).unwrap();
                data.push(id, &[i, j, 23]);
            }

            for j in 0..20 {
                let id = i as u32 * 100 + j as u32;
                assert_eq!(
                    data.get(PackageId::try_from(id).unwrap(), &mut buf)
                        .unwrap(),
                    3
                );
                assert_eq!(&buf[..3], &[i, j, 23]);
                data.remove(PackageId::try_from(id).unwrap());
            }
        }
    }
}