1#[allow(unreachable_pub)]
43#[allow(unused)]
44pub(crate) mod aggregation;
45pub mod data;
46mod error;
47pub mod exporter;
48pub(crate) mod instrument;
49pub(crate) mod internal;
50#[cfg(feature = "experimental_metrics_custom_reader")]
51pub(crate) mod manual_reader;
52pub(crate) mod meter;
53mod meter_provider;
54pub(crate) mod noop;
55pub(crate) mod periodic_reader;
56#[cfg(feature = "experimental_metrics_periodicreader_with_async_runtime")]
57pub mod periodic_reader_with_async_runtime;
59pub(crate) mod pipeline;
60#[cfg(feature = "experimental_metrics_custom_reader")]
61pub mod reader;
62#[cfg(not(feature = "experimental_metrics_custom_reader"))]
63pub(crate) mod reader;
64pub(crate) mod view;
65
66#[cfg(any(feature = "testing", test))]
68#[cfg_attr(docsrs, doc(cfg(any(feature = "testing", test))))]
69pub mod in_memory_exporter;
70#[cfg(any(feature = "testing", test))]
71#[cfg_attr(docsrs, doc(cfg(any(feature = "testing", test))))]
72pub use in_memory_exporter::{InMemoryMetricExporter, InMemoryMetricExporterBuilder};
73
74#[cfg(feature = "spec_unstable_metrics_views")]
75pub use aggregation::*;
76#[cfg(feature = "experimental_metrics_custom_reader")]
77pub use manual_reader::*;
78pub use meter_provider::*;
79pub use periodic_reader::*;
80#[cfg(feature = "experimental_metrics_custom_reader")]
81pub use pipeline::Pipeline;
82
83pub use instrument::{Instrument, InstrumentKind, Stream, StreamBuilder};
84
85use std::hash::Hash;
86
/// Defines how measurements are aggregated over time when collected/exported.
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum Temporality {
    /// Measurements accumulate from the start of collection; each export
    /// reports the running total. This is the default variant.
    #[default]
    Cumulative,

    /// Each export reports only the change since the previous export.
    Delta,

    /// A memory-conserving mode; the exact per-instrument temporality choice
    /// is made by the aggregation internals — see the `internal` module.
    /// NOTE(review): semantics not visible in this file — confirm against the
    /// aggregation implementation before relying on this description.
    LowMemory,
}
109
110#[cfg(all(test, feature = "testing"))]
111mod tests {
112 use self::data::{HistogramDataPoint, ScopeMetrics, SumDataPoint};
113 use super::data::MetricData;
114 use super::internal::Number;
115 use super::*;
116 use crate::metrics::data::ResourceMetrics;
117 use crate::metrics::internal::AggregatedMetricsAccess;
118 use crate::metrics::InMemoryMetricExporter;
119 use crate::metrics::InMemoryMetricExporterBuilder;
120 use data::GaugeDataPoint;
121 use opentelemetry::metrics::{Counter, Meter, UpDownCounter};
122 use opentelemetry::InstrumentationScope;
123 use opentelemetry::Value;
124 use opentelemetry::{metrics::MeterProvider as _, KeyValue};
125 use rand::{rngs, Rng, SeedableRng};
126 use std::cmp::{max, min};
127 use std::sync::atomic::{AtomicBool, Ordering};
128 use std::sync::{Arc, Mutex};
129 use std::thread;
130 use std::time::Duration;
131
    /// With name validation active (the disabling feature is off), instruments
    /// built with invalid names — and histograms with invalid bucket
    /// boundaries — must be complete no-ops: recording on them exports nothing.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    #[cfg(not(feature = "experimental_metrics_disable_name_validation"))]
    async fn invalid_instrument_config_noops() {
        // Invalid names: bad leading character, non-ASCII, over-long (256
        // chars), and embedded whitespace.
        let invalid_instrument_names = vec![
            "_startWithNoneAlphabet",
            "utf8char锈",
            "a".repeat(256).leak(),
            "invalid name",
        ];
        for name in invalid_instrument_names {
            let test_context = TestContext::new(Temporality::Cumulative);
            // Exercise every instrument kind with the invalid name.
            let counter = test_context.meter().u64_counter(name).build();
            counter.add(1, &[]);

            let up_down_counter = test_context.meter().i64_up_down_counter(name).build();
            up_down_counter.add(1, &[]);

            let gauge = test_context.meter().f64_gauge(name).build();
            gauge.record(1.9, &[]);

            let histogram = test_context.meter().f64_histogram(name).build();
            histogram.record(1.0, &[]);

            let _observable_counter = test_context
                .meter()
                .u64_observable_counter(name)
                .with_callback(move |observer| {
                    observer.observe(1, &[]);
                })
                .build();

            let _observable_gauge = test_context
                .meter()
                .f64_observable_gauge(name)
                .with_callback(move |observer| {
                    observer.observe(1.0, &[]);
                })
                .build();

            let _observable_up_down_counter = test_context
                .meter()
                .i64_observable_up_down_counter(name)
                .with_callback(move |observer| {
                    observer.observe(1, &[]);
                })
                .build();

            test_context.flush_metrics();

            // Nothing recorded on an invalid instrument may be exported.
            test_context.check_no_metrics();
        }

        // Invalid boundary lists: duplicates, not strictly increasing, or
        // containing non-finite values (infinity / NaN).
        let invalid_bucket_boundaries = vec![
            vec![1.0, 1.0], vec![1.0, 2.0, 3.0, 2.0], vec![1.0, 2.0, 3.0, 4.0, 2.5], vec![1.0, 2.0, 3.0, f64::INFINITY, 4.0], vec![1.0, 2.0, 3.0, f64::NAN], vec![f64::NEG_INFINITY, 2.0, 3.0], ];
        for bucket_boundaries in invalid_bucket_boundaries {
            let test_context = TestContext::new(Temporality::Cumulative);
            let histogram = test_context
                .meter()
                .f64_histogram("test")
                .with_boundaries(bucket_boundaries)
                .build();
            histogram.record(1.9, &[]);
            test_context.flush_metrics();

            // A histogram with invalid boundaries must also be a no-op.
            test_context.check_no_metrics();
        }
    }
216
    /// With the name-validation-disabling feature enabled, names that would
    /// normally be rejected are accepted and metrics ARE exported. Invalid
    /// histogram bucket boundaries, however, are still rejected.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    #[cfg(feature = "experimental_metrics_disable_name_validation")]
    async fn valid_instrument_config_with_feature_experimental_metrics_disable_name_validation() {
        // Names that fail standard validation but are allowed when validation
        // is disabled (note the variable keeps its "invalid" name to mirror
        // the validating twin of this test).
        let invalid_instrument_names = vec![
            "_startWithNoneAlphabet",
            "utf8char锈",
            "",
            "a".repeat(256).leak(),
            "\\allow\\slash /sec",
            "\\allow\\$$slash /sec",
            "Total $ Count",
            "\\test\\UsagePercent(Total) > 80%",
            "invalid name",
        ];
        for name in invalid_instrument_names {
            let test_context = TestContext::new(Temporality::Cumulative);
            // Exercise every instrument kind with the unconventional name.
            let counter = test_context.meter().u64_counter(name).build();
            counter.add(1, &[]);

            let up_down_counter = test_context.meter().i64_up_down_counter(name).build();
            up_down_counter.add(1, &[]);

            let gauge = test_context.meter().f64_gauge(name).build();
            gauge.record(1.9, &[]);

            let histogram = test_context.meter().f64_histogram(name).build();
            histogram.record(1.0, &[]);

            let _observable_counter = test_context
                .meter()
                .u64_observable_counter(name)
                .with_callback(move |observer| {
                    observer.observe(1, &[]);
                })
                .build();

            let _observable_gauge = test_context
                .meter()
                .f64_observable_gauge(name)
                .with_callback(move |observer| {
                    observer.observe(1.0, &[]);
                })
                .build();

            let _observable_up_down_counter = test_context
                .meter()
                .i64_observable_up_down_counter(name)
                .with_callback(move |observer| {
                    observer.observe(1, &[]);
                })
                .build();

            test_context.flush_metrics();

            // Unlike the validating build, metrics must be exported here.
            let resource_metrics = test_context
                .exporter
                .get_finished_metrics()
                .expect("metrics expected to be exported");

            assert!(!resource_metrics.is_empty(), "metrics should be exported");
        }

        // Bucket-boundary validation is independent of name validation:
        // these remain invalid and must still produce no metrics.
        let invalid_bucket_boundaries = vec![
            vec![1.0, 1.0], vec![1.0, 2.0, 3.0, 2.0], vec![1.0, 2.0, 3.0, 4.0, 2.5], vec![1.0, 2.0, 3.0, f64::INFINITY, 4.0], vec![1.0, 2.0, 3.0, f64::NAN], vec![f64::NEG_INFINITY, 2.0, 3.0], ];
        for bucket_boundaries in invalid_bucket_boundaries {
            let test_context = TestContext::new(Temporality::Cumulative);
            let histogram = test_context
                .meter()
                .f64_histogram("test")
                .with_boundaries(bucket_boundaries)
                .build();
            histogram.record(1.9, &[]);
            test_context.flush_metrics();

            test_context.check_no_metrics();
        }
    }
307
    /// Counter aggregation under delta temporality (see the shared helper).
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_aggregation_delta() {
        counter_aggregation_helper(Temporality::Delta);
    }

    /// Counter aggregation under cumulative temporality (see the shared helper).
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_aggregation_cumulative() {
        counter_aggregation_helper(Temporality::Cumulative);
    }
321
322 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
323 async fn counter_aggregation_no_attributes_cumulative() {
324 let mut test_context = TestContext::new(Temporality::Cumulative);
325 let counter = test_context.u64_counter("test", "my_counter", None);
326
327 counter.add(50, &[]);
328 test_context.flush_metrics();
329
330 let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
331 unreachable!()
332 };
333
334 assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
335 assert!(sum.is_monotonic, "Should produce monotonic.");
336 assert_eq!(
337 sum.temporality,
338 Temporality::Cumulative,
339 "Should produce cumulative"
340 );
341
342 let data_point = &sum.data_points[0];
343 assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
344 assert_eq!(data_point.value, 50, "Unexpected data point value");
345 }
346
347 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
348 async fn counter_aggregation_no_attributes_delta() {
349 let mut test_context = TestContext::new(Temporality::Delta);
350 let counter = test_context.u64_counter("test", "my_counter", None);
351
352 counter.add(50, &[]);
353 test_context.flush_metrics();
354
355 let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
356 unreachable!()
357 };
358
359 assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
360 assert!(sum.is_monotonic, "Should produce monotonic.");
361 assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
362
363 let data_point = &sum.data_points[0];
364 assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
365 assert_eq!(data_point.value, 50, "Unexpected data point value");
366 }
367
    /// Cardinality-overflow handling for counters, delta temporality, with the
    /// default and a custom attribute limit (see the helpers).
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_aggregation_overflow_delta() {
        counter_aggregation_overflow_helper(Temporality::Delta);
        counter_aggregation_overflow_helper_custom_limit(Temporality::Delta);
    }

    /// Cardinality-overflow handling for counters, cumulative temporality.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_aggregation_overflow_cumulative() {
        counter_aggregation_overflow_helper(Temporality::Cumulative);
        counter_aggregation_overflow_helper_custom_limit(Temporality::Cumulative);
    }

    /// Attribute order must not matter: sorted attributes recorded first, delta.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_aggregation_attribute_order_sorted_first_delta() {
        counter_aggregation_attribute_order_helper(Temporality::Delta, true);
    }

    /// Attribute order must not matter: sorted attributes first, cumulative.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_aggregation_attribute_order_sorted_first_cumulative() {
        counter_aggregation_attribute_order_helper(Temporality::Cumulative, true);
    }

    /// Attribute order must not matter: unsorted attributes first, delta.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_aggregation_attribute_order_unsorted_first_delta() {
        counter_aggregation_attribute_order_helper(Temporality::Delta, false);
    }

    /// Attribute order must not matter: unsorted attributes first, cumulative.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_aggregation_attribute_order_unsorted_first_cumulative() {
        counter_aggregation_attribute_order_helper(Temporality::Cumulative, false);
    }
