1use std::{
2 marker,
3 mem::replace,
4 ops::DerefMut,
5 sync::{Arc, Mutex},
6 time::SystemTime,
7};
8
9use crate::metrics::{data::AggregatedMetrics, Temporality};
10use opentelemetry::time::now;
11use opentelemetry::KeyValue;
12
13use super::{
14 exponential_histogram::ExpoHistogram, histogram::Histogram, last_value::LastValue,
15 precomputed_sum::PrecomputedSum, sum::Sum, Number,
16};
17
18pub(crate) trait Measure<T>: Send + Sync + 'static {
20 fn call(&self, measurement: T, attrs: &[KeyValue]);
21}
22
23pub(crate) trait ComputeAggregation: Send + Sync + 'static {
26 fn call(&self, dest: Option<&mut AggregatedMetrics>) -> (usize, Option<AggregatedMetrics>);
32}
33
34pub(crate) struct AggregateFns<T> {
36 pub(crate) measure: Arc<dyn Measure<T>>,
37 pub(crate) collect: Arc<dyn ComputeAggregation>,
38}
39
40impl<A, T> From<A> for AggregateFns<T>
42where
43 A: Measure<T> + ComputeAggregation,
44{
45 fn from(value: A) -> Self {
46 let inst = Arc::new(value);
47 Self {
48 measure: inst.clone(),
49 collect: inst,
50 }
51 }
52}
53
54pub(crate) struct AggregateTime {
55 pub start: SystemTime,
56 pub current: SystemTime,
57}
58
59pub(crate) struct AggregateTimeInitiator(Mutex<SystemTime>);
61
62impl AggregateTimeInitiator {
63 pub(crate) fn delta(&self) -> AggregateTime {
64 let current_time = now();
65 let start_time = self
66 .0
67 .lock()
68 .map(|mut start| replace(start.deref_mut(), current_time))
69 .unwrap_or(current_time);
70 AggregateTime {
71 start: start_time,
72 current: current_time,
73 }
74 }
75
76 pub(crate) fn cumulative(&self) -> AggregateTime {
77 let current_time = now();
78 let start_time = self.0.lock().map(|start| *start).unwrap_or(current_time);
79 AggregateTime {
80 start: start_time,
81 current: current_time,
82 }
83 }
84}
85
86impl Default for AggregateTimeInitiator {
87 fn default() -> Self {
88 Self(Mutex::new(now()))
89 }
90}
91
92type Filter = Arc<dyn Fn(&KeyValue) -> bool + Send + Sync>;
93
94#[derive(Clone)]
97pub(crate) struct AttributeSetFilter {
98 filter: Option<Filter>,
99}
100
101impl AttributeSetFilter {
102 pub(crate) fn new(filter: Option<Filter>) -> Self {
103 Self { filter }
104 }
105
106 pub(crate) fn apply(&self, attrs: &[KeyValue], run: impl FnOnce(&[KeyValue])) {
107 if let Some(filter) = &self.filter {
108 let filtered_attrs: Vec<KeyValue> =
109 attrs.iter().filter(|kv| filter(kv)).cloned().collect();
110 run(&filtered_attrs);
111 } else {
112 run(attrs);
113 };
114 }
115}
116
117pub(crate) struct AggregateBuilder<T> {
119 temporality: Temporality,
121
122 filter: AttributeSetFilter,
125
126 cardinality_limit: usize,
128
129 _marker: marker::PhantomData<T>,
130}
131
132impl<T: Number> AggregateBuilder<T> {
133 pub(crate) fn new(
134 temporality: Temporality,
135 filter: Option<Filter>,
136 cardinality_limit: usize,
137 ) -> Self {
138 AggregateBuilder {
139 temporality,
140 filter: AttributeSetFilter::new(filter),
141 cardinality_limit,
142 _marker: marker::PhantomData,
143 }
144 }
145
146 pub(crate) fn last_value(&self, overwrite_temporality: Option<Temporality>) -> AggregateFns<T> {
148 LastValue::new(
149 overwrite_temporality.unwrap_or(self.temporality),
150 self.filter.clone(),
151 self.cardinality_limit,
152 )
153 .into()
154 }
155
156 pub(crate) fn precomputed_sum(&self, monotonic: bool) -> AggregateFns<T> {
158 PrecomputedSum::new(
159 self.temporality,
160 self.filter.clone(),
161 monotonic,
162 self.cardinality_limit,
163 )
164 .into()
165 }
166
167 pub(crate) fn sum(&self, monotonic: bool) -> AggregateFns<T> {
169 Sum::new(
170 self.temporality,
171 self.filter.clone(),
172 monotonic,
173 self.cardinality_limit,
174 )
175 .into()
176 }
177
178 pub(crate) fn explicit_bucket_histogram(
180 &self,
181 boundaries: Vec<f64>,
182 record_min_max: bool,
183 record_sum: bool,
184 ) -> AggregateFns<T> {
185 Histogram::new(
186 self.temporality,
187 self.filter.clone(),
188 boundaries,
189 record_min_max,
190 record_sum,
191 self.cardinality_limit,
192 )
193 .into()
194 }
195
196 pub(crate) fn exponential_bucket_histogram(
198 &self,
199 max_size: u32,
200 max_scale: i8,
201 record_min_max: bool,
202 record_sum: bool,
203 ) -> AggregateFns<T> {
204 ExpoHistogram::new(
205 self.temporality,
206 self.filter.clone(),
207 max_size,
208 max_scale,
209 record_min_max,
210 record_sum,
211 self.cardinality_limit,
212 )
213 .into()
214 }
215}
216
#[cfg(test)]
mod tests {
    use crate::metrics::data::{
        ExponentialBucket, ExponentialHistogram, ExponentialHistogramDataPoint, Gauge,
        GaugeDataPoint, Histogram, HistogramDataPoint, MetricData, Sum, SumDataPoint,
    };
    use std::vec;

