opentelemetry_sdk/metrics/
instrument.rs

1use std::{borrow::Cow, collections::HashSet, error::Error, sync::Arc};
2
3use opentelemetry::{
4    metrics::{AsyncInstrument, SyncInstrument},
5    InstrumentationScope, Key, KeyValue,
6};
7
8use crate::metrics::{aggregation::Aggregation, internal::Measure};
9
10use super::meter::{
11    INSTRUMENT_NAME_EMPTY, INSTRUMENT_NAME_FIRST_ALPHABETIC, INSTRUMENT_NAME_INVALID_CHAR,
12    INSTRUMENT_NAME_LENGTH, INSTRUMENT_UNIT_INVALID_CHAR, INSTRUMENT_UNIT_LENGTH,
13};
14
15use super::Temporality;
16
/// The identifier of a group of instruments that all perform the same function.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub enum InstrumentKind {
    /// Identifies a group of instruments that record increasing values synchronously
    /// with the code path they are measuring.
    Counter,
    /// A group of instruments that record increasing and decreasing values
    /// synchronously with the code path they are measuring.
    UpDownCounter,
    /// A group of instruments that record a distribution of values synchronously with
    /// the code path they are measuring.
    Histogram,
    /// A group of instruments that record increasing values in an asynchronous
    /// callback.
    ObservableCounter,
    /// A group of instruments that record increasing and decreasing values in an
    /// asynchronous callback.
    ObservableUpDownCounter,
    /// A group of instruments that record the current value synchronously with
    /// the code path they are measuring.
    Gauge,
    /// A group of instruments that record current values in an asynchronous callback.
    ObservableGauge,
}
43
44impl InstrumentKind {
45    /// Select the [Temporality] preference based on [InstrumentKind]
46    ///
47    /// [exporter-docs]: https://github.com/open-telemetry/opentelemetry-specification/blob/a1c13d59bb7d0fb086df2b3e1eaec9df9efef6cc/specification/metrics/sdk_exporters/otlp.md#additional-configuration
48    pub(crate) fn temporality_preference(&self, temporality: Temporality) -> Temporality {
49        match temporality {
50            Temporality::Cumulative => Temporality::Cumulative,
51            Temporality::Delta => match self {
52                Self::Counter
53                | Self::Histogram
54                | Self::ObservableCounter
55                | Self::Gauge
56                | Self::ObservableGauge => Temporality::Delta,
57                Self::UpDownCounter | InstrumentKind::ObservableUpDownCounter => {
58                    Temporality::Cumulative
59                }
60            },
61            Temporality::LowMemory => match self {
62                Self::Counter | InstrumentKind::Histogram => Temporality::Delta,
63                Self::ObservableCounter
64                | Self::Gauge
65                | Self::ObservableGauge
66                | Self::UpDownCounter
67                | Self::ObservableUpDownCounter => Temporality::Cumulative,
68            },
69        }
70    }
71}
/// Describes the properties of an instrument at creation, used for filtering in
/// views. This is utilized in the `with_view` methods on `MeterProviderBuilder`
/// to customize metric output.
///
/// Users can use a reference to `Instrument` to select which instrument(s) a
/// [Stream] should be applied to.
///
/// # Example
///
/// ```rust
/// use opentelemetry_sdk::metrics::{Instrument, Stream};
///
/// let my_view_change_cardinality = |i: &Instrument| {
///     if i.name() == "my_second_histogram" {
///         // Note: If Stream is invalid, `build()` will return an error. By
///         // calling `.ok()`, any such error is ignored and treated as if the
///         // view does not match the instrument. If this is not the desired
///         // behavior, consider handling the error explicitly.
///         Stream::builder().with_cardinality_limit(2).build().ok()
///     } else {
///         None
///     }
/// };
/// ```
#[derive(Clone, Debug, PartialEq)]
pub struct Instrument {
    /// The human-readable identifier of the instrument.
    pub(crate) name: Cow<'static, str>,
    /// Describes the purpose of the instrument.
    pub(crate) description: Cow<'static, str>,
    /// The functional group of the instrument.
    pub(crate) kind: InstrumentKind,
    /// The unit of measurement recorded by the instrument.
    pub(crate) unit: Cow<'static, str>,
    /// The instrumentation scope that created the instrument.
    pub(crate) scope: InstrumentationScope,
}
109
110impl Instrument {
111    /// Instrument name.
112    pub fn name(&self) -> &str {
113        self.name.as_ref()
114    }
115
116    /// Instrument kind.
117    pub fn kind(&self) -> InstrumentKind {
118        self.kind
119    }
120
121    /// Instrument unit.
122    pub fn unit(&self) -> &str {
123        self.unit.as_ref()
124    }
125
126    /// Instrument scope.
127    pub fn scope(&self) -> &InstrumentationScope {
128        &self.scope
129    }
130}
131
/// A builder for creating Stream objects.
///
/// # Example
///
/// ```
/// use opentelemetry_sdk::metrics::{Aggregation, Stream};
/// use opentelemetry::Key;
///
/// let stream = Stream::builder()
///     .with_name("my_stream")
///     .with_aggregation(Aggregation::Sum)
///     .with_cardinality_limit(100)
///     .build()
///     .unwrap();
/// ```
// NOTE(review): the example above calls `with_aggregation`, which is gated
// behind the `spec_unstable_metrics_views` feature — confirm doc tests are
// built with that feature enabled.
#[derive(Default, Debug)]
pub struct StreamBuilder {
    /// Optional stream name; the instrument name is used when `None`.
    name: Option<Cow<'static, str>>,
    /// Optional stream description; the instrument description is used when `None`.
    description: Option<Cow<'static, str>>,
    /// Optional stream unit; the instrument unit is used when `None`.
    unit: Option<Cow<'static, str>>,
    /// Optional aggregation override for the stream.
    aggregation: Option<Aggregation>,
    /// Optional allow-list of attribute keys (`None` keeps all attributes).
    allowed_attribute_keys: Option<Arc<HashSet<Key>>>,
    /// Optional cardinality limit for the stream.
    cardinality_limit: Option<usize>,
}
156
157impl StreamBuilder {
158    /// Create a new stream builder with default values.
159    pub(crate) fn new() -> Self {
160        StreamBuilder::default()
161    }
162
163    /// Set the stream name. If this is not set, name provide while creating the instrument will be used.
164    pub fn with_name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
165        self.name = Some(name.into());
166        self
167    }
168
169    /// Set the stream description. If this is not set, description provided while creating the instrument will be used.
170    pub fn with_description(mut self, description: impl Into<Cow<'static, str>>) -> Self {
171        self.description = Some(description.into());
172        self
173    }
174
175    /// Set the stream unit. If this is not set, unit provided while creating the instrument will be used.
176    pub fn with_unit(mut self, unit: impl Into<Cow<'static, str>>) -> Self {
177        self.unit = Some(unit.into());
178        self
179    }
180
181    #[cfg(feature = "spec_unstable_metrics_views")]
182    /// Set the stream aggregation. This is used to customize the aggregation.
183    /// If not set, the default aggregation based on the instrument kind will be used.
184    pub fn with_aggregation(mut self, aggregation: Aggregation) -> Self {
185        self.aggregation = Some(aggregation);
186        self
187    }
188
189    #[cfg(feature = "spec_unstable_metrics_views")]
190    /// Set the stream allowed attribute keys.
191    ///
192    /// Any attribute recorded for the stream with a key not in this set will be
193    /// dropped. If the set is empty, all attributes will be dropped.
194    /// If this method is not used, all attributes will be kept.
195    pub fn with_allowed_attribute_keys(
196        mut self,
197        attribute_keys: impl IntoIterator<Item = Key>,
198    ) -> Self {
199        self.allowed_attribute_keys = Some(Arc::new(attribute_keys.into_iter().collect()));
200        self
201    }
202
203    /// Set the stream cardinality limit. If this is not set, the default limit of 2000 will be used.
204    pub fn with_cardinality_limit(mut self, limit: usize) -> Self {
205        self.cardinality_limit = Some(limit);
206        self
207    }
208
209    /// Build a new Stream instance using the configuration in this builder.
210    ///
211    /// # Returns
212    ///
213    /// A Result containing the new Stream instance or an error if the build failed.
214    pub fn build(self) -> Result<Stream, Box<dyn Error>> {
215        // TODO: Avoid copying the validation logic from meter.rs,
216        // and instead move it to a common place and do it once.
217        // It is a bug that validations are done in meter.rs
218        // as it'll not allow users to fix instrumentation mistakes
219        // using views.
220
221        // Validate name if provided
222        if let Some(name) = &self.name {
223            if name.is_empty() {
224                return Err(INSTRUMENT_NAME_EMPTY.into());
225            }
226
227            if name.len() > super::meter::INSTRUMENT_NAME_MAX_LENGTH {
228                return Err(INSTRUMENT_NAME_LENGTH.into());
229            }
230
231            if name.starts_with(|c: char| !c.is_ascii_alphabetic()) {
232                return Err(INSTRUMENT_NAME_FIRST_ALPHABETIC.into());
233            }
234
235            if name.contains(|c: char| {
236                !c.is_ascii_alphanumeric()
237                    && !super::meter::INSTRUMENT_NAME_ALLOWED_NON_ALPHANUMERIC_CHARS.contains(&c)
238            }) {
239                return Err(INSTRUMENT_NAME_INVALID_CHAR.into());
240            }
241        }
242
243        // Validate unit if provided
244        if let Some(unit) = &self.unit {
245            if unit.len() > super::meter::INSTRUMENT_UNIT_NAME_MAX_LENGTH {
246                return Err(INSTRUMENT_UNIT_LENGTH.into());
247            }
248
249            if unit.contains(|c: char| !c.is_ascii()) {
250                return Err(INSTRUMENT_UNIT_INVALID_CHAR.into());
251            }
252        }
253
254        // Validate cardinality limit
255        if let Some(limit) = self.cardinality_limit {
256            if limit == 0 {
257                return Err("Cardinality limit must be greater than 0".into());
258            }
259        }
260
261        // Validate bucket boundaries if using ExplicitBucketHistogram
262        if let Some(Aggregation::ExplicitBucketHistogram { boundaries, .. }) = &self.aggregation {
263            validate_bucket_boundaries(boundaries)?;
264        }
265
266        Ok(Stream {
267            name: self.name,
268            description: self.description,
269            unit: self.unit,
270            aggregation: self.aggregation,
271            allowed_attribute_keys: self.allowed_attribute_keys,
272            cardinality_limit: self.cardinality_limit,
273        })
274    }
275}
276
/// Validate explicit histogram bucket boundaries.
///
/// Boundaries must be finite (no NaN or infinities) and strictly increasing
/// (sorted, no duplicates). Returns the corresponding error message otherwise.
fn validate_bucket_boundaries(boundaries: &[f64]) -> Result<(), String> {
    // Reject NaN, f64::INFINITY and f64::NEG_INFINITY in one pass:
    // `is_finite()` is false exactly for those values.
    if boundaries.iter().any(|b| !b.is_finite()) {
        return Err("Bucket boundaries must not contain NaN, Infinity, or -Infinity".to_string());
    }

    // Every adjacent pair must be strictly increasing, which also rules out
    // duplicates.
    if boundaries.windows(2).any(|pair| pair[1] <= pair[0]) {
        return Err("Bucket boundaries must be sorted and not contain any duplicates".to_string());
    }

    Ok(())
}
298
/// Describes the stream of data an instrument produces. Used in `with_view`
/// methods on `MeterProviderBuilder` to customize the metric output.
#[derive(Default, Debug)]
pub struct Stream {
    /// The human-readable identifier of the stream.
    pub(crate) name: Option<Cow<'static, str>>,
    /// Describes the purpose of the data.
    pub(crate) description: Option<Cow<'static, str>>,
    /// The unit of measurement recorded.
    pub(crate) unit: Option<Cow<'static, str>>,
    /// Aggregation the stream uses for an instrument.
    pub(crate) aggregation: Option<Aggregation>,
    /// An allow-list of attribute keys that will be preserved for the stream.
    ///
    /// Any attribute recorded for the stream with a key not in this set will be
    /// dropped. If the set is empty, all attributes will be dropped; if `None`,
    /// all attributes will be kept.
    pub(crate) allowed_attribute_keys: Option<Arc<HashSet<Key>>>,
    /// Cardinality limit for the stream.
    pub(crate) cardinality_limit: Option<usize>,
}
321
322impl Stream {
323    /// Create a new stream builder with default values.
324    pub fn builder() -> StreamBuilder {
325        StreamBuilder::new()
326    }
327}
328
/// The identifying properties of an instrument.
///
/// Used as a hash key, so [`InstrumentId::normalize`] should be applied first
/// to make name comparison case-insensitive.
#[derive(Debug, PartialEq, Eq, Hash)]
pub(crate) struct InstrumentId {
    /// The human-readable identifier of the instrument.
    pub(crate) name: Cow<'static, str>,
    /// Describes the purpose of the data.
    pub(crate) description: Cow<'static, str>,
    /// Defines the functional group of the instrument.
    pub(crate) kind: InstrumentKind,
    /// The unit of measurement recorded.
    pub(crate) unit: Cow<'static, str>,
    /// The underlying numeric data type of the instrument, as a string.
    pub(crate) number: Cow<'static, str>,
}
343
344impl InstrumentId {
345    /// Instrument names are considered case-insensitive ASCII.
346    ///
347    /// Standardize the instrument name to always be lowercase so it can be compared
348    /// via hash.
349    ///
350    /// See [naming syntax] for full requirements.
351    ///
352    /// [naming syntax]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.21.0/specification/metrics/api.md#instrument-name-syntax
353    pub(crate) fn normalize(&mut self) {
354        if self.name.chars().any(|c| c.is_ascii_uppercase()) {
355            self.name = self.name.to_ascii_lowercase().into();
356        }
357    }
358}
359
/// The [Measure]s a synchronous instrument resolved to; each recorded value is
/// forwarded to all of them (see the `SyncInstrument` impl).
pub(crate) struct ResolvedMeasures<T> {
    /// Measures that receive every recorded value.
    pub(crate) measures: Vec<Arc<dyn Measure<T>>>,
}
363
364impl<T: Copy + 'static> SyncInstrument<T> for ResolvedMeasures<T> {
365    fn measure(&self, val: T, attrs: &[KeyValue]) {
366        for measure in &self.measures {
367            measure.call(val, attrs)
368        }
369    }
370}
371
/// Backing state for asynchronous (observable) instruments: the [Measure]s
/// that each observed value is forwarded to (see the `AsyncInstrument` impl).
#[derive(Clone)]
pub(crate) struct Observable<T> {
    /// Measures that receive every observed value.
    measures: Vec<Arc<dyn Measure<T>>>,
}

