opentelemetry_sdk/metrics/internal/aggregate.rs

use std::{
    marker,
    mem::replace,
    ops::DerefMut,
    sync::{Arc, Mutex},
    time::SystemTime,
};

use crate::metrics::{data::AggregatedMetrics, Temporality};
use opentelemetry::time::now;
use opentelemetry::KeyValue;

use super::{
    exponential_histogram::ExpoHistogram, histogram::Histogram, last_value::LastValue,
    precomputed_sum::PrecomputedSum, sum::Sum, Number,
};

/// Receives measurements to be aggregated.
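///
/// A minimal sketch of the recording half of an [`AggregateFns`] pair
/// (hypothetical usage, not a compiled doctest; `aggregate_fns` is assumed to
/// exist):
///
/// ```ignore
/// // Hot path: record a value together with its attribute set.
/// aggregate_fns.measure.call(42u64, &[KeyValue::new("host", "a")]);
/// ```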
pub(crate) trait Measure<T>: Send + Sync + 'static {
    fn call(&self, measurement: T, attrs: &[KeyValue]);
}

/// Computes the aggregate of recorded measurements, stores it into the
/// aggregation, and returns the number of aggregate data points produced.
pub(crate) trait ComputeAggregation: Send + Sync + 'static {
    /// Compute the new aggregation and store it in `dest`.
    ///
    /// If no initial aggregation exists, `dest` will be `None`, in which case the
    /// returned option is expected to contain a new aggregation with the data from
    /// the current collection cycle.
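    ///
    /// A minimal sketch of the expected calling pattern (hypothetical usage,
    /// not a compiled doctest; `aggregate_fns` is assumed to be an
    /// [`AggregateFns`] built elsewhere):
    ///
    /// ```ignore
    /// // First collection cycle: no destination exists yet, so pass `None`
    /// // and keep the freshly created aggregation for later cycles.
    /// let (count, new_agg) = aggregate_fns.collect.call(None);
    /// let mut dest = new_agg.expect("first cycle produces a new aggregation");
    ///
    /// // Later cycles reuse the existing aggregation in place; no new
    /// // aggregation is returned.
    /// let (count, reused) = aggregate_fns.collect.call(Some(&mut dest));
    /// assert!(reused.is_none());
    /// ```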
    fn call(&self, dest: Option<&mut AggregatedMetrics>) -> (usize, Option<AggregatedMetrics>);
}

/// Separate `measure` and `collect` functions for an aggregate.
pub(crate) struct AggregateFns<T> {
    pub(crate) measure: Arc<dyn Measure<T>>,
    pub(crate) collect: Arc<dyn ComputeAggregation>,
}

/// Creates aggregate functions from an aggregate instance.
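///
/// A minimal sketch (not a compiled doctest): any aggregator implementing both
/// traits converts into the pair of shared handles, e.g. the sum aggregator
/// used by [`AggregateBuilder::sum`], assuming `temporality`, `filter`, and
/// `cardinality_limit` are in scope:
///
/// ```ignore
/// let fns: AggregateFns<u64> =
///     Sum::new(temporality, filter, /* monotonic */ true, cardinality_limit).into();
/// ```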
impl<A, T> From<A> for AggregateFns<T>
where
    A: Measure<T> + ComputeAggregation,
{
    fn from(value: A) -> Self {
        let inst = Arc::new(value);
        Self {
            measure: inst.clone(),
            collect: inst,
        }
    }
}

pub(crate) struct AggregateTime {
    pub start: SystemTime,
    pub current: SystemTime,
}

/// Initializes an [`AggregateTime`] for a specific [`Temporality`].
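///
/// A hypothetical sketch of the difference between the two temporalities
/// (not a compiled doctest):
///
/// ```ignore
/// let init = AggregateTimeInitiator::default();
///
/// // Cumulative: the start time stays at the initiator's creation time.
/// let c1 = init.cumulative();
/// let c2 = init.cumulative();
/// assert_eq!(c1.start, c2.start);
///
/// // Delta: each call moves the start time forward to the previous
/// // collection's current time, so the windows do not overlap.
/// let d1 = init.delta();
/// let d2 = init.delta();
/// assert_eq!(d2.start, d1.current);
/// ```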
pub(crate) struct AggregateTimeInitiator(Mutex<SystemTime>);

impl AggregateTimeInitiator {
    pub(crate) fn delta(&self) -> AggregateTime {
        let current_time = now();
        let start_time = self
            .0
            .lock()
            .map(|mut start| replace(start.deref_mut(), current_time))
            .unwrap_or(current_time);
        AggregateTime {
            start: start_time,
            current: current_time,
        }
    }

    pub(crate) fn cumulative(&self) -> AggregateTime {
        let current_time = now();
        let start_time = self.0.lock().map(|start| *start).unwrap_or(current_time);
        AggregateTime {
            start: start_time,
            current: current_time,
        }
    }
}

impl Default for AggregateTimeInitiator {
    fn default() -> Self {
        Self(Mutex::new(now()))
    }
}

type Filter = Arc<dyn Fn(&KeyValue) -> bool + Send + Sync>;

/// Applies the configured filter to the provided attribute set.
/// If no filter is set, the attributes are passed through unchanged.
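///
/// A hypothetical sketch (not a compiled doctest): keep only the
/// "http.method" attribute and drop everything else.
///
/// ```ignore
/// let filter: Filter = Arc::new(|kv: &KeyValue| kv.key.as_str() == "http.method");
/// let attr_filter = AttributeSetFilter::new(Some(filter));
///
/// let attrs = [KeyValue::new("http.method", "GET"), KeyValue::new("internal", true)];
/// attr_filter.apply(&attrs, |filtered| {
///     assert_eq!(filtered.len(), 1);
/// });
/// ```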
#[derive(Clone)]
pub(crate) struct AttributeSetFilter {
    filter: Option<Filter>,
}

impl AttributeSetFilter {
    pub(crate) fn new(filter: Option<Filter>) -> Self {
        Self { filter }
    }

    pub(crate) fn apply(&self, attrs: &[KeyValue], run: impl FnOnce(&[KeyValue])) {
        if let Some(filter) = &self.filter {
            let filtered_attrs: Vec<KeyValue> =
                attrs.iter().filter(|kv| filter(kv)).cloned().collect();
            run(&filtered_attrs);
        } else {
            run(attrs);
        };
    }
}

/// Builds aggregate functions.
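///
/// A minimal end-to-end sketch, mirroring the unit tests below (hypothetical
/// usage, not a compiled doctest):
///
/// ```ignore
/// let AggregateFns { measure, collect } =
///     AggregateBuilder::<u64>::new(Temporality::Delta, None, 2000).sum(true);
///
/// // Hot path: record measurements together with their attributes.
/// measure.call(3, &[KeyValue::new("b", 2)]);
///
/// // Collection path: produce the aggregated data points.
/// let (count, new_agg) = collect.call(None);
/// ```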
pub(crate) struct AggregateBuilder<T> {
    /// The temporality used for the returned aggregate functions.
    temporality: Temporality,

    /// The attribute filter the aggregate function will use on the input of
    /// measurements.
    filter: AttributeSetFilter,

    /// Cardinality limit for the metric stream.
    cardinality_limit: usize,

    _marker: marker::PhantomData<T>,
}

impl<T: Number> AggregateBuilder<T> {
    pub(crate) fn new(
        temporality: Temporality,
        filter: Option<Filter>,
        cardinality_limit: usize,
    ) -> Self {
        AggregateBuilder {
            temporality,
            filter: AttributeSetFilter::new(filter),
            cardinality_limit,
            _marker: marker::PhantomData,
        }
    }

    /// Builds a last-value aggregate function input and output.
    pub(crate) fn last_value(&self, overwrite_temporality: Option<Temporality>) -> AggregateFns<T> {
        LastValue::new(
            overwrite_temporality.unwrap_or(self.temporality),
            self.filter.clone(),
            self.cardinality_limit,
        )
        .into()
    }

    /// Builds a precomputed sum aggregate function input and output.
    pub(crate) fn precomputed_sum(&self, monotonic: bool) -> AggregateFns<T> {
        PrecomputedSum::new(
            self.temporality,
            self.filter.clone(),
            monotonic,
            self.cardinality_limit,
        )
        .into()
    }

    /// Builds a sum aggregate function input and output.
    pub(crate) fn sum(&self, monotonic: bool) -> AggregateFns<T> {
        Sum::new(
            self.temporality,
            self.filter.clone(),
            monotonic,
            self.cardinality_limit,
        )
        .into()
    }

    /// Builds a histogram aggregate function input and output.
    pub(crate) fn explicit_bucket_histogram(
        &self,
        boundaries: Vec<f64>,
        record_min_max: bool,
        record_sum: bool,
    ) -> AggregateFns<T> {
        Histogram::new(
            self.temporality,
            self.filter.clone(),
            boundaries,
            record_min_max,
            record_sum,
            self.cardinality_limit,
        )
        .into()
    }

    /// Builds an exponential histogram aggregate function input and output.
    pub(crate) fn exponential_bucket_histogram(
        &self,
        max_size: u32,
        max_scale: i8,
        record_min_max: bool,
        record_sum: bool,
    ) -> AggregateFns<T> {
        ExpoHistogram::new(
            self.temporality,
            self.filter.clone(),
            max_size,
            max_scale,
            record_min_max,
            record_sum,
            self.cardinality_limit,
        )
        .into()
    }
}

#[cfg(test)]
mod tests {
    use crate::metrics::data::{
        ExponentialBucket, ExponentialHistogram, ExponentialHistogramDataPoint, Gauge,
        GaugeDataPoint, Histogram, HistogramDataPoint, MetricData, Sum, SumDataPoint,
    };
    use std::vec;

    use super::*;

    const CARDINALITY_LIMIT_DEFAULT: usize = 2000;

    #[test]
    fn last_value_aggregation() {
        let AggregateFns { measure, collect } =
            AggregateBuilder::<u64>::new(Temporality::Cumulative, None, CARDINALITY_LIMIT_DEFAULT)
                .last_value(None);
        let mut a = MetricData::Gauge(Gauge {
            data_points: vec![GaugeDataPoint {
                attributes: vec![KeyValue::new("a", 1)],
                value: 1u64,
                exemplars: vec![],
            }],
            start_time: Some(now()),
            time: now(),
        })
        .into();
        let new_attributes = [KeyValue::new("b", 2)];
        measure.call(2, &new_attributes[..]);

        let (count, new_agg) = collect.call(Some(&mut a));
        let AggregatedMetrics::U64(MetricData::Gauge(a)) = a else {
            unreachable!()
        };

        assert_eq!(count, 1);
        assert!(new_agg.is_none());
        assert_eq!(a.data_points.len(), 1);
        assert_eq!(a.data_points[0].attributes, new_attributes.to_vec());
        assert_eq!(a.data_points[0].value, 2);
    }

    #[test]
    fn precomputed_sum_aggregation() {
        for temporality in [Temporality::Delta, Temporality::Cumulative] {
            let AggregateFns { measure, collect } =
                AggregateBuilder::<u64>::new(temporality, None, CARDINALITY_LIMIT_DEFAULT)
                    .precomputed_sum(true);
            let mut a = MetricData::Sum(Sum {
                data_points: vec![
                    SumDataPoint {
                        attributes: vec![KeyValue::new("a1", 1)],
                        value: 1u64,
                        exemplars: vec![],
                    },
                    SumDataPoint {
                        attributes: vec![KeyValue::new("a2", 1)],
                        value: 2u64,
                        exemplars: vec![],
                    },
                ],
                start_time: now(),
                time: now(),
                temporality: if temporality == Temporality::Delta {
                    Temporality::Cumulative
                } else {
                    Temporality::Delta
                },
                is_monotonic: false,
            })
            .into();
            let new_attributes = [KeyValue::new("b", 2)];
            measure.call(3, &new_attributes[..]);

            let (count, new_agg) = collect.call(Some(&mut a));
            let AggregatedMetrics::U64(MetricData::Sum(a)) = a else {
                unreachable!()
            };

            assert_eq!(count, 1);
            assert!(new_agg.is_none());
            assert_eq!(a.temporality, temporality);
            assert!(a.is_monotonic);
            assert_eq!(a.data_points.len(), 1);
            assert_eq!(a.data_points[0].attributes, new_attributes.to_vec());
            assert_eq!(a.data_points[0].value, 3);
        }
    }

    #[test]
    fn sum_aggregation() {
        for temporality in [Temporality::Delta, Temporality::Cumulative] {
            let AggregateFns { measure, collect } =
                AggregateBuilder::<u64>::new(temporality, None, CARDINALITY_LIMIT_DEFAULT)
                    .sum(true);
            let mut a = MetricData::Sum(Sum {
                data_points: vec![
                    SumDataPoint {
                        attributes: vec![KeyValue::new("a1", 1)],
                        value: 1u64,
                        exemplars: vec![],
                    },
                    SumDataPoint {
                        attributes: vec![KeyValue::new("a2", 1)],
                        value: 2u64,
                        exemplars: vec![],
                    },
                ],
                start_time: now(),
                time: now(),
                temporality: if temporality == Temporality::Delta {
                    Temporality::Cumulative
                } else {
                    Temporality::Delta
                },
                is_monotonic: false,
            })
            .into();
            let new_attributes = [KeyValue::new("b", 2)];
            measure.call(3, &new_attributes[..]);

            let (count, new_agg) = collect.call(Some(&mut a));
            let AggregatedMetrics::U64(MetricData::Sum(a)) = a else {
                unreachable!()
            };

            assert_eq!(count, 1);
            assert!(new_agg.is_none());
            assert_eq!(a.temporality, temporality);
            assert!(a.is_monotonic);
            assert_eq!(a.data_points.len(), 1);
            assert_eq!(a.data_points[0].attributes, new_attributes.to_vec());
            assert_eq!(a.data_points[0].value, 3);
        }
    }

    #[test]
    fn explicit_bucket_histogram_aggregation() {
        for temporality in [Temporality::Delta, Temporality::Cumulative] {
            let AggregateFns { measure, collect } =
                AggregateBuilder::<u64>::new(temporality, None, CARDINALITY_LIMIT_DEFAULT)
                    .explicit_bucket_histogram(vec![1.0], true, true);
            let mut a = MetricData::Histogram(Histogram {
                data_points: vec![HistogramDataPoint {
                    attributes: vec![KeyValue::new("a1", 1)],
                    count: 2,
                    bounds: vec![1.0, 2.0],
                    bucket_counts: vec![0, 1, 1],
                    min: None,
                    max: None,
                    sum: 3u64,
                    exemplars: vec![],
                }],
                start_time: now(),
                time: now(),
                temporality: if temporality == Temporality::Delta {
                    Temporality::Cumulative
                } else {
                    Temporality::Delta
                },
            })
            .into();
            let new_attributes = [KeyValue::new("b", 2)];
            measure.call(3, &new_attributes[..]);

            let (count, new_agg) = collect.call(Some(&mut a));
            let AggregatedMetrics::U64(MetricData::Histogram(a)) = a else {
                unreachable!()
            };

            assert_eq!(count, 1);
            assert!(new_agg.is_none());
            assert_eq!(a.temporality, temporality);
            assert_eq!(a.data_points.len(), 1);
            assert_eq!(a.data_points[0].attributes, new_attributes.to_vec());
            assert_eq!(a.data_points[0].count, 1);
            assert_eq!(a.data_points[0].bounds, vec![1.0]);
            assert_eq!(a.data_points[0].bucket_counts, vec![0, 1]);
            assert_eq!(a.data_points[0].min, Some(3));
            assert_eq!(a.data_points[0].max, Some(3));
            assert_eq!(a.data_points[0].sum, 3);
        }
    }

    #[test]
    fn exponential_histogram_aggregation() {
        for temporality in [Temporality::Delta, Temporality::Cumulative] {
            let AggregateFns { measure, collect } =
                AggregateBuilder::<u64>::new(temporality, None, CARDINALITY_LIMIT_DEFAULT)
                    .exponential_bucket_histogram(4, 20, true, true);
            let mut a = MetricData::ExponentialHistogram(ExponentialHistogram {
                data_points: vec![ExponentialHistogramDataPoint {
                    attributes: vec![KeyValue::new("a1", 1)],
                    count: 2,
                    min: None,
                    max: None,
                    sum: 3u64,
                    scale: 10,
                    zero_count: 1,
                    positive_bucket: ExponentialBucket {
                        offset: 1,
                        counts: vec![1],
                    },
                    negative_bucket: ExponentialBucket {
                        offset: 1,
                        counts: vec![1],
                    },
                    zero_threshold: 1.0,
                    exemplars: vec![],
                }],
                start_time: now(),
                time: now(),
                temporality: if temporality == Temporality::Delta {
                    Temporality::Cumulative
                } else {
                    Temporality::Delta
                },
            })
            .into();
            let new_attributes = [KeyValue::new("b", 2)];
            measure.call(3, &new_attributes[..]);

            let (count, new_agg) = collect.call(Some(&mut a));
            let AggregatedMetrics::U64(MetricData::ExponentialHistogram(a)) = a else {
                unreachable!()
            };

            assert_eq!(count, 1);
            assert!(new_agg.is_none());
            assert_eq!(a.temporality, temporality);
            assert_eq!(a.data_points.len(), 1);
            assert_eq!(a.data_points[0].attributes, new_attributes.to_vec());
            assert_eq!(a.data_points[0].count, 1);
            assert_eq!(a.data_points[0].min, Some(3));
            assert_eq!(a.data_points[0].max, Some(3));
            assert_eq!(a.data_points[0].sum, 3);
            assert_eq!(a.data_points[0].zero_count, 0);
            assert_eq!(a.data_points[0].zero_threshold, 0.0);
            assert_eq!(a.data_points[0].positive_bucket.offset, 1661953);
            assert_eq!(a.data_points[0].positive_bucket.counts, vec![1]);
            assert_eq!(a.data_points[0].negative_bucket.offset, 0);
            assert!(a.data_points[0].negative_bucket.counts.is_empty());
        }
    }
}