1use core::fmt;
2use opentelemetry::{
3 metrics::{Meter, MeterProvider},
4 otel_debug, otel_error, otel_info, InstrumentationScope,
5};
6use std::time::Duration;
7use std::{
8 collections::HashMap,
9 sync::{
10 atomic::{AtomicBool, Ordering},
11 Arc, Mutex,
12 },
13};
14
15use crate::error::OTelSdkResult;
16use crate::Resource;
17
18use super::{
19 exporter::PushMetricExporter, meter::SdkMeter, noop::NoopMeter,
20 periodic_reader::PeriodicReader, pipeline::Pipelines, reader::MetricReader, view::View,
21 Instrument, Stream,
22};
23
/// Handle to the SDK meter provider.
///
/// The handle only holds a reference-counted pointer to the shared state, so
/// every clone refers to the same pipelines and the same meter registry.
#[derive(Clone, Debug)]
pub struct SdkMeterProvider {
    /// State shared between all clones of this handle.
    inner: Arc<SdkMeterProviderInner>,
}
38
/// State shared by every clone of `SdkMeterProvider`.
#[derive(Debug)]
struct SdkMeterProviderInner {
    /// Pipelines handed to each newly created `SdkMeter` and used for
    /// flushing and shutting down.
    pipes: Arc<Pipelines>,
    /// Meters created so far, keyed by their instrumentation scope, so that a
    /// repeated request for the same scope reuses the existing `SdkMeter`.
    meters: Mutex<HashMap<InstrumentationScope, Arc<SdkMeter>>>,
    /// Set once shutdown has been requested; consulted before flushing,
    /// before handing out meters, and when the value is dropped.
    shutdown_invoked: AtomicBool,
}
45
46impl Default for SdkMeterProvider {
47 fn default() -> Self {
48 SdkMeterProvider::builder().build()
49 }
50}
51
52impl SdkMeterProvider {
53 pub fn builder() -> MeterProviderBuilder {
55 MeterProviderBuilder::default()
56 }
57
58 pub fn force_flush(&self) -> OTelSdkResult {
98 self.inner.force_flush()
99 }
100
101 pub fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
114 otel_debug!(
115 name: "MeterProvider.Shutdown",
116 message = "User initiated shutdown of MeterProvider."
117 );
118 self.inner.shutdown()
119 }
120
121 pub fn shutdown(&self) -> OTelSdkResult {
123 self.shutdown_with_timeout(Duration::from_secs(5))
124 }
125}
126
127impl SdkMeterProviderInner {
128 fn force_flush(&self) -> OTelSdkResult {
129 if self
130 .shutdown_invoked
131 .load(std::sync::atomic::Ordering::Relaxed)
132 {
133 Err(crate::error::OTelSdkError::AlreadyShutdown)
134 } else {
135 self.pipes.force_flush()
136 }
137 }
138
139 fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
140 if self
141 .shutdown_invoked
142 .swap(true, std::sync::atomic::Ordering::SeqCst)
143 {
144 Err(crate::error::OTelSdkError::AlreadyShutdown)
146 } else {
147 self.pipes.shutdown()
148 }
149 }
150
151 fn shutdown(&self) -> OTelSdkResult {
152 self.shutdown_with_timeout(Duration::from_secs(5))
153 }
154}
155
156impl Drop for SdkMeterProviderInner {
157 fn drop(&mut self) {
158 if self.shutdown_invoked.load(Ordering::Relaxed) {
161 otel_debug!(
162 name: "MeterProvider.Drop.AlreadyShutdown",
163 message = "MeterProvider was already shut down; drop will not attempt shutdown again."
164 );
165 } else {
166 otel_info!(
167 name: "MeterProvider.Drop",
168 message = "Last reference of MeterProvider dropped, initiating shutdown."
169 );
170 if let Err(err) = self.shutdown() {
171 otel_error!(
172 name: "MeterProvider.Drop.ShutdownFailed",
173 message = "Shutdown attempt failed during drop of MeterProvider.",
174 reason = format!("{}", err)
175 );
176 } else {
177 otel_debug!(
178 name: "MeterProvider.Drop.ShutdownCompleted",
179 );
180 }
181 }
182 }
183}
184
185impl MeterProvider for SdkMeterProvider {
186 fn meter(&self, name: &'static str) -> Meter {
187 let scope = InstrumentationScope::builder(name).build();
188 self.meter_with_scope(scope)
189 }
190
191 fn meter_with_scope(&self, scope: InstrumentationScope) -> Meter {
192 if self.inner.shutdown_invoked.load(Ordering::Relaxed) {
193 otel_debug!(
194 name: "MeterProvider.NoOpMeterReturned",
195 meter_name = scope.name(),
196 );
197 return Meter::new(Arc::new(NoopMeter::new()));
198 }
199
200 if scope.name().is_empty() {
201 otel_info!(name: "MeterNameEmpty", message = "Meter name is empty; consider providing a meaningful name. Meter will function normally and the provided name will be used as-is.");
202 };
203
204 if let Ok(mut meters) = self.inner.meters.lock() {
205 if let Some(existing_meter) = meters.get(&scope) {
206 otel_debug!(
207 name: "MeterProvider.ExistingMeterReturned",
208 meter_name = scope.name(),
209 );
210 Meter::new(existing_meter.clone())
211 } else {
212 let new_meter = Arc::new(SdkMeter::new(scope.clone(), self.inner.pipes.clone()));
213 meters.insert(scope.clone(), new_meter.clone());
214 otel_debug!(
215 name: "MeterProvider.NewMeterCreated",
216 meter_name = scope.name(),
217 );
218 Meter::new(new_meter)
219 }
220 } else {
221 otel_debug!(
222 name: "MeterProvider.NoOpMeterReturned",
223 meter_name = scope.name(),
224 );
225 Meter::new(Arc::new(NoopMeter::new()))
226 }
227 }
228}
229
/// Collects the configuration used to construct an `SdkMeterProvider`.
///
/// The default value has no resource, no readers and no views.
#[derive(Default)]
pub struct MeterProviderBuilder {
    /// Resource accumulated through `with_resource`; `None` until the first call.
    resource: Option<Resource>,
    /// Readers registered through `with_reader` and `with_periodic_exporter`.
    readers: Vec<Box<dyn MetricReader>>,
    /// Views registered through `with_view`.
    views: Vec<Arc<dyn View>>,
}
237
238impl MeterProviderBuilder {
239 pub fn with_resource(mut self, resource: Resource) -> Self {
251 self.resource = match self.resource {
252 Some(existing) => Some(existing.merge(&resource)),
253 None => Some(resource),
254 };
255
256 self
257 }
258
259 pub fn with_reader<T: MetricReader>(mut self, reader: T) -> Self {
266 self.readers.push(Box::new(reader));
267 self
268 }
269
270 pub fn with_periodic_exporter<T>(mut self, exporter: T) -> Self
283 where
284 T: PushMetricExporter,
285 {
286 let reader = PeriodicReader::builder(exporter).build();
287 self.readers.push(Box::new(reader));
288 self
289 }
290
291 pub fn with_view<T>(mut self, view: T) -> Self
368 where
369 T: Fn(&Instrument) -> Option<Stream> + Send + Sync + 'static,
370 {
371 self.views.push(Arc::new(view));
372 self
373 }
374
375 pub fn build(self) -> SdkMeterProvider {
377 otel_debug!(
378 name: "MeterProvider.Building",
379 builder = format!("{:?}", &self),
380 );
381
382 let meter_provider = SdkMeterProvider {
383 inner: Arc::new(SdkMeterProviderInner {
384 pipes: Arc::new(Pipelines::new(
385 self.resource.unwrap_or(Resource::builder().build()),
386 self.readers,
387 self.views,
388 )),
389 meters: Default::default(),
390 shutdown_invoked: AtomicBool::new(false),
391 }),
392 };
393
394 otel_debug!(
395 name: "MeterProvider.Built",
396 );
397 meter_provider
398 }
399}
400
401impl fmt::Debug for MeterProviderBuilder {
402 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
403 f.debug_struct("MeterProviderBuilder")
404 .field("resource", &self.resource)
405 .field("readers", &self.readers)
406 .field("views", &self.views.len())
407 .finish()
408 }
409}
// Unit tests for the meter provider and its builder; compiled only for test
// builds with the `testing` feature enabled.
#[cfg(all(test, feature = "testing"))]
mod tests {
    use crate::error::OTelSdkError;
    use crate::metrics::SdkMeterProvider;
    use crate::resource::{
        SERVICE_NAME, TELEMETRY_SDK_LANGUAGE, TELEMETRY_SDK_NAME, TELEMETRY_SDK_VERSION,
    };
    use crate::testing::metrics::metric_reader::TestMetricReader;
    use crate::Resource;
    use opentelemetry::metrics::MeterProvider;
    use opentelemetry::{global, InstrumentationScope};
    use opentelemetry::{Key, KeyValue, Value};
    use std::env;

