opentelemetry_sdk/metrics/
pipeline.rs

1use core::fmt;
2use std::{
3    borrow::Cow,
4    collections::{HashMap, HashSet},
5    sync::{Arc, Mutex},
6};
7
8use opentelemetry::{otel_debug, InstrumentationScope, KeyValue};
9
10use crate::{
11    error::{OTelSdkError, OTelSdkResult},
12    metrics::{
13        aggregation,
14        data::{Metric, ResourceMetrics, ScopeMetrics},
15        error::{MetricError, MetricResult},
16        instrument::{Instrument, InstrumentId, InstrumentKind, Stream},
17        internal::{self, AggregateBuilder, Number},
18        reader::{MetricReader, SdkProducer},
19        view::View,
20    },
21    Resource,
22};
23
24use self::internal::AggregateFns;
25
26use super::{aggregation::Aggregation, Temporality};
27
28/// Connects all of the instruments created by a meter provider to a [MetricReader].
29///
30/// This is the object that will be registered when a meter provider is
31/// created.
32///
33/// As instruments are created the instrument should be checked if it exists in
34/// the views of a the reader, and if so each aggregate function should be added
35/// to the pipeline.
36#[doc(hidden)]
37pub struct Pipeline {
38    pub(crate) resource: Resource,
39    reader: Box<dyn MetricReader>,
40    views: Vec<Arc<dyn View>>,
41    inner: Mutex<PipelineInner>,
42}
43
44impl fmt::Debug for Pipeline {
45    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46        f.write_str("Pipeline")
47    }
48}
49
/// Callback used for both single- and multi-instrument observation.
///
/// Callbacks are stored per pipeline and invoked, without arguments, at the
/// start of every `produce` call.
type GenericCallback = Arc<dyn Fn() + Send + Sync>;
52
/// Cardinality limit applied to a stream that does not configure its own.
const DEFAULT_CARDINALITY_LIMIT: usize = 2_000;
54
55#[derive(Default)]
56struct PipelineInner {
57    aggregations: HashMap<InstrumentationScope, Vec<InstrumentSync>>,
58    callbacks: Vec<GenericCallback>,
59}
60
61impl fmt::Debug for PipelineInner {
62    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63        f.debug_struct("PipelineInner")
64            .field("aggregations", &self.aggregations)
65            .field("callbacks", &self.callbacks.len())
66            .finish()
67    }
68}
69
70impl Pipeline {
71    /// Adds the [InstrumentSync] to pipeline with scope.
72    ///
73    /// This method is not idempotent. Duplicate calls will result in duplicate
74    /// additions, it is the callers responsibility to ensure this is called with
75    /// unique values.
76    fn add_sync(&self, scope: InstrumentationScope, i_sync: InstrumentSync) {
77        let _ = self.inner.lock().map(|mut inner| {
78            inner.aggregations.entry(scope).or_default().push(i_sync);
79        });
80    }
81
82    /// Registers a single instrument callback to be run when `produce` is called.
83    fn add_callback(&self, callback: GenericCallback) {
84        let _ = self
85            .inner
86            .lock()
87            .map(|mut inner| inner.callbacks.push(callback));
88    }
89
90    /// Send accumulated telemetry
91    fn force_flush(&self) -> OTelSdkResult {
92        self.reader.force_flush()
93    }
94
95    /// Shut down pipeline
96    fn shutdown(&self) -> OTelSdkResult {
97        self.reader.shutdown()
98    }
99}
100
101impl SdkProducer for Pipeline {
102    /// Returns aggregated metrics from a single collection.
103    fn produce(&self, rm: &mut ResourceMetrics) -> OTelSdkResult {
104        let inner = self
105            .inner
106            .lock()
107            .map_err(|_| OTelSdkError::InternalFailure("Failed to lock pipeline".into()))?;
108        otel_debug!(
109            name: "MeterProviderInvokingObservableCallbacks",
110            count =  inner.callbacks.len(),
111        );
112        for cb in &inner.callbacks {
113            // TODO consider parallel callbacks.
114            cb();
115        }
116
117        rm.resource = self.resource.clone();
118        if inner.aggregations.len() > rm.scope_metrics.len() {
119            rm.scope_metrics
120                .reserve(inner.aggregations.len() - rm.scope_metrics.len());
121        }
122
123        let mut i = 0;
124        for (scope, instruments) in inner.aggregations.iter() {
125            let sm = match rm.scope_metrics.get_mut(i) {
126                Some(sm) => sm,
127                None => {
128                    rm.scope_metrics.push(ScopeMetrics::default());
129                    rm.scope_metrics.last_mut().unwrap()
130                }
131            };
132            if instruments.len() > sm.metrics.len() {
133                sm.metrics.reserve(instruments.len() - sm.metrics.len());
134            }
135
136            let mut j = 0;
137            for inst in instruments {
138                let mut m = sm.metrics.get_mut(j);
139                match (inst.comp_agg.call(m.as_mut().map(|m| &mut m.data)), m) {
140                    // No metric to re-use, expect agg to create new metric data
141                    ((len, Some(initial_agg)), None) if len > 0 => sm.metrics.push(Metric {
142                        name: inst.name.clone(),
143                        description: inst.description.clone(),
144                        unit: inst.unit.clone(),
145                        data: initial_agg,
146                    }),
147                    // Existing metric can be re-used, update its values
148                    ((len, data), Some(prev_agg)) if len > 0 => {
149                        if let Some(data) = data {
150                            // previous aggregation was of a different type
151                            prev_agg.data = data;
152                        }
153                        prev_agg.name.clone_from(&inst.name);
154                        prev_agg.description.clone_from(&inst.description);
155                        prev_agg.unit.clone_from(&inst.unit);
156                    }
157                    _ => continue,
158                }
159
160                j += 1;
161            }
162
163            sm.metrics.truncate(j);
164            if !sm.metrics.is_empty() {
165                sm.scope = scope.clone();
166                i += 1;
167            }
168        }
169
170        rm.scope_metrics.truncate(i);
171
172        Ok(())
173    }
174}
175
176/// A synchronization point between a [Pipeline] and an instrument's aggregate function.
177struct InstrumentSync {
178    name: Cow<'static, str>,
179    description: Cow<'static, str>,
180    unit: Cow<'static, str>,
181    comp_agg: Arc<dyn internal::ComputeAggregation>,
182}
183
184impl fmt::Debug for InstrumentSync {
185    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
186        f.debug_struct("InstrumentSync")
187            .field("name", &self.name)
188            .field("description", &self.description)
189            .field("unit", &self.unit)
190            .finish()
191    }
192}
193
194type Cache<T> = Mutex<HashMap<InstrumentId, MetricResult<Option<Arc<dyn internal::Measure<T>>>>>>;
195
196/// Facilitates inserting of new instruments from a single scope into a pipeline.
197struct Inserter<T> {
198    /// A cache that holds aggregate function inputs whose
199    /// outputs have been inserted into the underlying reader pipeline.
200    ///
201    /// This cache ensures no duplicate aggregate functions are inserted into
202    /// the reader pipeline and if a new request during an instrument creation
203    /// asks for the same aggregate function input the same instance is
204    /// returned.
205    aggregators: Cache<T>,
206
207    /// A cache that holds instrument identifiers for all the instruments a [Meter] has
208    /// created.
209    ///
210    /// It is provided from the `Meter` that owns this inserter. This cache ensures
211    /// that during the creation of instruments with the same name but different
212    /// options (e.g. description, unit) a warning message is logged.
213    views: Arc<Mutex<HashMap<Cow<'static, str>, InstrumentId>>>,
214
215    pipeline: Arc<Pipeline>,
216}
217
218impl<T> Inserter<T>
219where
220    T: Number,
221{
222    fn new(p: Arc<Pipeline>, vc: Arc<Mutex<HashMap<Cow<'static, str>, InstrumentId>>>) -> Self {
223        Inserter {
224            aggregators: Default::default(),
225            views: vc,
226            pipeline: Arc::clone(&p),
227        }
228    }
229
230    /// Inserts the provided instrument into a pipeline.
231    ///
232    /// All views the pipeline contains are matched against, and any matching view
233    /// that creates a unique [Aggregator] will be inserted into the pipeline and
234    /// included in the returned list.
235    ///
236    /// The returned aggregate functions are ensured to be deduplicated and unique.
237    /// If another view in another pipeline that is cached by this inserter's cache
238    /// has already inserted the same aggregate function for the same instrument,
239    /// that function's instance is returned.
240    ///
241    /// If another instrument has already been inserted by this inserter, or any
242    /// other using the same cache, and it conflicts with the instrument being
243    /// inserted in this call, an aggregate function matching the arguments will
244    /// still be returned but a log message will also be logged to the OTel global
245    /// logger.
246    ///
247    /// If the passed instrument would result in an incompatible aggregate function,
248    /// an error is returned and that aggregate function is not inserted or
249    /// returned.
250    ///
251    /// If an instrument is determined to use a [aggregation::Aggregation::Drop],
252    /// that instrument is not inserted nor returned.
253    fn instrument(
254        &self,
255        inst: Instrument,
256        boundaries: Option<&[f64]>,
257    ) -> MetricResult<Vec<Arc<dyn internal::Measure<T>>>> {
258        let mut matched = false;
259        let mut measures = vec![];
260        let mut errs = vec![];
261        let kind = inst.kind;
262
263        // The cache will return the same Aggregator instance. Use stream ids to de duplicate.
264        let mut seen = HashSet::new();
265        for v in &self.pipeline.views {
266            let mut stream = match v.match_inst(&inst) {
267                Some(stream) => stream,
268                None => continue,
269            };
270            matched = true;
271
272            if stream.name.is_none() {
273                stream.name = Some(inst.name.clone());
274            }
275            if stream.description.is_none() {
276                stream.description = Some(inst.description.clone());
277            }
278            if stream.unit.is_none() {
279                stream.unit = Some(inst.unit.clone());
280            }
281
282            let id = self.inst_id(kind, &stream);
283            if seen.contains(&id) {
284                continue; // This aggregator has already been added
285            }
286
287            let agg = match self.cached_aggregator(&inst.scope, kind, stream) {
288                Ok(Some(agg)) => agg,
289                Ok(None) => continue, // Drop aggregator.
290                Err(err) => {
291                    errs.push(err);
292                    continue;
293                }
294            };
295            seen.insert(id);
296            measures.push(agg);
297        }
298
299        if matched {
300            if errs.is_empty() {
301                return Ok(measures);
302            } else {
303                return Err(MetricError::Other(format!("{errs:?}")));
304            }
305        }
306
307        // Apply implicit default view if no explicit matched.
308        let mut stream = Stream {
309            name: Some(inst.name),
310            description: Some(inst.description),
311            unit: Some(inst.unit),
312            aggregation: None,
313            allowed_attribute_keys: None,
314            cardinality_limit: None,
315        };
316
317        // Override default histogram boundaries if provided.
318        if let Some(boundaries) = boundaries {
319            stream.aggregation = Some(Aggregation::ExplicitBucketHistogram {
320                boundaries: boundaries.to_vec(),
321                record_min_max: true,
322            });
323        }
324
325        match self.cached_aggregator(&inst.scope, kind, stream) {
326            Ok(agg) => {
327                if errs.is_empty() {
328                    if let Some(agg) = agg {
329                        measures.push(agg);
330                    }
331                    Ok(measures)
332                } else {
333                    Err(MetricError::Other(format!("{errs:?}")))
334                }
335            }
336            Err(err) => {
337                errs.push(err);
338                Err(MetricError::Other(format!("{errs:?}")))
339            }
340        }
341    }
342
343    /// Returns the appropriate aggregate functions for an instrument configuration.
344    ///
345    /// If the exact instrument has been created within the [Scope], that
346    /// aggregate function instance will be returned. Otherwise, a new computed
347    /// aggregate function will be cached and returned.
348    ///
349    /// If the instrument configuration conflicts with an instrument that has
350    /// already been created (e.g. description, unit, data type) a warning will be
351    /// logged with the global OTel logger. A valid new aggregate function for the
352    /// instrument configuration will still be returned without an error.
353    ///
354    /// If the instrument defines an unknown or incompatible aggregation, an error
355    /// is returned.
356    fn cached_aggregator(
357        &self,
358        scope: &InstrumentationScope,
359        kind: InstrumentKind,
360        mut stream: Stream,
361    ) -> MetricResult<Option<Arc<dyn internal::Measure<T>>>> {
362        // TODO: Create a separate pub (crate) Stream struct for the pipeline,
363        // as Stream will not have any optional fields as None at this point and
364        // new struct can better reflect this.
365        let mut agg = stream
366            .aggregation
367            .take()
368            .unwrap_or_else(|| default_aggregation_selector(kind));
369
370        // Apply default if stream or reader aggregation returns default
371        if matches!(agg, aggregation::Aggregation::Default) {
372            agg = default_aggregation_selector(kind);
373        }
374
375        if let Err(err) = is_aggregator_compatible(&kind, &agg) {
376            return Err(MetricError::Other(format!(
377                "creating aggregator with instrumentKind: {:?}, aggregation {:?}: {:?}",
378                kind, stream.aggregation, err,
379            )));
380        }
381
382        let mut id = self.inst_id(kind, &stream);
383        // If there is a conflict, the specification says the view should
384        // still be applied and a warning should be logged.
385        self.log_conflict(&id);
386
387        // If there are requests for the same instrument with different name
388        // casing, the first-seen needs to be returned. Use a normalize ID for the
389        // cache lookup to ensure the correct comparison.
390        id.normalize();
391
392        let mut cache = self.aggregators.lock()?;
393
394        let cached = cache.entry(id).or_insert_with(|| {
395            let filter = stream
396                .allowed_attribute_keys
397                .clone()
398                .map(|allowed| Arc::new(move |kv: &KeyValue| allowed.contains(&kv.key)) as Arc<_>);
399
400            let cardinality_limit = stream
401                .cardinality_limit
402                .unwrap_or(DEFAULT_CARDINALITY_LIMIT);
403            let b = AggregateBuilder::new(
404                self.pipeline.reader.temporality(kind),
405                filter,
406                cardinality_limit,
407            );
408            let AggregateFns { measure, collect } = match aggregate_fn(b, &agg, kind) {
409                Ok(Some(inst)) => inst,
410                other => return other.map(|fs| fs.map(|inst| inst.measure)), // Drop aggregator or error
411            };
412
413            otel_debug!(
414                name : "Metrics.InstrumentCreated",
415                instrument_name = stream.name.clone().unwrap_or_default().as_ref(),
416                cardinality_limit = cardinality_limit,
417            );
418
419            self.pipeline.add_sync(
420                scope.clone(),
421                InstrumentSync {
422                    name: stream.name.unwrap_or_default(),
423                    description: stream.description.unwrap_or_default(),
424                    unit: stream.unit.unwrap_or_default(),
425                    comp_agg: collect,
426                },
427            );
428
429            Ok(Some(measure))
430        });
431
432        match cached {
433            Ok(opt) => Ok(opt.clone()),
434            Err(err) => Err(MetricError::Other(err.to_string())),
435        }
436    }
437
438    /// Validates if an instrument with the same name as id has already been created.
439    ///
440    /// If that instrument conflicts with id, a warning is logged.
441    fn log_conflict(&self, id: &InstrumentId) {
442        if let Ok(views) = self.views.lock() {
443            if let Some(existing) = views.get(id.name.to_lowercase().as_str()) {
444                if existing == id {
445                    return;
446                }
447                // If an existing instrument with the same name but different attributes is found,
448                // log a warning with details about the conflicting metric stream definitions.
449                otel_debug!(
450                    name: "Instrument.DuplicateMetricStreamDefinitions",
451                    message = "duplicate metric stream definitions",
452                    reason = format!("names: ({} and {}), descriptions: ({} and {}), kinds: ({:?} and {:?}), units: ({:?} and {:?}), and numbers: ({} and {})",
453                    existing.name, id.name,
454                    existing.description, id.description,
455                    existing.kind, id.kind,
456                    existing.unit, id.unit,
457                    existing.number, id.number,)
458                );
459            }
460        }
461    }
462
463    fn inst_id(&self, kind: InstrumentKind, stream: &Stream) -> InstrumentId {
464        InstrumentId {
465            name: stream.name.clone().unwrap_or_default(),
466            description: stream.description.clone().unwrap_or_default(),
467            kind,
468            unit: stream.unit.clone().unwrap_or_default(),
469            number: Cow::Borrowed(std::any::type_name::<T>()),
470        }
471    }
472}
473
474/// The default aggregation and parameters for an instrument of [InstrumentKind].
475///
476/// This aggregation selector uses the following selection mapping per [the spec]:
477///
478/// * Counter ⇨ Sum
479/// * Observable Counter ⇨ Sum
480/// * UpDownCounter ⇨ Sum
481/// * Observable UpDownCounter ⇨ Sum
482/// * Gauge ⇨ LastValue
483/// * Observable Gauge ⇨ LastValue
484/// * Histogram ⇨ ExplicitBucketHistogram
485///
486/// [the spec]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.19.0/specification/metrics/sdk.md#default-aggregation
487fn default_aggregation_selector(kind: InstrumentKind) -> Aggregation {
488    match kind {
489        InstrumentKind::Counter
490        | InstrumentKind::UpDownCounter
491        | InstrumentKind::ObservableCounter
492        | InstrumentKind::ObservableUpDownCounter => Aggregation::Sum,
493        InstrumentKind::Gauge => Aggregation::LastValue,
494        InstrumentKind::ObservableGauge => Aggregation::LastValue,
495        InstrumentKind::Histogram => Aggregation::ExplicitBucketHistogram {
496            boundaries: vec![
497                0.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, 750.0, 1000.0, 2500.0,
498                5000.0, 7500.0, 10000.0,
499            ],
500            record_min_max: true,
501        },
502    }
503}
504
505/// Returns new aggregate functions for the given params.
506///
507/// If the aggregation is unknown or temporality is invalid, an error is returned.
508fn aggregate_fn<T: Number>(
509    b: AggregateBuilder<T>,
510    agg: &aggregation::Aggregation,
511    kind: InstrumentKind,
512) -> MetricResult<Option<AggregateFns<T>>> {
513    match agg {
514        Aggregation::Default => aggregate_fn(b, &default_aggregation_selector(kind), kind),
515        Aggregation::Drop => Ok(None),
516        Aggregation::LastValue => {
517            match kind {
518                InstrumentKind::Gauge => Ok(Some(b.last_value(None))),
519                // temporality for LastValue only affects how data points are reported, so we can always use
520                // delta temporality, because observable instruments should report data points only since previous collection
521                InstrumentKind::ObservableGauge => Ok(Some(b.last_value(Some(Temporality::Delta)))),
522                _ => Err(MetricError::Other(format!("LastValue aggregation is only available for Gauge or ObservableGauge, but not for {kind:?}")))
523            }
524        }
525        Aggregation::Sum => {
526            let fns = match kind {
527                // TODO implement: observable instruments should not report data points on every collect
528                // from SDK: For asynchronous instruments with Delta or Cumulative aggregation temporality,
529                // MetricReader.Collect MUST only receive data points with measurements recorded since the previous collection
530                InstrumentKind::ObservableCounter => b.precomputed_sum(true),
531                InstrumentKind::ObservableUpDownCounter => b.precomputed_sum(false),
532                InstrumentKind::Counter | InstrumentKind::Histogram => b.sum(true),
533                _ => b.sum(false),
534            };
535            Ok(Some(fns))
536        }
537        Aggregation::ExplicitBucketHistogram {
538            boundaries,
539            record_min_max,
540        } => {
541            let record_sum = !matches!(
542                kind,
543                InstrumentKind::UpDownCounter
544                    | InstrumentKind::ObservableUpDownCounter
545                    | InstrumentKind::ObservableGauge
546            );
547            // TODO implement: observable instruments should not report data points on every collect
548            // from SDK: For asynchronous instruments with Delta or Cumulative aggregation temporality,
549            // MetricReader.Collect MUST only receive data points with measurements recorded since the previous collection
550            Ok(Some(b.explicit_bucket_histogram(
551                boundaries.to_vec(),
552                *record_min_max,
553                record_sum,
554            )))
555        }
556        Aggregation::Base2ExponentialHistogram {
557            max_size,
558            max_scale,
559            record_min_max,
560        } => {
561            let record_sum = !matches!(
562                kind,
563                InstrumentKind::UpDownCounter
564                    | InstrumentKind::ObservableUpDownCounter
565                    | InstrumentKind::ObservableGauge
566            );
567            Ok(Some(b.exponential_bucket_histogram(
568                *max_size,
569                *max_scale,
570                *record_min_max,
571                record_sum,
572            )))
573        }
574    }
575}
576
577/// Checks if the aggregation can be used by the instrument.
578///
579/// Current compatibility:
580///
581/// | Instrument Kind          | Drop | LastValue | Sum | Histogram | Exponential Histogram |
582/// |--------------------------|------|-----------|-----|-----------|-----------------------|
583/// | Counter                  | ✓    |           | ✓   | ✓         | ✓                     |
584/// | UpDownCounter            | ✓    |           | ✓   | ✓         | ✓                     |
585/// | Histogram                | ✓    |           | ✓   | ✓         | ✓                     |
586/// | Observable Counter       | ✓    |           | ✓   | ✓         | ✓                     |
587/// | Observable UpDownCounter | ✓    |           | ✓   | ✓         | ✓                     |
588/// | Gauge                    | ✓    | ✓         |     | ✓         | ✓                     |
589/// | Observable Gauge         | ✓    | ✓         |     | ✓         | ✓                     |
590fn is_aggregator_compatible(
591    kind: &InstrumentKind,
592    agg: &aggregation::Aggregation,
593) -> MetricResult<()> {
594    match agg {
595        Aggregation::Default => Ok(()),
596        Aggregation::ExplicitBucketHistogram { .. }
597        | Aggregation::Base2ExponentialHistogram { .. } => {
598            if matches!(
599                kind,
600                InstrumentKind::Counter
601                    | InstrumentKind::UpDownCounter
602                    | InstrumentKind::Gauge
603                    | InstrumentKind::Histogram
604                    | InstrumentKind::ObservableCounter
605                    | InstrumentKind::ObservableUpDownCounter
606                    | InstrumentKind::ObservableGauge
607            ) {
608                return Ok(());
609            }
610            Err(MetricError::Other("incompatible aggregation".into()))
611        }
612        Aggregation::Sum => {
613            match kind {
614                InstrumentKind::ObservableCounter
615                | InstrumentKind::ObservableUpDownCounter
616                | InstrumentKind::Counter
617                | InstrumentKind::Histogram
618                | InstrumentKind::UpDownCounter => Ok(()),
619                _ => {
620                    // TODO: review need for aggregation check after
621                    // https://github.com/open-telemetry/opentelemetry-specification/issues/2710
622                    Err(MetricError::Other("incompatible aggregation".into()))
623                }
624            }
625        }
626        Aggregation::LastValue => {
627            match kind {
628                InstrumentKind::Gauge | InstrumentKind::ObservableGauge => Ok(()),
629                _ => {
630                    // TODO: review need for aggregation check after
631                    // https://github.com/open-telemetry/opentelemetry-specification/issues/2710
632                    Err(MetricError::Other("incompatible aggregation".into()))
633                }
634            }
635        }
636        Aggregation::Drop => Ok(()),
637    }
638}
639
640/// The group of pipelines connecting Readers with instrument measurement.
641#[derive(Clone, Debug)]
642pub(crate) struct Pipelines(pub(crate) Vec<Arc<Pipeline>>);
643
644impl Pipelines {
645    pub(crate) fn new(
646        res: Resource,
647        readers: Vec<Box<dyn MetricReader>>,
648        views: Vec<Arc<dyn View>>,
649    ) -> Self {
650        let mut pipes = Vec::with_capacity(readers.len());
651        for r in readers {
652            let p = Arc::new(Pipeline {
653                resource: res.clone(),
654                reader: r,
655                views: views.clone(),
656                inner: Default::default(),
657            });
658            p.reader.register_pipeline(Arc::downgrade(&p));
659            pipes.push(p);
660        }
661
662        Pipelines(pipes)
663    }
664
665    pub(crate) fn register_callback<F>(&self, callback: F)
666    where
667        F: Fn() + Send + Sync + 'static,
668    {
669        let cb = Arc::new(callback);
670        for pipe in &self.0 {
671            pipe.add_callback(cb.clone())
672        }
673    }
674
675    /// Force flush all pipelines
676    pub(crate) fn force_flush(&self) -> OTelSdkResult {
677        let mut errs = vec![];
678        for pipeline in &self.0 {
679            if let Err(err) = pipeline.force_flush() {
680                errs.push(err);
681            }
682        }
683
684        if errs.is_empty() {
685            Ok(())
686        } else {
687            Err(OTelSdkError::InternalFailure(format!("{errs:?}")))
688        }
689    }
690
691    /// Shut down all pipelines
692    pub(crate) fn shutdown(&self) -> OTelSdkResult {
693        let mut errs = vec![];
694        for pipeline in &self.0 {
695            if let Err(err) = pipeline.shutdown() {
696                errs.push(err);
697            }
698        }
699
700        if errs.is_empty() {
701            Ok(())
702        } else {
703            Err(crate::error::OTelSdkError::InternalFailure(format!(
704                "{errs:?}"
705            )))
706        }
707    }
708}
709
710/// resolver facilitates resolving aggregate functions an instrument calls to
711/// aggregate measurements with while updating all pipelines that need to pull from
712/// those aggregations.
713pub(crate) struct Resolver<T> {
714    inserters: Vec<Inserter<T>>,
715}
716
717impl<T> Resolver<T>
718where
719    T: Number,
720{
721    pub(crate) fn new(
722        pipelines: Arc<Pipelines>,
723        view_cache: Arc<Mutex<HashMap<Cow<'static, str>, InstrumentId>>>,
724    ) -> Self {
725        let inserters = pipelines
726            .0
727            .iter()
728            .map(|pipe| Inserter::new(Arc::clone(pipe), Arc::clone(&view_cache)))
729            .collect();
730
731        Resolver { inserters }
732    }
733
734    /// The measures that must be updated by the instrument defined by key.
735    pub(crate) fn measures(
736        &self,
737        id: Instrument,
738        boundaries: Option<Vec<f64>>,
739    ) -> MetricResult<Vec<Arc<dyn internal::Measure<T>>>> {
740        let (mut measures, mut errs) = (vec![], vec![]);
741
742        for inserter in &self.inserters {
743            match inserter.instrument(id.clone(), boundaries.as_deref()) {
744                Ok(ms) => measures.extend(ms),
745                Err(err) => errs.push(err),
746            }
747        }
748
749        if errs.is_empty() {
750            if measures.is_empty() {
751                // TODO: Emit internal log that measurements from the instrument
752                // are being dropped due to view configuration
753            }
754            Ok(measures)
755        } else {
756            Err(MetricError::Other(format!("{errs:?}")))
757        }
758    }
759}