//! Explicit-bucket histogram aggregation (opentelemetry_sdk/metrics/internal/histogram.rs).

1use std::mem::replace;
2use std::ops::DerefMut;
3use std::sync::Mutex;
4
5use crate::metrics::data::{self, MetricData};
6use crate::metrics::data::{AggregatedMetrics, HistogramDataPoint};
7use crate::metrics::Temporality;
8use opentelemetry::KeyValue;
9
10use super::aggregate::AggregateTimeInitiator;
11use super::aggregate::AttributeSetFilter;
12use super::ComputeAggregation;
13use super::Measure;
14use super::ValueMap;
15use super::{Aggregator, Number};
16
17impl<T> Aggregator for Mutex<Buckets<T>>
18where
19    T: Number,
20{
21    type InitConfig = usize;
22    /// Value and bucket index
23    type PreComputedValue = (T, usize);
24
25    fn update(&self, (value, index): (T, usize)) {
26        let mut buckets = self.lock().unwrap_or_else(|err| err.into_inner());
27
28        buckets.total += value;
29        buckets.count += 1;
30        if !buckets.counts.is_empty() {
31            buckets.counts[index] += 1;
32        }
33
34        if value < buckets.min {
35            buckets.min = value;
36        }
37        if value > buckets.max {
38            buckets.max = value
39        }
40    }
41
42    fn create(count: &usize) -> Self {
43        Mutex::new(Buckets::<T>::new(*count))
44    }
45
46    fn clone_and_reset(&self, count: &usize) -> Self {
47        let mut current = self.lock().unwrap_or_else(|err| err.into_inner());
48        Mutex::new(replace(current.deref_mut(), Buckets::new(*count)))
49    }
50}
51
/// Accumulated histogram state for a single attribute set.
#[derive(Default)]
struct Buckets<T> {
    /// Number of recorded values per bucket; empty when no bucket
    /// boundaries are configured (bucket counting disabled).
    counts: Vec<u64>,
    /// Total number of recorded values.
    count: u64,
    /// Running sum of all recorded values.
    total: T,
    /// Smallest value recorded so far.
    min: T,
    /// Largest value recorded so far.
    max: T,
}
60
61impl<T: Number> Buckets<T> {
62    /// returns buckets with `n` bins.
63    fn new(n: usize) -> Buckets<T> {
64        Buckets {
65            counts: vec![0; n],
66            min: T::max(),
67            max: T::min(),
68            ..Default::default()
69        }
70    }
71}
72
/// Summarizes a set of measurements as a histogram with explicitly defined
/// buckets.
pub(crate) struct Histogram<T: Number> {
    /// Per-attribute-set bucket state.
    value_map: ValueMap<Mutex<Buckets<T>>>,
    /// Provides start/collection timestamps for exported data points.
    init_time: AggregateTimeInitiator,
    /// Whether collections produce delta or cumulative aggregations.
    temporality: Temporality,
    /// Filters the attributes recorded with each measurement.
    filter: AttributeSetFilter,
    /// Upper bucket boundaries (NaN-filtered and sorted when the
    /// `spec_unstable_metrics_views` feature is enabled; otherwise used
    /// as supplied).
    bounds: Vec<f64>,
    /// When false, exported data points omit min/max.
    record_min_max: bool,
    /// When false, exported data points report a default (zero) sum.
    record_sum: bool,
}
84
85impl<T: Number> Histogram<T> {
86    #[allow(unused_mut)]
87    pub(crate) fn new(
88        temporality: Temporality,
89        filter: AttributeSetFilter,
90        mut bounds: Vec<f64>,
91        record_min_max: bool,
92        record_sum: bool,
93        cardinality_limit: usize,
94    ) -> Self {
95        #[cfg(feature = "spec_unstable_metrics_views")]
96        {
97            // TODO: When views are used, validate this upfront
98            bounds.retain(|v| !v.is_nan());
99            bounds.sort_by(|a, b| a.partial_cmp(b).expect("NaNs filtered out"));
100        }
101
102        let buckets_count = if bounds.is_empty() {
103            0
104        } else {
105            bounds.len() + 1
106        };
107
108        Histogram {
109            value_map: ValueMap::new(buckets_count, cardinality_limit),
110            init_time: AggregateTimeInitiator::default(),
111            temporality,
112            filter,
113            bounds,
114            record_min_max,
115            record_sum,
116        }
117    }
118
119    fn delta(&self, dest: Option<&mut MetricData<T>>) -> (usize, Option<MetricData<T>>) {
120        let time = self.init_time.delta();
121
122        let h = dest.and_then(|d| {
123            if let MetricData::Histogram(hist) = d {
124                Some(hist)
125            } else {
126                None
127            }
128        });
129        let mut new_agg = if h.is_none() {
130            Some(data::Histogram {
131                data_points: vec![],
132                start_time: time.start,
133                time: time.current,
134                temporality: Temporality::Delta,
135            })
136        } else {
137            None
138        };
139        let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
140        h.temporality = Temporality::Delta;
141        h.start_time = time.start;
142        h.time = time.current;
143
144        self.value_map
145            .collect_and_reset(&mut h.data_points, |attributes, aggr| {
146                let b = aggr.into_inner().unwrap_or_else(|err| err.into_inner());
147                HistogramDataPoint {
148                    attributes,
149                    count: b.count,
150                    bounds: self.bounds.clone(),
151                    bucket_counts: b.counts,
152                    sum: if self.record_sum {
153                        b.total
154                    } else {
155                        T::default()
156                    },
157                    min: if self.record_min_max {
158                        Some(b.min)
159                    } else {
160                        None
161                    },
162                    max: if self.record_min_max {
163                        Some(b.max)
164                    } else {
165                        None
166                    },
167                    exemplars: vec![],
168                }
169            });
170
171        (h.data_points.len(), new_agg.map(Into::into))
172    }
173
174    fn cumulative(&self, dest: Option<&mut MetricData<T>>) -> (usize, Option<MetricData<T>>) {
175        let time = self.init_time.cumulative();
176        let h = dest.and_then(|d| {
177            if let MetricData::Histogram(hist) = d {
178                Some(hist)
179            } else {
180                None
181            }
182        });
183        let mut new_agg = if h.is_none() {
184            Some(data::Histogram {
185                data_points: vec![],
186                start_time: time.start,
187                time: time.current,
188                temporality: Temporality::Cumulative,
189            })
190        } else {
191            None
192        };
193        let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
194        h.temporality = Temporality::Cumulative;
195        h.start_time = time.start;
196        h.time = time.current;
197
198        self.value_map
199            .collect_readonly(&mut h.data_points, |attributes, aggr| {
200                let b = aggr.lock().unwrap_or_else(|err| err.into_inner());
201                HistogramDataPoint {
202                    attributes,
203                    count: b.count,
204                    bounds: self.bounds.clone(),
205                    bucket_counts: b.counts.clone(),
206                    sum: if self.record_sum {
207                        b.total
208                    } else {
209                        T::default()
210                    },
211                    min: if self.record_min_max {
212                        Some(b.min)
213                    } else {
214                        None
215                    },
216                    max: if self.record_min_max {
217                        Some(b.max)
218                    } else {
219                        None
220                    },
221                    exemplars: vec![],
222                }
223            });
224
225        (h.data_points.len(), new_agg.map(Into::into))
226    }
227}
228
229impl<T> Measure<T> for Histogram<T>
230where
231    T: Number,
232{
233    fn call(&self, measurement: T, attrs: &[KeyValue]) {
234        let f = measurement.into_float();
235        // This search will return an index in the range `[0, bounds.len()]`, where
236        // it will return `bounds.len()` if value is greater than the last element
237        // of `bounds`. This aligns with the buckets in that the length of buckets
238        // is `bounds.len()+1`, with the last bucket representing:
239        // `(bounds[bounds.len()-1], +∞)`.
240        let index = self.bounds.partition_point(|&x| x < f);
241
242        self.filter.apply(attrs, |filtered| {
243            self.value_map.measure((measurement, index), filtered);
244        })
245    }
246}
247
248impl<T> ComputeAggregation for Histogram<T>
249where
250    T: Number,
251{
252    fn call(&self, dest: Option<&mut AggregatedMetrics>) -> (usize, Option<AggregatedMetrics>) {
253        let data = dest.and_then(|d| T::extract_metrics_data_mut(d));
254        let (len, new) = match self.temporality {
255            Temporality::Delta => self.delta(data),
256            _ => self.cumulative(data),
257        };
258        (len, new.map(T::make_aggregated_metrics))
259    }
260}
261
#[cfg(test)]
mod tests {
    use super::*;

    /// Values 1..=10 against boundaries [1, 3, 6] must land in the four
    /// buckets (-inf,1], (1,3], (3,6], (6,+inf) as 1/2/3/4 values.
    #[test]
    fn check_buckets_are_selected_correctly() {
        let histogram = Histogram::<i64>::new(
            Temporality::Cumulative,
            AttributeSetFilter::new(None),
            vec![1.0, 3.0, 6.0],
            false,
            false,
            2000,
        );
        for value in 1..11 {
            Measure::call(&histogram, value, &[]);
        }

        let (count, aggregated) = ComputeAggregation::call(&histogram, None);
        let AggregatedMetrics::I64(MetricData::Histogram(hist)) = aggregated.unwrap() else {
            unreachable!()
        };
        assert_eq!(count, 1);
        let point = &hist.data_points[0];
        assert_eq!(point.count, 10);
        assert_eq!(point.bucket_counts.len(), 4);
        // 1 | 2, 3 | 4, 5, 6 | 7, 8, 9, 10
        assert_eq!(point.bucket_counts, vec![1, 2, 3, 4]);
    }
}