impl<T> Observable<T> {
    /// Create an `Observable` that forwards observations to `measures`.
    pub(crate) fn new(measures: Vec<Arc<dyn Measure<T>>>) -> Self {
        Self { measures }
    }
}
382
383impl<T: Copy + Send + Sync + 'static> AsyncInstrument<T> for Observable<T> {
384    fn observe(&self, measurement: T, attrs: &[KeyValue]) {
385        for measure in &self.measures {
386            measure.call(measurement, attrs)
387        }
388    }
389}
390
#[cfg(test)]
mod tests {
    use super::StreamBuilder;
    use crate::metrics::meter::{
        INSTRUMENT_NAME_EMPTY, INSTRUMENT_NAME_FIRST_ALPHABETIC, INSTRUMENT_NAME_INVALID_CHAR,
        INSTRUMENT_NAME_LENGTH, INSTRUMENT_UNIT_INVALID_CHAR, INSTRUMENT_UNIT_LENGTH,
    };

    /// Table-driven check of `StreamBuilder::build` name validation.
    /// An empty expected-error string means the build is expected to succeed.
    #[test]
    fn stream_name_validation() {
        // (name, expected error)
        let stream_name_test_cases = vec![
            ("validateName", ""),
            ("_startWithNoneAlphabet", INSTRUMENT_NAME_FIRST_ALPHABETIC),
            ("utf8char锈", INSTRUMENT_NAME_INVALID_CHAR),
            // `.leak()` yields the `&'static str` that `with_name` needs;
            // 255 characters is at the length limit, 256 exceeds it.
            ("a".repeat(255).leak(), ""),
            ("a".repeat(256).leak(), INSTRUMENT_NAME_LENGTH),
            ("invalid name", INSTRUMENT_NAME_INVALID_CHAR),
            ("allow/slash", ""),
            ("allow_under_score", ""),
            ("allow.dots.ok", ""),
            ("", INSTRUMENT_NAME_EMPTY),
            ("\\allow\\slash /sec", INSTRUMENT_NAME_FIRST_ALPHABETIC),
            ("\\allow\\$$slash /sec", INSTRUMENT_NAME_FIRST_ALPHABETIC),
            ("Total $ Count", INSTRUMENT_NAME_INVALID_CHAR),
            (
                "\\test\\UsagePercent(Total) > 80%",
                INSTRUMENT_NAME_FIRST_ALPHABETIC,
            ),
            ("/not / allowed", INSTRUMENT_NAME_FIRST_ALPHABETIC),
        ];

        for (name, expected_error) in stream_name_test_cases {
            let builder = StreamBuilder::new().with_name(name);
            let result = builder.build();

            if expected_error.is_empty() {
                assert!(
                    result.is_ok(),
                    "Expected successful build for name '{}', but got error: {:?}",
                    name,
                    result.err()
                );
            } else {
                let err = result.err().unwrap();
                let err_str = err.to_string();
                assert!(
                    err_str == expected_error,
                    "For name '{name}', expected error '{expected_error}', but got '{err_str}'"
                );
            }
        }
    }