    /// Checks which resource ends up on the first pipeline for several
    /// combinations of builder configuration and `OTEL_RESOURCE_ATTRIBUTES`.
    #[test]
    fn test_meter_provider_resource() {
        // Asserts that the first pipeline's resource maps `resource_key` to
        // `expect` (compared as strings; `None` means the key is absent).
        let assert_resource = |provider: &super::SdkMeterProvider,
                               resource_key: &'static str,
                               expect: Option<&'static str>| {
            assert_eq!(
                provider.inner.pipes.0[0]
                    .resource
                    .get(&Key::from_static_str(resource_key))
                    .map(|v| v.to_string()),
                expect.map(|s| s.to_string())
            );
        };
        // Asserts that the three telemetry SDK attributes carry the expected
        // language, name and crate version.
        let assert_telemetry_resource = |provider: &super::SdkMeterProvider| {
            assert_eq!(
                provider.inner.pipes.0[0]
                    .resource
                    .get(&TELEMETRY_SDK_LANGUAGE.into()),
                Some(Value::from("rust"))
            );
            assert_eq!(
                provider.inner.pipes.0[0]
                    .resource
                    .get(&TELEMETRY_SDK_NAME.into()),
                Some(Value::from("opentelemetry"))
            );
            assert_eq!(
                provider.inner.pipes.0[0]
                    .resource
                    .get(&TELEMETRY_SDK_VERSION.into()),
                Some(Value::from(env!("CARGO_PKG_VERSION")))
            );
        };

