1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
use std::time::{Duration, Instant};

use async_std::{channel::Receiver, future::timeout};
use de_messages::{BorrowedFromPlayers, FromGame, ToPlayers};
use de_net::{OutPackage, PackageSender, Peers};
use tracing::{error, info, warn};

use super::{message::InMessage, state::GameState};

pub(super) async fn run(
    port: u16,
    messages: Receiver<InMessage<ToPlayers>>,
    outputs: PackageSender,
    mut state: GameState,
) {
    info!("Starting game player package handler on port {port}...");

    'main: loop {
        if messages.is_closed() {
            break;
        }

        if outputs.is_closed() {
            error!("Outputs channel on port {port} was unexpectedly closed.");
            break;
        }

        let message = match timeout(Duration::from_millis(10), messages.recv()).await {
            Ok(Ok(message)) => Some(message),
            Ok(Err(_)) => break 'main,
            Err(_) => None,
        };

        if let Some(message) = message {
            let time = Instant::now();
            let meta = message.meta();
            let Some(player_id) = state.id(meta.source).await else {
                warn!(
                    "Received a player message from a non-participating client: {:?}.",
                    meta.source
                );

                let _ = outputs
                    .send(
                        OutPackage::encode_single(
                            &FromGame::NotJoined,
                            meta.reliability,
                            Peers::Server,
                            meta.source,
                        )
                        .unwrap(),
                    )
                    .await;
                continue;
            };

            let out_message = BorrowedFromPlayers::new(player_id, message.message());
            for buffer in state.lock().await.buffers_mut(Some(meta.source)) {
                if let Err(err) = buffer.push(meta.reliability, &out_message, time) {
                    warn!("Could not encode player message, skipping: {err:?}");
                }
            }
        }

        let mut guard = state.lock().await;
        let time = Instant::now();
        for buffer in guard.buffers_mut(None) {
            for output in buffer.build(time) {
                let result = outputs.send(output).await;
                if result.is_err() {
                    break 'main;
                }
            }
        }
    }

    info!("Game player package handler on port {port} finished.");
}