opentelemetry_proto/transform/
logs.rs

1#[cfg(feature = "gen-tonic-messages")]
2pub mod tonic {
3    use crate::{
4        tonic::{
5            common::v1::{
6                any_value::Value, AnyValue, ArrayValue, InstrumentationScope, KeyValue,
7                KeyValueList,
8            },
9            logs::v1::{LogRecord, ResourceLogs, ScopeLogs, SeverityNumber},
10            resource::v1::Resource,
11        },
12        transform::common::{to_nanos, tonic::ResourceAttributesWithSchema},
13    };
14    use opentelemetry::logs::{AnyValue as LogsAnyValue, Severity};
15    use opentelemetry_sdk::logs::LogBatch;
16    use std::borrow::Cow;
17    use std::collections::HashMap;
18
19    impl From<LogsAnyValue> for AnyValue {
20        fn from(value: LogsAnyValue) -> Self {
21            AnyValue {
22                value: Some(value.into()),
23            }
24        }
25    }
26
27    impl From<LogsAnyValue> for Value {
28        fn from(value: LogsAnyValue) -> Self {
29            match value {
30                LogsAnyValue::Double(f) => Value::DoubleValue(f),
31                LogsAnyValue::Int(i) => Value::IntValue(i),
32                LogsAnyValue::String(s) => Value::StringValue(s.into()),
33                LogsAnyValue::Boolean(b) => Value::BoolValue(b),
34                LogsAnyValue::ListAny(v) => Value::ArrayValue(ArrayValue {
35                    values: v
36                        .into_iter()
37                        .map(|v| AnyValue {
38                            value: Some(v.into()),
39                        })
40                        .collect(),
41                }),
42                LogsAnyValue::Map(m) => Value::KvlistValue(KeyValueList {
43                    values: m
44                        .into_iter()
45                        .map(|(key, value)| KeyValue {
46                            key: key.into(),
47                            value: Some(AnyValue {
48                                value: Some(value.into()),
49                            }),
50                        })
51                        .collect(),
52                }),
53                LogsAnyValue::Bytes(v) => Value::BytesValue(*v),
54                _ => unreachable!("Nonexistent value type"),
55            }
56        }
57    }
58
59    impl From<&opentelemetry_sdk::logs::SdkLogRecord> for LogRecord {
60        fn from(log_record: &opentelemetry_sdk::logs::SdkLogRecord) -> Self {
61            let trace_context = log_record.trace_context();
62            let severity_number = match log_record.severity_number() {
63                Some(Severity::Trace) => SeverityNumber::Trace,
64                Some(Severity::Trace2) => SeverityNumber::Trace2,
65                Some(Severity::Trace3) => SeverityNumber::Trace3,
66                Some(Severity::Trace4) => SeverityNumber::Trace4,
67                Some(Severity::Debug) => SeverityNumber::Debug,
68                Some(Severity::Debug2) => SeverityNumber::Debug2,
69                Some(Severity::Debug3) => SeverityNumber::Debug3,
70                Some(Severity::Debug4) => SeverityNumber::Debug4,
71                Some(Severity::Info) => SeverityNumber::Info,
72                Some(Severity::Info2) => SeverityNumber::Info2,
73                Some(Severity::Info3) => SeverityNumber::Info3,
74                Some(Severity::Info4) => SeverityNumber::Info4,
75                Some(Severity::Warn) => SeverityNumber::Warn,
76                Some(Severity::Warn2) => SeverityNumber::Warn2,
77                Some(Severity::Warn3) => SeverityNumber::Warn3,
78                Some(Severity::Warn4) => SeverityNumber::Warn4,
79                Some(Severity::Error) => SeverityNumber::Error,
80                Some(Severity::Error2) => SeverityNumber::Error2,
81                Some(Severity::Error3) => SeverityNumber::Error3,
82                Some(Severity::Error4) => SeverityNumber::Error4,
83                Some(Severity::Fatal) => SeverityNumber::Fatal,
84                Some(Severity::Fatal2) => SeverityNumber::Fatal2,
85                Some(Severity::Fatal3) => SeverityNumber::Fatal3,
86                Some(Severity::Fatal4) => SeverityNumber::Fatal4,
87                None => SeverityNumber::Unspecified,
88            };
89
90            LogRecord {
91                time_unix_nano: log_record.timestamp().map(to_nanos).unwrap_or_default(),
92                observed_time_unix_nano: to_nanos(log_record.observed_timestamp().unwrap()),
93                attributes: {
94                    log_record
95                        .attributes_iter()
96                        .map(|kv| KeyValue {
97                            key: kv.0.to_string(),
98                            value: Some(AnyValue {
99                                value: Some(kv.1.clone().into()),
100                            }),
101                        })
102                        .collect()
103                },
104                event_name: log_record.event_name().unwrap_or_default().into(),
105                severity_number: severity_number.into(),
106                severity_text: log_record
107                    .severity_text()
108                    .map(Into::into)
109                    .unwrap_or_default(),
110                body: log_record.body().cloned().map(Into::into),
111                dropped_attributes_count: 0,
112                flags: trace_context
113                    .map(|ctx| {
114                        ctx.trace_flags
115                            .map(|flags| flags.to_u8() as u32)
116                            .unwrap_or_default()
117                    })
118                    .unwrap_or_default(),
119                span_id: trace_context
120                    .map(|ctx| ctx.span_id.to_bytes().to_vec())
121                    .unwrap_or_default(),
122                trace_id: trace_context
123                    .map(|ctx| ctx.trace_id.to_bytes().to_vec())
124                    .unwrap_or_default(),
125            }
126        }
127    }
128
129    impl
130        From<(
131            (
132                &opentelemetry_sdk::logs::SdkLogRecord,
133                &opentelemetry::InstrumentationScope,
134            ),
135            &ResourceAttributesWithSchema,
136        )> for ResourceLogs
137    {
138        fn from(
139            data: (
140                (
141                    &opentelemetry_sdk::logs::SdkLogRecord,
142                    &opentelemetry::InstrumentationScope,
143                ),
144                &ResourceAttributesWithSchema,
145            ),
146        ) -> Self {
147            let ((log_record, instrumentation), resource) = data;
148
149            ResourceLogs {
150                resource: Some(Resource {
151                    attributes: resource.attributes.0.clone(),
152                    dropped_attributes_count: 0,
153                    entity_refs: vec![],
154                }),
155                schema_url: resource.schema_url.clone().unwrap_or_default(),
156                scope_logs: vec![ScopeLogs {
157                    schema_url: instrumentation
158                        .schema_url()
159                        .map(ToOwned::to_owned)
160                        .unwrap_or_default(),
161                    scope: Some((instrumentation, log_record.target().cloned()).into()),
162                    log_records: vec![log_record.into()],
163                }],
164            }
165        }
166    }
167
168    pub fn group_logs_by_resource_and_scope(
169        logs: LogBatch<'_>,
170        resource: &ResourceAttributesWithSchema,
171    ) -> Vec<ResourceLogs> {
172        // Group logs by target or instrumentation name
173        let scope_map = logs.iter().fold(
174            HashMap::new(),
175            |mut scope_map: HashMap<
176                Cow<'static, str>,
177                Vec<(
178                    &opentelemetry_sdk::logs::SdkLogRecord,
179                    &opentelemetry::InstrumentationScope,
180                )>,
181            >,
182             (log_record, instrumentation)| {
183                let key = log_record
184                    .target()
185                    .cloned()
186                    .unwrap_or_else(|| Cow::Owned(instrumentation.name().to_owned()));
187                scope_map
188                    .entry(key)
189                    .or_default()
190                    .push((log_record, instrumentation));
191                scope_map
192            },
193        );
194
195        let scope_logs = scope_map
196            .into_iter()
197            .map(|(key, log_data)| ScopeLogs {
198                scope: Some(InstrumentationScope::from((
199                    log_data.first().unwrap().1,
200                    Some(key.into_owned().into()),
201                ))),
202                schema_url: resource.schema_url.clone().unwrap_or_default(),
203                log_records: log_data
204                    .into_iter()
205                    .map(|(log_record, _)| log_record.into())
206                    .collect(),
207            })
208            .collect();
209
210        vec![ResourceLogs {
211            resource: Some(Resource {
212                attributes: resource.attributes.0.clone(),
213                dropped_attributes_count: 0,
214                entity_refs: vec![],
215            }),
216            scope_logs,
217            schema_url: resource.schema_url.clone().unwrap_or_default(),
218        }]
219    }
220}
221
222#[cfg(test)]
223mod tests {
224    use crate::transform::common::tonic::ResourceAttributesWithSchema;
225    use opentelemetry::logs::LogRecord as _;
226    use opentelemetry::logs::Logger;
227    use opentelemetry::logs::LoggerProvider;
228    use opentelemetry::time::now;
229    use opentelemetry::InstrumentationScope;
230    use opentelemetry_sdk::error::OTelSdkResult;
231    use opentelemetry_sdk::logs::LogProcessor;
232    use opentelemetry_sdk::logs::SdkLoggerProvider;
233    use opentelemetry_sdk::{logs::LogBatch, logs::SdkLogRecord, Resource};
234
235    #[derive(Debug)]
236    struct MockProcessor;
237
238    impl LogProcessor for MockProcessor {
239        fn emit(&self, _record: &mut SdkLogRecord, _instrumentation: &InstrumentationScope) {}
240
241        fn force_flush(&self) -> OTelSdkResult {
242            Ok(())
243        }
244    }
245
246    fn create_test_log_data(
247        instrumentation_name: &str,
248        _message: &str,
249    ) -> (SdkLogRecord, InstrumentationScope) {
250        let processor = MockProcessor {};
251        let logger = SdkLoggerProvider::builder()
252            .with_log_processor(processor)
253            .build()
254            .logger("test");
255        let mut logrecord = logger.create_log_record();
256        logrecord.set_timestamp(now());
257        logrecord.set_observed_timestamp(now());
258        let instrumentation =
259            InstrumentationScope::builder(instrumentation_name.to_string()).build();
260        (logrecord, instrumentation)
261    }
262
263    #[test]
264    fn test_group_logs_by_resource_and_scope_single_scope() {
265        let resource = Resource::builder().build();
266        let (log_record1, instrum_lib1) = create_test_log_data("test-lib", "Log 1");
267        let (log_record2, instrum_lib2) = create_test_log_data("test-lib", "Log 2");
268
269        let logs = [(&log_record1, &instrum_lib1), (&log_record2, &instrum_lib2)];
270        let log_batch = LogBatch::new(&logs);
271        let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema
272
273        let grouped_logs =
274            crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource);
275
276        assert_eq!(grouped_logs.len(), 1);
277        let resource_logs = &grouped_logs[0];
278        assert_eq!(resource_logs.scope_logs.len(), 1);
279
280        let scope_logs = &resource_logs.scope_logs[0];
281        assert_eq!(scope_logs.log_records.len(), 2);
282    }
283
284    #[test]
285    fn test_group_logs_by_resource_and_scope_multiple_scopes() {
286        let resource = Resource::builder().build();
287        let (log_record1, instrum_lib1) = create_test_log_data("lib1", "Log 1");
288        let (log_record2, instrum_lib2) = create_test_log_data("lib2", "Log 2");
289
290        let logs = [(&log_record1, &instrum_lib1), (&log_record2, &instrum_lib2)];
291        let log_batch = LogBatch::new(&logs);
292        let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema
293        let grouped_logs =
294            crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource);
295
296        assert_eq!(grouped_logs.len(), 1);
297        let resource_logs = &grouped_logs[0];
298        assert_eq!(resource_logs.scope_logs.len(), 2);
299
300        let scope_logs_1 = &resource_logs
301            .scope_logs
302            .iter()
303            .find(|scope| scope.scope.as_ref().unwrap().name == "lib1")
304            .unwrap();
305        let scope_logs_2 = &resource_logs
306            .scope_logs
307            .iter()
308            .find(|scope| scope.scope.as_ref().unwrap().name == "lib2")
309            .unwrap();
310
311        assert_eq!(scope_logs_1.log_records.len(), 1);
312        assert_eq!(scope_logs_2.log_records.len(), 1);
313    }
314}