1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
use std::time::Duration;

use async_std::{
    channel::{Receiver, Sender},
    future::timeout,
};
use tracing::{error, info, trace, warn};

use super::{cancellation::CancellationSender, dreceiver::InPackageDatagram};
use crate::{
    connection::{DeliveryHandler, ReceivedIdError},
    record::DeliveryRecord,
    InPackage, MAX_PACKAGE_SIZE,
};

/// Receives user datagrams (datagrams carrying data for higher-level users
/// of the network protocol) and forwards them as packages.
///
/// Datagrams are read from `datagrams`. Unreliable ones are forwarded to
/// `packages` right away; reliable ones are first passed to the delivery
/// handler, and whatever deliveries it yields are forwarded.
///
/// The loop terminates once the `datagrams` channel is closed, the
/// `packages` channel is closed, or sending to `packages` fails.
pub(super) async fn run(
    port: u16,
    _cancellation: CancellationSender,
    datagrams: Receiver<InPackageDatagram>,
    packages: Sender<InPackage>,
    mut delivery_handler: DeliveryHandler,
) {
    info!("Starting package receiver on port {port}...");

    // Upper bound on how long a single wait for a datagram may take before
    // the state of the output channel is re-examined.
    let poll_interval = Duration::from_millis(500);
    let mut buf = vec![0; MAX_PACKAGE_SIZE];

    'main: loop {
        let received = match timeout(poll_interval, datagrams.recv()).await {
            Ok(received) => received,
            Err(_) => {
                // Without incoming datagrams, this is the only place where
                // a closed output channel gets noticed, hence the check on
                // every timeout.
                if packages.is_closed() {
                    break 'main;
                }
                continue 'main;
            }
        };

        // Needed as well: when datagrams arrive frequently (the timeout
        // never fires) and all of them are duplicates (nothing is ever
        // sent), no other code path would detect the closed channel.
        if packages.is_closed() {
            break 'main;
        }

        let datagram = match received {
            Ok(datagram) => datagram,
            Err(_) => {
                error!("Datagram receiver channel is unexpectedly closed.");
                break 'main;
            }
        };
        let record = DeliveryRecord::now(datagram.header);

        if !datagram.header.reliability().is_reliable() {
            // Unreliable datagrams bypass the delivery handler entirely.
            let package = InPackage::new(
                datagram.data,
                datagram.header.reliability(),
                datagram.header.peers(),
                datagram.source,
                record.time(),
            );
            if packages.send(package).await.is_err() {
                break 'main;
            }
            continue 'main;
        }

        let mut guard = delivery_handler.lock().await;
        match guard.received(datagram.source, record, datagram.data, &mut buf) {
            Ok(deliveries) => {
                for (delivered, payload) in deliveries {
                    let package = InPackage::new(
                        payload,
                        delivered.header().reliability(),
                        delivered.header().peers(),
                        datagram.source,
                        delivered.time(),
                    );
                    if packages.send(package).await.is_err() {
                        break 'main;
                    }
                }
            }
            Err(ReceivedIdError::Duplicate) => {
                trace!(
                    "Duplicate delivery of package {:?} from {:?}.",
                    datagram.header.id(),
                    datagram.source
                );
            }
            Err(err) => {
                warn!("Received package error: {err:?}");
            }
        }
    }

    info!("Package receiver on port {port} finished.");
}