opentelemetry_sdk/metrics/
periodic_reader.rs1use std::{
2 env, fmt,
3 sync::{
4 mpsc::{self, Receiver, Sender},
5 Arc, Mutex, Weak,
6 },
7 thread,
8 time::{Duration, Instant},
9};
10
11use opentelemetry::{otel_debug, otel_error, otel_info, otel_warn, Context};
12
13use crate::{
14 error::{OTelSdkError, OTelSdkResult},
15 metrics::{exporter::PushMetricExporter, reader::SdkProducer},
16 Resource,
17};
18
19use super::{
20 data::ResourceMetrics, instrument::InstrumentKind, pipeline::Pipeline, reader::MetricReader,
21 Temporality,
22};
23
24const DEFAULT_INTERVAL: Duration = Duration::from_secs(60);
25
26const METRIC_EXPORT_INTERVAL_NAME: &str = "OTEL_METRIC_EXPORT_INTERVAL";
27
28#[derive(Debug)]
30pub struct PeriodicReaderBuilder<E> {
31 interval: Duration,
32 exporter: E,
33}
34
35impl<E> PeriodicReaderBuilder<E>
36where
37 E: PushMetricExporter,
38{
39 fn new(exporter: E) -> Self {
40 let interval = env::var(METRIC_EXPORT_INTERVAL_NAME)
41 .ok()
42 .and_then(|v| v.parse().map(Duration::from_millis).ok())
43 .unwrap_or(DEFAULT_INTERVAL);
44
45 PeriodicReaderBuilder { interval, exporter }
46 }
47
48 pub fn with_interval(mut self, interval: Duration) -> Self {
56 if !interval.is_zero() {
57 self.interval = interval;
58 }
59 self
60 }
61
62 pub fn build(self) -> PeriodicReader<E> {
64 PeriodicReader::new(self.exporter, self.interval)
65 }
66}
67
68pub struct PeriodicReader<E: PushMetricExporter> {
129 inner: Arc<PeriodicReaderInner<E>>,
130}
131
132impl<E: PushMetricExporter> Clone for PeriodicReader<E> {
133 fn clone(&self) -> Self {
134 Self {
135 inner: Arc::clone(&self.inner),
136 }
137 }
138}
139
140impl<E: PushMetricExporter> PeriodicReader<E> {
141 pub fn builder(exporter: E) -> PeriodicReaderBuilder<E> {
143 PeriodicReaderBuilder::new(exporter)
144 }
145
146 fn new(exporter: E, interval: Duration) -> Self {
147 let (message_sender, message_receiver): (Sender<Message>, Receiver<Message>) =
148 mpsc::channel();
149 let exporter_arc = Arc::new(exporter);
150 let reader = PeriodicReader {
151 inner: Arc::new(PeriodicReaderInner {
152 message_sender,
153 producer: Mutex::new(None),
154 exporter: exporter_arc.clone(),
155 }),
156 };
157 let cloned_reader = reader.clone();
158
159 let mut rm = ResourceMetrics {
160 resource: Resource::empty(),
161 scope_metrics: Vec::new(),
162 };
163
164 let result_thread_creation = thread::Builder::new()
165 .name("OpenTelemetry.Metrics.PeriodicReader".to_string())
166 .spawn(move || {
167 let _suppress_guard = Context::enter_telemetry_suppressed_scope();
168 let mut interval_start = Instant::now();
169 let mut remaining_interval = interval;
170 otel_debug!(
171 name: "PeriodReaderThreadStarted",
172 interval_in_millisecs = interval.as_millis(),
173 );
174 loop {
175 otel_debug!(
176 name: "PeriodReaderThreadLoopAlive", message = "Next export will happen after interval, unless flush or shutdown is triggered.", interval_in_millisecs = remaining_interval.as_millis()
177 );
178 match message_receiver.recv_timeout(remaining_interval) {
179 Ok(Message::Flush(response_sender)) => {
180 otel_debug!(
181 name: "PeriodReaderThreadExportingDueToFlush"
182 );
183 let export_result = cloned_reader.collect_and_export(&mut rm);
184 otel_debug!(
185 name: "PeriodReaderInvokedExport",
186 export_result = format!("{:?}", export_result)
187 );
188
189 if export_result.is_err() {
199 if response_sender.send(false).is_err() {
200 otel_debug!(
201 name: "PeriodReader.Flush.ResponseSendError",
202 message = "PeriodicReader's flush has failed, but unable to send this info back to caller.
203 This occurs when the caller has timed out waiting for the response. If you see this occuring frequently, consider increasing the flush timeout."
204 );
205 }
206 } else if response_sender.send(true).is_err() {
207 otel_debug!(
208 name: "PeriodReader.Flush.ResponseSendError",
209 message = "PeriodicReader's flush has completed successfully, but unable to send this info back to caller.
210 This occurs when the caller has timed out waiting for the response. If you see this occuring frequently, consider increasing the flush timeout."
211 );
212 }
213
214 let elapsed = interval_start.elapsed();
216 if elapsed < interval {
217 remaining_interval = interval - elapsed;
218 otel_debug!(
219 name: "PeriodReaderThreadAdjustingRemainingIntervalAfterFlush",
220 remaining_interval = remaining_interval.as_secs()
221 );
222 } else {
223 otel_debug!(
224 name: "PeriodReaderThreadAdjustingExportAfterFlush",
225 );
226 interval_start = Instant::now();
233 remaining_interval = Duration::ZERO;
234 }
235 }
236 Ok(Message::Shutdown(response_sender)) => {
237 otel_debug!(name: "PeriodReaderThreadExportingDueToShutdown");
239 let export_result = cloned_reader.collect_and_export(&mut rm);
240 otel_debug!(
241 name: "PeriodReaderInvokedExport",
242 export_result = format!("{:?}", export_result)
243 );
244 let shutdown_result = exporter_arc.shutdown();
245 otel_debug!(
246 name: "PeriodReaderInvokedExporterShutdown",
247 shutdown_result = format!("{:?}", shutdown_result)
248 );
249
250 if export_result.is_err() || shutdown_result.is_err() {
260 if response_sender.send(false).is_err() {
261 otel_info!(
262 name: "PeriodReaderThreadShutdown.ResponseSendError",
263 message = "PeriodicReader's shutdown has failed, but unable to send this info back to caller.
264 This occurs when the caller has timed out waiting for the response. If you see this occuring frequently, consider increasing the shutdown timeout."
265 );
266 }
267 } else if response_sender.send(true).is_err() {
268 otel_debug!(
269 name: "PeriodReaderThreadShutdown.ResponseSendError",
270 message = "PeriodicReader completed its shutdown, but unable to send this info back to caller.
271 This occurs when the caller has timed out waiting for the response. If you see this occuring frequently, consider increasing the shutdown timeout."
272 );
273 }
274
275 otel_debug!(
276 name: "PeriodReaderThreadExiting",
277 reason = "ShutdownRequested"
278 );
279 break;
280 }
281 Err(mpsc::RecvTimeoutError::Timeout) => {
282 let export_start = Instant::now();
283 otel_debug!(
284 name: "PeriodReaderThreadExportingDueToTimer"
285 );
286
287 let export_result = cloned_reader.collect_and_export(&mut rm);
288 otel_debug!(
289 name: "PeriodReaderInvokedExport",
290 export_result = format!("{:?}", export_result)
291 );
292
293 let time_taken_for_export = export_start.elapsed();
294 if time_taken_for_export > interval {
295 otel_debug!(
296 name: "PeriodReaderThreadExportTookLongerThanInterval"
297 );
298 interval_start = Instant::now();
305 remaining_interval = Duration::ZERO;
306 } else {
307 remaining_interval = interval - time_taken_for_export;
308 interval_start = Instant::now();
309 }
310 }
311 Err(mpsc::RecvTimeoutError::Disconnected) => {
312 otel_debug!(
315 name: "PeriodReaderThreadExiting",
316 reason = "MessageSenderDisconnected"
317 );
318 break;
319 }
320 }
321 }
322 otel_debug!(
323 name: "PeriodReaderThreadStopped"
324 );
325 });
326
327 #[allow(unused_variables)]
329 if let Err(e) = result_thread_creation {
330 otel_error!(
331 name: "PeriodReaderThreadStartError",
332 message = "Failed to start PeriodicReader thread. Metrics will not be exported.",
333 error = format!("{:?}", e)
334 );
335 }
336 reader
337 }
338
339 fn collect_and_export(&self, rm: &mut ResourceMetrics) -> OTelSdkResult {
340 self.inner.collect_and_export(rm)
341 }
342}
343
344impl<E: PushMetricExporter> fmt::Debug for PeriodicReader<E> {
345 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
346 f.debug_struct("PeriodicReader").finish()
347 }
348}
349
350struct PeriodicReaderInner<E: PushMetricExporter> {
351 exporter: Arc<E>,
352 message_sender: mpsc::Sender<Message>,
353 producer: Mutex<Option<Weak<dyn SdkProducer>>>,
354}
355
356impl<E: PushMetricExporter> PeriodicReaderInner<E> {
357 fn register_pipeline(&self, producer: Weak<dyn SdkProducer>) {
358 let mut inner = self.producer.lock().expect("lock poisoned");
359 *inner = Some(producer);
360 }
361
362 fn temporality(&self, _kind: InstrumentKind) -> Temporality {
363 self.exporter.temporality()
364 }
365
366 fn collect(&self, rm: &mut ResourceMetrics) -> OTelSdkResult {
367 let producer = self.producer.lock().expect("lock poisoned");
368 if let Some(p) = producer.as_ref() {
369 p.upgrade()
370 .ok_or(OTelSdkError::AlreadyShutdown)?
371 .produce(rm)?;
372 Ok(())
373 } else {
374 otel_warn!(
375 name: "PeriodReader.MeterProviderNotRegistered",
376 message = "PeriodicReader is not registered with MeterProvider. Metrics will not be collected. \
377 This occurs when a periodic reader is created but not associated with a MeterProvider \
378 by calling `.with_reader(reader)` on MeterProviderBuilder."
379 );
380 Err(OTelSdkError::InternalFailure(
381 "MeterProvider is not registered".into(),
382 ))
383 }
384 }
385
386 fn collect_and_export(&self, rm: &mut ResourceMetrics) -> OTelSdkResult {
387 let current_time = Instant::now();
388 let collect_result = self.collect(rm);
389 let time_taken_for_collect = current_time.elapsed();
390
391 #[allow(clippy::question_mark)]
392 if let Err(e) = collect_result {
393 otel_warn!(
394 name: "PeriodReaderCollectError",
395 error = format!("{:?}", e)
396 );
397 return Err(OTelSdkError::InternalFailure(e.to_string()));
398 }
399
400 if rm.scope_metrics.is_empty() {
401 otel_debug!(name: "NoMetricsCollected");
402 return Ok(());
403 }
404
405 let metrics_count = rm.scope_metrics.iter().fold(0, |count, scope_metrics| {
406 count + scope_metrics.metrics.len()
407 });
408 otel_debug!(name: "PeriodicReaderMetricsCollected", count = metrics_count, time_taken_in_millis = time_taken_for_collect.as_millis());
409
410 futures_executor::block_on(self.exporter.export(rm))
413 }
414
415 fn force_flush(&self) -> OTelSdkResult {
416 let (response_tx, response_rx) = mpsc::channel();
430 self.message_sender
431 .send(Message::Flush(response_tx))
432 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
433
434 if let Ok(response) = response_rx.recv() {
435 if response {
437 Ok(())
438 } else {
439 Err(OTelSdkError::InternalFailure("Failed to flush".into()))
440 }
441 } else {
442 Err(OTelSdkError::InternalFailure("Failed to flush".into()))
443 }
444 }
445
446 fn shutdown(&self) -> OTelSdkResult {
447 let (response_tx, response_rx) = mpsc::channel();
449 self.message_sender
450 .send(Message::Shutdown(response_tx))
451 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
452
453 match response_rx.recv_timeout(Duration::from_secs(5)) {
455 Ok(response) => {
456 if response {
457 Ok(())
458 } else {
459 Err(OTelSdkError::InternalFailure("Failed to shutdown".into()))
460 }
461 }
462 Err(mpsc::RecvTimeoutError::Timeout) => {
463 Err(OTelSdkError::Timeout(Duration::from_secs(5)))
464 }
465 Err(mpsc::RecvTimeoutError::Disconnected) => {
466 Err(OTelSdkError::InternalFailure("Failed to shutdown".into()))
467 }
468 }
469 }
470}
471
472#[derive(Debug)]
473enum Message {
474 Flush(Sender<bool>),
475 Shutdown(Sender<bool>),
476}
477
478impl<E: PushMetricExporter> MetricReader for PeriodicReader<E> {
479 fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
480 self.inner.register_pipeline(pipeline);
481 }
482
483 fn collect(&self, rm: &mut ResourceMetrics) -> OTelSdkResult {
484 self.inner.collect(rm)
485 }
486
487 fn force_flush(&self) -> OTelSdkResult {
488 self.inner.force_flush()
489 }
490
491 fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
496 self.inner.shutdown()
497 }
498
499 fn temporality(&self, kind: InstrumentKind) -> Temporality {
507 kind.temporality_preference(self.inner.temporality(kind))
508 }
509}
510
511#[cfg(all(test, feature = "testing"))]
512mod tests {
513 use super::PeriodicReader;
514 use crate::{
515 error::{OTelSdkError, OTelSdkResult},
516 metrics::{
517 data::ResourceMetrics, exporter::PushMetricExporter, reader::MetricReader,
518 InMemoryMetricExporter, SdkMeterProvider, Temporality,
519 },
520 Resource,
521 };
522 use opentelemetry::metrics::MeterProvider;
523 use std::{
524 sync::{
525 atomic::{AtomicBool, AtomicUsize, Ordering},
526 mpsc, Arc,
527 },
528 time::Duration,
529 };
530
531 #[derive(Debug, Clone)]
535 struct MetricExporterThatFailsOnlyOnFirst {
536 count: Arc<AtomicUsize>,
537 }
538
539 impl Default for MetricExporterThatFailsOnlyOnFirst {
540 fn default() -> Self {
541 MetricExporterThatFailsOnlyOnFirst {
542 count: Arc::new(AtomicUsize::new(0)),
543 }
544 }
545 }
546
547 impl MetricExporterThatFailsOnlyOnFirst {
548 fn get_count(&self) -> usize {
549 self.count.load(Ordering::Relaxed)
550 }
551 }
552
553 impl PushMetricExporter for MetricExporterThatFailsOnlyOnFirst {
554 async fn export(&self, _metrics: &ResourceMetrics) -> OTelSdkResult {
555 if self.count.fetch_add(1, Ordering::Relaxed) == 0 {
556 Err(OTelSdkError::InternalFailure("export failed".into()))
557 } else {
558 Ok(())
559 }
560 }
561
562 fn force_flush(&self) -> OTelSdkResult {
563 Ok(())
564 }
565
566 fn shutdown(&self) -> OTelSdkResult {
567 Ok(())
568 }
569
570 fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
571 Ok(())
572 }
573
574 fn temporality(&self) -> Temporality {
575 Temporality::Cumulative
576 }
577 }
578
579 #[derive(Debug, Clone, Default)]
580 struct MockMetricExporter {
581 is_shutdown: Arc<AtomicBool>,
582 }
583
584 impl PushMetricExporter for MockMetricExporter {
585 async fn export(&self, _metrics: &ResourceMetrics) -> OTelSdkResult {
586 Ok(())
587 }
588
589 fn force_flush(&self) -> OTelSdkResult {
590 Ok(())
591 }
592
593 fn shutdown(&self) -> OTelSdkResult {
594 self.shutdown_with_timeout(Duration::from_secs(5))
595 }
596
597 fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
598 self.is_shutdown.store(true, Ordering::Relaxed);
599 Ok(())
600 }
601
602 fn temporality(&self) -> Temporality {
603 Temporality::Cumulative
604 }
605 }
606
607 #[test]
608 fn collection_triggered_by_interval_multiple() {
609 let interval = std::time::Duration::from_millis(1);
611 let exporter = InMemoryMetricExporter::default();
612 let reader = PeriodicReader::builder(exporter.clone())
613 .with_interval(interval)
614 .build();
615 let i = Arc::new(AtomicUsize::new(0));
616 let i_clone = i.clone();
617
618 let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
620 let meter = meter_provider.meter("test");
621 let _counter = meter
622 .u64_observable_counter("testcounter")
623 .with_callback(move |_| {
624 i_clone.fetch_add(1, Ordering::Relaxed);
625 })
626 .build();
627
628 std::thread::sleep(interval * 5 * 20);
634
635 assert!(i.load(Ordering::Relaxed) >= 5);
637 }
638
639 #[test]
640 fn shutdown_repeat() {
641 let exporter = InMemoryMetricExporter::default();
643 let reader = PeriodicReader::builder(exporter.clone()).build();
644
645 let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
646 let result = meter_provider.shutdown();
647 assert!(result.is_ok());
648
649 let result = meter_provider.shutdown();
651 assert!(result.is_err());
652 assert!(matches!(result, Err(OTelSdkError::AlreadyShutdown)));
653
654 let result = meter_provider.shutdown();
656 assert!(result.is_err());
657 assert!(matches!(result, Err(OTelSdkError::AlreadyShutdown)));
658 }
659
660 #[test]
661 fn flush_after_shutdown() {
662 let exporter = InMemoryMetricExporter::default();
664 let reader = PeriodicReader::builder(exporter.clone()).build();
665
666 let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
667 let result = meter_provider.force_flush();
668 assert!(result.is_ok());
669
670 let result = meter_provider.shutdown();
671 assert!(result.is_ok());
672
673 let result = meter_provider.force_flush();
675 assert!(result.is_err());
676 }
677
678 #[test]
679 fn flush_repeat() {
680 let exporter = InMemoryMetricExporter::default();
682 let reader = PeriodicReader::builder(exporter.clone()).build();
683
684 let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
685 let result = meter_provider.force_flush();
686 assert!(result.is_ok());
687
688 let result = meter_provider.force_flush();
690 assert!(result.is_ok());
691 }
692
693 #[test]
694 fn periodic_reader_without_pipeline() {
695 let exporter = InMemoryMetricExporter::default();
697 let reader = PeriodicReader::builder(exporter.clone()).build();
698
699 let rm = &mut ResourceMetrics {
700 resource: Resource::empty(),
701 scope_metrics: Vec::new(),
702 };
703 let result = reader.collect(rm);
705 assert!(result.is_err());
706
707 let result = reader.force_flush();
709 assert!(result.is_err());
710
711 let meter_provider = SdkMeterProvider::builder()
714 .with_reader(reader.clone())
715 .build();
716
717 let result = reader.collect(rm);
719 assert!(result.is_ok());
720
721 let result = meter_provider.force_flush();
722 assert!(result.is_ok());
723 }
724
725 #[test]
726 fn exporter_failures_are_handled() {
727 let interval = std::time::Duration::from_millis(10);
732 let exporter = MetricExporterThatFailsOnlyOnFirst::default();
733 let reader = PeriodicReader::builder(exporter.clone())
734 .with_interval(interval)
735 .build();
736
737 let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
738 let meter = meter_provider.meter("test");
739 let counter = meter.u64_counter("sync_counter").build();
740 counter.add(1, &[]);
741 let _obs_counter = meter
742 .u64_observable_counter("testcounter")
743 .with_callback(move |observer| {
744 observer.observe(1, &[]);
745 })
746 .build();
747
748 std::thread::sleep(Duration::from_millis(500));
754
755 assert!(exporter.get_count() >= 2);
757 }
758
759 #[test]
760 fn shutdown_passed_to_exporter() {
761 let exporter = MockMetricExporter::default();
763 let reader = PeriodicReader::builder(exporter.clone()).build();
764
765 let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
766 let meter = meter_provider.meter("test");
767 let counter = meter.u64_counter("sync_counter").build();
768 counter.add(1, &[]);
769
770 let result = meter_provider.shutdown();
773 assert!(result.is_ok());
774 assert!(exporter.is_shutdown.load(Ordering::Relaxed));
775 }
776
777 #[test]
778 fn collection() {
779 collection_triggered_by_interval_helper();
780 collection_triggered_by_flush_helper();
781 collection_triggered_by_shutdown_helper();
782 collection_triggered_by_drop_helper();
783 }
784
785 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
786 async fn collection_from_tokio_multi_with_one_worker() {
787 collection_triggered_by_interval_helper();
788 collection_triggered_by_flush_helper();
789 collection_triggered_by_shutdown_helper();
790 collection_triggered_by_drop_helper();
791 }
792
793 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
794 async fn collection_from_tokio_with_two_worker() {
795 collection_triggered_by_interval_helper();
796 collection_triggered_by_flush_helper();
797 collection_triggered_by_shutdown_helper();
798 collection_triggered_by_drop_helper();
799 }
800
801 #[tokio::test(flavor = "current_thread")]
802 async fn collection_from_tokio_current() {
803 collection_triggered_by_interval_helper();
804 collection_triggered_by_flush_helper();
805 collection_triggered_by_shutdown_helper();
806 collection_triggered_by_drop_helper();
807 }
808
809 fn collection_triggered_by_interval_helper() {
810 collection_helper(|_| {
811 std::thread::sleep(Duration::from_millis(500));
816 });
817 }
818
819 fn collection_triggered_by_flush_helper() {
820 collection_helper(|meter_provider| {
821 meter_provider.force_flush().expect("flush should succeed");
822 });
823 }
824
825 fn collection_triggered_by_shutdown_helper() {
826 collection_helper(|meter_provider| {
827 meter_provider.shutdown().expect("shutdown should succeed");
828 });
829 }
830
831 fn collection_triggered_by_drop_helper() {
832 collection_helper(|meter_provider| {
833 drop(meter_provider);
834 });
835 }
836
837 fn collection_helper(trigger: fn(SdkMeterProvider)) {
838 let exporter = InMemoryMetricExporter::default();
840 let reader = PeriodicReader::builder(exporter.clone()).build();
841 let (sender, receiver) = mpsc::channel();
842
843 let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
844 let meter = meter_provider.meter("test");
845 let _counter = meter
846 .u64_observable_counter("testcounter")
847 .with_callback(move |observer| {
848 observer.observe(1, &[]);
849 sender.send(()).expect("channel should still be open");
850 })
851 .build();
852
853 trigger(meter_provider);
855
856 receiver
858 .recv_timeout(Duration::ZERO)
859 .expect("message should be available in channel, indicating a collection occurred, which should trigger observable callback");
860
861 let exported_metrics = exporter
862 .get_finished_metrics()
863 .expect("this should not fail");
864 assert!(
865 !exported_metrics.is_empty(),
866 "Metrics should be available in exporter."
867 );
868 }
869
870 async fn some_async_function() -> u64 {
871 std::thread::sleep(std::time::Duration::from_millis(1));
873 1
874 }
875
876 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
877 async fn async_inside_observable_callback_from_tokio_multi_with_one_worker() {
878 async_inside_observable_callback_helper();
879 }
880
881 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
882 async fn async_inside_observable_callback_from_tokio_multi_with_two_worker() {
883 async_inside_observable_callback_helper();
884 }
885
886 #[tokio::test(flavor = "current_thread")]
887 async fn async_inside_observable_callback_from_tokio_current_thread() {
888 async_inside_observable_callback_helper();
889 }
890
891 #[test]
892 fn async_inside_observable_callback_from_regular_main() {
893 async_inside_observable_callback_helper();
894 }
895
896 fn async_inside_observable_callback_helper() {
897 let interval = std::time::Duration::from_millis(10);
898 let exporter = InMemoryMetricExporter::default();
899 let reader = PeriodicReader::builder(exporter.clone())
900 .with_interval(interval)
901 .build();
902
903 let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
904 let meter = meter_provider.meter("test");
905 let _gauge = meter
906 .u64_observable_gauge("my_observable_gauge")
907 .with_callback(|observer| {
908 let value = futures_executor::block_on(some_async_function());
911 observer.observe(value, &[]);
912 })
913 .build();
914
915 meter_provider.force_flush().expect("flush should succeed");
916 let exported_metrics = exporter
917 .get_finished_metrics()
918 .expect("this should not fail");
919 assert!(
920 !exported_metrics.is_empty(),
921 "Metrics should be available in exporter."
922 );
923 }
924
925 async fn some_tokio_async_function() -> u64 {
926 tokio::time::sleep(Duration::from_millis(1)).await;
928 1
929 }
930
931 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
932
933 async fn tokio_async_inside_observable_callback_from_tokio_multi_with_one_worker() {
934 tokio_async_inside_observable_callback_helper(true);
935 }
936
937 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
938 async fn tokio_async_inside_observable_callback_from_tokio_multi_with_two_worker() {
939 tokio_async_inside_observable_callback_helper(true);
940 }
941
942 #[tokio::test(flavor = "current_thread")]
943 #[ignore] async fn tokio_async_inside_observable_callback_from_tokio_current_thread() {
945 tokio_async_inside_observable_callback_helper(true);
946 }
947
948 #[test]
949 fn tokio_async_inside_observable_callback_from_regular_main() {
950 tokio_async_inside_observable_callback_helper(false);
951 }
952
953 fn tokio_async_inside_observable_callback_helper(use_current_tokio_runtime: bool) {
954 let exporter = InMemoryMetricExporter::default();
955 let reader = PeriodicReader::builder(exporter.clone()).build();
956
957 let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
958 let meter = meter_provider.meter("test");
959
960 if use_current_tokio_runtime {
961 let rt = tokio::runtime::Handle::current().clone();
962 let _gauge = meter
963 .u64_observable_gauge("my_observable_gauge")
964 .with_callback(move |observer| {
965 let value = rt.block_on(some_tokio_async_function());
967 observer.observe(value, &[]);
968 })
969 .build();
970 } else {
973 let rt = tokio::runtime::Runtime::new().unwrap();
974 let _gauge = meter
975 .u64_observable_gauge("my_observable_gauge")
976 .with_callback(move |observer| {
977 let value = rt.block_on(some_tokio_async_function());
979 observer.observe(value, &[]);
980 })
981 .build();
982 };
986
987 meter_provider.force_flush().expect("flush should succeed");
988 let exported_metrics = exporter
989 .get_finished_metrics()
990 .expect("this should not fail");
991 assert!(
992 !exported_metrics.is_empty(),
993 "Metrics should be available in exporter."
994 );
995 }
996}