        // No resource configured and the env var unset: the service name
        // falls back to "unknown_service" and the SDK attributes are present.
        temp_env::with_var_unset("OTEL_RESOURCE_ATTRIBUTES", || {
            let reader = TestMetricReader::new();
            let default_meter_provider = super::SdkMeterProvider::builder()
                .with_reader(reader)
                .build();
            assert_resource(
                &default_meter_provider,
                SERVICE_NAME,
                Some("unknown_service"),
            );
            assert_telemetry_resource(&default_meter_provider);
        });

        // A resource built from an empty builder with only a service name is
        // used as-is: exactly one attribute.
        let reader2 = TestMetricReader::new();
        let custom_meter_provider = super::SdkMeterProvider::builder()
            .with_reader(reader2)
            .with_resource(
                Resource::builder_empty()
                    .with_service_name("test_service")
                    .build(),
            )
            .build();
        assert_resource(&custom_meter_provider, SERVICE_NAME, Some("test_service"));
        assert_eq!(custom_meter_provider.inner.pipes.0[0].resource.len(), 1);

        // No resource configured but the env var set: `key1` and `k3` are
        // picked up. The six attributes asserted below account for the whole
        // resource, so the value-less `k2` entry is not included.
        temp_env::with_var(
            "OTEL_RESOURCE_ATTRIBUTES",
            Some("key1=value1, k2, k3=value2"),
            || {
                let reader3 = TestMetricReader::new();
                let env_resource_provider = super::SdkMeterProvider::builder()
                    .with_reader(reader3)
                    .build();
                assert_resource(
                    &env_resource_provider,
                    SERVICE_NAME,
                    Some("unknown_service"),
                );
                assert_resource(&env_resource_provider, "key1", Some("value1"));
                assert_resource(&env_resource_provider, "k3", Some("value2"));
                assert_telemetry_resource(&env_resource_provider);
                assert_eq!(env_resource_provider.inner.pipes.0[0].resource.len(), 6);
            },
        );

