1use core::fmt;
2use std::{
3 borrow::Cow,
4 collections::{HashMap, HashSet},
5 sync::{Arc, Mutex},
6};
7
8use opentelemetry::{otel_debug, InstrumentationScope, KeyValue};
9
10use crate::{
11 error::{OTelSdkError, OTelSdkResult},
12 metrics::{
13 aggregation,
14 data::{Metric, ResourceMetrics, ScopeMetrics},
15 error::{MetricError, MetricResult},
16 instrument::{Instrument, InstrumentId, InstrumentKind, Stream},
17 internal::{self, AggregateBuilder, Number},
18 reader::{MetricReader, SdkProducer},
19 view::View,
20 },
21 Resource,
22};
23
24use self::internal::AggregateFns;
25
26use super::{aggregation::Aggregation, Temporality};
27
28#[doc(hidden)]
37pub struct Pipeline {
38 pub(crate) resource: Resource,
39 reader: Box<dyn MetricReader>,
40 views: Vec<Arc<dyn View>>,
41 inner: Mutex<PipelineInner>,
42}
43
44impl fmt::Debug for Pipeline {
45 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46 f.write_str("Pipeline")
47 }
48}
49
/// Shared, thread-safe callback without arguments or return value; the
/// pipeline invokes each registered one at the start of `produce`.
type GenericCallback = Arc<dyn Fn() + Send + Sync>;
52
/// Cardinality limit handed to the aggregate builder when a stream does not
/// set `cardinality_limit` itself.
const DEFAULT_CARDINALITY_LIMIT: usize = 2_000;
54
55#[derive(Default)]
56struct PipelineInner {
57 aggregations: HashMap<InstrumentationScope, Vec<InstrumentSync>>,
58 callbacks: Vec<GenericCallback>,
59}
60
61impl fmt::Debug for PipelineInner {
62 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63 f.debug_struct("PipelineInner")
64 .field("aggregations", &self.aggregations)
65 .field("callbacks", &self.callbacks.len())
66 .finish()
67 }
68}
69
70impl Pipeline {
71 fn add_sync(&self, scope: InstrumentationScope, i_sync: InstrumentSync) {
77 let _ = self.inner.lock().map(|mut inner| {
78 inner.aggregations.entry(scope).or_default().push(i_sync);
79 });
80 }
81
82 fn add_callback(&self, callback: GenericCallback) {
84 let _ = self
85 .inner
86 .lock()
87 .map(|mut inner| inner.callbacks.push(callback));
88 }
89
90 fn force_flush(&self) -> OTelSdkResult {
92 self.reader.force_flush()
93 }
94
95 fn shutdown(&self) -> OTelSdkResult {
97 self.reader.shutdown()
98 }
99}
100
101impl SdkProducer for Pipeline {
102 fn produce(&self, rm: &mut ResourceMetrics) -> OTelSdkResult {
104 let inner = self
105 .inner
106 .lock()
107 .map_err(|_| OTelSdkError::InternalFailure("Failed to lock pipeline".into()))?;
108 otel_debug!(
109 name: "MeterProviderInvokingObservableCallbacks",
110 count = inner.callbacks.len(),
111 );
112 for cb in &inner.callbacks {
113 cb();
115 }
116
117 rm.resource = self.resource.clone();
118 if inner.aggregations.len() > rm.scope_metrics.len() {
119 rm.scope_metrics
120 .reserve(inner.aggregations.len() - rm.scope_metrics.len());
121 }
122
123 let mut i = 0;
124 for (scope, instruments) in inner.aggregations.iter() {
125 let sm = match rm.scope_metrics.get_mut(i) {
126 Some(sm) => sm,
127 None => {
128 rm.scope_metrics.push(ScopeMetrics::default());
129 rm.scope_metrics.last_mut().unwrap()
130 }
131 };
132 if instruments.len() > sm.metrics.len() {
133 sm.metrics.reserve(instruments.len() - sm.metrics.len());
134 }
135
136 let mut j = 0;
137 for inst in instruments {
138 let mut m = sm.metrics.get_mut(j);
139 match (inst.comp_agg.call(m.as_mut().map(|m| &mut m.data)), m) {
140 ((len, Some(initial_agg)), None) if len > 0 => sm.metrics.push(Metric {
142 name: inst.name.clone(),
143 description: inst.description.clone(),
144 unit: inst.unit.clone(),
145 data: initial_agg,
146 }),
147 ((len, data), Some(prev_agg)) if len > 0 => {
149 if let Some(data) = data {
150 prev_agg.data = data;
152 }
153 prev_agg.name.clone_from(&inst.name);
154 prev_agg.description.clone_from(&inst.description);
155 prev_agg.unit.clone_from(&inst.unit);
156 }
157 _ => continue,
158 }
159
160 j += 1;
161 }
162
163 sm.metrics.truncate(j);
164 if !sm.metrics.is_empty() {
165 sm.scope = scope.clone();
166 i += 1;
167 }
168 }
169
170 rm.scope_metrics.truncate(i);
171
172 Ok(())
173 }
174}
175
176struct InstrumentSync {
178 name: Cow<'static, str>,
179 description: Cow<'static, str>,
180 unit: Cow<'static, str>,
181 comp_agg: Arc<dyn internal::ComputeAggregation>,
182}
183
184impl fmt::Debug for InstrumentSync {
185 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
186 f.debug_struct("InstrumentSync")
187 .field("name", &self.name)
188 .field("description", &self.description)
189 .field("unit", &self.unit)
190 .finish()
191 }
192}
193
194type Cache<T> = Mutex<HashMap<InstrumentId, MetricResult<Option<Arc<dyn internal::Measure<T>>>>>>;
195
196struct Inserter<T> {
198 aggregators: Cache<T>,
206
207 views: Arc<Mutex<HashMap<Cow<'static, str>, InstrumentId>>>,
214
215 pipeline: Arc<Pipeline>,
216}
217
218impl<T> Inserter<T>
219where
220 T: Number,
221{
222 fn new(p: Arc<Pipeline>, vc: Arc<Mutex<HashMap<Cow<'static, str>, InstrumentId>>>) -> Self {
223 Inserter {
224 aggregators: Default::default(),
225 views: vc,
226 pipeline: Arc::clone(&p),
227 }
228 }
229
230 fn instrument(
254 &self,
255 inst: Instrument,
256 boundaries: Option<&[f64]>,
257 ) -> MetricResult<Vec<Arc<dyn internal::Measure<T>>>> {
258 let mut matched = false;
259 let mut measures = vec![];
260 let mut errs = vec![];
261 let kind = inst.kind;
262
263 let mut seen = HashSet::new();
265 for v in &self.pipeline.views {
266 let mut stream = match v.match_inst(&inst) {
267 Some(stream) => stream,
268 None => continue,
269 };
270 matched = true;
271
272 if stream.name.is_none() {
273 stream.name = Some(inst.name.clone());
274 }
275 if stream.description.is_none() {
276 stream.description = Some(inst.description.clone());
277 }
278 if stream.unit.is_none() {
279 stream.unit = Some(inst.unit.clone());
280 }
281
282 let id = self.inst_id(kind, &stream);
283 if seen.contains(&id) {
284 continue; }
286
287 let agg = match self.cached_aggregator(&inst.scope, kind, stream) {
288 Ok(Some(agg)) => agg,
289 Ok(None) => continue, Err(err) => {
291 errs.push(err);
292 continue;
293 }
294 };
295 seen.insert(id);
296 measures.push(agg);
297 }
298
299 if matched {
300 if errs.is_empty() {
301 return Ok(measures);
302 } else {
303 return Err(MetricError::Other(format!("{errs:?}")));
304 }
305 }
306
307 let mut stream = Stream {
309 name: Some(inst.name),
310 description: Some(inst.description),
311 unit: Some(inst.unit),
312 aggregation: None,
313 allowed_attribute_keys: None,
314 cardinality_limit: None,
315 };
316
317 if let Some(boundaries) = boundaries {
319 stream.aggregation = Some(Aggregation::ExplicitBucketHistogram {
320 boundaries: boundaries.to_vec(),
321 record_min_max: true,
322 });
323 }
324
325 match self.cached_aggregator(&inst.scope, kind, stream) {
326 Ok(agg) => {
327 if errs.is_empty() {
328 if let Some(agg) = agg {
329 measures.push(agg);
330 }
331 Ok(measures)
332 } else {
333 Err(MetricError::Other(format!("{errs:?}")))
334 }
335 }
336 Err(err) => {
337 errs.push(err);
338 Err(MetricError::Other(format!("{errs:?}")))
339 }
340 }
341 }
342
343 fn cached_aggregator(
357 &self,
358 scope: &InstrumentationScope,
359 kind: InstrumentKind,
360 mut stream: Stream,
361 ) -> MetricResult<Option<Arc<dyn internal::Measure<T>>>> {
362 let mut agg = stream
366 .aggregation
367 .take()
368 .unwrap_or_else(|| default_aggregation_selector(kind));
369
370 if matches!(agg, aggregation::Aggregation::Default) {
372 agg = default_aggregation_selector(kind);
373 }
374
375 if let Err(err) = is_aggregator_compatible(&kind, &agg) {
376 return Err(MetricError::Other(format!(
377 "creating aggregator with instrumentKind: {:?}, aggregation {:?}: {:?}",
378 kind, stream.aggregation, err,
379 )));
380 }
381
382 let mut id = self.inst_id(kind, &stream);
383 self.log_conflict(&id);
386
387 id.normalize();
391
392 let mut cache = self.aggregators.lock()?;
393
394 let cached = cache.entry(id).or_insert_with(|| {
395 let filter = stream
396 .allowed_attribute_keys
397 .clone()
398 .map(|allowed| Arc::new(move |kv: &KeyValue| allowed.contains(&kv.key)) as Arc<_>);
399
400 let cardinality_limit = stream
401 .cardinality_limit
402 .unwrap_or(DEFAULT_CARDINALITY_LIMIT);
403 let b = AggregateBuilder::new(
404 self.pipeline.reader.temporality(kind),
405 filter,
406 cardinality_limit,
407 );
408 let AggregateFns { measure, collect } = match aggregate_fn(b, &agg, kind) {
409 Ok(Some(inst)) => inst,
410 other => return other.map(|fs| fs.map(|inst| inst.measure)), };
412
413 otel_debug!(
414 name : "Metrics.InstrumentCreated",
415 instrument_name = stream.name.clone().unwrap_or_default().as_ref(),
416 cardinality_limit = cardinality_limit,
417 );
418
419 self.pipeline.add_sync(
420 scope.clone(),
421 InstrumentSync {
422 name: stream.name.unwrap_or_default(),
423 description: stream.description.unwrap_or_default(),
424 unit: stream.unit.unwrap_or_default(),
425 comp_agg: collect,
426 },
427 );
428
429 Ok(Some(measure))
430 });
431
432 match cached {
433 Ok(opt) => Ok(opt.clone()),
434 Err(err) => Err(MetricError::Other(err.to_string())),
435 }
436 }
437
438 fn log_conflict(&self, id: &InstrumentId) {
442 if let Ok(views) = self.views.lock() {
443 if let Some(existing) = views.get(id.name.to_lowercase().as_str()) {
444 if existing == id {
445 return;
446 }
447 otel_debug!(
450 name: "Instrument.DuplicateMetricStreamDefinitions",
451 message = "duplicate metric stream definitions",
452 reason = format!("names: ({} and {}), descriptions: ({} and {}), kinds: ({:?} and {:?}), units: ({:?} and {:?}), and numbers: ({} and {})",
453 existing.name, id.name,
454 existing.description, id.description,
455 existing.kind, id.kind,
456 existing.unit, id.unit,
457 existing.number, id.number,)
458 );
459 }
460 }
461 }
462
463 fn inst_id(&self, kind: InstrumentKind, stream: &Stream) -> InstrumentId {
464 InstrumentId {
465 name: stream.name.clone().unwrap_or_default(),
466 description: stream.description.clone().unwrap_or_default(),
467 kind,
468 unit: stream.unit.clone().unwrap_or_default(),
469 number: Cow::Borrowed(std::any::type_name::<T>()),
470 }
471 }
472}
473
474fn default_aggregation_selector(kind: InstrumentKind) -> Aggregation {
488 match kind {
489 InstrumentKind::Counter
490 | InstrumentKind::UpDownCounter
491 | InstrumentKind::ObservableCounter
492 | InstrumentKind::ObservableUpDownCounter => Aggregation::Sum,
493 InstrumentKind::Gauge => Aggregation::LastValue,
494 InstrumentKind::ObservableGauge => Aggregation::LastValue,
495 InstrumentKind::Histogram => Aggregation::ExplicitBucketHistogram {
496 boundaries: vec![
497 0.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, 750.0, 1000.0, 2500.0,
498 5000.0, 7500.0, 10000.0,
499 ],
500 record_min_max: true,
501 },
502 }
503}
504
505fn aggregate_fn<T: Number>(
509 b: AggregateBuilder<T>,
510 agg: &aggregation::Aggregation,
511 kind: InstrumentKind,
512) -> MetricResult<Option<AggregateFns<T>>> {
513 match agg {
514 Aggregation::Default => aggregate_fn(b, &default_aggregation_selector(kind), kind),
515 Aggregation::Drop => Ok(None),
516 Aggregation::LastValue => {
517 match kind {
518 InstrumentKind::Gauge => Ok(Some(b.last_value(None))),
519 InstrumentKind::ObservableGauge => Ok(Some(b.last_value(Some(Temporality::Delta)))),
522 _ => Err(MetricError::Other(format!("LastValue aggregation is only available for Gauge or ObservableGauge, but not for {kind:?}")))
523 }
524 }
525 Aggregation::Sum => {
526 let fns = match kind {
527 InstrumentKind::ObservableCounter => b.precomputed_sum(true),
531 InstrumentKind::ObservableUpDownCounter => b.precomputed_sum(false),
532 InstrumentKind::Counter | InstrumentKind::Histogram => b.sum(true),
533 _ => b.sum(false),
534 };
535 Ok(Some(fns))
536 }
537 Aggregation::ExplicitBucketHistogram {
538 boundaries,
539 record_min_max,
540 } => {
541 let record_sum = !matches!(
542 kind,
543 InstrumentKind::UpDownCounter
544 | InstrumentKind::ObservableUpDownCounter
545 | InstrumentKind::ObservableGauge
546 );
547 Ok(Some(b.explicit_bucket_histogram(
551 boundaries.to_vec(),
552 *record_min_max,
553 record_sum,
554 )))
555 }
556 Aggregation::Base2ExponentialHistogram {
557 max_size,
558 max_scale,
559 record_min_max,
560 } => {
561 let record_sum = !matches!(
562 kind,
563 InstrumentKind::UpDownCounter
564 | InstrumentKind::ObservableUpDownCounter
565 | InstrumentKind::ObservableGauge
566 );
567 Ok(Some(b.exponential_bucket_histogram(
568 *max_size,
569 *max_scale,
570 *record_min_max,
571 record_sum,
572 )))
573 }
574 }
575}
576
577fn is_aggregator_compatible(
591 kind: &InstrumentKind,
592 agg: &aggregation::Aggregation,
593) -> MetricResult<()> {
594 match agg {
595 Aggregation::Default => Ok(()),
596 Aggregation::ExplicitBucketHistogram { .. }
597 | Aggregation::Base2ExponentialHistogram { .. } => {
598 if matches!(
599 kind,
600 InstrumentKind::Counter
601 | InstrumentKind::UpDownCounter
602 | InstrumentKind::Gauge
603 | InstrumentKind::Histogram
604 | InstrumentKind::ObservableCounter
605 | InstrumentKind::ObservableUpDownCounter
606 | InstrumentKind::ObservableGauge
607 ) {
608 return Ok(());
609 }
610 Err(MetricError::Other("incompatible aggregation".into()))
611 }
612 Aggregation::Sum => {
613 match kind {
614 InstrumentKind::ObservableCounter
615 | InstrumentKind::ObservableUpDownCounter
616 | InstrumentKind::Counter
617 | InstrumentKind::Histogram
618 | InstrumentKind::UpDownCounter => Ok(()),
619 _ => {
620 Err(MetricError::Other("incompatible aggregation".into()))
623 }
624 }
625 }
626 Aggregation::LastValue => {
627 match kind {
628 InstrumentKind::Gauge | InstrumentKind::ObservableGauge => Ok(()),
629 _ => {
630 Err(MetricError::Other("incompatible aggregation".into()))
633 }
634 }
635 }
636 Aggregation::Drop => Ok(()),
637 }
638}
639
640#[derive(Clone, Debug)]
642pub(crate) struct Pipelines(pub(crate) Vec<Arc<Pipeline>>);
643
644impl Pipelines {
645 pub(crate) fn new(
646 res: Resource,
647 readers: Vec<Box<dyn MetricReader>>,
648 views: Vec<Arc<dyn View>>,
649 ) -> Self {
650 let mut pipes = Vec::with_capacity(readers.len());
651 for r in readers {
652 let p = Arc::new(Pipeline {
653 resource: res.clone(),
654 reader: r,
655 views: views.clone(),
656 inner: Default::default(),
657 });
658 p.reader.register_pipeline(Arc::downgrade(&p));
659 pipes.push(p);
660 }
661
662 Pipelines(pipes)
663 }
664
665 pub(crate) fn register_callback<F>(&self, callback: F)
666 where
667 F: Fn() + Send + Sync + 'static,
668 {
669 let cb = Arc::new(callback);
670 for pipe in &self.0 {
671 pipe.add_callback(cb.clone())
672 }
673 }
674
675 pub(crate) fn force_flush(&self) -> OTelSdkResult {
677 let mut errs = vec![];
678 for pipeline in &self.0 {
679 if let Err(err) = pipeline.force_flush() {
680 errs.push(err);
681 }
682 }
683
684 if errs.is_empty() {
685 Ok(())
686 } else {
687 Err(OTelSdkError::InternalFailure(format!("{errs:?}")))
688 }
689 }
690
691 pub(crate) fn shutdown(&self) -> OTelSdkResult {
693 let mut errs = vec![];
694 for pipeline in &self.0 {
695 if let Err(err) = pipeline.shutdown() {
696 errs.push(err);
697 }
698 }
699
700 if errs.is_empty() {
701 Ok(())
702 } else {
703 Err(crate::error::OTelSdkError::InternalFailure(format!(
704 "{errs:?}"
705 )))
706 }
707 }
708}
709
710pub(crate) struct Resolver<T> {
714 inserters: Vec<Inserter<T>>,
715}
716
717impl<T> Resolver<T>
718where
719 T: Number,
720{
721 pub(crate) fn new(
722 pipelines: Arc<Pipelines>,
723 view_cache: Arc<Mutex<HashMap<Cow<'static, str>, InstrumentId>>>,
724 ) -> Self {
725 let inserters = pipelines
726 .0
727 .iter()
728 .map(|pipe| Inserter::new(Arc::clone(pipe), Arc::clone(&view_cache)))
729 .collect();
730
731 Resolver { inserters }
732 }
733
734 pub(crate) fn measures(
736 &self,
737 id: Instrument,
738 boundaries: Option<Vec<f64>>,
739 ) -> MetricResult<Vec<Arc<dyn internal::Measure<T>>>> {
740 let (mut measures, mut errs) = (vec![], vec![]);
741
742 for inserter in &self.inserters {
743 match inserter.instrument(id.clone(), boundaries.as_deref()) {
744 Ok(ms) => measures.extend(ms),
745 Err(err) => errs.push(err),
746 }
747 }
748
749 if errs.is_empty() {
750 if measures.is_empty() {
751 }
754 Ok(measures)
755 } else {
756 Err(MetricError::Other(format!("{errs:?}")))
757 }
758 }
759}