409
    /// Histogram aggregation, cumulative temporality (see the shared helper).
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn histogram_aggregation_cumulative() {
        histogram_aggregation_helper(Temporality::Cumulative);
    }

    /// Histogram aggregation, delta temporality.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn histogram_aggregation_delta() {
        histogram_aggregation_helper(Temporality::Delta);
    }

    /// Histogram with user-supplied bucket boundaries, both temporalities.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn histogram_aggregation_with_custom_bounds() {
        histogram_aggregation_with_custom_bounds_helper(Temporality::Delta);
        histogram_aggregation_with_custom_bounds_helper(Temporality::Cumulative);
    }

    /// Histogram with an empty boundary list, both temporalities.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn histogram_aggregation_with_empty_bounds() {
        histogram_aggregation_with_empty_bounds_helper(Temporality::Delta);
        histogram_aggregation_with_empty_bounds_helper(Temporality::Cumulative);
    }

    /// UpDownCounter aggregation, cumulative temporality.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn updown_counter_aggregation_cumulative() {
        updown_counter_aggregation_helper(Temporality::Cumulative);
    }

    /// UpDownCounter aggregation, delta temporality.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn updown_counter_aggregation_delta() {
        updown_counter_aggregation_helper(Temporality::Delta);
    }

    /// Gauge aggregation under both temporalities.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn gauge_aggregation() {
        gauge_aggregation_helper(Temporality::Delta);
        gauge_aggregation_helper(Temporality::Cumulative);
    }

    /// Observable gauge aggregation: both temporalities, with and without
    /// attributes (second argument).
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn observable_gauge_aggregation() {
        observable_gauge_aggregation_helper(Temporality::Delta, false);
        observable_gauge_aggregation_helper(Temporality::Delta, true);
        observable_gauge_aggregation_helper(Temporality::Cumulative, false);
        observable_gauge_aggregation_helper(Temporality::Cumulative, true);
    }

    // The following eight tests drive observable_counter_aggregation_helper
    // across the matrix of {cumulative, delta} x {non-zero, zero increment}
    // x {with attributes, without attributes}; arguments are
    // (temporality, start, increment, length, is_empty_attributes).

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn observable_counter_aggregation_cumulative_non_zero_increment() {
        observable_counter_aggregation_helper(Temporality::Cumulative, 100, 10, 4, false);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn observable_counter_aggregation_cumulative_non_zero_increment_no_attrs() {
        observable_counter_aggregation_helper(Temporality::Cumulative, 100, 10, 4, true);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn observable_counter_aggregation_delta_non_zero_increment() {
        observable_counter_aggregation_helper(Temporality::Delta, 100, 10, 4, false);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn observable_counter_aggregation_delta_non_zero_increment_no_attrs() {
        observable_counter_aggregation_helper(Temporality::Delta, 100, 10, 4, true);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn observable_counter_aggregation_cumulative_zero_increment() {
        observable_counter_aggregation_helper(Temporality::Cumulative, 100, 0, 4, false);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn observable_counter_aggregation_cumulative_zero_increment_no_attrs() {
        observable_counter_aggregation_helper(Temporality::Cumulative, 100, 0, 4, true);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn observable_counter_aggregation_delta_zero_increment() {
        observable_counter_aggregation_helper(Temporality::Delta, 100, 0, 4, false);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn observable_counter_aggregation_delta_zero_increment_no_attrs() {
        observable_counter_aggregation_helper(Temporality::Delta, 100, 0, 4, true);
    }
531
532 fn observable_counter_aggregation_helper(
533 temporality: Temporality,
534 start: u64,
535 increment: u64,
536 length: u64,
537 is_empty_attributes: bool,
538 ) {
539 let mut test_context = TestContext::new(temporality);
541 let attributes = if is_empty_attributes {
542 vec![]
543 } else {
544 vec![KeyValue::new("key1", "value1")]
545 };
546 let values: Vec<u64> = (0..length).map(|i| start + i * increment).collect();
548 println!("Testing with observable values: {values:?}");
549 let values = Arc::new(values);
550 let values_clone = values.clone();
551 let i = Arc::new(Mutex::new(0));
552 let _observable_counter = test_context
553 .meter()
554 .u64_observable_counter("my_observable_counter")
555 .with_unit("my_unit")
556 .with_callback(move |observer| {
557 let mut index = i.lock().unwrap();
558 if *index < values.len() {
559 observer.observe(values[*index], &attributes);
560 *index += 1;
561 }
562 })
563 .build();
564
565 for (iter, v) in values_clone.iter().enumerate() {
566 test_context.flush_metrics();
567 let MetricData::Sum(sum) =
568 test_context.get_aggregation::<u64>("my_observable_counter", None)
569 else {
570 unreachable!()
571 };
572 assert_eq!(sum.data_points.len(), 1);
573 assert!(sum.is_monotonic, "Counter should produce monotonic.");
574 if let Temporality::Cumulative = temporality {
575 assert_eq!(
576 sum.temporality,
577 Temporality::Cumulative,
578 "Should produce cumulative"
579 );
580 } else {
581 assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
582 }
583
584 let data_point = if is_empty_attributes {
586 &sum.data_points[0]
587 } else {
588 find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
589 .expect("datapoint with key1=value1 expected")
590 };
591
592 if let Temporality::Cumulative = temporality {
593 assert_eq!(data_point.value, *v);
595 } else {
596 if iter == 0 {
599 assert_eq!(data_point.value, start);
600 } else {
601 assert_eq!(data_point.value, increment);
602 }
603 }
604
605 test_context.reset_metrics();
606 }
607 }
608
    /// A meter created with an empty name must retain "" as its scope name,
    /// both via `meter("")` and via an explicitly built InstrumentationScope.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn empty_meter_name_retained() {
        // Shared body: record on a counter, flush, and assert the exported
        // scope name is the empty string.
        async fn meter_name_retained_helper(
            meter: Meter,
            provider: SdkMeterProvider,
            exporter: InMemoryMetricExporter,
        ) {
            let counter = meter.u64_counter("my_counter").build();

            counter.add(10, &[]);
            provider.force_flush().unwrap();

            let resource_metrics = exporter
                .get_finished_metrics()
                .expect("metrics are expected to be exported.");
            assert!(
                resource_metrics[0].scope_metrics[0].metrics.len() == 1,
                "There should be a single metric"
            );
            let meter_name = resource_metrics[0].scope_metrics[0].scope.name();
            assert_eq!(meter_name, "");
        }

        let exporter = InMemoryMetricExporter::default();
        let meter_provider = SdkMeterProvider::builder()
            .with_periodic_exporter(exporter.clone())
            .build();

        // Case 1: meter obtained directly with an empty name.
        let meter1 = meter_provider.meter("");
        meter_name_retained_helper(meter1, meter_provider.clone(), exporter.clone()).await;

        // Case 2: meter obtained from a scope explicitly built with "".
        let meter_scope = InstrumentationScope::builder("").build();
        let meter2 = meter_provider.meter_with_scope(meter_scope);
        meter_name_retained_helper(meter2, meter_provider, exporter).await;
    }
647
    /// Two instruments from the same meter with identical name, unit, and
    /// description must merge into a single exported metric whose data point
    /// combines both instruments' recordings (10 + 5 = 15).
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_duplicate_instrument_merge() {
        let exporter = InMemoryMetricExporter::default();
        let meter_provider = SdkMeterProvider::builder()
            .with_periodic_exporter(exporter.clone())
            .build();

        let meter = meter_provider.meter("test");
        let counter = meter
            .u64_counter("my_counter")
            .with_unit("my_unit")
            .with_description("my_description")
            .build();

        // Identical identity (name, unit, description) -> should merge.
        let counter_duplicated = meter
            .u64_counter("my_counter")
            .with_unit("my_unit")
            .with_description("my_description")
            .build();

        let attribute = vec![KeyValue::new("key1", "value1")];
        counter.add(10, &attribute);
        counter_duplicated.add(5, &attribute);

        meter_provider.force_flush().unwrap();

        let resource_metrics = exporter
            .get_finished_metrics()
            .expect("metrics are expected to be exported.");
        assert!(
            resource_metrics[0].scope_metrics[0].metrics.len() == 1,
            "There should be single metric merging duplicate instruments"
        );
        let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
        assert_eq!(metric.name, "my_counter");
        assert_eq!(metric.unit, "my_unit");
        let MetricData::Sum(sum) = u64::extract_metrics_data_ref(&metric.data)
            .expect("Sum aggregation expected for Counter instruments by default")
        else {
            unreachable!()
        };

        assert_eq!(sum.data_points.len(), 1);

        // The single merged data point carries the combined total.
        let datapoint = &sum.data_points[0];
        assert_eq!(datapoint.value, 15);
    }
699
    /// Same-named instruments from DIFFERENT meters must NOT merge: each meter
    /// gets its own scope with its own metric and independent data point.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_duplicate_instrument_different_meter_no_merge() {
        let exporter = InMemoryMetricExporter::default();
        let meter_provider = SdkMeterProvider::builder()
            .with_periodic_exporter(exporter.clone())
            .build();

        // Two distinct meters; instrument identity below is otherwise equal.
        let meter1 = meter_provider.meter("test.meter1");
        let meter2 = meter_provider.meter("test.meter2");
        let counter1 = meter1
            .u64_counter("my_counter")
            .with_unit("my_unit")
            .with_description("my_description")
            .build();

        let counter2 = meter2
            .u64_counter("my_counter")
            .with_unit("my_unit")
            .with_description("my_description")
            .build();

        let attribute = vec![KeyValue::new("key1", "value1")];
        counter1.add(10, &attribute);
        counter2.add(5, &attribute);

        meter_provider.force_flush().unwrap();

        let resource_metrics = exporter
            .get_finished_metrics()
            .expect("metrics are expected to be exported.");
        assert!(
            resource_metrics[0].scope_metrics.len() == 2,
            "There should be 2 separate scope"
        );
        assert!(
            resource_metrics[0].scope_metrics[0].metrics.len() == 1,
            "There should be single metric for the scope"
        );
        assert!(
            resource_metrics[0].scope_metrics[1].metrics.len() == 1,
            "There should be single metric for the scope"
        );

        // Scope ordering in the export is not asserted; look each one up.
        let scope1 = find_scope_metric(&resource_metrics[0].scope_metrics, "test.meter1");
        let scope2 = find_scope_metric(&resource_metrics[0].scope_metrics, "test.meter2");

        if let Some(scope1) = scope1 {
            let metric1 = &scope1.metrics[0];
            assert_eq!(metric1.name, "my_counter");
            assert_eq!(metric1.unit, "my_unit");
            assert_eq!(metric1.description, "my_description");
            let MetricData::Sum(sum1) = u64::extract_metrics_data_ref(&metric1.data)
                .expect("Sum aggregation expected for Counter instruments by default")
            else {
                unreachable!()
            };

            assert_eq!(sum1.data_points.len(), 1);

            // Only meter1's 10 — meter2's 5 must not have merged in.
            let datapoint1 = &sum1.data_points[0];
            assert_eq!(datapoint1.value, 10);
        } else {
            panic!("No MetricScope found for 'test.meter1'");
        }

        if let Some(scope2) = scope2 {
            let metric2 = &scope2.metrics[0];
            assert_eq!(metric2.name, "my_counter");
            assert_eq!(metric2.unit, "my_unit");
            assert_eq!(metric2.description, "my_description");

            let MetricData::Sum(sum2) = u64::extract_metrics_data_ref(&metric2.data)
                .expect("Sum aggregation expected for Counter instruments by default")
            else {
                unreachable!()
            };

            assert_eq!(sum2.data_points.len(), 1);

            // Only meter2's 5 — kept separate from meter1's recording.
            let datapoint2 = &sum2.data_points[0];
            assert_eq!(datapoint2.value, 5);
        } else {
            panic!("No MetricScope found for 'test.meter2'");
        }
    }
790
    /// Two scopes built with identical name, version, schema URL, and
    /// attributes are the same identity: their meters share one exported scope
    /// and their identical instruments merge into one metric (10 + 5 = 15).
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn instrumentation_scope_identity_test() {
        let exporter = InMemoryMetricExporter::default();
        let meter_provider = SdkMeterProvider::builder()
            .with_periodic_exporter(exporter.clone())
            .build();

        // Builds a fully-specified scope; called twice with equal attributes
        // to produce two value-identical scopes.
        let make_scope = |attributes| {
            InstrumentationScope::builder("test.meter")
                .with_version("v0.1.0")
                .with_schema_url("http://example.com")
                .with_attributes(attributes)
                .build()
        };

        let meter1 =
            meter_provider.meter_with_scope(make_scope(vec![KeyValue::new("key", "value1")]));
        let meter2 =
            meter_provider.meter_with_scope(make_scope(vec![KeyValue::new("key", "value1")]));

        let counter1 = meter1
            .u64_counter("my_counter")
            .with_unit("my_unit")
            .with_description("my_description")
            .build();

        let counter2 = meter2
            .u64_counter("my_counter")
            .with_unit("my_unit")
            .with_description("my_description")
            .build();

        let attribute = vec![KeyValue::new("key1", "value1")];
        counter1.add(10, &attribute);
        counter2.add(5, &attribute);

        meter_provider.force_flush().unwrap();

        let resource_metrics = exporter
            .get_finished_metrics()
            .expect("metrics are expected to be exported.");
        println!("resource_metrics: {resource_metrics:?}");
        assert!(
            resource_metrics[0].scope_metrics.len() == 1,
            "There should be a single scope as the meters are identical"
        );
        assert!(
            resource_metrics[0].scope_metrics[0].metrics.len() == 1,
            "There should be single metric for the scope as instruments are identical"
        );

        // The merged scope must preserve every identity component.
        let scope = &resource_metrics[0].scope_metrics[0].scope;
        assert_eq!(scope.name(), "test.meter");
        assert_eq!(scope.version(), Some("v0.1.0"));
        assert_eq!(scope.schema_url(), Some("http://example.com"));

        assert!(scope.attributes().eq(&[KeyValue::new("key", "value1")]));

        let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
        assert_eq!(metric.name, "my_counter");
        assert_eq!(metric.unit, "my_unit");
        assert_eq!(metric.description, "my_description");

        let MetricData::Sum(sum) = u64::extract_metrics_data_ref(&metric.data)
            .expect("Sum aggregation expected for Counter instruments by default")
        else {
            unreachable!()
        };

        assert_eq!(sum.data_points.len(), 1);

        // Combined total from both meters' counters.
        let datapoint = &sum.data_points[0];
        assert_eq!(datapoint.value, 15);
    }
873
    /// A view whose requested aggregation is invalid (here: unsorted explicit
    /// bucket boundaries) must be ignored entirely — the instrument keeps its
    /// original name and unit, as if no view were configured.
    #[cfg(feature = "spec_unstable_metrics_views")]
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn histogram_aggregation_with_invalid_aggregation_should_proceed_as_if_view_not_exist() {
        let exporter = InMemoryMetricExporter::default();
        // Boundaries are deliberately NOT sorted (1.9 precedes 1.2), making
        // the aggregation invalid; `.ok()` turns the failed build into None,
        // so the view effectively does not apply.
        let view = |i: &Instrument| {
            if i.name == "test_histogram" {
                Stream::builder()
                    .with_aggregation(aggregation::Aggregation::ExplicitBucketHistogram {
                        boundaries: vec![0.9, 1.9, 1.2, 1.3, 1.4, 1.5], record_min_max: false,
                    })
                    .with_name("test_histogram_renamed")
                    .with_unit("test_unit_renamed")
                    .build()
                    .ok()
            } else {
                None
            }
        };
        let meter_provider = SdkMeterProvider::builder()
            .with_periodic_exporter(exporter.clone())
            .with_view(view)
            .build();

        let meter = meter_provider.meter("test");
        let histogram = meter
            .f64_histogram("test_histogram")
            .with_unit("test_unit")
            .build();

        histogram.record(1.5, &[KeyValue::new("key1", "value1")]);
        meter_provider.force_flush().unwrap();

        let resource_metrics = exporter
            .get_finished_metrics()
            .expect("metrics are expected to be exported.");
        assert!(!resource_metrics.is_empty());
        let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
        assert_eq!(
            metric.name, "test_histogram",
            "View rename should be ignored and original name retained."
        );
        assert_eq!(
            metric.unit, "test_unit",
            "View rename of unit should be ignored and original unit retained."
        );
    }
927
    /// When a view drops all attributes from an observable counter, the three
    /// per-attribute observations (100 each) should spatially aggregate into a
    /// single data point of 300. Ignored until spatial aggregation exists.
    #[cfg(feature = "spec_unstable_metrics_views")]
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    #[ignore = "Spatial aggregation is not yet implemented."]
    async fn spatial_aggregation_when_view_drops_attributes_observable_counter() {
        let exporter = InMemoryMetricExporter::default();
        // View: allow no attribute keys, i.e. drop every attribute.
        let view = |i: &Instrument| {
            if i.name == "my_observable_counter" {
                Stream::builder()
                    .with_allowed_attribute_keys(vec![])
                    .build()
                    .ok()
            } else {
                None
            }
        };
        let meter_provider = SdkMeterProvider::builder()
            .with_periodic_exporter(exporter.clone())
            .with_view(view)
            .build();

        // Three observations distinguished only by attributes that the view
        // removes — they should collapse into one series.
        let meter = meter_provider.meter("test");
        let _observable_counter = meter
            .u64_observable_counter("my_observable_counter")
            .with_callback(|observer| {
                observer.observe(
                    100,
                    &[
                        KeyValue::new("statusCode", "200"),
                        KeyValue::new("verb", "get"),
                    ],
                );

                observer.observe(
                    100,
                    &[
                        KeyValue::new("statusCode", "200"),
                        KeyValue::new("verb", "post"),
                    ],
                );

                observer.observe(
                    100,
                    &[
                        KeyValue::new("statusCode", "500"),
                        KeyValue::new("verb", "get"),
                    ],
                );
            })
            .build();

        meter_provider.force_flush().unwrap();

        let resource_metrics = exporter
            .get_finished_metrics()
            .expect("metrics are expected to be exported.");
        assert!(!resource_metrics.is_empty());
        let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
        assert_eq!(metric.name, "my_observable_counter",);

        let MetricData::Sum(sum) = u64::extract_metrics_data_ref(&metric.data)
            .expect("Sum aggregation expected for ObservableCounter instruments by default")
        else {
            unreachable!()
        };

        assert_eq!(sum.data_points.len(), 1);

        // 100 + 100 + 100 merged into the attribute-less series.
        let data_point = &sum.data_points[0];
        assert_eq!(data_point.value, 300);
    }
1008
    /// When a view drops all attributes from a synchronous counter, the three
    /// adds (10 each, differing only by attributes) must spatially aggregate
    /// into a single data point of 30.
    #[cfg(feature = "spec_unstable_metrics_views")]
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn spatial_aggregation_when_view_drops_attributes_counter() {
        let exporter = InMemoryMetricExporter::default();
        // View: allow no attribute keys, i.e. drop every attribute.
        let view = |i: &Instrument| {
            if i.name == "my_counter" {
                Some(
                    Stream::builder()
                        .with_allowed_attribute_keys(vec![])
                        .build()
                        .unwrap(),
                )
            } else {
                None
            }
        };
        let meter_provider = SdkMeterProvider::builder()
            .with_periodic_exporter(exporter.clone())
            .with_view(view)
            .build();

        let meter = meter_provider.meter("test");
        let counter = meter.u64_counter("my_counter").build();

        // Three adds distinguished only by attributes the view removes.
        counter.add(
            10,
            [
                KeyValue::new("statusCode", "200"),
                KeyValue::new("verb", "Get"),
            ]
            .as_ref(),
        );

        counter.add(
            10,
            [
                KeyValue::new("statusCode", "500"),
                KeyValue::new("verb", "Get"),
            ]
            .as_ref(),
        );

        counter.add(
            10,
            [
                KeyValue::new("statusCode", "200"),
                KeyValue::new("verb", "Post"),
            ]
            .as_ref(),
        );

        meter_provider.force_flush().unwrap();

        let resource_metrics = exporter
            .get_finished_metrics()
            .expect("metrics are expected to be exported.");
        assert!(!resource_metrics.is_empty());
        let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
        assert_eq!(metric.name, "my_counter",);

        let MetricData::Sum(sum) = u64::extract_metrics_data_ref(&metric.data)
            .expect("Sum aggregation expected for Counter instruments by default")
        else {
            unreachable!()
        };

        // All three adds collapse into one attribute-less data point: 3 x 10.
        assert_eq!(sum.data_points.len(), 1);
        let data_point = &sum.data_points[0];
        assert_eq!(data_point.value, 30);
    }
1091
1092 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1093 async fn no_attr_cumulative_up_down_counter() {
1094 let mut test_context = TestContext::new(Temporality::Cumulative);
1095 let counter = test_context.i64_up_down_counter("test", "my_counter", Some("my_unit"));
1096
1097 counter.add(50, &[]);
1098 test_context.flush_metrics();
1099
1100 let MetricData::Sum(sum) =
1101 test_context.get_aggregation::<i64>("my_counter", Some("my_unit"))
1102 else {
1103 unreachable!()
1104 };
1105
1106 assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
1107 assert!(!sum.is_monotonic, "Should not produce monotonic.");
1108 assert_eq!(
1109 sum.temporality,
1110 Temporality::Cumulative,
1111 "Should produce cumulative"
1112 );
1113
1114 let data_point = &sum.data_points[0];
1115 assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
1116 assert_eq!(data_point.value, 50, "Unexpected data point value");
1117 }
1118
    /// Even when the reader is configured for Delta, an UpDownCounter's
    /// temporality preference forces Cumulative output.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn no_attr_up_down_counter_always_cumulative() {
        let mut test_context = TestContext::new(Temporality::Delta);
        let counter = test_context.i64_up_down_counter("test", "my_counter", Some("my_unit"));

        counter.add(50, &[]);
        test_context.flush_metrics();

        let MetricData::Sum(sum) =
            test_context.get_aggregation::<i64>("my_counter", Some("my_unit"))
        else {
            unreachable!()
        };

        assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
        assert!(!sum.is_monotonic, "Should not produce monotonic.");
        assert_eq!(
            sum.temporality,
            Temporality::Cumulative,
            "Should produce Cumulative due to UpDownCounter temporality_preference"
        );

        let data_point = &sum.data_points[0];
        assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
        assert_eq!(data_point.value, 50, "Unexpected data point value");
    }
1145
    /// Cumulative counters keep accumulating across exports: add 50, export,
    /// add 5 — the next export must report the running total of 55.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn no_attr_cumulative_counter_value_added_after_export() {
        let mut test_context = TestContext::new(Temporality::Cumulative);
        let counter = test_context.u64_counter("test", "my_counter", None);

        counter.add(50, &[]);
        test_context.flush_metrics();
        // Consume the first export and clear the exporter's buffer so the next
        // assertion sees only the second export.
        let _ = test_context.get_aggregation::<u64>("my_counter", None);
        test_context.reset_metrics();

        counter.add(5, &[]);
        test_context.flush_metrics();
        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
            unreachable!()
        };

        assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
        assert!(sum.is_monotonic, "Should produce monotonic.");
        assert_eq!(
            sum.temporality,
            Temporality::Cumulative,
            "Should produce cumulative"
        );

        // Cumulative: 50 from before the first export plus the later 5.
        let data_point = &sum.data_points[0];
        assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
        assert_eq!(data_point.value, 55, "Unexpected data point value");
    }
1174
1175 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1176 async fn no_attr_delta_counter_value_reset_after_export() {
1177 let mut test_context = TestContext::new(Temporality::Delta);
1178 let counter = test_context.u64_counter("test", "my_counter", None);
1179
1180 counter.add(50, &[]);
1181 test_context.flush_metrics();
1182 let _ = test_context.get_aggregation::<u64>("my_counter", None);
1183 test_context.reset_metrics();
1184
1185 counter.add(5, &[]);
1186 test_context.flush_metrics();
1187 let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
1188 unreachable!()
1189 };
1190
1191 assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
1192 assert!(sum.is_monotonic, "Should produce monotonic.");
1193 assert_eq!(
1194 sum.temporality,
1195 Temporality::Delta,
1196 "Should produce cumulative"
1197 );
1198
1199 let data_point = &sum.data_points[0];
1200 assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
1201 assert_eq!(data_point.value, 5, "Unexpected data point value");
1202 }
1203
1204 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1205 async fn second_delta_export_does_not_give_no_attr_value_if_add_not_called() {
1206 let mut test_context = TestContext::new(Temporality::Delta);
1207 let counter = test_context.u64_counter("test", "my_counter", None);
1208
1209 counter.add(50, &[]);
1210 test_context.flush_metrics();
1211 let _ = test_context.get_aggregation::<u64>("my_counter", None);
1212 test_context.reset_metrics();
1213
1214 counter.add(50, &[KeyValue::new("a", "b")]);
1215 test_context.flush_metrics();
1216 let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
1217 unreachable!()
1218 };
1219
1220 let no_attr_data_point = sum.data_points.iter().find(|x| x.attributes.is_empty());
1221
1222 assert!(
1223 no_attr_data_point.is_none(),
1224 "Expected no data points with no attributes"
1225 );
1226 }
1227
1228 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1229 async fn delta_memory_efficiency_test() {
1230 let mut test_context = TestContext::new(Temporality::Delta);
1235 let counter = test_context.u64_counter("test", "my_counter", None);
1236
1237 counter.add(1, &[KeyValue::new("key1", "value1")]);
1239 counter.add(1, &[KeyValue::new("key1", "value1")]);
1240 counter.add(1, &[KeyValue::new("key1", "value1")]);
1241 counter.add(1, &[KeyValue::new("key1", "value1")]);
1242 counter.add(1, &[KeyValue::new("key1", "value1")]);
1243
1244 counter.add(1, &[KeyValue::new("key1", "value2")]);
1245 counter.add(1, &[KeyValue::new("key1", "value2")]);
1246 counter.add(1, &[KeyValue::new("key1", "value2")]);
1247 test_context.flush_metrics();
1248
1249 let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
1250 unreachable!()
1251 };
1252
1253 assert_eq!(sum.data_points.len(), 2);
1255
1256 let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
1258 .expect("datapoint with key1=value1 expected");
1259 assert_eq!(data_point1.value, 5);
1260
1261 let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2")
1263 .expect("datapoint with key1=value2 expected");
1264 assert_eq!(data_point1.value, 3);
1265
1266 test_context.exporter.reset();
1267 test_context.flush_metrics();
1270
1271 let resource_metrics = test_context
1272 .exporter
1273 .get_finished_metrics()
1274 .expect("metrics are expected to be exported.");
1275 println!("resource_metrics: {resource_metrics:?}");
1276 assert!(resource_metrics.is_empty(), "No metrics should be exported as no new measurements were recorded since last collect.");
1277 }
1278
1279 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1280 async fn counter_multithreaded() {
1281 counter_multithreaded_aggregation_helper(Temporality::Delta);
1285 counter_multithreaded_aggregation_helper(Temporality::Cumulative);
1286 }
1287
1288 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1289 async fn counter_f64_multithreaded() {
1290 counter_f64_multithreaded_aggregation_helper(Temporality::Delta);
1294 counter_f64_multithreaded_aggregation_helper(Temporality::Cumulative);
1295 }
1296
1297 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1298 async fn histogram_multithreaded() {
1299 histogram_multithreaded_aggregation_helper(Temporality::Delta);
1303 histogram_multithreaded_aggregation_helper(Temporality::Cumulative);
1304 }
1305
1306 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1307 async fn histogram_f64_multithreaded() {
1308 histogram_f64_multithreaded_aggregation_helper(Temporality::Delta);
1312 histogram_f64_multithreaded_aggregation_helper(Temporality::Cumulative);
1313 }
1314 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1315 async fn synchronous_instruments_cumulative_with_gap_in_measurements() {
1316 synchronous_instruments_cumulative_with_gap_in_measurements_helper("counter");
1320 synchronous_instruments_cumulative_with_gap_in_measurements_helper("updown_counter");
1321 synchronous_instruments_cumulative_with_gap_in_measurements_helper("histogram");
1322 synchronous_instruments_cumulative_with_gap_in_measurements_helper("gauge");
1323 }
1324
    /// Records one measurement without attributes and one with `key1=value1`
    /// on the given synchronous instrument kind, then verifies that a
    /// Cumulative export repeats the same data points even across a collect
    /// cycle in which no new measurements were recorded (the "gap").
    fn synchronous_instruments_cumulative_with_gap_in_measurements_helper(
        instrument_name: &'static str,
    ) {
        let mut test_context = TestContext::new(Temporality::Cumulative);
        let attributes = &[KeyValue::new("key1", "value1")];

        // Record one no-attribute and one attributed measurement on the
        // requested instrument kind; values differ per kind so the assertions
        // below can tell them apart.
        match instrument_name {
            "counter" => {
                let counter = test_context.meter().u64_counter("test_counter").build();
                counter.add(5, &[]);
                counter.add(10, attributes);
            }
            "updown_counter" => {
                let updown_counter = test_context
                    .meter()
                    .i64_up_down_counter("test_updowncounter")
                    .build();
                updown_counter.add(15, &[]);
                updown_counter.add(20, attributes);
            }
            "histogram" => {
                let histogram = test_context.meter().u64_histogram("test_histogram").build();
                histogram.record(25, &[]);
                histogram.record(30, attributes);
            }
            "gauge" => {
                let gauge = test_context.meter().u64_gauge("test_gauge").build();
                gauge.record(35, &[]);
                gauge.record(40, attributes);
            }
            _ => panic!("Incorrect instrument kind provided"),
        };

        test_context.flush_metrics();

        // First export: both data points present.
        assert_correct_export(&mut test_context, instrument_name);

        test_context.reset_metrics();

        // Second export with NO new measurements in between — Cumulative
        // aggregation must still report the same data points.
        test_context.flush_metrics();

        assert_correct_export(&mut test_context, instrument_name);

        // Asserts that the exported aggregation for the given instrument kind
        // contains exactly the two data points recorded above.
        fn assert_correct_export(test_context: &mut TestContext, instrument_name: &'static str) {
            match instrument_name {
                "counter" => {
                    let MetricData::Sum(sum) =
                        test_context.get_aggregation::<u64>("test_counter", None)
                    else {
                        unreachable!()
                    };
                    assert_eq!(sum.data_points.len(), 2);
                    let zero_attribute_datapoint =
                        find_sum_datapoint_with_no_attributes(&sum.data_points)
                            .expect("datapoint with no attributes expected");
                    assert_eq!(zero_attribute_datapoint.value, 5);
                    let data_point1 =
                        find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
                            .expect("datapoint with key1=value1 expected");
                    assert_eq!(data_point1.value, 10);
                }
                "updown_counter" => {
                    let MetricData::Sum(sum) =
                        test_context.get_aggregation::<i64>("test_updowncounter", None)
                    else {
                        unreachable!()
                    };
                    assert_eq!(sum.data_points.len(), 2);
                    let zero_attribute_datapoint =
                        find_sum_datapoint_with_no_attributes(&sum.data_points)
                            .expect("datapoint with no attributes expected");
                    assert_eq!(zero_attribute_datapoint.value, 15);
                    let data_point1 =
                        find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
                            .expect("datapoint with key1=value1 expected");
                    assert_eq!(data_point1.value, 20);
                }
                "histogram" => {
                    let MetricData::Histogram(histogram_data) =
                        test_context.get_aggregation::<u64>("test_histogram", None)
                    else {
                        unreachable!()
                    };
                    assert_eq!(histogram_data.data_points.len(), 2);
                    // One recording per data point, so count==1 and
                    // sum==min==max==the single recorded value.
                    let zero_attribute_datapoint =
                        find_histogram_datapoint_with_no_attributes(&histogram_data.data_points)
                            .expect("datapoint with no attributes expected");
                    assert_eq!(zero_attribute_datapoint.count, 1);
                    assert_eq!(zero_attribute_datapoint.sum, 25);
                    assert_eq!(zero_attribute_datapoint.min, Some(25));
                    assert_eq!(zero_attribute_datapoint.max, Some(25));
                    let data_point1 = find_histogram_datapoint_with_key_value(
                        &histogram_data.data_points,
                        "key1",
                        "value1",
                    )
                    .expect("datapoint with key1=value1 expected");
                    assert_eq!(data_point1.count, 1);
                    assert_eq!(data_point1.sum, 30);
                    assert_eq!(data_point1.min, Some(30));
                    assert_eq!(data_point1.max, Some(30));
                }
                "gauge" => {
                    let MetricData::Gauge(gauge_data) =
                        test_context.get_aggregation::<u64>("test_gauge", None)
                    else {
                        unreachable!()
                    };
                    assert_eq!(gauge_data.data_points.len(), 2);
                    let zero_attribute_datapoint =
                        find_gauge_datapoint_with_no_attributes(&gauge_data.data_points)
                            .expect("datapoint with no attributes expected");
                    assert_eq!(zero_attribute_datapoint.value, 35);
                    let data_point1 = find_gauge_datapoint_with_key_value(
                        &gauge_data.data_points,
                        "key1",
                        "value1",
                    )
                    .expect("datapoint with key1=value1 expected");
                    assert_eq!(data_point1.value, 40);
                }
                _ => panic!("Incorrect instrument kind provided"),
            }
        }
    }