    /// Table-driven check of `StreamBuilder::build` unit validation.
    /// An empty expected-error string means the build is expected to succeed.
    #[test]
    fn stream_unit_validation() {
        // (unit, expected error)
        let stream_unit_test_cases = vec![
            // 64 ASCII characters — exceeds the unit length limit.
            (
                "0123456789012345678901234567890123456789012345678901234567890123",
                INSTRUMENT_UNIT_LENGTH,
            ),
            ("utf8char锈", INSTRUMENT_UNIT_INVALID_CHAR),
            ("kb", ""),
            ("Kb/sec", ""),
            ("%", ""),
            // An empty unit string passes validation.
            ("", ""),
        ];

        for (unit, expected_error) in stream_unit_test_cases {
            // Use a valid name to isolate unit validation
            let builder = StreamBuilder::new().with_name("valid_name").with_unit(unit);

            let result = builder.build();

            if expected_error.is_empty() {
                assert!(
                    result.is_ok(),
                    "Expected successful build for unit '{}', but got error: {:?}",
                    unit,
                    result.err()
                );
            } else {
                let err = result.err().unwrap();
                let err_str = err.to_string();
                assert!(
                    err_str == expected_error,
                    "For unit '{unit}', expected error '{expected_error}', but got '{err_str}'"
                );
            }
        }
    }

