opentelemetry_sdk/metrics/internal/
last_value.rs

1use crate::metrics::{
2    data::{self, AggregatedMetrics, GaugeDataPoint, MetricData},
3    Temporality,
4};
5use opentelemetry::KeyValue;
6
7use super::{
8    aggregate::{AggregateTimeInitiator, AttributeSetFilter},
9    Aggregator, AtomicTracker, AtomicallyUpdate, ComputeAggregation, Measure, Number, ValueMap,
10};
11
12/// this is reused by PrecomputedSum
13pub(crate) struct Assign<T>
14where
15    T: AtomicallyUpdate<T>,
16{
17    pub(crate) value: T::AtomicTracker,
18}
19
20impl<T> Aggregator for Assign<T>
21where
22    T: Number,
23{
24    type InitConfig = ();
25    type PreComputedValue = T;
26
27    fn create(_init: &()) -> Self {
28        Self {
29            value: T::new_atomic_tracker(T::default()),
30        }
31    }
32
33    fn update(&self, value: T) {
34        self.value.store(value)
35    }
36
37    fn clone_and_reset(&self, _: &()) -> Self {
38        Self {
39            value: T::new_atomic_tracker(self.value.get_and_reset_value()),
40        }
41    }
42}
43
44/// Summarizes a set of measurements as the last one made.
45pub(crate) struct LastValue<T: Number> {
46    value_map: ValueMap<Assign<T>>,
47    init_time: AggregateTimeInitiator,
48    temporality: Temporality,
49    filter: AttributeSetFilter,
50}
51
52impl<T: Number> LastValue<T> {
53    pub(crate) fn new(
54        temporality: Temporality,
55        filter: AttributeSetFilter,
56        cardinality_limit: usize,
57    ) -> Self {
58        LastValue {
59            value_map: ValueMap::new((), cardinality_limit),
60            init_time: AggregateTimeInitiator::default(),
61            temporality,
62            filter,
63        }
64    }
65
66    pub(crate) fn delta(&self, dest: Option<&mut MetricData<T>>) -> (usize, Option<MetricData<T>>) {
67        let time = self.init_time.delta();
68
69        let s_data = dest.and_then(|d| {
70            if let MetricData::Gauge(gauge) = d {
71                Some(gauge)
72            } else {
73                None
74            }
75        });
76        let mut new_agg = if s_data.is_none() {
77            Some(data::Gauge {
78                data_points: vec![],
79                start_time: Some(time.start),
80                time: time.current,
81            })
82        } else {
83            None
84        };
85        let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
86        s_data.start_time = Some(time.start);
87        s_data.time = time.current;
88
89        self.value_map
90            .collect_and_reset(&mut s_data.data_points, |attributes, aggr| GaugeDataPoint {
91                attributes,
92                value: aggr.value.get_value(),
93                exemplars: vec![],
94            });
95
96        (s_data.data_points.len(), new_agg.map(Into::into))
97    }
98
99    pub(crate) fn cumulative(
100        &self,
101        dest: Option<&mut MetricData<T>>,
102    ) -> (usize, Option<MetricData<T>>) {
103        let time = self.init_time.cumulative();
104        let s_data = dest.and_then(|d| {
105            if let MetricData::Gauge(gauge) = d {
106                Some(gauge)
107            } else {
108                None
109            }
110        });
111        let mut new_agg = if s_data.is_none() {
112            Some(data::Gauge {
113                data_points: vec![],
114                start_time: Some(time.start),
115                time: time.current,
116            })
117        } else {
118            None
119        };
120        let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
121
122        s_data.start_time = Some(time.start);
123        s_data.time = time.current;
124
125        self.value_map
126            .collect_readonly(&mut s_data.data_points, |attributes, aggr| GaugeDataPoint {
127                attributes,
128                value: aggr.value.get_value(),
129                exemplars: vec![],
130            });
131
132        (s_data.data_points.len(), new_agg.map(Into::into))
133    }
134}
135
136impl<T> Measure<T> for LastValue<T>
137where
138    T: Number,
139{
140    fn call(&self, measurement: T, attrs: &[KeyValue]) {
141        self.filter.apply(attrs, |filtered| {
142            self.value_map.measure(measurement, filtered);
143        })
144    }
145}
146
147impl<T> ComputeAggregation for LastValue<T>
148where
149    T: Number,
150{
151    fn call(&self, dest: Option<&mut AggregatedMetrics>) -> (usize, Option<AggregatedMetrics>) {
152        let data = dest.and_then(|d| T::extract_metrics_data_mut(d));
153        let (len, new) = match self.temporality {
154            Temporality::Delta => self.delta(data),
155            _ => self.cumulative(data),
156        };
157        (len, new.map(T::make_aggregated_metrics))
158    }
159}