opentelemetry_otlp/exporter/tonic/
metrics.rs

1use core::fmt;
2use std::sync::Mutex;
3
4use opentelemetry::otel_debug;
5use opentelemetry_proto::tonic::collector::metrics::v1::{
6    metrics_service_client::MetricsServiceClient, ExportMetricsServiceRequest,
7};
8use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
9use opentelemetry_sdk::metrics::data::ResourceMetrics;
10use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};
11
12use super::BoxInterceptor;
13use crate::metric::MetricsClient;
14
15pub(crate) struct TonicMetricsClient {
16    inner: Mutex<Option<ClientInner>>,
17}
18
19struct ClientInner {
20    client: MetricsServiceClient<Channel>,
21    interceptor: BoxInterceptor,
22}
23
24impl fmt::Debug for TonicMetricsClient {
25    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
26        f.write_str("TonicMetricsClient")
27    }
28}
29
30impl TonicMetricsClient {
31    pub(super) fn new(
32        channel: Channel,
33        interceptor: BoxInterceptor,
34        compression: Option<CompressionEncoding>,
35    ) -> Self {
36        let mut client = MetricsServiceClient::new(channel);
37        if let Some(compression) = compression {
38            client = client
39                .send_compressed(compression)
40                .accept_compressed(compression);
41        }
42
43        otel_debug!(name: "TonicsMetricsClientBuilt");
44
45        TonicMetricsClient {
46            inner: Mutex::new(Some(ClientInner {
47                client,
48                interceptor,
49            })),
50        }
51    }
52}
53
54impl MetricsClient for TonicMetricsClient {
55    async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult {
56        let (mut client, metadata, extensions) = self
57            .inner
58            .lock()
59            .map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {e:?}")))
60            .and_then(|mut inner| match &mut *inner {
61                Some(inner) => {
62                    let (m, e, _) = inner
63                        .interceptor
64                        .call(Request::new(()))
65                        .map_err(|e| {
66                            OTelSdkError::InternalFailure(format!(
67                                "unexpected status while exporting {e:?}"
68                            ))
69                        })?
70                        .into_parts();
71                    Ok((inner.client.clone(), m, e))
72                }
73                None => Err(OTelSdkError::InternalFailure(
74                    "exporter is already shut down".into(),
75                )),
76            })?;
77
78        otel_debug!(name: "TonicMetricsClient.ExportStarted");
79
80        let result = client
81            .export(Request::from_parts(
82                metadata,
83                extensions,
84                ExportMetricsServiceRequest::from(metrics),
85            ))
86            .await;
87
88        match result {
89            Ok(_) => {
90                otel_debug!(name: "TonicMetricsClient.ExportSucceeded");
91                Ok(())
92            }
93            Err(e) => {
94                let error = format!("{e:?}");
95                otel_debug!(name: "TonicMetricsClient.ExportFailed", error = &error);
96                Err(OTelSdkError::InternalFailure(error))
97            }
98        }
99    }
100
101    fn shutdown(&self) -> OTelSdkResult {
102        self.inner
103            .lock()
104            .map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {e}")))?
105            .take();
106
107        Ok(())
108    }
109}