actix_web_lab/
sse.rs

1//! Semantic server-sent events (SSE) responder with a channel-like interface.
2//!
3//! # Examples
4//! ```no_run
5//! use std::{convert::Infallible, time::Duration};
6//!
7//! use actix_web::{get, Responder};
8//! use actix_web_lab::sse;
9//!
10//! #[get("/from-channel")]
11//! async fn from_channel() -> impl Responder {
12//!     let (sender, sse_stream) = sse::channel(10);
13//!
14//!     // note: sender will typically be spawned or handed off somewhere else
15//!     let _ = sender.send(sse::Event::Comment("my comment".into())).await;
16//!     let _ = sender
17//!         .send(sse::Data::new("my data").event("chat_msg"))
18//!         .await;
19//!
20//!     sse_stream.with_retry_duration(Duration::from_secs(10))
21//! }
22//!
23//! #[get("/from-stream")]
24//! async fn from_stream() -> impl Responder {
25//!     let event_stream = futures_util::stream::iter([Ok::<_, Infallible>(sse::Event::Data(
26//!         sse::Data::new("foo"),
27//!     ))]);
28//!
29//!     sse::Sse::from_stream(event_stream).with_keep_alive(Duration::from_secs(5))
30//! }
31//! ```
32//!
33//! Complete usage examples can be found in the examples directory of the source code repo.
34
35#![doc(
36    alias = "server sent",
37    alias = "server-sent",
38    alias = "server sent events",
39    alias = "server-sent events",
40    alias = "event-stream"
41)]
42
43use std::{
44    convert::Infallible,
45    pin::Pin,
46    task::{Context, Poll},
47    time::Duration,
48};
49
50use actix_web::{
51    body::{BodySize, BoxBody, MessageBody},
52    http::header::ContentEncoding,
53    HttpRequest, HttpResponse, Responder,
54};
55use bytes::{BufMut as _, Bytes, BytesMut};
56use bytestring::ByteString;
57use derive_more::{Display, Error};
58use futures_core::Stream;
59use pin_project_lite::pin_project;
60use serde::Serialize;
61use tokio::{
62    sync::mpsc,
63    time::{interval, Interval},
64};
65
66use crate::{
67    header::{CacheControl, CacheDirective},
68    BoxError,
69};
70
71/// Error returned from [`SseSender::send()`].
72#[derive(Debug, Display, Error)]
73#[display(fmt = "channel closed")]
74#[non_exhaustive]
75pub struct SendError(#[error(not(source))] Event);
76
77#[doc(hidden)]
78#[deprecated(
79    since = "0.17.0",
80    note = "Renamed to `SendError`. Prefer `sse::SendError`."
81)]
82pub type SseSendError = SendError;
83
84/// Error returned from [`SseSender::try_send()`].
85///
86/// In each case, the original message is returned back to you.
87#[derive(Debug, Display, Error)]
88#[non_exhaustive]
89pub enum TrySendError {
90    /// The SSE send buffer is full.
91    #[display(fmt = "buffer full")]
92    Full(#[error(not(source))] Event),
93
94    /// The receiving ([`Sse`]) has been dropped, likely because the client disconnected.
95    #[display(fmt = "channel closed")]
96    Closed(#[error(not(source))] Event),
97}
98
99#[doc(hidden)]
100#[deprecated(
101    since = "0.17.0",
102    note = "Renamed to `TrySendError`. Prefer `sse::TrySendError`."
103)]
104pub type SseTrySendError = TrySendError;
105
106/// Server-sent events data message containing a `data` field and optional `id` and `event` fields.
107///
108/// Since it implements `Into<SseMessage>`, this can be passed directly to [`send`](SseSender::send)
109/// or [`try_send`](SseSender::try_send).
110///
111/// # Examples
112/// ```
113/// # #[actix_web::main] async fn test() {
114/// use std::convert::Infallible;
115///
116/// use actix_web::body;
117/// use actix_web_lab::sse;
118/// use futures_util::stream;
119/// use serde::Serialize;
120///
121/// #[derive(serde::Serialize)]
122/// struct Foo {
123///     bar: u32,
124/// }
125///
126/// let sse = sse::Sse::from_stream(stream::iter([
127///     Ok::<_, Infallible>(sse::Event::Data(sse::Data::new("foo"))),
128///     Ok::<_, Infallible>(sse::Event::Data(
129///         sse::Data::new_json(Foo { bar: 42 }).unwrap(),
130///     )),
131/// ]));
132///
133/// assert_eq!(
134///     body::to_bytes(sse).await.unwrap(),
135///     "data: foo\n\ndata: {\"bar\":42}\n\n",
136/// );
137/// # }; test();
138/// ```
139#[must_use]
140#[derive(Debug, Clone)]
141pub struct Data {
142    id: Option<ByteString>,
143    event: Option<ByteString>,
144    data: ByteString,
145}
146
147#[doc(hidden)]
148#[deprecated(since = "0.17.0", note = "Renamed to `Data`. Prefer `sse::Data`.")]
149pub type SseData = Data;
150
151impl Data {
152    /// Constructs a new SSE data message with just the `data` field.
153    ///
154    /// # Examples
155    /// ```
156    /// use actix_web_lab::sse;
157    /// let event = sse::Event::Data(sse::Data::new("foo"));
158    /// ```
159    pub fn new(data: impl Into<ByteString>) -> Self {
160        Self {
161            id: None,
162            event: None,
163            data: data.into(),
164        }
165    }
166
167    /// Constructs a new SSE data message the `data` field set to `data` serialized as JSON.
168    ///
169    /// # Examples
170    /// ```
171    /// use actix_web_lab::sse;
172    ///
173    /// #[derive(serde::Serialize)]
174    /// struct Foo {
175    ///     bar: u32,
176    /// }
177    ///
178    /// let event = sse::Event::Data(sse::Data::new_json(Foo { bar: 42 }).unwrap());
179    /// ```
180    pub fn new_json(data: impl Serialize) -> Result<Self, serde_json::Error> {
181        Ok(Self {
182            id: None,
183            event: None,
184            data: serde_json::to_string(&data)?.into(),
185        })
186    }
187
188    /// Sets `data` field.
189    pub fn set_data(&mut self, data: impl Into<ByteString>) {
190        self.data = data.into();
191    }
192
193    /// Sets `id` field, returning a new data message.
194    pub fn id(mut self, id: impl Into<ByteString>) -> Self {
195        self.id = Some(id.into());
196        self
197    }
198
199    /// Sets `id` field.
200    pub fn set_id(&mut self, id: impl Into<ByteString>) {
201        self.id = Some(id.into());
202    }
203
204    /// Sets `event` name field, returning a new data message.
205    pub fn event(mut self, event: impl Into<ByteString>) -> Self {
206        self.event = Some(event.into());
207        self
208    }
209
210    /// Sets `event` name field.
211    pub fn set_event(&mut self, event: impl Into<ByteString>) {
212        self.event = Some(event.into());
213    }
214}
215
216impl From<Data> for Event {
217    fn from(data: Data) -> Self {
218        Self::Data(data)
219    }
220}
221
222/// Server-sent events message containing one or more fields.
223#[must_use]
224#[derive(Debug, Clone)]
225pub enum Event {
226    /// A `data` message with optional ID and event name.
227    ///
228    /// Data messages looks like this in the response stream.
229    /// ```plain
230    /// event: foo
231    /// id: 42
232    /// data: my data
233    ///
234    /// data: {
235    /// data:   "multiline": "data"
236    /// data: }
237    /// ```
238    Data(Data),
239
240    /// A comment message.
241    ///
242    /// Comments look like this in the response stream.
243    /// ```plain
244    /// : my comment
245    ///
246    /// : another comment
247    /// ```
248    Comment(ByteString),
249}
250
251#[doc(hidden)]
252#[deprecated(since = "0.17.0", note = "Renamed to `Event`. Prefer `sse::Event`.")]
253pub type SseMessage = Event;
254
255impl Event {
256    /// Splits data into lines and prepend each line with `prefix`.
257    fn line_split_with_prefix(buf: &mut BytesMut, prefix: &'static str, data: ByteString) {
258        // initial buffer size guess is len(data) + 10 lines of prefix + EOLs + EOF
259        buf.reserve(data.len() + (10 * (prefix.len() + 1)) + 1);
260
261        // append prefix + space + line to buffer
262        for line in data.split('\n') {
263            buf.put_slice(prefix.as_bytes());
264            buf.put_slice(line.as_bytes());
265            buf.put_u8(b'\n');
266        }
267    }
268
269    /// Serializes message into event-stream format.
270    fn into_bytes(self) -> Bytes {
271        let mut buf = BytesMut::new();
272
273        match self {
274            Event::Data(Data { id, event, data }) => {
275                if let Some(text) = id {
276                    buf.put_slice(b"id: ");
277                    buf.put_slice(text.as_bytes());
278                    buf.put_u8(b'\n');
279                }
280
281                if let Some(text) = event {
282                    buf.put_slice(b"event: ");
283                    buf.put_slice(text.as_bytes());
284                    buf.put_u8(b'\n');
285                }
286
287                Self::line_split_with_prefix(&mut buf, "data: ", data);
288            }
289
290            Event::Comment(text) => Self::line_split_with_prefix(&mut buf, ": ", text),
291        }
292
293        // final newline to mark end of message
294        buf.put_u8(b'\n');
295
296        buf.freeze()
297    }
298
299    /// Serializes retry message into event-stream format.
300    fn retry_to_bytes(retry: Duration) -> Bytes {
301        Bytes::from(format!("retry: {}\n\n", retry.as_millis()))
302    }
303
304    /// Serializes a keep-alive event-stream comment message into bytes.
305    const fn keep_alive_bytes() -> Bytes {
306        Bytes::from_static(b": keep-alive\n\n")
307    }
308}
309
310/// Sender half of a server-sent events stream.
311#[must_use]
312#[derive(Debug, Clone)]
313pub struct Sender {
314    tx: mpsc::Sender<Event>,
315}
316
317#[doc(hidden)]
318#[deprecated(since = "0.17.0", note = "Renamed to `Sender`. Prefer `sse::Sender`.")]
319pub type SseSender = Sender;
320
321impl Sender {
322    /// Send an SSE message.
323    ///
324    /// # Errors
325    /// Errors if the receiving ([`Sse`]) has been dropped, likely because the client disconnected.
326    ///
327    /// # Examples
328    /// ```
329    /// #[actix_web::main] async fn test() {
330    /// use actix_web_lab::sse;
331    ///
332    /// let (sender, sse_stream) = sse::channel(5);
333    /// sender.send(sse::Data::new("my data").event("my event name")).await.unwrap();
334    /// sender.send(sse::Event::Comment("my comment".into())).await.unwrap();
335    /// # } test();
336    /// ```
337    pub async fn send(&self, msg: impl Into<Event>) -> Result<(), SendError> {
338        self.tx
339            .send(msg.into())
340            .await
341            .map_err(|mpsc::error::SendError(ev)| SendError(ev))
342    }
343
344    /// Tries to send SSE message.
345    ///
346    /// # Errors
347    /// Errors if:
348    /// - the the SSE buffer is currently full;
349    /// - the receiving ([`Sse`]) has been dropped, likely because the client disconnected.
350    ///
351    /// # Examples
352    /// ```
353    /// #[actix_web::main] async fn test() {
354    /// use actix_web_lab::sse;
355    ///
356    /// let (sender, sse_stream) = sse::channel(5);
357    /// sender.try_send(sse::Data::new("my data").event("my event name")).unwrap();
358    /// sender.try_send(sse::Event::Comment("my comment".into())).unwrap();
359    /// # } test();
360    /// ```
361    pub fn try_send(&self, msg: impl Into<Event>) -> Result<(), TrySendError> {
362        self.tx.try_send(msg.into()).map_err(|err| match err {
363            mpsc::error::TrySendError::Full(ev) => TrySendError::Full(ev),
364            mpsc::error::TrySendError::Closed(ev) => TrySendError::Closed(ev),
365        })
366    }
367}
368
369pin_project! {
370    /// Server-sent events (`text/event-stream`) responder.
371    ///
372    /// Constructed with an [SSE channel](channel) or [using your own stream](Self::from_stream).
373    #[must_use]
374    #[derive(Debug)]
375    pub struct Sse<S> {
376        #[pin]
377        stream: S,
378        keep_alive: Option<Interval>,
379        retry_interval: Option<Duration>,
380    }
381}
382
383impl<S, E> Sse<S>
384where
385    S: Stream<Item = Result<Event, E>> + 'static,
386    E: Into<BoxError>,
387{
388    /// Create an SSE response from a stream that yields SSE [Event]s.
389    pub fn from_stream(stream: S) -> Self {
390        Self {
391            stream,
392            keep_alive: None,
393            retry_interval: None,
394        }
395    }
396}
397
398impl<S> Sse<S> {
399    /// Enables "keep-alive" messages to be send in the event stream after a period of inactivity.
400    ///
401    /// By default, no keep-alive is set up.
402    pub fn with_keep_alive(mut self, keep_alive_period: Duration) -> Self {
403        let mut int = interval(keep_alive_period);
404        int.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
405
406        self.keep_alive = Some(int);
407        self
408    }
409
410    /// Queues first event message to inform client of custom retry period.
411    ///
412    /// Browsers default to retry every 3 seconds or so.
413    pub fn with_retry_duration(mut self, retry: Duration) -> Self {
414        self.retry_interval = Some(retry);
415        self
416    }
417}
418
419impl<S, E> Responder for Sse<S>
420where
421    S: Stream<Item = Result<Event, E>> + 'static,
422    E: Into<BoxError>,
423{
424    type Body = BoxBody;
425
426    fn respond_to(self, _req: &HttpRequest) -> HttpResponse<Self::Body> {
427        HttpResponse::Ok()
428            .content_type(mime::TEXT_EVENT_STREAM)
429            .insert_header(ContentEncoding::Identity)
430            .insert_header(CacheControl(vec![CacheDirective::NoCache]))
431            .body(self)
432    }
433}
434
435impl<S, E> MessageBody for Sse<S>
436where
437    S: Stream<Item = Result<Event, E>>,
438    E: Into<BoxError>,
439{
440    type Error = BoxError;
441
442    fn size(&self) -> BodySize {
443        BodySize::Stream
444    }
445
446    fn poll_next(
447        self: Pin<&mut Self>,
448        cx: &mut Context<'_>,
449    ) -> Poll<Option<Result<Bytes, Self::Error>>> {
450        let this = self.project();
451
452        if let Some(retry) = this.retry_interval.take() {
453            cx.waker().wake_by_ref();
454            return Poll::Ready(Some(Ok(Event::retry_to_bytes(retry))));
455        }
456
457        if let Poll::Ready(msg) = this.stream.poll_next(cx) {
458            return match msg {
459                Some(Ok(msg)) => Poll::Ready(Some(Ok(msg.into_bytes()))),
460                Some(Err(err)) => Poll::Ready(Some(Err(err.into()))),
461                None => Poll::Ready(None),
462            };
463        }
464
465        if let Some(ref mut keep_alive) = this.keep_alive {
466            if keep_alive.poll_tick(cx).is_ready() {
467                return Poll::Ready(Some(Ok(Event::keep_alive_bytes())));
468            }
469        }
470
471        Poll::Pending
472    }
473}
474
475/// Create server-sent events (SSE) channel pair.
476///
477/// The `buffer` argument controls how many unsent messages can be stored without waiting.
478///
479/// The first item in the tuple is the sender half. Much like a regular channel, it can be cloned,
480/// sent to another thread/task, and send event messages to the response stream. It provides several
481/// methods that represent the event-stream format.
482///
483/// The second item is the responder and can, therefore, be used as a handler return type directly.
484/// The stream will be closed after all [senders](SseSender) are dropped.
485///
486/// Read more about server-sent events in [this MDN article][mdn-sse].
487///
488/// See [module docs](self) for usage example.
489///
490/// [mdn-sse]: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
491pub fn channel(buffer: usize) -> (Sender, Sse<ChannelStream>) {
492    let (tx, rx) = mpsc::channel(buffer);
493
494    (
495        Sender { tx },
496        Sse {
497            stream: ChannelStream(rx),
498            keep_alive: None,
499            retry_interval: None,
500        },
501    )
502}
503
504/// Stream implementation for channel-based SSE [`Sender`].
505#[derive(Debug)]
506pub struct ChannelStream(mpsc::Receiver<Event>);
507
508impl Stream for ChannelStream {
509    type Item = Result<Event, Infallible>;
510
511    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
512        self.0.poll_recv(cx).map(|ev| ev.map(Ok))
513    }
514}
515
516#[cfg(test)]
517mod tests {
518    use std::convert::Infallible;
519
520    use actix_web::{body, test::TestRequest};
521    use futures_util::{future::poll_fn, stream, task::noop_waker, FutureExt as _, StreamExt as _};
522    use tokio::time::sleep;
523
524    use super::*;
525    use crate::assert_response_matches;
526
527    #[test]
528    fn format_retry_message() {
529        assert_eq!(
530            Event::retry_to_bytes(Duration::from_millis(1)),
531            "retry: 1\n\n",
532        );
533        assert_eq!(
534            Event::retry_to_bytes(Duration::from_secs(10)),
535            "retry: 10000\n\n",
536        );
537    }
538
539    #[test]
540    fn line_split_format() {
541        let mut buf = BytesMut::new();
542        Event::line_split_with_prefix(&mut buf, "data: ", ByteString::from("foo"));
543        assert_eq!(buf, "data: foo\n");
544
545        let mut buf = BytesMut::new();
546        Event::line_split_with_prefix(&mut buf, "data: ", ByteString::from("foo\nbar"));
547        assert_eq!(buf, "data: foo\ndata: bar\n");
548    }
549
550    #[test]
551    fn into_bytes_format() {
552        assert_eq!(Event::Comment("foo".into()).into_bytes(), ": foo\n\n");
553
554        assert_eq!(
555            Event::Data(Data {
556                id: None,
557                event: None,
558                data: "foo".into()
559            })
560            .into_bytes(),
561            "data: foo\n\n"
562        );
563
564        assert_eq!(
565            Event::Data(Data {
566                id: None,
567                event: None,
568                data: "\n".into()
569            })
570            .into_bytes(),
571            "data: \ndata: \n\n"
572        );
573
574        assert_eq!(
575            Event::Data(Data {
576                id: Some("42".into()),
577                event: None,
578                data: "foo".into()
579            })
580            .into_bytes(),
581            "id: 42\ndata: foo\n\n"
582        );
583
584        assert_eq!(
585            Event::Data(Data {
586                id: None,
587                event: Some("bar".into()),
588                data: "foo".into()
589            })
590            .into_bytes(),
591            "event: bar\ndata: foo\n\n"
592        );
593
594        assert_eq!(
595            Event::Data(Data {
596                id: Some("42".into()),
597                event: Some("bar".into()),
598                data: "foo".into()
599            })
600            .into_bytes(),
601            "id: 42\nevent: bar\ndata: foo\n\n"
602        );
603    }
604
605    #[test]
606    fn retry_is_first_msg() {
607        let waker = noop_waker();
608        let mut cx = Context::from_waker(&waker);
609
610        {
611            let (_sender, mut sse) = channel(9);
612            assert!(Pin::new(&mut sse).poll_next(&mut cx).is_pending());
613        }
614
615        {
616            let (_sender, sse) = channel(9);
617            let mut sse = sse.with_retry_duration(Duration::from_millis(42));
618            match Pin::new(&mut sse).poll_next(&mut cx) {
619                Poll::Ready(Some(Ok(bytes))) => assert_eq!(bytes, "retry: 42\n\n"),
620                res => panic!("poll should return retry message, got {res:?}"),
621            }
622        }
623    }
624
625    #[actix_web::test]
626    async fn dropping_responder_causes_send_fails() {
627        let (sender, sse) = channel(9);
628        drop(sse);
629
630        assert!(sender.send(Data::new("late data")).await.is_err());
631    }
632
633    #[actix_web::test]
634    async fn sse_from_external_streams() {
635        let st = stream::empty::<Result<_, Infallible>>();
636        let sse = Sse::from_stream(st);
637        assert_eq!(body::to_bytes(sse).await.unwrap(), "");
638
639        let st = stream::once(async { Ok::<_, Infallible>(Event::Data(Data::new("foo"))) });
640        let sse = Sse::from_stream(st);
641        assert_eq!(body::to_bytes(sse).await.unwrap(), "data: foo\n\n");
642
643        let st = stream::repeat(Ok::<_, Infallible>(Event::Data(Data::new("foo")))).take(2);
644        let sse = Sse::from_stream(st);
645        assert_eq!(
646            body::to_bytes(sse).await.unwrap(),
647            "data: foo\n\ndata: foo\n\n",
648        );
649    }
650
651    #[actix_web::test]
652    async fn appropriate_headers_are_set_on_responder() {
653        let st = stream::empty::<Result<_, Infallible>>();
654        let sse = Sse::from_stream(st);
655
656        let res = sse.respond_to(&TestRequest::default().to_http_request());
657
658        assert_response_matches!(res, OK;
659            "content-type" => "text/event-stream"
660            "content-encoding" => "identity"
661            "cache-control" => "no-cache"
662        );
663    }
664
665    #[actix_web::test]
666    async fn messages_are_received_from_sender() {
667        let (sender, mut sse) = channel(9);
668
669        assert!(poll_fn(|cx| Pin::new(&mut sse).poll_next(cx))
670            .now_or_never()
671            .is_none());
672
673        sender.send(Data::new("bar").event("foo")).await.unwrap();
674
675        match poll_fn(|cx| Pin::new(&mut sse).poll_next(cx)).now_or_never() {
676            Some(Some(Ok(bytes))) => assert_eq!(bytes, "event: foo\ndata: bar\n\n"),
677            res => panic!("poll should return data message, got {res:?}"),
678        }
679    }
680
681    #[actix_web::test]
682    async fn keep_alive_is_sent() {
683        let waker = noop_waker();
684        let mut cx = Context::from_waker(&waker);
685
686        let (sender, sse) = channel(9);
687        let mut sse = sse.with_keep_alive(Duration::from_millis(4));
688
689        assert!(Pin::new(&mut sse).poll_next(&mut cx).is_pending());
690
691        sleep(Duration::from_millis(20)).await;
692
693        match Pin::new(&mut sse).poll_next(&mut cx) {
694            Poll::Ready(Some(Ok(bytes))) => assert_eq!(bytes, ": keep-alive\n\n"),
695            res => panic!("poll should return data message, got {res:?}"),
696        }
697
698        assert!(Pin::new(&mut sse).poll_next(&mut cx).is_pending());
699
700        sender.send(Data::new("foo")).await.unwrap();
701
702        match Pin::new(&mut sse).poll_next(&mut cx) {
703            Poll::Ready(Some(Ok(bytes))) => assert_eq!(bytes, "data: foo\n\n"),
704            res => panic!("poll should return data message, got {res:?}"),
705        }
706    }
707}