1454
1455 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1456 async fn asynchronous_instruments_cumulative_data_points_only_from_last_measurement() {
1457 asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
1461 "gauge", true,
1462 );
1463 asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
1466 "counter", false,
1467 );
1468 asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
1469 "updown_counter",
1470 false,
1471 );
1472 }
1473
1474 #[test]
1475 fn view_test_rename() {
1476 test_view_customization(
1477 |i| {
1478 if i.name == "my_counter" {
1479 Some(
1480 Stream::builder()
1481 .with_name("my_counter_renamed")
1482 .build()
1483 .unwrap(),
1484 )
1485 } else {
1486 None
1487 }
1488 },
1489 "my_counter_renamed",
1490 "my_unit",
1491 "my_description",
1492 )
1493 }
1494
1495 #[test]
1496 fn view_test_change_unit() {
1497 test_view_customization(
1498 |i| {
1499 if i.name == "my_counter" {
1500 Some(Stream::builder().with_unit("my_unit_new").build().unwrap())
1501 } else {
1502 None
1503 }
1504 },
1505 "my_counter",
1506 "my_unit_new",
1507 "my_description",
1508 )
1509 }
1510
1511 #[test]
1512 fn view_test_change_description() {
1513 test_view_customization(
1514 |i| {
1515 if i.name == "my_counter" {
1516 Some(
1517 Stream::builder()
1518 .with_description("my_description_new")
1519 .build()
1520 .unwrap(),
1521 )
1522 } else {
1523 None
1524 }
1525 },
1526 "my_counter",
1527 "my_unit",
1528 "my_description_new",
1529 )
1530 }
1531
1532 #[test]
1533 fn view_test_change_name_unit() {
1534 test_view_customization(
1535 |i| {
1536 if i.name == "my_counter" {
1537 Some(
1538 Stream::builder()
1539 .with_name("my_counter_renamed")
1540 .with_unit("my_unit_new")
1541 .build()
1542 .unwrap(),
1543 )
1544 } else {
1545 None
1546 }
1547 },
1548 "my_counter_renamed",
1549 "my_unit_new",
1550 "my_description",
1551 )
1552 }
1553
1554 #[test]
1555 fn view_test_change_name_unit_desc() {
1556 test_view_customization(
1557 |i| {
1558 if i.name == "my_counter" {
1559 Some(
1560 Stream::builder()
1561 .with_name("my_counter_renamed")
1562 .with_unit("my_unit_new")
1563 .with_description("my_description_new")
1564 .build()
1565 .unwrap(),
1566 )
1567 } else {
1568 None
1569 }
1570 },
1571 "my_counter_renamed",
1572 "my_unit_new",
1573 "my_description_new",
1574 )
1575 }
1576
1577 #[test]
1578 fn view_test_match_unit() {
1579 test_view_customization(
1580 |i| {
1581 if i.unit == "my_unit" {
1582 Some(Stream::builder().with_unit("my_unit_new").build().unwrap())
1583 } else {
1584 None
1585 }
1586 },
1587 "my_counter",
1588 "my_unit_new",
1589 "my_description",
1590 )
1591 }
1592
1593 #[test]
1594 fn view_test_match_none() {
1595 test_view_customization(
1596 |i| {
1597 if i.name == "not_expected_to_match" {
1598 Some(Stream::builder().build().unwrap())
1599 } else {
1600 None
1601 }
1602 },
1603 "my_counter",
1604 "my_unit",
1605 "my_description",
1606 )
1607 }
1608
1609 #[test]
1610 fn view_test_match_multiple() {
1611 test_view_customization(
1612 |i| {
1613 if i.name == "my_counter" && i.unit == "my_unit" {
1614 Some(
1615 Stream::builder()
1616 .with_name("my_counter_renamed")
1617 .build()
1618 .unwrap(),
1619 )
1620 } else {
1621 None
1622 }
1623 },
1624 "my_counter_renamed",
1625 "my_unit",
1626 "my_description",
1627 )
1628 }
1629
1630 fn test_view_customization<F>(
1632 view_function: F,
1633 expected_name: &str,
1634 expected_unit: &str,
1635 expected_description: &str,
1636 ) where
1637 F: Fn(&Instrument) -> Option<Stream> + Send + Sync + 'static,
1638 {
1639 let exporter = InMemoryMetricExporter::default();
1644 let meter_provider = SdkMeterProvider::builder()
1645 .with_periodic_exporter(exporter.clone())
1646 .with_view(view_function)
1647 .build();
1648
1649 let meter = meter_provider.meter("test");
1651 let counter = meter
1652 .f64_counter("my_counter")
1653 .with_unit("my_unit")
1654 .with_description("my_description")
1655 .build();
1656
1657 counter.add(1.5, &[KeyValue::new("key1", "value1")]);
1658 meter_provider.force_flush().unwrap();
1659
1660 let resource_metrics = exporter
1662 .get_finished_metrics()
1663 .expect("metrics are expected to be exported.");
1664 assert_eq!(resource_metrics.len(), 1);
1665 assert_eq!(resource_metrics[0].scope_metrics.len(), 1);
1666 let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
1667 assert_eq!(
1668 metric.name, expected_name,
1669 "Expected name: {expected_name}."
1670 );
1671 assert_eq!(
1672 metric.unit, expected_unit,
1673 "Expected unit: {expected_unit}."
1674 );
1675 assert_eq!(
1676 metric.description, expected_description,
1677 "Expected description: {expected_description}."
1678 );
1679 }
1680
1681 #[test]
1688 fn test_view_single_instrument_multiple_stream() {
1689 let view1 = |i: &Instrument| {
1697 if i.name() == "my_counter" {
1698 Some(Stream::builder().with_name("my_counter_1").build().unwrap())
1699 } else {
1700 None
1701 }
1702 };
1703
1704 let view2 = |i: &Instrument| {
1705 if i.name() == "my_counter" {
1706 Some(Stream::builder().with_name("my_counter_2").build().unwrap())
1707 } else {
1708 None
1709 }
1710 };
1711
1712 let exporter = InMemoryMetricExporter::default();
1714 let meter_provider = SdkMeterProvider::builder()
1715 .with_periodic_exporter(exporter.clone())
1716 .with_view(view1)
1717 .with_view(view2)
1718 .build();
1719
1720 let meter = meter_provider.meter("test");
1722 let counter = meter.f64_counter("my_counter").build();
1723
1724 counter.add(1.5, &[KeyValue::new("key1", "value1")]);
1725 meter_provider.force_flush().unwrap();
1726
1727 let resource_metrics = exporter
1729 .get_finished_metrics()
1730 .expect("metrics are expected to be exported.");
1731 assert_eq!(resource_metrics.len(), 1);
1732 assert_eq!(resource_metrics[0].scope_metrics.len(), 1);
1733 let metrics = &resource_metrics[0].scope_metrics[0].metrics;
1734 assert_eq!(metrics.len(), 2);
1735 assert_eq!(metrics[0].name, "my_counter_1");
1736 assert_eq!(metrics[1].name, "my_counter_2");
1737 }
1738
1739 #[test]
1740 fn test_view_multiple_instrument_single_stream() {
1741 let view = |i: &Instrument| {
1748 if i.name() == "my_counter1" || i.name() == "my_counter2" {
1749 Some(Stream::builder().with_name("my_counter").build().unwrap())
1750 } else {
1751 None
1752 }
1753 };
1754
1755 let exporter = InMemoryMetricExporter::default();
1757 let meter_provider = SdkMeterProvider::builder()
1758 .with_periodic_exporter(exporter.clone())
1759 .with_view(view)
1760 .build();
1761
1762 let meter = meter_provider.meter("test");
1764 let counter1 = meter.f64_counter("my_counter1").build();
1765 let counter2 = meter.f64_counter("my_counter2").build();
1766
1767 counter1.add(1.5, &[KeyValue::new("key1", "value1")]);
1768 counter2.add(1.5, &[KeyValue::new("key1", "value1")]);
1769 meter_provider.force_flush().unwrap();
1770
1771 let resource_metrics = exporter
1773 .get_finished_metrics()
1774 .expect("metrics are expected to be exported.");
1775 assert_eq!(resource_metrics.len(), 1);
1776 assert_eq!(resource_metrics[0].scope_metrics.len(), 1);
1777 let metrics = &resource_metrics[0].scope_metrics[0].metrics;
1778 assert_eq!(metrics.len(), 1);
1779 assert_eq!(metrics[0].name, "my_counter");
1780 }
1782
    /// Registers an observable instrument whose callback observes values
    /// exactly once (guarded by an `AtomicBool`), then collects twice.
    /// The second collect has no fresh observations; `should_not_emit`
    /// says whether that cycle must be empty (gauges) or must still carry
    /// the previously observed cumulative values (counters).
    fn asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
        instrument_name: &'static str,
        should_not_emit: bool,
    ) {
        let mut test_context = TestContext::new(Temporality::Cumulative);
        // Arc so the attribute set can be moved into the 'static callback.
        let attributes = Arc::new([KeyValue::new("key1", "value1")]);

        // Register the requested observable instrument. Each callback fires
        // on every collect, but the AtomicBool ensures observations happen
        // only on the first cycle.
        match instrument_name {
            "counter" => {
                let has_run = AtomicBool::new(false);
                let _observable_counter = test_context
                    .meter()
                    .u64_observable_counter("test_counter")
                    .with_callback(move |observer| {
                        if !has_run.load(Ordering::SeqCst) {
                            observer.observe(5, &[]);
                            observer.observe(10, &*attributes.clone());
                            has_run.store(true, Ordering::SeqCst);
                        }
                    })
                    .build();
            }
            "updown_counter" => {
                let has_run = AtomicBool::new(false);
                let _observable_up_down_counter = test_context
                    .meter()
                    .i64_observable_up_down_counter("test_updowncounter")
                    .with_callback(move |observer| {
                        if !has_run.load(Ordering::SeqCst) {
                            observer.observe(15, &[]);
                            observer.observe(20, &*attributes.clone());
                            has_run.store(true, Ordering::SeqCst);
                        }
                    })
                    .build();
            }
            "gauge" => {
                let has_run = AtomicBool::new(false);
                let _observable_gauge = test_context
                    .meter()
                    .u64_observable_gauge("test_gauge")
                    .with_callback(move |observer| {
                        if !has_run.load(Ordering::SeqCst) {
                            observer.observe(25, &[]);
                            observer.observe(30, &*attributes.clone());
                            has_run.store(true, Ordering::SeqCst);
                        }
                    })
                    .build();
            }
            _ => panic!("Incorrect instrument kind provided"),
        };

        // First collect: callback observes, export must carry both points.
        test_context.flush_metrics();

        assert_correct_export(&mut test_context, instrument_name);

        test_context.reset_metrics();

        // Second collect: callback observes nothing this time.
        test_context.flush_metrics();

        if should_not_emit {
            // Gauges have no last-value to repeat without a new observation.
            test_context.check_no_metrics();
        } else {
            // Cumulative counters keep reporting the previously observed totals.
            assert_correct_export(&mut test_context, instrument_name);
        }

        // Asserts the exported aggregation matches the values observed in the
        // one-shot callback above for the given instrument kind.
        fn assert_correct_export(test_context: &mut TestContext, instrument_name: &'static str) {
            match instrument_name {
                "counter" => {
                    let MetricData::Sum(sum) =
                        test_context.get_aggregation::<u64>("test_counter", None)
                    else {
                        unreachable!()
                    };
                    assert_eq!(sum.data_points.len(), 2);
                    assert!(sum.is_monotonic);
                    let zero_attribute_datapoint =
                        find_sum_datapoint_with_no_attributes(&sum.data_points)
                            .expect("datapoint with no attributes expected");
                    assert_eq!(zero_attribute_datapoint.value, 5);
                    let data_point1 =
                        find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
                            .expect("datapoint with key1=value1 expected");
                    assert_eq!(data_point1.value, 10);
                }
                "updown_counter" => {
                    let MetricData::Sum(sum) =
                        test_context.get_aggregation::<i64>("test_updowncounter", None)
                    else {
                        unreachable!()
                    };
                    assert_eq!(sum.data_points.len(), 2);
                    assert!(!sum.is_monotonic);
                    let zero_attribute_datapoint =
                        find_sum_datapoint_with_no_attributes(&sum.data_points)
                            .expect("datapoint with no attributes expected");
                    assert_eq!(zero_attribute_datapoint.value, 15);
                    let data_point1 =
                        find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
                            .expect("datapoint with key1=value1 expected");
                    assert_eq!(data_point1.value, 20);
                }
                "gauge" => {
                    let MetricData::Gauge(gauge_data) =
                        test_context.get_aggregation::<u64>("test_gauge", None)
                    else {
                        unreachable!()
                    };
                    assert_eq!(gauge_data.data_points.len(), 2);
                    let zero_attribute_datapoint =
                        find_gauge_datapoint_with_no_attributes(&gauge_data.data_points)
                            .expect("datapoint with no attributes expected");
                    assert_eq!(zero_attribute_datapoint.value, 25);
                    let data_point1 = find_gauge_datapoint_with_key_value(
                        &gauge_data.data_points,
                        "key1",
                        "value1",
                    )
                    .expect("datapoint with key1=value1 expected");
                    assert_eq!(data_point1.value, 30);
                }
                _ => panic!("Incorrect instrument kind provided"),
            }
        }
    }
