opentelemetry_sdk/metrics/internal/
sum.rs

1use crate::metrics::data::{self, AggregatedMetrics, MetricData, SumDataPoint};
2use crate::metrics::Temporality;
3use opentelemetry::KeyValue;
4
5use super::aggregate::{AggregateTimeInitiator, AttributeSetFilter};
6use super::{Aggregator, AtomicTracker, ComputeAggregation, Measure, Number};
7use super::{AtomicallyUpdate, ValueMap};
8
9struct Increment<T>
10where
11    T: AtomicallyUpdate<T>,
12{
13    value: T::AtomicTracker,
14}
15
16impl<T> Aggregator for Increment<T>
17where
18    T: Number,
19{
20    type InitConfig = ();
21    type PreComputedValue = T;
22
23    fn create(_init: &()) -> Self {
24        Self {
25            value: T::new_atomic_tracker(T::default()),
26        }
27    }
28
29    fn update(&self, value: T) {
30        self.value.add(value)
31    }
32
33    fn clone_and_reset(&self, _: &()) -> Self {
34        Self {
35            value: T::new_atomic_tracker(self.value.get_and_reset_value()),
36        }
37    }
38}
39
40/// Summarizes a set of measurements made as their arithmetic sum.
41pub(crate) struct Sum<T: Number> {
42    value_map: ValueMap<Increment<T>>,
43    init_time: AggregateTimeInitiator,
44    temporality: Temporality,
45    filter: AttributeSetFilter,
46    monotonic: bool,
47}
48
49impl<T: Number> Sum<T> {
50    /// Returns an aggregator that summarizes a set of measurements as their
51    /// arithmetic sum.
52    ///
53    /// Each sum is scoped by attributes and the aggregation cycle the measurements
54    /// were made in.
55    pub(crate) fn new(
56        temporality: Temporality,
57        filter: AttributeSetFilter,
58        monotonic: bool,
59        cardinality_limit: usize,
60    ) -> Self {
61        Sum {
62            value_map: ValueMap::new((), cardinality_limit),
63            init_time: AggregateTimeInitiator::default(),
64            temporality,
65            filter,
66            monotonic,
67        }
68    }
69
70    pub(crate) fn delta(&self, dest: Option<&mut MetricData<T>>) -> (usize, Option<MetricData<T>>) {
71        let time = self.init_time.delta();
72        let s_data = dest.and_then(|d| {
73            if let MetricData::Sum(sum) = d {
74                Some(sum)
75            } else {
76                None
77            }
78        });
79        let mut new_agg = if s_data.is_none() {
80            Some(data::Sum {
81                data_points: vec![],
82                start_time: time.start,
83                time: time.current,
84                temporality: Temporality::Delta,
85                is_monotonic: self.monotonic,
86            })
87        } else {
88            None
89        };
90        let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
91        s_data.start_time = time.start;
92        s_data.time = time.current;
93        s_data.temporality = Temporality::Delta;
94        s_data.is_monotonic = self.monotonic;
95
96        self.value_map
97            .collect_and_reset(&mut s_data.data_points, |attributes, aggr| SumDataPoint {
98                attributes,
99                value: aggr.value.get_value(),
100                exemplars: vec![],
101            });
102
103        (s_data.data_points.len(), new_agg.map(Into::into))
104    }
105
106    pub(crate) fn cumulative(
107        &self,
108        dest: Option<&mut MetricData<T>>,
109    ) -> (usize, Option<MetricData<T>>) {
110        let time = self.init_time.cumulative();
111        let s_data = dest.and_then(|d| {
112            if let MetricData::Sum(sum) = d {
113                Some(sum)
114            } else {
115                None
116            }
117        });
118        let mut new_agg = if s_data.is_none() {
119            Some(data::Sum {
120                data_points: vec![],
121                start_time: time.start,
122                time: time.current,
123                temporality: Temporality::Cumulative,
124                is_monotonic: self.monotonic,
125            })
126        } else {
127            None
128        };
129        let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
130
131        s_data.start_time = time.start;
132        s_data.time = time.current;
133        s_data.temporality = Temporality::Cumulative;
134        s_data.is_monotonic = self.monotonic;
135
136        self.value_map
137            .collect_readonly(&mut s_data.data_points, |attributes, aggr| SumDataPoint {
138                attributes,
139                value: aggr.value.get_value(),
140                exemplars: vec![],
141            });
142
143        (s_data.data_points.len(), new_agg.map(Into::into))
144    }
145}
146
147impl<T> Measure<T> for Sum<T>
148where
149    T: Number,
150{
151    fn call(&self, measurement: T, attrs: &[KeyValue]) {
152        self.filter.apply(attrs, |filtered| {
153            self.value_map.measure(measurement, filtered);
154        })
155    }
156}
157
158impl<T> ComputeAggregation for Sum<T>
159where
160    T: Number,
161{
162    fn call(&self, dest: Option<&mut AggregatedMetrics>) -> (usize, Option<AggregatedMetrics>) {
163        let data = dest.and_then(|d| T::extract_metrics_data_mut(d));
164        let (len, new) = match self.temporality {
165            Temporality::Delta => self.delta(data),
166            _ => self.cumulative(data),
167        };
168        (len, new.map(T::make_aggregated_metrics))
169    }
170}