opentelemetry_sdk/metrics/
mod.rs

1//! The crust of the OpenTelemetry metrics SDK.
2//!
3//! ## Configuration
4//!
5//! The metrics SDK configuration is stored with each [SdkMeterProvider].
6//! Configuration for [Resource]s, views, and [ManualReader] or
7//! [PeriodicReader] instances can be specified.
8//!
9//! ### Example
10//!
11//! ```
12//! use opentelemetry::global;
13//! use opentelemetry::KeyValue;
14//! use opentelemetry_sdk::{metrics::SdkMeterProvider, Resource};
15//!
16//! // Generate SDK configuration, resource, views, etc
17//! let resource = Resource::builder().build(); // default attributes about the current process
18//!
19//! // Create a meter provider with the desired config
20//! let meter_provider = SdkMeterProvider::builder().with_resource(resource).build();
21//! global::set_meter_provider(meter_provider.clone());
22//!
23//! // Use the meter provider to create meter instances
24//! let meter = global::meter("my_app");
25//!
26//! // Create instruments scoped to the meter
27//! let counter = meter
28//!     .u64_counter("power_consumption")
29//!     .with_unit("kWh")
30//!     .build();
31//!
32//! // use instruments to record measurements
33//! counter.add(10, &[KeyValue::new("rate", "standard")]);
34//!
35//! // shutdown the provider at the end of the application to ensure any metrics not yet
36//! // exported are flushed.
37//! meter_provider.shutdown().unwrap();
38//! ```
39//!
40//! [Resource]: crate::Resource
41
42#[allow(unreachable_pub)]
43#[allow(unused)]
44pub(crate) mod aggregation;
45pub mod data;
46mod error;
47pub mod exporter;
48pub(crate) mod instrument;
49pub(crate) mod internal;
50#[cfg(feature = "experimental_metrics_custom_reader")]
51pub(crate) mod manual_reader;
52pub(crate) mod meter;
53mod meter_provider;
54pub(crate) mod noop;
55pub(crate) mod periodic_reader;
56#[cfg(feature = "experimental_metrics_periodicreader_with_async_runtime")]
57/// Module for periodic reader with async runtime.
58pub mod periodic_reader_with_async_runtime;
59pub(crate) mod pipeline;
60#[cfg(feature = "experimental_metrics_custom_reader")]
61pub mod reader;
62#[cfg(not(feature = "experimental_metrics_custom_reader"))]
63pub(crate) mod reader;
64pub(crate) mod view;
65
66/// In-Memory metric exporter for testing purpose.
67#[cfg(any(feature = "testing", test))]
68#[cfg_attr(docsrs, doc(cfg(any(feature = "testing", test))))]
69pub mod in_memory_exporter;
70#[cfg(any(feature = "testing", test))]
71#[cfg_attr(docsrs, doc(cfg(any(feature = "testing", test))))]
72pub use in_memory_exporter::{InMemoryMetricExporter, InMemoryMetricExporterBuilder};
73
74#[cfg(feature = "spec_unstable_metrics_views")]
75pub use aggregation::*;
76#[cfg(feature = "experimental_metrics_custom_reader")]
77pub use manual_reader::*;
78pub use meter_provider::*;
79pub use periodic_reader::*;
80#[cfg(feature = "experimental_metrics_custom_reader")]
81pub use pipeline::Pipeline;
82
83pub use instrument::{Instrument, InstrumentKind, Stream, StreamBuilder};
84
85use std::hash::Hash;
86
87/// Defines the window that an aggregation was calculated over.
88#[derive(Debug, Copy, Clone, Default, PartialEq, Eq, Hash)]
89#[non_exhaustive]
90pub enum Temporality {
91    /// A measurement interval that continues to expand forward in time from a
92    /// starting point.
93    ///
94    /// New measurements are added to all previous measurements since a start time.
95    #[default]
96    Cumulative,
97
98    /// A measurement interval that resets each cycle.
99    ///
100    /// Measurements from one cycle are recorded independently, measurements from
101    /// other cycles do not affect them.
102    Delta,
103
104    /// Configures Synchronous Counter and Histogram instruments to use
105    /// Delta aggregation temporality, which allows them to shed memory
106    /// following a cardinality explosion, thus use less memory.
107    LowMemory,
108}
109
110#[cfg(all(test, feature = "testing"))]
111mod tests {
112    use self::data::{HistogramDataPoint, ScopeMetrics, SumDataPoint};
113    use super::data::MetricData;
114    use super::internal::Number;
115    use super::*;
116    use crate::metrics::data::ResourceMetrics;
117    use crate::metrics::internal::AggregatedMetricsAccess;
118    use crate::metrics::InMemoryMetricExporter;
119    use crate::metrics::InMemoryMetricExporterBuilder;
120    use data::GaugeDataPoint;
121    use opentelemetry::metrics::{Counter, Meter, UpDownCounter};
122    use opentelemetry::InstrumentationScope;
123    use opentelemetry::Value;
124    use opentelemetry::{metrics::MeterProvider as _, KeyValue};
125    use rand::{rngs, Rng, SeedableRng};
126    use std::cmp::{max, min};
127    use std::sync::atomic::{AtomicBool, Ordering};
128    use std::sync::{Arc, Mutex};
129    use std::thread;
130    use std::time::Duration;
131
132    // Run all tests in this mod
133    // cargo test metrics::tests --features=testing,spec_unstable_metrics_views
134    // Note for all tests from this point onwards in this mod:
135    // "multi_thread" tokio flavor must be used else flush won't
136    // be able to make progress!
137
138    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
139    #[cfg(not(feature = "experimental_metrics_disable_name_validation"))]
140    async fn invalid_instrument_config_noops() {
141        // Run this test with stdout enabled to see output.
142        // cargo test invalid_instrument_config_noops --features=testing,spec_unstable_metrics_views -- --nocapture
143        let invalid_instrument_names = vec![
144            "_startWithNoneAlphabet",
145            "utf8char锈",
146            "a".repeat(256).leak(),
147            "invalid name",
148        ];
149        for name in invalid_instrument_names {
150            let test_context = TestContext::new(Temporality::Cumulative);
151            let counter = test_context.meter().u64_counter(name).build();
152            counter.add(1, &[]);
153
154            let up_down_counter = test_context.meter().i64_up_down_counter(name).build();
155            up_down_counter.add(1, &[]);
156
157            let gauge = test_context.meter().f64_gauge(name).build();
158            gauge.record(1.9, &[]);
159
160            let histogram = test_context.meter().f64_histogram(name).build();
161            histogram.record(1.0, &[]);
162
163            let _observable_counter = test_context
164                .meter()
165                .u64_observable_counter(name)
166                .with_callback(move |observer| {
167                    observer.observe(1, &[]);
168                })
169                .build();
170
171            let _observable_gauge = test_context
172                .meter()
173                .f64_observable_gauge(name)
174                .with_callback(move |observer| {
175                    observer.observe(1.0, &[]);
176                })
177                .build();
178
179            let _observable_up_down_counter = test_context
180                .meter()
181                .i64_observable_up_down_counter(name)
182                .with_callback(move |observer| {
183                    observer.observe(1, &[]);
184                })
185                .build();
186
187            test_context.flush_metrics();
188
189            // As instrument name is invalid, no metrics should be exported
190            test_context.check_no_metrics();
191        }
192
193        let invalid_bucket_boundaries = vec![
194            vec![1.0, 1.0],                          // duplicate boundaries
195            vec![1.0, 2.0, 3.0, 2.0],                // duplicate non consequent boundaries
196            vec![1.0, 2.0, 3.0, 4.0, 2.5],           // unsorted boundaries
197            vec![1.0, 2.0, 3.0, f64::INFINITY, 4.0], // boundaries with positive infinity
198            vec![1.0, 2.0, 3.0, f64::NAN],           // boundaries with NaNs
199            vec![f64::NEG_INFINITY, 2.0, 3.0],       // boundaries with negative infinity
200        ];
201        for bucket_boundaries in invalid_bucket_boundaries {
202            let test_context = TestContext::new(Temporality::Cumulative);
203            let histogram = test_context
204                .meter()
205                .f64_histogram("test")
206                .with_boundaries(bucket_boundaries)
207                .build();
208            histogram.record(1.9, &[]);
209            test_context.flush_metrics();
210
211            // As bucket boundaries provided via advisory params are invalid,
212            // no metrics should be exported
213            test_context.check_no_metrics();
214        }
215    }
216
217    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
218    #[cfg(feature = "experimental_metrics_disable_name_validation")]
219    async fn valid_instrument_config_with_feature_experimental_metrics_disable_name_validation() {
220        // Run this test with stdout enabled to see output.
221        // cargo test valid_instrument_config_with_feature_experimental_metrics_disable_name_validation --all-features -- --nocapture
222        let invalid_instrument_names = vec![
223            "_startWithNoneAlphabet",
224            "utf8char锈",
225            "",
226            "a".repeat(256).leak(),
227            "\\allow\\slash /sec",
228            "\\allow\\$$slash /sec",
229            "Total $ Count",
230            "\\test\\UsagePercent(Total) > 80%",
231            "invalid name",
232        ];
233        for name in invalid_instrument_names {
234            let test_context = TestContext::new(Temporality::Cumulative);
235            let counter = test_context.meter().u64_counter(name).build();
236            counter.add(1, &[]);
237
238            let up_down_counter = test_context.meter().i64_up_down_counter(name).build();
239            up_down_counter.add(1, &[]);
240
241            let gauge = test_context.meter().f64_gauge(name).build();
242            gauge.record(1.9, &[]);
243
244            let histogram = test_context.meter().f64_histogram(name).build();
245            histogram.record(1.0, &[]);
246
247            let _observable_counter = test_context
248                .meter()
249                .u64_observable_counter(name)
250                .with_callback(move |observer| {
251                    observer.observe(1, &[]);
252                })
253                .build();
254
255            let _observable_gauge = test_context
256                .meter()
257                .f64_observable_gauge(name)
258                .with_callback(move |observer| {
259                    observer.observe(1.0, &[]);
260                })
261                .build();
262
263            let _observable_up_down_counter = test_context
264                .meter()
265                .i64_observable_up_down_counter(name)
266                .with_callback(move |observer| {
267                    observer.observe(1, &[]);
268                })
269                .build();
270
271            test_context.flush_metrics();
272
273            // As instrument name are valid because of the feature flag, metrics should be exported
274            let resource_metrics = test_context
275                .exporter
276                .get_finished_metrics()
277                .expect("metrics expected to be exported");
278
279            assert!(!resource_metrics.is_empty(), "metrics should be exported");
280        }
281
282        // Ensuring that the Histograms with invalid bucket boundaries are not exported
283        // when using the feature flag
284        let invalid_bucket_boundaries = vec![
285            vec![1.0, 1.0],                          // duplicate boundaries
286            vec![1.0, 2.0, 3.0, 2.0],                // duplicate non consequent boundaries
287            vec![1.0, 2.0, 3.0, 4.0, 2.5],           // unsorted boundaries
288            vec![1.0, 2.0, 3.0, f64::INFINITY, 4.0], // boundaries with positive infinity
289            vec![1.0, 2.0, 3.0, f64::NAN],           // boundaries with NaNs
290            vec![f64::NEG_INFINITY, 2.0, 3.0],       // boundaries with negative infinity
291        ];
292        for bucket_boundaries in invalid_bucket_boundaries {
293            let test_context = TestContext::new(Temporality::Cumulative);
294            let histogram = test_context
295                .meter()
296                .f64_histogram("test")
297                .with_boundaries(bucket_boundaries)
298                .build();
299            histogram.record(1.9, &[]);
300            test_context.flush_metrics();
301
302            // As bucket boundaries provided via advisory params are invalid,
303            // no metrics should be exported
304            test_context.check_no_metrics();
305        }
306    }
307
308    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
309    async fn counter_aggregation_delta() {
310        // Run this test with stdout enabled to see output.
311        // cargo test counter_aggregation_delta --features=testing -- --nocapture
312        counter_aggregation_helper(Temporality::Delta);
313    }
314
315    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
316    async fn counter_aggregation_cumulative() {
317        // Run this test with stdout enabled to see output.
318        // cargo test counter_aggregation_cumulative --features=testing -- --nocapture
319        counter_aggregation_helper(Temporality::Cumulative);
320    }
321
322    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
323    async fn counter_aggregation_no_attributes_cumulative() {
324        let mut test_context = TestContext::new(Temporality::Cumulative);
325        let counter = test_context.u64_counter("test", "my_counter", None);
326
327        counter.add(50, &[]);
328        test_context.flush_metrics();
329
330        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
331            unreachable!()
332        };
333
334        assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
335        assert!(sum.is_monotonic, "Should produce monotonic.");
336        assert_eq!(
337            sum.temporality,
338            Temporality::Cumulative,
339            "Should produce cumulative"
340        );
341
342        let data_point = &sum.data_points[0];
343        assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
344        assert_eq!(data_point.value, 50, "Unexpected data point value");
345    }
346
347    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
348    async fn counter_aggregation_no_attributes_delta() {
349        let mut test_context = TestContext::new(Temporality::Delta);
350        let counter = test_context.u64_counter("test", "my_counter", None);
351
352        counter.add(50, &[]);
353        test_context.flush_metrics();
354
355        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
356            unreachable!()
357        };
358
359        assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
360        assert!(sum.is_monotonic, "Should produce monotonic.");
361        assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
362
363        let data_point = &sum.data_points[0];
364        assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
365        assert_eq!(data_point.value, 50, "Unexpected data point value");
366    }
367
368    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
369    async fn counter_aggregation_overflow_delta() {
370        counter_aggregation_overflow_helper(Temporality::Delta);
371        counter_aggregation_overflow_helper_custom_limit(Temporality::Delta);
372    }
373
374    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
375    async fn counter_aggregation_overflow_cumulative() {
376        counter_aggregation_overflow_helper(Temporality::Cumulative);
377        counter_aggregation_overflow_helper_custom_limit(Temporality::Cumulative);
378    }
379
380    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
381    async fn counter_aggregation_attribute_order_sorted_first_delta() {
382        // Run this test with stdout enabled to see output.
383        // cargo test counter_aggregation_attribute_order_sorted_first_delta --features=testing -- --nocapture
384        counter_aggregation_attribute_order_helper(Temporality::Delta, true);
385    }
386
387    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
388    async fn counter_aggregation_attribute_order_sorted_first_cumulative() {
389        // Run this test with stdout enabled to see output.
390        // cargo test counter_aggregation_attribute_order_sorted_first_cumulative --features=testing -- --nocapture
391        counter_aggregation_attribute_order_helper(Temporality::Cumulative, true);
392    }
393
394    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
395    async fn counter_aggregation_attribute_order_unsorted_first_delta() {
396        // Run this test with stdout enabled to see output.
397        // cargo test counter_aggregation_attribute_order_unsorted_first_delta --features=testing -- --nocapture
398
399        counter_aggregation_attribute_order_helper(Temporality::Delta, false);
400    }
401
402    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
403    async fn counter_aggregation_attribute_order_unsorted_first_cumulative() {
404        // Run this test with stdout enabled to see output.
405        // cargo test counter_aggregation_attribute_order_unsorted_first_cumulative --features=testing -- --nocapture
406
407        counter_aggregation_attribute_order_helper(Temporality::Cumulative, false);
408    }
409
410    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
411    async fn histogram_aggregation_cumulative() {
412        // Run this test with stdout enabled to see output.
413        // cargo test histogram_aggregation_cumulative --features=testing -- --nocapture
414        histogram_aggregation_helper(Temporality::Cumulative);
415    }
416
417    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
418    async fn histogram_aggregation_delta() {
419        // Run this test with stdout enabled to see output.
420        // cargo test histogram_aggregation_delta --features=testing -- --nocapture
421        histogram_aggregation_helper(Temporality::Delta);
422    }
423
424    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
425    async fn histogram_aggregation_with_custom_bounds() {
426        // Run this test with stdout enabled to see output.
427        // cargo test histogram_aggregation_with_custom_bounds --features=testing -- --nocapture
428        histogram_aggregation_with_custom_bounds_helper(Temporality::Delta);
429        histogram_aggregation_with_custom_bounds_helper(Temporality::Cumulative);
430    }
431
432    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
433    async fn histogram_aggregation_with_empty_bounds() {
434        // Run this test with stdout enabled to see output.
435        // cargo test histogram_aggregation_with_empty_bounds --features=testing -- --nocapture
436        histogram_aggregation_with_empty_bounds_helper(Temporality::Delta);
437        histogram_aggregation_with_empty_bounds_helper(Temporality::Cumulative);
438    }
439
440    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
441    async fn updown_counter_aggregation_cumulative() {
442        // Run this test with stdout enabled to see output.
443        // cargo test updown_counter_aggregation_cumulative --features=testing -- --nocapture
444        updown_counter_aggregation_helper(Temporality::Cumulative);
445    }
446
447    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
448    async fn updown_counter_aggregation_delta() {
449        // Run this test with stdout enabled to see output.
450        // cargo test updown_counter_aggregation_delta --features=testing -- --nocapture
451        updown_counter_aggregation_helper(Temporality::Delta);
452    }
453
454    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
455    async fn gauge_aggregation() {
456        // Run this test with stdout enabled to see output.
457        // cargo test gauge_aggregation --features=testing -- --nocapture
458
459        // Gauge should use last value aggregation regardless of the aggregation temporality used.
460        gauge_aggregation_helper(Temporality::Delta);
461        gauge_aggregation_helper(Temporality::Cumulative);
462    }
463
464    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
465    async fn observable_gauge_aggregation() {
466        // Run this test with stdout enabled to see output.
467        // cargo test observable_gauge_aggregation --features=testing -- --nocapture
468
469        // Gauge should use last value aggregation regardless of the aggregation temporality used.
470        observable_gauge_aggregation_helper(Temporality::Delta, false);
471        observable_gauge_aggregation_helper(Temporality::Delta, true);
472        observable_gauge_aggregation_helper(Temporality::Cumulative, false);
473        observable_gauge_aggregation_helper(Temporality::Cumulative, true);
474    }
475
476    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
477    async fn observable_counter_aggregation_cumulative_non_zero_increment() {
478        // Run this test with stdout enabled to see output.
479        // cargo test observable_counter_aggregation_cumulative_non_zero_increment --features=testing -- --nocapture
480        observable_counter_aggregation_helper(Temporality::Cumulative, 100, 10, 4, false);
481    }
482
483    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
484    async fn observable_counter_aggregation_cumulative_non_zero_increment_no_attrs() {
485        // Run this test with stdout enabled to see output.
486        // cargo test observable_counter_aggregation_cumulative_non_zero_increment_no_attrs --features=testing -- --nocapture
487        observable_counter_aggregation_helper(Temporality::Cumulative, 100, 10, 4, true);
488    }
489
490    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
491    async fn observable_counter_aggregation_delta_non_zero_increment() {
492        // Run this test with stdout enabled to see output.
493        // cargo test observable_counter_aggregation_delta_non_zero_increment --features=testing -- --nocapture
494        observable_counter_aggregation_helper(Temporality::Delta, 100, 10, 4, false);
495    }
496
497    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
498    async fn observable_counter_aggregation_delta_non_zero_increment_no_attrs() {
499        // Run this test with stdout enabled to see output.
500        // cargo test observable_counter_aggregation_delta_non_zero_increment_no_attrs --features=testing -- --nocapture
501        observable_counter_aggregation_helper(Temporality::Delta, 100, 10, 4, true);
502    }
503
504    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
505    async fn observable_counter_aggregation_cumulative_zero_increment() {
506        // Run this test with stdout enabled to see output.
507        // cargo test observable_counter_aggregation_cumulative_zero_increment --features=testing -- --nocapture
508        observable_counter_aggregation_helper(Temporality::Cumulative, 100, 0, 4, false);
509    }
510
511    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
512    async fn observable_counter_aggregation_cumulative_zero_increment_no_attrs() {
513        // Run this test with stdout enabled to see output.
514        // cargo test observable_counter_aggregation_cumulative_zero_increment_no_attrs --features=testing -- --nocapture
515        observable_counter_aggregation_helper(Temporality::Cumulative, 100, 0, 4, true);
516    }
517
518    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
519    async fn observable_counter_aggregation_delta_zero_increment() {
520        // Run this test with stdout enabled to see output.
521        // cargo test observable_counter_aggregation_delta_zero_increment --features=testing -- --nocapture
522        observable_counter_aggregation_helper(Temporality::Delta, 100, 0, 4, false);
523    }
524
525    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
526    async fn observable_counter_aggregation_delta_zero_increment_no_attrs() {
527        // Run this test with stdout enabled to see output.
528        // cargo test observable_counter_aggregation_delta_zero_increment_no_attrs --features=testing -- --nocapture
529        observable_counter_aggregation_helper(Temporality::Delta, 100, 0, 4, true);
530    }
531
532    fn observable_counter_aggregation_helper(
533        temporality: Temporality,
534        start: u64,
535        increment: u64,
536        length: u64,
537        is_empty_attributes: bool,
538    ) {
539        // Arrange
540        let mut test_context = TestContext::new(temporality);
541        let attributes = if is_empty_attributes {
542            vec![]
543        } else {
544            vec![KeyValue::new("key1", "value1")]
545        };
546        // The Observable counter reports values[0], values[1],....values[n] on each flush.
547        let values: Vec<u64> = (0..length).map(|i| start + i * increment).collect();
548        println!("Testing with observable values: {values:?}");
549        let values = Arc::new(values);
550        let values_clone = values.clone();
551        let i = Arc::new(Mutex::new(0));
552        let _observable_counter = test_context
553            .meter()
554            .u64_observable_counter("my_observable_counter")
555            .with_unit("my_unit")
556            .with_callback(move |observer| {
557                let mut index = i.lock().unwrap();
558                if *index < values.len() {
559                    observer.observe(values[*index], &attributes);
560                    *index += 1;
561                }
562            })
563            .build();
564
565        for (iter, v) in values_clone.iter().enumerate() {
566            test_context.flush_metrics();
567            let MetricData::Sum(sum) =
568                test_context.get_aggregation::<u64>("my_observable_counter", None)
569            else {
570                unreachable!()
571            };
572            assert_eq!(sum.data_points.len(), 1);
573            assert!(sum.is_monotonic, "Counter should produce monotonic.");
574            if let Temporality::Cumulative = temporality {
575                assert_eq!(
576                    sum.temporality,
577                    Temporality::Cumulative,
578                    "Should produce cumulative"
579                );
580            } else {
581                assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
582            }
583
584            // find and validate datapoint
585            let data_point = if is_empty_attributes {
586                &sum.data_points[0]
587            } else {
588                find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
589                    .expect("datapoint with key1=value1 expected")
590            };
591
592            if let Temporality::Cumulative = temporality {
593                // Cumulative counter should have the value as is.
594                assert_eq!(data_point.value, *v);
595            } else {
596                // Delta counter should have the increment value.
597                // Except for the first value which should be the start value.
598                if iter == 0 {
599                    assert_eq!(data_point.value, start);
600                } else {
601                    assert_eq!(data_point.value, increment);
602                }
603            }
604
605            test_context.reset_metrics();
606        }
607    }
608
609    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
610    async fn empty_meter_name_retained() {
611        async fn meter_name_retained_helper(
612            meter: Meter,
613            provider: SdkMeterProvider,
614            exporter: InMemoryMetricExporter,
615        ) {
616            // Act
617            let counter = meter.u64_counter("my_counter").build();
618
619            counter.add(10, &[]);
620            provider.force_flush().unwrap();
621
622            // Assert
623            let resource_metrics = exporter
624                .get_finished_metrics()
625                .expect("metrics are expected to be exported.");
626            assert!(
627                resource_metrics[0].scope_metrics[0].metrics.len() == 1,
628                "There should be a single metric"
629            );
630            let meter_name = resource_metrics[0].scope_metrics[0].scope.name();
631            assert_eq!(meter_name, "");
632        }
633
634        let exporter = InMemoryMetricExporter::default();
635        let meter_provider = SdkMeterProvider::builder()
636            .with_periodic_exporter(exporter.clone())
637            .build();
638
639        // Test Meter creation in 2 ways, both with empty string as meter name
640        let meter1 = meter_provider.meter("");
641        meter_name_retained_helper(meter1, meter_provider.clone(), exporter.clone()).await;
642
643        let meter_scope = InstrumentationScope::builder("").build();
644        let meter2 = meter_provider.meter_with_scope(meter_scope);
645        meter_name_retained_helper(meter2, meter_provider, exporter).await;
646    }
647
648    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
649    async fn counter_duplicate_instrument_merge() {
650        // Arrange
651        let exporter = InMemoryMetricExporter::default();
652        let meter_provider = SdkMeterProvider::builder()
653            .with_periodic_exporter(exporter.clone())
654            .build();
655
656        // Act
657        let meter = meter_provider.meter("test");
658        let counter = meter
659            .u64_counter("my_counter")
660            .with_unit("my_unit")
661            .with_description("my_description")
662            .build();
663
664        let counter_duplicated = meter
665            .u64_counter("my_counter")
666            .with_unit("my_unit")
667            .with_description("my_description")
668            .build();
669
670        let attribute = vec![KeyValue::new("key1", "value1")];
671        counter.add(10, &attribute);
672        counter_duplicated.add(5, &attribute);
673
674        meter_provider.force_flush().unwrap();
675
676        // Assert
677        let resource_metrics = exporter
678            .get_finished_metrics()
679            .expect("metrics are expected to be exported.");
680        assert!(
681            resource_metrics[0].scope_metrics[0].metrics.len() == 1,
682            "There should be single metric merging duplicate instruments"
683        );
684        let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
685        assert_eq!(metric.name, "my_counter");
686        assert_eq!(metric.unit, "my_unit");
687        let MetricData::Sum(sum) = u64::extract_metrics_data_ref(&metric.data)
688            .expect("Sum aggregation expected for Counter instruments by default")
689        else {
690            unreachable!()
691        };
692
693        // Expecting 1 time-series.
694        assert_eq!(sum.data_points.len(), 1);
695
696        let datapoint = &sum.data_points[0];
697        assert_eq!(datapoint.value, 15);
698    }
699
700    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
701    async fn counter_duplicate_instrument_different_meter_no_merge() {
702        // Arrange
703        let exporter = InMemoryMetricExporter::default();
704        let meter_provider = SdkMeterProvider::builder()
705            .with_periodic_exporter(exporter.clone())
706            .build();
707
708        // Act
709        let meter1 = meter_provider.meter("test.meter1");
710        let meter2 = meter_provider.meter("test.meter2");
711        let counter1 = meter1
712            .u64_counter("my_counter")
713            .with_unit("my_unit")
714            .with_description("my_description")
715            .build();
716
717        let counter2 = meter2
718            .u64_counter("my_counter")
719            .with_unit("my_unit")
720            .with_description("my_description")
721            .build();
722
723        let attribute = vec![KeyValue::new("key1", "value1")];
724        counter1.add(10, &attribute);
725        counter2.add(5, &attribute);
726
727        meter_provider.force_flush().unwrap();
728
729        // Assert
730        let resource_metrics = exporter
731            .get_finished_metrics()
732            .expect("metrics are expected to be exported.");
733        assert!(
734            resource_metrics[0].scope_metrics.len() == 2,
735            "There should be 2 separate scope"
736        );
737        assert!(
738            resource_metrics[0].scope_metrics[0].metrics.len() == 1,
739            "There should be single metric for the scope"
740        );
741        assert!(
742            resource_metrics[0].scope_metrics[1].metrics.len() == 1,
743            "There should be single metric for the scope"
744        );
745
746        let scope1 = find_scope_metric(&resource_metrics[0].scope_metrics, "test.meter1");
747        let scope2 = find_scope_metric(&resource_metrics[0].scope_metrics, "test.meter2");
748
749        if let Some(scope1) = scope1 {
750            let metric1 = &scope1.metrics[0];
751            assert_eq!(metric1.name, "my_counter");
752            assert_eq!(metric1.unit, "my_unit");
753            assert_eq!(metric1.description, "my_description");
754            let MetricData::Sum(sum1) = u64::extract_metrics_data_ref(&metric1.data)
755                .expect("Sum aggregation expected for Counter instruments by default")
756            else {
757                unreachable!()
758            };
759
760            // Expecting 1 time-series.
761            assert_eq!(sum1.data_points.len(), 1);
762
763            let datapoint1 = &sum1.data_points[0];
764            assert_eq!(datapoint1.value, 10);
765        } else {
766            panic!("No MetricScope found for 'test.meter1'");
767        }
768
769        if let Some(scope2) = scope2 {
770            let metric2 = &scope2.metrics[0];
771            assert_eq!(metric2.name, "my_counter");
772            assert_eq!(metric2.unit, "my_unit");
773            assert_eq!(metric2.description, "my_description");
774
775            let MetricData::Sum(sum2) = u64::extract_metrics_data_ref(&metric2.data)
776                .expect("Sum aggregation expected for Counter instruments by default")
777            else {
778                unreachable!()
779            };
780
781            // Expecting 1 time-series.
782            assert_eq!(sum2.data_points.len(), 1);
783
784            let datapoint2 = &sum2.data_points[0];
785            assert_eq!(datapoint2.value, 5);
786        } else {
787            panic!("No MetricScope found for 'test.meter2'");
788        }
789    }
790
791    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
792    async fn instrumentation_scope_identity_test() {
793        // Arrange
794        let exporter = InMemoryMetricExporter::default();
795        let meter_provider = SdkMeterProvider::builder()
796            .with_periodic_exporter(exporter.clone())
797            .build();
798
799        // Act
800        // Meters are identical.
801        // Hence there should be a single metric stream output for this test.
802        let make_scope = |attributes| {
803            InstrumentationScope::builder("test.meter")
804                .with_version("v0.1.0")
805                .with_schema_url("http://example.com")
806                .with_attributes(attributes)
807                .build()
808        };
809
810        let meter1 =
811            meter_provider.meter_with_scope(make_scope(vec![KeyValue::new("key", "value1")]));
812        let meter2 =
813            meter_provider.meter_with_scope(make_scope(vec![KeyValue::new("key", "value1")]));
814
815        let counter1 = meter1
816            .u64_counter("my_counter")
817            .with_unit("my_unit")
818            .with_description("my_description")
819            .build();
820
821        let counter2 = meter2
822            .u64_counter("my_counter")
823            .with_unit("my_unit")
824            .with_description("my_description")
825            .build();
826
827        let attribute = vec![KeyValue::new("key1", "value1")];
828        counter1.add(10, &attribute);
829        counter2.add(5, &attribute);
830
831        meter_provider.force_flush().unwrap();
832
833        // Assert
834        let resource_metrics = exporter
835            .get_finished_metrics()
836            .expect("metrics are expected to be exported.");
837        println!("resource_metrics: {resource_metrics:?}");
838        assert!(
839            resource_metrics[0].scope_metrics.len() == 1,
840            "There should be a single scope as the meters are identical"
841        );
842        assert!(
843            resource_metrics[0].scope_metrics[0].metrics.len() == 1,
844            "There should be single metric for the scope as instruments are identical"
845        );
846
847        let scope = &resource_metrics[0].scope_metrics[0].scope;
848        assert_eq!(scope.name(), "test.meter");
849        assert_eq!(scope.version(), Some("v0.1.0"));
850        assert_eq!(scope.schema_url(), Some("http://example.com"));
851
852        // This is validating current behavior, but it is not guaranteed to be the case in the future,
853        // as this is a user error and SDK reserves right to change this behavior.
854        assert!(scope.attributes().eq(&[KeyValue::new("key", "value1")]));
855
856        let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
857        assert_eq!(metric.name, "my_counter");
858        assert_eq!(metric.unit, "my_unit");
859        assert_eq!(metric.description, "my_description");
860
861        let MetricData::Sum(sum) = u64::extract_metrics_data_ref(&metric.data)
862            .expect("Sum aggregation expected for Counter instruments by default")
863        else {
864            unreachable!()
865        };
866
867        // Expecting 1 time-series.
868        assert_eq!(sum.data_points.len(), 1);
869
870        let datapoint = &sum.data_points[0];
871        assert_eq!(datapoint.value, 15);
872    }
873
874    #[cfg(feature = "spec_unstable_metrics_views")]
875    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
876    async fn histogram_aggregation_with_invalid_aggregation_should_proceed_as_if_view_not_exist() {
877        // Run this test with stdout enabled to see output.
878        // cargo test histogram_aggregation_with_invalid_aggregation_should_proceed_as_if_view_not_exist --features=testing -- --nocapture
879
880        // Arrange
881        let exporter = InMemoryMetricExporter::default();
882        let view = |i: &Instrument| {
883            if i.name == "test_histogram" {
884                Stream::builder()
885                    .with_aggregation(aggregation::Aggregation::ExplicitBucketHistogram {
886                        boundaries: vec![0.9, 1.9, 1.2, 1.3, 1.4, 1.5], // invalid boundaries
887                        record_min_max: false,
888                    })
889                    .with_name("test_histogram_renamed")
890                    .with_unit("test_unit_renamed")
891                    .build()
892                    .ok()
893            } else {
894                None
895            }
896        };
897        let meter_provider = SdkMeterProvider::builder()
898            .with_periodic_exporter(exporter.clone())
899            .with_view(view)
900            .build();
901
902        // Act
903        let meter = meter_provider.meter("test");
904        let histogram = meter
905            .f64_histogram("test_histogram")
906            .with_unit("test_unit")
907            .build();
908
909        histogram.record(1.5, &[KeyValue::new("key1", "value1")]);
910        meter_provider.force_flush().unwrap();
911
912        // Assert
913        let resource_metrics = exporter
914            .get_finished_metrics()
915            .expect("metrics are expected to be exported.");
916        assert!(!resource_metrics.is_empty());
917        let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
918        assert_eq!(
919            metric.name, "test_histogram",
920            "View rename should be ignored and original name retained."
921        );
922        assert_eq!(
923            metric.unit, "test_unit",
924            "View rename of unit should be ignored and original unit retained."
925        );
926    }
927
928    #[cfg(feature = "spec_unstable_metrics_views")]
929    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
930    #[ignore = "Spatial aggregation is not yet implemented."]
931    async fn spatial_aggregation_when_view_drops_attributes_observable_counter() {
932        // cargo test metrics::tests::spatial_aggregation_when_view_drops_attributes_observable_counter --features=testing
933
934        // Arrange
935        let exporter = InMemoryMetricExporter::default();
936        // View drops all attributes.
937        let view = |i: &Instrument| {
938            if i.name == "my_observable_counter" {
939                Stream::builder()
940                    .with_allowed_attribute_keys(vec![])
941                    .build()
942                    .ok()
943            } else {
944                None
945            }
946        };
947        let meter_provider = SdkMeterProvider::builder()
948            .with_periodic_exporter(exporter.clone())
949            .with_view(view)
950            .build();
951
952        // Act
953        let meter = meter_provider.meter("test");
954        let _observable_counter = meter
955            .u64_observable_counter("my_observable_counter")
956            .with_callback(|observer| {
957                observer.observe(
958                    100,
959                    &[
960                        KeyValue::new("statusCode", "200"),
961                        KeyValue::new("verb", "get"),
962                    ],
963                );
964
965                observer.observe(
966                    100,
967                    &[
968                        KeyValue::new("statusCode", "200"),
969                        KeyValue::new("verb", "post"),
970                    ],
971                );
972
973                observer.observe(
974                    100,
975                    &[
976                        KeyValue::new("statusCode", "500"),
977                        KeyValue::new("verb", "get"),
978                    ],
979                );
980            })
981            .build();
982
983        meter_provider.force_flush().unwrap();
984
985        // Assert
986        let resource_metrics = exporter
987            .get_finished_metrics()
988            .expect("metrics are expected to be exported.");
989        assert!(!resource_metrics.is_empty());
990        let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
991        assert_eq!(metric.name, "my_observable_counter",);
992
993        let MetricData::Sum(sum) = u64::extract_metrics_data_ref(&metric.data)
994            .expect("Sum aggregation expected for ObservableCounter instruments by default")
995        else {
996            unreachable!()
997        };
998
999        // Expecting 1 time-series only, as the view drops all attributes resulting
1000        // in a single time-series.
1001        // This is failing today, due to lack of support for spatial aggregation.
1002        assert_eq!(sum.data_points.len(), 1);
1003
1004        // find and validate the single datapoint
1005        let data_point = &sum.data_points[0];
1006        assert_eq!(data_point.value, 300);
1007    }
1008
1009    #[cfg(feature = "spec_unstable_metrics_views")]
1010    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1011    async fn spatial_aggregation_when_view_drops_attributes_counter() {
1012        // cargo test spatial_aggregation_when_view_drops_attributes_counter --features=testing
1013
1014        // Arrange
1015        let exporter = InMemoryMetricExporter::default();
1016        // View drops all attributes.
1017        let view = |i: &Instrument| {
1018            if i.name == "my_counter" {
1019                Some(
1020                    Stream::builder()
1021                        .with_allowed_attribute_keys(vec![])
1022                        .build()
1023                        .unwrap(),
1024                )
1025            } else {
1026                None
1027            }
1028        };
1029        let meter_provider = SdkMeterProvider::builder()
1030            .with_periodic_exporter(exporter.clone())
1031            .with_view(view)
1032            .build();
1033
1034        // Act
1035        let meter = meter_provider.meter("test");
1036        let counter = meter.u64_counter("my_counter").build();
1037
1038        // Normally, this would generate 3 time-series, but since the view
1039        // drops all attributes, we expect only 1 time-series.
1040        counter.add(
1041            10,
1042            [
1043                KeyValue::new("statusCode", "200"),
1044                KeyValue::new("verb", "Get"),
1045            ]
1046            .as_ref(),
1047        );
1048
1049        counter.add(
1050            10,
1051            [
1052                KeyValue::new("statusCode", "500"),
1053                KeyValue::new("verb", "Get"),
1054            ]
1055            .as_ref(),
1056        );
1057
1058        counter.add(
1059            10,
1060            [
1061                KeyValue::new("statusCode", "200"),
1062                KeyValue::new("verb", "Post"),
1063            ]
1064            .as_ref(),
1065        );
1066
1067        meter_provider.force_flush().unwrap();
1068
1069        // Assert
1070        let resource_metrics = exporter
1071            .get_finished_metrics()
1072            .expect("metrics are expected to be exported.");
1073        assert!(!resource_metrics.is_empty());
1074        let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
1075        assert_eq!(metric.name, "my_counter",);
1076
1077        let MetricData::Sum(sum) = u64::extract_metrics_data_ref(&metric.data)
1078            .expect("Sum aggregation expected for Counter instruments by default")
1079        else {
1080            unreachable!()
1081        };
1082
1083        // Expecting 1 time-series only, as the view drops all attributes resulting
1084        // in a single time-series.
1085        // This is failing today, due to lack of support for spatial aggregation.
1086        assert_eq!(sum.data_points.len(), 1);
1087        // find and validate the single datapoint
1088        let data_point = &sum.data_points[0];
1089        assert_eq!(data_point.value, 30);
1090    }
1091
1092    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1093    async fn no_attr_cumulative_up_down_counter() {
1094        let mut test_context = TestContext::new(Temporality::Cumulative);
1095        let counter = test_context.i64_up_down_counter("test", "my_counter", Some("my_unit"));
1096
1097        counter.add(50, &[]);
1098        test_context.flush_metrics();
1099
1100        let MetricData::Sum(sum) =
1101            test_context.get_aggregation::<i64>("my_counter", Some("my_unit"))
1102        else {
1103            unreachable!()
1104        };
1105
1106        assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
1107        assert!(!sum.is_monotonic, "Should not produce monotonic.");
1108        assert_eq!(
1109            sum.temporality,
1110            Temporality::Cumulative,
1111            "Should produce cumulative"
1112        );
1113
1114        let data_point = &sum.data_points[0];
1115        assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
1116        assert_eq!(data_point.value, 50, "Unexpected data point value");
1117    }
1118
1119    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1120    async fn no_attr_up_down_counter_always_cumulative() {
1121        let mut test_context = TestContext::new(Temporality::Delta);
1122        let counter = test_context.i64_up_down_counter("test", "my_counter", Some("my_unit"));
1123
1124        counter.add(50, &[]);
1125        test_context.flush_metrics();
1126
1127        let MetricData::Sum(sum) =
1128            test_context.get_aggregation::<i64>("my_counter", Some("my_unit"))
1129        else {
1130            unreachable!()
1131        };
1132
1133        assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
1134        assert!(!sum.is_monotonic, "Should not produce monotonic.");
1135        assert_eq!(
1136            sum.temporality,
1137            Temporality::Cumulative,
1138            "Should produce Cumulative due to UpDownCounter temporality_preference"
1139        );
1140
1141        let data_point = &sum.data_points[0];
1142        assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
1143        assert_eq!(data_point.value, 50, "Unexpected data point value");
1144    }
1145
1146    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1147    async fn no_attr_cumulative_counter_value_added_after_export() {
1148        let mut test_context = TestContext::new(Temporality::Cumulative);
1149        let counter = test_context.u64_counter("test", "my_counter", None);
1150
1151        counter.add(50, &[]);
1152        test_context.flush_metrics();
1153        let _ = test_context.get_aggregation::<u64>("my_counter", None);
1154        test_context.reset_metrics();
1155
1156        counter.add(5, &[]);
1157        test_context.flush_metrics();
1158        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
1159            unreachable!()
1160        };
1161
1162        assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
1163        assert!(sum.is_monotonic, "Should produce monotonic.");
1164        assert_eq!(
1165            sum.temporality,
1166            Temporality::Cumulative,
1167            "Should produce cumulative"
1168        );
1169
1170        let data_point = &sum.data_points[0];
1171        assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
1172        assert_eq!(data_point.value, 55, "Unexpected data point value");
1173    }
1174
1175    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1176    async fn no_attr_delta_counter_value_reset_after_export() {
1177        let mut test_context = TestContext::new(Temporality::Delta);
1178        let counter = test_context.u64_counter("test", "my_counter", None);
1179
1180        counter.add(50, &[]);
1181        test_context.flush_metrics();
1182        let _ = test_context.get_aggregation::<u64>("my_counter", None);
1183        test_context.reset_metrics();
1184
1185        counter.add(5, &[]);
1186        test_context.flush_metrics();
1187        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
1188            unreachable!()
1189        };
1190
1191        assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
1192        assert!(sum.is_monotonic, "Should produce monotonic.");
1193        assert_eq!(
1194            sum.temporality,
1195            Temporality::Delta,
1196            "Should produce cumulative"
1197        );
1198
1199        let data_point = &sum.data_points[0];
1200        assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
1201        assert_eq!(data_point.value, 5, "Unexpected data point value");
1202    }
1203
1204    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1205    async fn second_delta_export_does_not_give_no_attr_value_if_add_not_called() {
1206        let mut test_context = TestContext::new(Temporality::Delta);
1207        let counter = test_context.u64_counter("test", "my_counter", None);
1208
1209        counter.add(50, &[]);
1210        test_context.flush_metrics();
1211        let _ = test_context.get_aggregation::<u64>("my_counter", None);
1212        test_context.reset_metrics();
1213
1214        counter.add(50, &[KeyValue::new("a", "b")]);
1215        test_context.flush_metrics();
1216        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
1217            unreachable!()
1218        };
1219
1220        let no_attr_data_point = sum.data_points.iter().find(|x| x.attributes.is_empty());
1221
1222        assert!(
1223            no_attr_data_point.is_none(),
1224            "Expected no data points with no attributes"
1225        );
1226    }
1227
1228    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1229    async fn delta_memory_efficiency_test() {
1230        // Run this test with stdout enabled to see output.
1231        // cargo test delta_memory_efficiency_test --features=testing -- --nocapture
1232
1233        // Arrange
1234        let mut test_context = TestContext::new(Temporality::Delta);
1235        let counter = test_context.u64_counter("test", "my_counter", None);
1236
1237        // Act
1238        counter.add(1, &[KeyValue::new("key1", "value1")]);
1239        counter.add(1, &[KeyValue::new("key1", "value1")]);
1240        counter.add(1, &[KeyValue::new("key1", "value1")]);
1241        counter.add(1, &[KeyValue::new("key1", "value1")]);
1242        counter.add(1, &[KeyValue::new("key1", "value1")]);
1243
1244        counter.add(1, &[KeyValue::new("key1", "value2")]);
1245        counter.add(1, &[KeyValue::new("key1", "value2")]);
1246        counter.add(1, &[KeyValue::new("key1", "value2")]);
1247        test_context.flush_metrics();
1248
1249        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
1250            unreachable!()
1251        };
1252
1253        // Expecting 2 time-series.
1254        assert_eq!(sum.data_points.len(), 2);
1255
1256        // find and validate key1=value1 datapoint
1257        let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
1258            .expect("datapoint with key1=value1 expected");
1259        assert_eq!(data_point1.value, 5);
1260
1261        // find and validate key1=value2 datapoint
1262        let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2")
1263            .expect("datapoint with key1=value2 expected");
1264        assert_eq!(data_point1.value, 3);
1265
1266        test_context.exporter.reset();
1267        // flush again, and validate that nothing is flushed
1268        // as delta temporality.
1269        test_context.flush_metrics();
1270
1271        let resource_metrics = test_context
1272            .exporter
1273            .get_finished_metrics()
1274            .expect("metrics are expected to be exported.");
1275        println!("resource_metrics: {resource_metrics:?}");
1276        assert!(resource_metrics.is_empty(), "No metrics should be exported as no new measurements were recorded since last collect.");
1277    }
1278
1279    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1280    async fn counter_multithreaded() {
1281        // Run this test with stdout enabled to see output.
1282        // cargo test counter_multithreaded --features=testing -- --nocapture
1283
1284        counter_multithreaded_aggregation_helper(Temporality::Delta);
1285        counter_multithreaded_aggregation_helper(Temporality::Cumulative);
1286    }
1287
1288    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1289    async fn counter_f64_multithreaded() {
1290        // Run this test with stdout enabled to see output.
1291        // cargo test counter_f64_multithreaded --features=testing -- --nocapture
1292
1293        counter_f64_multithreaded_aggregation_helper(Temporality::Delta);
1294        counter_f64_multithreaded_aggregation_helper(Temporality::Cumulative);
1295    }
1296
1297    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1298    async fn histogram_multithreaded() {
1299        // Run this test with stdout enabled to see output.
1300        // cargo test histogram_multithreaded --features=testing -- --nocapture
1301
1302        histogram_multithreaded_aggregation_helper(Temporality::Delta);
1303        histogram_multithreaded_aggregation_helper(Temporality::Cumulative);
1304    }
1305
1306    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1307    async fn histogram_f64_multithreaded() {
1308        // Run this test with stdout enabled to see output.
1309        // cargo test histogram_f64_multithreaded --features=testing -- --nocapture
1310
1311        histogram_f64_multithreaded_aggregation_helper(Temporality::Delta);
1312        histogram_f64_multithreaded_aggregation_helper(Temporality::Cumulative);
1313    }
1314    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1315    async fn synchronous_instruments_cumulative_with_gap_in_measurements() {
1316        // Run this test with stdout enabled to see output.
1317        // cargo test synchronous_instruments_cumulative_with_gap_in_measurements --features=testing -- --nocapture
1318
1319        synchronous_instruments_cumulative_with_gap_in_measurements_helper("counter");
1320        synchronous_instruments_cumulative_with_gap_in_measurements_helper("updown_counter");
1321        synchronous_instruments_cumulative_with_gap_in_measurements_helper("histogram");
1322        synchronous_instruments_cumulative_with_gap_in_measurements_helper("gauge");
1323    }
1324
1325    fn synchronous_instruments_cumulative_with_gap_in_measurements_helper(
1326        instrument_name: &'static str,
1327    ) {
1328        let mut test_context = TestContext::new(Temporality::Cumulative);
1329        let attributes = &[KeyValue::new("key1", "value1")];
1330
1331        // Create instrument and emit measurements
1332        match instrument_name {
1333            "counter" => {
1334                let counter = test_context.meter().u64_counter("test_counter").build();
1335                counter.add(5, &[]);
1336                counter.add(10, attributes);
1337            }
1338            "updown_counter" => {
1339                let updown_counter = test_context
1340                    .meter()
1341                    .i64_up_down_counter("test_updowncounter")
1342                    .build();
1343                updown_counter.add(15, &[]);
1344                updown_counter.add(20, attributes);
1345            }
1346            "histogram" => {
1347                let histogram = test_context.meter().u64_histogram("test_histogram").build();
1348                histogram.record(25, &[]);
1349                histogram.record(30, attributes);
1350            }
1351            "gauge" => {
1352                let gauge = test_context.meter().u64_gauge("test_gauge").build();
1353                gauge.record(35, &[]);
1354                gauge.record(40, attributes);
1355            }
1356            _ => panic!("Incorrect instrument kind provided"),
1357        };
1358
1359        test_context.flush_metrics();
1360
1361        // Test the first export
1362        assert_correct_export(&mut test_context, instrument_name);
1363
1364        // Reset and export again without making any measurements
1365        test_context.reset_metrics();
1366
1367        test_context.flush_metrics();
1368
1369        // Test that latest export has the same data as the previous one
1370        assert_correct_export(&mut test_context, instrument_name);
1371
1372        fn assert_correct_export(test_context: &mut TestContext, instrument_name: &'static str) {
1373            match instrument_name {
1374                "counter" => {
1375                    let MetricData::Sum(sum) =
1376                        test_context.get_aggregation::<u64>("test_counter", None)
1377                    else {
1378                        unreachable!()
1379                    };
1380                    assert_eq!(sum.data_points.len(), 2);
1381                    let zero_attribute_datapoint =
1382                        find_sum_datapoint_with_no_attributes(&sum.data_points)
1383                            .expect("datapoint with no attributes expected");
1384                    assert_eq!(zero_attribute_datapoint.value, 5);
1385                    let data_point1 =
1386                        find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
1387                            .expect("datapoint with key1=value1 expected");
1388                    assert_eq!(data_point1.value, 10);
1389                }
1390                "updown_counter" => {
1391                    let MetricData::Sum(sum) =
1392                        test_context.get_aggregation::<i64>("test_updowncounter", None)
1393                    else {
1394                        unreachable!()
1395                    };
1396                    assert_eq!(sum.data_points.len(), 2);
1397                    let zero_attribute_datapoint =
1398                        find_sum_datapoint_with_no_attributes(&sum.data_points)
1399                            .expect("datapoint with no attributes expected");
1400                    assert_eq!(zero_attribute_datapoint.value, 15);
1401                    let data_point1 =
1402                        find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
1403                            .expect("datapoint with key1=value1 expected");
1404                    assert_eq!(data_point1.value, 20);
1405                }
1406                "histogram" => {
1407                    let MetricData::Histogram(histogram_data) =
1408                        test_context.get_aggregation::<u64>("test_histogram", None)
1409                    else {
1410                        unreachable!()
1411                    };
1412                    assert_eq!(histogram_data.data_points.len(), 2);
1413                    let zero_attribute_datapoint =
1414                        find_histogram_datapoint_with_no_attributes(&histogram_data.data_points)
1415                            .expect("datapoint with no attributes expected");
1416                    assert_eq!(zero_attribute_datapoint.count, 1);
1417                    assert_eq!(zero_attribute_datapoint.sum, 25);
1418                    assert_eq!(zero_attribute_datapoint.min, Some(25));
1419                    assert_eq!(zero_attribute_datapoint.max, Some(25));
1420                    let data_point1 = find_histogram_datapoint_with_key_value(
1421                        &histogram_data.data_points,
1422                        "key1",
1423                        "value1",
1424                    )
1425                    .expect("datapoint with key1=value1 expected");
1426                    assert_eq!(data_point1.count, 1);
1427                    assert_eq!(data_point1.sum, 30);
1428                    assert_eq!(data_point1.min, Some(30));
1429                    assert_eq!(data_point1.max, Some(30));
1430                }
1431                "gauge" => {
1432                    let MetricData::Gauge(gauge_data) =
1433                        test_context.get_aggregation::<u64>("test_gauge", None)
1434                    else {
1435                        unreachable!()
1436                    };
1437                    assert_eq!(gauge_data.data_points.len(), 2);
1438                    let zero_attribute_datapoint =
1439                        find_gauge_datapoint_with_no_attributes(&gauge_data.data_points)
1440                            .expect("datapoint with no attributes expected");
1441                    assert_eq!(zero_attribute_datapoint.value, 35);
1442                    let data_point1 = find_gauge_datapoint_with_key_value(
1443                        &gauge_data.data_points,
1444                        "key1",
1445                        "value1",
1446                    )
1447                    .expect("datapoint with key1=value1 expected");
1448                    assert_eq!(data_point1.value, 40);
1449                }
1450                _ => panic!("Incorrect instrument kind provided"),
1451            }
1452        }
1453    }
1454
1455    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1456    async fn asynchronous_instruments_cumulative_data_points_only_from_last_measurement() {
1457        // Run this test with stdout enabled to see output.
1458        // cargo test asynchronous_instruments_cumulative_data_points_only_from_last_measurement --features=testing -- --nocapture
1459
1460        asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
1461            "gauge", true,
1462        );
1463        // TODO fix: all asynchronous instruments should not emit data points if not measured
1464        // but these implementations are still buggy
1465        asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
1466            "counter", false,
1467        );
1468        asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
1469            "updown_counter",
1470            false,
1471        );
1472    }
1473
1474    #[test]
1475    fn view_test_rename() {
1476        test_view_customization(
1477            |i| {
1478                if i.name == "my_counter" {
1479                    Some(
1480                        Stream::builder()
1481                            .with_name("my_counter_renamed")
1482                            .build()
1483                            .unwrap(),
1484                    )
1485                } else {
1486                    None
1487                }
1488            },
1489            "my_counter_renamed",
1490            "my_unit",
1491            "my_description",
1492        )
1493    }
1494
1495    #[test]
1496    fn view_test_change_unit() {
1497        test_view_customization(
1498            |i| {
1499                if i.name == "my_counter" {
1500                    Some(Stream::builder().with_unit("my_unit_new").build().unwrap())
1501                } else {
1502                    None
1503                }
1504            },
1505            "my_counter",
1506            "my_unit_new",
1507            "my_description",
1508        )
1509    }
1510
1511    #[test]
1512    fn view_test_change_description() {
1513        test_view_customization(
1514            |i| {
1515                if i.name == "my_counter" {
1516                    Some(
1517                        Stream::builder()
1518                            .with_description("my_description_new")
1519                            .build()
1520                            .unwrap(),
1521                    )
1522                } else {
1523                    None
1524                }
1525            },
1526            "my_counter",
1527            "my_unit",
1528            "my_description_new",
1529        )
1530    }
1531
1532    #[test]
1533    fn view_test_change_name_unit() {
1534        test_view_customization(
1535            |i| {
1536                if i.name == "my_counter" {
1537                    Some(
1538                        Stream::builder()
1539                            .with_name("my_counter_renamed")
1540                            .with_unit("my_unit_new")
1541                            .build()
1542                            .unwrap(),
1543                    )
1544                } else {
1545                    None
1546                }
1547            },
1548            "my_counter_renamed",
1549            "my_unit_new",
1550            "my_description",
1551        )
1552    }
1553
1554    #[test]
1555    fn view_test_change_name_unit_desc() {
1556        test_view_customization(
1557            |i| {
1558                if i.name == "my_counter" {
1559                    Some(
1560                        Stream::builder()
1561                            .with_name("my_counter_renamed")
1562                            .with_unit("my_unit_new")
1563                            .with_description("my_description_new")
1564                            .build()
1565                            .unwrap(),
1566                    )
1567                } else {
1568                    None
1569                }
1570            },
1571            "my_counter_renamed",
1572            "my_unit_new",
1573            "my_description_new",
1574        )
1575    }
1576
1577    #[test]
1578    fn view_test_match_unit() {
1579        test_view_customization(
1580            |i| {
1581                if i.unit == "my_unit" {
1582                    Some(Stream::builder().with_unit("my_unit_new").build().unwrap())
1583                } else {
1584                    None
1585                }
1586            },
1587            "my_counter",
1588            "my_unit_new",
1589            "my_description",
1590        )
1591    }
1592
1593    #[test]
1594    fn view_test_match_none() {
1595        test_view_customization(
1596            |i| {
1597                if i.name == "not_expected_to_match" {
1598                    Some(Stream::builder().build().unwrap())
1599                } else {
1600                    None
1601                }
1602            },
1603            "my_counter",
1604            "my_unit",
1605            "my_description",
1606        )
1607    }
1608
1609    #[test]
1610    fn view_test_match_multiple() {
1611        test_view_customization(
1612            |i| {
1613                if i.name == "my_counter" && i.unit == "my_unit" {
1614                    Some(
1615                        Stream::builder()
1616                            .with_name("my_counter_renamed")
1617                            .build()
1618                            .unwrap(),
1619                    )
1620                } else {
1621                    None
1622                }
1623            },
1624            "my_counter_renamed",
1625            "my_unit",
1626            "my_description",
1627        )
1628    }
1629
1630    /// Helper function to test view customizations
1631    fn test_view_customization<F>(
1632        view_function: F,
1633        expected_name: &str,
1634        expected_unit: &str,
1635        expected_description: &str,
1636    ) where
1637        F: Fn(&Instrument) -> Option<Stream> + Send + Sync + 'static,
1638    {
1639        // Run this test with stdout enabled to see output.
1640        // cargo test view_test_* --all-features -- --nocapture
1641
1642        // Arrange
1643        let exporter = InMemoryMetricExporter::default();
1644        let meter_provider = SdkMeterProvider::builder()
1645            .with_periodic_exporter(exporter.clone())
1646            .with_view(view_function)
1647            .build();
1648
1649        // Act
1650        let meter = meter_provider.meter("test");
1651        let counter = meter
1652            .f64_counter("my_counter")
1653            .with_unit("my_unit")
1654            .with_description("my_description")
1655            .build();
1656
1657        counter.add(1.5, &[KeyValue::new("key1", "value1")]);
1658        meter_provider.force_flush().unwrap();
1659
1660        // Assert
1661        let resource_metrics = exporter
1662            .get_finished_metrics()
1663            .expect("metrics are expected to be exported.");
1664        assert_eq!(resource_metrics.len(), 1);
1665        assert_eq!(resource_metrics[0].scope_metrics.len(), 1);
1666        let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
1667        assert_eq!(
1668            metric.name, expected_name,
1669            "Expected name: {expected_name}."
1670        );
1671        assert_eq!(
1672            metric.unit, expected_unit,
1673            "Expected unit: {expected_unit}."
1674        );
1675        assert_eq!(
1676            metric.description, expected_description,
1677            "Expected description: {expected_description}."
1678        );
1679    }
1680
1681    // Following are just a basic set of advanced View tests - Views bring a lot
1682    // of permutations and combinations, and we need
1683    // to expand coverage for more scenarios in future.
1684    // It is best to first split this file into multiple files
1685    // based on scenarios (eg: regular aggregation, cardinality, views, view_advanced, etc)
1686    // and then add more tests for each of the scenarios.
1687    #[test]
1688    fn test_view_single_instrument_multiple_stream() {
1689        // Run this test with stdout enabled to see output.
1690        // cargo test test_view_multiple_stream --all-features
1691
1692        // Each of the views match the instrument name "my_counter" and create a
1693        // new stream with a different name. In other words, View can be used to
1694        // create multiple streams for the same instrument.
1695
1696        let view1 = |i: &Instrument| {
1697            if i.name() == "my_counter" {
1698                Some(Stream::builder().with_name("my_counter_1").build().unwrap())
1699            } else {
1700                None
1701            }
1702        };
1703
1704        let view2 = |i: &Instrument| {
1705            if i.name() == "my_counter" {
1706                Some(Stream::builder().with_name("my_counter_2").build().unwrap())
1707            } else {
1708                None
1709            }
1710        };
1711
1712        // Arrange
1713        let exporter = InMemoryMetricExporter::default();
1714        let meter_provider = SdkMeterProvider::builder()
1715            .with_periodic_exporter(exporter.clone())
1716            .with_view(view1)
1717            .with_view(view2)
1718            .build();
1719
1720        // Act
1721        let meter = meter_provider.meter("test");
1722        let counter = meter.f64_counter("my_counter").build();
1723
1724        counter.add(1.5, &[KeyValue::new("key1", "value1")]);
1725        meter_provider.force_flush().unwrap();
1726
1727        // Assert
1728        let resource_metrics = exporter
1729            .get_finished_metrics()
1730            .expect("metrics are expected to be exported.");
1731        assert_eq!(resource_metrics.len(), 1);
1732        assert_eq!(resource_metrics[0].scope_metrics.len(), 1);
1733        let metrics = &resource_metrics[0].scope_metrics[0].metrics;
1734        assert_eq!(metrics.len(), 2);
1735        assert_eq!(metrics[0].name, "my_counter_1");
1736        assert_eq!(metrics[1].name, "my_counter_2");
1737    }
1738
1739    #[test]
1740    fn test_view_multiple_instrument_single_stream() {
1741        // Run this test with stdout enabled to see output.
1742        // cargo test test_view_multiple_instrument_single_stream --all-features
1743
1744        // The view matches the instrument name "my_counter1" and "my_counter1"
1745        // and create a single new stream for both. In other words, View can be used to
1746        // "merge" multiple instruments into a single stream.
1747        let view = |i: &Instrument| {
1748            if i.name() == "my_counter1" || i.name() == "my_counter2" {
1749                Some(Stream::builder().with_name("my_counter").build().unwrap())
1750            } else {
1751                None
1752            }
1753        };
1754
1755        // Arrange
1756        let exporter = InMemoryMetricExporter::default();
1757        let meter_provider = SdkMeterProvider::builder()
1758            .with_periodic_exporter(exporter.clone())
1759            .with_view(view)
1760            .build();
1761
1762        // Act
1763        let meter = meter_provider.meter("test");
1764        let counter1 = meter.f64_counter("my_counter1").build();
1765        let counter2 = meter.f64_counter("my_counter2").build();
1766
1767        counter1.add(1.5, &[KeyValue::new("key1", "value1")]);
1768        counter2.add(1.5, &[KeyValue::new("key1", "value1")]);
1769        meter_provider.force_flush().unwrap();
1770
1771        // Assert
1772        let resource_metrics = exporter
1773            .get_finished_metrics()
1774            .expect("metrics are expected to be exported.");
1775        assert_eq!(resource_metrics.len(), 1);
1776        assert_eq!(resource_metrics[0].scope_metrics.len(), 1);
1777        let metrics = &resource_metrics[0].scope_metrics[0].metrics;
1778        assert_eq!(metrics.len(), 1);
1779        assert_eq!(metrics[0].name, "my_counter");
1780        // TODO: Assert that the data points are aggregated correctly.
1781    }
1782
1783    fn asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
1784        instrument_name: &'static str,
1785        should_not_emit: bool,
1786    ) {
1787        let mut test_context = TestContext::new(Temporality::Cumulative);
1788        let attributes = Arc::new([KeyValue::new("key1", "value1")]);
1789
1790        // Create instrument and emit measurements once
1791        match instrument_name {
1792            "counter" => {
1793                let has_run = AtomicBool::new(false);
1794                let _observable_counter = test_context
1795                    .meter()
1796                    .u64_observable_counter("test_counter")
1797                    .with_callback(move |observer| {
1798                        if !has_run.load(Ordering::SeqCst) {
1799                            observer.observe(5, &[]);
1800                            observer.observe(10, &*attributes.clone());
1801                            has_run.store(true, Ordering::SeqCst);
1802                        }
1803                    })
1804                    .build();
1805            }
1806            "updown_counter" => {
1807                let has_run = AtomicBool::new(false);
1808                let _observable_up_down_counter = test_context
1809                    .meter()
1810                    .i64_observable_up_down_counter("test_updowncounter")
1811                    .with_callback(move |observer| {
1812                        if !has_run.load(Ordering::SeqCst) {
1813                            observer.observe(15, &[]);
1814                            observer.observe(20, &*attributes.clone());
1815                            has_run.store(true, Ordering::SeqCst);
1816                        }
1817                    })
1818                    .build();
1819            }
1820            "gauge" => {
1821                let has_run = AtomicBool::new(false);
1822                let _observable_gauge = test_context
1823                    .meter()
1824                    .u64_observable_gauge("test_gauge")
1825                    .with_callback(move |observer| {
1826                        if !has_run.load(Ordering::SeqCst) {
1827                            observer.observe(25, &[]);
1828                            observer.observe(30, &*attributes.clone());
1829                            has_run.store(true, Ordering::SeqCst);
1830                        }
1831                    })
1832                    .build();
1833            }
1834            _ => panic!("Incorrect instrument kind provided"),
1835        };
1836
1837        test_context.flush_metrics();
1838
1839        // Test the first export
1840        assert_correct_export(&mut test_context, instrument_name);
1841
1842        // Reset and export again without making any measurements
1843        test_context.reset_metrics();
1844
1845        test_context.flush_metrics();
1846
1847        if should_not_emit {
1848            test_context.check_no_metrics();
1849        } else {
1850            // Test that latest export has the same data as the previous one
1851            assert_correct_export(&mut test_context, instrument_name);
1852        }
1853
1854        fn assert_correct_export(test_context: &mut TestContext, instrument_name: &'static str) {
1855            match instrument_name {
1856                "counter" => {
1857                    let MetricData::Sum(sum) =
1858                        test_context.get_aggregation::<u64>("test_counter", None)
1859                    else {
1860                        unreachable!()
1861                    };
1862                    assert_eq!(sum.data_points.len(), 2);
1863                    assert!(sum.is_monotonic);
1864                    let zero_attribute_datapoint =
1865                        find_sum_datapoint_with_no_attributes(&sum.data_points)
1866                            .expect("datapoint with no attributes expected");
1867                    assert_eq!(zero_attribute_datapoint.value, 5);
1868                    let data_point1 =
1869                        find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
1870                            .expect("datapoint with key1=value1 expected");
1871                    assert_eq!(data_point1.value, 10);
1872                }
1873                "updown_counter" => {
1874                    let MetricData::Sum(sum) =
1875                        test_context.get_aggregation::<i64>("test_updowncounter", None)
1876                    else {
1877                        unreachable!()
1878                    };
1879                    assert_eq!(sum.data_points.len(), 2);
1880                    assert!(!sum.is_monotonic);
1881                    let zero_attribute_datapoint =
1882                        find_sum_datapoint_with_no_attributes(&sum.data_points)
1883                            .expect("datapoint with no attributes expected");
1884                    assert_eq!(zero_attribute_datapoint.value, 15);
1885                    let data_point1 =
1886                        find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
1887                            .expect("datapoint with key1=value1 expected");
1888                    assert_eq!(data_point1.value, 20);
1889                }
1890                "gauge" => {
1891                    let MetricData::Gauge(gauge_data) =
1892                        test_context.get_aggregation::<u64>("test_gauge", None)
1893                    else {
1894                        unreachable!()
1895                    };
1896                    assert_eq!(gauge_data.data_points.len(), 2);
1897                    let zero_attribute_datapoint =
1898                        find_gauge_datapoint_with_no_attributes(&gauge_data.data_points)
1899                            .expect("datapoint with no attributes expected");
1900                    assert_eq!(zero_attribute_datapoint.value, 25);
1901                    let data_point1 = find_gauge_datapoint_with_key_value(
1902                        &gauge_data.data_points,
1903                        "key1",
1904                        "value1",
1905                    )
1906                    .expect("datapoint with key1=value1 expected");
1907                    assert_eq!(data_point1.value, 30);
1908                }
1909                _ => panic!("Incorrect instrument kind provided"),
1910            }
1911        }
1912    }
1913
1914    fn counter_multithreaded_aggregation_helper(temporality: Temporality) {
1915        // Arrange
1916        let mut test_context = TestContext::new(temporality);
1917        let counter = Arc::new(test_context.u64_counter("test", "my_counter", None));
1918
1919        for i in 0..10 {
1920            thread::scope(|s| {
1921                s.spawn(|| {
1922                    counter.add(1, &[]);
1923
1924                    counter.add(1, &[KeyValue::new("key1", "value1")]);
1925                    counter.add(1, &[KeyValue::new("key1", "value1")]);
1926                    counter.add(1, &[KeyValue::new("key1", "value1")]);
1927
1928                    // Test concurrent collection by forcing half of the update threads to `force_flush` metrics and sleep for some time.
1929                    if i % 2 == 0 {
1930                        test_context.flush_metrics();
1931                        thread::sleep(Duration::from_millis(i)); // Make each thread sleep for some time duration for better testing
1932                    }
1933
1934                    counter.add(1, &[KeyValue::new("key1", "value1")]);
1935                    counter.add(1, &[KeyValue::new("key1", "value1")]);
1936                });
1937            });
1938        }
1939
1940        test_context.flush_metrics();
1941
1942        // Assert
1943        // We invoke `test_context.flush_metrics()` six times.
1944        let sums = test_context
1945            .get_from_multiple_aggregations::<u64>("my_counter", None, 6)
1946            .into_iter()
1947            .map(|data| {
1948                if let MetricData::Sum(sum) = data {
1949                    sum
1950                } else {
1951                    unreachable!()
1952                }
1953            })
1954            .collect::<Vec<_>>();
1955
1956        let mut sum_zero_attributes = 0;
1957        let mut sum_key1_value1 = 0;
1958        sums.iter().for_each(|sum| {
1959            assert_eq!(sum.data_points.len(), 2); // Expecting 1 time-series.
1960            assert!(sum.is_monotonic, "Counter should produce monotonic.");
1961            assert_eq!(sum.temporality, temporality);
1962
1963            if temporality == Temporality::Delta {
1964                sum_zero_attributes += sum.data_points[0].value;
1965                sum_key1_value1 += sum.data_points[1].value;
1966            } else {
1967                sum_zero_attributes = sum.data_points[0].value;
1968                sum_key1_value1 = sum.data_points[1].value;
1969            };
1970        });
1971
1972        assert_eq!(sum_zero_attributes, 10);
1973        assert_eq!(sum_key1_value1, 50); // Each of the 10 update threads record measurements summing up to 5.
1974    }
1975
1976    fn counter_f64_multithreaded_aggregation_helper(temporality: Temporality) {
1977        // Arrange
1978        let mut test_context = TestContext::new(temporality);
1979        let counter = Arc::new(test_context.meter().f64_counter("test_counter").build());
1980
1981        for i in 0..10 {
1982            thread::scope(|s| {
1983                s.spawn(|| {
1984                    counter.add(1.23, &[]);
1985
1986                    counter.add(1.23, &[KeyValue::new("key1", "value1")]);
1987                    counter.add(1.23, &[KeyValue::new("key1", "value1")]);
1988                    counter.add(1.23, &[KeyValue::new("key1", "value1")]);
1989
1990                    // Test concurrent collection by forcing half of the update threads to `force_flush` metrics and sleep for some time.
1991                    if i % 2 == 0 {
1992                        test_context.flush_metrics();
1993                        thread::sleep(Duration::from_millis(i)); // Make each thread sleep for some time duration for better testing
1994                    }
1995
1996                    counter.add(1.23, &[KeyValue::new("key1", "value1")]);
1997                    counter.add(1.23, &[KeyValue::new("key1", "value1")]);
1998                });
1999            });
2000        }
2001
2002        test_context.flush_metrics();
2003
2004        // Assert
2005        // We invoke `test_context.flush_metrics()` six times.
2006        let sums = test_context
2007            .get_from_multiple_aggregations::<f64>("test_counter", None, 6)
2008            .into_iter()
2009            .map(|data| {
2010                if let MetricData::Sum(sum) = data {
2011                    sum
2012                } else {
2013                    unreachable!()
2014                }
2015            })
2016            .collect::<Vec<_>>();
2017
2018        let mut sum_zero_attributes = 0.0;
2019        let mut sum_key1_value1 = 0.0;
2020        sums.iter().for_each(|sum| {
2021            assert_eq!(sum.data_points.len(), 2); // Expecting 1 time-series.
2022            assert!(sum.is_monotonic, "Counter should produce monotonic.");
2023            assert_eq!(sum.temporality, temporality);
2024
2025            if temporality == Temporality::Delta {
2026                sum_zero_attributes += sum.data_points[0].value;
2027                sum_key1_value1 += sum.data_points[1].value;
2028            } else {
2029                sum_zero_attributes = sum.data_points[0].value;
2030                sum_key1_value1 = sum.data_points[1].value;
2031            };
2032        });
2033
2034        assert!(f64::abs(12.3 - sum_zero_attributes) < 0.0001);
2035        assert!(f64::abs(61.5 - sum_key1_value1) < 0.0001); // Each of the 10 update threads record measurements 5 times = 10 * 5 * 1.23 = 61.5
2036    }
2037
2038    fn histogram_multithreaded_aggregation_helper(temporality: Temporality) {
2039        // Arrange
2040        let mut test_context = TestContext::new(temporality);
2041        let histogram = Arc::new(test_context.meter().u64_histogram("test_histogram").build());
2042
2043        for i in 0..10 {
2044            thread::scope(|s| {
2045                s.spawn(|| {
2046                    histogram.record(1, &[]);
2047                    histogram.record(4, &[]);
2048
2049                    histogram.record(5, &[KeyValue::new("key1", "value1")]);
2050                    histogram.record(7, &[KeyValue::new("key1", "value1")]);
2051                    histogram.record(18, &[KeyValue::new("key1", "value1")]);
2052
2053                    // Test concurrent collection by forcing half of the update threads to `force_flush` metrics and sleep for some time.
2054                    if i % 2 == 0 {
2055                        test_context.flush_metrics();
2056                        thread::sleep(Duration::from_millis(i)); // Make each thread sleep for some time duration for better testing
2057                    }
2058
2059                    histogram.record(35, &[KeyValue::new("key1", "value1")]);
2060                    histogram.record(35, &[KeyValue::new("key1", "value1")]);
2061                });
2062            });
2063        }
2064
2065        test_context.flush_metrics();
2066
2067        // Assert
2068        // We invoke `test_context.flush_metrics()` six times.
2069        let histograms = test_context
2070            .get_from_multiple_aggregations::<u64>("test_histogram", None, 6)
2071            .into_iter()
2072            .map(|data| {
2073                if let MetricData::Histogram(hist) = data {
2074                    hist
2075                } else {
2076                    unreachable!()
2077                }
2078            })
2079            .collect::<Vec<_>>();
2080
2081        let (
2082            mut sum_zero_attributes,
2083            mut count_zero_attributes,
2084            mut min_zero_attributes,
2085            mut max_zero_attributes,
2086        ) = (0, 0, u64::MAX, u64::MIN);
2087        let (mut sum_key1_value1, mut count_key1_value1, mut min_key1_value1, mut max_key1_value1) =
2088            (0, 0, u64::MAX, u64::MIN);
2089
2090        let mut bucket_counts_zero_attributes = vec![0; 16]; // There are 16 buckets for the default configuration
2091        let mut bucket_counts_key1_value1 = vec![0; 16];
2092
2093        histograms.iter().for_each(|histogram| {
2094            assert_eq!(histogram.data_points.len(), 2); // Expecting 1 time-series.
2095            assert_eq!(histogram.temporality, temporality);
2096
2097            let data_point_zero_attributes =
2098                find_histogram_datapoint_with_no_attributes(&histogram.data_points).unwrap();
2099            let data_point_key1_value1 =
2100                find_histogram_datapoint_with_key_value(&histogram.data_points, "key1", "value1")
2101                    .unwrap();
2102
2103            if temporality == Temporality::Delta {
2104                sum_zero_attributes += data_point_zero_attributes.sum;
2105                sum_key1_value1 += data_point_key1_value1.sum;
2106
2107                count_zero_attributes += data_point_zero_attributes.count;
2108                count_key1_value1 += data_point_key1_value1.count;
2109
2110                min_zero_attributes =
2111                    min(min_zero_attributes, data_point_zero_attributes.min.unwrap());
2112                min_key1_value1 = min(min_key1_value1, data_point_key1_value1.min.unwrap());
2113
2114                max_zero_attributes =
2115                    max(max_zero_attributes, data_point_zero_attributes.max.unwrap());
2116                max_key1_value1 = max(max_key1_value1, data_point_key1_value1.max.unwrap());
2117
2118                assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16);
2119                assert_eq!(data_point_key1_value1.bucket_counts.len(), 16);
2120
2121                for (i, _) in data_point_zero_attributes.bucket_counts.iter().enumerate() {
2122                    bucket_counts_zero_attributes[i] += data_point_zero_attributes.bucket_counts[i];
2123                }
2124
2125                for (i, _) in data_point_key1_value1.bucket_counts.iter().enumerate() {
2126                    bucket_counts_key1_value1[i] += data_point_key1_value1.bucket_counts[i];
2127                }
2128            } else {
2129                sum_zero_attributes = data_point_zero_attributes.sum;
2130                sum_key1_value1 = data_point_key1_value1.sum;
2131
2132                count_zero_attributes = data_point_zero_attributes.count;
2133                count_key1_value1 = data_point_key1_value1.count;
2134
2135                min_zero_attributes = data_point_zero_attributes.min.unwrap();
2136                min_key1_value1 = data_point_key1_value1.min.unwrap();
2137
2138                max_zero_attributes = data_point_zero_attributes.max.unwrap();
2139                max_key1_value1 = data_point_key1_value1.max.unwrap();
2140
2141                assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16);
2142                assert_eq!(data_point_key1_value1.bucket_counts.len(), 16);
2143
2144                bucket_counts_zero_attributes.clone_from(&data_point_zero_attributes.bucket_counts);
2145                bucket_counts_key1_value1.clone_from(&data_point_key1_value1.bucket_counts);
2146            };
2147        });
2148
2149        // Default buckets:
2150        // (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, 25.0], (25.0, 50.0], (50.0, 75.0], (75.0, 100.0], (100.0, 250.0], (250.0, 500.0],
2151        // (500.0, 750.0], (750.0, 1000.0], (1000.0, 2500.0], (2500.0, 5000.0], (5000.0, 7500.0], (7500.0, 10000.0], (10000.0, +∞).
2152
2153        assert_eq!(count_zero_attributes, 20); // Each of the 10 update threads record two measurements.
2154        assert_eq!(sum_zero_attributes, 50); // Each of the 10 update threads record measurements summing up to 5.
2155        assert_eq!(min_zero_attributes, 1);
2156        assert_eq!(max_zero_attributes, 4);
2157
2158        for (i, count) in bucket_counts_zero_attributes.iter().enumerate() {
2159            match i {
2160                1 => assert_eq!(*count, 20), // For each of the 10 update threads, both the recorded values 1 and 4 fall under the bucket (0, 5].
2161                _ => assert_eq!(*count, 0),
2162            }
2163        }
2164
2165        assert_eq!(count_key1_value1, 50); // Each of the 10 update threads record 5 measurements.
2166        assert_eq!(sum_key1_value1, 1000); // Each of the 10 update threads record measurements summing up to 100 (5 + 7 + 18 + 35 + 35).
2167        assert_eq!(min_key1_value1, 5);
2168        assert_eq!(max_key1_value1, 35);
2169
2170        for (i, count) in bucket_counts_key1_value1.iter().enumerate() {
2171            match i {
2172                1 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 5 falls under the bucket (0, 5].
2173                2 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 7 falls under the bucket (5, 10].
2174                3 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 18 falls under the bucket (10, 25].
2175                4 => assert_eq!(*count, 20), // For each of the 10 update threads, the recorded value 35 (recorded twice) falls under the bucket (25, 50].
2176                _ => assert_eq!(*count, 0),
2177            }
2178        }
2179    }
2180
2181    fn histogram_f64_multithreaded_aggregation_helper(temporality: Temporality) {
2182        // Arrange
2183        let mut test_context = TestContext::new(temporality);
2184        let histogram = Arc::new(test_context.meter().f64_histogram("test_histogram").build());
2185
2186        for i in 0..10 {
2187            thread::scope(|s| {
2188                s.spawn(|| {
2189                    histogram.record(1.5, &[]);
2190                    histogram.record(4.6, &[]);
2191
2192                    histogram.record(5.0, &[KeyValue::new("key1", "value1")]);
2193                    histogram.record(7.3, &[KeyValue::new("key1", "value1")]);
2194                    histogram.record(18.1, &[KeyValue::new("key1", "value1")]);
2195
2196                    // Test concurrent collection by forcing half of the update threads to `force_flush` metrics and sleep for some time.
2197                    if i % 2 == 0 {
2198                        test_context.flush_metrics();
2199                        thread::sleep(Duration::from_millis(i)); // Make each thread sleep for some time duration for better testing
2200                    }
2201
2202                    histogram.record(35.1, &[KeyValue::new("key1", "value1")]);
2203                    histogram.record(35.1, &[KeyValue::new("key1", "value1")]);
2204                });
2205            });
2206        }
2207
2208        test_context.flush_metrics();
2209
2210        // Assert
2211        // We invoke `test_context.flush_metrics()` six times.
2212        let histograms = test_context
2213            .get_from_multiple_aggregations::<f64>("test_histogram", None, 6)
2214            .into_iter()
2215            .map(|data| {
2216                if let MetricData::Histogram(hist) = data {
2217                    hist
2218                } else {
2219                    unreachable!()
2220                }
2221            })
2222            .collect::<Vec<_>>();
2223
2224        let (
2225            mut sum_zero_attributes,
2226            mut count_zero_attributes,
2227            mut min_zero_attributes,
2228            mut max_zero_attributes,
2229        ) = (0.0, 0, f64::MAX, f64::MIN);
2230        let (mut sum_key1_value1, mut count_key1_value1, mut min_key1_value1, mut max_key1_value1) =
2231            (0.0, 0, f64::MAX, f64::MIN);
2232
2233        let mut bucket_counts_zero_attributes = vec![0; 16]; // There are 16 buckets for the default configuration
2234        let mut bucket_counts_key1_value1 = vec![0; 16];
2235
2236        histograms.iter().for_each(|histogram| {
2237            assert_eq!(histogram.data_points.len(), 2); // Expecting 1 time-series.
2238            assert_eq!(histogram.temporality, temporality);
2239
2240            let data_point_zero_attributes =
2241                find_histogram_datapoint_with_no_attributes(&histogram.data_points).unwrap();
2242            let data_point_key1_value1 =
2243                find_histogram_datapoint_with_key_value(&histogram.data_points, "key1", "value1")
2244                    .unwrap();
2245
2246            if temporality == Temporality::Delta {
2247                sum_zero_attributes += data_point_zero_attributes.sum;
2248                sum_key1_value1 += data_point_key1_value1.sum;
2249
2250                count_zero_attributes += data_point_zero_attributes.count;
2251                count_key1_value1 += data_point_key1_value1.count;
2252
2253                min_zero_attributes =
2254                    min_zero_attributes.min(data_point_zero_attributes.min.unwrap());
2255                min_key1_value1 = min_key1_value1.min(data_point_key1_value1.min.unwrap());
2256
2257                max_zero_attributes =
2258                    max_zero_attributes.max(data_point_zero_attributes.max.unwrap());
2259                max_key1_value1 = max_key1_value1.max(data_point_key1_value1.max.unwrap());
2260
2261                assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16);
2262                assert_eq!(data_point_key1_value1.bucket_counts.len(), 16);
2263
2264                for (i, _) in data_point_zero_attributes.bucket_counts.iter().enumerate() {
2265                    bucket_counts_zero_attributes[i] += data_point_zero_attributes.bucket_counts[i];
2266                }
2267
2268                for (i, _) in data_point_key1_value1.bucket_counts.iter().enumerate() {
2269                    bucket_counts_key1_value1[i] += data_point_key1_value1.bucket_counts[i];
2270                }
2271            } else {
2272                sum_zero_attributes = data_point_zero_attributes.sum;
2273                sum_key1_value1 = data_point_key1_value1.sum;
2274
2275                count_zero_attributes = data_point_zero_attributes.count;
2276                count_key1_value1 = data_point_key1_value1.count;
2277
2278                min_zero_attributes = data_point_zero_attributes.min.unwrap();
2279                min_key1_value1 = data_point_key1_value1.min.unwrap();
2280
2281                max_zero_attributes = data_point_zero_attributes.max.unwrap();
2282                max_key1_value1 = data_point_key1_value1.max.unwrap();
2283
2284                assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16);
2285                assert_eq!(data_point_key1_value1.bucket_counts.len(), 16);
2286
2287                bucket_counts_zero_attributes.clone_from(&data_point_zero_attributes.bucket_counts);
2288                bucket_counts_key1_value1.clone_from(&data_point_key1_value1.bucket_counts);
2289            };
2290        });
2291
2292        // Default buckets:
2293        // (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, 25.0], (25.0, 50.0], (50.0, 75.0], (75.0, 100.0], (100.0, 250.0], (250.0, 500.0],
2294        // (500.0, 750.0], (750.0, 1000.0], (1000.0, 2500.0], (2500.0, 5000.0], (5000.0, 7500.0], (7500.0, 10000.0], (10000.0, +∞).
2295
2296        assert_eq!(count_zero_attributes, 20); // Each of the 10 update threads record two measurements.
2297        assert!(f64::abs(61.0 - sum_zero_attributes) < 0.0001); // Each of the 10 update threads record measurements summing up to 6.1 (1.5 + 4.6)
2298        assert_eq!(min_zero_attributes, 1.5);
2299        assert_eq!(max_zero_attributes, 4.6);
2300
2301        for (i, count) in bucket_counts_zero_attributes.iter().enumerate() {
2302            match i {
2303                1 => assert_eq!(*count, 20), // For each of the 10 update threads, both the recorded values 1.5 and 4.6 fall under the bucket (0, 5.0].
2304                _ => assert_eq!(*count, 0),
2305            }
2306        }
2307
2308        assert_eq!(count_key1_value1, 50); // Each of the 10 update threads record 5 measurements.
2309        assert!(f64::abs(1006.0 - sum_key1_value1) < 0.0001); // Each of the 10 update threads record measurements summing up to 100.4 (5.0 + 7.3 + 18.1 + 35.1 + 35.1).
2310        assert_eq!(min_key1_value1, 5.0);
2311        assert_eq!(max_key1_value1, 35.1);
2312
2313        for (i, count) in bucket_counts_key1_value1.iter().enumerate() {
2314            match i {
2315                1 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 5.0 falls under the bucket (0, 5.0].
2316                2 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 7.3 falls under the bucket (5.0, 10.0].
2317                3 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 18.1 falls under the bucket (10.0, 25.0].
2318                4 => assert_eq!(*count, 20), // For each of the 10 update threads, the recorded value 35.1 (recorded twice) falls under the bucket (25.0, 50.0].
2319                _ => assert_eq!(*count, 0),
2320            }
2321        }
2322    }
2323
2324    fn histogram_aggregation_helper(temporality: Temporality) {
2325        // Arrange
2326        let mut test_context = TestContext::new(temporality);
2327        let histogram = test_context.meter().u64_histogram("my_histogram").build();
2328
2329        // Act
2330        let mut rand = rngs::SmallRng::from_os_rng();
2331        let values_kv1 = (0..50)
2332            .map(|_| rand.random_range(0..100))
2333            .collect::<Vec<u64>>();
2334        for value in values_kv1.iter() {
2335            histogram.record(*value, &[KeyValue::new("key1", "value1")]);
2336        }
2337
2338        let values_kv2 = (0..30)
2339            .map(|_| rand.random_range(0..100))
2340            .collect::<Vec<u64>>();
2341        for value in values_kv2.iter() {
2342            histogram.record(*value, &[KeyValue::new("key1", "value2")]);
2343        }
2344
2345        test_context.flush_metrics();
2346
2347        // Assert
2348        let MetricData::Histogram(histogram_data) =
2349            test_context.get_aggregation::<u64>("my_histogram", None)
2350        else {
2351            unreachable!()
2352        };
2353        // Expecting 2 time-series.
2354        assert_eq!(histogram_data.data_points.len(), 2);
2355        if let Temporality::Cumulative = temporality {
2356            assert_eq!(
2357                histogram_data.temporality,
2358                Temporality::Cumulative,
2359                "Should produce cumulative"
2360            );
2361        } else {
2362            assert_eq!(
2363                histogram_data.temporality,
2364                Temporality::Delta,
2365                "Should produce delta"
2366            );
2367        }
2368
2369        // find and validate key1=value2 datapoint
2370        let data_point1 =
2371            find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value1")
2372                .expect("datapoint with key1=value1 expected");
2373        assert_eq!(data_point1.count, values_kv1.len() as u64);
2374        assert_eq!(data_point1.sum, values_kv1.iter().sum::<u64>());
2375        assert_eq!(data_point1.min.unwrap(), *values_kv1.iter().min().unwrap());
2376        assert_eq!(data_point1.max.unwrap(), *values_kv1.iter().max().unwrap());
2377
2378        let data_point2 =
2379            find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value2")
2380                .expect("datapoint with key1=value2 expected");
2381        assert_eq!(data_point2.count, values_kv2.len() as u64);
2382        assert_eq!(data_point2.sum, values_kv2.iter().sum::<u64>());
2383        assert_eq!(data_point2.min.unwrap(), *values_kv2.iter().min().unwrap());
2384        assert_eq!(data_point2.max.unwrap(), *values_kv2.iter().max().unwrap());
2385
2386        // Reset and report more measurements
2387        test_context.reset_metrics();
2388        for value in values_kv1.iter() {
2389            histogram.record(*value, &[KeyValue::new("key1", "value1")]);
2390        }
2391
2392        for value in values_kv2.iter() {
2393            histogram.record(*value, &[KeyValue::new("key1", "value2")]);
2394        }
2395
2396        test_context.flush_metrics();
2397
2398        let MetricData::Histogram(histogram_data) =
2399            test_context.get_aggregation::<u64>("my_histogram", None)
2400        else {
2401            unreachable!()
2402        };
2403        assert_eq!(histogram_data.data_points.len(), 2);
2404        let data_point1 =
2405            find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value1")
2406                .expect("datapoint with key1=value1 expected");
2407        if temporality == Temporality::Cumulative {
2408            assert_eq!(data_point1.count, 2 * (values_kv1.len() as u64));
2409            assert_eq!(data_point1.sum, 2 * (values_kv1.iter().sum::<u64>()));
2410            assert_eq!(data_point1.min.unwrap(), *values_kv1.iter().min().unwrap());
2411            assert_eq!(data_point1.max.unwrap(), *values_kv1.iter().max().unwrap());
2412        } else {
2413            assert_eq!(data_point1.count, values_kv1.len() as u64);
2414            assert_eq!(data_point1.sum, values_kv1.iter().sum::<u64>());
2415            assert_eq!(data_point1.min.unwrap(), *values_kv1.iter().min().unwrap());
2416            assert_eq!(data_point1.max.unwrap(), *values_kv1.iter().max().unwrap());
2417        }
2418
2419        let data_point1 =
2420            find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value2")
2421                .expect("datapoint with key1=value1 expected");
2422        if temporality == Temporality::Cumulative {
2423            assert_eq!(data_point1.count, 2 * (values_kv2.len() as u64));
2424            assert_eq!(data_point1.sum, 2 * (values_kv2.iter().sum::<u64>()));
2425            assert_eq!(data_point1.min.unwrap(), *values_kv2.iter().min().unwrap());
2426            assert_eq!(data_point1.max.unwrap(), *values_kv2.iter().max().unwrap());
2427        } else {
2428            assert_eq!(data_point1.count, values_kv2.len() as u64);
2429            assert_eq!(data_point1.sum, values_kv2.iter().sum::<u64>());
2430            assert_eq!(data_point1.min.unwrap(), *values_kv2.iter().min().unwrap());
2431            assert_eq!(data_point1.max.unwrap(), *values_kv2.iter().max().unwrap());
2432        }
2433    }
2434
2435    fn histogram_aggregation_with_custom_bounds_helper(temporality: Temporality) {
2436        let mut test_context = TestContext::new(temporality);
2437        let histogram = test_context
2438            .meter()
2439            .u64_histogram("test_histogram")
2440            .with_boundaries(vec![1.0, 2.5, 5.5])
2441            .build();
2442        histogram.record(1, &[KeyValue::new("key1", "value1")]);
2443        histogram.record(2, &[KeyValue::new("key1", "value1")]);
2444        histogram.record(3, &[KeyValue::new("key1", "value1")]);
2445        histogram.record(4, &[KeyValue::new("key1", "value1")]);
2446        histogram.record(5, &[KeyValue::new("key1", "value1")]);
2447
2448        test_context.flush_metrics();
2449
2450        // Assert
2451        let MetricData::Histogram(histogram_data) =
2452            test_context.get_aggregation::<u64>("test_histogram", None)
2453        else {
2454            unreachable!()
2455        };
2456        // Expecting 2 time-series.
2457        assert_eq!(histogram_data.data_points.len(), 1);
2458        if let Temporality::Cumulative = temporality {
2459            assert_eq!(
2460                histogram_data.temporality,
2461                Temporality::Cumulative,
2462                "Should produce cumulative"
2463            );
2464        } else {
2465            assert_eq!(
2466                histogram_data.temporality,
2467                Temporality::Delta,
2468                "Should produce delta"
2469            );
2470        }
2471
2472        // find and validate key1=value1 datapoint
2473        let data_point =
2474            find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value1")
2475                .expect("datapoint with key1=value1 expected");
2476
2477        assert_eq!(data_point.count, 5);
2478        assert_eq!(data_point.sum, 15);
2479
2480        // Check the bucket counts
2481        // -∞ to 1.0: 1
2482        // 1.0 to 2.5: 1
2483        // 2.5 to 5.5: 3
2484        // 5.5 to +∞: 0
2485
2486        assert_eq!(vec![1.0, 2.5, 5.5], data_point.bounds);
2487        assert_eq!(vec![1, 1, 3, 0], data_point.bucket_counts);
2488    }
2489
2490    fn histogram_aggregation_with_empty_bounds_helper(temporality: Temporality) {
2491        let mut test_context = TestContext::new(temporality);
2492        let histogram = test_context
2493            .meter()
2494            .u64_histogram("test_histogram")
2495            .with_boundaries(vec![])
2496            .build();
2497        histogram.record(1, &[KeyValue::new("key1", "value1")]);
2498        histogram.record(2, &[KeyValue::new("key1", "value1")]);
2499        histogram.record(3, &[KeyValue::new("key1", "value1")]);
2500        histogram.record(4, &[KeyValue::new("key1", "value1")]);
2501        histogram.record(5, &[KeyValue::new("key1", "value1")]);
2502
2503        test_context.flush_metrics();
2504
2505        // Assert
2506        let MetricData::Histogram(histogram_data) =
2507            test_context.get_aggregation::<u64>("test_histogram", None)
2508        else {
2509            unreachable!()
2510        };
2511        // Expecting 1 time-series.
2512        assert_eq!(histogram_data.data_points.len(), 1);
2513        if let Temporality::Cumulative = temporality {
2514            assert_eq!(
2515                histogram_data.temporality,
2516                Temporality::Cumulative,
2517                "Should produce cumulative"
2518            );
2519        } else {
2520            assert_eq!(
2521                histogram_data.temporality,
2522                Temporality::Delta,
2523                "Should produce delta"
2524            );
2525        }
2526
2527        // find and validate key1=value1 datapoint
2528        let data_point =
2529            find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value1")
2530                .expect("datapoint with key1=value1 expected");
2531
2532        assert_eq!(data_point.count, 5);
2533        assert_eq!(data_point.sum, 15);
2534        assert!(data_point.bounds.is_empty());
2535        assert!(data_point.bucket_counts.is_empty());
2536    }
2537
2538    fn gauge_aggregation_helper(temporality: Temporality) {
2539        // Arrange
2540        let mut test_context = TestContext::new(temporality);
2541        let gauge = test_context.meter().i64_gauge("my_gauge").build();
2542
2543        // Act
2544        gauge.record(1, &[KeyValue::new("key1", "value1")]);
2545        gauge.record(2, &[KeyValue::new("key1", "value1")]);
2546        gauge.record(1, &[KeyValue::new("key1", "value1")]);
2547        gauge.record(3, &[KeyValue::new("key1", "value1")]);
2548        gauge.record(4, &[KeyValue::new("key1", "value1")]);
2549
2550        gauge.record(11, &[KeyValue::new("key1", "value2")]);
2551        gauge.record(13, &[KeyValue::new("key1", "value2")]);
2552        gauge.record(6, &[KeyValue::new("key1", "value2")]);
2553
2554        test_context.flush_metrics();
2555
2556        // Assert
2557        let MetricData::Gauge(gauge_data_point) =
2558            test_context.get_aggregation::<i64>("my_gauge", None)
2559        else {
2560            unreachable!()
2561        };
2562        // Expecting 2 time-series.
2563        assert_eq!(gauge_data_point.data_points.len(), 2);
2564
2565        // find and validate key1=value2 datapoint
2566        let data_point1 =
2567            find_gauge_datapoint_with_key_value(&gauge_data_point.data_points, "key1", "value1")
2568                .expect("datapoint with key1=value1 expected");
2569        assert_eq!(data_point1.value, 4);
2570
2571        let data_point1 =
2572            find_gauge_datapoint_with_key_value(&gauge_data_point.data_points, "key1", "value2")
2573                .expect("datapoint with key1=value2 expected");
2574        assert_eq!(data_point1.value, 6);
2575
2576        // Reset and report more measurements
2577        test_context.reset_metrics();
2578        gauge.record(1, &[KeyValue::new("key1", "value1")]);
2579        gauge.record(2, &[KeyValue::new("key1", "value1")]);
2580        gauge.record(11, &[KeyValue::new("key1", "value1")]);
2581        gauge.record(3, &[KeyValue::new("key1", "value1")]);
2582        gauge.record(41, &[KeyValue::new("key1", "value1")]);
2583
2584        gauge.record(34, &[KeyValue::new("key1", "value2")]);
2585        gauge.record(12, &[KeyValue::new("key1", "value2")]);
2586        gauge.record(54, &[KeyValue::new("key1", "value2")]);
2587
2588        test_context.flush_metrics();
2589
2590        let MetricData::Gauge(gauge) = test_context.get_aggregation::<i64>("my_gauge", None) else {
2591            unreachable!()
2592        };
2593        assert_eq!(gauge.data_points.len(), 2);
2594        let data_point1 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key1", "value1")
2595            .expect("datapoint with key1=value1 expected");
2596        assert_eq!(data_point1.value, 41);
2597
2598        let data_point1 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key1", "value2")
2599            .expect("datapoint with key1=value2 expected");
2600        assert_eq!(data_point1.value, 54);
2601    }
2602
2603    fn observable_gauge_aggregation_helper(temporality: Temporality, use_empty_attributes: bool) {
2604        // Arrange
2605        let mut test_context = TestContext::new(temporality);
2606        let _observable_gauge = test_context
2607            .meter()
2608            .i64_observable_gauge("test_observable_gauge")
2609            .with_callback(move |observer| {
2610                if use_empty_attributes {
2611                    observer.observe(1, &[]);
2612                }
2613                observer.observe(4, &[KeyValue::new("key1", "value1")]);
2614                observer.observe(5, &[KeyValue::new("key2", "value2")]);
2615            })
2616            .build();
2617
2618        test_context.flush_metrics();
2619
2620        // Assert
2621        let MetricData::Gauge(gauge) =
2622            test_context.get_aggregation::<i64>("test_observable_gauge", None)
2623        else {
2624            unreachable!()
2625        };
2626        // Expecting 2 time-series.
2627        let expected_time_series_count = if use_empty_attributes { 3 } else { 2 };
2628        assert_eq!(gauge.data_points.len(), expected_time_series_count);
2629
2630        if use_empty_attributes {
2631            // find and validate zero attribute datapoint
2632            let zero_attribute_datapoint =
2633                find_gauge_datapoint_with_no_attributes(&gauge.data_points)
2634                    .expect("datapoint with no attributes expected");
2635            assert_eq!(zero_attribute_datapoint.value, 1);
2636        }
2637
2638        // find and validate key1=value1 datapoint
2639        let data_point1 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key1", "value1")
2640            .expect("datapoint with key1=value1 expected");
2641        assert_eq!(data_point1.value, 4);
2642
2643        // find and validate key2=value2 datapoint
2644        let data_point2 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key2", "value2")
2645            .expect("datapoint with key2=value2 expected");
2646        assert_eq!(data_point2.value, 5);
2647
2648        // Reset and report more measurements
2649        test_context.reset_metrics();
2650
2651        test_context.flush_metrics();
2652
2653        let MetricData::Gauge(gauge) =
2654            test_context.get_aggregation::<i64>("test_observable_gauge", None)
2655        else {
2656            unreachable!()
2657        };
2658        assert_eq!(gauge.data_points.len(), expected_time_series_count);
2659
2660        if use_empty_attributes {
2661            let zero_attribute_datapoint =
2662                find_gauge_datapoint_with_no_attributes(&gauge.data_points)
2663                    .expect("datapoint with no attributes expected");
2664            assert_eq!(zero_attribute_datapoint.value, 1);
2665        }
2666
2667        let data_point1 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key1", "value1")
2668            .expect("datapoint with key1=value1 expected");
2669        assert_eq!(data_point1.value, 4);
2670
2671        let data_point2 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key2", "value2")
2672            .expect("datapoint with key2=value2 expected");
2673        assert_eq!(data_point2.value, 5);
2674    }
2675
2676    fn counter_aggregation_helper(temporality: Temporality) {
2677        // Arrange
2678        let mut test_context = TestContext::new(temporality);
2679        let counter = test_context.u64_counter("test", "my_counter", None);
2680
2681        // Act
2682        counter.add(1, &[KeyValue::new("key1", "value1")]);
2683        counter.add(1, &[KeyValue::new("key1", "value1")]);
2684        counter.add(1, &[KeyValue::new("key1", "value1")]);
2685        counter.add(1, &[KeyValue::new("key1", "value1")]);
2686        counter.add(1, &[KeyValue::new("key1", "value1")]);
2687
2688        counter.add(1, &[KeyValue::new("key1", "value2")]);
2689        counter.add(1, &[KeyValue::new("key1", "value2")]);
2690        counter.add(1, &[KeyValue::new("key1", "value2")]);
2691
2692        test_context.flush_metrics();
2693
2694        // Assert
2695        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
2696            unreachable!()
2697        };
2698        // Expecting 2 time-series.
2699        assert_eq!(sum.data_points.len(), 2);
2700        assert!(sum.is_monotonic, "Counter should produce monotonic.");
2701        if let Temporality::Cumulative = temporality {
2702            assert_eq!(
2703                sum.temporality,
2704                Temporality::Cumulative,
2705                "Should produce cumulative"
2706            );
2707        } else {
2708            assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
2709        }
2710
2711        // find and validate key1=value2 datapoint
2712        let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
2713            .expect("datapoint with key1=value1 expected");
2714        assert_eq!(data_point1.value, 5);
2715
2716        let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2")
2717            .expect("datapoint with key1=value2 expected");
2718        assert_eq!(data_point1.value, 3);
2719
2720        // Reset and report more measurements
2721        test_context.reset_metrics();
2722        counter.add(1, &[KeyValue::new("key1", "value1")]);
2723        counter.add(1, &[KeyValue::new("key1", "value1")]);
2724        counter.add(1, &[KeyValue::new("key1", "value1")]);
2725        counter.add(1, &[KeyValue::new("key1", "value1")]);
2726        counter.add(1, &[KeyValue::new("key1", "value1")]);
2727
2728        counter.add(1, &[KeyValue::new("key1", "value2")]);
2729        counter.add(1, &[KeyValue::new("key1", "value2")]);
2730        counter.add(1, &[KeyValue::new("key1", "value2")]);
2731
2732        test_context.flush_metrics();
2733
2734        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
2735            unreachable!()
2736        };
2737        assert_eq!(sum.data_points.len(), 2);
2738        let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
2739            .expect("datapoint with key1=value1 expected");
2740        if temporality == Temporality::Cumulative {
2741            assert_eq!(data_point1.value, 10);
2742        } else {
2743            assert_eq!(data_point1.value, 5);
2744        }
2745
2746        let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2")
2747            .expect("datapoint with key1=value2 expected");
2748        if temporality == Temporality::Cumulative {
2749            assert_eq!(data_point1.value, 6);
2750        } else {
2751            assert_eq!(data_point1.value, 3);
2752        }
2753    }
2754
2755    fn counter_aggregation_overflow_helper(temporality: Temporality) {
2756        // Arrange
2757        let mut test_context = TestContext::new(temporality);
2758        let counter = test_context.u64_counter("test", "my_counter", None);
2759
2760        // Act
2761        // Record measurements with A:0, A:1,.......A:1999, which just fits in the 2000 limit
2762        for v in 0..2000 {
2763            counter.add(100, &[KeyValue::new("A", v.to_string())]);
2764        }
2765
2766        // Empty attributes is specially treated and does not count towards the limit.
2767        counter.add(3, &[]);
2768        counter.add(3, &[]);
2769
2770        // All of the below will now go into overflow.
2771        counter.add(100, &[KeyValue::new("A", "foo")]);
2772        counter.add(100, &[KeyValue::new("A", "another")]);
2773        counter.add(100, &[KeyValue::new("A", "yet_another")]);
2774        test_context.flush_metrics();
2775
2776        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
2777            unreachable!()
2778        };
2779
2780        // Expecting 2002 metric points. (2000 + 1 overflow + Empty attributes)
2781        assert_eq!(sum.data_points.len(), 2002);
2782
2783        let data_point =
2784            find_overflow_sum_datapoint(&sum.data_points).expect("overflow point expected");
2785        assert_eq!(data_point.value, 300);
2786
2787        // let empty_attrs_data_point = &sum.data_points[0];
2788        let empty_attrs_data_point = find_sum_datapoint_with_no_attributes(&sum.data_points)
2789            .expect("Empty attributes point expected");
2790        assert!(
2791            empty_attrs_data_point.attributes.is_empty(),
2792            "Non-empty attribute set"
2793        );
2794        assert_eq!(
2795            empty_attrs_data_point.value, 6,
2796            "Empty attributes value should be 3+3=6"
2797        );
2798
2799        // Phase 2 - for delta temporality, after each collect, data points are cleared
2800        // but for cumulative, they are not cleared.
2801        test_context.reset_metrics();
2802        // The following should be aggregated normally for Delta,
2803        // and should go into overflow for Cumulative.
2804        counter.add(100, &[KeyValue::new("A", "foo")]);
2805        counter.add(100, &[KeyValue::new("A", "another")]);
2806        counter.add(100, &[KeyValue::new("A", "yet_another")]);
2807        test_context.flush_metrics();
2808
2809        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
2810            unreachable!()
2811        };
2812
2813        if temporality == Temporality::Delta {
2814            assert_eq!(sum.data_points.len(), 3);
2815
2816            let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "foo")
2817                .expect("point expected");
2818            assert_eq!(data_point.value, 100);
2819
2820            let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "another")
2821                .expect("point expected");
2822            assert_eq!(data_point.value, 100);
2823
2824            let data_point =
2825                find_sum_datapoint_with_key_value(&sum.data_points, "A", "yet_another")
2826                    .expect("point expected");
2827            assert_eq!(data_point.value, 100);
2828        } else {
2829            // For cumulative, overflow should still be there, and new points should not be added.
2830            assert_eq!(sum.data_points.len(), 2002);
2831            let data_point =
2832                find_overflow_sum_datapoint(&sum.data_points).expect("overflow point expected");
2833            assert_eq!(data_point.value, 600);
2834
2835            let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "foo");
2836            assert!(data_point.is_none(), "point should not be present");
2837
2838            let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "another");
2839            assert!(data_point.is_none(), "point should not be present");
2840
2841            let data_point =
2842                find_sum_datapoint_with_key_value(&sum.data_points, "A", "yet_another");
2843            assert!(data_point.is_none(), "point should not be present");
2844        }
2845    }
2846
2847    fn counter_aggregation_overflow_helper_custom_limit(temporality: Temporality) {
2848        // Arrange
2849        let cardinality_limit = 2300;
2850        let view_change_cardinality = move |i: &Instrument| {
2851            if i.name == "my_counter" {
2852                Some(
2853                    Stream::builder()
2854                        .with_name("my_counter")
2855                        .with_cardinality_limit(cardinality_limit)
2856                        .build()
2857                        .unwrap(),
2858                )
2859            } else {
2860                None
2861            }
2862        };
2863        let mut test_context = TestContext::new_with_view(temporality, view_change_cardinality);
2864        let counter = test_context.u64_counter("test", "my_counter", None);
2865
2866        // Act
2867        // Record measurements with A:0, A:1,.......A:cardinality_limit, which just fits in the cardinality_limit
2868        for v in 0..cardinality_limit {
2869            counter.add(100, &[KeyValue::new("A", v.to_string())]);
2870        }
2871
2872        // Empty attributes is specially treated and does not count towards the limit.
2873        counter.add(3, &[]);
2874        counter.add(3, &[]);
2875
2876        // All of the below will now go into overflow.
2877        counter.add(100, &[KeyValue::new("A", "foo")]);
2878        counter.add(100, &[KeyValue::new("A", "another")]);
2879        counter.add(100, &[KeyValue::new("A", "yet_another")]);
2880        test_context.flush_metrics();
2881
2882        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
2883            unreachable!()
2884        };
2885
2886        // Expecting (cardinality_limit + 1 overflow + Empty attributes) data points.
2887        assert_eq!(sum.data_points.len(), cardinality_limit + 1 + 1);
2888
2889        let data_point =
2890            find_overflow_sum_datapoint(&sum.data_points).expect("overflow point expected");
2891        assert_eq!(data_point.value, 300);
2892
2893        // let empty_attrs_data_point = &sum.data_points[0];
2894        let empty_attrs_data_point = find_sum_datapoint_with_no_attributes(&sum.data_points)
2895            .expect("Empty attributes point expected");
2896        assert!(
2897            empty_attrs_data_point.attributes.is_empty(),
2898            "Non-empty attribute set"
2899        );
2900        assert_eq!(
2901            empty_attrs_data_point.value, 6,
2902            "Empty attributes value should be 3+3=6"
2903        );
2904
2905        // Phase 2 - for delta temporality, after each collect, data points are cleared
2906        // but for cumulative, they are not cleared.
2907        test_context.reset_metrics();
2908        // The following should be aggregated normally for Delta,
2909        // and should go into overflow for Cumulative.
2910        counter.add(100, &[KeyValue::new("A", "foo")]);
2911        counter.add(100, &[KeyValue::new("A", "another")]);
2912        counter.add(100, &[KeyValue::new("A", "yet_another")]);
2913        test_context.flush_metrics();
2914
2915        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
2916            unreachable!()
2917        };
2918
2919        if temporality == Temporality::Delta {
2920            assert_eq!(sum.data_points.len(), 3);
2921
2922            let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "foo")
2923                .expect("point expected");
2924            assert_eq!(data_point.value, 100);
2925
2926            let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "another")
2927                .expect("point expected");
2928            assert_eq!(data_point.value, 100);
2929
2930            let data_point =
2931                find_sum_datapoint_with_key_value(&sum.data_points, "A", "yet_another")
2932                    .expect("point expected");
2933            assert_eq!(data_point.value, 100);
2934        } else {
2935            // For cumulative, overflow should still be there, and new points should not be added.
2936            assert_eq!(sum.data_points.len(), cardinality_limit + 1 + 1);
2937            let data_point =
2938                find_overflow_sum_datapoint(&sum.data_points).expect("overflow point expected");
2939            assert_eq!(data_point.value, 600);
2940
2941            let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "foo");
2942            assert!(data_point.is_none(), "point should not be present");
2943
2944            let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "another");
2945            assert!(data_point.is_none(), "point should not be present");
2946
2947            let data_point =
2948                find_sum_datapoint_with_key_value(&sum.data_points, "A", "yet_another");
2949            assert!(data_point.is_none(), "point should not be present");
2950        }
2951    }
2952
2953    fn counter_aggregation_attribute_order_helper(temporality: Temporality, start_sorted: bool) {
2954        // Arrange
2955        let mut test_context = TestContext::new(temporality);
2956        let counter = test_context.u64_counter("test", "my_counter", None);
2957
2958        // Act
2959        // Add the same set of attributes in different order. (they are expected
2960        // to be treated as same attributes)
2961        // start with sorted order
2962        if start_sorted {
2963            counter.add(
2964                1,
2965                &[
2966                    KeyValue::new("A", "a"),
2967                    KeyValue::new("B", "b"),
2968                    KeyValue::new("C", "c"),
2969                ],
2970            );
2971        } else {
2972            counter.add(
2973                1,
2974                &[
2975                    KeyValue::new("A", "a"),
2976                    KeyValue::new("C", "c"),
2977                    KeyValue::new("B", "b"),
2978                ],
2979            );
2980        }
2981
2982        counter.add(
2983            1,
2984            &[
2985                KeyValue::new("A", "a"),
2986                KeyValue::new("C", "c"),
2987                KeyValue::new("B", "b"),
2988            ],
2989        );
2990        counter.add(
2991            1,
2992            &[
2993                KeyValue::new("B", "b"),
2994                KeyValue::new("A", "a"),
2995                KeyValue::new("C", "c"),
2996            ],
2997        );
2998        counter.add(
2999            1,
3000            &[
3001                KeyValue::new("B", "b"),
3002                KeyValue::new("C", "c"),
3003                KeyValue::new("A", "a"),
3004            ],
3005        );
3006        counter.add(
3007            1,
3008            &[
3009                KeyValue::new("C", "c"),
3010                KeyValue::new("B", "b"),
3011                KeyValue::new("A", "a"),
3012            ],
3013        );
3014        counter.add(
3015            1,
3016            &[
3017                KeyValue::new("C", "c"),
3018                KeyValue::new("A", "a"),
3019                KeyValue::new("B", "b"),
3020            ],
3021        );
3022        test_context.flush_metrics();
3023
3024        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
3025            unreachable!()
3026        };
3027
3028        // Expecting 1 time-series.
3029        assert_eq!(sum.data_points.len(), 1);
3030
3031        // validate the sole datapoint
3032        let data_point1 = &sum.data_points[0];
3033        assert_eq!(data_point1.value, 6);
3034    }
3035
3036    fn updown_counter_aggregation_helper(temporality: Temporality) {
3037        // Arrange
3038        let mut test_context = TestContext::new(temporality);
3039        let counter = test_context.i64_up_down_counter("test", "my_updown_counter", None);
3040
3041        // Act
3042        counter.add(10, &[KeyValue::new("key1", "value1")]);
3043        counter.add(-1, &[KeyValue::new("key1", "value1")]);
3044        counter.add(-5, &[KeyValue::new("key1", "value1")]);
3045        counter.add(0, &[KeyValue::new("key1", "value1")]);
3046        counter.add(1, &[KeyValue::new("key1", "value1")]);
3047
3048        counter.add(10, &[KeyValue::new("key1", "value2")]);
3049        counter.add(0, &[KeyValue::new("key1", "value2")]);
3050        counter.add(-3, &[KeyValue::new("key1", "value2")]);
3051
3052        test_context.flush_metrics();
3053
3054        // Assert
3055        let MetricData::Sum(sum) = test_context.get_aggregation::<i64>("my_updown_counter", None)
3056        else {
3057            unreachable!()
3058        };
3059        // Expecting 2 time-series.
3060        assert_eq!(sum.data_points.len(), 2);
3061        assert!(
3062            !sum.is_monotonic,
3063            "UpDownCounter should produce non-monotonic."
3064        );
3065        assert_eq!(
3066            sum.temporality,
3067            Temporality::Cumulative,
3068            "Should produce Cumulative for UpDownCounter"
3069        );
3070
3071        // find and validate key1=value2 datapoint
3072        let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
3073            .expect("datapoint with key1=value1 expected");
3074        assert_eq!(data_point1.value, 5);
3075
3076        let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2")
3077            .expect("datapoint with key1=value2 expected");
3078        assert_eq!(data_point1.value, 7);
3079
3080        // Reset and report more measurements
3081        test_context.reset_metrics();
3082        counter.add(10, &[KeyValue::new("key1", "value1")]);
3083        counter.add(-1, &[KeyValue::new("key1", "value1")]);
3084        counter.add(-5, &[KeyValue::new("key1", "value1")]);
3085        counter.add(0, &[KeyValue::new("key1", "value1")]);
3086        counter.add(1, &[KeyValue::new("key1", "value1")]);
3087
3088        counter.add(10, &[KeyValue::new("key1", "value2")]);
3089        counter.add(0, &[KeyValue::new("key1", "value2")]);
3090        counter.add(-3, &[KeyValue::new("key1", "value2")]);
3091
3092        test_context.flush_metrics();
3093
3094        let MetricData::Sum(sum) = test_context.get_aggregation::<i64>("my_updown_counter", None)
3095        else {
3096            unreachable!()
3097        };
3098        assert_eq!(sum.data_points.len(), 2);
3099        let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
3100            .expect("datapoint with key1=value1 expected");
3101        assert_eq!(data_point1.value, 10);
3102
3103        let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2")
3104            .expect("datapoint with key1=value2 expected");
3105        assert_eq!(data_point1.value, 14);
3106    }
3107
3108    fn find_sum_datapoint_with_key_value<'a, T>(
3109        data_points: &'a [SumDataPoint<T>],
3110        key: &str,
3111        value: &str,
3112    ) -> Option<&'a SumDataPoint<T>> {
3113        data_points.iter().find(|&datapoint| {
3114            datapoint
3115                .attributes
3116                .iter()
3117                .any(|kv| kv.key.as_str() == key && kv.value.as_str() == value)
3118        })
3119    }
3120
3121    fn find_overflow_sum_datapoint<T>(data_points: &[SumDataPoint<T>]) -> Option<&SumDataPoint<T>> {
3122        data_points.iter().find(|&datapoint| {
3123            datapoint.attributes.iter().any(|kv| {
3124                kv.key.as_str() == "otel.metric.overflow" && kv.value == Value::Bool(true)
3125            })
3126        })
3127    }
3128
3129    fn find_gauge_datapoint_with_key_value<'a, T>(
3130        data_points: &'a [GaugeDataPoint<T>],
3131        key: &str,
3132        value: &str,
3133    ) -> Option<&'a GaugeDataPoint<T>> {
3134        data_points.iter().find(|&datapoint| {
3135            datapoint
3136                .attributes
3137                .iter()
3138                .any(|kv| kv.key.as_str() == key && kv.value.as_str() == value)
3139        })
3140    }
3141
3142    fn find_sum_datapoint_with_no_attributes<T>(
3143        data_points: &[SumDataPoint<T>],
3144    ) -> Option<&SumDataPoint<T>> {
3145        data_points
3146            .iter()
3147            .find(|&datapoint| datapoint.attributes.is_empty())
3148    }
3149
3150    fn find_gauge_datapoint_with_no_attributes<T>(
3151        data_points: &[GaugeDataPoint<T>],
3152    ) -> Option<&GaugeDataPoint<T>> {
3153        data_points
3154            .iter()
3155            .find(|&datapoint| datapoint.attributes.is_empty())
3156    }
3157
3158    fn find_histogram_datapoint_with_key_value<'a, T>(
3159        data_points: &'a [HistogramDataPoint<T>],
3160        key: &str,
3161        value: &str,
3162    ) -> Option<&'a HistogramDataPoint<T>> {
3163        data_points.iter().find(|&datapoint| {
3164            datapoint
3165                .attributes
3166                .iter()
3167                .any(|kv| kv.key.as_str() == key && kv.value.as_str() == value)
3168        })
3169    }
3170
3171    fn find_histogram_datapoint_with_no_attributes<T>(
3172        data_points: &[HistogramDataPoint<T>],
3173    ) -> Option<&HistogramDataPoint<T>> {
3174        data_points
3175            .iter()
3176            .find(|&datapoint| datapoint.attributes.is_empty())
3177    }
3178
3179    fn find_scope_metric<'a>(
3180        metrics: &'a [ScopeMetrics],
3181        name: &'a str,
3182    ) -> Option<&'a ScopeMetrics> {
3183        metrics
3184            .iter()
3185            .find(|&scope_metric| scope_metric.scope.name() == name)
3186    }
3187
3188    struct TestContext {
3189        exporter: InMemoryMetricExporter,
3190        meter_provider: SdkMeterProvider,
3191
3192        // Saving this on the test context for lifetime simplicity
3193        resource_metrics: Vec<ResourceMetrics>,
3194    }
3195
3196    impl TestContext {
3197        fn new(temporality: Temporality) -> Self {
3198            let exporter = InMemoryMetricExporterBuilder::new().with_temporality(temporality);
3199            let exporter = exporter.build();
3200            let meter_provider = SdkMeterProvider::builder()
3201                .with_periodic_exporter(exporter.clone())
3202                .build();
3203
3204            TestContext {
3205                exporter,
3206                meter_provider,
3207                resource_metrics: vec![],
3208            }
3209        }
3210
3211        fn new_with_view<T>(temporality: Temporality, view: T) -> Self
3212        where
3213            T: Fn(&Instrument) -> Option<Stream> + Send + Sync + 'static,
3214        {
3215            let exporter = InMemoryMetricExporterBuilder::new().with_temporality(temporality);
3216            let exporter = exporter.build();
3217            let meter_provider = SdkMeterProvider::builder()
3218                .with_periodic_exporter(exporter.clone())
3219                .with_view(view)
3220                .build();
3221
3222            TestContext {
3223                exporter,
3224                meter_provider,
3225                resource_metrics: vec![],
3226            }
3227        }
3228
3229        fn u64_counter(
3230            &self,
3231            meter_name: &'static str,
3232            counter_name: &'static str,
3233            unit: Option<&'static str>,
3234        ) -> Counter<u64> {
3235            let meter = self.meter_provider.meter(meter_name);
3236            let mut counter_builder = meter.u64_counter(counter_name);
3237            if let Some(unit_name) = unit {
3238                counter_builder = counter_builder.with_unit(unit_name);
3239            }
3240            counter_builder.build()
3241        }
3242
3243        fn i64_up_down_counter(
3244            &self,
3245            meter_name: &'static str,
3246            counter_name: &'static str,
3247            unit: Option<&'static str>,
3248        ) -> UpDownCounter<i64> {
3249            let meter = self.meter_provider.meter(meter_name);
3250            let mut updown_counter_builder = meter.i64_up_down_counter(counter_name);
3251            if let Some(unit_name) = unit {
3252                updown_counter_builder = updown_counter_builder.with_unit(unit_name);
3253            }
3254            updown_counter_builder.build()
3255        }
3256
3257        fn meter(&self) -> Meter {
3258            self.meter_provider.meter("test")
3259        }
3260
3261        fn flush_metrics(&self) {
3262            self.meter_provider.force_flush().unwrap();
3263        }
3264
3265        fn reset_metrics(&self) {
3266            self.exporter.reset();
3267        }
3268
3269        fn check_no_metrics(&self) {
3270            let resource_metrics = self
3271                .exporter
3272                .get_finished_metrics()
3273                .expect("metrics expected to be exported"); // TODO: Need to fix InMemoryMetricExporter to return None.
3274
3275            assert!(resource_metrics.is_empty(), "no metrics should be exported");
3276        }
3277
3278        fn get_aggregation<T: Number>(
3279            &mut self,
3280            counter_name: &str,
3281            unit_name: Option<&str>,
3282        ) -> &MetricData<T> {
3283            self.resource_metrics = self
3284                .exporter
3285                .get_finished_metrics()
3286                .expect("metrics expected to be exported");
3287
3288            assert!(
3289                !self.resource_metrics.is_empty(),
3290                "no metrics were exported"
3291            );
3292
3293            assert!(
3294                self.resource_metrics.len() == 1,
3295                "Expected single resource metrics."
3296            );
3297            let resource_metric = self
3298                .resource_metrics
3299                .first()
3300                .expect("This should contain exactly one resource metric, as validated above.");
3301
3302            assert!(
3303                !resource_metric.scope_metrics.is_empty(),
3304                "No scope metrics in latest export"
3305            );
3306            assert!(!resource_metric.scope_metrics[0].metrics.is_empty());
3307
3308            let metric = &resource_metric.scope_metrics[0].metrics[0];
3309            assert_eq!(metric.name, counter_name);
3310            if let Some(expected_unit) = unit_name {
3311                assert_eq!(metric.unit, expected_unit);
3312            }
3313
3314            T::extract_metrics_data_ref(&metric.data)
3315                .expect("Failed to cast aggregation to expected type")
3316        }
3317
3318        fn get_from_multiple_aggregations<T: Number>(
3319            &mut self,
3320            counter_name: &str,
3321            unit_name: Option<&str>,
3322            invocation_count: usize,
3323        ) -> Vec<&MetricData<T>> {
3324            self.resource_metrics = self
3325                .exporter
3326                .get_finished_metrics()
3327                .expect("metrics expected to be exported");
3328
3329            assert!(
3330                !self.resource_metrics.is_empty(),
3331                "no metrics were exported"
3332            );
3333
3334            assert_eq!(
3335                self.resource_metrics.len(),
3336                invocation_count,
3337                "Expected collect to be called {invocation_count} times"
3338            );
3339
3340            let result = self
3341                .resource_metrics
3342                .iter()
3343                .map(|resource_metric| {
3344                    assert!(
3345                        !resource_metric.scope_metrics.is_empty(),
3346                        "An export with no scope metrics occurred"
3347                    );
3348
3349                    assert!(!resource_metric.scope_metrics[0].metrics.is_empty());
3350
3351                    let metric = &resource_metric.scope_metrics[0].metrics[0];
3352                    assert_eq!(metric.name, counter_name);
3353
3354                    if let Some(expected_unit) = unit_name {
3355                        assert_eq!(metric.unit, expected_unit);
3356                    }
3357
3358                    let aggregation = T::extract_metrics_data_ref(&metric.data)
3359                        .expect("Failed to cast aggregation to expected type");
3360                    aggregation
3361                })
3362                .collect::<Vec<_>>();
3363
3364            result
3365        }
3366    }
3367}