1913
    /// Drives a u64 Counter from scoped threads, interleaving mid-loop
    /// flushes with measurements, and checks the grand totals across all
    /// exports for the given temporality.
    fn counter_multithreaded_aggregation_helper(temporality: Temporality) {
        let mut test_context = TestContext::new(temporality);
        let counter = Arc::new(test_context.u64_counter("test", "my_counter", None));

        // 10 iterations; each spawns a thread recording 1 no-attribute add
        // and 5 key1=value1 adds, with a flush in the middle on even i so
        // some updates land before and some after a collect.
        for i in 0..10 {
            thread::scope(|s| {
                s.spawn(|| {
                    counter.add(1, &[]);

                    counter.add(1, &[KeyValue::new("key1", "value1")]);
                    counter.add(1, &[KeyValue::new("key1", "value1")]);
                    counter.add(1, &[KeyValue::new("key1", "value1")]);

                    if i % 2 == 0 {
                        test_context.flush_metrics();
                        thread::sleep(Duration::from_millis(i));
                    }

                    counter.add(1, &[KeyValue::new("key1", "value1")]);
                    counter.add(1, &[KeyValue::new("key1", "value1")]);
                });
            });
        }

        test_context.flush_metrics();

        // 5 mid-loop flushes + 1 final flush = 6 exports, each a Sum.
        let sums = test_context
            .get_from_multiple_aggregations::<u64>("my_counter", None, 6)
            .into_iter()
            .map(|data| {
                if let MetricData::Sum(sum) = data {
                    sum
                } else {
                    unreachable!()
                }
            })
            .collect::<Vec<_>>();

        let mut sum_zero_attributes = 0;
        let mut sum_key1_value1 = 0;
        sums.iter().for_each(|sum| {
            assert_eq!(sum.data_points.len(), 2);
            assert!(sum.is_monotonic, "Counter should produce monotonic.");
            assert_eq!(sum.temporality, temporality);

            if temporality == Temporality::Delta {
                // Delta exports are disjoint increments: accumulate them.
                sum_zero_attributes += sum.data_points[0].value;
                sum_key1_value1 += sum.data_points[1].value;
            } else {
                // Cumulative exports carry the running total: keep the latest.
                sum_zero_attributes = sum.data_points[0].value;
                sum_key1_value1 = sum.data_points[1].value;
            };
        });

        // Totals: 10 iterations × 1 no-attr add, 10 × 5 attributed adds.
        assert_eq!(sum_zero_attributes, 10);
        assert_eq!(sum_key1_value1, 50);
    }
