opentelemetry_sdk/metrics/internal/
last_value.rs1use crate::metrics::{
2 data::{self, AggregatedMetrics, GaugeDataPoint, MetricData},
3 Temporality,
4};
5use opentelemetry::KeyValue;
6
7use super::{
8 aggregate::{AggregateTimeInitiator, AttributeSetFilter},
9 Aggregator, AtomicTracker, AtomicallyUpdate, ComputeAggregation, Measure, Number, ValueMap,
10};
11
12pub(crate) struct Assign<T>
14where
15 T: AtomicallyUpdate<T>,
16{
17 pub(crate) value: T::AtomicTracker,
18}
19
20impl<T> Aggregator for Assign<T>
21where
22 T: Number,
23{
24 type InitConfig = ();
25 type PreComputedValue = T;
26
27 fn create(_init: &()) -> Self {
28 Self {
29 value: T::new_atomic_tracker(T::default()),
30 }
31 }
32
33 fn update(&self, value: T) {
34 self.value.store(value)
35 }
36
37 fn clone_and_reset(&self, _: &()) -> Self {
38 Self {
39 value: T::new_atomic_tracker(self.value.get_and_reset_value()),
40 }
41 }
42}
43
44pub(crate) struct LastValue<T: Number> {
46 value_map: ValueMap<Assign<T>>,
47 init_time: AggregateTimeInitiator,
48 temporality: Temporality,
49 filter: AttributeSetFilter,
50}
51
52impl<T: Number> LastValue<T> {
53 pub(crate) fn new(
54 temporality: Temporality,
55 filter: AttributeSetFilter,
56 cardinality_limit: usize,
57 ) -> Self {
58 LastValue {
59 value_map: ValueMap::new((), cardinality_limit),
60 init_time: AggregateTimeInitiator::default(),
61 temporality,
62 filter,
63 }
64 }
65
66 pub(crate) fn delta(&self, dest: Option<&mut MetricData<T>>) -> (usize, Option<MetricData<T>>) {
67 let time = self.init_time.delta();
68
69 let s_data = dest.and_then(|d| {
70 if let MetricData::Gauge(gauge) = d {
71 Some(gauge)
72 } else {
73 None
74 }
75 });
76 let mut new_agg = if s_data.is_none() {
77 Some(data::Gauge {
78 data_points: vec![],
79 start_time: Some(time.start),
80 time: time.current,
81 })
82 } else {
83 None
84 };
85 let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
86 s_data.start_time = Some(time.start);
87 s_data.time = time.current;
88
89 self.value_map
90 .collect_and_reset(&mut s_data.data_points, |attributes, aggr| GaugeDataPoint {
91 attributes,
92 value: aggr.value.get_value(),
93 exemplars: vec![],
94 });
95
96 (s_data.data_points.len(), new_agg.map(Into::into))
97 }
98
99 pub(crate) fn cumulative(
100 &self,
101 dest: Option<&mut MetricData<T>>,
102 ) -> (usize, Option<MetricData<T>>) {
103 let time = self.init_time.cumulative();
104 let s_data = dest.and_then(|d| {
105 if let MetricData::Gauge(gauge) = d {
106 Some(gauge)
107 } else {
108 None
109 }
110 });
111 let mut new_agg = if s_data.is_none() {
112 Some(data::Gauge {
113 data_points: vec![],
114 start_time: Some(time.start),
115 time: time.current,
116 })
117 } else {
118 None
119 };
120 let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
121
122 s_data.start_time = Some(time.start);
123 s_data.time = time.current;
124
125 self.value_map
126 .collect_readonly(&mut s_data.data_points, |attributes, aggr| GaugeDataPoint {
127 attributes,
128 value: aggr.value.get_value(),
129 exemplars: vec![],
130 });
131
132 (s_data.data_points.len(), new_agg.map(Into::into))
133 }
134}
135
136impl<T> Measure<T> for LastValue<T>
137where
138 T: Number,
139{
140 fn call(&self, measurement: T, attrs: &[KeyValue]) {
141 self.filter.apply(attrs, |filtered| {
142 self.value_map.measure(measurement, filtered);
143 })
144 }
145}
146
147impl<T> ComputeAggregation for LastValue<T>
148where
149 T: Number,
150{
151 fn call(&self, dest: Option<&mut AggregatedMetrics>) -> (usize, Option<AggregatedMetrics>) {
152 let data = dest.and_then(|d| T::extract_metrics_data_mut(d));
153 let (len, new) = match self.temporality {
154 Temporality::Delta => self.delta(data),
155 _ => self.cumulative(data),
156 };
157 (len, new.map(T::make_aggregated_metrics))
158 }
159}