    use super::*;

    const CARDINALITY_LIMIT_DEFAULT: usize = 2000;

    /// Picks a temporality that differs from the one under test, so each test can check
    /// that `collect` overwrites the value already stored in the destination.
    fn stale_temporality(under_test: Temporality) -> Temporality {
        if under_test == Temporality::Delta {
            Temporality::Cumulative
        } else {
            Temporality::Delta
        }
    }

    /// Destination sum pre-filled with two unrelated points, the "wrong" temporality and
    /// `is_monotonic: false`; `collect` is expected to replace all of it.
    fn stale_sum(under_test: Temporality) -> AggregatedMetrics {
        MetricData::Sum(Sum {
            data_points: vec![
                SumDataPoint {
                    attributes: vec![KeyValue::new("a1", 1)],
                    value: 1u64,
                    exemplars: vec![],
                },
                SumDataPoint {
                    attributes: vec![KeyValue::new("a2", 1)],
                    value: 2u64,
                    exemplars: vec![],
                },
            ],
            start_time: now(),
            time: now(),
            temporality: stale_temporality(under_test),
            is_monotonic: false,
        })
        .into()
    }

    /// Records a single measurement of `3` and checks that collecting into a stale sum
    /// leaves exactly that one monotonic point behind.
    fn check_sum_round_trip(temporality: Temporality, fns: AggregateFns<u64>) {
        let AggregateFns { measure, collect } = fns;
        let mut dest = stale_sum(temporality);
        let new_attributes = [KeyValue::new("b", 2)];
        measure.call(3, &new_attributes);

        let (count, new_agg) = collect.call(Some(&mut dest));
        let AggregatedMetrics::U64(MetricData::Sum(sum)) = dest else {
            unreachable!()
        };

        assert_eq!(count, 1);
        assert!(new_agg.is_none());
        assert_eq!(sum.temporality, temporality);
        assert!(sum.is_monotonic);
        assert_eq!(sum.data_points.len(), 1);
        let point = &sum.data_points[0];
        assert_eq!(point.attributes, new_attributes.to_vec());
        assert_eq!(point.value, 3);
    }

    #[test]
    fn last_value_aggregation() {
        let builder =
            AggregateBuilder::<u64>::new(Temporality::Cumulative, None, CARDINALITY_LIMIT_DEFAULT);
        let AggregateFns { measure, collect } = builder.last_value(None);
        let mut dest: AggregatedMetrics = MetricData::Gauge(Gauge {
            data_points: vec![GaugeDataPoint {
                attributes: vec![KeyValue::new("a", 1)],
                value: 1u64,
                exemplars: vec![],
            }],
            start_time: Some(now()),
            time: now(),
        })
        .into();
        let new_attributes = [KeyValue::new("b", 2)];
        measure.call(2, &new_attributes);

        let (count, new_agg) = collect.call(Some(&mut dest));
        let AggregatedMetrics::U64(MetricData::Gauge(gauge)) = dest else {
            unreachable!()
        };

        assert_eq!(count, 1);
        assert!(new_agg.is_none());
        assert_eq!(gauge.data_points.len(), 1);
        let point = &gauge.data_points[0];
        assert_eq!(point.attributes, new_attributes.to_vec());
        assert_eq!(point.value, 2);
    }

    #[test]
    fn precomputed_sum_aggregation() {
        for temporality in [Temporality::Delta, Temporality::Cumulative] {
            let builder =
                AggregateBuilder::<u64>::new(temporality, None, CARDINALITY_LIMIT_DEFAULT);
            check_sum_round_trip(temporality, builder.precomputed_sum(true));
        }
    }