1975
    /// f64 variant of `counter_multithreaded_aggregation_helper`: same
    /// threaded interleaving of adds and flushes, with float totals checked
    /// against a small epsilon.
    fn counter_f64_multithreaded_aggregation_helper(temporality: Temporality) {
        let mut test_context = TestContext::new(temporality);
        let counter = Arc::new(test_context.meter().f64_counter("test_counter").build());

        // Each of the 10 iterations records 1 no-attribute add and 5
        // key1=value1 adds of 1.23, flushing mid-thread on even i.
        for i in 0..10 {
            thread::scope(|s| {
                s.spawn(|| {
                    counter.add(1.23, &[]);

                    counter.add(1.23, &[KeyValue::new("key1", "value1")]);
                    counter.add(1.23, &[KeyValue::new("key1", "value1")]);
                    counter.add(1.23, &[KeyValue::new("key1", "value1")]);

                    if i % 2 == 0 {
                        test_context.flush_metrics();
                        thread::sleep(Duration::from_millis(i));
                    }

                    counter.add(1.23, &[KeyValue::new("key1", "value1")]);
                    counter.add(1.23, &[KeyValue::new("key1", "value1")]);
                });
            });
        }

        test_context.flush_metrics();

        // 5 mid-loop flushes + 1 final flush = 6 exports, each a Sum.
        let sums = test_context
            .get_from_multiple_aggregations::<f64>("test_counter", None, 6)
            .into_iter()
            .map(|data| {
                if let MetricData::Sum(sum) = data {
                    sum
                } else {
                    unreachable!()
                }
            })
            .collect::<Vec<_>>();

        let mut sum_zero_attributes = 0.0;
        let mut sum_key1_value1 = 0.0;
        sums.iter().for_each(|sum| {
            assert_eq!(sum.data_points.len(), 2);
            assert!(sum.is_monotonic, "Counter should produce monotonic.");
            assert_eq!(sum.temporality, temporality);

            if temporality == Temporality::Delta {
                // Delta: each export is an increment; accumulate.
                sum_zero_attributes += sum.data_points[0].value;
                sum_key1_value1 += sum.data_points[1].value;
            } else {
                // Cumulative: each export is the running total; keep the last.
                sum_zero_attributes = sum.data_points[0].value;
                sum_key1_value1 = sum.data_points[1].value;
            };
        });

        // Expected: 10 × 1.23 = 12.3 and 50 × 1.23 = 61.5 (epsilon compare
        // because of float accumulation).
        assert!(f64::abs(12.3 - sum_zero_attributes) < 0.0001);
        assert!(f64::abs(61.5 - sum_key1_value1) < 0.0001);
    }
2037
    /// Drives a u64 Histogram from scoped threads with flushes interleaved
    /// between recordings, then verifies count/sum/min/max and per-bucket
    /// totals across all exports for the given temporality.
    fn histogram_multithreaded_aggregation_helper(temporality: Temporality) {
        let mut test_context = TestContext::new(temporality);
        let histogram = Arc::new(test_context.meter().u64_histogram("test_histogram").build());

        // 10 iterations; each thread records 2 no-attribute values (1, 4)
        // and 5 key1=value1 values (5, 7, 18, 35, 35), flushing mid-thread
        // on even i so recordings straddle a collect.
        for i in 0..10 {
            thread::scope(|s| {
                s.spawn(|| {
                    histogram.record(1, &[]);
                    histogram.record(4, &[]);

                    histogram.record(5, &[KeyValue::new("key1", "value1")]);
                    histogram.record(7, &[KeyValue::new("key1", "value1")]);
                    histogram.record(18, &[KeyValue::new("key1", "value1")]);

                    if i % 2 == 0 {
                        test_context.flush_metrics();
                        thread::sleep(Duration::from_millis(i));
                    }

                    histogram.record(35, &[KeyValue::new("key1", "value1")]);
                    histogram.record(35, &[KeyValue::new("key1", "value1")]);
                });
            });
        }

        test_context.flush_metrics();

        // 5 mid-loop flushes + 1 final flush = 6 exports, each a Histogram.
        let histograms = test_context
            .get_from_multiple_aggregations::<u64>("test_histogram", None, 6)
            .into_iter()
            .map(|data| {
                if let MetricData::Histogram(hist) = data {
                    hist
                } else {
                    unreachable!()
                }
            })
            .collect::<Vec<_>>();

        // Running aggregates over all exports, per attribute set.
        let (
            mut sum_zero_attributes,
            mut count_zero_attributes,
            mut min_zero_attributes,
            mut max_zero_attributes,
        ) = (0, 0, u64::MAX, u64::MIN);
        let (mut sum_key1_value1, mut count_key1_value1, mut min_key1_value1, mut max_key1_value1) =
            (0, 0, u64::MAX, u64::MIN);

        // 16 buckets = default 15 boundaries + overflow bucket.
        let mut bucket_counts_zero_attributes = vec![0; 16];
        let mut bucket_counts_key1_value1 = vec![0; 16];

        histograms.iter().for_each(|histogram| {
            assert_eq!(histogram.data_points.len(), 2);
            assert_eq!(histogram.temporality, temporality);

            let data_point_zero_attributes =
                find_histogram_datapoint_with_no_attributes(&histogram.data_points).unwrap();
            let data_point_key1_value1 =
                find_histogram_datapoint_with_key_value(&histogram.data_points, "key1", "value1")
                    .unwrap();

            if temporality == Temporality::Delta {
                // Delta: exports are disjoint; accumulate sums/counts/buckets
                // and fold mins/maxes across exports.
                sum_zero_attributes += data_point_zero_attributes.sum;
                sum_key1_value1 += data_point_key1_value1.sum;

                count_zero_attributes += data_point_zero_attributes.count;
                count_key1_value1 += data_point_key1_value1.count;

                min_zero_attributes =
                    min(min_zero_attributes, data_point_zero_attributes.min.unwrap());
                min_key1_value1 = min(min_key1_value1, data_point_key1_value1.min.unwrap());

                max_zero_attributes =
                    max(max_zero_attributes, data_point_zero_attributes.max.unwrap());
                max_key1_value1 = max(max_key1_value1, data_point_key1_value1.max.unwrap());

                assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16);
                assert_eq!(data_point_key1_value1.bucket_counts.len(), 16);

                for (i, _) in data_point_zero_attributes.bucket_counts.iter().enumerate() {
                    bucket_counts_zero_attributes[i] += data_point_zero_attributes.bucket_counts[i];
                }

                for (i, _) in data_point_key1_value1.bucket_counts.iter().enumerate() {
                    bucket_counts_key1_value1[i] += data_point_key1_value1.bucket_counts[i];
                }
            } else {
                // Cumulative: each export already holds the running totals;
                // simply keep the latest snapshot.
                sum_zero_attributes = data_point_zero_attributes.sum;
                sum_key1_value1 = data_point_key1_value1.sum;

                count_zero_attributes = data_point_zero_attributes.count;
                count_key1_value1 = data_point_key1_value1.count;

                min_zero_attributes = data_point_zero_attributes.min.unwrap();
                min_key1_value1 = data_point_key1_value1.min.unwrap();

                max_zero_attributes = data_point_zero_attributes.max.unwrap();
                max_key1_value1 = data_point_key1_value1.max.unwrap();

                assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16);
                assert_eq!(data_point_key1_value1.bucket_counts.len(), 16);

                bucket_counts_zero_attributes.clone_from(&data_point_zero_attributes.bucket_counts);
                bucket_counts_key1_value1.clone_from(&data_point_key1_value1.bucket_counts);
            };
        });

        // No-attribute set: 10 iterations × 2 recordings (1 and 4).
        assert_eq!(count_zero_attributes, 20);
        assert_eq!(sum_zero_attributes, 50);
        assert_eq!(min_zero_attributes, 1);
        assert_eq!(max_zero_attributes, 4);

        // All 20 no-attribute values (1 and 4) fall into bucket index 1.
        for (i, count) in bucket_counts_zero_attributes.iter().enumerate() {
            match i {
                1 => assert_eq!(*count, 20),
                _ => assert_eq!(*count, 0),
            }
        }

        // key1=value1 set: 10 iterations × 5 recordings (5, 7, 18, 35, 35).
        assert_eq!(count_key1_value1, 50);
        assert_eq!(sum_key1_value1, 1000);
        assert_eq!(min_key1_value1, 5);
        assert_eq!(max_key1_value1, 35);

        // 5,7 → bucket 1 and 2; 18 → bucket 3; 35,35 → bucket 4 (×10 iters).
        for (i, count) in bucket_counts_key1_value1.iter().enumerate() {
            match i {
                1 => assert_eq!(*count, 10),
                2 => assert_eq!(*count, 10),
                3 => assert_eq!(*count, 10),
                4 => assert_eq!(*count, 20),
                _ => assert_eq!(*count, 0),
            }
        }
    }
