opentelemetry_sdk/
runtime.rs1use futures_util::stream::{unfold, Stream};
9use std::{fmt::Debug, future::Future, time::Duration};
10use thiserror::Error;
11
/// Abstraction over an async executor that can run background tasks and
/// hand out timer futures.
///
/// The supertraits (`Clone + Send + Sync + 'static`) let a runtime handle be
/// duplicated and moved into spawned work.
#[cfg(feature = "experimental_async_runtime")]
pub trait Runtime: Clone + Send + Sync + 'static {
    /// Hands `future` to the runtime so it is driven to completion in the
    /// background.
    ///
    /// Nothing is returned, so the task cannot be joined or cancelled through
    /// this API.
    fn spawn<F: Future<Output = ()> + Send + 'static>(&self, future: F);

    /// Returns a future that is meant to resolve once `duration` has elapsed.
    ///
    /// The returned future is `Send + 'static`, so it may outlive the borrow
    /// of `self` used to create it.
    fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;
}
42
/// Builds an endless stream that yields `()` each time a
/// `runtime.delay(interval)` future completes.
///
/// The first item is produced only after the first delay has finished, and
/// the stream never terminates by itself (every step returns `Some`).
#[cfg(feature = "experimental_async_runtime")]
#[allow(dead_code)]
pub(crate) fn to_interval_stream<T: Runtime>(
    runtime: T,
    interval: Duration,
) -> impl Stream<Item = ()> {
    /// Waits one `interval` on `runtime`, then reports a tick to `unfold`.
    async fn next_tick<R: Runtime>(runtime: R, interval: Duration) -> Option<((), ())> {
        runtime.delay(interval).await;
        Some(((), ()))
    }

    // Every step works on its own clone so the step future owns its runtime
    // handle; the delay itself is only created when that future is polled.
    unfold((), move |()| next_tick(runtime.clone(), interval))
}
59
/// Marker type selecting the [Tokio](https://crates.io/crates/tokio) runtime.
///
/// It is a zero-sized handle; all work is delegated to Tokio's free
/// functions by the trait implementations on this type.
#[cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio"))]
#[cfg_attr(
    docsrs,
    doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio")))
)]
#[derive(Clone, Debug)]
pub struct Tokio;
68
#[cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio"))]
#[cfg_attr(
    docsrs,
    doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio")))
)]
impl Runtime for Tokio {
    /// Spawns `future` as a Tokio task via `tokio::spawn`.
    ///
    /// NOTE(review): `tokio::spawn` is documented to panic when no Tokio
    /// runtime is active on the calling thread — confirm callers always run
    /// inside one.
    fn spawn<F>(&self, future: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        // The join handle is discarded on purpose: the task does not need to
        // be awaited in order to run.
        drop(tokio::spawn(future));
    }

    /// Returns a `tokio::time::sleep` future for `duration`.
    fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
        tokio::time::sleep(duration)
    }
}
88
/// Marker type selecting a Tokio *current-thread* runtime.
///
/// It is a zero-sized handle; the trait implementations on this type decide
/// how and where futures are executed.
#[cfg(all(
    feature = "experimental_async_runtime",
    feature = "rt-tokio-current-thread"
))]
#[cfg_attr(
    docsrs,
    doc(cfg(all(
        feature = "experimental_async_runtime",
        feature = "rt-tokio-current-thread"
    )))
)]
#[derive(Clone, Debug)]
pub struct TokioCurrentThread;
103
#[cfg(all(
    feature = "experimental_async_runtime",
    feature = "rt-tokio-current-thread"
))]
#[cfg_attr(
    docsrs,
    doc(cfg(all(
        feature = "experimental_async_runtime",
        feature = "rt-tokio-current-thread"
    )))
)]
impl Runtime for TokioCurrentThread {
    /// Runs `future` to completion on a freshly spawned OS thread.
    ///
    /// The new thread builds its own current-thread Tokio runtime (with all
    /// drivers enabled) and blocks on `future`, so the task does not rely on
    /// the caller's executor being polled.
    ///
    /// # Panics
    ///
    /// The spawned thread panics if the Tokio runtime cannot be built. The
    /// thread handle is not kept, so that panic is not propagated to the
    /// caller.
    fn spawn<F>(&self, future: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        // The join handle is dropped: the thread is detached and ends when
        // `future` completes.
        std::thread::spawn(move || {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .expect("failed to create Tokio current thread runtime for OpenTelemetry batch processing");
            rt.block_on(future);
        });
    }

