use std::{borrow::Cow, collections::HashSet, error::Error, sync::Arc};

use opentelemetry::{
    metrics::{AsyncInstrument, SyncInstrument},
    InstrumentationScope, Key, KeyValue,
};

use crate::metrics::{aggregation::Aggregation, internal::Measure};

use super::meter::{
    INSTRUMENT_NAME_EMPTY, INSTRUMENT_NAME_FIRST_ALPHABETIC, INSTRUMENT_NAME_INVALID_CHAR,
    INSTRUMENT_NAME_LENGTH, INSTRUMENT_UNIT_INVALID_CHAR, INSTRUMENT_UNIT_LENGTH,
};

use super::Temporality;
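/// The identifier of a group of instruments that all perform the same function.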
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub enum InstrumentKind {
    /// A synchronous instrument that records monotonically increasing values.
    Counter,
    /// A synchronous instrument that records values that can increase and decrease.
    UpDownCounter,
    /// A synchronous instrument that records a distribution of values.
    Histogram,
    /// An asynchronous instrument that reports monotonically increasing values
    /// from a callback.
    ObservableCounter,
    /// An asynchronous instrument that reports values that can increase and
    /// decrease from a callback.
    ObservableUpDownCounter,
    /// A synchronous instrument that records the current value.
    Gauge,
    /// An asynchronous instrument that reports the current value from a callback.
    ObservableGauge,
}

impl InstrumentKind {
    /// Resolves the temporality to use for this instrument kind, given the
    /// reader's temporality preference.
    pub(crate) fn temporality_preference(&self, temporality: Temporality) -> Temporality {
        match temporality {
            Temporality::Cumulative => Temporality::Cumulative,
            Temporality::Delta => match self {
                Self::Counter
                | Self::Histogram
                | Self::ObservableCounter
                | Self::Gauge
                | Self::ObservableGauge => Temporality::Delta,
                // Up-down counters are only meaningful as running totals, so
                // they stay cumulative even under a delta preference.
                Self::UpDownCounter | Self::ObservableUpDownCounter => Temporality::Cumulative,
            },
            // Low-memory mode uses delta only for the synchronous instruments
            // that would otherwise accumulate unbounded state.
            Temporality::LowMemory => match self {
                Self::Counter | Self::Histogram => Temporality::Delta,
                Self::ObservableCounter
                | Self::Gauge
                | Self::ObservableGauge
                | Self::UpDownCounter
                | Self::ObservableUpDownCounter => Temporality::Cumulative,
            },
        }
    }
}
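
/// Describes the properties an instrument was created with.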
#[derive(Clone, Debug, PartialEq)]
pub struct Instrument {
    /// The human-readable identifier of the instrument.
    pub(crate) name: Cow<'static, str>,
    /// Describes the purpose of the instrument.
    pub(crate) description: Cow<'static, str>,
    /// The functional group of the instrument.
    pub(crate) kind: InstrumentKind,
    /// The unit of measurement recorded by the instrument.
    pub(crate) unit: Cow<'static, str>,
    /// The instrumentation that created the instrument.
    pub(crate) scope: InstrumentationScope,
}

impl Instrument {
    /// Instrument name.
    pub fn name(&self) -> &str {
        self.name.as_ref()
    }

    /// Instrument kind.
    pub fn kind(&self) -> InstrumentKind {
        self.kind
    }

    /// Instrument unit.
    pub fn unit(&self) -> &str {
        self.unit.as_ref()
    }

    /// Instrument scope.
    pub fn scope(&self) -> &InstrumentationScope {
        &self.scope
    }
}

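/// Builder for [`Stream`]. Inputs are validated when [`build`](StreamBuilder::build)
/// is called.
///
/// A minimal usage sketch; the `opentelemetry_sdk::metrics::Stream` re-export
/// path is an assumption here, so the example is not compiled:
///
/// ```ignore
/// use opentelemetry_sdk::metrics::Stream;
///
/// let stream = Stream::builder()
///     .with_name("my_stream")
///     .with_unit("ms")
///     .with_cardinality_limit(100)
///     .build()
///     .expect("stream configuration should be valid");
/// ```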
#[derive(Default, Debug)]
pub struct StreamBuilder {
    name: Option<Cow<'static, str>>,
    description: Option<Cow<'static, str>>,
    unit: Option<Cow<'static, str>>,
    aggregation: Option<Aggregation>,
    allowed_attribute_keys: Option<Arc<HashSet<Key>>>,
    cardinality_limit: Option<usize>,
}

impl StreamBuilder {
    /// Create a new stream builder with default values.
    pub(crate) fn new() -> Self {
        StreamBuilder::default()
    }

    /// Set the stream name. If not set, the instrument name is used.
    pub fn with_name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
        self.name = Some(name.into());
        self
    }

    /// Set the stream description. If not set, the instrument description is used.
    pub fn with_description(mut self, description: impl Into<Cow<'static, str>>) -> Self {
        self.description = Some(description.into());
        self
    }

    /// Set the stream unit. If not set, the instrument unit is used.
    pub fn with_unit(mut self, unit: impl Into<Cow<'static, str>>) -> Self {
        self.unit = Some(unit.into());
        self
    }

    /// Set the stream aggregation. If not set, the default aggregation for the
    /// instrument kind is used.
    #[cfg(feature = "spec_unstable_metrics_views")]
    pub fn with_aggregation(mut self, aggregation: Aggregation) -> Self {
        self.aggregation = Some(aggregation);
        self
    }

    /// Set the allowed attribute keys. Attributes recorded with a key not in
    /// this set are dropped from the stream.
    #[cfg(feature = "spec_unstable_metrics_views")]
    pub fn with_allowed_attribute_keys(
        mut self,
        attribute_keys: impl IntoIterator<Item = Key>,
    ) -> Self {
        self.allowed_attribute_keys = Some(Arc::new(attribute_keys.into_iter().collect()));
        self
    }

    /// Set the cardinality limit: the maximum number of distinct attribute sets
    /// tracked for this stream. Must be greater than 0.
    pub fn with_cardinality_limit(mut self, limit: usize) -> Self {
        self.cardinality_limit = Some(limit);
        self
    }

    /// Validate the stream configuration and build a [`Stream`].
    pub fn build(self) -> Result<Stream, Box<dyn Error>> {
        // If a name is provided, validate it against the same rules that
        // apply to instrument names.
        if let Some(name) = &self.name {
            if name.is_empty() {
                return Err(INSTRUMENT_NAME_EMPTY.into());
            }

            if name.len() > super::meter::INSTRUMENT_NAME_MAX_LENGTH {
                return Err(INSTRUMENT_NAME_LENGTH.into());
            }

            if name.starts_with(|c: char| !c.is_ascii_alphabetic()) {
                return Err(INSTRUMENT_NAME_FIRST_ALPHABETIC.into());
            }

            if name.contains(|c: char| {
                !c.is_ascii_alphanumeric()
                    && !super::meter::INSTRUMENT_NAME_ALLOWED_NON_ALPHANUMERIC_CHARS.contains(&c)
            }) {
                return Err(INSTRUMENT_NAME_INVALID_CHAR.into());
            }
        }

        // If a unit is provided, it must be ASCII and within the length limit.
        if let Some(unit) = &self.unit {
            if unit.len() > super::meter::INSTRUMENT_UNIT_NAME_MAX_LENGTH {
                return Err(INSTRUMENT_UNIT_LENGTH.into());
            }

            if unit.contains(|c: char| !c.is_ascii()) {
                return Err(INSTRUMENT_UNIT_INVALID_CHAR.into());
            }
        }

        if let Some(limit) = self.cardinality_limit {
            if limit == 0 {
                return Err("Cardinality limit must be greater than 0".into());
            }
        }

        // Explicit bucket boundaries must be finite, sorted, and duplicate-free.
        if let Some(Aggregation::ExplicitBucketHistogram { boundaries, .. }) = &self.aggregation {
            validate_bucket_boundaries(boundaries)?;
        }

        Ok(Stream {
            name: self.name,
            description: self.description,
            unit: self.unit,
            aggregation: self.aggregation,
            allowed_attribute_keys: self.allowed_attribute_keys,
            cardinality_limit: self.cardinality_limit,
        })
    }
}

// Validates explicit bucket boundaries: every value must be finite and the
// sequence must be strictly increasing.
fn validate_bucket_boundaries(boundaries: &[f64]) -> Result<(), String> {
    // Reject NaN and infinite values.
    for boundary in boundaries {
        if boundary.is_nan() || boundary.is_infinite() {
            return Err(
                "Bucket boundaries must not contain NaN, Infinity, or -Infinity".to_string(),
            );
        }
    }

    // Validate that boundaries are sorted and free of duplicates.
    for i in 1..boundaries.len() {
        if boundaries[i] <= boundaries[i - 1] {
            return Err(
                "Bucket boundaries must be sorted and not contain any duplicates".to_string(),
            );
        }
    }

    Ok(())
}

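/// Describes the stream of data an instrument produces. Fields left as `None`
/// fall back to the corresponding instrument property.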
#[derive(Default, Debug)]
pub struct Stream {
    /// The human-readable identifier of the stream.
    pub(crate) name: Option<Cow<'static, str>>,
    /// Describes the purpose of the data.
    pub(crate) description: Option<Cow<'static, str>>,
    /// The unit of measurement recorded.
    pub(crate) unit: Option<Cow<'static, str>>,
    /// Aggregation the stream uses for an instrument.
    pub(crate) aggregation: Option<Aggregation>,
    /// An allow-list of attribute keys that will be preserved for the stream.
    ///
    /// Any attribute recorded for the stream with a key not in this set will be
    /// dropped. If `None`, all attributes are kept.
    pub(crate) allowed_attribute_keys: Option<Arc<HashSet<Key>>>,
    /// Cardinality limit for the stream.
    pub(crate) cardinality_limit: Option<usize>,
}

impl Stream {
    /// Create a new stream builder with default values.
    pub fn builder() -> StreamBuilder {
        StreamBuilder::new()
    }
}

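/// A comparable unique identifier of an instrument.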
#[derive(Debug, PartialEq, Eq, Hash)]
pub(crate) struct InstrumentId {
    /// The human-readable identifier of the instrument.
    pub(crate) name: Cow<'static, str>,
    /// Describes the purpose of the data.
    pub(crate) description: Cow<'static, str>,
    /// The functional group of the instrument.
    pub(crate) kind: InstrumentKind,
    /// The unit of measurement recorded.
    pub(crate) unit: Cow<'static, str>,
    /// The underlying data type of the instrument.
    pub(crate) number: Cow<'static, str>,
}

impl InstrumentId {
    /// Instrument names are considered case-insensitive ASCII strings.
    ///
    /// Standardize the name to lowercase so identifiers can be compared via hash.
    pub(crate) fn normalize(&mut self) {
        // Allocate a lowercased copy only if the name actually contains
        // uppercase ASCII characters.
        if self.name.chars().any(|c| c.is_ascii_uppercase()) {
            self.name = self.name.to_ascii_lowercase().into();
        }
    }
}

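/// Fan-out target for a synchronous instrument: holds the measures resolved
/// from the pipeline and forwards every recorded value to each of them.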
pub(crate) struct ResolvedMeasures<T> {
    pub(crate) measures: Vec<Arc<dyn Measure<T>>>,
}

impl<T: Copy + 'static> SyncInstrument<T> for ResolvedMeasures<T> {
    fn measure(&self, val: T, attrs: &[KeyValue]) {
        // Forward the recorded value to each underlying measure.
        for measure in &self.measures {
            measure.call(val, attrs)
        }
    }
}

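/// Asynchronous counterpart to [`ResolvedMeasures`]: forwards every observed
/// value to each resolved measure.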
#[derive(Clone)]
pub(crate) struct Observable<T> {
    measures: Vec<Arc<dyn Measure<T>>>,
}

impl<T> Observable<T> {
    pub(crate) fn new(measures: Vec<Arc<dyn Measure<T>>>) -> Self {
        Self { measures }
    }
}

impl<T: Copy + Send + Sync + 'static> AsyncInstrument<T> for Observable<T> {
    fn observe(&self, measurement: T, attrs: &[KeyValue]) {
        // Forward the observed value to each underlying measure.
        for measure in &self.measures {
            measure.call(measurement, attrs)
        }
    }
}

#[cfg(test)]
mod tests {
    use super::StreamBuilder;
    use crate::metrics::meter::{
        INSTRUMENT_NAME_EMPTY, INSTRUMENT_NAME_FIRST_ALPHABETIC, INSTRUMENT_NAME_INVALID_CHAR,
        INSTRUMENT_NAME_LENGTH, INSTRUMENT_UNIT_INVALID_CHAR, INSTRUMENT_UNIT_LENGTH,
    };

    #[test]
    fn stream_name_validation() {
        // (name, expected error); an empty expected error means the build
        // should succeed.
        let stream_name_test_cases = vec![
            ("validateName", ""),
            ("_startWithNoneAlphabet", INSTRUMENT_NAME_FIRST_ALPHABETIC),
            ("utf8char锈", INSTRUMENT_NAME_INVALID_CHAR),
            ("a".repeat(255).leak(), ""),
            ("a".repeat(256).leak(), INSTRUMENT_NAME_LENGTH),
            ("invalid name", INSTRUMENT_NAME_INVALID_CHAR),
            ("allow/slash", ""),
            ("allow_under_score", ""),
            ("allow.dots.ok", ""),
            ("", INSTRUMENT_NAME_EMPTY),
            ("\\allow\\slash /sec", INSTRUMENT_NAME_FIRST_ALPHABETIC),
            ("\\allow\\$$slash /sec", INSTRUMENT_NAME_FIRST_ALPHABETIC),
            ("Total $ Count", INSTRUMENT_NAME_INVALID_CHAR),
            (
                "\\test\\UsagePercent(Total) > 80%",
                INSTRUMENT_NAME_FIRST_ALPHABETIC,
            ),
            ("/not / allowed", INSTRUMENT_NAME_FIRST_ALPHABETIC),
        ];

        for (name, expected_error) in stream_name_test_cases {
            let builder = StreamBuilder::new().with_name(name);
            let result = builder.build();

            if expected_error.is_empty() {
                assert!(
                    result.is_ok(),
                    "Expected successful build for name '{}', but got error: {:?}",
                    name,
                    result.err()
                );
            } else {
                let err = result.err().unwrap();
                let err_str = err.to_string();
                assert!(
                    err_str == expected_error,
                    "For name '{name}', expected error '{expected_error}', but got '{err_str}'"
                );
            }
        }
    }

    #[test]
    fn stream_unit_validation() {
        // (unit, expected error); an empty expected error means the build
        // should succeed.
        let stream_unit_test_cases = vec![
            (
                "0123456789012345678901234567890123456789012345678901234567890123",
                INSTRUMENT_UNIT_LENGTH,
            ),
            ("utf8char锈", INSTRUMENT_UNIT_INVALID_CHAR),
            ("kb", ""),
            ("Kb/sec", ""),
            ("%", ""),
            ("", ""),
        ];

        for (unit, expected_error) in stream_unit_test_cases {
            let builder = StreamBuilder::new().with_name("valid_name").with_unit(unit);

            let result = builder.build();

            if expected_error.is_empty() {
                assert!(
                    result.is_ok(),
                    "Expected successful build for unit '{}', but got error: {:?}",
                    unit,
                    result.err()
                );
            } else {
                let err = result.err().unwrap();
                let err_str = err.to_string();
                assert!(
                    err_str == expected_error,
                    "For unit '{unit}', expected error '{expected_error}', but got '{err_str}'"
                );
            }
        }
    }

    #[test]
    fn stream_cardinality_limit_validation() {
        // A zero limit must be rejected.
        let builder = StreamBuilder::new()
            .with_name("valid_name")
            .with_cardinality_limit(0);

        let result = builder.build();
        assert!(result.is_err(), "Expected error for zero cardinality limit");
        assert_eq!(
            result.err().unwrap().to_string(),
            "Cardinality limit must be greater than 0",
            "Expected cardinality limit validation error message"
        );

        // Any positive limit is accepted.
        let valid_limits = vec![1, 10, 100, 1000];
        for limit in valid_limits {
            let builder = StreamBuilder::new()
                .with_name("valid_name")
                .with_cardinality_limit(limit);

            let result = builder.build();
            assert!(
                result.is_ok(),
                "Expected successful build for cardinality limit {}, but got error: {:?}",
                limit,
                result.err()
            );
        }
    }

    #[test]
    fn stream_valid_build() {
        // A stream with valid properties should build without error.
        let stream = StreamBuilder::new()
            .with_name("valid_name")
            .with_description("Valid description")
            .with_unit("ms")
            .with_cardinality_limit(100)
            .build();

        assert!(
            stream.is_ok(),
            "Expected valid Stream to be built successfully"
        );
    }

    #[cfg(feature = "spec_unstable_metrics_views")]
    #[test]
    fn stream_histogram_bucket_validation() {
        use super::Aggregation;

        // Valid boundaries: finite and strictly increasing.
        let valid_boundaries = vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0];
        let builder = StreamBuilder::new()
            .with_name("valid_histogram")
            .with_aggregation(Aggregation::ExplicitBucketHistogram {
                boundaries: valid_boundaries.clone(),
                record_min_max: true,
            });

        let result = builder.build();
        assert!(
            result.is_ok(),
            "Expected successful build with valid bucket boundaries"
        );

        // NaN boundaries are rejected.
        let invalid_nan_boundaries = vec![1.0, 2.0, f64::NAN, 10.0];

        let builder = StreamBuilder::new()
            .with_name("invalid_histogram_nan")
            .with_aggregation(Aggregation::ExplicitBucketHistogram {
                boundaries: invalid_nan_boundaries,
                record_min_max: true,
            });

        let result = builder.build();
        assert!(
            result.is_err(),
            "Expected error for NaN in bucket boundaries"
        );
        assert_eq!(
            result.err().unwrap().to_string(),
            "Bucket boundaries must not contain NaN, Infinity, or -Infinity",
            "Expected correct validation error for NaN"
        );

        // Positive infinity is rejected.
        let invalid_inf_boundaries = vec![1.0, 5.0, f64::INFINITY, 100.0];

        let builder = StreamBuilder::new()
            .with_name("invalid_histogram_inf")
            .with_aggregation(Aggregation::ExplicitBucketHistogram {
                boundaries: invalid_inf_boundaries,
                record_min_max: true,
            });

        let result = builder.build();
        assert!(
            result.is_err(),
            "Expected error for Infinity in bucket boundaries"
        );
        assert_eq!(
            result.err().unwrap().to_string(),
            "Bucket boundaries must not contain NaN, Infinity, or -Infinity",
            "Expected correct validation error for Infinity"
        );

        // Negative infinity is rejected.
        let invalid_neg_inf_boundaries = vec![f64::NEG_INFINITY, 5.0, 10.0, 100.0];

        let builder = StreamBuilder::new()
            .with_name("invalid_histogram_neg_inf")
            .with_aggregation(Aggregation::ExplicitBucketHistogram {
                boundaries: invalid_neg_inf_boundaries,
                record_min_max: true,
            });

        let result = builder.build();
        assert!(
            result.is_err(),
            "Expected error for negative Infinity in bucket boundaries"
        );
        assert_eq!(
            result.err().unwrap().to_string(),
            "Bucket boundaries must not contain NaN, Infinity, or -Infinity",
            "Expected correct validation error for negative Infinity"
        );

        // Out-of-order boundaries are rejected.
        let unsorted_boundaries = vec![1.0, 5.0, 2.0, 10.0];
        let builder = StreamBuilder::new()
            .with_name("unsorted_histogram")
            .with_aggregation(Aggregation::ExplicitBucketHistogram {
                boundaries: unsorted_boundaries,
                record_min_max: true,
            });

        let result = builder.build();
        assert!(
            result.is_err(),
            "Expected error for unsorted bucket boundaries"
        );
        assert_eq!(
            result.err().unwrap().to_string(),
            "Bucket boundaries must be sorted and not contain any duplicates",
            "Expected correct validation error for unsorted boundaries"
        );

        // Duplicate boundaries are rejected.
        let duplicate_boundaries = vec![1.0, 2.0, 5.0, 5.0, 10.0];
        let builder = StreamBuilder::new()
            .with_name("duplicate_histogram")
            .with_aggregation(Aggregation::ExplicitBucketHistogram {
                boundaries: duplicate_boundaries,
                record_min_max: true,
            });

        let result = builder.build();
        assert!(
            result.is_err(),
            "Expected error for duplicate bucket boundaries"
        );
        assert_eq!(
            result.err().unwrap().to_string(),
            "Bucket boundaries must be sorted and not contain any duplicates",
            "Expected correct validation error for duplicate boundaries"
        );
    }
}