2180
    /// Stress-tests f64 histogram aggregation under concurrent recording:
    /// each of 10 iterations spawns a scoped thread that records values while
    /// flushes are interleaved on even iterations, then all exported batches
    /// are merged and the grand totals verified.
    fn histogram_f64_multithreaded_aggregation_helper(temporality: Temporality) {
        // Arrange
        let mut test_context = TestContext::new(temporality);
        let histogram = Arc::new(test_context.meter().f64_histogram("test_histogram").build());

        for i in 0..10 {
            thread::scope(|s| {
                s.spawn(|| {
                    // Two values per iteration with no attributes.
                    histogram.record(1.5, &[]);
                    histogram.record(4.6, &[]);

                    // Five values per iteration for key1=value1 (three here,
                    // two more after the optional mid-iteration flush).
                    histogram.record(5.0, &[KeyValue::new("key1", "value1")]);
                    histogram.record(7.3, &[KeyValue::new("key1", "value1")]);
                    histogram.record(18.1, &[KeyValue::new("key1", "value1")]);

                    // On even iterations, flush mid-recording (and sleep
                    // briefly) so some exports happen between record calls.
                    if i % 2 == 0 {
                        test_context.flush_metrics();
                        thread::sleep(Duration::from_millis(i)); }

                    histogram.record(35.1, &[KeyValue::new("key1", "value1")]);
                    histogram.record(35.1, &[KeyValue::new("key1", "value1")]);
                });
            });
        }

        test_context.flush_metrics();

        // Collect the instrument's data from all 6 expected exports
        // (5 mid-iteration flushes on i = 0,2,4,6,8 plus the final flush).
        let histograms = test_context
            .get_from_multiple_aggregations::<f64>("test_histogram", None, 6)
            .into_iter()
            .map(|data| {
                if let MetricData::Histogram(hist) = data {
                    hist
                } else {
                    unreachable!()
                }
            })
            .collect::<Vec<_>>();

        // Accumulators for totals merged across all exported batches.
        let (
            mut sum_zero_attributes,
            mut count_zero_attributes,
            mut min_zero_attributes,
            mut max_zero_attributes,
        ) = (0.0, 0, f64::MAX, f64::MIN);
        let (mut sum_key1_value1, mut count_key1_value1, mut min_key1_value1, mut max_key1_value1) =
            (0.0, 0, f64::MAX, f64::MIN);

        // Sized to match the 16 bucket counts asserted per data point below.
        let mut bucket_counts_zero_attributes = vec![0; 16]; let mut bucket_counts_key1_value1 = vec![0; 16];

        histograms.iter().for_each(|histogram| {
            assert_eq!(histogram.data_points.len(), 2); assert_eq!(histogram.temporality, temporality);

            let data_point_zero_attributes =
                find_histogram_datapoint_with_no_attributes(&histogram.data_points).unwrap();
            let data_point_key1_value1 =
                find_histogram_datapoint_with_key_value(&histogram.data_points, "key1", "value1")
                    .unwrap();

            if temporality == Temporality::Delta {
                // Delta batches are disjoint: sum counts/sums and fold
                // min/max and bucket counts across every batch.
                sum_zero_attributes += data_point_zero_attributes.sum;
                sum_key1_value1 += data_point_key1_value1.sum;

                count_zero_attributes += data_point_zero_attributes.count;
                count_key1_value1 += data_point_key1_value1.count;

                min_zero_attributes =
                    min_zero_attributes.min(data_point_zero_attributes.min.unwrap());
                min_key1_value1 = min_key1_value1.min(data_point_key1_value1.min.unwrap());

                max_zero_attributes =
                    max_zero_attributes.max(data_point_zero_attributes.max.unwrap());
                max_key1_value1 = max_key1_value1.max(data_point_key1_value1.max.unwrap());

                assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16);
                assert_eq!(data_point_key1_value1.bucket_counts.len(), 16);

                for (i, _) in data_point_zero_attributes.bucket_counts.iter().enumerate() {
                    bucket_counts_zero_attributes[i] += data_point_zero_attributes.bucket_counts[i];
                }

                for (i, _) in data_point_key1_value1.bucket_counts.iter().enumerate() {
                    bucket_counts_key1_value1[i] += data_point_key1_value1.bucket_counts[i];
                }
            } else {
                // Cumulative batches supersede earlier ones: keep only the
                // latest batch's values.
                sum_zero_attributes = data_point_zero_attributes.sum;
                sum_key1_value1 = data_point_key1_value1.sum;

                count_zero_attributes = data_point_zero_attributes.count;
                count_key1_value1 = data_point_key1_value1.count;

                min_zero_attributes = data_point_zero_attributes.min.unwrap();
                min_key1_value1 = data_point_key1_value1.min.unwrap();

                max_zero_attributes = data_point_zero_attributes.max.unwrap();
                max_key1_value1 = data_point_key1_value1.max.unwrap();

                assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16);
                assert_eq!(data_point_key1_value1.bucket_counts.len(), 16);

                bucket_counts_zero_attributes.clone_from(&data_point_zero_attributes.bucket_counts);
                bucket_counts_key1_value1.clone_from(&data_point_key1_value1.bucket_counts);
            };
        });

        // Zero-attribute totals: 10 iterations x 2 records = 20 measurements,
        // sum = 10 * (1.5 + 4.6) = 61.0 (compared with a float tolerance).
        assert_eq!(count_zero_attributes, 20); assert!(f64::abs(61.0 - sum_zero_attributes) < 0.0001); assert_eq!(min_zero_attributes, 1.5);
        assert_eq!(max_zero_attributes, 4.6);

        // All 20 zero-attribute measurements land in bucket index 1.
        for (i, count) in bucket_counts_zero_attributes.iter().enumerate() {
            match i {
                1 => assert_eq!(*count, 20), _ => assert_eq!(*count, 0),
            }
        }

        // key1=value1 totals: 10 iterations x 5 records = 50 measurements,
        // sum = 10 * (5.0 + 7.3 + 18.1 + 35.1 + 35.1) = 1006.0.
        assert_eq!(count_key1_value1, 50); assert!(f64::abs(1006.0 - sum_key1_value1) < 0.0001); assert_eq!(min_key1_value1, 5.0);
        assert_eq!(max_key1_value1, 35.1);

        // Per-bucket distribution: 5.0 -> bucket 1, 7.3 -> bucket 2,
        // 18.1 -> bucket 3, both 35.1 records -> bucket 4 (x10 iterations).
        for (i, count) in bucket_counts_key1_value1.iter().enumerate() {
            match i {
                1 => assert_eq!(*count, 10), 2 => assert_eq!(*count, 10), 3 => assert_eq!(*count, 10), 4 => assert_eq!(*count, 20), _ => assert_eq!(*count, 0),
            }
        }
    }
