opentelemetry_sdk/metrics/internal/
histogram.rs1use std::mem::replace;
2use std::ops::DerefMut;
3use std::sync::Mutex;
4
5use crate::metrics::data::{self, MetricData};
6use crate::metrics::data::{AggregatedMetrics, HistogramDataPoint};
7use crate::metrics::Temporality;
8use opentelemetry::KeyValue;
9
10use super::aggregate::AggregateTimeInitiator;
11use super::aggregate::AttributeSetFilter;
12use super::ComputeAggregation;
13use super::Measure;
14use super::ValueMap;
15use super::{Aggregator, Number};
16
17impl<T> Aggregator for Mutex<Buckets<T>>
18where
19 T: Number,
20{
21 type InitConfig = usize;
22 type PreComputedValue = (T, usize);
24
25 fn update(&self, (value, index): (T, usize)) {
26 let mut buckets = self.lock().unwrap_or_else(|err| err.into_inner());
27
28 buckets.total += value;
29 buckets.count += 1;
30 if !buckets.counts.is_empty() {
31 buckets.counts[index] += 1;
32 }
33
34 if value < buckets.min {
35 buckets.min = value;
36 }
37 if value > buckets.max {
38 buckets.max = value
39 }
40 }
41
42 fn create(count: &usize) -> Self {
43 Mutex::new(Buckets::<T>::new(*count))
44 }
45
46 fn clone_and_reset(&self, count: &usize) -> Self {
47 let mut current = self.lock().unwrap_or_else(|err| err.into_inner());
48 Mutex::new(replace(current.deref_mut(), Buckets::new(*count)))
49 }
50}
51
52#[derive(Default)]
53struct Buckets<T> {
54 counts: Vec<u64>,
55 count: u64,
56 total: T,
57 min: T,
58 max: T,
59}
60
61impl<T: Number> Buckets<T> {
62 fn new(n: usize) -> Buckets<T> {
64 Buckets {
65 counts: vec![0; n],
66 min: T::max(),
67 max: T::min(),
68 ..Default::default()
69 }
70 }
71}
72
73pub(crate) struct Histogram<T: Number> {
76 value_map: ValueMap<Mutex<Buckets<T>>>,
77 init_time: AggregateTimeInitiator,
78 temporality: Temporality,
79 filter: AttributeSetFilter,
80 bounds: Vec<f64>,
81 record_min_max: bool,
82 record_sum: bool,
83}
84
85impl<T: Number> Histogram<T> {
86 #[allow(unused_mut)]
87 pub(crate) fn new(
88 temporality: Temporality,
89 filter: AttributeSetFilter,
90 mut bounds: Vec<f64>,
91 record_min_max: bool,
92 record_sum: bool,
93 cardinality_limit: usize,
94 ) -> Self {
95 #[cfg(feature = "spec_unstable_metrics_views")]
96 {
97 bounds.retain(|v| !v.is_nan());
99 bounds.sort_by(|a, b| a.partial_cmp(b).expect("NaNs filtered out"));
100 }
101
102 let buckets_count = if bounds.is_empty() {
103 0
104 } else {
105 bounds.len() + 1
106 };
107
108 Histogram {
109 value_map: ValueMap::new(buckets_count, cardinality_limit),
110 init_time: AggregateTimeInitiator::default(),
111 temporality,
112 filter,
113 bounds,
114 record_min_max,
115 record_sum,
116 }
117 }
118
119 fn delta(&self, dest: Option<&mut MetricData<T>>) -> (usize, Option<MetricData<T>>) {
120 let time = self.init_time.delta();
121
122 let h = dest.and_then(|d| {
123 if let MetricData::Histogram(hist) = d {
124 Some(hist)
125 } else {
126 None
127 }
128 });
129 let mut new_agg = if h.is_none() {
130 Some(data::Histogram {
131 data_points: vec![],
132 start_time: time.start,
133 time: time.current,
134 temporality: Temporality::Delta,
135 })
136 } else {
137 None
138 };
139 let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
140 h.temporality = Temporality::Delta;
141 h.start_time = time.start;
142 h.time = time.current;
143
144 self.value_map
145 .collect_and_reset(&mut h.data_points, |attributes, aggr| {
146 let b = aggr.into_inner().unwrap_or_else(|err| err.into_inner());
147 HistogramDataPoint {
148 attributes,
149 count: b.count,
150 bounds: self.bounds.clone(),
151 bucket_counts: b.counts,
152 sum: if self.record_sum {
153 b.total
154 } else {
155 T::default()
156 },
157 min: if self.record_min_max {
158 Some(b.min)
159 } else {
160 None
161 },
162 max: if self.record_min_max {
163 Some(b.max)
164 } else {
165 None
166 },
167 exemplars: vec![],
168 }
169 });
170
171 (h.data_points.len(), new_agg.map(Into::into))
172 }
173
174 fn cumulative(&self, dest: Option<&mut MetricData<T>>) -> (usize, Option<MetricData<T>>) {
175 let time = self.init_time.cumulative();
176 let h = dest.and_then(|d| {
177 if let MetricData::Histogram(hist) = d {
178 Some(hist)
179 } else {
180 None
181 }
182 });
183 let mut new_agg = if h.is_none() {
184 Some(data::Histogram {
185 data_points: vec![],
186 start_time: time.start,
187 time: time.current,
188 temporality: Temporality::Cumulative,
189 })
190 } else {
191 None
192 };
193 let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
194 h.temporality = Temporality::Cumulative;
195 h.start_time = time.start;
196 h.time = time.current;
197
198 self.value_map
199 .collect_readonly(&mut h.data_points, |attributes, aggr| {
200 let b = aggr.lock().unwrap_or_else(|err| err.into_inner());
201 HistogramDataPoint {
202 attributes,
203 count: b.count,
204 bounds: self.bounds.clone(),
205 bucket_counts: b.counts.clone(),
206 sum: if self.record_sum {
207 b.total
208 } else {
209 T::default()
210 },
211 min: if self.record_min_max {
212 Some(b.min)
213 } else {
214 None
215 },
216 max: if self.record_min_max {
217 Some(b.max)
218 } else {
219 None
220 },
221 exemplars: vec![],
222 }
223 });
224
225 (h.data_points.len(), new_agg.map(Into::into))
226 }
227}
228
229impl<T> Measure<T> for Histogram<T>
230where
231 T: Number,
232{
233 fn call(&self, measurement: T, attrs: &[KeyValue]) {
234 let f = measurement.into_float();
235 let index = self.bounds.partition_point(|&x| x < f);
241
242 self.filter.apply(attrs, |filtered| {
243 self.value_map.measure((measurement, index), filtered);
244 })
245 }
246}
247
248impl<T> ComputeAggregation for Histogram<T>
249where
250 T: Number,
251{
252 fn call(&self, dest: Option<&mut AggregatedMetrics>) -> (usize, Option<AggregatedMetrics>) {
253 let data = dest.and_then(|d| T::extract_metrics_data_mut(d));
254 let (len, new) = match self.temporality {
255 Temporality::Delta => self.delta(data),
256 _ => self.cumulative(data),
257 };
258 (len, new.map(T::make_aggregated_metrics))
259 }
260}
261
#[cfg(test)]
mod tests {
    use super::*;

    /// Values 1 through 10 against boundaries `[1, 3, 6]` must land as
    /// 1 / 2 / 3 / 4 observations in the four buckets.
    #[test]
    fn check_buckets_are_selected_correctly() {
        let hist = Histogram::<i64>::new(
            Temporality::Cumulative,
            AttributeSetFilter::new(None),
            vec![1.0, 3.0, 6.0],
            false,
            false,
            2000,
        );
        for value in 1..=10 {
            Measure::call(&hist, value, &[]);
        }

        let (count, aggregated) = ComputeAggregation::call(&hist, None);
        let AggregatedMetrics::I64(MetricData::Histogram(histogram)) = aggregated.unwrap() else {
            unreachable!()
        };

        assert_eq!(count, 1);
        let point = &histogram.data_points[0];
        assert_eq!(point.count, 10);
        assert_eq!(point.bucket_counts, vec![1, 2, 3, 4]);
    }
}