opentelemetry_sdk/metrics/internal/
sum.rs1use crate::metrics::data::{self, AggregatedMetrics, MetricData, SumDataPoint};
2use crate::metrics::Temporality;
3use opentelemetry::KeyValue;
4
5use super::aggregate::{AggregateTimeInitiator, AttributeSetFilter};
6use super::{Aggregator, AtomicTracker, ComputeAggregation, Measure, Number};
7use super::{AtomicallyUpdate, ValueMap};
8
9struct Increment<T>
10where
11 T: AtomicallyUpdate<T>,
12{
13 value: T::AtomicTracker,
14}
15
16impl<T> Aggregator for Increment<T>
17where
18 T: Number,
19{
20 type InitConfig = ();
21 type PreComputedValue = T;
22
23 fn create(_init: &()) -> Self {
24 Self {
25 value: T::new_atomic_tracker(T::default()),
26 }
27 }
28
29 fn update(&self, value: T) {
30 self.value.add(value)
31 }
32
33 fn clone_and_reset(&self, _: &()) -> Self {
34 Self {
35 value: T::new_atomic_tracker(self.value.get_and_reset_value()),
36 }
37 }
38}
39
40pub(crate) struct Sum<T: Number> {
42 value_map: ValueMap<Increment<T>>,
43 init_time: AggregateTimeInitiator,
44 temporality: Temporality,
45 filter: AttributeSetFilter,
46 monotonic: bool,
47}
48
49impl<T: Number> Sum<T> {
50 pub(crate) fn new(
56 temporality: Temporality,
57 filter: AttributeSetFilter,
58 monotonic: bool,
59 cardinality_limit: usize,
60 ) -> Self {
61 Sum {
62 value_map: ValueMap::new((), cardinality_limit),
63 init_time: AggregateTimeInitiator::default(),
64 temporality,
65 filter,
66 monotonic,
67 }
68 }
69
70 pub(crate) fn delta(&self, dest: Option<&mut MetricData<T>>) -> (usize, Option<MetricData<T>>) {
71 let time = self.init_time.delta();
72 let s_data = dest.and_then(|d| {
73 if let MetricData::Sum(sum) = d {
74 Some(sum)
75 } else {
76 None
77 }
78 });
79 let mut new_agg = if s_data.is_none() {
80 Some(data::Sum {
81 data_points: vec![],
82 start_time: time.start,
83 time: time.current,
84 temporality: Temporality::Delta,
85 is_monotonic: self.monotonic,
86 })
87 } else {
88 None
89 };
90 let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
91 s_data.start_time = time.start;
92 s_data.time = time.current;
93 s_data.temporality = Temporality::Delta;
94 s_data.is_monotonic = self.monotonic;
95
96 self.value_map
97 .collect_and_reset(&mut s_data.data_points, |attributes, aggr| SumDataPoint {
98 attributes,
99 value: aggr.value.get_value(),
100 exemplars: vec![],
101 });
102
103 (s_data.data_points.len(), new_agg.map(Into::into))
104 }
105
106 pub(crate) fn cumulative(
107 &self,
108 dest: Option<&mut MetricData<T>>,
109 ) -> (usize, Option<MetricData<T>>) {
110 let time = self.init_time.cumulative();
111 let s_data = dest.and_then(|d| {
112 if let MetricData::Sum(sum) = d {
113 Some(sum)
114 } else {
115 None
116 }
117 });
118 let mut new_agg = if s_data.is_none() {
119 Some(data::Sum {
120 data_points: vec![],
121 start_time: time.start,
122 time: time.current,
123 temporality: Temporality::Cumulative,
124 is_monotonic: self.monotonic,
125 })
126 } else {
127 None
128 };
129 let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
130
131 s_data.start_time = time.start;
132 s_data.time = time.current;
133 s_data.temporality = Temporality::Cumulative;
134 s_data.is_monotonic = self.monotonic;
135
136 self.value_map
137 .collect_readonly(&mut s_data.data_points, |attributes, aggr| SumDataPoint {
138 attributes,
139 value: aggr.value.get_value(),
140 exemplars: vec![],
141 });
142
143 (s_data.data_points.len(), new_agg.map(Into::into))
144 }
145}
146
147impl<T> Measure<T> for Sum<T>
148where
149 T: Number,
150{
151 fn call(&self, measurement: T, attrs: &[KeyValue]) {
152 self.filter.apply(attrs, |filtered| {
153 self.value_map.measure(measurement, filtered);
154 })
155 }
156}
157
158impl<T> ComputeAggregation for Sum<T>
159where
160 T: Number,
161{
162 fn call(&self, dest: Option<&mut AggregatedMetrics>) -> (usize, Option<AggregatedMetrics>) {
163 let data = dest.and_then(|d| T::extract_metrics_data_mut(d));
164 let (len, new) = match self.temporality {
165 Temporality::Delta => self.delta(data),
166 _ => self.cumulative(data),
167 };
168 (len, new.map(T::make_aggregated_metrics))
169 }
170}