    /// A cardinality limit of zero must be rejected; positive limits accepted.
    #[test]
    fn stream_cardinality_limit_validation() {
        // Test zero cardinality limit (invalid)
        let builder = StreamBuilder::new()
            .with_name("valid_name")
            .with_cardinality_limit(0);

        let result = builder.build();
        assert!(result.is_err(), "Expected error for zero cardinality limit");
        assert_eq!(
            result.err().unwrap().to_string(),
            "Cardinality limit must be greater than 0",
            "Expected cardinality limit validation error message"
        );

        // Test valid cardinality limits
        let valid_limits = vec![1, 10, 100, 1000];
        for limit in valid_limits {
            let builder = StreamBuilder::new()
                .with_name("valid_name")
                .with_cardinality_limit(limit);

            let result = builder.build();
            assert!(
                result.is_ok(),
                "Expected successful build for cardinality limit {}, but got error: {:?}",
                limit,
                result.err()
            );
        }
    }

    /// A fully valid configuration builds successfully.
    #[test]
    fn stream_valid_build() {
        // Test with valid configuration
        let stream = StreamBuilder::new()
            .with_name("valid_name")
            .with_description("Valid description")
            .with_unit("ms")
            .with_cardinality_limit(100)
            .build();

        assert!(
            stream.is_ok(),
            "Expected valid Stream to be built successfully"
        );
    }