        // Attributes set in code on top of `Resource::builder()` while the env
        // var is set: the value given in code for `my-custom-key` is kept over
        // the env value, and `k2` from the environment is still present.
        temp_env::with_var(
            "OTEL_RESOURCE_ATTRIBUTES",
            Some("my-custom-key=env-val,k2=value2"),
            || {
                let reader4 = TestMetricReader::new();
                let user_provided_resource_config_provider = super::SdkMeterProvider::builder()
                    .with_reader(reader4)
                    .with_resource(
                        Resource::builder()
                            .with_attributes([
                                KeyValue::new("my-custom-key", "my-custom-value"),
                                KeyValue::new("my-custom-key2", "my-custom-value2"),
                            ])
                            .build(),
                    )
                    .build();
                assert_resource(
                    &user_provided_resource_config_provider,
                    SERVICE_NAME,
                    Some("unknown_service"),
                );
                assert_resource(
                    &user_provided_resource_config_provider,
                    "my-custom-key",
                    Some("my-custom-value"),
                );
                assert_resource(
                    &user_provided_resource_config_provider,
                    "my-custom-key2",
                    Some("my-custom-value2"),
                );
                assert_resource(
                    &user_provided_resource_config_provider,
                    "k2",
                    Some("value2"),
                );
                assert_telemetry_resource(&user_provided_resource_config_provider);
                assert_eq!(
                    user_provided_resource_config_provider.inner.pipes.0[0]
                        .resource
                        .len(),
                    7
                );
            },
        );

        // An explicitly empty resource stays empty: no attributes are added.
        let reader5 = TestMetricReader::new();
        let no_service_name = super::SdkMeterProvider::builder()
            .with_reader(reader5)
            .with_resource(Resource::empty())
            .build();

