opentelemetry_sdk/metrics/internal/
mod.rs1mod aggregate;
2mod exponential_histogram;
3mod histogram;
4mod last_value;
5mod precomputed_sum;
6mod sum;
7
8use core::fmt;
9use std::collections::{HashMap, HashSet};
10use std::mem::swap;
11use std::ops::{Add, AddAssign, DerefMut, Sub};
12use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
13use std::sync::{Arc, OnceLock, RwLock};
14
15pub(crate) use aggregate::{AggregateBuilder, AggregateFns, ComputeAggregation, Measure};
16pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE};
17use opentelemetry::{otel_warn, KeyValue};
18
19use super::data::{AggregatedMetrics, MetricData};
20
21pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: OnceLock<Vec<KeyValue>> = OnceLock::new();
23
24#[inline]
25fn stream_overflow_attributes() -> &'static Vec<KeyValue> {
26 STREAM_OVERFLOW_ATTRIBUTES.get_or_init(|| vec![KeyValue::new("otel.metric.overflow", true)])
27}
28
29pub(crate) trait Aggregator {
30 type InitConfig;
33
34 type PreComputedValue;
38
39 fn create(init: &Self::InitConfig) -> Self;
41
42 fn update(&self, value: Self::PreComputedValue);
44
45 fn clone_and_reset(&self, init: &Self::InitConfig) -> Self;
47}
48
49pub(crate) struct ValueMap<A>
54where
55 A: Aggregator,
56{
57 trackers: RwLock<HashMap<Vec<KeyValue>, Arc<A>>>,
59
60 trackers_for_collect: OnceLock<RwLock<HashMap<Vec<KeyValue>, Arc<A>>>>,
64
65 count: AtomicUsize,
67 has_no_attribute_value: AtomicBool,
69 no_attribute_tracker: A,
71 config: A::InitConfig,
73 cardinality_limit: usize,
74}
75
76impl<A> ValueMap<A>
77where
78 A: Aggregator,
79{
80 fn new(config: A::InitConfig, cardinality_limit: usize) -> Self {
81 ValueMap {
82 trackers: RwLock::new(HashMap::with_capacity(1 + cardinality_limit)),
83 trackers_for_collect: OnceLock::new(),
84 has_no_attribute_value: AtomicBool::new(false),
85 no_attribute_tracker: A::create(&config),
86 count: AtomicUsize::new(0),
87 config,
88 cardinality_limit,
89 }
90 }
91
92 #[inline]
93 fn trackers_for_collect(&self) -> &RwLock<HashMap<Vec<KeyValue>, Arc<A>>> {
94 self.trackers_for_collect
95 .get_or_init(|| RwLock::new(HashMap::with_capacity(1 + self.cardinality_limit)))
96 }
97
98 fn is_under_cardinality_limit(&self) -> bool {
100 self.count.load(Ordering::SeqCst) < self.cardinality_limit
101 }
102
103 fn measure(&self, value: A::PreComputedValue, attributes: &[KeyValue]) {
104 if attributes.is_empty() {
105 self.no_attribute_tracker.update(value);
106 self.has_no_attribute_value.store(true, Ordering::Release);
107 return;
108 }
109
110 let Ok(trackers) = self.trackers.read() else {
111 return;
112 };
113
114 if let Some(tracker) = trackers.get(attributes) {
116 tracker.update(value);
117 return;
118 }
119
120 let sorted_attrs = sort_and_dedup(attributes);
122 if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
123 tracker.update(value);
124 return;
125 }
126
127 drop(trackers);
129
130 let Ok(mut trackers) = self.trackers.write() else {
131 return;
132 };
133
134 if let Some(tracker) = trackers.get(attributes) {
137 tracker.update(value);
138 } else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
139 tracker.update(value);
140 } else if self.is_under_cardinality_limit() {
141 let new_tracker = Arc::new(A::create(&self.config));
142 new_tracker.update(value);
143
144 trackers.insert(attributes.to_vec(), new_tracker.clone());
146 trackers.insert(sorted_attrs, new_tracker);
147
148 self.count.fetch_add(1, Ordering::SeqCst);
149 } else if let Some(overflow_value) = trackers.get(stream_overflow_attributes().as_slice()) {
150 overflow_value.update(value);
151 } else {
152 let new_tracker = A::create(&self.config);
153 new_tracker.update(value);
154 trackers.insert(stream_overflow_attributes().clone(), Arc::new(new_tracker));
155 }
156 }
157
158 pub(crate) fn collect_readonly<Res, MapFn>(&self, dest: &mut Vec<Res>, mut map_fn: MapFn)
161 where
162 MapFn: FnMut(Vec<KeyValue>, &A) -> Res,
163 {
164 prepare_data(dest, self.count.load(Ordering::SeqCst));
165 if self.has_no_attribute_value.load(Ordering::Acquire) {
166 dest.push(map_fn(vec![], &self.no_attribute_tracker));
167 }
168
169 let Ok(trackers) = self.trackers.read() else {
170 return;
171 };
172
173 let mut seen = HashSet::new();
174 for (attrs, tracker) in trackers.iter() {
175 if seen.insert(Arc::as_ptr(tracker)) {
176 dest.push(map_fn(attrs.clone(), tracker));
177 }
178 }
179 }
180
181 pub(crate) fn collect_and_reset<Res, MapFn>(&self, dest: &mut Vec<Res>, mut map_fn: MapFn)
184 where
185 MapFn: FnMut(Vec<KeyValue>, A) -> Res,
186 {
187 prepare_data(dest, self.count.load(Ordering::SeqCst));
188 if self.has_no_attribute_value.swap(false, Ordering::AcqRel) {
189 dest.push(map_fn(
190 vec![],
191 self.no_attribute_tracker.clone_and_reset(&self.config),
192 ));
193 }
194
195 if let Ok(mut trackers_collect) = self.trackers_for_collect().write() {
196 if let Ok(mut trackers_current) = self.trackers.write() {
197 swap(trackers_collect.deref_mut(), trackers_current.deref_mut());
198 self.count.store(0, Ordering::SeqCst);
199 } else {
200 otel_warn!(name: "MeterProvider.InternalError", message = "Metric collection failed. Report this issue in OpenTelemetry repo.", details ="ValueMap trackers lock poisoned");
201 return;
202 }
203
204 let mut seen = HashSet::new();
205 for (attrs, tracker) in trackers_collect.drain() {
206 if seen.insert(Arc::as_ptr(&tracker)) {
207 dest.push(map_fn(attrs, tracker.clone_and_reset(&self.config)));
208 }
209 }
210 } else {
211 otel_warn!(name: "MeterProvider.InternalError", message = "Metric collection failed. Report this issue in OpenTelemetry repo.", details ="ValueMap trackers for collect lock poisoned");
212 }
213 }
214}
215
216fn prepare_data<T>(data: &mut Vec<T>, list_len: usize) {
218 data.clear();
219 let total_len = list_len + 2; if total_len > data.capacity() {
221 data.reserve_exact(total_len - data.capacity());
222 }
223}
224
225fn sort_and_dedup(attributes: &[KeyValue]) -> Vec<KeyValue> {
226 let mut sorted = attributes.to_vec();
230 sorted.sort_unstable_by(|a, b| a.key.cmp(&b.key));
231 sorted.dedup_by(|a, b| a.key == b.key);
232 sorted
233}
234
235pub(crate) trait AtomicTracker<T>: Sync + Send + 'static {
238 fn store(&self, _value: T);
239 fn add(&self, _value: T);
240 fn get_value(&self) -> T;
241 fn get_and_reset_value(&self) -> T;
242}
243
244pub(crate) trait AtomicallyUpdate<T> {
246 type AtomicTracker: AtomicTracker<T>;
247 fn new_atomic_tracker(init: T) -> Self::AtomicTracker;
248}
249
250pub(crate) trait AggregatedMetricsAccess: Sized {
251 #[allow(unused)]
253 fn extract_metrics_data_ref(data: &AggregatedMetrics) -> Option<&MetricData<Self>>;
254 fn extract_metrics_data_mut(data: &mut AggregatedMetrics) -> Option<&mut MetricData<Self>>;
255 fn make_aggregated_metrics(data: MetricData<Self>) -> AggregatedMetrics;
256}
257
258pub(crate) trait Number:
259 Add<Output = Self>
260 + AddAssign
261 + Sub<Output = Self>
262 + PartialOrd
263 + fmt::Debug
264 + Clone
265 + Copy
266 + PartialEq
267 + Default
268 + Send
269 + Sync
270 + 'static
271 + AtomicallyUpdate<Self>
272 + AggregatedMetricsAccess
273{
274 fn min() -> Self;
275 fn max() -> Self;
276
277 fn into_float(self) -> f64;
278}
279
280impl Number for i64 {
281 fn min() -> Self {
282 i64::MIN
283 }
284
285 fn max() -> Self {
286 i64::MAX
287 }
288
289 fn into_float(self) -> f64 {
290 self as f64
292 }
293}
294impl Number for u64 {
295 fn min() -> Self {
296 u64::MIN
297 }
298
299 fn max() -> Self {
300 u64::MAX
301 }
302
303 fn into_float(self) -> f64 {
304 self as f64
306 }
307}
308impl Number for f64 {
309 fn min() -> Self {
310 f64::MIN
311 }
312
313 fn max() -> Self {
314 f64::MAX
315 }
316
317 fn into_float(self) -> f64 {
318 self
319 }
320}
321
322impl AggregatedMetricsAccess for i64 {
323 fn make_aggregated_metrics(data: MetricData<i64>) -> AggregatedMetrics {
324 AggregatedMetrics::I64(data)
325 }
326
327 fn extract_metrics_data_ref(data: &AggregatedMetrics) -> Option<&MetricData<i64>> {
328 if let AggregatedMetrics::I64(data) = data {
329 Some(data)
330 } else {
331 None
332 }
333 }
334
335 fn extract_metrics_data_mut(data: &mut AggregatedMetrics) -> Option<&mut MetricData<i64>> {
336 if let AggregatedMetrics::I64(data) = data {
337 Some(data)
338 } else {
339 None
340 }
341 }
342}
343
344impl AggregatedMetricsAccess for u64 {
345 fn make_aggregated_metrics(data: MetricData<u64>) -> AggregatedMetrics {
346 AggregatedMetrics::U64(data)
347 }
348
349 fn extract_metrics_data_ref(data: &AggregatedMetrics) -> Option<&MetricData<u64>> {
350 if let AggregatedMetrics::U64(data) = data {
351 Some(data)
352 } else {
353 None
354 }
355 }
356
357 fn extract_metrics_data_mut(data: &mut AggregatedMetrics) -> Option<&mut MetricData<u64>> {
358 if let AggregatedMetrics::U64(data) = data {
359 Some(data)
360 } else {
361 None
362 }
363 }
364}
365
366impl AggregatedMetricsAccess for f64 {
367 fn make_aggregated_metrics(data: MetricData<f64>) -> AggregatedMetrics {
368 AggregatedMetrics::F64(data)
369 }
370
371 fn extract_metrics_data_ref(data: &AggregatedMetrics) -> Option<&MetricData<f64>> {
372 if let AggregatedMetrics::F64(data) = data {
373 Some(data)
374 } else {
375 None
376 }
377 }
378
379 fn extract_metrics_data_mut(data: &mut AggregatedMetrics) -> Option<&mut MetricData<f64>> {
380 if let AggregatedMetrics::F64(data) = data {
381 Some(data)
382 } else {
383 None
384 }
385 }
386}
387
388impl AtomicTracker<u64> for AtomicU64 {
389 fn store(&self, value: u64) {
390 self.store(value, Ordering::Relaxed);
391 }
392
393 fn add(&self, value: u64) {
394 self.fetch_add(value, Ordering::Relaxed);
395 }
396
397 fn get_value(&self) -> u64 {
398 self.load(Ordering::Relaxed)
399 }
400
401 fn get_and_reset_value(&self) -> u64 {
402 self.swap(0, Ordering::Relaxed)
403 }
404}
405
406impl AtomicallyUpdate<u64> for u64 {
407 type AtomicTracker = AtomicU64;
408
409 fn new_atomic_tracker(init: u64) -> Self::AtomicTracker {
410 AtomicU64::new(init)
411 }
412}
413
414impl AtomicTracker<i64> for AtomicI64 {
415 fn store(&self, value: i64) {
416 self.store(value, Ordering::Relaxed);
417 }
418
419 fn add(&self, value: i64) {
420 self.fetch_add(value, Ordering::Relaxed);
421 }
422
423 fn get_value(&self) -> i64 {
424 self.load(Ordering::Relaxed)
425 }
426
427 fn get_and_reset_value(&self) -> i64 {
428 self.swap(0, Ordering::Relaxed)
429 }
430}
431
432impl AtomicallyUpdate<i64> for i64 {
433 type AtomicTracker = AtomicI64;
434
435 fn new_atomic_tracker(init: i64) -> Self::AtomicTracker {
436 AtomicI64::new(init)
437 }
438}
439
440pub(crate) struct F64AtomicTracker {
441 inner: AtomicU64, }
443
444impl F64AtomicTracker {
445 fn new(init: f64) -> Self {
446 let value_as_u64 = init.to_bits();
447 F64AtomicTracker {
448 inner: AtomicU64::new(value_as_u64),
449 }
450 }
451}
452
453impl AtomicTracker<f64> for F64AtomicTracker {
454 fn store(&self, value: f64) {
455 let value_as_u64 = value.to_bits();
456 self.inner.store(value_as_u64, Ordering::Relaxed);
457 }
458
459 fn add(&self, value: f64) {
460 let mut current_value_as_u64 = self.inner.load(Ordering::Relaxed);
461
462 loop {
463 let current_value = f64::from_bits(current_value_as_u64);
464 let new_value = current_value + value;
465 let new_value_as_u64 = new_value.to_bits();
466 match self.inner.compare_exchange(
467 current_value_as_u64,
468 new_value_as_u64,
469 Ordering::Relaxed,
470 Ordering::Relaxed,
471 ) {
472 Ok(_) => return,
474
475 Err(v) => current_value_as_u64 = v,
478 }
479 }
480 }
481
482 fn get_value(&self) -> f64 {
483 let value_as_u64 = self.inner.load(Ordering::Relaxed);
484 f64::from_bits(value_as_u64)
485 }
486
487 fn get_and_reset_value(&self) -> f64 {
488 let zero_as_u64 = 0.0_f64.to_bits();
489 let value = self.inner.swap(zero_as_u64, Ordering::Relaxed);
490 f64::from_bits(value)
491 }
492}
493
494impl AtomicallyUpdate<f64> for f64 {
495 type AtomicTracker = F64AtomicTracker;
496
497 fn new_atomic_tracker(init: f64) -> Self::AtomicTracker {
498 F64AtomicTracker::new(init)
499 }
500}
501
502#[cfg(test)]
503mod tests {
504 use super::*;
505
506 #[test]
507 fn can_store_u64_atomic_value() {
508 let atomic = u64::new_atomic_tracker(0);
509 let atomic_tracker = &atomic as &dyn AtomicTracker<u64>;
510
511 let value = atomic.get_value();
512 assert_eq!(value, 0);
513
514 atomic_tracker.store(25);
515 let value = atomic.get_value();
516 assert_eq!(value, 25);
517 }
518
519 #[test]
520 fn can_add_and_get_u64_atomic_value() {
521 let atomic = u64::new_atomic_tracker(0);
522 atomic.add(15);
523 atomic.add(10);
524
525 let value = atomic.get_value();
526 assert_eq!(value, 25);
527 }
528
529 #[test]
530 fn can_reset_u64_atomic_value() {
531 let atomic = u64::new_atomic_tracker(0);
532 atomic.add(15);
533
534 let value = atomic.get_and_reset_value();
535 let value2 = atomic.get_value();
536
537 assert_eq!(value, 15, "Incorrect first value");
538 assert_eq!(value2, 0, "Incorrect second value");
539 }
540
541 #[test]
542 fn can_store_i64_atomic_value() {
543 let atomic = i64::new_atomic_tracker(0);
544 let atomic_tracker = &atomic as &dyn AtomicTracker<i64>;
545
546 let value = atomic.get_value();
547 assert_eq!(value, 0);
548
549 atomic_tracker.store(-25);
550 let value = atomic.get_value();
551 assert_eq!(value, -25);
552
553 atomic_tracker.store(25);
554 let value = atomic.get_value();
555 assert_eq!(value, 25);
556 }
557
558 #[test]
559 fn can_add_and_get_i64_atomic_value() {
560 let atomic = i64::new_atomic_tracker(0);
561 atomic.add(15);
562 atomic.add(-10);
563
564 let value = atomic.get_value();
565 assert_eq!(value, 5);
566 }
567
568 #[test]
569 fn can_reset_i64_atomic_value() {
570 let atomic = i64::new_atomic_tracker(0);
571 atomic.add(15);
572
573 let value = atomic.get_and_reset_value();
574 let value2 = atomic.get_value();
575
576 assert_eq!(value, 15, "Incorrect first value");
577 assert_eq!(value2, 0, "Incorrect second value");
578 }
579
580 #[test]
581 fn can_store_f64_atomic_value() {
582 let atomic = f64::new_atomic_tracker(0.0);
583 let atomic_tracker = &atomic as &dyn AtomicTracker<f64>;
584
585 let value = atomic.get_value();
586 assert_eq!(value, 0.0);
587
588 atomic_tracker.store(-15.5);
589 let value = atomic.get_value();
590 assert!(f64::abs(-15.5 - value) < 0.0001);
591
592 atomic_tracker.store(25.7);
593 let value = atomic.get_value();
594 assert!(f64::abs(25.7 - value) < 0.0001);
595 }
596
597 #[test]
598 fn can_add_and_get_f64_atomic_value() {
599 let atomic = f64::new_atomic_tracker(0.0);
600 atomic.add(15.3);
601 atomic.add(10.4);
602
603 let value = atomic.get_value();
604
605 assert!(f64::abs(25.7 - value) < 0.0001);
606 }
607
608 #[test]
609 fn can_reset_f64_atomic_value() {
610 let atomic = f64::new_atomic_tracker(0.0);
611 atomic.add(15.5);
612
613 let value = atomic.get_and_reset_value();
614 let value2 = atomic.get_value();
615
616 assert!(f64::abs(15.5 - value) < 0.0001, "Incorrect first value");
617 assert!(f64::abs(0.0 - value2) < 0.0001, "Incorrect second value");
618 }
619}