opentelemetry_sdk/metrics/internal/
precomputed_sum.rs1use opentelemetry::KeyValue;
2
3use crate::metrics::data::{self, AggregatedMetrics, MetricData, SumDataPoint};
4use crate::metrics::Temporality;
5
6use super::aggregate::{AggregateTimeInitiator, AttributeSetFilter};
7use super::{last_value::Assign, AtomicTracker, Number, ValueMap};
8use super::{ComputeAggregation, Measure};
9use std::{collections::HashMap, sync::Mutex};
10
11pub(crate) struct PrecomputedSum<T: Number> {
13 value_map: ValueMap<Assign<T>>,
14 init_time: AggregateTimeInitiator,
15 temporality: Temporality,
16 filter: AttributeSetFilter,
17 monotonic: bool,
18 reported: Mutex<HashMap<Vec<KeyValue>, T>>,
19}
20
21impl<T: Number> PrecomputedSum<T> {
22 pub(crate) fn new(
23 temporality: Temporality,
24 filter: AttributeSetFilter,
25 monotonic: bool,
26 cardinality_limit: usize,
27 ) -> Self {
28 PrecomputedSum {
29 value_map: ValueMap::new((), cardinality_limit),
30 init_time: AggregateTimeInitiator::default(),
31 temporality,
32 filter,
33 monotonic,
34 reported: Mutex::new(Default::default()),
35 }
36 }
37
38 pub(crate) fn delta(&self, dest: Option<&mut MetricData<T>>) -> (usize, Option<MetricData<T>>) {
39 let time = self.init_time.delta();
40
41 let s_data = dest.and_then(|d| {
42 if let MetricData::Sum(sum) = d {
43 Some(sum)
44 } else {
45 None
46 }
47 });
48 let mut new_agg = if s_data.is_none() {
49 Some(data::Sum {
50 data_points: vec![],
51 start_time: time.start,
52 time: time.current,
53 temporality: Temporality::Delta,
54 is_monotonic: self.monotonic,
55 })
56 } else {
57 None
58 };
59 let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
60 s_data.start_time = time.start;
61 s_data.time = time.current;
62 s_data.temporality = Temporality::Delta;
63 s_data.is_monotonic = self.monotonic;
64
65 let mut reported = match self.reported.lock() {
66 Ok(r) => r,
67 Err(_) => return (0, None),
68 };
69 let mut new_reported = HashMap::with_capacity(reported.len());
70
71 self.value_map
72 .collect_and_reset(&mut s_data.data_points, |attributes, aggr| {
73 let value = aggr.value.get_value();
74 new_reported.insert(attributes.clone(), value);
75 let delta = value - *reported.get(&attributes).unwrap_or(&T::default());
76 SumDataPoint {
77 attributes,
78 value: delta,
79 exemplars: vec![],
80 }
81 });
82
83 *reported = new_reported;
84 drop(reported); (s_data.data_points.len(), new_agg.map(Into::into))
87 }
88
89 pub(crate) fn cumulative(
90 &self,
91 dest: Option<&mut MetricData<T>>,
92 ) -> (usize, Option<MetricData<T>>) {
93 let time = self.init_time.cumulative();
94
95 let s_data = dest.and_then(|d| {
96 if let MetricData::Sum(sum) = d {
97 Some(sum)
98 } else {
99 None
100 }
101 });
102 let mut new_agg = if s_data.is_none() {
103 Some(data::Sum {
104 data_points: vec![],
105 start_time: time.start,
106 time: time.current,
107 temporality: Temporality::Cumulative,
108 is_monotonic: self.monotonic,
109 })
110 } else {
111 None
112 };
113 let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
114 s_data.start_time = time.start;
115 s_data.time = time.current;
116 s_data.temporality = Temporality::Cumulative;
117 s_data.is_monotonic = self.monotonic;
118
119 self.value_map
120 .collect_readonly(&mut s_data.data_points, |attributes, aggr| SumDataPoint {
121 attributes,
122 value: aggr.value.get_value(),
123 exemplars: vec![],
124 });
125
126 (s_data.data_points.len(), new_agg.map(Into::into))
127 }
128}
129
130impl<T> Measure<T> for PrecomputedSum<T>
131where
132 T: Number,
133{
134 fn call(&self, measurement: T, attrs: &[KeyValue]) {
135 self.filter.apply(attrs, |filtered| {
136 self.value_map.measure(measurement, filtered);
137 })
138 }
139}
140
141impl<T> ComputeAggregation for PrecomputedSum<T>
142where
143 T: Number,
144{
145 fn call(&self, dest: Option<&mut AggregatedMetrics>) -> (usize, Option<AggregatedMetrics>) {
146 let data = dest.and_then(|d| T::extract_metrics_data_mut(d));
147 let (len, new) = match self.temporality {
148 Temporality::Delta => self.delta(data),
149 _ => self.cumulative(data),
150 };
151 (len, new.map(T::make_aggregated_metrics))
152 }
153}