1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
//! SSE broadcaster.
//!
//! This module contains the Server-Sent Events broadcaster, which is responsible for keeping track of connected clients and broadcasting messages to them.
//! For broadcasting, the broadcaster takes a `map_id` and an `Action` and broadcasts the action to all clients connected to that map.

#![allow(clippy::significant_drop_tightening)]

use actix_web_lab::sse::{self, ChannelStream, Sse};
use futures::{future::ready, stream, StreamExt};
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::{sync::Mutex, time::interval};

use crate::model::dto::actions::Action;

/// Map that clients are connected to.
#[derive(Debug, Clone)]
struct ConnectedMap {
    /// Id of the map that the clients are connected to.
    map_id: i32,
    /// List of clients connected to the map.
    clients: Vec<sse::Sender>,
}

#[derive(Debug, Clone, Default)]
/// SSE broadcaster.
///
/// Inner `HashMap`:
/// * Map of `map_id` to a list of connected clients.
/// * The `map_id` is the id of the map that the client connected to.
/// * The connected map contains the `map_id` and a list of clients connected to that map.
pub struct Broadcaster(Arc<Mutex<HashMap<i32, ConnectedMap>>>);

impl Broadcaster {
    /// Constructs new broadcaster and spawns ping loop.
    #[must_use]
    pub fn new() -> Self {
        let broadcaster = Self::default();
        Self::spawn_ping(broadcaster.clone());
        broadcaster
    }

    /// Pings clients every 10 minutes to see if they are alive and remove them from the broadcast list if not.
    fn spawn_ping(self) {
        actix_web::rt::spawn(async move {
            let mut interval = interval(Duration::from_secs(600));
            loop {
                interval.tick().await;
                self.clone().remove_stale_clients().await;
            }
        });
    }

    /// Removes all non-responsive clients from broadcast list.
    /// TODO: this is a naive implementation, we should probably use a better data structure for this.
    ///       Things to consider:
    ///        - how can we do this without having to iterate over all clients?
    async fn remove_stale_clients(&self) {
        let mut guard = self.0.lock().await;

        let mut ok_maps = HashMap::with_capacity(guard.capacity());

        stream::iter(guard.values())
            .map(|map| async move {
                (
                    map,
                    stream::iter(&map.clients)
                        .filter(|client| async {
                            client
                                .send(sse::Event::Comment("ping".into()))
                                .await
                                .is_ok()
                        })
                        .map(|client| ready(client.clone()))
                        .buffer_unordered(15)
                        .collect::<Vec<_>>()
                        .await,
                )
            })
            .buffer_unordered(100)
            .filter(|(_, ok_clients)| ready(!ok_clients.is_empty()))
            .for_each(|(map, ok_clients)| {
                ok_maps.insert(
                    map.map_id,
                    ConnectedMap {
                        map_id: map.map_id,
                        clients: ok_clients,
                    },
                );
                ready(())
            })
            .await;

        *guard = ok_maps;
    }

    /// Registers client with broadcaster, returning an SSE response body.
    ///
    /// # Errors
    /// * If `sender.send()` fails for the new client.
    pub async fn new_client(
        &self,
        map_id: i32,
    ) -> Result<Sse<ChannelStream>, Box<dyn std::error::Error>> {
        let (sender, channel_stream) = sse::channel(100);
        let mut guard = self.0.lock().await;

        let map = guard.entry(map_id).or_insert_with(|| ConnectedMap {
            map_id,
            clients: Vec::new(),
        });

        sender.send(sse::Data::new("connected")).await?;

        map.clients.push(sender);

        Ok(channel_stream)
    }

    /// Broadcasts `msg` to all clients on the same map.
    pub async fn broadcast(&self, map_id: i32, action: Action) {
        let action_id = action.action_id.to_string();

        match sse::Data::new_json(action) {
            Ok(mut serialized_action) => {
                let guard = self.0.lock().await;

                serialized_action.set_id(action_id);

                if let Some(map) = guard.get(&map_id) {
                    // try to send to all clients, ignoring failures
                    // disconnected clients will get swept up by `remove_stale_clients`
                    let _ = stream::iter(&map.clients)
                        .map(|client| client.send(serialized_action.clone()))
                        .buffer_unordered(15)
                        .collect::<Vec<_>>()
                        .await;
                }
            }
            Err(err) => {
                // log the error and continue
                // serialization errors are also highly unlikely to happen
                log::error!("{}", err.to_string());
            }
        };
    }

    /// Broadcasts `msg` to all clients on all maps.
    pub async fn broadcast_all_maps(&self, action: Action) {
        let action_id = action.action_id.to_string();

        match sse::Data::new_json(action) {
            Ok(mut serialized_action) => {
                let guard = self.0.lock().await;

                serialized_action.set_id(action_id);

                let values = guard.values();
                for map in values {
                    // try to send to all clients, ignoring failures
                    // disconnected clients will get swept up by `remove_stale_clients`
                    let _ = stream::iter(&map.clients)
                        .map(|client| client.send(serialized_action.clone()))
                        .buffer_unordered(15)
                        .collect::<Vec<_>>()
                        .await;
                }
            }
            Err(err) => {
                // log the error and continue
                // serialization errors are also highly unlikely to happen
                log::error!("{}", err.to_string());
            }
        };
    }
}