opentelemetry_sdk/logs/
mod.rs

1//! # OpenTelemetry Log SDK
2mod batch_log_processor;
3mod export;
4mod log_processor;
5mod logger;
6mod logger_provider;
7pub(crate) mod record;
8mod simple_log_processor;
9
/// In-memory log exporter for testing purposes.
11#[cfg(any(feature = "testing", test))]
12#[cfg_attr(docsrs, doc(cfg(any(feature = "testing", test))))]
13pub mod in_memory_exporter;
14#[cfg(any(feature = "testing", test))]
15#[cfg_attr(docsrs, doc(cfg(any(feature = "testing", test))))]
16pub use in_memory_exporter::{InMemoryLogExporter, InMemoryLogExporterBuilder};
17
18pub use batch_log_processor::{
19    BatchConfig, BatchConfigBuilder, BatchLogProcessor, BatchLogProcessorBuilder,
20};
21pub use export::{LogBatch, LogExporter};
22pub use log_processor::LogProcessor;
23pub use logger::SdkLogger;
24pub use logger_provider::{LoggerProviderBuilder, SdkLoggerProvider};
25pub use record::{SdkLogRecord, TraceContext};
26pub use simple_log_processor::SimpleLogProcessor;
27
28#[cfg(feature = "experimental_logs_concurrent_log_processor")]
29/// Module for ConcurrentLogProcessor.
30pub mod concurrent_log_processor;
31
32#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
33/// Module for BatchLogProcessor with async runtime.
34pub mod log_processor_with_async_runtime;
35
36#[cfg(all(test, feature = "testing"))]
37mod tests {
38    use super::*;
39    use crate::error::OTelSdkResult;
40    use crate::Resource;
41    use opentelemetry::baggage::BaggageExt;
42    use opentelemetry::logs::LogRecord;
43    use opentelemetry::logs::{Logger, LoggerProvider, Severity};
44    use opentelemetry::{logs::AnyValue, Key, KeyValue};
45    use opentelemetry::{Context, InstrumentationScope};
46    use std::borrow::Borrow;
47    use std::collections::HashMap;
48    use std::sync::{Arc, Mutex};
49
50    #[test]
51    fn logging_sdk_test() {
52        // Arrange
53        let resource = Resource::builder_empty()
54            .with_attributes([
55                KeyValue::new("k1", "v1"),
56                KeyValue::new("k2", "v2"),
57                KeyValue::new("k3", "v3"),
58                KeyValue::new("k4", "v4"),
59            ])
60            .build();
61        let exporter: InMemoryLogExporter = InMemoryLogExporter::default();
62        let logger_provider = SdkLoggerProvider::builder()
63            .with_resource(resource.clone())
64            .with_log_processor(SimpleLogProcessor::new(exporter.clone()))
65            .build();
66
67        // Act
68        let logger = logger_provider.logger("test-logger");
69        let mut log_record = logger.create_log_record();
70        log_record.set_severity_number(Severity::Error);
71        log_record.set_severity_text("Error");
72
73        // Adding attributes using a vector with explicitly constructed Key and AnyValue objects.
74        log_record.add_attributes(vec![
75            (Key::new("key1"), AnyValue::from("value1")),
76            (Key::new("key2"), AnyValue::from("value2")),
77        ]);
78
79        // Adding attributes using an array with explicitly constructed Key and AnyValue objects.
80        log_record.add_attributes([
81            (Key::new("key3"), AnyValue::from("value3")),
82            (Key::new("key4"), AnyValue::from("value4")),
83        ]);
84
85        // Adding attributes using a vector with tuple auto-conversion to Key and AnyValue.
86        log_record.add_attributes(vec![("key5", "value5"), ("key6", "value6")]);
87
88        // Adding attributes using an array with tuple auto-conversion to Key and AnyValue.
89        log_record.add_attributes([("key7", "value7"), ("key8", "value8")]);
90
91        // Adding Attributes from a HashMap
92        let mut attributes_map = HashMap::new();
93        attributes_map.insert("key9", "value9");
94        attributes_map.insert("key10", "value10");
95
96        log_record.add_attributes(attributes_map);
97
98        logger.emit(log_record);
99
100        // Assert
101        let exported_logs = exporter
102            .get_emitted_logs()
103            .expect("Logs are expected to be exported.");
104        assert_eq!(exported_logs.len(), 1);
105        let log = exported_logs
106            .first()
107            .expect("Atleast one log is expected to be present.");
108        assert_eq!(log.instrumentation.name(), "test-logger");
109        assert_eq!(log.record.severity_number, Some(Severity::Error));
110        assert_eq!(log.record.attributes_len(), 10);
111        for i in 1..=10 {
112            assert!(log.record.attributes_contains(
113                &Key::new(format!("key{i}")),
114                &AnyValue::String(format!("value{i}").into())
115            ));
116        }
117
118        // validate Resource
119        assert_eq!(&resource, log.resource.borrow());
120    }
121
122    #[test]
123    fn logger_attributes() {
124        let exporter: InMemoryLogExporter = InMemoryLogExporter::default();
125        let provider = SdkLoggerProvider::builder()
126            .with_log_processor(SimpleLogProcessor::new(exporter.clone()))
127            .build();
128
129        let scope = InstrumentationScope::builder("test_logger")
130            .with_schema_url("https://opentelemetry.io/schema/1.0.0")
131            .with_attributes(vec![(KeyValue::new("test_k", "test_v"))])
132            .build();
133
134        let logger = provider.logger_with_scope(scope);
135
136        let mut log_record = logger.create_log_record();
137        log_record.set_severity_number(Severity::Error);
138
139        logger.emit(log_record);
140
141        let mut exported_logs = exporter
142            .get_emitted_logs()
143            .expect("Logs are expected to be exported.");
144        assert_eq!(exported_logs.len(), 1);
145        let log = exported_logs.remove(0);
146        assert_eq!(log.record.severity_number, Some(Severity::Error));
147
148        let instrumentation_scope = log.instrumentation;
149        assert_eq!(instrumentation_scope.name(), "test_logger");
150        assert_eq!(
151            instrumentation_scope.schema_url(),
152            Some("https://opentelemetry.io/schema/1.0.0")
153        );
154        assert!(instrumentation_scope
155            .attributes()
156            .eq(&[KeyValue::new("test_k", "test_v")]));
157    }
158
159    #[derive(Debug)]
160    struct EnrichWithBaggageProcessor;
161    impl LogProcessor for EnrichWithBaggageProcessor {
162        fn emit(&self, data: &mut SdkLogRecord, _instrumentation: &InstrumentationScope) {
163            Context::map_current(|cx| {
164                for (kk, vv) in cx.baggage().iter() {
165                    data.add_attribute(kk.clone(), vv.0.clone());
166                }
167            });
168        }
169
170        fn force_flush(&self) -> crate::error::OTelSdkResult {
171            Ok(())
172        }
173
174        fn shutdown(&self) -> crate::error::OTelSdkResult {
175            Ok(())
176        }
177    }
178    #[test]
179    fn log_and_baggage() {
180        // Arrange
181        let exporter: InMemoryLogExporter = InMemoryLogExporter::default();
182        let logger_provider = SdkLoggerProvider::builder()
183            .with_log_processor(EnrichWithBaggageProcessor)
184            .with_log_processor(SimpleLogProcessor::new(exporter.clone()))
185            .build();
186
187        // Act
188        let logger = logger_provider.logger("test-logger");
189        let context_with_baggage =
190            Context::current_with_baggage(vec![KeyValue::new("key-from-bag", "value-from-bag")]);
191        let _cx_guard = context_with_baggage.attach();
192        let mut log_record = logger.create_log_record();
193        log_record.add_attribute("key", "value");
194        logger.emit(log_record);
195
196        // Assert
197        let exported_logs = exporter
198            .get_emitted_logs()
199            .expect("Logs are expected to be exported.");
200        assert_eq!(exported_logs.len(), 1);
201        let log = exported_logs
202            .first()
203            .expect("Atleast one log is expected to be present.");
204        assert_eq!(log.instrumentation.name(), "test-logger");
205        assert_eq!(log.record.attributes_len(), 2);
206
207        // Assert that the log record contains the baggage attribute
208        // and the attribute added to the log record.
209        assert!(log
210            .record
211            .attributes_contains(&Key::new("key"), &AnyValue::String("value".into())));
212        assert!(log.record.attributes_contains(
213            &Key::new("key-from-bag"),
214            &AnyValue::String("value-from-bag".into())
215        ));
216    }
217
218    #[test]
219    fn log_suppression() {
220        // Arrange
221        let exporter: InMemoryLogExporter = InMemoryLogExporter::default();
222        let logger_provider = SdkLoggerProvider::builder()
223            .with_simple_exporter(exporter.clone())
224            .build();
225
226        // Act
227        let logger = logger_provider.logger("test-logger");
228        let log_record = logger.create_log_record();
229        {
230            let _suppressed_context = Context::enter_telemetry_suppressed_scope();
231            // This log emission should be suppressed and not exported.
232            logger.emit(log_record);
233        }
234
235        // Assert
236        let exported_logs = exporter.get_emitted_logs().expect("this should not fail.");
237        assert_eq!(
238            exported_logs.len(),
239            0,
240            "There should be a no logs as log emission is done inside a suppressed context"
241        );
242    }
243
244    #[derive(Debug, Clone)]
245    struct ReentrantLogProcessor {
246        logger: Arc<Mutex<Option<SdkLogger>>>,
247    }
248
249    impl ReentrantLogProcessor {
250        fn new() -> Self {
251            Self {
252                logger: Arc::new(Mutex::new(None)),
253            }
254        }
255
256        fn set_logger(&self, logger: SdkLogger) {
257            let mut guard = self.logger.lock().unwrap();
258            *guard = Some(logger);
259        }
260    }
261
262    impl LogProcessor for ReentrantLogProcessor {
263        fn emit(&self, _data: &mut SdkLogRecord, _instrumentation: &InstrumentationScope) {
264            let _suppress = Context::enter_telemetry_suppressed_scope();
265            // Without the suppression above, the logger.emit(log_record) below will cause a deadlock,
266            // as it emits another log, which will attempt to acquire the same lock that is
267            // already held by itself!
268            let logger = self.logger.lock().unwrap();
269            if let Some(logger) = logger.as_ref() {
270                let mut log_record = logger.create_log_record();
271                log_record.set_severity_number(Severity::Error);
272                logger.emit(log_record);
273            }
274        }
275
276        fn force_flush(&self) -> OTelSdkResult {
277            Ok(())
278        }
279
280        fn shutdown(&self) -> OTelSdkResult {
281            Ok(())
282        }
283    }
284
285    #[test]
286    fn processor_internal_log_does_not_deadlock_with_suppression_enabled() {
287        let processor: ReentrantLogProcessor = ReentrantLogProcessor::new();
288        let logger_provider = SdkLoggerProvider::builder()
289            .with_log_processor(processor.clone())
290            .build();
291        processor.set_logger(logger_provider.logger("processor-logger"));
292
293        let logger = logger_provider.logger("test-logger");
294        let mut log_record = logger.create_log_record();
295        log_record.set_severity_number(Severity::Error);
296        logger.emit(log_record);
297    }
298}