1#![doc(
36 alias = "server sent",
37 alias = "server-sent",
38 alias = "server sent events",
39 alias = "server-sent events",
40 alias = "event-stream"
41)]
42
43use std::{
44 convert::Infallible,
45 pin::Pin,
46 task::{Context, Poll},
47 time::Duration,
48};
49
50use actix_web::{
51 body::{BodySize, BoxBody, MessageBody},
52 http::header::ContentEncoding,
53 HttpRequest, HttpResponse, Responder,
54};
55use bytes::{BufMut as _, Bytes, BytesMut};
56use bytestring::ByteString;
57use derive_more::{Display, Error};
58use futures_core::Stream;
59use pin_project_lite::pin_project;
60use serde::Serialize;
61use tokio::{
62 sync::mpsc,
63 time::{interval, Interval},
64};
65
66use crate::{
67 header::{CacheControl, CacheDirective},
68 BoxError,
69};
70
71#[derive(Debug, Display, Error)]
73#[display(fmt = "channel closed")]
74#[non_exhaustive]
75pub struct SendError(#[error(not(source))] Event);
76
77#[doc(hidden)]
78#[deprecated(
79 since = "0.17.0",
80 note = "Renamed to `SendError`. Prefer `sse::SendError`."
81)]
82pub type SseSendError = SendError;
83
84#[derive(Debug, Display, Error)]
88#[non_exhaustive]
89pub enum TrySendError {
90 #[display(fmt = "buffer full")]
92 Full(#[error(not(source))] Event),
93
94 #[display(fmt = "channel closed")]
96 Closed(#[error(not(source))] Event),
97}
98
99#[doc(hidden)]
100#[deprecated(
101 since = "0.17.0",
102 note = "Renamed to `TrySendError`. Prefer `sse::TrySendError`."
103)]
104pub type SseTrySendError = TrySendError;
105
106#[must_use]
140#[derive(Debug, Clone)]
141pub struct Data {
142 id: Option<ByteString>,
143 event: Option<ByteString>,
144 data: ByteString,
145}
146
147#[doc(hidden)]
148#[deprecated(since = "0.17.0", note = "Renamed to `Data`. Prefer `sse::Data`.")]
149pub type SseData = Data;
150
151impl Data {
152 pub fn new(data: impl Into<ByteString>) -> Self {
160 Self {
161 id: None,
162 event: None,
163 data: data.into(),
164 }
165 }
166
167 pub fn new_json(data: impl Serialize) -> Result<Self, serde_json::Error> {
181 Ok(Self {
182 id: None,
183 event: None,
184 data: serde_json::to_string(&data)?.into(),
185 })
186 }
187
188 pub fn set_data(&mut self, data: impl Into<ByteString>) {
190 self.data = data.into();
191 }
192
193 pub fn id(mut self, id: impl Into<ByteString>) -> Self {
195 self.id = Some(id.into());
196 self
197 }
198
199 pub fn set_id(&mut self, id: impl Into<ByteString>) {
201 self.id = Some(id.into());
202 }
203
204 pub fn event(mut self, event: impl Into<ByteString>) -> Self {
206 self.event = Some(event.into());
207 self
208 }
209
210 pub fn set_event(&mut self, event: impl Into<ByteString>) {
212 self.event = Some(event.into());
213 }
214}
215
216impl From<Data> for Event {
217 fn from(data: Data) -> Self {
218 Self::Data(data)
219 }
220}
221
222#[must_use]
224#[derive(Debug, Clone)]
225pub enum Event {
226 Data(Data),
239
240 Comment(ByteString),
249}
250
251#[doc(hidden)]
252#[deprecated(since = "0.17.0", note = "Renamed to `Event`. Prefer `sse::Event`.")]
253pub type SseMessage = Event;
254
255impl Event {
256 fn line_split_with_prefix(buf: &mut BytesMut, prefix: &'static str, data: ByteString) {
258 buf.reserve(data.len() + (10 * (prefix.len() + 1)) + 1);
260
261 for line in data.split('\n') {
263 buf.put_slice(prefix.as_bytes());
264 buf.put_slice(line.as_bytes());
265 buf.put_u8(b'\n');
266 }
267 }
268
269 fn into_bytes(self) -> Bytes {
271 let mut buf = BytesMut::new();
272
273 match self {
274 Event::Data(Data { id, event, data }) => {
275 if let Some(text) = id {
276 buf.put_slice(b"id: ");
277 buf.put_slice(text.as_bytes());
278 buf.put_u8(b'\n');
279 }
280
281 if let Some(text) = event {
282 buf.put_slice(b"event: ");
283 buf.put_slice(text.as_bytes());
284 buf.put_u8(b'\n');
285 }
286
287 Self::line_split_with_prefix(&mut buf, "data: ", data);
288 }
289
290 Event::Comment(text) => Self::line_split_with_prefix(&mut buf, ": ", text),
291 }
292
293 buf.put_u8(b'\n');
295
296 buf.freeze()
297 }
298
299 fn retry_to_bytes(retry: Duration) -> Bytes {
301 Bytes::from(format!("retry: {}\n\n", retry.as_millis()))
302 }
303
304 const fn keep_alive_bytes() -> Bytes {
306 Bytes::from_static(b": keep-alive\n\n")
307 }
308}
309
310#[must_use]
312#[derive(Debug, Clone)]
313pub struct Sender {
314 tx: mpsc::Sender<Event>,
315}
316
317#[doc(hidden)]
318#[deprecated(since = "0.17.0", note = "Renamed to `Sender`. Prefer `sse::Sender`.")]
319pub type SseSender = Sender;
320
321impl Sender {
322 pub async fn send(&self, msg: impl Into<Event>) -> Result<(), SendError> {
338 self.tx
339 .send(msg.into())
340 .await
341 .map_err(|mpsc::error::SendError(ev)| SendError(ev))
342 }
343
344 pub fn try_send(&self, msg: impl Into<Event>) -> Result<(), TrySendError> {
362 self.tx.try_send(msg.into()).map_err(|err| match err {
363 mpsc::error::TrySendError::Full(ev) => TrySendError::Full(ev),
364 mpsc::error::TrySendError::Closed(ev) => TrySendError::Closed(ev),
365 })
366 }
367}
368
369pin_project! {
370 #[must_use]
374 #[derive(Debug)]
375 pub struct Sse<S> {
376 #[pin]
377 stream: S,
378 keep_alive: Option<Interval>,
379 retry_interval: Option<Duration>,
380 }
381}
382
383impl<S, E> Sse<S>
384where
385 S: Stream<Item = Result<Event, E>> + 'static,
386 E: Into<BoxError>,
387{
388 pub fn from_stream(stream: S) -> Self {
390 Self {
391 stream,
392 keep_alive: None,
393 retry_interval: None,
394 }
395 }
396}
397
398impl<S> Sse<S> {
399 pub fn with_keep_alive(mut self, keep_alive_period: Duration) -> Self {
403 let mut int = interval(keep_alive_period);
404 int.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
405
406 self.keep_alive = Some(int);
407 self
408 }
409
410 pub fn with_retry_duration(mut self, retry: Duration) -> Self {
414 self.retry_interval = Some(retry);
415 self
416 }
417}
418
419impl<S, E> Responder for Sse<S>
420where
421 S: Stream<Item = Result<Event, E>> + 'static,
422 E: Into<BoxError>,
423{
424 type Body = BoxBody;
425
426 fn respond_to(self, _req: &HttpRequest) -> HttpResponse<Self::Body> {
427 HttpResponse::Ok()
428 .content_type(mime::TEXT_EVENT_STREAM)
429 .insert_header(ContentEncoding::Identity)
430 .insert_header(CacheControl(vec![CacheDirective::NoCache]))
431 .body(self)
432 }
433}
434
435impl<S, E> MessageBody for Sse<S>
436where
437 S: Stream<Item = Result<Event, E>>,
438 E: Into<BoxError>,
439{
440 type Error = BoxError;
441
442 fn size(&self) -> BodySize {
443 BodySize::Stream
444 }
445
446 fn poll_next(
447 self: Pin<&mut Self>,
448 cx: &mut Context<'_>,
449 ) -> Poll<Option<Result<Bytes, Self::Error>>> {
450 let this = self.project();
451
452 if let Some(retry) = this.retry_interval.take() {
453 cx.waker().wake_by_ref();
454 return Poll::Ready(Some(Ok(Event::retry_to_bytes(retry))));
455 }
456
457 if let Poll::Ready(msg) = this.stream.poll_next(cx) {
458 return match msg {
459 Some(Ok(msg)) => Poll::Ready(Some(Ok(msg.into_bytes()))),
460 Some(Err(err)) => Poll::Ready(Some(Err(err.into()))),
461 None => Poll::Ready(None),
462 };
463 }
464
465 if let Some(ref mut keep_alive) = this.keep_alive {
466 if keep_alive.poll_tick(cx).is_ready() {
467 return Poll::Ready(Some(Ok(Event::keep_alive_bytes())));
468 }
469 }
470
471 Poll::Pending
472 }
473}
474
475pub fn channel(buffer: usize) -> (Sender, Sse<ChannelStream>) {
492 let (tx, rx) = mpsc::channel(buffer);
493
494 (
495 Sender { tx },
496 Sse {
497 stream: ChannelStream(rx),
498 keep_alive: None,
499 retry_interval: None,
500 },
501 )
502}
503
504#[derive(Debug)]
506pub struct ChannelStream(mpsc::Receiver<Event>);
507
508impl Stream for ChannelStream {
509 type Item = Result<Event, Infallible>;
510
511 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
512 self.0.poll_recv(cx).map(|ev| ev.map(Ok))
513 }
514}
515
516#[cfg(test)]
517mod tests {
518 use std::convert::Infallible;
519
520 use actix_web::{body, test::TestRequest};
521 use futures_util::{future::poll_fn, stream, task::noop_waker, FutureExt as _, StreamExt as _};
522 use tokio::time::sleep;
523
524 use super::*;
525 use crate::assert_response_matches;
526
527 #[test]
528 fn format_retry_message() {
529 assert_eq!(
530 Event::retry_to_bytes(Duration::from_millis(1)),
531 "retry: 1\n\n",
532 );
533 assert_eq!(
534 Event::retry_to_bytes(Duration::from_secs(10)),
535 "retry: 10000\n\n",
536 );
537 }
538
539 #[test]
540 fn line_split_format() {
541 let mut buf = BytesMut::new();
542 Event::line_split_with_prefix(&mut buf, "data: ", ByteString::from("foo"));
543 assert_eq!(buf, "data: foo\n");
544
545 let mut buf = BytesMut::new();
546 Event::line_split_with_prefix(&mut buf, "data: ", ByteString::from("foo\nbar"));
547 assert_eq!(buf, "data: foo\ndata: bar\n");
548 }
549
550 #[test]
551 fn into_bytes_format() {
552 assert_eq!(Event::Comment("foo".into()).into_bytes(), ": foo\n\n");
553
554 assert_eq!(
555 Event::Data(Data {
556 id: None,
557 event: None,
558 data: "foo".into()
559 })
560 .into_bytes(),
561 "data: foo\n\n"
562 );
563
564 assert_eq!(
565 Event::Data(Data {
566 id: None,
567 event: None,
568 data: "\n".into()
569 })
570 .into_bytes(),
571 "data: \ndata: \n\n"
572 );
573
574 assert_eq!(
575 Event::Data(Data {
576 id: Some("42".into()),
577 event: None,
578 data: "foo".into()
579 })
580 .into_bytes(),
581 "id: 42\ndata: foo\n\n"
582 );
583
584 assert_eq!(
585 Event::Data(Data {
586 id: None,
587 event: Some("bar".into()),
588 data: "foo".into()
589 })
590 .into_bytes(),
591 "event: bar\ndata: foo\n\n"
592 );
593
594 assert_eq!(
595 Event::Data(Data {
596 id: Some("42".into()),
597 event: Some("bar".into()),
598 data: "foo".into()
599 })
600 .into_bytes(),
601 "id: 42\nevent: bar\ndata: foo\n\n"
602 );
603 }
604
605 #[test]
606 fn retry_is_first_msg() {
607 let waker = noop_waker();
608 let mut cx = Context::from_waker(&waker);
609
610 {
611 let (_sender, mut sse) = channel(9);
612 assert!(Pin::new(&mut sse).poll_next(&mut cx).is_pending());
613 }
614
615 {
616 let (_sender, sse) = channel(9);
617 let mut sse = sse.with_retry_duration(Duration::from_millis(42));
618 match Pin::new(&mut sse).poll_next(&mut cx) {
619 Poll::Ready(Some(Ok(bytes))) => assert_eq!(bytes, "retry: 42\n\n"),
620 res => panic!("poll should return retry message, got {res:?}"),
621 }
622 }
623 }
624
625 #[actix_web::test]
626 async fn dropping_responder_causes_send_fails() {
627 let (sender, sse) = channel(9);
628 drop(sse);
629
630 assert!(sender.send(Data::new("late data")).await.is_err());
631 }
632
633 #[actix_web::test]
634 async fn sse_from_external_streams() {
635 let st = stream::empty::<Result<_, Infallible>>();
636 let sse = Sse::from_stream(st);
637 assert_eq!(body::to_bytes(sse).await.unwrap(), "");
638
639 let st = stream::once(async { Ok::<_, Infallible>(Event::Data(Data::new("foo"))) });
640 let sse = Sse::from_stream(st);
641 assert_eq!(body::to_bytes(sse).await.unwrap(), "data: foo\n\n");
642
643 let st = stream::repeat(Ok::<_, Infallible>(Event::Data(Data::new("foo")))).take(2);
644 let sse = Sse::from_stream(st);
645 assert_eq!(
646 body::to_bytes(sse).await.unwrap(),
647 "data: foo\n\ndata: foo\n\n",
648 );
649 }
650
651 #[actix_web::test]
652 async fn appropriate_headers_are_set_on_responder() {
653 let st = stream::empty::<Result<_, Infallible>>();
654 let sse = Sse::from_stream(st);
655
656 let res = sse.respond_to(&TestRequest::default().to_http_request());
657
658 assert_response_matches!(res, OK;
659 "content-type" => "text/event-stream"
660 "content-encoding" => "identity"
661 "cache-control" => "no-cache"
662 );
663 }
664
665 #[actix_web::test]
666 async fn messages_are_received_from_sender() {
667 let (sender, mut sse) = channel(9);
668
669 assert!(poll_fn(|cx| Pin::new(&mut sse).poll_next(cx))
670 .now_or_never()
671 .is_none());
672
673 sender.send(Data::new("bar").event("foo")).await.unwrap();
674
675 match poll_fn(|cx| Pin::new(&mut sse).poll_next(cx)).now_or_never() {
676 Some(Some(Ok(bytes))) => assert_eq!(bytes, "event: foo\ndata: bar\n\n"),
677 res => panic!("poll should return data message, got {res:?}"),
678 }
679 }
680
681 #[actix_web::test]
682 async fn keep_alive_is_sent() {
683 let waker = noop_waker();
684 let mut cx = Context::from_waker(&waker);
685
686 let (sender, sse) = channel(9);
687 let mut sse = sse.with_keep_alive(Duration::from_millis(4));
688
689 assert!(Pin::new(&mut sse).poll_next(&mut cx).is_pending());
690
691 sleep(Duration::from_millis(20)).await;
692
693 match Pin::new(&mut sse).poll_next(&mut cx) {
694 Poll::Ready(Some(Ok(bytes))) => assert_eq!(bytes, ": keep-alive\n\n"),
695 res => panic!("poll should return data message, got {res:?}"),
696 }
697
698 assert!(Pin::new(&mut sse).poll_next(&mut cx).is_pending());
699
700 sender.send(Data::new("foo")).await.unwrap();
701
702 match Pin::new(&mut sse).poll_next(&mut cx) {
703 Poll::Ready(Some(Ok(bytes))) => assert_eq!(bytes, "data: foo\n\n"),
704 res => panic!("poll should return data message, got {res:?}"),
705 }
706 }
707}