    /// Explicit-bucket histogram boundaries must be finite, sorted, and free
    /// of duplicates; each violation produces its specific error message.
    #[cfg(feature = "spec_unstable_metrics_views")]
    #[test]
    fn stream_histogram_bucket_validation() {
        use super::Aggregation;

        // Test with valid bucket boundaries
        let valid_boundaries = vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0];
        let builder = StreamBuilder::new()
            .with_name("valid_histogram")
            .with_aggregation(Aggregation::ExplicitBucketHistogram {
                boundaries: valid_boundaries.clone(),
                record_min_max: true,
            });

        let result = builder.build();
        assert!(
            result.is_ok(),
            "Expected successful build with valid bucket boundaries"
        );

        // Test with invalid bucket boundaries (NaN and Infinity)

        // Test with NaN
        let invalid_nan_boundaries = vec![1.0, 2.0, f64::NAN, 10.0];

        let builder = StreamBuilder::new()
            .with_name("invalid_histogram_nan")
            .with_aggregation(Aggregation::ExplicitBucketHistogram {
                boundaries: invalid_nan_boundaries,
                record_min_max: true,
            });

        let result = builder.build();
        assert!(
            result.is_err(),
            "Expected error for NaN in bucket boundaries"
        );
        assert_eq!(
            result.err().unwrap().to_string(),
            "Bucket boundaries must not contain NaN, Infinity, or -Infinity",
            "Expected correct validation error for NaN"
        );

