opentelemetry_otlp/exporter/tonic/
logs.rs

1use core::fmt;
2use opentelemetry::otel_debug;
3use opentelemetry_proto::tonic::collector::logs::v1::{
4    logs_service_client::LogsServiceClient, ExportLogsServiceRequest,
5};
6use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
7use opentelemetry_sdk::logs::{LogBatch, LogExporter};
8use std::time;
9use tokio::sync::Mutex;
10use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};
11
12use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;
13
14use super::BoxInterceptor;
15
16pub(crate) struct TonicLogsClient {
17    inner: Mutex<Option<ClientInner>>,
18    #[allow(dead_code)]
19    // <allow dead> would be removed once we support set_resource for metrics.
20    resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
21}
22
23struct ClientInner {
24    client: LogsServiceClient<Channel>,
25    interceptor: BoxInterceptor,
26}
27
28impl fmt::Debug for TonicLogsClient {
29    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30        f.write_str("TonicLogsClient")
31    }
32}
33
34impl TonicLogsClient {
35    pub(super) fn new(
36        channel: Channel,
37        interceptor: BoxInterceptor,
38        compression: Option<CompressionEncoding>,
39    ) -> Self {
40        let mut client = LogsServiceClient::new(channel);
41        if let Some(compression) = compression {
42            client = client
43                .send_compressed(compression)
44                .accept_compressed(compression);
45        }
46
47        otel_debug!(name: "TonicsLogsClientBuilt");
48
49        TonicLogsClient {
50            inner: Mutex::new(Some(ClientInner {
51                client,
52                interceptor,
53            })),
54            resource: Default::default(),
55        }
56    }
57}
58
59impl LogExporter for TonicLogsClient {
60    async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
61        let (mut client, metadata, extensions) = match self.inner.lock().await.as_mut() {
62            Some(inner) => {
63                let (m, e, _) = inner
64                    .interceptor
65                    .call(Request::new(()))
66                    .map_err(|e| OTelSdkError::InternalFailure(format!("error: {e:?}")))?
67                    .into_parts();
68                (inner.client.clone(), m, e)
69            }
70            None => return Err(OTelSdkError::AlreadyShutdown),
71        };
72
73        let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource);
74
75        otel_debug!(name: "TonicLogsClient.ExportStarted");
76
77        let result = client
78            .export(Request::from_parts(
79                metadata,
80                extensions,
81                ExportLogsServiceRequest { resource_logs },
82            ))
83            .await;
84
85        match result {
86            Ok(_) => {
87                otel_debug!(name: "TonicLogsClient.ExportSucceeded");
88                Ok(())
89            }
90            Err(e) => {
91                let error = format!("export error: {e:?}");
92                otel_debug!(name: "TonicLogsClient.ExportFailed", error = &error);
93                Err(OTelSdkError::InternalFailure(error))
94            }
95        }
96    }
97
98    fn shutdown_with_timeout(&self, _timeout: time::Duration) -> OTelSdkResult {
99        // TODO: Implement actual shutdown
100        // Due to the use of tokio::sync::Mutex to guard
101        // the inner client, we need to await the call to lock the mutex
102        // and that requires async runtime.
103        // It is possible to fix this by using
104        // a dedicated thread just to handle shutdown.
105        // But for now, we just return Ok.
106        Ok(())
107    }
108
109    fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
110        self.resource = resource.into();
111    }
112}