opentelemetry_otlp/exporter/tonic/
trace.rs

1use core::fmt;
2use tokio::sync::Mutex;
3
4use opentelemetry::otel_debug;
5use opentelemetry_proto::tonic::collector::trace::v1::{
6    trace_service_client::TraceServiceClient, ExportTraceServiceRequest,
7};
8use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;
9use opentelemetry_sdk::error::OTelSdkError;
10use opentelemetry_sdk::{
11    error::OTelSdkResult,
12    trace::{SpanData, SpanExporter},
13};
14use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};
15
16use super::BoxInterceptor;
17
18pub(crate) struct TonicTracesClient {
19    inner: Option<ClientInner>,
20    #[allow(dead_code)]
21    // <allow dead> would be removed once we support set_resource for metrics.
22    resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
23}
24
25struct ClientInner {
26    client: TraceServiceClient<Channel>,
27    interceptor: Mutex<BoxInterceptor>,
28}
29
30impl fmt::Debug for TonicTracesClient {
31    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32        f.write_str("TonicTracesClient")
33    }
34}
35
36impl TonicTracesClient {
37    pub(super) fn new(
38        channel: Channel,
39        interceptor: BoxInterceptor,
40        compression: Option<CompressionEncoding>,
41    ) -> Self {
42        let mut client = TraceServiceClient::new(channel);
43        if let Some(compression) = compression {
44            client = client
45                .send_compressed(compression)
46                .accept_compressed(compression);
47        }
48
49        otel_debug!(name: "TonicsTracesClientBuilt");
50
51        TonicTracesClient {
52            inner: Some(ClientInner {
53                client,
54                interceptor: Mutex::new(interceptor),
55            }),
56            resource: Default::default(),
57        }
58    }
59}
60
61impl SpanExporter for TonicTracesClient {
62    async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult {
63        let (mut client, metadata, extensions) = match &self.inner {
64            Some(inner) => {
65                let (m, e, _) = inner
66                    .interceptor
67                    .lock()
68                    .await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
69                    .call(Request::new(()))
70                    .map_err(|e| OTelSdkError::InternalFailure(format!("error: {e:?}")))?
71                    .into_parts();
72                (inner.client.clone(), m, e)
73            }
74            None => return Err(OTelSdkError::AlreadyShutdown),
75        };
76
77        let resource_spans = group_spans_by_resource_and_scope(batch, &self.resource);
78
79        otel_debug!(name: "TonicTracesClient.ExportStarted");
80
81        let result = client
82            .export(Request::from_parts(
83                metadata,
84                extensions,
85                ExportTraceServiceRequest { resource_spans },
86            ))
87            .await;
88
89        match result {
90            Ok(_) => {
91                otel_debug!(name: "TonicTracesClient.ExportSucceeded");
92                Ok(())
93            }
94            Err(e) => {
95                let error = e.to_string();
96                otel_debug!(name: "TonicTracesClient.ExportFailed", error = &error);
97                Err(OTelSdkError::InternalFailure(error))
98            }
99        }
100    }
101
102    fn shutdown(&mut self) -> OTelSdkResult {
103        match self.inner.take() {
104            Some(_) => Ok(()), // Successfully took `inner`, indicating a successful shutdown.
105            None => Err(OTelSdkError::AlreadyShutdown), // `inner` was already `None`, meaning it's already shut down.
106        }
107    }
108
109    fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
110        self.resource = resource.into();
111    }
112}