        assert_eq!(no_service_name.inner.pipes.0[0].resource.len(), 0)
    }

    /// The first shutdown succeeds and shuts the reader down; a second one
    /// reports `AlreadyShutdown`.
    #[test]
    fn test_meter_provider_shutdown() {
        let reader = TestMetricReader::new();
        let provider = super::SdkMeterProvider::builder()
            .with_reader(reader.clone())
            .build();
        global::set_meter_provider(provider.clone());
        assert!(!reader.is_shutdown());
        // Create an instrument through the global provider before shutting down.
        let meter = global::meter("test");
        let counter = meter.u64_counter("test_counter").build();
        let shutdown_res = provider.shutdown();
        assert!(shutdown_res.is_ok());

        // Shutting down again must fail with the dedicated error variant.
        let shutdown_res = provider.shutdown();
        assert!(matches!(shutdown_res, Err(OTelSdkError::AlreadyShutdown)));

        assert!(shutdown_res.is_err());
        assert!(reader.is_shutdown());
        // Recording on an instrument obtained before shutdown is exercised
        // here; nothing is asserted about it beyond the call completing.
        counter.add(1, &[]);
    }
    /// The reader is shut down only when the last clone of the provider is
    /// dropped.
    #[test]
    fn test_shutdown_invoked_on_last_drop() {
        let reader = TestMetricReader::new();
        let provider = super::SdkMeterProvider::builder()
            .with_reader(reader.clone())
            .build();
        let clone1 = provider.clone();
        let clone2 = provider.clone();

        // Nothing dropped yet.
        assert!(!reader.is_shutdown());

        // Dropping a clone while other handles remain must not shut down.
        drop(clone1);
        assert!(!reader.is_shutdown());

        drop(clone2);
        assert!(!reader.is_shutdown());

        // Dropping the final handle triggers shutdown.
        drop(provider);
        assert!(reader.is_shutdown());
    }

    /// Requests with an equal scope share one registry entry; scopes that
    /// differ get their own.
    #[test]
    fn same_meter_reused_same_scope() {
        let provider = super::SdkMeterProvider::builder().build();
        let _meter1 = provider.meter("test");
        let _meter2 = provider.meter("test");
        assert_eq!(provider.inner.meters.lock().unwrap().len(), 1);

        // Same name but with version and schema URL: a different scope.
        let scope = InstrumentationScope::builder("test")
            .with_version("1.0.0")
            .with_schema_url("http://example.com")
            .build();

        let _meter3 = provider.meter_with_scope(scope.clone());
        let _meter4 = provider.meter_with_scope(scope.clone());
        let _meter5 = provider.meter_with_scope(scope);
        assert_eq!(provider.inner.meters.lock().unwrap().len(), 2);

        // Builds a scope that differs from the others only in its name.
        let make_scope = |name| {
            InstrumentationScope::builder(name)
                .with_version("1.0.0")
                .with_schema_url("http://example.com")
                .build()
        };

        // Names differing only in letter case count as three distinct scopes.
        let _meter6 = provider.meter_with_scope(make_scope("ABC"));
        let _meter7 = provider.meter_with_scope(make_scope("Abc"));
        let _meter8 = provider.meter_with_scope(make_scope("abc"));

        assert_eq!(provider.inner.meters.lock().unwrap().len(), 5);
    }

    /// Scopes with equal attributes map to the same meter, regardless of the
    /// order in which the attributes are listed.
    #[test]
    fn same_meter_reused_same_scope_attributes() {
        let meter_provider = super::SdkMeterProvider::builder().build();
        let make_scope = |attributes| {
            InstrumentationScope::builder("test.meter")
                .with_version("v0.1.0")
                .with_schema_url("http://example.com")
                .with_attributes(attributes)
                .build()
        };

        let _meter1 =
            meter_provider.meter_with_scope(make_scope(vec![KeyValue::new("key", "value1")]));
        let _meter2 =
            meter_provider.meter_with_scope(make_scope(vec![KeyValue::new("key", "value1")]));

        assert_eq!(meter_provider.inner.meters.lock().unwrap().len(), 1);

        // The same two attributes in opposite order add only one more entry.
        let _meter3 = meter_provider.meter_with_scope(make_scope(vec![
            KeyValue::new("key1", "value1"),
            KeyValue::new("key2", "value2"),
        ]));
        let _meter4 = meter_provider.meter_with_scope(make_scope(vec![
            KeyValue::new("key2", "value2"),
            KeyValue::new("key1", "value1"),
        ]));

        assert_eq!(meter_provider.inner.meters.lock().unwrap().len(), 2);
    }

    /// Scopes whose attributes differ (including by letter case of a key or
    /// value) each get their own meter.
    #[test]
    fn different_meter_different_attributes() {
        let meter_provider = super::SdkMeterProvider::builder().build();
        let make_scope = |attributes| {
            InstrumentationScope::builder("test.meter")
                .with_version("v0.1.0")
                .with_schema_url("http://example.com")
                .with_attributes(attributes)
                .build()
        };

        let _meter1 = meter_provider.meter_with_scope(make_scope(vec![]));
        // Differs from the first scope by having an attribute at all.
        let _meter2 =
            meter_provider.meter_with_scope(make_scope(vec![KeyValue::new("key1", "value1")]));
        let _meter3 =
            meter_provider.meter_with_scope(make_scope(vec![KeyValue::new("Key1", "value1")]));
        let _meter4 =
            meter_provider.meter_with_scope(make_scope(vec![KeyValue::new("key1", "Value1")]));
        let _meter5 = meter_provider.meter_with_scope(make_scope(vec![
            KeyValue::new("key1", "value1"),
            KeyValue::new("key2", "value2"),
        ]));

        assert_eq!(meter_provider.inner.meters.lock().unwrap().len(), 5);
    }

    /// Repeated `with_resource` calls accumulate attributes and keep the
    /// schema URL supplied by one of them.
    #[test]
    fn with_resource_multiple_calls_ensure_additive() {
        let builder = SdkMeterProvider::builder()
            .with_resource(Resource::new(vec![KeyValue::new("key1", "value1")]))
            .with_resource(Resource::new(vec![KeyValue::new("key2", "value2")]))
            .with_resource(
                Resource::builder_empty()
                    .with_schema_url(vec![], "http://example.com")
                    .build(),
            )
            .with_resource(Resource::new(vec![KeyValue::new("key3", "value3")]));

        // Inspect the builder's accumulated resource directly, without building.
        let resource = builder.resource.unwrap();

        assert_eq!(
            resource.get(&Key::from_static_str("key1")),
            Some(Value::from("value1"))
        );
        assert_eq!(
            resource.get(&Key::from_static_str("key2")),
            Some(Value::from("value2"))
        );
        assert_eq!(
            resource.get(&Key::from_static_str("key3")),
            Some(Value::from("value3"))
        );
        assert_eq!(resource.schema_url(), Some("http://example.com"));
    }
}