opentelemetry_sdk/
runtime.rs

1//! Provides an abstraction of several async runtimes
2//!
3//! This  allows OpenTelemetry to work with any current or future runtime. There is currently
4//! built-in implementation for [Tokio].
5//!
6//! [Tokio]: https://crates.io/crates/tokio
7
8use futures_util::stream::{unfold, Stream};
9use std::{fmt::Debug, future::Future, time::Duration};
10use thiserror::Error;
11
12/// A runtime is an abstraction of an async runtime like [Tokio]. It allows
13/// OpenTelemetry to work with any current and hopefully future runtime implementations.
14///
15/// [Tokio]: https://crates.io/crates/tokio
16///
17/// # Note
18///
19/// OpenTelemetry expects a *multithreaded* runtime because its types can move across threads.
20/// For this reason, this trait requires the `Send` and `Sync` bounds. Single-threaded runtimes
21/// can implement this trait in a way that spawns the tasks on the same thread as the calling code.
22#[cfg(feature = "experimental_async_runtime")]
23pub trait Runtime: Clone + Send + Sync + 'static {
24    /// Spawn a new task or thread, which executes the given future.
25    ///
26    /// # Note
27    ///
28    /// This is mainly used to run batch span processing in the background. Note, that the function
29    /// does not return a handle. OpenTelemetry will use a different way to wait for the future to
30    /// finish when the caller shuts down.
31    ///
32    /// At the moment, the shutdown happens by blocking the
33    /// current thread. This means runtime implementations need to make sure they can still execute
34    /// the given future even if the main thread is blocked.
35    fn spawn<F>(&self, future: F)
36    where
37        F: Future<Output = ()> + Send + 'static;
38
39    /// Return a future that resolves after the specified [Duration].
40    fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;
41}
42
43/// Uses the given runtime to produce an interval stream.
44#[cfg(feature = "experimental_async_runtime")]
45#[allow(dead_code)]
46pub(crate) fn to_interval_stream<T: Runtime>(
47    runtime: T,
48    interval: Duration,
49) -> impl Stream<Item = ()> {
50    unfold((), move |_| {
51        let runtime_cloned = runtime.clone();
52
53        async move {
54            runtime_cloned.delay(interval).await;
55            Some(((), ()))
56        }
57    })
58}
59
60/// Runtime implementation, which works with Tokio's multi thread runtime.
61#[cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio"))]
62#[cfg_attr(
63    docsrs,
64    doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio")))
65)]
66#[derive(Debug, Clone)]
67pub struct Tokio;
68
69#[cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio"))]
70#[cfg_attr(
71    docsrs,
72    doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio")))
73)]
74impl Runtime for Tokio {
75    fn spawn<F>(&self, future: F)
76    where
77        F: Future<Output = ()> + Send + 'static,
78    {
79        #[allow(clippy::let_underscore_future)]
80        // we don't have to await on the returned future to execute
81        let _ = tokio::spawn(future);
82    }
83
84    fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
85        tokio::time::sleep(duration)
86    }
87}
88
89/// Runtime implementation, which works with Tokio's current thread runtime.
90#[cfg(all(
91    feature = "experimental_async_runtime",
92    feature = "rt-tokio-current-thread"
93))]
94#[cfg_attr(
95    docsrs,
96    doc(cfg(all(
97        feature = "experimental_async_runtime",
98        feature = "rt-tokio-current-thread"
99    )))
100)]
101#[derive(Debug, Clone)]
102pub struct TokioCurrentThread;
103
104#[cfg(all(
105    feature = "experimental_async_runtime",
106    feature = "rt-tokio-current-thread"
107))]
108#[cfg_attr(
109    docsrs,
110    doc(cfg(all(
111        feature = "experimental_async_runtime",
112        feature = "rt-tokio-current-thread"
113    )))
114)]
115impl Runtime for TokioCurrentThread {
116    fn spawn<F>(&self, future: F)
117    where
118        F: Future<Output = ()> + Send + 'static,
119    {
120        // We cannot force push tracing in current thread tokio scheduler because we rely on
121        // BatchSpanProcessor to export spans in a background task, meanwhile we need to block the
122        // shutdown function so that the runtime will not finish the blocked task and kill any
123        // remaining tasks. But there is only one thread to run task, so it's a deadlock
124        //
125        // Thus, we spawn the background task in a separate thread.
126        std::thread::spawn(move || {
127            let rt = tokio::runtime::Builder::new_current_thread()
128                .enable_all()
129                .build()
130                .expect("failed to create Tokio current thead runtime for OpenTelemetry batch processing");
131            rt.block_on(future);
132        });
133    }
134
135    fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
136        tokio::time::sleep(duration)
137    }
138}
139
140/// `RuntimeChannel` is an extension to [`Runtime`]. Currently, it provides a
141/// channel that is used by the [log] and [span] batch processors.
142///
143/// [log]: crate::logs::BatchLogProcessor
144/// [span]: crate::trace::BatchSpanProcessor
145#[cfg(feature = "experimental_async_runtime")]
146pub trait RuntimeChannel: Runtime {
147    /// A future stream to receive batch messages from channels.
148    type Receiver<T: Debug + Send>: Stream<Item = T> + Send;
149    /// A batch messages sender that can be sent across threads safely.
150    type Sender<T: Debug + Send>: TrySend<Message = T> + Debug;
151
152    /// Return the sender and receiver used to send batch messages.
153    fn batch_message_channel<T: Debug + Send>(
154        &self,
155        capacity: usize,
156    ) -> (Self::Sender<T>, Self::Receiver<T>);
157}
158
159/// Error returned by a [`TrySend`] implementation.
160#[cfg(feature = "experimental_async_runtime")]
161#[derive(Debug, Error)]
162pub enum TrySendError {
163    /// Send failed due to the channel being full.
164    #[error("cannot send message to batch processor as the channel is full")]
165    ChannelFull,
166    /// Send failed due to the channel being closed.
167    #[error("cannot send message to batch processor as the channel is closed")]
168    ChannelClosed,
169    /// Any other send error that isn't covered above.
170    #[error(transparent)]
171    Other(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
172}
173
174/// TrySend is an abstraction of `Sender` that is capable of sending messages through a reference.
175#[cfg(feature = "experimental_async_runtime")]
176pub trait TrySend: Sync + Send {
177    /// The message that will be sent.
178    type Message;
179
180    /// Try to send a message batch to a worker thread.
181    ///
182    /// A failure can be due to either a closed receiver, or a depleted buffer.
183    fn try_send(&self, item: Self::Message) -> Result<(), TrySendError>;
184}
185
186#[cfg(all(
187    feature = "experimental_async_runtime",
188    any(feature = "rt-tokio", feature = "rt-tokio-current-thread")
189))]
190impl<T: Send> TrySend for tokio::sync::mpsc::Sender<T> {
191    type Message = T;
192
193    fn try_send(&self, item: Self::Message) -> Result<(), TrySendError> {
194        self.try_send(item).map_err(|err| match err {
195            tokio::sync::mpsc::error::TrySendError::Full(_) => TrySendError::ChannelFull,
196            tokio::sync::mpsc::error::TrySendError::Closed(_) => TrySendError::ChannelClosed,
197        })
198    }
199}
200
201#[cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio"))]
202#[cfg_attr(
203    docsrs,
204    doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio")))
205)]
206impl RuntimeChannel for Tokio {
207    type Receiver<T: Debug + Send> = tokio_stream::wrappers::ReceiverStream<T>;
208    type Sender<T: Debug + Send> = tokio::sync::mpsc::Sender<T>;
209
210    fn batch_message_channel<T: Debug + Send>(
211        &self,
212        capacity: usize,
213    ) -> (Self::Sender<T>, Self::Receiver<T>) {
214        let (sender, receiver) = tokio::sync::mpsc::channel(capacity);
215        (
216            sender,
217            tokio_stream::wrappers::ReceiverStream::new(receiver),
218        )
219    }
220}
221
222#[cfg(all(
223    feature = "experimental_async_runtime",
224    feature = "rt-tokio-current-thread"
225))]
226#[cfg_attr(
227    docsrs,
228    doc(cfg(all(
229        feature = "experimental_async_runtime",
230        feature = "rt-tokio-current-thread"
231    )))
232)]
233impl RuntimeChannel for TokioCurrentThread {
234    type Receiver<T: Debug + Send> = tokio_stream::wrappers::ReceiverStream<T>;
235    type Sender<T: Debug + Send> = tokio::sync::mpsc::Sender<T>;
236
237    fn batch_message_channel<T: Debug + Send>(
238        &self,
239        capacity: usize,
240    ) -> (Self::Sender<T>, Self::Receiver<T>) {
241        let (sender, receiver) = tokio::sync::mpsc::channel(capacity);
242        (
243            sender,
244            tokio_stream::wrappers::ReceiverStream::new(receiver),
245        )
246    }
247}