opentelemetry_sdk/metrics/internal/mod.rs

mod aggregate;
mod exponential_histogram;
mod histogram;
mod last_value;
mod precomputed_sum;
mod sum;

use core::fmt;
use std::collections::{HashMap, HashSet};
use std::mem::swap;
use std::ops::{Add, AddAssign, DerefMut, Sub};
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock, RwLock};

pub(crate) use aggregate::{AggregateBuilder, AggregateFns, ComputeAggregation, Measure};
pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE};
use opentelemetry::{otel_warn, KeyValue};

use super::data::{AggregatedMetrics, MetricData};

// TODO Replace it with LazyLock once it is stable
pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: OnceLock<Vec<KeyValue>> = OnceLock::new();

#[inline]
fn stream_overflow_attributes() -> &'static Vec<KeyValue> {
    STREAM_OVERFLOW_ATTRIBUTES.get_or_init(|| vec![KeyValue::new("otel.metric.overflow", true)])
}

pub(crate) trait Aggregator {
    /// A static configuration needed to initialize the aggregator,
    /// e.g. the bucket size known at creation time.
    type InitConfig;

    /// Some aggregators can perform part of the computation before updating the
    /// aggregator. This helps to reduce contention because it keeps
    /// [`Aggregator::update`] as short as possible.
    type PreComputedValue;

    /// Called every time a new attribute set is stored.
    fn create(init: &Self::InitConfig) -> Self;

    /// Called for each measurement.
    fn update(&self, value: Self::PreComputedValue);

    /// Returns the current value and resets this instance.
    fn clone_and_reset(&self, init: &Self::InitConfig) -> Self;
}
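
// Illustrative sketch (not part of the SDK's aggregators): a minimal `Aggregator`
// that keeps a running `u64` sum, showing how `InitConfig`, `PreComputedValue`, and
// the three trait methods fit together. The type is hypothetical and gated to test
// builds so it never ships; the real implementations live in the submodules above
// (`sum`, `last_value`, `histogram`, ...).
#[cfg(test)]
struct ExampleSumAggregator {
    total: AtomicU64,
}

#[cfg(test)]
impl Aggregator for ExampleSumAggregator {
    // A plain sum needs no static configuration.
    type InitConfig = ();
    // The measurement is passed through unchanged.
    type PreComputedValue = u64;

    fn create(_init: &Self::InitConfig) -> Self {
        ExampleSumAggregator {
            total: AtomicU64::new(0),
        }
    }

    fn update(&self, value: Self::PreComputedValue) {
        self.total.fetch_add(value, Ordering::Relaxed);
    }

    fn clone_and_reset(&self, _init: &Self::InitConfig) -> Self {
        // Hand the accumulated value to the returned snapshot and reset self.
        ExampleSumAggregator {
            total: AtomicU64::new(self.total.swap(0, Ordering::Relaxed)),
        }
    }
}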

/// The storage for per-attribute-set values of an instrument.
///
/// This structure is parameterized by an [`Aggregator`] that defines how
/// measurements are accumulated in the underlying value trackers.
pub(crate) struct ValueMap<A>
where
    A: Aggregator,
{
    /// Trackers store the values associated with different attribute sets.
    trackers: RwLock<HashMap<Vec<KeyValue>, Arc<A>>>,

    /// Used ONLY by Delta collect. The data type must match the one used in
    /// `trackers` to allow `mem::swap`. The type is wrapped in `OnceLock` to
    /// avoid this allocation when Cumulative temporality is used.
    trackers_for_collect: OnceLock<RwLock<HashMap<Vec<KeyValue>, Arc<A>>>>,

    /// Number of distinct attribute sets stored in the `trackers` map.
    count: AtomicUsize,
    /// Indicates whether a value with no attributes has been stored.
    has_no_attribute_value: AtomicBool,
    /// Tracker for values with no attributes attached.
    no_attribute_tracker: A,
    /// Configuration for the aggregator.
    config: A::InitConfig,
    cardinality_limit: usize,
}

impl<A> ValueMap<A>
where
    A: Aggregator,
{
    fn new(config: A::InitConfig, cardinality_limit: usize) -> Self {
        ValueMap {
            trackers: RwLock::new(HashMap::with_capacity(1 + cardinality_limit)),
            trackers_for_collect: OnceLock::new(),
            has_no_attribute_value: AtomicBool::new(false),
            no_attribute_tracker: A::create(&config),
            count: AtomicUsize::new(0),
            config,
            cardinality_limit,
        }
    }

    #[inline]
    fn trackers_for_collect(&self) -> &RwLock<HashMap<Vec<KeyValue>, Arc<A>>> {
        self.trackers_for_collect
            .get_or_init(|| RwLock::new(HashMap::with_capacity(1 + self.cardinality_limit)))
    }

    /// Returns `true` if the number of attribute sets stored so far is still
    /// below the cardinality limit for this metric stream.
    fn is_under_cardinality_limit(&self) -> bool {
        self.count.load(Ordering::SeqCst) < self.cardinality_limit
    }

    fn measure(&self, value: A::PreComputedValue, attributes: &[KeyValue]) {
        if attributes.is_empty() {
            self.no_attribute_tracker.update(value);
            self.has_no_attribute_value.store(true, Ordering::Release);
            return;
        }

        let Ok(trackers) = self.trackers.read() else {
            return;
        };

        // Try to retrieve and update the tracker with the attributes in the provided order first
        if let Some(tracker) = trackers.get(attributes) {
            tracker.update(value);
            return;
        }

        // Try to retrieve and update the tracker with the attributes sorted.
        let sorted_attrs = sort_and_dedup(attributes);
        if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
            tracker.update(value);
            return;
        }

        // Give up the read lock before acquiring the write lock.
        drop(trackers);

        let Ok(mut trackers) = self.trackers.write() else {
            return;
        };

        // Recheck both the provided and sorted orders after acquiring the write lock
        // in case another thread has pushed an update in the meantime.
        if let Some(tracker) = trackers.get(attributes) {
            tracker.update(value);
        } else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
            tracker.update(value);
        } else if self.is_under_cardinality_limit() {
            let new_tracker = Arc::new(A::create(&self.config));
            new_tracker.update(value);

            // Insert tracker with the attributes in the provided and sorted orders
            trackers.insert(attributes.to_vec(), new_tracker.clone());
            trackers.insert(sorted_attrs, new_tracker);

            self.count.fetch_add(1, Ordering::SeqCst);
        } else if let Some(overflow_value) = trackers.get(stream_overflow_attributes().as_slice()) {
            overflow_value.update(value);
        } else {
            let new_tracker = A::create(&self.config);
            new_tracker.update(value);
            trackers.insert(stream_overflow_attributes().clone(), Arc::new(new_tracker));
        }
    }

    /// Iterate through all attribute sets and populate `DataPoints` in readonly mode.
    /// This is used in Cumulative temporality mode, where [`ValueMap`] is not cleared.
    pub(crate) fn collect_readonly<Res, MapFn>(&self, dest: &mut Vec<Res>, mut map_fn: MapFn)
    where
        MapFn: FnMut(Vec<KeyValue>, &A) -> Res,
    {
        prepare_data(dest, self.count.load(Ordering::SeqCst));
        if self.has_no_attribute_value.load(Ordering::Acquire) {
            dest.push(map_fn(vec![], &self.no_attribute_tracker));
        }

        let Ok(trackers) = self.trackers.read() else {
            return;
        };

        let mut seen = HashSet::new();
        for (attrs, tracker) in trackers.iter() {
            if seen.insert(Arc::as_ptr(tracker)) {
                dest.push(map_fn(attrs.clone(), tracker));
            }
        }
    }

    /// Iterate through all attribute sets, populate `DataPoints` and reset.
    /// This is used in Delta temporality mode, where [`ValueMap`] is reset after collection.
    pub(crate) fn collect_and_reset<Res, MapFn>(&self, dest: &mut Vec<Res>, mut map_fn: MapFn)
    where
        MapFn: FnMut(Vec<KeyValue>, A) -> Res,
    {
        prepare_data(dest, self.count.load(Ordering::SeqCst));
        if self.has_no_attribute_value.swap(false, Ordering::AcqRel) {
            dest.push(map_fn(
                vec![],
                self.no_attribute_tracker.clone_and_reset(&self.config),
            ));
        }

        if let Ok(mut trackers_collect) = self.trackers_for_collect().write() {
            if let Ok(mut trackers_current) = self.trackers.write() {
                swap(trackers_collect.deref_mut(), trackers_current.deref_mut());
                self.count.store(0, Ordering::SeqCst);
            } else {
                otel_warn!(name: "MeterProvider.InternalError", message = "Metric collection failed. Report this issue in OpenTelemetry repo.", details = "ValueMap trackers lock poisoned");
                return;
            }

            let mut seen = HashSet::new();
            for (attrs, tracker) in trackers_collect.drain() {
                if seen.insert(Arc::as_ptr(&tracker)) {
                    dest.push(map_fn(attrs, tracker.clone_and_reset(&self.config)));
                }
            }
        } else {
            otel_warn!(name: "MeterProvider.InternalError", message = "Metric collection failed. Report this issue in OpenTelemetry repo.", details = "ValueMap trackers for collect lock poisoned");
        }
    }
}

/// Clears `data` and reserves exactly the required amount of space for all attribute sets.
fn prepare_data<T>(data: &mut Vec<T>, list_len: usize) {
    data.clear();
    let total_len = list_len + 2; // to account for the no-attributes case + overflow state
    if total_len > data.capacity() {
        data.reserve_exact(total_len - data.capacity());
    }
}

fn sort_and_dedup(attributes: &[KeyValue]) -> Vec<KeyValue> {
    // The incoming attributes are immutable, so they cannot be sorted/de-duplicated
    // in place; a newly allocated vec is used instead. TODO: This allocation can be
    // avoided by leveraging a ThreadLocal vec.
    let mut sorted = attributes.to_vec();
    sorted.sort_unstable_by(|a, b| a.key.cmp(&b.key));
    sorted.dedup_by(|a, b| a.key == b.key);
    sorted
}

/// Marks a type that can have a value added and retrieved atomically. Required since
/// different types have different backing atomic mechanisms.
pub(crate) trait AtomicTracker<T>: Sync + Send + 'static {
    fn store(&self, _value: T);
    fn add(&self, _value: T);
    fn get_value(&self) -> T;
    fn get_and_reset_value(&self) -> T;
}

/// Marks a type that can have an atomic tracker generated for it.
pub(crate) trait AtomicallyUpdate<T> {
    type AtomicTracker: AtomicTracker<T>;
    fn new_atomic_tracker(init: T) -> Self::AtomicTracker;
}

pub(crate) trait AggregatedMetricsAccess: Sized {
    /// This function is used in tests.
    #[allow(unused)]
    fn extract_metrics_data_ref(data: &AggregatedMetrics) -> Option<&MetricData<Self>>;
    fn extract_metrics_data_mut(data: &mut AggregatedMetrics) -> Option<&mut MetricData<Self>>;
    fn make_aggregated_metrics(data: MetricData<Self>) -> AggregatedMetrics;
}

pub(crate) trait Number:
    Add<Output = Self>
    + AddAssign
    + Sub<Output = Self>
    + PartialOrd
    + fmt::Debug
    + Clone
    + Copy
    + PartialEq
    + Default
    + Send
    + Sync
    + 'static
    + AtomicallyUpdate<Self>
    + AggregatedMetricsAccess
{
    fn min() -> Self;
    fn max() -> Self;

    fn into_float(self) -> f64;
}

impl Number for i64 {
    fn min() -> Self {
        i64::MIN
    }

    fn max() -> Self {
        i64::MAX
    }

    fn into_float(self) -> f64 {
        // May have precision loss at high values
        self as f64
    }
}
impl Number for u64 {
    fn min() -> Self {
        u64::MIN
    }

    fn max() -> Self {
        u64::MAX
    }

    fn into_float(self) -> f64 {
        // May have precision loss at high values
        self as f64
    }
}
impl Number for f64 {
    fn min() -> Self {
        f64::MIN
    }

    fn max() -> Self {
        f64::MAX
    }

    fn into_float(self) -> f64 {
        self
    }
}

impl AggregatedMetricsAccess for i64 {
    fn make_aggregated_metrics(data: MetricData<i64>) -> AggregatedMetrics {
        AggregatedMetrics::I64(data)
    }

    fn extract_metrics_data_ref(data: &AggregatedMetrics) -> Option<&MetricData<i64>> {
        if let AggregatedMetrics::I64(data) = data {
            Some(data)
        } else {
            None
        }
    }

    fn extract_metrics_data_mut(data: &mut AggregatedMetrics) -> Option<&mut MetricData<i64>> {
        if let AggregatedMetrics::I64(data) = data {
            Some(data)
        } else {
            None
        }
    }
}

impl AggregatedMetricsAccess for u64 {
    fn make_aggregated_metrics(data: MetricData<u64>) -> AggregatedMetrics {
        AggregatedMetrics::U64(data)
    }

    fn extract_metrics_data_ref(data: &AggregatedMetrics) -> Option<&MetricData<u64>> {
        if let AggregatedMetrics::U64(data) = data {
            Some(data)
        } else {
            None
        }
    }

    fn extract_metrics_data_mut(data: &mut AggregatedMetrics) -> Option<&mut MetricData<u64>> {
        if let AggregatedMetrics::U64(data) = data {
            Some(data)
        } else {
            None
        }
    }
}

impl AggregatedMetricsAccess for f64 {
    fn make_aggregated_metrics(data: MetricData<f64>) -> AggregatedMetrics {
        AggregatedMetrics::F64(data)
    }

    fn extract_metrics_data_ref(data: &AggregatedMetrics) -> Option<&MetricData<f64>> {
        if let AggregatedMetrics::F64(data) = data {
            Some(data)
        } else {
            None
        }
    }

    fn extract_metrics_data_mut(data: &mut AggregatedMetrics) -> Option<&mut MetricData<f64>> {
        if let AggregatedMetrics::F64(data) = data {
            Some(data)
        } else {
            None
        }
    }
}

impl AtomicTracker<u64> for AtomicU64 {
    fn store(&self, value: u64) {
        self.store(value, Ordering::Relaxed);
    }

    fn add(&self, value: u64) {
        self.fetch_add(value, Ordering::Relaxed);
    }

    fn get_value(&self) -> u64 {
        self.load(Ordering::Relaxed)
    }

    fn get_and_reset_value(&self) -> u64 {
        self.swap(0, Ordering::Relaxed)
    }
}

impl AtomicallyUpdate<u64> for u64 {
    type AtomicTracker = AtomicU64;

    fn new_atomic_tracker(init: u64) -> Self::AtomicTracker {
        AtomicU64::new(init)
    }
}

impl AtomicTracker<i64> for AtomicI64 {
    fn store(&self, value: i64) {
        self.store(value, Ordering::Relaxed);
    }

    fn add(&self, value: i64) {
        self.fetch_add(value, Ordering::Relaxed);
    }

    fn get_value(&self) -> i64 {
        self.load(Ordering::Relaxed)
    }

    fn get_and_reset_value(&self) -> i64 {
        self.swap(0, Ordering::Relaxed)
    }
}

impl AtomicallyUpdate<i64> for i64 {
    type AtomicTracker = AtomicI64;

    fn new_atomic_tracker(init: i64) -> Self::AtomicTracker {
        AtomicI64::new(init)
    }
}

pub(crate) struct F64AtomicTracker {
    // Floating-point numbers have no native atomic type, so their binary
    // representation is stored in an `AtomicU64` to perform atomic operations.
    inner: AtomicU64,
}

impl F64AtomicTracker {
    fn new(init: f64) -> Self {
        let value_as_u64 = init.to_bits();
        F64AtomicTracker {
            inner: AtomicU64::new(value_as_u64),
        }
    }
}

impl AtomicTracker<f64> for F64AtomicTracker {
    fn store(&self, value: f64) {
        let value_as_u64 = value.to_bits();
        self.inner.store(value_as_u64, Ordering::Relaxed);
    }

    fn add(&self, value: f64) {
        let mut current_value_as_u64 = self.inner.load(Ordering::Relaxed);

        loop {
            let current_value = f64::from_bits(current_value_as_u64);
            let new_value = current_value + value;
            let new_value_as_u64 = new_value.to_bits();
            match self.inner.compare_exchange(
                current_value_as_u64,
                new_value_as_u64,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                // Succeeded in updating the value
                Ok(_) => return,

                // Some other thread changed the value before this thread could update it.
                // Read the latest value again and try to swap it with the recomputed `new_value_as_u64`.
                Err(v) => current_value_as_u64 = v,
            }
        }
    }

    fn get_value(&self) -> f64 {
        let value_as_u64 = self.inner.load(Ordering::Relaxed);
        f64::from_bits(value_as_u64)
    }

    fn get_and_reset_value(&self) -> f64 {
        let zero_as_u64 = 0.0_f64.to_bits();
        let value = self.inner.swap(zero_as_u64, Ordering::Relaxed);
        f64::from_bits(value)
    }
}

impl AtomicallyUpdate<f64> for f64 {
    type AtomicTracker = F64AtomicTracker;

    fn new_atomic_tracker(init: f64) -> Self::AtomicTracker {
        F64AtomicTracker::new(init)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn can_store_u64_atomic_value() {
        let atomic = u64::new_atomic_tracker(0);
        let atomic_tracker = &atomic as &dyn AtomicTracker<u64>;

        let value = atomic.get_value();
        assert_eq!(value, 0);

        atomic_tracker.store(25);
        let value = atomic.get_value();
        assert_eq!(value, 25);
    }

    #[test]
    fn can_add_and_get_u64_atomic_value() {
        let atomic = u64::new_atomic_tracker(0);
        atomic.add(15);
        atomic.add(10);

        let value = atomic.get_value();
        assert_eq!(value, 25);
    }

    #[test]
    fn can_reset_u64_atomic_value() {
        let atomic = u64::new_atomic_tracker(0);
        atomic.add(15);

        let value = atomic.get_and_reset_value();
        let value2 = atomic.get_value();

        assert_eq!(value, 15, "Incorrect first value");
        assert_eq!(value2, 0, "Incorrect second value");
    }

    #[test]
    fn can_store_i64_atomic_value() {
        let atomic = i64::new_atomic_tracker(0);
        let atomic_tracker = &atomic as &dyn AtomicTracker<i64>;

        let value = atomic.get_value();
        assert_eq!(value, 0);

        atomic_tracker.store(-25);
        let value = atomic.get_value();
        assert_eq!(value, -25);

        atomic_tracker.store(25);
        let value = atomic.get_value();
        assert_eq!(value, 25);
    }

    #[test]
    fn can_add_and_get_i64_atomic_value() {
        let atomic = i64::new_atomic_tracker(0);
        atomic.add(15);
        atomic.add(-10);

        let value = atomic.get_value();
        assert_eq!(value, 5);
    }

    #[test]
    fn can_reset_i64_atomic_value() {
        let atomic = i64::new_atomic_tracker(0);
        atomic.add(15);

        let value = atomic.get_and_reset_value();
        let value2 = atomic.get_value();

        assert_eq!(value, 15, "Incorrect first value");
        assert_eq!(value2, 0, "Incorrect second value");
    }

    #[test]
    fn can_store_f64_atomic_value() {
        let atomic = f64::new_atomic_tracker(0.0);
        let atomic_tracker = &atomic as &dyn AtomicTracker<f64>;

        let value = atomic.get_value();
        assert_eq!(value, 0.0);

        atomic_tracker.store(-15.5);
        let value = atomic.get_value();
        assert!(f64::abs(-15.5 - value) < 0.0001);

        atomic_tracker.store(25.7);
        let value = atomic.get_value();
        assert!(f64::abs(25.7 - value) < 0.0001);
    }

    #[test]
    fn can_add_and_get_f64_atomic_value() {
        let atomic = f64::new_atomic_tracker(0.0);
        atomic.add(15.3);
        atomic.add(10.4);

        let value = atomic.get_value();

        assert!(f64::abs(25.7 - value) < 0.0001);
    }
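
    // Illustrative addition (not from the original test suite): the CAS loop in
    // `F64AtomicTracker::add` is what makes concurrent `f64` additions lossless.
    // The thread and iteration counts below are arbitrary.
    #[test]
    fn can_add_f64_atomic_value_concurrently() {
        let atomic = Arc::new(f64::new_atomic_tracker(0.0));

        let handles: Vec<_> = (0..4)
            .map(|_| {
                let atomic = Arc::clone(&atomic);
                std::thread::spawn(move || {
                    for _ in 0..1000 {
                        atomic.add(0.5);
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().expect("worker thread panicked");
        }

        // 4 threads * 1000 additions * 0.5 each; 0.5 is exactly representable,
        // so no precision is lost and no update is dropped.
        assert!(f64::abs(2000.0 - atomic.get_value()) < 0.0001);
    }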

    #[test]
    fn can_reset_f64_atomic_value() {
        let atomic = f64::new_atomic_tracker(0.0);
        atomic.add(15.5);

        let value = atomic.get_and_reset_value();
        let value2 = atomic.get_value();

        assert!(f64::abs(15.5 - value) < 0.0001, "Incorrect first value");
        assert!(f64::abs(0.0 - value2) < 0.0001, "Incorrect second value");
    }
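
    // Illustrative additions (not from the original test suite): the two tests below
    // sketch how `sort_and_dedup` and `ValueMap` behave, using the hypothetical
    // `ExampleSumAggregator` defined next to the `Aggregator` trait above. Attribute
    // names, values, and the cardinality limit of 1 are arbitrary choices.
    #[test]
    fn sort_and_dedup_normalizes_attribute_order() {
        let sorted = sort_and_dedup(&[KeyValue::new("b", "2"), KeyValue::new("a", "1")]);

        // Sorted by key, so lookups succeed regardless of the caller-provided order.
        assert_eq!(sorted[0].key.as_str(), "a");
        assert_eq!(sorted[1].key.as_str(), "b");

        // Duplicate keys collapse to a single entry.
        let deduped = sort_and_dedup(&[KeyValue::new("a", "1"), KeyValue::new("a", "2")]);
        assert_eq!(deduped.len(), 1);
    }

    #[test]
    fn value_map_measure_and_collect_sketch() {
        let value_map = ValueMap::<ExampleSumAggregator>::new((), 1);

        // A measurement without attributes goes to the dedicated no-attribute tracker.
        value_map.measure(5, &[]);
        // The same attribute set in any order lands on the same tracker.
        value_map.measure(10, &[KeyValue::new("method", "GET"), KeyValue::new("status", "ok")]);
        value_map.measure(30, &[KeyValue::new("status", "ok"), KeyValue::new("method", "GET")]);
        // The cardinality limit of 1 is already reached, so this goes to the overflow tracker.
        value_map.measure(20, &[KeyValue::new("method", "PUT")]);

        let mut collected = Vec::new();
        value_map.collect_and_reset(&mut collected, |attrs, aggregator| {
            (attrs, aggregator.total.load(Ordering::Relaxed))
        });

        // One entry each for: no attributes, {method, status}, and the overflow attribute set.
        assert_eq!(collected.len(), 3);
        // No measurement is lost: 5 + (10 + 30) + 20.
        let total: u64 = collected.iter().map(|(_, value)| *value).sum();
        assert_eq!(total, 65);
    }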
}