2323
2324 fn histogram_aggregation_helper(temporality: Temporality) {
2325 let mut test_context = TestContext::new(temporality);
2327 let histogram = test_context.meter().u64_histogram("my_histogram").build();
2328
2329 let mut rand = rngs::SmallRng::from_os_rng();
2331 let values_kv1 = (0..50)
2332 .map(|_| rand.random_range(0..100))
2333 .collect::<Vec<u64>>();
2334 for value in values_kv1.iter() {
2335 histogram.record(*value, &[KeyValue::new("key1", "value1")]);
2336 }
2337
2338 let values_kv2 = (0..30)
2339 .map(|_| rand.random_range(0..100))
2340 .collect::<Vec<u64>>();
2341 for value in values_kv2.iter() {
2342 histogram.record(*value, &[KeyValue::new("key1", "value2")]);
2343 }
2344
2345 test_context.flush_metrics();
2346
2347 let MetricData::Histogram(histogram_data) =
2349 test_context.get_aggregation::<u64>("my_histogram", None)
2350 else {
2351 unreachable!()
2352 };
2353 assert_eq!(histogram_data.data_points.len(), 2);
2355 if let Temporality::Cumulative = temporality {
2356 assert_eq!(
2357 histogram_data.temporality,
2358 Temporality::Cumulative,
2359 "Should produce cumulative"
2360 );
2361 } else {
2362 assert_eq!(
2363 histogram_data.temporality,
2364 Temporality::Delta,
2365 "Should produce delta"
2366 );
2367 }
2368
2369 let data_point1 =
2371 find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value1")
2372 .expect("datapoint with key1=value1 expected");
2373 assert_eq!(data_point1.count, values_kv1.len() as u64);
2374 assert_eq!(data_point1.sum, values_kv1.iter().sum::<u64>());
2375 assert_eq!(data_point1.min.unwrap(), *values_kv1.iter().min().unwrap());
2376 assert_eq!(data_point1.max.unwrap(), *values_kv1.iter().max().unwrap());
2377
2378 let data_point2 =
2379 find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value2")
2380 .expect("datapoint with key1=value2 expected");
2381 assert_eq!(data_point2.count, values_kv2.len() as u64);
2382 assert_eq!(data_point2.sum, values_kv2.iter().sum::<u64>());
2383 assert_eq!(data_point2.min.unwrap(), *values_kv2.iter().min().unwrap());
2384 assert_eq!(data_point2.max.unwrap(), *values_kv2.iter().max().unwrap());
2385
2386 test_context.reset_metrics();
2388 for value in values_kv1.iter() {
2389 histogram.record(*value, &[KeyValue::new("key1", "value1")]);
2390 }
2391
2392 for value in values_kv2.iter() {
2393 histogram.record(*value, &[KeyValue::new("key1", "value2")]);
2394 }
2395
2396 test_context.flush_metrics();
2397
2398 let MetricData::Histogram(histogram_data) =
2399 test_context.get_aggregation::<u64>("my_histogram", None)
2400 else {
2401 unreachable!()
2402 };
2403 assert_eq!(histogram_data.data_points.len(), 2);
2404 let data_point1 =
2405 find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value1")
2406 .expect("datapoint with key1=value1 expected");
2407 if temporality == Temporality::Cumulative {
2408 assert_eq!(data_point1.count, 2 * (values_kv1.len() as u64));
2409 assert_eq!(data_point1.sum, 2 * (values_kv1.iter().sum::<u64>()));
2410 assert_eq!(data_point1.min.unwrap(), *values_kv1.iter().min().unwrap());
2411 assert_eq!(data_point1.max.unwrap(), *values_kv1.iter().max().unwrap());
2412 } else {
2413 assert_eq!(data_point1.count, values_kv1.len() as u64);
2414 assert_eq!(data_point1.sum, values_kv1.iter().sum::<u64>());
2415 assert_eq!(data_point1.min.unwrap(), *values_kv1.iter().min().unwrap());
2416 assert_eq!(data_point1.max.unwrap(), *values_kv1.iter().max().unwrap());
2417 }
2418
2419 let data_point1 =
2420 find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value2")
2421 .expect("datapoint with key1=value1 expected");
2422 if temporality == Temporality::Cumulative {
2423 assert_eq!(data_point1.count, 2 * (values_kv2.len() as u64));
2424 assert_eq!(data_point1.sum, 2 * (values_kv2.iter().sum::<u64>()));
2425 assert_eq!(data_point1.min.unwrap(), *values_kv2.iter().min().unwrap());
2426 assert_eq!(data_point1.max.unwrap(), *values_kv2.iter().max().unwrap());
2427 } else {
2428 assert_eq!(data_point1.count, values_kv2.len() as u64);
2429 assert_eq!(data_point1.sum, values_kv2.iter().sum::<u64>());
2430 assert_eq!(data_point1.min.unwrap(), *values_kv2.iter().min().unwrap());
2431 assert_eq!(data_point1.max.unwrap(), *values_kv2.iter().max().unwrap());
2432 }
2433 }
2434
2435 fn histogram_aggregation_with_custom_bounds_helper(temporality: Temporality) {
2436 let mut test_context = TestContext::new(temporality);
2437 let histogram = test_context
2438 .meter()
2439 .u64_histogram("test_histogram")
2440 .with_boundaries(vec![1.0, 2.5, 5.5])
2441 .build();
2442 histogram.record(1, &[KeyValue::new("key1", "value1")]);
2443 histogram.record(2, &[KeyValue::new("key1", "value1")]);
2444 histogram.record(3, &[KeyValue::new("key1", "value1")]);
2445 histogram.record(4, &[KeyValue::new("key1", "value1")]);
2446 histogram.record(5, &[KeyValue::new("key1", "value1")]);
2447
2448 test_context.flush_metrics();
2449
2450 let MetricData::Histogram(histogram_data) =
2452 test_context.get_aggregation::<u64>("test_histogram", None)
2453 else {
2454 unreachable!()
2455 };
2456 assert_eq!(histogram_data.data_points.len(), 1);
2458 if let Temporality::Cumulative = temporality {
2459 assert_eq!(
2460 histogram_data.temporality,
2461 Temporality::Cumulative,
2462 "Should produce cumulative"
2463 );
2464 } else {
2465 assert_eq!(
2466 histogram_data.temporality,
2467 Temporality::Delta,
2468 "Should produce delta"
2469 );
2470 }
2471
2472 let data_point =
2474 find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value1")
2475 .expect("datapoint with key1=value1 expected");
2476
2477 assert_eq!(data_point.count, 5);
2478 assert_eq!(data_point.sum, 15);
2479
2480 assert_eq!(vec![1.0, 2.5, 5.5], data_point.bounds);
2487 assert_eq!(vec![1, 1, 3, 0], data_point.bucket_counts);
2488 }
2489
2490 fn histogram_aggregation_with_empty_bounds_helper(temporality: Temporality) {
2491 let mut test_context = TestContext::new(temporality);
2492 let histogram = test_context
2493 .meter()
2494 .u64_histogram("test_histogram")
2495 .with_boundaries(vec![])
2496 .build();
2497 histogram.record(1, &[KeyValue::new("key1", "value1")]);
2498 histogram.record(2, &[KeyValue::new("key1", "value1")]);
2499 histogram.record(3, &[KeyValue::new("key1", "value1")]);
2500 histogram.record(4, &[KeyValue::new("key1", "value1")]);
2501 histogram.record(5, &[KeyValue::new("key1", "value1")]);
2502
2503 test_context.flush_metrics();
2504
2505 let MetricData::Histogram(histogram_data) =
2507 test_context.get_aggregation::<u64>("test_histogram", None)
2508 else {
2509 unreachable!()
2510 };
2511 assert_eq!(histogram_data.data_points.len(), 1);
2513 if let Temporality::Cumulative = temporality {
2514 assert_eq!(
2515 histogram_data.temporality,
2516 Temporality::Cumulative,
2517 "Should produce cumulative"
2518 );
2519 } else {
2520 assert_eq!(
2521 histogram_data.temporality,
2522 Temporality::Delta,
2523 "Should produce delta"
2524 );
2525 }
2526
2527 let data_point =
2529 find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value1")
2530 .expect("datapoint with key1=value1 expected");
2531
2532 assert_eq!(data_point.count, 5);
2533 assert_eq!(data_point.sum, 15);
2534 assert!(data_point.bounds.is_empty());
2535 assert!(data_point.bucket_counts.is_empty());
2536 }
2537
2538 fn gauge_aggregation_helper(temporality: Temporality) {
2539 let mut test_context = TestContext::new(temporality);
2541 let gauge = test_context.meter().i64_gauge("my_gauge").build();
2542
2543 gauge.record(1, &[KeyValue::new("key1", "value1")]);
2545 gauge.record(2, &[KeyValue::new("key1", "value1")]);
2546 gauge.record(1, &[KeyValue::new("key1", "value1")]);
2547 gauge.record(3, &[KeyValue::new("key1", "value1")]);
2548 gauge.record(4, &[KeyValue::new("key1", "value1")]);
2549
2550 gauge.record(11, &[KeyValue::new("key1", "value2")]);
2551 gauge.record(13, &[KeyValue::new("key1", "value2")]);
2552 gauge.record(6, &[KeyValue::new("key1", "value2")]);
2553
2554 test_context.flush_metrics();
2555
2556 let MetricData::Gauge(gauge_data_point) =
2558 test_context.get_aggregation::<i64>("my_gauge", None)
2559 else {
2560 unreachable!()
2561 };
2562 assert_eq!(gauge_data_point.data_points.len(), 2);
2564
2565 let data_point1 =
2567 find_gauge_datapoint_with_key_value(&gauge_data_point.data_points, "key1", "value1")
2568 .expect("datapoint with key1=value1 expected");
2569 assert_eq!(data_point1.value, 4);
2570
2571 let data_point1 =
2572 find_gauge_datapoint_with_key_value(&gauge_data_point.data_points, "key1", "value2")
2573 .expect("datapoint with key1=value2 expected");
2574 assert_eq!(data_point1.value, 6);
2575
2576 test_context.reset_metrics();
2578 gauge.record(1, &[KeyValue::new("key1", "value1")]);
2579 gauge.record(2, &[KeyValue::new("key1", "value1")]);
2580 gauge.record(11, &[KeyValue::new("key1", "value1")]);
2581 gauge.record(3, &[KeyValue::new("key1", "value1")]);
2582 gauge.record(41, &[KeyValue::new("key1", "value1")]);
2583
2584 gauge.record(34, &[KeyValue::new("key1", "value2")]);
2585 gauge.record(12, &[KeyValue::new("key1", "value2")]);
2586 gauge.record(54, &[KeyValue::new("key1", "value2")]);
2587
2588 test_context.flush_metrics();
2589
2590 let MetricData::Gauge(gauge) = test_context.get_aggregation::<i64>("my_gauge", None) else {
2591 unreachable!()
2592 };
2593 assert_eq!(gauge.data_points.len(), 2);
2594 let data_point1 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key1", "value1")
2595 .expect("datapoint with key1=value1 expected");
2596 assert_eq!(data_point1.value, 41);
2597
2598 let data_point1 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key1", "value2")
2599 .expect("datapoint with key1=value2 expected");
2600 assert_eq!(data_point1.value, 54);
2601 }
2602
2603 fn observable_gauge_aggregation_helper(temporality: Temporality, use_empty_attributes: bool) {
2604 let mut test_context = TestContext::new(temporality);
2606 let _observable_gauge = test_context
2607 .meter()
2608 .i64_observable_gauge("test_observable_gauge")
2609 .with_callback(move |observer| {
2610 if use_empty_attributes {
2611 observer.observe(1, &[]);
2612 }
2613 observer.observe(4, &[KeyValue::new("key1", "value1")]);
2614 observer.observe(5, &[KeyValue::new("key2", "value2")]);
2615 })
2616 .build();
2617
2618 test_context.flush_metrics();
2619
2620 let MetricData::Gauge(gauge) =
2622 test_context.get_aggregation::<i64>("test_observable_gauge", None)
2623 else {
2624 unreachable!()
2625 };
2626 let expected_time_series_count = if use_empty_attributes { 3 } else { 2 };
2628 assert_eq!(gauge.data_points.len(), expected_time_series_count);
2629
2630 if use_empty_attributes {
2631 let zero_attribute_datapoint =
2633 find_gauge_datapoint_with_no_attributes(&gauge.data_points)
2634 .expect("datapoint with no attributes expected");
2635 assert_eq!(zero_attribute_datapoint.value, 1);
2636 }
2637
2638 let data_point1 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key1", "value1")
2640 .expect("datapoint with key1=value1 expected");
2641 assert_eq!(data_point1.value, 4);
2642
2643 let data_point2 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key2", "value2")
2645 .expect("datapoint with key2=value2 expected");
2646 assert_eq!(data_point2.value, 5);
2647
2648 test_context.reset_metrics();
2650
2651 test_context.flush_metrics();
2652
2653 let MetricData::Gauge(gauge) =
2654 test_context.get_aggregation::<i64>("test_observable_gauge", None)
2655 else {
2656 unreachable!()
2657 };
2658 assert_eq!(gauge.data_points.len(), expected_time_series_count);
2659
2660 if use_empty_attributes {
2661 let zero_attribute_datapoint =
2662 find_gauge_datapoint_with_no_attributes(&gauge.data_points)
2663 .expect("datapoint with no attributes expected");
2664 assert_eq!(zero_attribute_datapoint.value, 1);
2665 }
2666
2667 let data_point1 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key1", "value1")
2668 .expect("datapoint with key1=value1 expected");
2669 assert_eq!(data_point1.value, 4);
2670
2671 let data_point2 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key2", "value2")
2672 .expect("datapoint with key2=value2 expected");
2673 assert_eq!(data_point2.value, 5);
2674 }
2675
2676 fn counter_aggregation_helper(temporality: Temporality) {
2677 let mut test_context = TestContext::new(temporality);
2679 let counter = test_context.u64_counter("test", "my_counter", None);
2680
2681 counter.add(1, &[KeyValue::new("key1", "value1")]);
2683 counter.add(1, &[KeyValue::new("key1", "value1")]);
2684 counter.add(1, &[KeyValue::new("key1", "value1")]);
2685 counter.add(1, &[KeyValue::new("key1", "value1")]);
2686 counter.add(1, &[KeyValue::new("key1", "value1")]);
2687
2688 counter.add(1, &[KeyValue::new("key1", "value2")]);
2689 counter.add(1, &[KeyValue::new("key1", "value2")]);
2690 counter.add(1, &[KeyValue::new("key1", "value2")]);
2691
2692 test_context.flush_metrics();
2693
2694 let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
2696 unreachable!()
2697 };
2698 assert_eq!(sum.data_points.len(), 2);
2700 assert!(sum.is_monotonic, "Counter should produce monotonic.");
2701 if let Temporality::Cumulative = temporality {
2702 assert_eq!(
2703 sum.temporality,
2704 Temporality::Cumulative,
2705 "Should produce cumulative"
2706 );
2707 } else {
2708 assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
2709 }
2710
2711 let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
2713 .expect("datapoint with key1=value1 expected");
2714 assert_eq!(data_point1.value, 5);
2715
2716 let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2")
2717 .expect("datapoint with key1=value2 expected");
2718 assert_eq!(data_point1.value, 3);
2719
2720 test_context.reset_metrics();
2722 counter.add(1, &[KeyValue::new("key1", "value1")]);
2723 counter.add(1, &[KeyValue::new("key1", "value1")]);
2724 counter.add(1, &[KeyValue::new("key1", "value1")]);
2725 counter.add(1, &[KeyValue::new("key1", "value1")]);
2726 counter.add(1, &[KeyValue::new("key1", "value1")]);
2727
2728 counter.add(1, &[KeyValue::new("key1", "value2")]);
2729 counter.add(1, &[KeyValue::new("key1", "value2")]);
2730 counter.add(1, &[KeyValue::new("key1", "value2")]);
2731
2732 test_context.flush_metrics();
2733
2734 let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
2735 unreachable!()
2736 };
2737 assert_eq!(sum.data_points.len(), 2);
2738 let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
2739 .expect("datapoint with key1=value1 expected");
2740 if temporality == Temporality::Cumulative {
2741 assert_eq!(data_point1.value, 10);
2742 } else {
2743 assert_eq!(data_point1.value, 5);
2744 }
2745
2746 let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2")
2747 .expect("datapoint with key1=value2 expected");
2748 if temporality == Temporality::Cumulative {
2749 assert_eq!(data_point1.value, 6);
2750 } else {
2751 assert_eq!(data_point1.value, 3);
2752 }
2753 }
2754
2755 fn counter_aggregation_overflow_helper(temporality: Temporality) {
2756 let mut test_context = TestContext::new(temporality);
2758 let counter = test_context.u64_counter("test", "my_counter", None);
2759
2760 for v in 0..2000 {
2763 counter.add(100, &[KeyValue::new("A", v.to_string())]);
2764 }
2765
2766 counter.add(3, &[]);
2768 counter.add(3, &[]);
2769
2770 counter.add(100, &[KeyValue::new("A", "foo")]);
2772 counter.add(100, &[KeyValue::new("A", "another")]);
2773 counter.add(100, &[KeyValue::new("A", "yet_another")]);
2774 test_context.flush_metrics();
2775
2776 let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
2777 unreachable!()
2778 };
2779
2780 assert_eq!(sum.data_points.len(), 2002);
2782
2783 let data_point =
2784 find_overflow_sum_datapoint(&sum.data_points).expect("overflow point expected");
2785 assert_eq!(data_point.value, 300);
2786
2787 let empty_attrs_data_point = find_sum_datapoint_with_no_attributes(&sum.data_points)
2789 .expect("Empty attributes point expected");
2790 assert!(
2791 empty_attrs_data_point.attributes.is_empty(),
2792 "Non-empty attribute set"
2793 );
2794 assert_eq!(
2795 empty_attrs_data_point.value, 6,
2796 "Empty attributes value should be 3+3=6"
2797 );
2798
2799 test_context.reset_metrics();
2802 counter.add(100, &[KeyValue::new("A", "foo")]);
2805 counter.add(100, &[KeyValue::new("A", "another")]);
2806 counter.add(100, &[KeyValue::new("A", "yet_another")]);
2807 test_context.flush_metrics();
2808
2809 let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
2810 unreachable!()
2811 };
2812
2813 if temporality == Temporality::Delta {
2814 assert_eq!(sum.data_points.len(), 3);
2815
2816 let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "foo")
2817 .expect("point expected");
2818 assert_eq!(data_point.value, 100);
2819
2820 let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "another")
2821 .expect("point expected");
2822 assert_eq!(data_point.value, 100);
2823
2824 let data_point =
2825 find_sum_datapoint_with_key_value(&sum.data_points, "A", "yet_another")
2826 .expect("point expected");
2827 assert_eq!(data_point.value, 100);
2828 } else {
2829 assert_eq!(sum.data_points.len(), 2002);
2831 let data_point =
2832 find_overflow_sum_datapoint(&sum.data_points).expect("overflow point expected");
2833 assert_eq!(data_point.value, 600);
2834
2835 let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "foo");
2836 assert!(data_point.is_none(), "point should not be present");
2837
2838 let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "another");
2839 assert!(data_point.is_none(), "point should not be present");
2840
2841 let data_point =
2842 find_sum_datapoint_with_key_value(&sum.data_points, "A", "yet_another");
2843 assert!(data_point.is_none(), "point should not be present");
2844 }
2845 }
2846
2847 fn counter_aggregation_overflow_helper_custom_limit(temporality: Temporality) {
2848 let cardinality_limit = 2300;
2850 let view_change_cardinality = move |i: &Instrument| {
2851 if i.name == "my_counter" {
2852 Some(
2853 Stream::builder()
2854 .with_name("my_counter")
2855 .with_cardinality_limit(cardinality_limit)
2856 .build()
2857 .unwrap(),
2858 )
2859 } else {
2860 None
2861 }
2862 };
2863 let mut test_context = TestContext::new_with_view(temporality, view_change_cardinality);
2864 let counter = test_context.u64_counter("test", "my_counter", None);
2865
2866 for v in 0..cardinality_limit {
2869 counter.add(100, &[KeyValue::new("A", v.to_string())]);
2870 }
2871
2872 counter.add(3, &[]);
2874 counter.add(3, &[]);
2875
2876 counter.add(100, &[KeyValue::new("A", "foo")]);
2878 counter.add(100, &[KeyValue::new("A", "another")]);
2879 counter.add(100, &[KeyValue::new("A", "yet_another")]);
2880 test_context.flush_metrics();
2881
2882 let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
2883 unreachable!()
2884 };
2885
2886 assert_eq!(sum.data_points.len(), cardinality_limit + 1 + 1);
2888
2889 let data_point =
2890 find_overflow_sum_datapoint(&sum.data_points).expect("overflow point expected");
2891 assert_eq!(data_point.value, 300);
2892
2893 let empty_attrs_data_point = find_sum_datapoint_with_no_attributes(&sum.data_points)
2895 .expect("Empty attributes point expected");
2896 assert!(
2897 empty_attrs_data_point.attributes.is_empty(),
2898 "Non-empty attribute set"
2899 );
2900 assert_eq!(
2901 empty_attrs_data_point.value, 6,
2902 "Empty attributes value should be 3+3=6"
2903 );
2904
2905 test_context.reset_metrics();
2908 counter.add(100, &[KeyValue::new("A", "foo")]);
2911 counter.add(100, &[KeyValue::new("A", "another")]);
2912 counter.add(100, &[KeyValue::new("A", "yet_another")]);
2913 test_context.flush_metrics();
2914
2915 let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
2916 unreachable!()
2917 };
2918
2919 if temporality == Temporality::Delta {
2920 assert_eq!(sum.data_points.len(), 3);
2921
2922 let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "foo")
2923 .expect("point expected");
2924 assert_eq!(data_point.value, 100);
2925
2926 let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "another")
2927 .expect("point expected");
2928 assert_eq!(data_point.value, 100);
2929
2930 let data_point =
2931 find_sum_datapoint_with_key_value(&sum.data_points, "A", "yet_another")
2932 .expect("point expected");
2933 assert_eq!(data_point.value, 100);
2934 } else {
2935 assert_eq!(sum.data_points.len(), cardinality_limit + 1 + 1);
2937 let data_point =
2938 find_overflow_sum_datapoint(&sum.data_points).expect("overflow point expected");
2939 assert_eq!(data_point.value, 600);
2940
2941 let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "foo");
2942 assert!(data_point.is_none(), "point should not be present");
2943
2944 let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "another");
2945 assert!(data_point.is_none(), "point should not be present");
2946
2947 let data_point =
2948 find_sum_datapoint_with_key_value(&sum.data_points, "A", "yet_another");
2949 assert!(data_point.is_none(), "point should not be present");
2950 }
2951 }
2952
2953 fn counter_aggregation_attribute_order_helper(temporality: Temporality, start_sorted: bool) {
2954 let mut test_context = TestContext::new(temporality);
2956 let counter = test_context.u64_counter("test", "my_counter", None);
2957
2958 if start_sorted {
2963 counter.add(
2964 1,
2965 &[
2966 KeyValue::new("A", "a"),
2967 KeyValue::new("B", "b"),
2968 KeyValue::new("C", "c"),
2969 ],
2970 );
2971 } else {
2972 counter.add(
2973 1,
2974 &[
2975 KeyValue::new("A", "a"),
2976 KeyValue::new("C", "c"),
2977 KeyValue::new("B", "b"),
2978 ],
2979 );
2980 }
2981
2982 counter.add(
2983 1,
2984 &[
2985 KeyValue::new("A", "a"),
2986 KeyValue::new("C", "c"),
2987 KeyValue::new("B", "b"),
2988 ],
2989 );
2990 counter.add(
2991 1,
2992 &[
2993 KeyValue::new("B", "b"),
2994 KeyValue::new("A", "a"),
2995 KeyValue::new("C", "c"),
2996 ],
2997 );
2998 counter.add(
2999 1,
3000 &[
3001 KeyValue::new("B", "b"),
3002 KeyValue::new("C", "c"),
3003 KeyValue::new("A", "a"),
3004 ],
3005 );
3006 counter.add(
3007 1,
3008 &[
3009 KeyValue::new("C", "c"),
3010 KeyValue::new("B", "b"),
3011 KeyValue::new("A", "a"),
3012 ],
3013 );
3014 counter.add(
3015 1,
3016 &[
3017 KeyValue::new("C", "c"),
3018 KeyValue::new("A", "a"),
3019 KeyValue::new("B", "b"),
3020 ],
3021 );
3022 test_context.flush_metrics();
3023
3024 let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
3025 unreachable!()
3026 };
3027
3028 assert_eq!(sum.data_points.len(), 1);
3030
3031 let data_point1 = &sum.data_points[0];
3033 assert_eq!(data_point1.value, 6);
3034 }
3035
3036 fn updown_counter_aggregation_helper(temporality: Temporality) {
3037 let mut test_context = TestContext::new(temporality);
3039 let counter = test_context.i64_up_down_counter("test", "my_updown_counter", None);
3040
3041 counter.add(10, &[KeyValue::new("key1", "value1")]);
3043 counter.add(-1, &[KeyValue::new("key1", "value1")]);
3044 counter.add(-5, &[KeyValue::new("key1", "value1")]);
3045 counter.add(0, &[KeyValue::new("key1", "value1")]);
3046 counter.add(1, &[KeyValue::new("key1", "value1")]);
3047
3048 counter.add(10, &[KeyValue::new("key1", "value2")]);
3049 counter.add(0, &[KeyValue::new("key1", "value2")]);
3050 counter.add(-3, &[KeyValue::new("key1", "value2")]);
3051
3052 test_context.flush_metrics();
3053
3054 let MetricData::Sum(sum) = test_context.get_aggregation::<i64>("my_updown_counter", None)
3056 else {
3057 unreachable!()
3058 };
3059 assert_eq!(sum.data_points.len(), 2);
3061 assert!(
3062 !sum.is_monotonic,
3063 "UpDownCounter should produce non-monotonic."
3064 );
3065 assert_eq!(
3066 sum.temporality,
3067 Temporality::Cumulative,
3068 "Should produce Cumulative for UpDownCounter"
3069 );
3070
3071 let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
3073 .expect("datapoint with key1=value1 expected");
3074 assert_eq!(data_point1.value, 5);
3075
3076 let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2")
3077 .expect("datapoint with key1=value2 expected");
3078 assert_eq!(data_point1.value, 7);
3079
3080 test_context.reset_metrics();
3082 counter.add(10, &[KeyValue::new("key1", "value1")]);
3083 counter.add(-1, &[KeyValue::new("key1", "value1")]);
3084 counter.add(-5, &[KeyValue::new("key1", "value1")]);
3085 counter.add(0, &[KeyValue::new("key1", "value1")]);
3086 counter.add(1, &[KeyValue::new("key1", "value1")]);
3087
3088 counter.add(10, &[KeyValue::new("key1", "value2")]);
3089 counter.add(0, &[KeyValue::new("key1", "value2")]);
3090 counter.add(-3, &[KeyValue::new("key1", "value2")]);
3091
3092 test_context.flush_metrics();
3093
3094 let MetricData::Sum(sum) = test_context.get_aggregation::<i64>("my_updown_counter", None)
3095 else {
3096 unreachable!()
3097 };
3098 assert_eq!(sum.data_points.len(), 2);
3099 let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
3100 .expect("datapoint with key1=value1 expected");
3101 assert_eq!(data_point1.value, 10);
3102
3103 let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2")
3104 .expect("datapoint with key1=value2 expected");
3105 assert_eq!(data_point1.value, 14);
3106 }
3107
3108 fn find_sum_datapoint_with_key_value<'a, T>(
3109 data_points: &'a [SumDataPoint<T>],
3110 key: &str,
3111 value: &str,
3112 ) -> Option<&'a SumDataPoint<T>> {
3113 data_points.iter().find(|&datapoint| {
3114 datapoint
3115 .attributes
3116 .iter()
3117 .any(|kv| kv.key.as_str() == key && kv.value.as_str() == value)
3118 })
3119 }
3120
3121 fn find_overflow_sum_datapoint<T>(data_points: &[SumDataPoint<T>]) -> Option<&SumDataPoint<T>> {
3122 data_points.iter().find(|&datapoint| {
3123 datapoint.attributes.iter().any(|kv| {
3124 kv.key.as_str() == "otel.metric.overflow" && kv.value == Value::Bool(true)
3125 })
3126 })
3127 }
3128
3129 fn find_gauge_datapoint_with_key_value<'a, T>(
3130 data_points: &'a [GaugeDataPoint<T>],
3131 key: &str,
3132 value: &str,
3133 ) -> Option<&'a GaugeDataPoint<T>> {
3134 data_points.iter().find(|&datapoint| {
3135 datapoint
3136 .attributes
3137 .iter()
3138 .any(|kv| kv.key.as_str() == key && kv.value.as_str() == value)
3139 })
3140 }
3141
3142 fn find_sum_datapoint_with_no_attributes<T>(
3143 data_points: &[SumDataPoint<T>],
3144 ) -> Option<&SumDataPoint<T>> {
3145 data_points
3146 .iter()
3147 .find(|&datapoint| datapoint.attributes.is_empty())
3148 }
3149
3150 fn find_gauge_datapoint_with_no_attributes<T>(
3151 data_points: &[GaugeDataPoint<T>],
3152 ) -> Option<&GaugeDataPoint<T>> {
3153 data_points
3154 .iter()
3155 .find(|&datapoint| datapoint.attributes.is_empty())
3156 }
3157
3158 fn find_histogram_datapoint_with_key_value<'a, T>(
3159 data_points: &'a [HistogramDataPoint<T>],
3160 key: &str,
3161 value: &str,
3162 ) -> Option<&'a HistogramDataPoint<T>> {
3163 data_points.iter().find(|&datapoint| {
3164 datapoint
3165 .attributes
3166 .iter()
3167 .any(|kv| kv.key.as_str() == key && kv.value.as_str() == value)
3168 })
3169 }
3170
3171 fn find_histogram_datapoint_with_no_attributes<T>(
3172 data_points: &[HistogramDataPoint<T>],
3173 ) -> Option<&HistogramDataPoint<T>> {
3174 data_points
3175 .iter()
3176 .find(|&datapoint| datapoint.attributes.is_empty())
3177 }
3178
3179 fn find_scope_metric<'a>(
3180 metrics: &'a [ScopeMetrics],
3181 name: &'a str,
3182 ) -> Option<&'a ScopeMetrics> {
3183 metrics
3184 .iter()
3185 .find(|&scope_metric| scope_metric.scope.name() == name)
3186 }
3187
    /// Shared fixture for the aggregation tests: an in-memory exporter wired
    /// into an SDK meter provider, plus a cache of the last export.
    struct TestContext {
        exporter: InMemoryMetricExporter,
        meter_provider: SdkMeterProvider,

        // Holds the most recently fetched export so accessors can return
        // references tied to `self`.
        resource_metrics: Vec<ResourceMetrics>,
    }
