opentelemetry_sdk/metrics/
periodic_reader.rs

1use std::{
2    env, fmt,
3    sync::{
4        mpsc::{self, Receiver, Sender},
5        Arc, Mutex, Weak,
6    },
7    thread,
8    time::{Duration, Instant},
9};
10
11use opentelemetry::{otel_debug, otel_error, otel_info, otel_warn, Context};
12
13use crate::{
14    error::{OTelSdkError, OTelSdkResult},
15    metrics::{exporter::PushMetricExporter, reader::SdkProducer},
16    Resource,
17};
18
19use super::{
20    data::ResourceMetrics, instrument::InstrumentKind, pipeline::Pipeline, reader::MetricReader,
21    Temporality,
22};
23
/// Export interval used when neither the builder nor the environment
/// provides a usable value: 60 seconds.
const DEFAULT_INTERVAL: Duration = Duration::from_secs(60);

/// Environment variable consulted by the builder for the export interval.
/// Its value is parsed as a number of milliseconds.
const METRIC_EXPORT_INTERVAL_NAME: &str = "OTEL_METRIC_EXPORT_INTERVAL";
27
/// Builder collecting the settings of a [PeriodicReader] before it is created.
///
/// Obtain one through `PeriodicReader::builder`, optionally adjust it with
/// `with_interval`, and finish with `build`.
#[derive(Debug)]
pub struct PeriodicReaderBuilder<E> {
    /// Time between two timer-driven exports.
    interval: Duration,
    /// Exporter handed over to the reader when `build` is called.
    exporter: E,
}
34
35impl<E> PeriodicReaderBuilder<E>
36where
37    E: PushMetricExporter,
38{
39    fn new(exporter: E) -> Self {
40        let interval = env::var(METRIC_EXPORT_INTERVAL_NAME)
41            .ok()
42            .and_then(|v| v.parse().map(Duration::from_millis).ok())
43            .unwrap_or(DEFAULT_INTERVAL);
44
45        PeriodicReaderBuilder { interval, exporter }
46    }
47
48    /// Configures the intervening time between exports for a [PeriodicReader].
49    ///
50    /// This option overrides any value set for the `OTEL_METRIC_EXPORT_INTERVAL`
51    /// environment variable.
52    ///
53    /// If this option is not used or `interval` is equal to zero, 60 seconds is
54    /// used as the default.
55    pub fn with_interval(mut self, interval: Duration) -> Self {
56        if !interval.is_zero() {
57            self.interval = interval;
58        }
59        self
60    }
61
62    /// Create a [PeriodicReader] with the given config.
63    pub fn build(self) -> PeriodicReader<E> {
64        PeriodicReader::new(self.exporter, self.interval)
65    }
66}
67
68/// A `MetricReader` that periodically collects and exports metrics at a configurable interval.
69///
70/// By default, [`PeriodicReader`] collects and exports metrics every **60 seconds**.
71/// The time taken for export is **not** included in the interval. Use [`PeriodicReaderBuilder`]
72/// to customize the interval.
73///
74/// [`PeriodicReader`] spawns a background thread to handle metric collection and export.
75/// This thread remains active until [`shutdown()`] is called.
76///
77/// ## Collection Process
78/// "Collection" refers to gathering aggregated metrics from the SDK's internal storage.
79/// During this phase, callbacks from observable instruments are also triggered.
80///
81/// [`PeriodicReader`] does **not** enforce a timeout for collection. If an
82/// observable callback takes too long, it may delay the next collection cycle.
83/// If a callback never returns, it **will stall** all metric collection (and exports)
84/// indefinitely.
85///
86/// ## Exporter Compatibility
87/// When used with the [`OTLP Exporter`](https://docs.rs/opentelemetry-otlp), the following
88/// transport options are supported:
89///
90/// - **`grpc-tonic`**: Requires [`MeterProvider`] to be initialized within a `tokio` runtime.
91/// - **`reqwest-blocking-client`**: Works with both a standard (`main`) function and `tokio::main`.
92///
93/// [`PeriodicReader`] does **not** enforce a timeout for exports either. Instead,
94/// the configured exporter is responsible for enforcing timeouts. If an export operation
95/// never returns, [`PeriodicReader`] will **stop exporting new metrics**, stalling
96/// metric collection.
97///
98/// ## Manual Export & Shutdown
99/// Users can manually trigger an export via [`force_flush()`]. Calling [`shutdown()`]
100/// exports any remaining metrics and should be done before application exit to ensure
101/// all data is sent.
102///
103/// **Warning**: If using **tokio’s current-thread runtime**, calling [`shutdown()`]
104/// from the main thread may cause a deadlock. To prevent this, call [`shutdown()`]
105/// from a separate thread or use tokio's `spawn_blocking`.
106///
107/// [`PeriodicReader`]: crate::metrics::PeriodicReader
108/// [`PeriodicReaderBuilder`]: crate::metrics::PeriodicReaderBuilder
109/// [`MeterProvider`]: crate::metrics::SdkMeterProvider
110/// [`shutdown()`]: crate::metrics::SdkMeterProvider::shutdown
111/// [`force_flush()`]: crate::metrics::SdkMeterProvider::force_flush
112///
113/// # Example
114///
115/// ```no_run
116/// use opentelemetry_sdk::metrics::PeriodicReader;
117/// # fn example<E>(get_exporter: impl Fn() -> E)
118/// # where
119/// #     E: opentelemetry_sdk::metrics::exporter::PushMetricExporter,
120/// # {
121///
122/// let exporter = get_exporter(); // set up a push exporter
123///
124/// let reader = PeriodicReader::builder(exporter).build();
125/// # drop(reader);
126/// # }
127/// ```
128pub struct PeriodicReader<E: PushMetricExporter> {
129    inner: Arc<PeriodicReaderInner<E>>,
130}
131
132impl<E: PushMetricExporter> Clone for PeriodicReader<E> {
133    fn clone(&self) -> Self {
134        Self {
135            inner: Arc::clone(&self.inner),
136        }
137    }
138}
139
140impl<E: PushMetricExporter> PeriodicReader<E> {
141    /// Configuration options for a periodic reader with own thread
142    pub fn builder(exporter: E) -> PeriodicReaderBuilder<E> {
143        PeriodicReaderBuilder::new(exporter)
144    }
145
146    fn new(exporter: E, interval: Duration) -> Self {
147        let (message_sender, message_receiver): (Sender<Message>, Receiver<Message>) =
148            mpsc::channel();
149        let exporter_arc = Arc::new(exporter);
150        let reader = PeriodicReader {
151            inner: Arc::new(PeriodicReaderInner {
152                message_sender,
153                producer: Mutex::new(None),
154                exporter: exporter_arc.clone(),
155            }),
156        };
157        let cloned_reader = reader.clone();
158
159        let mut rm = ResourceMetrics {
160            resource: Resource::empty(),
161            scope_metrics: Vec::new(),
162        };
163
164        let result_thread_creation = thread::Builder::new()
165            .name("OpenTelemetry.Metrics.PeriodicReader".to_string())
166            .spawn(move || {
167                let _suppress_guard = Context::enter_telemetry_suppressed_scope();
168                let mut interval_start = Instant::now();
169                let mut remaining_interval = interval;
170                otel_debug!(
171                    name: "PeriodReaderThreadStarted",
172                    interval_in_millisecs = interval.as_millis(),
173                );
174                loop {
175                    otel_debug!(
176                        name: "PeriodReaderThreadLoopAlive", message = "Next export will happen after interval, unless flush or shutdown is triggered.", interval_in_millisecs = remaining_interval.as_millis()
177                    );
178                    match message_receiver.recv_timeout(remaining_interval) {
179                        Ok(Message::Flush(response_sender)) => {
180                            otel_debug!(
181                                name: "PeriodReaderThreadExportingDueToFlush"
182                            );
183                            let export_result = cloned_reader.collect_and_export(&mut rm);
184                            otel_debug!(
185                                name: "PeriodReaderInvokedExport",
186                                export_result = format!("{:?}", export_result)
187                            );
188
189                            // If response_sender is disconnected, we can't send
190                            // the result back. This occurs when the thread that
191                            // initiated flush gave up due to timeout.
192                            // Gracefully handle that with internal logs. The
193                            // internal errors are of Info level, as this is
194                            // useful for user to know whether the flush was
195                            // successful or not, when flush() itself merely
196                            // tells that it timed out.
197
198                            if export_result.is_err() {
199                                if response_sender.send(false).is_err() {
200                                    otel_debug!(
201                                        name: "PeriodReader.Flush.ResponseSendError",
202                                        message = "PeriodicReader's flush has failed, but unable to send this info back to caller.
203                                        This occurs when the caller has timed out waiting for the response. If you see this occuring frequently, consider increasing the flush timeout."
204                                    );
205                                }
206                            } else if response_sender.send(true).is_err() {
207                                otel_debug!(
208                                    name: "PeriodReader.Flush.ResponseSendError",
209                                    message = "PeriodicReader's flush has completed successfully, but unable to send this info back to caller.
210                                    This occurs when the caller has timed out waiting for the response. If you see this occuring frequently, consider increasing the flush timeout."
211                                );
212                            }
213
214                            // Adjust the remaining interval after the flush
215                            let elapsed = interval_start.elapsed();
216                            if elapsed < interval {
217                                remaining_interval = interval - elapsed;
218                                otel_debug!(
219                                    name: "PeriodReaderThreadAdjustingRemainingIntervalAfterFlush",
220                                    remaining_interval = remaining_interval.as_secs()
221                                );
222                            } else {
223                                otel_debug!(
224                                    name: "PeriodReaderThreadAdjustingExportAfterFlush",
225                                );
226                                // Reset the interval if the flush finishes after the expected export time
227                                // effectively missing the normal export.
228                                // Should we attempt to do the missed export immediately?
229                                // Or do the next export at the next interval?
230                                // Currently this attempts the next export immediately.
231                                // i.e calling Flush can affect the regularity.
232                                interval_start = Instant::now();
233                                remaining_interval = Duration::ZERO;
234                            }
235                        }
236                        Ok(Message::Shutdown(response_sender)) => {
237                            // Perform final export and break out of loop and exit the thread
238                            otel_debug!(name: "PeriodReaderThreadExportingDueToShutdown");
239                            let export_result = cloned_reader.collect_and_export(&mut rm);
240                            otel_debug!(
241                                name: "PeriodReaderInvokedExport",
242                                export_result = format!("{:?}", export_result)
243                            );
244                            let shutdown_result = exporter_arc.shutdown();
245                            otel_debug!(
246                                name: "PeriodReaderInvokedExporterShutdown",
247                                shutdown_result = format!("{:?}", shutdown_result)
248                            );
249
250                            // If response_sender is disconnected, we can't send
251                            // the result back. This occurs when the thread that
252                            // initiated shutdown gave up due to timeout.
253                            // Gracefully handle that with internal logs and
254                            // continue with shutdown (i.e exit thread) The
255                            // internal errors are of Info level, as this is
256                            // useful for user to know whether the shutdown was
257                            // successful or not, when shutdown() itself merely
258                            // tells that it timed out.
259                            if export_result.is_err() || shutdown_result.is_err() {
260                                if response_sender.send(false).is_err() {
261                                    otel_info!(
262                                        name: "PeriodReaderThreadShutdown.ResponseSendError",
263                                        message = "PeriodicReader's shutdown has failed, but unable to send this info back to caller.
264                                        This occurs when the caller has timed out waiting for the response. If you see this occuring frequently, consider increasing the shutdown timeout."
265                                    );
266                                }
267                            } else if response_sender.send(true).is_err() {
268                                otel_debug!(
269                                    name: "PeriodReaderThreadShutdown.ResponseSendError",
270                                    message = "PeriodicReader completed its shutdown, but unable to send this info back to caller.
271                                    This occurs when the caller has timed out waiting for the response. If you see this occuring frequently, consider increasing the shutdown timeout."
272                                );
273                            }
274
275                            otel_debug!(
276                                name: "PeriodReaderThreadExiting",
277                                reason = "ShutdownRequested"
278                            );
279                            break;
280                        }
281                        Err(mpsc::RecvTimeoutError::Timeout) => {
282                            let export_start = Instant::now();
283                            otel_debug!(
284                                name: "PeriodReaderThreadExportingDueToTimer"
285                            );
286
287                            let export_result = cloned_reader.collect_and_export(&mut rm);
288                            otel_debug!(
289                                name: "PeriodReaderInvokedExport",
290                                export_result = format!("{:?}", export_result)
291                            );
292
293                            let time_taken_for_export = export_start.elapsed();
294                            if time_taken_for_export > interval {
295                                otel_debug!(
296                                    name: "PeriodReaderThreadExportTookLongerThanInterval"
297                                );
298                                // if export took longer than interval, do the
299                                // next export immediately.
300                                // Alternatively, we could skip the next export
301                                // and wait for the next interval.
302                                // Or enforce that export timeout is less than interval.
303                                // What is the desired behavior?
304                                interval_start = Instant::now();
305                                remaining_interval = Duration::ZERO;
306                            } else {
307                                remaining_interval = interval - time_taken_for_export;
308                                interval_start = Instant::now();
309                            }
310                        }
311                        Err(mpsc::RecvTimeoutError::Disconnected) => {
312                            // Channel disconnected, only thing to do is break
313                            // out (i.e exit the thread)
314                            otel_debug!(
315                                name: "PeriodReaderThreadExiting",
316                                reason = "MessageSenderDisconnected"
317                            );
318                            break;
319                        }
320                    }
321                }
322                otel_debug!(
323                    name: "PeriodReaderThreadStopped"
324                );
325            });
326
327        // TODO: Should we fail-fast here and bubble up the error to user?
328        #[allow(unused_variables)]
329        if let Err(e) = result_thread_creation {
330            otel_error!(
331                name: "PeriodReaderThreadStartError",
332                message = "Failed to start PeriodicReader thread. Metrics will not be exported.",
333                error = format!("{:?}", e)
334            );
335        }
336        reader
337    }
338
339    fn collect_and_export(&self, rm: &mut ResourceMetrics) -> OTelSdkResult {
340        self.inner.collect_and_export(rm)
341    }
342}
343
344impl<E: PushMetricExporter> fmt::Debug for PeriodicReader<E> {
345    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
346        f.debug_struct("PeriodicReader").finish()
347    }
348}
349
350struct PeriodicReaderInner<E: PushMetricExporter> {
351    exporter: Arc<E>,
352    message_sender: mpsc::Sender<Message>,
353    producer: Mutex<Option<Weak<dyn SdkProducer>>>,
354}
355
356impl<E: PushMetricExporter> PeriodicReaderInner<E> {
357    fn register_pipeline(&self, producer: Weak<dyn SdkProducer>) {
358        let mut inner = self.producer.lock().expect("lock poisoned");
359        *inner = Some(producer);
360    }
361
362    fn temporality(&self, _kind: InstrumentKind) -> Temporality {
363        self.exporter.temporality()
364    }
365
366    fn collect(&self, rm: &mut ResourceMetrics) -> OTelSdkResult {
367        let producer = self.producer.lock().expect("lock poisoned");
368        if let Some(p) = producer.as_ref() {
369            p.upgrade()
370                .ok_or(OTelSdkError::AlreadyShutdown)?
371                .produce(rm)?;
372            Ok(())
373        } else {
374            otel_warn!(
375            name: "PeriodReader.MeterProviderNotRegistered",
376            message = "PeriodicReader is not registered with MeterProvider. Metrics will not be collected. \
377                   This occurs when a periodic reader is created but not associated with a MeterProvider \
378                   by calling `.with_reader(reader)` on MeterProviderBuilder."
379            );
380            Err(OTelSdkError::InternalFailure(
381                "MeterProvider is not registered".into(),
382            ))
383        }
384    }
385
386    fn collect_and_export(&self, rm: &mut ResourceMetrics) -> OTelSdkResult {
387        let current_time = Instant::now();
388        let collect_result = self.collect(rm);
389        let time_taken_for_collect = current_time.elapsed();
390
391        #[allow(clippy::question_mark)]
392        if let Err(e) = collect_result {
393            otel_warn!(
394                name: "PeriodReaderCollectError",
395                error = format!("{:?}", e)
396            );
397            return Err(OTelSdkError::InternalFailure(e.to_string()));
398        }
399
400        if rm.scope_metrics.is_empty() {
401            otel_debug!(name: "NoMetricsCollected");
402            return Ok(());
403        }
404
405        let metrics_count = rm.scope_metrics.iter().fold(0, |count, scope_metrics| {
406            count + scope_metrics.metrics.len()
407        });
408        otel_debug!(name: "PeriodicReaderMetricsCollected", count = metrics_count, time_taken_in_millis = time_taken_for_collect.as_millis());
409
410        // Relying on futures executor to execute async call.
411        // TODO: Pass timeout to exporter
412        futures_executor::block_on(self.exporter.export(rm))
413    }
414
415    fn force_flush(&self) -> OTelSdkResult {
416        // TODO: Better message for this scenario.
417        // Flush and Shutdown called from 2 threads Flush check shutdown
418        // flag before shutdown thread sets it. Both threads attempt to send
419        // message to the same channel. Case1: Flush thread sends message first,
420        // shutdown thread sends message next. Flush would succeed, as
421        // background thread won't process shutdown message until flush
422        // triggered export is done. Case2: Shutdown thread sends message first,
423        // flush thread sends message next. Shutdown would succeed, as
424        // background thread would process shutdown message first. The
425        // background exits so it won't receive the flush message. ForceFlush
426        // returns Failure, but we could indicate specifically that shutdown has
427        // completed. TODO is to see if this message can be improved.
428
429        let (response_tx, response_rx) = mpsc::channel();
430        self.message_sender
431            .send(Message::Flush(response_tx))
432            .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
433
434        if let Ok(response) = response_rx.recv() {
435            // TODO: call exporter's force_flush method.
436            if response {
437                Ok(())
438            } else {
439                Err(OTelSdkError::InternalFailure("Failed to flush".into()))
440            }
441        } else {
442            Err(OTelSdkError::InternalFailure("Failed to flush".into()))
443        }
444    }
445
446    fn shutdown(&self) -> OTelSdkResult {
447        // TODO: See if this is better to be created upfront.
448        let (response_tx, response_rx) = mpsc::channel();
449        self.message_sender
450            .send(Message::Shutdown(response_tx))
451            .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
452
453        // TODO: Make this timeout configurable.
454        match response_rx.recv_timeout(Duration::from_secs(5)) {
455            Ok(response) => {
456                if response {
457                    Ok(())
458                } else {
459                    Err(OTelSdkError::InternalFailure("Failed to shutdown".into()))
460                }
461            }
462            Err(mpsc::RecvTimeoutError::Timeout) => {
463                Err(OTelSdkError::Timeout(Duration::from_secs(5)))
464            }
465            Err(mpsc::RecvTimeoutError::Disconnected) => {
466                Err(OTelSdkError::InternalFailure("Failed to shutdown".into()))
467            }
468        }
469    }
470}
471
/// Requests sent to the background export thread. Each variant carries the
/// sender on which the thread reports success (`true`) or failure (`false`).
#[derive(Debug)]
enum Message {
    /// Export immediately and keep running.
    Flush(Sender<bool>),
    /// Export once more, shut the exporter down and stop the thread.
    Shutdown(Sender<bool>),
}
477
478impl<E: PushMetricExporter> MetricReader for PeriodicReader<E> {
479    fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
480        self.inner.register_pipeline(pipeline);
481    }
482
483    fn collect(&self, rm: &mut ResourceMetrics) -> OTelSdkResult {
484        self.inner.collect(rm)
485    }
486
487    fn force_flush(&self) -> OTelSdkResult {
488        self.inner.force_flush()
489    }
490
491    // TODO: Offer an async version of shutdown so users can await the shutdown
492    // completion, and avoid blocking the thread. The default shutdown on drop
493    // can still use blocking call. If user already explicitly called shutdown,
494    // drop won't call shutdown again.
495    fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
496        self.inner.shutdown()
497    }
498
499    /// To construct a [MetricReader][metric-reader] when setting up an SDK,
500    /// The output temporality (optional), a function of instrument kind.
501    /// This function SHOULD be obtained from the exporter.
502    ///
503    /// If not configured, the Cumulative temporality SHOULD be used.
504    ///
505    /// [metric-reader]: https://github.com/open-telemetry/opentelemetry-specification/blob/0a78571045ca1dca48621c9648ec3c832c3c541c/specification/metrics/sdk.md#metricreader
506    fn temporality(&self, kind: InstrumentKind) -> Temporality {
507        kind.temporality_preference(self.inner.temporality(kind))
508    }
509}
510
511#[cfg(all(test, feature = "testing"))]
512mod tests {
513    use super::PeriodicReader;
514    use crate::{
515        error::{OTelSdkError, OTelSdkResult},
516        metrics::{
517            data::ResourceMetrics, exporter::PushMetricExporter, reader::MetricReader,
518            InMemoryMetricExporter, SdkMeterProvider, Temporality,
519        },
520        Resource,
521    };
522    use opentelemetry::metrics::MeterProvider;
523    use std::{
524        sync::{
525            atomic::{AtomicBool, AtomicUsize, Ordering},
526            mpsc, Arc,
527        },
528        time::Duration,
529    };
530
531    // use below command to run all tests
532    // cargo test metrics::periodic_reader::tests --features=testing,spec_unstable_metrics_views -- --nocapture
533
534    #[derive(Debug, Clone)]
535    struct MetricExporterThatFailsOnlyOnFirst {
536        count: Arc<AtomicUsize>,
537    }
538
539    impl Default for MetricExporterThatFailsOnlyOnFirst {
540        fn default() -> Self {
541            MetricExporterThatFailsOnlyOnFirst {
542                count: Arc::new(AtomicUsize::new(0)),
543            }
544        }
545    }
546
547    impl MetricExporterThatFailsOnlyOnFirst {
548        fn get_count(&self) -> usize {
549            self.count.load(Ordering::Relaxed)
550        }
551    }
552
553    impl PushMetricExporter for MetricExporterThatFailsOnlyOnFirst {
554        async fn export(&self, _metrics: &ResourceMetrics) -> OTelSdkResult {
555            if self.count.fetch_add(1, Ordering::Relaxed) == 0 {
556                Err(OTelSdkError::InternalFailure("export failed".into()))
557            } else {
558                Ok(())
559            }
560        }
561
562        fn force_flush(&self) -> OTelSdkResult {
563            Ok(())
564        }
565
566        fn shutdown(&self) -> OTelSdkResult {
567            Ok(())
568        }
569
570        fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
571            Ok(())
572        }
573
574        fn temporality(&self) -> Temporality {
575            Temporality::Cumulative
576        }
577    }
578
579    #[derive(Debug, Clone, Default)]
580    struct MockMetricExporter {
581        is_shutdown: Arc<AtomicBool>,
582    }
583
584    impl PushMetricExporter for MockMetricExporter {
585        async fn export(&self, _metrics: &ResourceMetrics) -> OTelSdkResult {
586            Ok(())
587        }
588
589        fn force_flush(&self) -> OTelSdkResult {
590            Ok(())
591        }
592
593        fn shutdown(&self) -> OTelSdkResult {
594            self.shutdown_with_timeout(Duration::from_secs(5))
595        }
596
597        fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
598            self.is_shutdown.store(true, Ordering::Relaxed);
599            Ok(())
600        }
601
602        fn temporality(&self) -> Temporality {
603            Temporality::Cumulative
604        }
605    }
606
607    #[test]
608    fn collection_triggered_by_interval_multiple() {
609        // Arrange
610        let interval = std::time::Duration::from_millis(1);
611        let exporter = InMemoryMetricExporter::default();
612        let reader = PeriodicReader::builder(exporter.clone())
613            .with_interval(interval)
614            .build();
615        let i = Arc::new(AtomicUsize::new(0));
616        let i_clone = i.clone();
617
618        // Act
619        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
620        let meter = meter_provider.meter("test");
621        let _counter = meter
622            .u64_observable_counter("testcounter")
623            .with_callback(move |_| {
624                i_clone.fetch_add(1, Ordering::Relaxed);
625            })
626            .build();
627
628        // Sleep for a duration 5X (plus liberal buffer to account for potential
629        // CI slowness) the interval to ensure multiple collection.
630        // Not a fan of such tests, but this seems to be the only way to test
631        // if periodic reader is doing its job.
632        // TODO: Decide if this should be ignored in CI
633        std::thread::sleep(interval * 5 * 20);
634
635        // Assert
636        assert!(i.load(Ordering::Relaxed) >= 5);
637    }
638
639    #[test]
640    fn shutdown_repeat() {
641        // Arrange
642        let exporter = InMemoryMetricExporter::default();
643        let reader = PeriodicReader::builder(exporter.clone()).build();
644
645        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
646        let result = meter_provider.shutdown();
647        assert!(result.is_ok());
648
649        // calling shutdown again should return Err
650        let result = meter_provider.shutdown();
651        assert!(result.is_err());
652        assert!(matches!(result, Err(OTelSdkError::AlreadyShutdown)));
653
654        // calling shutdown again should return Err
655        let result = meter_provider.shutdown();
656        assert!(result.is_err());
657        assert!(matches!(result, Err(OTelSdkError::AlreadyShutdown)));
658    }
659
660    #[test]
661    fn flush_after_shutdown() {
662        // Arrange
663        let exporter = InMemoryMetricExporter::default();
664        let reader = PeriodicReader::builder(exporter.clone()).build();
665
666        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
667        let result = meter_provider.force_flush();
668        assert!(result.is_ok());
669
670        let result = meter_provider.shutdown();
671        assert!(result.is_ok());
672
673        // calling force_flush after shutdown should return Err
674        let result = meter_provider.force_flush();
675        assert!(result.is_err());
676    }
677
678    #[test]
679    fn flush_repeat() {
680        // Arrange
681        let exporter = InMemoryMetricExporter::default();
682        let reader = PeriodicReader::builder(exporter.clone()).build();
683
684        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
685        let result = meter_provider.force_flush();
686        assert!(result.is_ok());
687
688        // calling force_flush again should return Ok
689        let result = meter_provider.force_flush();
690        assert!(result.is_ok());
691    }
692
693    #[test]
694    fn periodic_reader_without_pipeline() {
695        // Arrange
696        let exporter = InMemoryMetricExporter::default();
697        let reader = PeriodicReader::builder(exporter.clone()).build();
698
699        let rm = &mut ResourceMetrics {
700            resource: Resource::empty(),
701            scope_metrics: Vec::new(),
702        };
703        // Pipeline is not registered, so collect should return an error
704        let result = reader.collect(rm);
705        assert!(result.is_err());
706
707        // Pipeline is not registered, so flush should return an error
708        let result = reader.force_flush();
709        assert!(result.is_err());
710
711        // Adding reader to meter provider should register the pipeline
712        // TODO: This part might benefit from a different design.
713        let meter_provider = SdkMeterProvider::builder()
714            .with_reader(reader.clone())
715            .build();
716
717        // Now collect and flush should succeed
718        let result = reader.collect(rm);
719        assert!(result.is_ok());
720
721        let result = meter_provider.force_flush();
722        assert!(result.is_ok());
723    }
724
725    #[test]
726    fn exporter_failures_are_handled() {
727        // create a mock exporter that fails 1st time and succeeds 2nd time
728        // Validate using this exporter that periodic reader can handle exporter failure
729        // and continue to export metrics.
730        // Arrange
731        let interval = std::time::Duration::from_millis(10);
732        let exporter = MetricExporterThatFailsOnlyOnFirst::default();
733        let reader = PeriodicReader::builder(exporter.clone())
734            .with_interval(interval)
735            .build();
736
737        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
738        let meter = meter_provider.meter("test");
739        let counter = meter.u64_counter("sync_counter").build();
740        counter.add(1, &[]);
741        let _obs_counter = meter
742            .u64_observable_counter("testcounter")
743            .with_callback(move |observer| {
744                observer.observe(1, &[]);
745            })
746            .build();
747
748        // Sleep for a duration much longer than the interval to trigger
749        // multiple exports, including failures.
750        // Not a fan of such tests, but this seems to be the
751        // only way to test if periodic reader is doing its job. TODO: Decide if
752        // this should be ignored in CI
753        std::thread::sleep(Duration::from_millis(500));
754
755        // Assert that atleast 2 exports are attempted given the 1st one fails.
756        assert!(exporter.get_count() >= 2);
757    }
758
759    #[test]
760    fn shutdown_passed_to_exporter() {
761        // Arrange
762        let exporter = MockMetricExporter::default();
763        let reader = PeriodicReader::builder(exporter.clone()).build();
764
765        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
766        let meter = meter_provider.meter("test");
767        let counter = meter.u64_counter("sync_counter").build();
768        counter.add(1, &[]);
769
770        // shutdown the provider, which should call shutdown on periodic reader
771        // which in turn should call shutdown on exporter.
772        let result = meter_provider.shutdown();
773        assert!(result.is_ok());
774        assert!(exporter.is_shutdown.load(Ordering::Relaxed));
775    }
776
777    #[test]
778    fn collection() {
779        collection_triggered_by_interval_helper();
780        collection_triggered_by_flush_helper();
781        collection_triggered_by_shutdown_helper();
782        collection_triggered_by_drop_helper();
783    }
784
785    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
786    async fn collection_from_tokio_multi_with_one_worker() {
787        collection_triggered_by_interval_helper();
788        collection_triggered_by_flush_helper();
789        collection_triggered_by_shutdown_helper();
790        collection_triggered_by_drop_helper();
791    }
792
793    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
794    async fn collection_from_tokio_with_two_worker() {
795        collection_triggered_by_interval_helper();
796        collection_triggered_by_flush_helper();
797        collection_triggered_by_shutdown_helper();
798        collection_triggered_by_drop_helper();
799    }
800
801    #[tokio::test(flavor = "current_thread")]
802    async fn collection_from_tokio_current() {
803        collection_triggered_by_interval_helper();
804        collection_triggered_by_flush_helper();
805        collection_triggered_by_shutdown_helper();
806        collection_triggered_by_drop_helper();
807    }
808
809    fn collection_triggered_by_interval_helper() {
810        collection_helper(|_| {
811            // Sleep for a duration longer than the interval to ensure at least one collection
812            // Not a fan of such tests, but this seems to be the only way to test
813            // if periodic reader is doing its job.
814            // TODO: Decide if this should be ignored in CI
815            std::thread::sleep(Duration::from_millis(500));
816        });
817    }
818
819    fn collection_triggered_by_flush_helper() {
820        collection_helper(|meter_provider| {
821            meter_provider.force_flush().expect("flush should succeed");
822        });
823    }
824
825    fn collection_triggered_by_shutdown_helper() {
826        collection_helper(|meter_provider| {
827            meter_provider.shutdown().expect("shutdown should succeed");
828        });
829    }
830
831    fn collection_triggered_by_drop_helper() {
832        collection_helper(|meter_provider| {
833            drop(meter_provider);
834        });
835    }
836
837    fn collection_helper(trigger: fn(SdkMeterProvider)) {
838        // Arrange
839        let exporter = InMemoryMetricExporter::default();
840        let reader = PeriodicReader::builder(exporter.clone()).build();
841        let (sender, receiver) = mpsc::channel();
842
843        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
844        let meter = meter_provider.meter("test");
845        let _counter = meter
846            .u64_observable_counter("testcounter")
847            .with_callback(move |observer| {
848                observer.observe(1, &[]);
849                sender.send(()).expect("channel should still be open");
850            })
851            .build();
852
853        // Act
854        trigger(meter_provider);
855
856        // Assert
857        receiver
858            .recv_timeout(Duration::ZERO)
859            .expect("message should be available in channel, indicating a collection occurred, which should trigger observable callback");
860
861        let exported_metrics = exporter
862            .get_finished_metrics()
863            .expect("this should not fail");
864        assert!(
865            !exported_metrics.is_empty(),
866            "Metrics should be available in exporter."
867        );
868    }
869
870    async fn some_async_function() -> u64 {
871        // No dependency on any particular async runtime.
872        std::thread::sleep(std::time::Duration::from_millis(1));
873        1
874    }
875
876    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
877    async fn async_inside_observable_callback_from_tokio_multi_with_one_worker() {
878        async_inside_observable_callback_helper();
879    }
880
881    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
882    async fn async_inside_observable_callback_from_tokio_multi_with_two_worker() {
883        async_inside_observable_callback_helper();
884    }
885
886    #[tokio::test(flavor = "current_thread")]
887    async fn async_inside_observable_callback_from_tokio_current_thread() {
888        async_inside_observable_callback_helper();
889    }
890
891    #[test]
892    fn async_inside_observable_callback_from_regular_main() {
893        async_inside_observable_callback_helper();
894    }
895
896    fn async_inside_observable_callback_helper() {
897        let interval = std::time::Duration::from_millis(10);
898        let exporter = InMemoryMetricExporter::default();
899        let reader = PeriodicReader::builder(exporter.clone())
900            .with_interval(interval)
901            .build();
902
903        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
904        let meter = meter_provider.meter("test");
905        let _gauge = meter
906            .u64_observable_gauge("my_observable_gauge")
907            .with_callback(|observer| {
908                // using futures_executor::block_on intentionally and avoiding
909                // any particular async runtime.
910                let value = futures_executor::block_on(some_async_function());
911                observer.observe(value, &[]);
912            })
913            .build();
914
915        meter_provider.force_flush().expect("flush should succeed");
916        let exported_metrics = exporter
917            .get_finished_metrics()
918            .expect("this should not fail");
919        assert!(
920            !exported_metrics.is_empty(),
921            "Metrics should be available in exporter."
922        );
923    }
924
925    async fn some_tokio_async_function() -> u64 {
926        // Tokio specific async function
927        tokio::time::sleep(Duration::from_millis(1)).await;
928        1
929    }
930
931    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
932
933    async fn tokio_async_inside_observable_callback_from_tokio_multi_with_one_worker() {
934        tokio_async_inside_observable_callback_helper(true);
935    }
936
937    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
938    async fn tokio_async_inside_observable_callback_from_tokio_multi_with_two_worker() {
939        tokio_async_inside_observable_callback_helper(true);
940    }
941
942    #[tokio::test(flavor = "current_thread")]
943    #[ignore] //TODO: Investigate if this can be fixed.
944    async fn tokio_async_inside_observable_callback_from_tokio_current_thread() {
945        tokio_async_inside_observable_callback_helper(true);
946    }
947
948    #[test]
949    fn tokio_async_inside_observable_callback_from_regular_main() {
950        tokio_async_inside_observable_callback_helper(false);
951    }
952
953    fn tokio_async_inside_observable_callback_helper(use_current_tokio_runtime: bool) {
954        let exporter = InMemoryMetricExporter::default();
955        let reader = PeriodicReader::builder(exporter.clone()).build();
956
957        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
958        let meter = meter_provider.meter("test");
959
960        if use_current_tokio_runtime {
961            let rt = tokio::runtime::Handle::current().clone();
962            let _gauge = meter
963                .u64_observable_gauge("my_observable_gauge")
964                .with_callback(move |observer| {
965                    // call tokio specific async function from here
966                    let value = rt.block_on(some_tokio_async_function());
967                    observer.observe(value, &[]);
968                })
969                .build();
970            // rt here is a reference to the current tokio runtime.
971            // Dropping it occurs when the tokio::main itself ends.
972        } else {
973            let rt = tokio::runtime::Runtime::new().unwrap();
974            let _gauge = meter
975                .u64_observable_gauge("my_observable_gauge")
976                .with_callback(move |observer| {
977                    // call tokio specific async function from here
978                    let value = rt.block_on(some_tokio_async_function());
979                    observer.observe(value, &[]);
980                })
981                .build();
982            // rt is not dropped here as it is moved to the closure,
983            // and is dropped only when MeterProvider itself is dropped.
984            // This works when called from normal main.
985        };
986
987        meter_provider.force_flush().expect("flush should succeed");
988        let exported_metrics = exporter
989            .get_finished_metrics()
990            .expect("this should not fail");
991        assert!(
992            !exported_metrics.is_empty(),
993            "Metrics should be available in exporter."
994        );
995    }
996}