opentelemetry_proto/transform/
logs.rs1#[cfg(feature = "gen-tonic-messages")]
2pub mod tonic {
3 use crate::{
4 tonic::{
5 common::v1::{
6 any_value::Value, AnyValue, ArrayValue, InstrumentationScope, KeyValue,
7 KeyValueList,
8 },
9 logs::v1::{LogRecord, ResourceLogs, ScopeLogs, SeverityNumber},
10 resource::v1::Resource,
11 },
12 transform::common::{to_nanos, tonic::ResourceAttributesWithSchema},
13 };
14 use opentelemetry::logs::{AnyValue as LogsAnyValue, Severity};
15 use opentelemetry_sdk::logs::LogBatch;
16 use std::borrow::Cow;
17 use std::collections::HashMap;
18
19 impl From<LogsAnyValue> for AnyValue {
20 fn from(value: LogsAnyValue) -> Self {
21 AnyValue {
22 value: Some(value.into()),
23 }
24 }
25 }
26
27 impl From<LogsAnyValue> for Value {
28 fn from(value: LogsAnyValue) -> Self {
29 match value {
30 LogsAnyValue::Double(f) => Value::DoubleValue(f),
31 LogsAnyValue::Int(i) => Value::IntValue(i),
32 LogsAnyValue::String(s) => Value::StringValue(s.into()),
33 LogsAnyValue::Boolean(b) => Value::BoolValue(b),
34 LogsAnyValue::ListAny(v) => Value::ArrayValue(ArrayValue {
35 values: v
36 .into_iter()
37 .map(|v| AnyValue {
38 value: Some(v.into()),
39 })
40 .collect(),
41 }),
42 LogsAnyValue::Map(m) => Value::KvlistValue(KeyValueList {
43 values: m
44 .into_iter()
45 .map(|(key, value)| KeyValue {
46 key: key.into(),
47 value: Some(AnyValue {
48 value: Some(value.into()),
49 }),
50 })
51 .collect(),
52 }),
53 LogsAnyValue::Bytes(v) => Value::BytesValue(*v),
54 _ => unreachable!("Nonexistent value type"),
55 }
56 }
57 }
58
59 impl From<&opentelemetry_sdk::logs::SdkLogRecord> for LogRecord {
60 fn from(log_record: &opentelemetry_sdk::logs::SdkLogRecord) -> Self {
61 let trace_context = log_record.trace_context();
62 let severity_number = match log_record.severity_number() {
63 Some(Severity::Trace) => SeverityNumber::Trace,
64 Some(Severity::Trace2) => SeverityNumber::Trace2,
65 Some(Severity::Trace3) => SeverityNumber::Trace3,
66 Some(Severity::Trace4) => SeverityNumber::Trace4,
67 Some(Severity::Debug) => SeverityNumber::Debug,
68 Some(Severity::Debug2) => SeverityNumber::Debug2,
69 Some(Severity::Debug3) => SeverityNumber::Debug3,
70 Some(Severity::Debug4) => SeverityNumber::Debug4,
71 Some(Severity::Info) => SeverityNumber::Info,
72 Some(Severity::Info2) => SeverityNumber::Info2,
73 Some(Severity::Info3) => SeverityNumber::Info3,
74 Some(Severity::Info4) => SeverityNumber::Info4,
75 Some(Severity::Warn) => SeverityNumber::Warn,
76 Some(Severity::Warn2) => SeverityNumber::Warn2,
77 Some(Severity::Warn3) => SeverityNumber::Warn3,
78 Some(Severity::Warn4) => SeverityNumber::Warn4,
79 Some(Severity::Error) => SeverityNumber::Error,
80 Some(Severity::Error2) => SeverityNumber::Error2,
81 Some(Severity::Error3) => SeverityNumber::Error3,
82 Some(Severity::Error4) => SeverityNumber::Error4,
83 Some(Severity::Fatal) => SeverityNumber::Fatal,
84 Some(Severity::Fatal2) => SeverityNumber::Fatal2,
85 Some(Severity::Fatal3) => SeverityNumber::Fatal3,
86 Some(Severity::Fatal4) => SeverityNumber::Fatal4,
87 None => SeverityNumber::Unspecified,
88 };
89
90 LogRecord {
91 time_unix_nano: log_record.timestamp().map(to_nanos).unwrap_or_default(),
92 observed_time_unix_nano: to_nanos(log_record.observed_timestamp().unwrap()),
93 attributes: {
94 log_record
95 .attributes_iter()
96 .map(|kv| KeyValue {
97 key: kv.0.to_string(),
98 value: Some(AnyValue {
99 value: Some(kv.1.clone().into()),
100 }),
101 })
102 .collect()
103 },
104 event_name: log_record.event_name().unwrap_or_default().into(),
105 severity_number: severity_number.into(),
106 severity_text: log_record
107 .severity_text()
108 .map(Into::into)
109 .unwrap_or_default(),
110 body: log_record.body().cloned().map(Into::into),
111 dropped_attributes_count: 0,
112 flags: trace_context
113 .map(|ctx| {
114 ctx.trace_flags
115 .map(|flags| flags.to_u8() as u32)
116 .unwrap_or_default()
117 })
118 .unwrap_or_default(),
119 span_id: trace_context
120 .map(|ctx| ctx.span_id.to_bytes().to_vec())
121 .unwrap_or_default(),
122 trace_id: trace_context
123 .map(|ctx| ctx.trace_id.to_bytes().to_vec())
124 .unwrap_or_default(),
125 }
126 }
127 }
128
129 impl
130 From<(
131 (
132 &opentelemetry_sdk::logs::SdkLogRecord,
133 &opentelemetry::InstrumentationScope,
134 ),
135 &ResourceAttributesWithSchema,
136 )> for ResourceLogs
137 {
138 fn from(
139 data: (
140 (
141 &opentelemetry_sdk::logs::SdkLogRecord,
142 &opentelemetry::InstrumentationScope,
143 ),
144 &ResourceAttributesWithSchema,
145 ),
146 ) -> Self {
147 let ((log_record, instrumentation), resource) = data;
148
149 ResourceLogs {
150 resource: Some(Resource {
151 attributes: resource.attributes.0.clone(),
152 dropped_attributes_count: 0,
153 entity_refs: vec![],
154 }),
155 schema_url: resource.schema_url.clone().unwrap_or_default(),
156 scope_logs: vec![ScopeLogs {
157 schema_url: instrumentation
158 .schema_url()
159 .map(ToOwned::to_owned)
160 .unwrap_or_default(),
161 scope: Some((instrumentation, log_record.target().cloned()).into()),
162 log_records: vec![log_record.into()],
163 }],
164 }
165 }
166 }
167
168 pub fn group_logs_by_resource_and_scope(
169 logs: LogBatch<'_>,
170 resource: &ResourceAttributesWithSchema,
171 ) -> Vec<ResourceLogs> {
172 let scope_map = logs.iter().fold(
174 HashMap::new(),
175 |mut scope_map: HashMap<
176 Cow<'static, str>,
177 Vec<(
178 &opentelemetry_sdk::logs::SdkLogRecord,
179 &opentelemetry::InstrumentationScope,
180 )>,
181 >,
182 (log_record, instrumentation)| {
183 let key = log_record
184 .target()
185 .cloned()
186 .unwrap_or_else(|| Cow::Owned(instrumentation.name().to_owned()));
187 scope_map
188 .entry(key)
189 .or_default()
190 .push((log_record, instrumentation));
191 scope_map
192 },
193 );
194
195 let scope_logs = scope_map
196 .into_iter()
197 .map(|(key, log_data)| ScopeLogs {
198 scope: Some(InstrumentationScope::from((
199 log_data.first().unwrap().1,
200 Some(key.into_owned().into()),
201 ))),
202 schema_url: resource.schema_url.clone().unwrap_or_default(),
203 log_records: log_data
204 .into_iter()
205 .map(|(log_record, _)| log_record.into())
206 .collect(),
207 })
208 .collect();
209
210 vec![ResourceLogs {
211 resource: Some(Resource {
212 attributes: resource.attributes.0.clone(),
213 dropped_attributes_count: 0,
214 entity_refs: vec![],
215 }),
216 scope_logs,
217 schema_url: resource.schema_url.clone().unwrap_or_default(),
218 }]
219 }
220}
221
222#[cfg(test)]
223mod tests {
224 use crate::transform::common::tonic::ResourceAttributesWithSchema;
225 use opentelemetry::logs::LogRecord as _;
226 use opentelemetry::logs::Logger;
227 use opentelemetry::logs::LoggerProvider;
228 use opentelemetry::time::now;
229 use opentelemetry::InstrumentationScope;
230 use opentelemetry_sdk::error::OTelSdkResult;
231 use opentelemetry_sdk::logs::LogProcessor;
232 use opentelemetry_sdk::logs::SdkLoggerProvider;
233 use opentelemetry_sdk::{logs::LogBatch, logs::SdkLogRecord, Resource};
234
235 #[derive(Debug)]
236 struct MockProcessor;
237
238 impl LogProcessor for MockProcessor {
239 fn emit(&self, _record: &mut SdkLogRecord, _instrumentation: &InstrumentationScope) {}
240
241 fn force_flush(&self) -> OTelSdkResult {
242 Ok(())
243 }
244 }
245
246 fn create_test_log_data(
247 instrumentation_name: &str,
248 _message: &str,
249 ) -> (SdkLogRecord, InstrumentationScope) {
250 let processor = MockProcessor {};
251 let logger = SdkLoggerProvider::builder()
252 .with_log_processor(processor)
253 .build()
254 .logger("test");
255 let mut logrecord = logger.create_log_record();
256 logrecord.set_timestamp(now());
257 logrecord.set_observed_timestamp(now());
258 let instrumentation =
259 InstrumentationScope::builder(instrumentation_name.to_string()).build();
260 (logrecord, instrumentation)
261 }
262
263 #[test]
264 fn test_group_logs_by_resource_and_scope_single_scope() {
265 let resource = Resource::builder().build();
266 let (log_record1, instrum_lib1) = create_test_log_data("test-lib", "Log 1");
267 let (log_record2, instrum_lib2) = create_test_log_data("test-lib", "Log 2");
268
269 let logs = [(&log_record1, &instrum_lib1), (&log_record2, &instrum_lib2)];
270 let log_batch = LogBatch::new(&logs);
271 let resource: ResourceAttributesWithSchema = (&resource).into(); let grouped_logs =
274 crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource);
275
276 assert_eq!(grouped_logs.len(), 1);
277 let resource_logs = &grouped_logs[0];
278 assert_eq!(resource_logs.scope_logs.len(), 1);
279
280 let scope_logs = &resource_logs.scope_logs[0];
281 assert_eq!(scope_logs.log_records.len(), 2);
282 }
283
284 #[test]
285 fn test_group_logs_by_resource_and_scope_multiple_scopes() {
286 let resource = Resource::builder().build();
287 let (log_record1, instrum_lib1) = create_test_log_data("lib1", "Log 1");
288 let (log_record2, instrum_lib2) = create_test_log_data("lib2", "Log 2");
289
290 let logs = [(&log_record1, &instrum_lib1), (&log_record2, &instrum_lib2)];
291 let log_batch = LogBatch::new(&logs);
292 let resource: ResourceAttributesWithSchema = (&resource).into(); let grouped_logs =
294 crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource);
295
296 assert_eq!(grouped_logs.len(), 1);
297 let resource_logs = &grouped_logs[0];
298 assert_eq!(resource_logs.scope_logs.len(), 2);
299
300 let scope_logs_1 = &resource_logs
301 .scope_logs
302 .iter()
303 .find(|scope| scope.scope.as_ref().unwrap().name == "lib1")
304 .unwrap();
305 let scope_logs_2 = &resource_logs
306 .scope_logs
307 .iter()
308 .find(|scope| scope.scope.as_ref().unwrap().name == "lib2")
309 .unwrap();
310
311 assert_eq!(scope_logs_1.log_records.len(), 1);
312 assert_eq!(scope_logs_2.log_records.len(), 1);
313 }
314}