use std::{f64::consts::LOG2_E, mem::replace, ops::DerefMut, sync::Mutex};

use opentelemetry::{otel_debug, KeyValue};
use std::sync::OnceLock;

use crate::metrics::{
    data::{self, AggregatedMetrics, MetricData},
    Temporality,
};

use super::{
    aggregate::{AggregateTimeInitiator, AttributeSetFilter},
    Aggregator, ComputeAggregation, Measure, Number, ValueMap,
};

pub(crate) const EXPO_MAX_SCALE: i8 = 20;
pub(crate) const EXPO_MIN_SCALE: i8 = -10;
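
// At scale `s` the bucket base is 2^(2^-s): bucket index `i` covers the value
// range (base^i, base^(i+1)]. For example, at scale 0 the buckets are bounded
// by successive powers of two, so 4.0 lands in bin 1, i.e. (2, 4]. Raising the
// scale by one splits every bucket in two; lowering it by one merges adjacent
// pairs. The constants above bound how far rescaling may go in either
// direction.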

/// A single exponential histogram data point for one attribute set.
#[derive(Debug, PartialEq)]
struct ExpoHistogramDataPoint<T> {
    max_size: i32,
    count: usize,
    min: T,
    max: T,
    sum: T,
    scale: i8,
    pos_buckets: ExpoBuckets,
    neg_buckets: ExpoBuckets,
    zero_count: u64,
}

impl<T: Number> ExpoHistogramDataPoint<T> {
    fn new(config: &BucketConfig) -> Self {
        ExpoHistogramDataPoint {
            max_size: config.max_size,
            count: 0,
            min: T::max(),
            max: T::min(),
            sum: T::default(),
            scale: config.max_scale,
            pos_buckets: ExpoBuckets::default(),
            neg_buckets: ExpoBuckets::default(),
            zero_count: 0,
        }
    }
}

impl<T: Number> ExpoHistogramDataPoint<T> {
    /// Adds a new measurement to the histogram, rescaling the buckets if
    /// needed.
    fn record(&mut self, v: T) {
        self.count += 1;

        if v < self.min {
            self.min = v;
        }
        if v > self.max {
            self.max = v;
        }
        self.sum += v;

        let abs_v = v.into_float().abs();

        if abs_v == 0.0 {
            self.zero_count += 1;
            return;
        }

        let mut bin = self.get_bin(abs_v);

        let v_is_negative = v < T::default();

        // If the new bin would grow the bucket array beyond `max_size`, the
        // current measurements need to be downscaled first.
        let scale_delta = {
            let bucket = if v_is_negative {
                &self.neg_buckets
            } else {
                &self.pos_buckets
            };

            scale_change(
                self.max_size,
                bin,
                bucket.start_bin,
                bucket.counts.len() as i32,
            )
        };
        if scale_delta > 0 {
            if (self.scale - scale_delta as i8) < EXPO_MIN_SCALE {
                otel_debug!(
                    name: "ExponentialHistogramDataPoint.Scale.Underflow",
                    current_scale = self.scale,
                    scale_delta = scale_delta,
                    max_size = self.max_size,
                    min_scale = EXPO_MIN_SCALE,
                    value = format!("{:?}", v),
                    message = "The measurement will be dropped due to scale underflow. Check the histogram configuration"
                );

                return;
            }
            self.scale -= scale_delta as i8;
            self.pos_buckets.downscale(scale_delta);
            self.neg_buckets.downscale(scale_delta);

            bin = self.get_bin(abs_v);
        }

        if v_is_negative {
            self.neg_buckets.record(bin)
        } else {
            self.pos_buckets.record(bin)
        }
    }

    /// Returns the bucket index for the given value at the current scale.
    fn get_bin(&self, v: f64) -> i32 {
        let (frac, exp) = frexp(v);
        if self.scale <= 0 {
            // Exact powers of two (frac == 0.5) sit on a bucket boundary and
            // belong to the lower bucket, hence the extra correction.
            let mut correction = 1;
            if frac == 0.5 {
                correction = 2;
            }
            return (exp - correction) >> -self.scale;
        }
        (exp << self.scale) + (frac.ln() * scale_factors()[self.scale as usize]) as i32 - 1
    }
}
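
// A quick worked example of the bin mapping: `frexp(4.0)` yields `(0.5, 3)`
// because 4.0 == 0.5 * 2^3. At scale 0 the fraction is exactly 0.5 (a power
// of two), so the correction of 2 applies and the bin is (3 - 2) >> 0 == 1,
// i.e. the bucket (2^1, 2^2]. At scale -1 the same value maps to
// (3 - 2) >> 1 == 0, the wider bucket (4^0, 4^1].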

/// Returns how many times the scale needs to be reduced so that the existing
/// bucket range `start_bin..start_bin + length` and the new `bin` fit within
/// `max_size` buckets.
fn scale_change(max_size: i32, bin: i32, start_bin: i32, length: i32) -> u32 {
    if length == 0 {
        // No need to rescale if there are no buckets yet.
        return 0;
    }

    let mut low = start_bin;
    let mut high = bin;
    if start_bin >= bin {
        low = bin;
        high = start_bin + length - 1;
    }

    let mut count = 0u32;
    while high - low >= max_size {
        low >>= 1;
        high >>= 1;
        count += 1;

        if count > (EXPO_MAX_SCALE - EXPO_MIN_SCALE) as u32 {
            return count;
        }
    }

    count
}
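
// For instance, with `max_size = 5`, existing bins 2..=4 (start_bin 2,
// length 3), and a new measurement landing in bin 13, the combined span
// 2..=13 does not fit. Halving the indexes once gives 1..=6 (still too wide),
// halving again gives 0..=3, which fits, so `scale_change` returns 2. The
// same case appears in the `scale_change_rescaling` test below.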

static SCALE_FACTORS: OnceLock<[f64; 21]> = OnceLock::new();

/// Constants used in calculating the logarithm index: entry `s` holds
/// `LOG2_E * 2^s` for each scale `s` in `0..=EXPO_MAX_SCALE`.
#[inline]
fn scale_factors() -> &'static [f64; 21] {
    SCALE_FACTORS.get_or_init(|| std::array::from_fn(|i| LOG2_E * 2f64.powi(i as i32)))
}

/// Breaks `x` into a normalized fraction and a base-2 exponent such that
/// `x == frac * 2^exp` with `frac` in `[0.5, 1)`, mirroring C's `frexp`.
/// Zero, infinity, and NaN are returned unchanged with an exponent of 0.
#[inline(always)]
fn frexp(x: f64) -> (f64, i32) {
    let mut y = x.to_bits();
    let ee = ((y >> 52) & 0x7ff) as i32;

    if ee == 0 {
        if x != 0.0 {
            // Subnormal: scale by 2^64 into the normal range, then correct
            // the exponent.
            let x1p64 = f64::from_bits(0x43f0000000000000);
            let (x, e) = frexp(x * x1p64);
            return (x, e - 64);
        }
        return (x, 0);
    } else if ee == 0x7ff {
        return (x, 0);
    }

    let e = ee - 0x3fe;
    y &= 0x800fffffffffffff;
    y |= 0x3fe0000000000000;

    (f64::from_bits(y), e)
}
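
// Examples: `frexp(4.0)` returns `(0.5, 3)` since 4.0 == 0.5 * 2^3, and
// `frexp(3.0)` returns `(0.75, 2)`. `frexp(f64::MIN_POSITIVE)` (the smallest
// normal value) returns `(0.5, -1021)`; only values below that take the
// subnormal path.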

#[derive(Default, Debug, PartialEq)]
struct ExpoBuckets {
    start_bin: i32,
    counts: Vec<u64>,
}

impl ExpoBuckets {
    /// Increments the count for the given bin, expanding the bucket array in
    /// either direction if `bin` falls outside the current range.
    fn record(&mut self, bin: i32) {
        if self.counts.is_empty() {
            self.counts = vec![1];
            self.start_bin = bin;
            return;
        }

        let end_bin = self.start_bin + self.counts.len() as i32 - 1;

        // The new bin is already covered by the current range.
        if bin >= self.start_bin && bin <= end_bin {
            self.counts[(bin - self.start_bin) as usize] += 1;
            return;
        }

        if bin < self.start_bin {
            // Grow the buckets downward.
            let mut zeroes = vec![0; (end_bin - bin + 1) as usize];
            let shift = (self.start_bin - bin) as usize;
            zeroes[shift..].copy_from_slice(&self.counts);
            self.counts = zeroes;
            self.counts[0] = 1;
            self.start_bin = bin;
        } else if bin > end_bin {
            // Grow the buckets upward, reusing existing capacity if possible.
            if ((bin - self.start_bin) as usize) < self.counts.capacity() {
                self.counts.resize((bin - self.start_bin + 1) as usize, 0);
                self.counts[(bin - self.start_bin) as usize] = 1;
                return;
            }

            self.counts.extend(
                std::iter::repeat(0).take((bin - self.start_bin) as usize - self.counts.len() + 1),
            );
            self.counts[(bin - self.start_bin) as usize] = 1
        }
    }

    /// Shrinks the bucket range by a factor of 2^`delta`, merging the counts
    /// of every 2^`delta` adjacent bins into one.
    fn downscale(&mut self, delta: u32) {
        if self.counts.len() <= 1 || delta < 1 {
            self.start_bin >>= delta;
            return;
        }

        let steps = 1 << delta;
        let mut offset = self.start_bin % steps;
        offset = (offset + steps) % steps; // normalize a negative remainder
        for i in 1..self.counts.len() {
            let idx = i + offset as usize;
            if idx % steps as usize == 0 {
                self.counts[idx / steps as usize] = self.counts[i];
                continue;
            }
            self.counts[idx / steps as usize] += self.counts[i];
        }

        let last_idx = (self.counts.len() as i32 - 1 + offset) / steps;
        self.counts = self.counts[..last_idx as usize + 1].to_vec();
        self.start_bin >>= delta;
    }
}
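
// Worked example of `downscale`, taken from the unit tests below: with
// `start_bin = 5`, `counts = [1, 2, 3, 4, 5, 6]` (bins 5..=10) and
// `delta = 1`, each old bin `b` maps to `b >> 1`: bin 5 -> 2, bins 6..=7 -> 3,
// bins 8..=9 -> 4, bin 10 -> 5, giving `start_bin = 2` and
// `counts = [1, 5, 9, 6]`.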

impl<T> Aggregator for Mutex<ExpoHistogramDataPoint<T>>
where
    T: Number,
{
    type InitConfig = BucketConfig;

    type PreComputedValue = T;

    fn create(init: &BucketConfig) -> Self {
        Mutex::new(ExpoHistogramDataPoint::new(init))
    }

    fn update(&self, value: T) {
        let mut this = match self.lock() {
            Ok(guard) => guard,
            // A poisoned lock means another thread panicked mid-update; drop
            // this measurement rather than propagate the panic.
            Err(_) => return,
        };
        this.record(value);
    }

    fn clone_and_reset(&self, init: &BucketConfig) -> Self {
        // Recover the data even if the lock was poisoned.
        let mut current = self.lock().unwrap_or_else(|err| err.into_inner());
        let cloned = replace(current.deref_mut(), ExpoHistogramDataPoint::new(init));
        Mutex::new(cloned)
    }
}

#[derive(Debug, Clone, Copy, PartialEq)]
struct BucketConfig {
    max_size: i32,
    max_scale: i8,
}

/// An aggregator that summarizes a set of measurements as an histogram with
/// exponentially defined buckets.
pub(crate) struct ExpoHistogram<T: Number> {
    value_map: ValueMap<Mutex<ExpoHistogramDataPoint<T>>>,
    init_time: AggregateTimeInitiator,
    temporality: Temporality,
    filter: AttributeSetFilter,
    record_sum: bool,
    record_min_max: bool,
}

impl<T: Number> ExpoHistogram<T> {
    /// Returns a new exponential histogram aggregator.
    pub(crate) fn new(
        temporality: Temporality,
        filter: AttributeSetFilter,
        max_size: u32,
        max_scale: i8,
        record_min_max: bool,
        record_sum: bool,
        cardinality_limit: usize,
    ) -> Self {
        ExpoHistogram {
            value_map: ValueMap::new(
                BucketConfig {
                    max_size: max_size as i32,
                    max_scale,
                },
                cardinality_limit,
            ),
            init_time: AggregateTimeInitiator::default(),
            temporality,
            filter,
            record_sum,
            record_min_max,
        }
    }

    fn delta(&self, dest: Option<&mut MetricData<T>>) -> (usize, Option<MetricData<T>>) {
        let time = self.init_time.delta();

        let h = dest.and_then(|d| {
            if let MetricData::ExponentialHistogram(hist) = d {
                Some(hist)
            } else {
                None
            }
        });
        let mut new_agg = if h.is_none() {
            Some(data::ExponentialHistogram {
                data_points: vec![],
                start_time: time.start,
                time: time.current,
                temporality: Temporality::Delta,
            })
        } else {
            None
        };
        let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
        h.temporality = Temporality::Delta;
        h.start_time = time.start;
        h.time = time.current;

        self.value_map
            .collect_and_reset(&mut h.data_points, |attributes, attr| {
                let b = attr.into_inner().unwrap_or_else(|err| err.into_inner());
                data::ExponentialHistogramDataPoint {
                    attributes,
                    count: b.count,
                    min: if self.record_min_max {
                        Some(b.min)
                    } else {
                        None
                    },
                    max: if self.record_min_max {
                        Some(b.max)
                    } else {
                        None
                    },
                    sum: if self.record_sum { b.sum } else { T::default() },
                    scale: b.scale,
                    zero_count: b.zero_count,
                    positive_bucket: data::ExponentialBucket {
                        offset: b.pos_buckets.start_bin,
                        counts: b.pos_buckets.counts,
                    },
                    negative_bucket: data::ExponentialBucket {
                        offset: b.neg_buckets.start_bin,
                        counts: b.neg_buckets.counts,
                    },
                    zero_threshold: 0.0,
                    exemplars: vec![],
                }
            });

        (h.data_points.len(), new_agg.map(Into::into))
    }

    fn cumulative(&self, dest: Option<&mut MetricData<T>>) -> (usize, Option<MetricData<T>>) {
        let time = self.init_time.cumulative();

        let h = dest.and_then(|d| {
            if let MetricData::ExponentialHistogram(hist) = d {
                Some(hist)
            } else {
                None
            }
        });
        let mut new_agg = if h.is_none() {
            Some(data::ExponentialHistogram {
                data_points: vec![],
                start_time: time.start,
                time: time.current,
                temporality: Temporality::Cumulative,
            })
        } else {
            None
        };
        let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
        h.temporality = Temporality::Cumulative;
        h.start_time = time.start;
        h.time = time.current;

        self.value_map
            .collect_readonly(&mut h.data_points, |attributes, attr| {
                let b = attr.lock().unwrap_or_else(|err| err.into_inner());
                data::ExponentialHistogramDataPoint {
                    attributes,
                    count: b.count,
                    min: if self.record_min_max {
                        Some(b.min)
                    } else {
                        None
                    },
                    max: if self.record_min_max {
                        Some(b.max)
                    } else {
                        None
                    },
                    sum: if self.record_sum { b.sum } else { T::default() },
                    scale: b.scale,
                    zero_count: b.zero_count,
                    positive_bucket: data::ExponentialBucket {
                        offset: b.pos_buckets.start_bin,
                        counts: b.pos_buckets.counts.clone(),
                    },
                    negative_bucket: data::ExponentialBucket {
                        offset: b.neg_buckets.start_bin,
                        counts: b.neg_buckets.counts.clone(),
                    },
                    zero_threshold: 0.0,
                    exemplars: vec![],
                }
            });

        (h.data_points.len(), new_agg.map(Into::into))
    }
}

impl<T> Measure<T> for ExpoHistogram<T>
where
    T: Number,
{
    fn call(&self, measurement: T, attrs: &[KeyValue]) {
        let f_value = measurement.into_float();
        // Ignore NaN and infinity; they cannot be placed in any bucket.
        if !f_value.is_finite() {
            return;
        }

        self.filter.apply(attrs, |filtered| {
            self.value_map.measure(measurement, filtered);
        })
    }
}

impl<T> ComputeAggregation for ExpoHistogram<T>
where
    T: Number,
{
    fn call(&self, dest: Option<&mut AggregatedMetrics>) -> (usize, Option<AggregatedMetrics>) {
        let data = dest.and_then(|d| T::extract_metrics_data_mut(d));
        let (len, new) = match self.temporality {
            Temporality::Delta => self.delta(data),
            _ => self.cumulative(data),
        };
        (len, new.map(T::make_aggregated_metrics))
    }
}
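
// A minimal usage sketch of the aggregator, mirroring how the tests below
// drive it (this is an internal API, not a public one):
//
//     let h = ExpoHistogram::<f64>::new(
//         Temporality::Delta,
//         AttributeSetFilter::new(None),
//         4,    // max_size
//         20,   // max_scale
//         true, // record_min_max
//         true, // record_sum
//         2000, // cardinality_limit
//     );
//     Measure::call(&h, 1.5, &[]);
//     let (count, new_agg) = ComputeAggregation::call(&h, None);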

#[cfg(test)]
mod tests {
    use opentelemetry::time::now;
    use std::{any::Any, ops::Neg};
    use tests::internal::AggregateFns;

    use crate::metrics::internal::{self, AggregateBuilder};

    use super::*;

    const CARDINALITY_LIMIT_DEFAULT: usize = 2000;

    #[test]
    fn test_expo_histogram_data_point_record() {
        run_data_point_record::<f64>();
        run_data_point_record_f64();
        run_min_max_sum_f64();
        run_min_max_sum::<i64>();
        run_min_max_sum::<u64>();
        run_data_point_record::<i64>();
    }

    fn run_data_point_record<T: Number + Neg<Output = T> + From<u32>>() {
        struct TestCase<T> {
            max_size: i32,
            values: Vec<T>,
            expected_buckets: ExpoBuckets,
            expected_scale: i8,
        }
        let test_cases: Vec<TestCase<T>> = vec![
            TestCase {
                max_size: 4,
                values: vec![2, 4, 1].into_iter().map(Into::into).collect(),
                expected_buckets: ExpoBuckets {
                    start_bin: -1,
                    counts: vec![1, 1, 1],
                },
                expected_scale: 0,
            },
            TestCase {
                max_size: 4,
                values: vec![4, 4, 4, 2, 16, 1]
                    .into_iter()
                    .map(Into::into)
                    .collect(),
                expected_buckets: ExpoBuckets {
                    start_bin: -1,
                    counts: vec![1, 4, 1],
                },
                expected_scale: -1,
            },
            TestCase {
                max_size: 2,
                values: vec![1, 2, 4].into_iter().map(Into::into).collect(),
                expected_buckets: ExpoBuckets {
                    start_bin: -1,
                    counts: vec![1, 2],
                },
                expected_scale: -1,
            },
            TestCase {
                max_size: 2,
                values: vec![1, 4, 2].into_iter().map(Into::into).collect(),
                expected_buckets: ExpoBuckets {
                    start_bin: -1,
                    counts: vec![1, 2],
                },
                expected_scale: -1,
            },
            TestCase {
                max_size: 2,
                values: vec![2, 4, 1].into_iter().map(Into::into).collect(),
                expected_buckets: ExpoBuckets {
                    start_bin: -1,
                    counts: vec![1, 2],
                },
                expected_scale: -1,
            },
            TestCase {
                max_size: 2,
                values: vec![2, 1, 4].into_iter().map(Into::into).collect(),
                expected_buckets: ExpoBuckets {
                    start_bin: -1,
                    counts: vec![1, 2],
                },
                expected_scale: -1,
            },
            TestCase {
                max_size: 2,
                values: vec![4, 1, 2].into_iter().map(Into::into).collect(),
                expected_buckets: ExpoBuckets {
                    start_bin: -1,
                    counts: vec![1, 2],
                },
                expected_scale: -1,
            },
            TestCase {
                max_size: 2,
                values: vec![4, 2, 1].into_iter().map(Into::into).collect(),
                expected_buckets: ExpoBuckets {
                    start_bin: -1,
                    counts: vec![1, 2],
                },
                expected_scale: -1,
            },
        ];

        for test in test_cases {
            let mut dp = ExpoHistogramDataPoint::<T>::new(&BucketConfig {
                max_size: test.max_size,
                max_scale: 20,
            });
            for v in test.values {
                dp.record(v);
                dp.record(-v);
            }

            assert_eq!(test.expected_buckets, dp.pos_buckets, "positive buckets");
            assert_eq!(test.expected_buckets, dp.neg_buckets, "negative buckets");
            assert_eq!(test.expected_scale, dp.scale, "scale");
        }
    }

    fn run_min_max_sum_f64() {
        struct Expected {
            min: f64,
            max: f64,
            sum: f64,
            count: usize,
        }
        impl Expected {
            fn new(min: f64, max: f64, sum: f64, count: usize) -> Self {
                Expected {
                    min,
                    max,
                    sum,
                    count,
                }
            }
        }
        struct TestCase {
            values: Vec<f64>,
            expected: Expected,
        }

        let test_cases = vec![
            TestCase {
                values: vec![2.0, 4.0, 1.0],
                expected: Expected::new(1.0, 4.0, 7.0, 3),
            },
            TestCase {
                values: vec![2.0, 4.0, 1.0, f64::INFINITY],
                expected: Expected::new(1.0, 4.0, 7.0, 3),
            },
            TestCase {
                values: vec![2.0, 4.0, 1.0, -f64::INFINITY],
                expected: Expected::new(1.0, 4.0, 7.0, 3),
            },
            TestCase {
                values: vec![2.0, 4.0, 1.0, f64::NAN],
                expected: Expected::new(1.0, 4.0, 7.0, 3),
            },
            TestCase {
                values: vec![4.0, 4.0, 4.0, 2.0, 16.0, 1.0],
                expected: Expected::new(1.0, 16.0, 31.0, 6),
            },
        ];

        for test in test_cases {
            let h = ExpoHistogram::new(
                Temporality::Cumulative,
                AttributeSetFilter::new(None),
                4,
                20,
                true,
                true,
                CARDINALITY_LIMIT_DEFAULT,
            );
            for v in test.values {
                Measure::call(&h, v, &[]);
            }
            let dp = h.value_map.no_attribute_tracker.lock().unwrap();

            assert_eq!(test.expected.max, dp.max);
            assert_eq!(test.expected.min, dp.min);
            assert_eq!(test.expected.sum, dp.sum);
            assert_eq!(test.expected.count, dp.count);
        }
    }

    fn run_min_max_sum<T: Number + From<u32>>() {
        struct Expected<T> {
            min: T,
            max: T,
            sum: T,
            count: usize,
        }
        impl<T: Number> Expected<T> {
            fn new(min: T, max: T, sum: T, count: usize) -> Self {
                Expected {
                    min,
                    max,
                    sum,
                    count,
                }
            }
        }
        struct TestCase<T> {
            values: Vec<T>,
            expected: Expected<T>,
        }
        let test_cases: Vec<TestCase<T>> = vec![
            TestCase {
                values: vec![2, 4, 1].into_iter().map(Into::into).collect(),
                expected: Expected::new(1.into(), 4.into(), 7.into(), 3),
            },
            TestCase {
                values: vec![4, 4, 4, 2, 16, 1]
                    .into_iter()
                    .map(Into::into)
                    .collect(),
                expected: Expected::new(1.into(), 16.into(), 31.into(), 6),
            },
        ];

        for test in test_cases {
            let h = ExpoHistogram::new(
                Temporality::Cumulative,
                AttributeSetFilter::new(None),
                4,
                20,
                true,
                true,
                CARDINALITY_LIMIT_DEFAULT,
            );
            for v in test.values {
                Measure::call(&h, v, &[]);
            }
            let dp = h.value_map.no_attribute_tracker.lock().unwrap();

            assert_eq!(test.expected.max, dp.max);
            assert_eq!(test.expected.min, dp.min);
            assert_eq!(test.expected.sum, dp.sum);
            assert_eq!(test.expected.count, dp.count);
        }
    }

    fn run_data_point_record_f64() {
        struct TestCase {
            max_size: i32,
            values: Vec<f64>,
            expected_buckets: ExpoBuckets,
            expected_scale: i8,
        }

        let test_cases = vec![
            TestCase {
                max_size: 4,
                values: vec![2.0, 2.0, 2.0, 1.0, 8.0, 0.5],
                expected_buckets: ExpoBuckets {
                    start_bin: -1,
                    counts: vec![2, 3, 1],
                },
                expected_scale: -1,
            },
            TestCase {
                max_size: 2,
                values: vec![1.0, 0.5, 2.0],
                expected_buckets: ExpoBuckets {
                    start_bin: -1,
                    counts: vec![2, 1],
                },
                expected_scale: -1,
            },
            TestCase {
                max_size: 2,
                values: vec![1.0, 2.0, 0.5],
                expected_buckets: ExpoBuckets {
                    start_bin: -1,
                    counts: vec![2, 1],
                },
                expected_scale: -1,
            },
            TestCase {
                max_size: 2,
                values: vec![2.0, 0.5, 1.0],
                expected_buckets: ExpoBuckets {
                    start_bin: -1,
                    counts: vec![2, 1],
                },
                expected_scale: -1,
            },
            TestCase {
                max_size: 2,
                values: vec![2.0, 1.0, 0.5],
                expected_buckets: ExpoBuckets {
                    start_bin: -1,
                    counts: vec![2, 1],
                },
                expected_scale: -1,
            },
            TestCase {
                max_size: 2,
                values: vec![0.5, 1.0, 2.0],
                expected_buckets: ExpoBuckets {
                    start_bin: -1,
                    counts: vec![2, 1],
                },
                expected_scale: -1,
            },
            TestCase {
                max_size: 2,
                values: vec![0.5, 2.0, 1.0],
                expected_buckets: ExpoBuckets {
                    start_bin: -1,
                    counts: vec![2, 1],
                },
                expected_scale: -1,
            },
        ];
        for test in test_cases {
            let mut dp = ExpoHistogramDataPoint::new(&BucketConfig {
                max_size: test.max_size,
                max_scale: 20,
            });
            for v in test.values {
                dp.record(v);
                dp.record(-v);
            }

            assert_eq!(test.expected_buckets, dp.pos_buckets);
            assert_eq!(test.expected_buckets, dp.neg_buckets);
            assert_eq!(test.expected_scale, dp.scale);
        }
    }

    #[test]
    fn data_point_record_limits() {
        // Test that very large and very small values do not overflow or
        // underflow the computed bucket index at the maximum scale.
        let cfg = BucketConfig {
            max_size: 4,
            max_scale: 20,
        };
        let mut fdp = ExpoHistogramDataPoint::new(&cfg);
        fdp.record(f64::MAX);

        assert_eq!(
            fdp.pos_buckets.start_bin, 1073741823,
            "start bin does not match for large f64 values",
        );

        let mut fdp = ExpoHistogramDataPoint::new(&cfg);
        fdp.record(f64::MIN_POSITIVE);

        assert_eq!(
            fdp.pos_buckets.start_bin, -1071644673,
            "start bin does not match for small positive values",
        );

        let mut idp = ExpoHistogramDataPoint::new(&cfg);
        idp.record(i64::MAX);

        assert_eq!(
            idp.pos_buckets.start_bin, 66060287,
            "start bin does not match for max i64 values",
        );
    }

    #[test]
    fn expo_bucket_downscale() {
        struct TestCase {
            name: &'static str,
            bucket: ExpoBuckets,
            scale: i8,
            want: ExpoBuckets,
        }

        let test_cases = vec![
            TestCase {
                name: "Empty bucket",
                bucket: ExpoBuckets {
                    start_bin: 0,
                    counts: vec![],
                },
                scale: 3,
                want: ExpoBuckets {
                    start_bin: 0,
                    counts: vec![],
                },
            },
            TestCase {
                name: "1 size bucket",
                bucket: ExpoBuckets {
                    start_bin: 50,
                    counts: vec![7],
                },
                scale: 4,
                want: ExpoBuckets {
                    start_bin: 3,
                    counts: vec![7],
                },
            },
            TestCase {
                name: "zero scale",
                bucket: ExpoBuckets {
                    start_bin: 50,
                    counts: vec![7, 5],
                },
                scale: 0,
                want: ExpoBuckets {
                    start_bin: 50,
                    counts: vec![7, 5],
                },
            },
            TestCase {
                name: "aligned bucket scale 1",
                bucket: ExpoBuckets {
                    start_bin: 0,
                    counts: vec![1, 2, 3, 4, 5, 6],
                },
                scale: 1,
                want: ExpoBuckets {
                    start_bin: 0,
                    counts: vec![3, 7, 11],
                },
            },
            TestCase {
                name: "aligned bucket scale 2",
                bucket: ExpoBuckets {
                    start_bin: 0,
                    counts: vec![1, 2, 3, 4, 5, 6],
                },
                scale: 2,
                want: ExpoBuckets {
                    start_bin: 0,
                    counts: vec![10, 11],
                },
            },
            TestCase {
                name: "aligned bucket scale 3",
                bucket: ExpoBuckets {
                    start_bin: 0,
                    counts: vec![1, 2, 3, 4, 5, 6],
                },
                scale: 3,
                want: ExpoBuckets {
                    start_bin: 0,
                    counts: vec![21],
                },
            },
            TestCase {
                name: "unaligned bucket scale 1",
                bucket: ExpoBuckets {
                    start_bin: 5,
                    counts: vec![1, 2, 3, 4, 5, 6],
                },
                scale: 1,
                want: ExpoBuckets {
                    start_bin: 2,
                    counts: vec![1, 5, 9, 6],
                },
            },
            TestCase {
                name: "unaligned bucket scale 2",
                bucket: ExpoBuckets {
                    start_bin: 7,
                    counts: vec![1, 2, 3, 4, 5, 6],
                },
                scale: 2,
                want: ExpoBuckets {
                    start_bin: 1,
                    counts: vec![1, 14, 6],
                },
            },
            TestCase {
                name: "unaligned bucket scale 3",
                bucket: ExpoBuckets {
                    start_bin: 3,
                    counts: vec![1, 2, 3, 4, 5, 6],
                },
                scale: 3,
                want: ExpoBuckets {
                    start_bin: 0,
                    counts: vec![15, 6],
                },
            },
            TestCase {
                name: "unaligned bucket scale 1 with gap",
                bucket: ExpoBuckets {
                    start_bin: 1,
                    counts: vec![1, 0, 1],
                },
                scale: 1,
                want: ExpoBuckets {
                    start_bin: 0,
                    counts: vec![1, 1],
                },
            },
            TestCase {
                name: "negative start_bin",
                bucket: ExpoBuckets {
                    start_bin: -1,
                    counts: vec![1, 0, 3],
                },
                scale: 1,
                want: ExpoBuckets {
                    start_bin: -1,
                    counts: vec![1, 3],
                },
            },
            TestCase {
                name: "negative start_bin 2",
                bucket: ExpoBuckets {
                    start_bin: -4,
                    counts: vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
                },
                scale: 1,
                want: ExpoBuckets {
                    start_bin: -2,
                    counts: vec![3, 7, 11, 15, 19],
                },
            },
        ];
        for mut test in test_cases {
            test.bucket.downscale(test.scale as u32);
            assert_eq!(test.want, test.bucket, "{}", test.name);
        }
    }

    #[test]
    fn expo_bucket_record() {
        struct TestCase {
            name: &'static str,
            bucket: ExpoBuckets,
            bin: i32,
            want: ExpoBuckets,
        }

        let test_cases = vec![
            TestCase {
                name: "Empty bucket creates first count",
                bucket: ExpoBuckets {
                    start_bin: 0,
                    counts: vec![],
                },
                bin: -5,
                want: ExpoBuckets {
                    start_bin: -5,
                    counts: vec![1],
                },
            },
            TestCase {
                name: "Bin is in the bucket",
                bucket: ExpoBuckets {
                    start_bin: 3,
                    counts: vec![1, 2, 3, 4, 5, 6],
                },
                bin: 5,
                want: ExpoBuckets {
                    start_bin: 3,
                    counts: vec![1, 2, 4, 4, 5, 6],
                },
            },
            TestCase {
                name: "Bin is before the start of the bucket",
                bucket: ExpoBuckets {
                    start_bin: 1,
                    counts: vec![1, 2, 3, 4, 5, 6],
                },
                bin: -2,
                want: ExpoBuckets {
                    start_bin: -2,
                    counts: vec![1, 0, 0, 1, 2, 3, 4, 5, 6],
                },
            },
            TestCase {
                name: "Bin is after the end of the bucket",
                bucket: ExpoBuckets {
                    start_bin: -2,
                    counts: vec![1, 2, 3, 4, 5, 6],
                },
                bin: 4,
                want: ExpoBuckets {
                    start_bin: -2,
                    counts: vec![1, 2, 3, 4, 5, 6, 1],
                },
            },
        ];

        for mut test in test_cases {
            test.bucket.record(test.bin);
            assert_eq!(test.want, test.bucket, "{}", test.name);
        }
    }

    #[test]
    fn scale_change_rescaling() {
        struct Args {
            bin: i32,
            start_bin: i32,
            length: i32,
            max_size: i32,
        }
        struct TestCase {
            name: &'static str,
            args: Args,
            want: u32,
        }
        let test_cases = vec![
            TestCase {
                name: "if length is 0, no rescale is needed",
                args: Args {
                    bin: 5,
                    start_bin: 0,
                    length: 0,
                    max_size: 4,
                },
                want: 0,
            },
            TestCase {
                name: "if bin is between the start and the end, no rescale is needed",
                args: Args {
                    bin: 5,
                    start_bin: -1,
                    length: 10,
                    max_size: 20,
                },
                want: 0,
            },
            TestCase {
                name: "if [bin, ..., end].len() > max_size, rescale is needed",
                args: Args {
                    bin: 5,
                    start_bin: 8,
                    length: 3,
                    max_size: 5,
                },
                want: 1,
            },
            TestCase {
                name: "if [start, ..., bin].len() > max_size, rescale is needed",
                args: Args {
                    bin: 7,
                    start_bin: 2,
                    length: 3,
                    max_size: 5,
                },
                want: 1,
            },
            TestCase {
                name: "if [start, ..., bin].len() > max_size, rescale is needed by 2",
                args: Args {
                    bin: 13,
                    start_bin: 2,
                    length: 3,
                    max_size: 5,
                },
                want: 2,
            },
            TestCase {
                name: "it should not hang if it will never be able to rescale",
                args: Args {
                    bin: 1,
                    start_bin: -1,
                    length: 1,
                    max_size: 1,
                },
                want: 31,
            },
        ];

        for test in test_cases {
            let got = scale_change(
                test.args.max_size,
                test.args.bin,
                test.args.start_bin,
                test.args.length,
            );
            assert_eq!(got, test.want, "incorrect scale change, {}", test.name);
        }
    }

    #[test]
    fn sub_normal() {
        let want = ExpoHistogramDataPoint {
            max_size: 4,
            count: 3,
            min: f64::MIN_POSITIVE,
            max: f64::MIN_POSITIVE,
            sum: 3.0 * f64::MIN_POSITIVE,
            scale: 20,
            pos_buckets: ExpoBuckets {
                start_bin: -1071644673,
                counts: vec![3],
            },
            neg_buckets: ExpoBuckets {
                start_bin: 0,
                counts: vec![],
            },
            zero_count: 0,
        };

        let mut ehdp = ExpoHistogramDataPoint::new(&BucketConfig {
            max_size: 4,
            max_scale: 20,
        });
        ehdp.record(f64::MIN_POSITIVE);
        ehdp.record(f64::MIN_POSITIVE);
        ehdp.record(f64::MIN_POSITIVE);

        assert_eq!(want, ehdp);
    }

    #[test]
    fn hist_aggregations() {
        hist_aggregation::<i64>();
        hist_aggregation::<u64>();
        hist_aggregation::<f64>();
    }

    fn hist_aggregation<T: Number + From<u32>>() {
        let max_size = 4;
        let max_scale = 20;
        let record_min_max = true;
        let record_sum = true;

        #[allow(clippy::type_complexity)]
        struct TestCase<T> {
            name: &'static str,
            build: Box<dyn Fn() -> AggregateFns<T>>,
            input: Vec<Vec<T>>,
            want: data::ExponentialHistogram<T>,
            want_count: usize,
        }
        let test_cases: Vec<TestCase<T>> = vec![
            TestCase {
                name: "Delta Single",
                build: Box::new(move || {
                    AggregateBuilder::new(Temporality::Delta, None, CARDINALITY_LIMIT_DEFAULT)
                        .exponential_bucket_histogram(
                            max_size,
                            max_scale,
                            record_min_max,
                            record_sum,
                        )
                }),
                input: vec![vec![4, 4, 4, 2, 16, 1]
                    .into_iter()
                    .map(Into::into)
                    .collect()],
                want: data::ExponentialHistogram {
                    temporality: Temporality::Delta,
                    data_points: vec![data::ExponentialHistogramDataPoint {
                        attributes: vec![],
                        count: 6,
                        min: Some(1.into()),
                        max: Some(16.into()),
                        sum: 31.into(),
                        scale: -1,
                        positive_bucket: data::ExponentialBucket {
                            offset: -1,
                            counts: vec![1, 4, 1],
                        },
                        negative_bucket: data::ExponentialBucket {
                            offset: 0,
                            counts: vec![],
                        },
                        exemplars: vec![],
                        zero_threshold: 0.0,
                        zero_count: 0,
                    }],
                    start_time: now(),
                    time: now(),
                },
                want_count: 1,
            },
            TestCase {
                name: "Cumulative Single",
                build: Box::new(move || {
                    internal::AggregateBuilder::new(
                        Temporality::Cumulative,
                        None,
                        CARDINALITY_LIMIT_DEFAULT,
                    )
                    .exponential_bucket_histogram(
                        max_size,
                        max_scale,
                        record_min_max,
                        record_sum,
                    )
                }),
                input: vec![vec![4, 4, 4, 2, 16, 1]
                    .into_iter()
                    .map(Into::into)
                    .collect()],
                want: data::ExponentialHistogram {
                    temporality: Temporality::Cumulative,
                    data_points: vec![data::ExponentialHistogramDataPoint {
                        attributes: vec![],
                        count: 6,
                        min: Some(1.into()),
                        max: Some(16.into()),
                        sum: 31.into(),
                        scale: -1,
                        positive_bucket: data::ExponentialBucket {
                            offset: -1,
                            counts: vec![1, 4, 1],
                        },
                        negative_bucket: data::ExponentialBucket {
                            offset: 0,
                            counts: vec![],
                        },
                        exemplars: vec![],
                        zero_threshold: 0.0,
                        zero_count: 0,
                    }],
                    start_time: now(),
                    time: now(),
                },
                want_count: 1,
            },
            TestCase {
                name: "Delta Multiple",
                build: Box::new(move || {
                    internal::AggregateBuilder::new(
                        Temporality::Delta,
                        None,
                        CARDINALITY_LIMIT_DEFAULT,
                    )
                    .exponential_bucket_histogram(
                        max_size,
                        max_scale,
                        record_min_max,
                        record_sum,
                    )
                }),
                input: vec![
                    vec![2, 3, 8].into_iter().map(Into::into).collect(),
                    vec![4, 4, 4, 2, 16, 1]
                        .into_iter()
                        .map(Into::into)
                        .collect(),
                ],
                want: data::ExponentialHistogram {
                    temporality: Temporality::Delta,
                    data_points: vec![data::ExponentialHistogramDataPoint {
                        attributes: vec![],
                        count: 6,
                        min: Some(1.into()),
                        max: Some(16.into()),
                        sum: 31.into(),
                        scale: -1,
                        positive_bucket: data::ExponentialBucket {
                            offset: -1,
                            counts: vec![1, 4, 1],
                        },
                        negative_bucket: data::ExponentialBucket {
                            offset: 0,
                            counts: vec![],
                        },
                        exemplars: vec![],
                        zero_threshold: 0.0,
                        zero_count: 0,
                    }],
                    start_time: now(),
                    time: now(),
                },
                want_count: 1,
            },
            TestCase {
                name: "Cumulative Multiple",
                build: Box::new(move || {
                    internal::AggregateBuilder::new(
                        Temporality::Cumulative,
                        None,
                        CARDINALITY_LIMIT_DEFAULT,
                    )
                    .exponential_bucket_histogram(
                        max_size,
                        max_scale,
                        record_min_max,
                        record_sum,
                    )
                }),
                input: vec![
                    vec![2, 3, 8].into_iter().map(Into::into).collect(),
                    vec![4, 4, 4, 2, 16, 1]
                        .into_iter()
                        .map(Into::into)
                        .collect(),
                ],
                want: data::ExponentialHistogram {
                    temporality: Temporality::Cumulative,
                    data_points: vec![data::ExponentialHistogramDataPoint {
                        attributes: vec![],
                        count: 9,
                        min: Some(1.into()),
                        max: Some(16.into()),
                        sum: 44.into(),
                        scale: -1,
                        positive_bucket: data::ExponentialBucket {
                            offset: -1,
                            counts: vec![1, 6, 2],
                        },
                        negative_bucket: data::ExponentialBucket {
                            offset: 0,
                            counts: vec![],
                        },
                        exemplars: vec![],
                        zero_threshold: 0.0,
                        zero_count: 0,
                    }],
                    start_time: now(),
                    time: now(),
                },
                want_count: 1,
            },
        ];

        for test in test_cases {
            let AggregateFns { measure, collect } = (test.build)();

            let mut got = T::make_aggregated_metrics(MetricData::ExponentialHistogram(
                data::ExponentialHistogram::<T> {
                    data_points: vec![],
                    start_time: now(),
                    time: now(),
                    temporality: Temporality::Delta,
                },
            ));
            let mut count = 0;
            for n in test.input {
                for v in n {
                    measure.call(v, &[])
                }
                count = collect.call(Some(&mut got)).0
            }

            assert_aggregation_eq(
                &MetricData::ExponentialHistogram(test.want),
                T::extract_metrics_data_ref(&got).unwrap(),
                test.name,
            );
            assert_eq!(test.want_count, count, "{}", test.name);
        }
    }

    fn assert_aggregation_eq<T: Number + PartialEq>(
        a: &MetricData<T>,
        b: &MetricData<T>,
        test_name: &'static str,
    ) {
        match (a, b) {
            (MetricData::Gauge(a), MetricData::Gauge(b)) => {
                assert_eq!(
                    a.data_points.len(),
                    b.data_points.len(),
                    "{test_name} gauge counts"
                );
                for (a, b) in a.data_points.iter().zip(b.data_points.iter()) {
                    assert_gauge_data_points_eq(a, b, "mismatching gauge data points", test_name);
                }
            }
            (MetricData::Sum(a), MetricData::Sum(b)) => {
                assert_eq!(
                    a.temporality, b.temporality,
                    "{test_name} mismatching sum temporality"
                );
                assert_eq!(
                    a.is_monotonic, b.is_monotonic,
                    "{test_name} mismatching sum monotonicity",
                );
                assert_eq!(
                    a.data_points.len(),
                    b.data_points.len(),
                    "{test_name} sum counts"
                );
                for (a, b) in a.data_points.iter().zip(b.data_points.iter()) {
                    assert_sum_data_points_eq(a, b, "mismatching sum data points", test_name);
                }
            }
            (MetricData::Histogram(a), MetricData::Histogram(b)) => {
                assert_eq!(
                    a.temporality, b.temporality,
                    "{test_name}: mismatching hist temporality"
                );
                assert_eq!(
                    a.data_points.len(),
                    b.data_points.len(),
                    "{test_name} hist counts"
                );
                for (a, b) in a.data_points.iter().zip(b.data_points.iter()) {
                    assert_hist_data_points_eq(a, b, "mismatching hist data points", test_name);
                }
            }
            (MetricData::ExponentialHistogram(a), MetricData::ExponentialHistogram(b)) => {
                assert_eq!(
                    a.temporality, b.temporality,
                    "{test_name} mismatching hist temporality"
                );
                assert_eq!(
                    a.data_points.len(),
                    b.data_points.len(),
                    "{test_name} hist counts"
                );
                for (a, b) in a.data_points.iter().zip(b.data_points.iter()) {
                    assert_exponential_hist_data_points_eq(
                        a,
                        b,
                        "mismatching hist data points",
                        test_name,
                    );
                }
            }
            _ => {
                assert_eq!(
                    a.type_id(),
                    b.type_id(),
                    "{test_name} Aggregation types not equal"
                );
            }
        }
    }

    fn assert_sum_data_points_eq<T: Number>(
        a: &data::SumDataPoint<T>,
        b: &data::SumDataPoint<T>,
        message: &'static str,
        test_name: &'static str,
    ) {
        assert_eq!(
            a.attributes, b.attributes,
            "{test_name}: {message} attributes"
        );
        assert_eq!(a.value, b.value, "{test_name}: {message} value");
    }

    fn assert_gauge_data_points_eq<T: Number>(
        a: &data::GaugeDataPoint<T>,
        b: &data::GaugeDataPoint<T>,
        message: &'static str,
        test_name: &'static str,
    ) {
        assert_eq!(
            a.attributes, b.attributes,
            "{test_name}: {message} attributes"
        );
        assert_eq!(a.value, b.value, "{test_name}: {message} value");
    }

    fn assert_hist_data_points_eq<T: Number>(
        a: &data::HistogramDataPoint<T>,
        b: &data::HistogramDataPoint<T>,
        message: &'static str,
        test_name: &'static str,
    ) {
        assert_eq!(
            a.attributes, b.attributes,
            "{test_name}: {message} attributes"
        );
        assert_eq!(a.count, b.count, "{test_name}: {message} count");
        assert_eq!(a.bounds, b.bounds, "{test_name}: {message} bounds");
        assert_eq!(
            a.bucket_counts, b.bucket_counts,
            "{test_name}: {message} bucket counts"
        );
        assert_eq!(a.min, b.min, "{test_name}: {message} min");
        assert_eq!(a.max, b.max, "{test_name}: {message} max");
        assert_eq!(a.sum, b.sum, "{test_name}: {message} sum");
    }

    fn assert_exponential_hist_data_points_eq<T: Number>(
        a: &data::ExponentialHistogramDataPoint<T>,
        b: &data::ExponentialHistogramDataPoint<T>,
        message: &'static str,
        test_name: &'static str,
    ) {
        assert_eq!(
            a.attributes, b.attributes,
            "{test_name}: {message} attributes"
        );
        assert_eq!(a.count, b.count, "{test_name}: {message} count");
        assert_eq!(a.min, b.min, "{test_name}: {message} min");
        assert_eq!(a.max, b.max, "{test_name}: {message} max");
        assert_eq!(a.sum, b.sum, "{test_name}: {message} sum");

        assert_eq!(a.scale, b.scale, "{test_name}: {message} scale");
        assert_eq!(a.zero_count, b.zero_count, "{test_name}: {message} zeros");

        assert_eq!(
            a.positive_bucket, b.positive_bucket,
            "{test_name}: {message} pos"
        );
        assert_eq!(
            a.negative_bucket, b.negative_bucket,
            "{test_name}: {message} neg"
        );
    }
}