opentelemetry_otlp/exporter/tonic/
metrics.rs1use core::fmt;
2use std::sync::Mutex;
3
4use opentelemetry::otel_debug;
5use opentelemetry_proto::tonic::collector::metrics::v1::{
6 metrics_service_client::MetricsServiceClient, ExportMetricsServiceRequest,
7};
8use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
9use opentelemetry_sdk::metrics::data::ResourceMetrics;
10use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};
11
12use super::BoxInterceptor;
13use crate::metric::MetricsClient;
14
15pub(crate) struct TonicMetricsClient {
16 inner: Mutex<Option<ClientInner>>,
17}
18
19struct ClientInner {
20 client: MetricsServiceClient<Channel>,
21 interceptor: BoxInterceptor,
22}
23
24impl fmt::Debug for TonicMetricsClient {
25 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
26 f.write_str("TonicMetricsClient")
27 }
28}
29
30impl TonicMetricsClient {
31 pub(super) fn new(
32 channel: Channel,
33 interceptor: BoxInterceptor,
34 compression: Option<CompressionEncoding>,
35 ) -> Self {
36 let mut client = MetricsServiceClient::new(channel);
37 if let Some(compression) = compression {
38 client = client
39 .send_compressed(compression)
40 .accept_compressed(compression);
41 }
42
43 otel_debug!(name: "TonicsMetricsClientBuilt");
44
45 TonicMetricsClient {
46 inner: Mutex::new(Some(ClientInner {
47 client,
48 interceptor,
49 })),
50 }
51 }
52}
53
54impl MetricsClient for TonicMetricsClient {
55 async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult {
56 let (mut client, metadata, extensions) = self
57 .inner
58 .lock()
59 .map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {e:?}")))
60 .and_then(|mut inner| match &mut *inner {
61 Some(inner) => {
62 let (m, e, _) = inner
63 .interceptor
64 .call(Request::new(()))
65 .map_err(|e| {
66 OTelSdkError::InternalFailure(format!(
67 "unexpected status while exporting {e:?}"
68 ))
69 })?
70 .into_parts();
71 Ok((inner.client.clone(), m, e))
72 }
73 None => Err(OTelSdkError::InternalFailure(
74 "exporter is already shut down".into(),
75 )),
76 })?;
77
78 otel_debug!(name: "TonicMetricsClient.ExportStarted");
79
80 let result = client
81 .export(Request::from_parts(
82 metadata,
83 extensions,
84 ExportMetricsServiceRequest::from(metrics),
85 ))
86 .await;
87
88 match result {
89 Ok(_) => {
90 otel_debug!(name: "TonicMetricsClient.ExportSucceeded");
91 Ok(())
92 }
93 Err(e) => {
94 let error = format!("{e:?}");
95 otel_debug!(name: "TonicMetricsClient.ExportFailed", error = &error);
96 Err(OTelSdkError::InternalFailure(error))
97 }
98 }
99 }
100
101 fn shutdown(&self) -> OTelSdkResult {
102 self.inner
103 .lock()
104 .map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {e}")))?
105 .take();
106
107 Ok(())
108 }
109}