opentelemetry_sdk/metrics/internal/
precomputed_sum.rs

1use opentelemetry::KeyValue;
2
3use crate::metrics::data::{self, AggregatedMetrics, MetricData, SumDataPoint};
4use crate::metrics::Temporality;
5
6use super::aggregate::{AggregateTimeInitiator, AttributeSetFilter};
7use super::{last_value::Assign, AtomicTracker, Number, ValueMap};
8use super::{ComputeAggregation, Measure};
9use std::{collections::HashMap, sync::Mutex};
10
11/// Summarizes a set of pre-computed sums as their arithmetic sum.
12pub(crate) struct PrecomputedSum<T: Number> {
13    value_map: ValueMap<Assign<T>>,
14    init_time: AggregateTimeInitiator,
15    temporality: Temporality,
16    filter: AttributeSetFilter,
17    monotonic: bool,
18    reported: Mutex<HashMap<Vec<KeyValue>, T>>,
19}
20
21impl<T: Number> PrecomputedSum<T> {
22    pub(crate) fn new(
23        temporality: Temporality,
24        filter: AttributeSetFilter,
25        monotonic: bool,
26        cardinality_limit: usize,
27    ) -> Self {
28        PrecomputedSum {
29            value_map: ValueMap::new((), cardinality_limit),
30            init_time: AggregateTimeInitiator::default(),
31            temporality,
32            filter,
33            monotonic,
34            reported: Mutex::new(Default::default()),
35        }
36    }
37
38    pub(crate) fn delta(&self, dest: Option<&mut MetricData<T>>) -> (usize, Option<MetricData<T>>) {
39        let time = self.init_time.delta();
40
41        let s_data = dest.and_then(|d| {
42            if let MetricData::Sum(sum) = d {
43                Some(sum)
44            } else {
45                None
46            }
47        });
48        let mut new_agg = if s_data.is_none() {
49            Some(data::Sum {
50                data_points: vec![],
51                start_time: time.start,
52                time: time.current,
53                temporality: Temporality::Delta,
54                is_monotonic: self.monotonic,
55            })
56        } else {
57            None
58        };
59        let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
60        s_data.start_time = time.start;
61        s_data.time = time.current;
62        s_data.temporality = Temporality::Delta;
63        s_data.is_monotonic = self.monotonic;
64
65        let mut reported = match self.reported.lock() {
66            Ok(r) => r,
67            Err(_) => return (0, None),
68        };
69        let mut new_reported = HashMap::with_capacity(reported.len());
70
71        self.value_map
72            .collect_and_reset(&mut s_data.data_points, |attributes, aggr| {
73                let value = aggr.value.get_value();
74                new_reported.insert(attributes.clone(), value);
75                let delta = value - *reported.get(&attributes).unwrap_or(&T::default());
76                SumDataPoint {
77                    attributes,
78                    value: delta,
79                    exemplars: vec![],
80                }
81            });
82
83        *reported = new_reported;
84        drop(reported); // drop before values guard is dropped
85
86        (s_data.data_points.len(), new_agg.map(Into::into))
87    }
88
89    pub(crate) fn cumulative(
90        &self,
91        dest: Option<&mut MetricData<T>>,
92    ) -> (usize, Option<MetricData<T>>) {
93        let time = self.init_time.cumulative();
94
95        let s_data = dest.and_then(|d| {
96            if let MetricData::Sum(sum) = d {
97                Some(sum)
98            } else {
99                None
100            }
101        });
102        let mut new_agg = if s_data.is_none() {
103            Some(data::Sum {
104                data_points: vec![],
105                start_time: time.start,
106                time: time.current,
107                temporality: Temporality::Cumulative,
108                is_monotonic: self.monotonic,
109            })
110        } else {
111            None
112        };
113        let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
114        s_data.start_time = time.start;
115        s_data.time = time.current;
116        s_data.temporality = Temporality::Cumulative;
117        s_data.is_monotonic = self.monotonic;
118
119        self.value_map
120            .collect_readonly(&mut s_data.data_points, |attributes, aggr| SumDataPoint {
121                attributes,
122                value: aggr.value.get_value(),
123                exemplars: vec![],
124            });
125
126        (s_data.data_points.len(), new_agg.map(Into::into))
127    }
128}
129
130impl<T> Measure<T> for PrecomputedSum<T>
131where
132    T: Number,
133{
134    fn call(&self, measurement: T, attrs: &[KeyValue]) {
135        self.filter.apply(attrs, |filtered| {
136            self.value_map.measure(measurement, filtered);
137        })
138    }
139}
140
141impl<T> ComputeAggregation for PrecomputedSum<T>
142where
143    T: Number,
144{
145    fn call(&self, dest: Option<&mut AggregatedMetrics>) -> (usize, Option<AggregatedMetrics>) {
146        let data = dest.and_then(|d| T::extract_metrics_data_mut(d));
147        let (len, new) = match self.temporality {
148            Temporality::Delta => self.delta(data),
149            _ => self.cumulative(data),
150        };
151        (len, new.map(T::make_aggregated_metrics))
152    }
153}