opentelemetry_otlp/exporter/tonic/
logs.rs1use core::fmt;
2use opentelemetry::otel_debug;
3use opentelemetry_proto::tonic::collector::logs::v1::{
4 logs_service_client::LogsServiceClient, ExportLogsServiceRequest,
5};
6use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
7use opentelemetry_sdk::logs::{LogBatch, LogExporter};
8use std::time;
9use tokio::sync::Mutex;
10use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};
11
12use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;
13
14use super::BoxInterceptor;
15
16pub(crate) struct TonicLogsClient {
17 inner: Mutex<Option<ClientInner>>,
18 #[allow(dead_code)]
19 resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
21}
22
23struct ClientInner {
24 client: LogsServiceClient<Channel>,
25 interceptor: BoxInterceptor,
26}
27
28impl fmt::Debug for TonicLogsClient {
29 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30 f.write_str("TonicLogsClient")
31 }
32}
33
34impl TonicLogsClient {
35 pub(super) fn new(
36 channel: Channel,
37 interceptor: BoxInterceptor,
38 compression: Option<CompressionEncoding>,
39 ) -> Self {
40 let mut client = LogsServiceClient::new(channel);
41 if let Some(compression) = compression {
42 client = client
43 .send_compressed(compression)
44 .accept_compressed(compression);
45 }
46
47 otel_debug!(name: "TonicsLogsClientBuilt");
48
49 TonicLogsClient {
50 inner: Mutex::new(Some(ClientInner {
51 client,
52 interceptor,
53 })),
54 resource: Default::default(),
55 }
56 }
57}
58
59impl LogExporter for TonicLogsClient {
60 async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
61 let (mut client, metadata, extensions) = match self.inner.lock().await.as_mut() {
62 Some(inner) => {
63 let (m, e, _) = inner
64 .interceptor
65 .call(Request::new(()))
66 .map_err(|e| OTelSdkError::InternalFailure(format!("error: {e:?}")))?
67 .into_parts();
68 (inner.client.clone(), m, e)
69 }
70 None => return Err(OTelSdkError::AlreadyShutdown),
71 };
72
73 let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource);
74
75 otel_debug!(name: "TonicLogsClient.ExportStarted");
76
77 let result = client
78 .export(Request::from_parts(
79 metadata,
80 extensions,
81 ExportLogsServiceRequest { resource_logs },
82 ))
83 .await;
84
85 match result {
86 Ok(_) => {
87 otel_debug!(name: "TonicLogsClient.ExportSucceeded");
88 Ok(())
89 }
90 Err(e) => {
91 let error = format!("export error: {e:?}");
92 otel_debug!(name: "TonicLogsClient.ExportFailed", error = &error);
93 Err(OTelSdkError::InternalFailure(error))
94 }
95 }
96 }
97
98 fn shutdown_with_timeout(&self, _timeout: time::Duration) -> OTelSdkResult {
99 Ok(())
107 }
108
109 fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
110 self.resource = resource.into();
111 }
112}