        // Test with infinity
        let invalid_inf_boundaries = vec![1.0, 5.0, f64::INFINITY, 100.0];

        let builder = StreamBuilder::new()
            .with_name("invalid_histogram_inf")
            .with_aggregation(Aggregation::ExplicitBucketHistogram {
                boundaries: invalid_inf_boundaries,
                record_min_max: true,
            });

        let result = builder.build();
        assert!(
            result.is_err(),
            "Expected error for Infinity in bucket boundaries"
        );
        assert_eq!(
            result.err().unwrap().to_string(),
            "Bucket boundaries must not contain NaN, Infinity, or -Infinity",
            "Expected correct validation error for Infinity"
        );

        // Test with negative infinity
        let invalid_neg_inf_boundaries = vec![f64::NEG_INFINITY, 5.0, 10.0, 100.0];

        let builder = StreamBuilder::new()
            .with_name("invalid_histogram_neg_inf")
            .with_aggregation(Aggregation::ExplicitBucketHistogram {
                boundaries: invalid_neg_inf_boundaries,
                record_min_max: true,
            });

        let result = builder.build();
        assert!(
            result.is_err(),
            "Expected error for negative Infinity in bucket boundaries"
        );
        assert_eq!(
            result.err().unwrap().to_string(),
            "Bucket boundaries must not contain NaN, Infinity, or -Infinity",
            "Expected correct validation error for negative Infinity"
        );

        // Test with unsorted bucket boundaries
        let unsorted_boundaries = vec![1.0, 5.0, 2.0, 10.0]; // 2.0 comes after 5.0, which is incorrect

        let builder = StreamBuilder::new()
            .with_name("unsorted_histogram")
            .with_aggregation(Aggregation::ExplicitBucketHistogram {
                boundaries: unsorted_boundaries,
                record_min_max: true,
            });

        let result = builder.build();
        assert!(
            result.is_err(),
            "Expected error for unsorted bucket boundaries"
        );
        assert_eq!(
            result.err().unwrap().to_string(),
            "Bucket boundaries must be sorted and not contain any duplicates",
            "Expected correct validation error for unsorted boundaries"
        );

        // Test with duplicate bucket boundaries
        let duplicate_boundaries = vec![1.0, 2.0, 5.0, 5.0, 10.0]; // 5.0 appears twice

        let builder = StreamBuilder::new()
            .with_name("duplicate_histogram")
            .with_aggregation(Aggregation::ExplicitBucketHistogram {
                boundaries: duplicate_boundaries,
                record_min_max: true,
            });

        let result = builder.build();
        assert!(
            result.is_err(),
            "Expected error for duplicate bucket boundaries"
        );
        assert_eq!(
            result.err().unwrap().to_string(),
            "Bucket boundaries must be sorted and not contain any duplicates",
            "Expected correct validation error for duplicate boundaries"
        );
    }
}