    #[test]
    fn sum_aggregation() {
        for temporality in [Temporality::Delta, Temporality::Cumulative] {
            let builder =
                AggregateBuilder::<u64>::new(temporality, None, CARDINALITY_LIMIT_DEFAULT);
            check_sum_round_trip(temporality, builder.sum(true));
        }
    }

    #[test]
    fn explicit_bucket_histogram_aggregation() {
        for temporality in [Temporality::Delta, Temporality::Cumulative] {
            let builder =
                AggregateBuilder::<u64>::new(temporality, None, CARDINALITY_LIMIT_DEFAULT);
            let AggregateFns { measure, collect } =
                builder.explicit_bucket_histogram(vec![1.0], true, true);
            // Stale destination: different bounds, counts and temporality than expected.
            let mut dest: AggregatedMetrics = MetricData::Histogram(Histogram {
                data_points: vec![HistogramDataPoint {
                    attributes: vec![KeyValue::new("a1", 1)],
                    count: 2,
                    bounds: vec![1.0, 2.0],
                    bucket_counts: vec![0, 1, 1],
                    min: None,
                    max: None,
                    sum: 3u64,
                    exemplars: vec![],
                }],
                start_time: now(),
                time: now(),
                temporality: stale_temporality(temporality),
            })
            .into();
            let new_attributes = [KeyValue::new("b", 2)];
            measure.call(3, &new_attributes);

            let (count, new_agg) = collect.call(Some(&mut dest));
            let AggregatedMetrics::U64(MetricData::Histogram(histogram)) = dest else {
                unreachable!()
            };

            assert_eq!(count, 1);
            assert!(new_agg.is_none());
            assert_eq!(histogram.temporality, temporality);
            assert_eq!(histogram.data_points.len(), 1);
            let point = &histogram.data_points[0];
            assert_eq!(point.attributes, new_attributes.to_vec());
            assert_eq!(point.count, 1);
            assert_eq!(point.bounds, vec![1.0]);
            assert_eq!(point.bucket_counts, vec![0, 1]);
            assert_eq!(point.min, Some(3));
            assert_eq!(point.max, Some(3));
            assert_eq!(point.sum, 3);
        }
    }

    #[test]
    fn exponential_histogram_aggregation() {
        for temporality in [Temporality::Delta, Temporality::Cumulative] {
            let builder =
                AggregateBuilder::<u64>::new(temporality, None, CARDINALITY_LIMIT_DEFAULT);
            let AggregateFns { measure, collect } =
                builder.exponential_bucket_histogram(4, 20, true, true);
            // Stale destination: every field differs from what the aggregator should emit.
            let mut dest: AggregatedMetrics =
                MetricData::ExponentialHistogram(ExponentialHistogram {
                    data_points: vec![ExponentialHistogramDataPoint {
                        attributes: vec![KeyValue::new("a1", 1)],
                        count: 2,
                        min: None,
                        max: None,
                        sum: 3u64,
                        scale: 10,
                        zero_count: 1,
                        positive_bucket: ExponentialBucket {
                            offset: 1,
                            counts: vec![1],
                        },
                        negative_bucket: ExponentialBucket {
                            offset: 1,
                            counts: vec![1],
                        },
                        zero_threshold: 1.0,
                        exemplars: vec![],
                    }],
                    start_time: now(),
                    time: now(),
                    temporality: stale_temporality(temporality),
                })
                .into();
            let new_attributes = [KeyValue::new("b", 2)];
            measure.call(3, &new_attributes);

            let (count, new_agg) = collect.call(Some(&mut dest));
            let AggregatedMetrics::U64(MetricData::ExponentialHistogram(histogram)) = dest else {
                unreachable!()
            };

            assert_eq!(count, 1);
            assert!(new_agg.is_none());
            assert_eq!(histogram.temporality, temporality);
            assert_eq!(histogram.data_points.len(), 1);
            let point = &histogram.data_points[0];
            assert_eq!(point.attributes, new_attributes.to_vec());
            assert_eq!(point.count, 1);
            assert_eq!(point.min, Some(3));
            assert_eq!(point.max, Some(3));
            assert_eq!(point.sum, 3);
            assert_eq!(point.zero_count, 0);
            assert_eq!(point.zero_threshold, 0.0);
            assert_eq!(point.positive_bucket.offset, 1661953);
            assert_eq!(point.positive_bucket.counts, vec![1]);
            assert_eq!(point.negative_bucket.offset, 0);
            assert!(point.negative_bucket.counts.is_empty());
        }
    }
}