1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
use std::{cmp::Ordering, collections::BTreeSet};

use thiserror::Error;

use crate::header::{PackageId, PackageIdRange};

// Maximum number of missing packages (already tracked holes plus newly
// skipped IDs) tolerated by `Received::process` before it fails with
// `ReceivedIdError::TooManySkipped`.
//
// This must be less than `u32::MAX / 2` due to ID ordering circularity issues.
const MAX_SKIPPED: usize = 1024;

/// Database of already received packages. It serves for duplicate and
/// out-of-order delivery detection.
pub(super) struct Received {
    /// Highest package ID registered so far, `None` until the first package
    /// is processed.
    highest_id: Option<PackageId>,
    /// IDs which were skipped over when a higher ID got registered and which
    /// have not been received yet.
    holes: BTreeSet<PackageId>,
}

impl Received {
    /// Creates an empty database in which no package has been registered yet.
    pub(super) fn new() -> Self {
        let holes = BTreeSet::new();
        Self {
            highest_id: None,
            holes,
        }
    }

    /// Records `id` as received and reports whether any earlier package is
    /// still missing.
    ///
    /// # Errors
    ///
    /// * [`ReceivedIdError::Duplicate`] when `id` equals the highest
    ///   registered ID, or when it is lower than the highest registered ID
    ///   and is not tracked as a hole.
    /// * [`ReceivedIdError::TooManySkipped`] when accepting `id` would raise
    ///   the number of missing packages above `MAX_SKIPPED`. The database is
    ///   not modified in that case.
    ///
    /// # Panics
    ///
    /// Panics if the range of skipped IDs does not report an upper bound of
    /// its size.
    pub(super) fn process(&mut self, id: PackageId) -> Result<IdContinuity, ReceivedIdError> {
        // First ID which is neither registered nor tracked as a hole.
        let gap_start = match self.highest_id {
            None => PackageId::zero(),
            Some(highest) => match highest.cmp(&id) {
                Ordering::Equal => return Err(ReceivedIdError::Duplicate),
                Ordering::Less => highest.incremented(),
                Ordering::Greater => {
                    // A late arrival is acceptable only if it fills a hole.
                    if !self.holes.remove(&id) {
                        return Err(ReceivedIdError::Duplicate);
                    }

                    let continuity = match self.holes.first() {
                        // Nothing is missing up to the highest registered ID.
                        None => IdContinuity::Continuous(highest.incremented()),
                        // A package older than `id` is still missing.
                        Some(oldest) if oldest.cmp(&id).is_lt() => IdContinuity::Sparse,
                        // The oldest hole lies after `id`.
                        Some(oldest) => IdContinuity::Continuous(*oldest),
                    };
                    return Ok(continuity);
                }
            },
        };

        // IDs jumped over by this package. They become holes once the package
        // is accepted.
        let gap = PackageIdRange::range(gap_start, id);
        let skipped = gap.size_hint().1.unwrap() + self.holes.len();
        if skipped > MAX_SKIPPED {
            return Err(ReceivedIdError::TooManySkipped(skipped));
        }

        self.highest_id = Some(id);
        self.holes.extend(gap);

        match skipped {
            0 => Ok(IdContinuity::Continuous(id.incremented())),
            _ => Ok(IdContinuity::Sparse),
        }
    }
}

/// Outcome of a successful package registration: whether any earlier package
/// is still missing.
#[derive(Debug, PartialEq, Eq)]
pub(super) enum IdContinuity {
    /// Some of the earlier packages have not yet been received.
    Sparse,
    /// All earlier packages have been received. Delivery discontinuity starts
    /// at the attached ID (i.e. the first not yet received package).
    Continuous(PackageId),
}

/// Reasons for which a package ID is rejected during its registration.
#[derive(Error, Debug, PartialEq, Eq)]
pub(crate) enum ReceivedIdError {
    /// The package ID has already been registered.
    #[error("Duplicate package")]
    Duplicate,
    /// Accepting the package would leave more than `MAX_SKIPPED` packages
    /// missing. The attached value is the number of packages which would be
    /// missing (already tracked holes plus newly skipped IDs).
    #[error("Too many packages skipped: {0}")]
    TooManySkipped(usize),
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Walks a single `Received` instance through out-of-order deliveries,
    /// duplicates and an excessive jump in IDs. The assertions build on each
    /// other, so their order matters.
    ///
    /// IDs are referred to by their last byte in the comments below.
    #[test]
    fn test_received() {
        let mut received = Received::new();

        // The very first package is ID 2: IDs 0 and 1 become holes.
        assert_eq!(
            received.process(PackageId::from_bytes(&[0, 0, 2])),
            Ok(IdContinuity::Sparse)
        );
        // ID 1 fills a hole, but ID 0 is still missing.
        assert_eq!(
            received.process(PackageId::from_bytes(&[0, 0, 1])),
            Ok(IdContinuity::Sparse)
        );
        // ID 1 is no longer a hole, thus it is a duplicate.
        assert_eq!(
            received.process(PackageId::from_bytes(&[0, 0, 1])),
            Err(ReceivedIdError::Duplicate)
        );
        // The last hole gets filled: the first missing ID is the one after
        // the highest registered ID.
        assert_eq!(
            received.process(PackageId::from_bytes(&[0, 0, 0])),
            Ok(IdContinuity::Continuous(PackageId::from_bytes(&[0, 0, 3])))
        );

        // ID 5 jumps over IDs 3 and 4.
        assert_eq!(
            received.process(PackageId::from_bytes(&[0, 0, 5])),
            Ok(IdContinuity::Sparse)
        );
        // ID 3 fills the oldest hole, so the first missing ID is now 4.
        assert_eq!(
            received.process(PackageId::from_bytes(&[0, 0, 3])),
            Ok(IdContinuity::Continuous(PackageId::from_bytes(&[0, 0, 4])))
        );
        // ID 5 equals the highest registered ID.
        assert_eq!(
            received.process(PackageId::from_bytes(&[0, 0, 5])),
            Err(ReceivedIdError::Duplicate)
        );
        // ID 6 directly follows the highest ID, but ID 4 is still missing.
        assert_eq!(
            received.process(PackageId::from_bytes(&[0, 0, 6])),
            Ok(IdContinuity::Sparse)
        );
        // ID 3 is below the highest ID and is not a hole.
        assert_eq!(
            received.process(PackageId::from_bytes(&[0, 0, 3])),
            Err(ReceivedIdError::Duplicate)
        );

        // A jump exceeding `MAX_SKIPPED` is rejected. The reported count
        // covers the newly skipped IDs together with the still open hole
        // (ID 4).
        assert_eq!(
            received.process(PackageId::from_bytes(&[50, 0, 6])),
            Err(ReceivedIdError::TooManySkipped(3276800))
        );
    }
}