3195
3196 impl TestContext {
3197 fn new(temporality: Temporality) -> Self {
3198 let exporter = InMemoryMetricExporterBuilder::new().with_temporality(temporality);
3199 let exporter = exporter.build();
3200 let meter_provider = SdkMeterProvider::builder()
3201 .with_periodic_exporter(exporter.clone())
3202 .build();
3203
3204 TestContext {
3205 exporter,
3206 meter_provider,
3207 resource_metrics: vec![],
3208 }
3209 }
3210
3211 fn new_with_view<T>(temporality: Temporality, view: T) -> Self
3212 where
3213 T: Fn(&Instrument) -> Option<Stream> + Send + Sync + 'static,
3214 {
3215 let exporter = InMemoryMetricExporterBuilder::new().with_temporality(temporality);
3216 let exporter = exporter.build();
3217 let meter_provider = SdkMeterProvider::builder()
3218 .with_periodic_exporter(exporter.clone())
3219 .with_view(view)
3220 .build();
3221
3222 TestContext {
3223 exporter,
3224 meter_provider,
3225 resource_metrics: vec![],
3226 }
3227 }
3228
3229 fn u64_counter(
3230 &self,
3231 meter_name: &'static str,
3232 counter_name: &'static str,
3233 unit: Option<&'static str>,
3234 ) -> Counter<u64> {
3235 let meter = self.meter_provider.meter(meter_name);
3236 let mut counter_builder = meter.u64_counter(counter_name);
3237 if let Some(unit_name) = unit {
3238 counter_builder = counter_builder.with_unit(unit_name);
3239 }
3240 counter_builder.build()
3241 }
3242
3243 fn i64_up_down_counter(
3244 &self,
3245 meter_name: &'static str,
3246 counter_name: &'static str,
3247 unit: Option<&'static str>,
3248 ) -> UpDownCounter<i64> {
3249 let meter = self.meter_provider.meter(meter_name);
3250 let mut updown_counter_builder = meter.i64_up_down_counter(counter_name);
3251 if let Some(unit_name) = unit {
3252 updown_counter_builder = updown_counter_builder.with_unit(unit_name);
3253 }
3254 updown_counter_builder.build()
3255 }
3256
3257 fn meter(&self) -> Meter {
3258 self.meter_provider.meter("test")
3259 }
3260
3261 fn flush_metrics(&self) {
3262 self.meter_provider.force_flush().unwrap();
3263 }
3264
3265 fn reset_metrics(&self) {
3266 self.exporter.reset();
3267 }
3268
3269 fn check_no_metrics(&self) {
3270 let resource_metrics = self
3271 .exporter
3272 .get_finished_metrics()
3273 .expect("metrics expected to be exported"); assert!(resource_metrics.is_empty(), "no metrics should be exported");
3276 }
3277
3278 fn get_aggregation<T: Number>(
3279 &mut self,
3280 counter_name: &str,
3281 unit_name: Option<&str>,
3282 ) -> &MetricData<T> {
3283 self.resource_metrics = self
3284 .exporter
3285 .get_finished_metrics()
3286 .expect("metrics expected to be exported");
3287
3288 assert!(
3289 !self.resource_metrics.is_empty(),
3290 "no metrics were exported"
3291 );
3292
3293 assert!(
3294 self.resource_metrics.len() == 1,
3295 "Expected single resource metrics."
3296 );
3297 let resource_metric = self
3298 .resource_metrics
3299 .first()
3300 .expect("This should contain exactly one resource metric, as validated above.");
3301
3302 assert!(
3303 !resource_metric.scope_metrics.is_empty(),
3304 "No scope metrics in latest export"
3305 );
3306 assert!(!resource_metric.scope_metrics[0].metrics.is_empty());
3307
3308 let metric = &resource_metric.scope_metrics[0].metrics[0];
3309 assert_eq!(metric.name, counter_name);
3310 if let Some(expected_unit) = unit_name {
3311 assert_eq!(metric.unit, expected_unit);
3312 }
3313
3314 T::extract_metrics_data_ref(&metric.data)
3315 .expect("Failed to cast aggregation to expected type")
3316 }
3317
3318 fn get_from_multiple_aggregations<T: Number>(
3319 &mut self,
3320 counter_name: &str,
3321 unit_name: Option<&str>,
3322 invocation_count: usize,
3323 ) -> Vec<&MetricData<T>> {
3324 self.resource_metrics = self
3325 .exporter
3326 .get_finished_metrics()
3327 .expect("metrics expected to be exported");
3328
3329 assert!(
3330 !self.resource_metrics.is_empty(),
3331 "no metrics were exported"
3332 );
3333
3334 assert_eq!(
3335 self.resource_metrics.len(),
3336 invocation_count,
3337 "Expected collect to be called {invocation_count} times"
3338 );
3339
3340 let result = self
3341 .resource_metrics
3342 .iter()
3343 .map(|resource_metric| {
3344 assert!(
3345 !resource_metric.scope_metrics.is_empty(),
3346 "An export with no scope metrics occurred"
3347 );
3348
3349 assert!(!resource_metric.scope_metrics[0].metrics.is_empty());
3350
3351 let metric = &resource_metric.scope_metrics[0].metrics[0];
3352 assert_eq!(metric.name, counter_name);
3353
3354 if let Some(expected_unit) = unit_name {
3355 assert_eq!(metric.unit, expected_unit);
3356 }
3357
3358 let aggregation = T::extract_metrics_data_ref(&metric.data)
3359 .expect("Failed to cast aggregation to expected type");
3360 aggregation
3361 })
3362 .collect::<Vec<_>>();
3363
3364 result
3365 }
3366 }
3367}