    /// Returns a `tokio::time::sleep` future for `duration`.
    fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
        tokio::time::sleep(duration)
    }
}
139
/// Extension of [`Runtime`] that can also create the bounded message channel
/// used to feed a batch processor.
#[cfg(feature = "experimental_async_runtime")]
pub trait RuntimeChannel: Runtime {
    /// Receiving half of the channel, consumed as a `Stream` of messages.
    type Receiver<T: Debug + Send>: Stream<Item = T> + Send;
    /// Sending half of the channel; sends are attempted via [`TrySend`].
    type Sender<T: Debug + Send>: TrySend<Message = T> + Debug;

    /// Creates a channel for messages of type `T` with the requested
    /// `capacity` and returns its `(sender, receiver)` pair.
    fn batch_message_channel<T>(&self, capacity: usize) -> (Self::Sender<T>, Self::Receiver<T>)
    where
        T: Debug + Send;
}
158
/// Reasons a [`TrySend::try_send`] call can fail.
#[cfg(feature = "experimental_async_runtime")]
#[derive(Debug, Error)]
pub enum TrySendError {
    /// The channel has no free slot for the message.
    #[error("cannot send message to batch processor as the channel is full")]
    ChannelFull,

    /// The channel is closed, so the message cannot be delivered.
    #[error("cannot send message to batch processor as the channel is closed")]
    ChannelClosed,

    /// Any other failure reported by a channel implementation.
    ///
    /// Displays transparently as the wrapped error; `#[from]` also provides a
    /// `From` conversion from the boxed error type.
    #[error(transparent)]
    Other(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
}
173
/// Sending half of a channel that supports a fallible `try_send`.
#[cfg(feature = "experimental_async_runtime")]
pub trait TrySend: Sync + Send {
    /// Type of the values carried by the channel.
    type Message;

    /// Attempts to hand `item` to the channel.
    ///
    /// # Errors
    ///
    /// Returns a [`TrySendError`] describing why the message could not be
    /// sent.
    fn try_send(&self, item: Self::Message) -> Result<(), TrySendError>;
}
185
#[cfg(all(
    feature = "experimental_async_runtime",
    any(feature = "rt-tokio", feature = "rt-tokio-current-thread")
))]
impl<T: Send> TrySend for tokio::sync::mpsc::Sender<T> {
    type Message = T;

    /// Forwards to the sender's own `try_send` and translates Tokio's error
    /// into [`TrySendError`]. The rejected message carried by Tokio's error
    /// is discarded.
    fn try_send(&self, item: Self::Message) -> Result<(), TrySendError> {
        use tokio::sync::mpsc::error::TrySendError as TokioTrySendError;

        // Method resolution picks the inherent `Sender::try_send` here, not
        // this trait method, so the call does not recurse.
        match self.try_send(item) {
            Ok(()) => Ok(()),
            Err(TokioTrySendError::Full(_)) => Err(TrySendError::ChannelFull),
            Err(TokioTrySendError::Closed(_)) => Err(TrySendError::ChannelClosed),
        }
    }
}
200
#[cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio"))]
#[cfg_attr(
    docsrs,
    doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio")))
)]
impl RuntimeChannel for Tokio {
    type Receiver<T: Debug + Send> = tokio_stream::wrappers::ReceiverStream<T>;
    type Sender<T: Debug + Send> = tokio::sync::mpsc::Sender<T>;

    /// Creates a bounded Tokio mpsc channel and exposes the receiving half as
    /// a stream.
    ///
    /// NOTE(review): `tokio::sync::mpsc::channel` is documented to panic for
    /// a zero capacity — confirm callers never pass `0`.
    fn batch_message_channel<T: Debug + Send>(
        &self,
        capacity: usize,
    ) -> (Self::Sender<T>, Self::Receiver<T>) {
        use tokio_stream::wrappers::ReceiverStream;

        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
        let rx_stream = ReceiverStream::new(rx);
        (tx, rx_stream)
    }
}
221
#[cfg(all(
    feature = "experimental_async_runtime",
    feature = "rt-tokio-current-thread"
))]
#[cfg_attr(
    docsrs,
    doc(cfg(all(
        feature = "experimental_async_runtime",
        feature = "rt-tokio-current-thread"
    )))
)]
impl RuntimeChannel for TokioCurrentThread {
    type Receiver<T: Debug + Send> = tokio_stream::wrappers::ReceiverStream<T>;
    type Sender<T: Debug + Send> = tokio::sync::mpsc::Sender<T>;

    /// Creates a bounded Tokio mpsc channel and exposes the receiving half as
    /// a stream.
    ///
    /// NOTE(review): `tokio::sync::mpsc::channel` is documented to panic for
    /// a zero capacity — confirm callers never pass `0`.
    fn batch_message_channel<T: Debug + Send>(
        &self,
        capacity: usize,
    ) -> (Self::Sender<T>, Self::Receiver<T>) {
        use tokio_stream::wrappers::ReceiverStream;

        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
        let rx_stream = ReceiverStream::new(rx);
        (tx, rx_stream)
    }
}