opentelemetry_sdk/logs/
simple_log_processor.rs

1//! # OpenTelemetry Simple Log Processor
2//! The `SimpleLogProcessor` is one implementation of the `LogProcessor` interface.
3//!
4//! It forwards log records to the exporter immediately after they are emitted
5//! (or one exporter after another if applicable). This processor is
6//! **synchronous** and is designed for debugging or testing purposes. It is
7//! **not suitable for production** environments due to its lack of batching,
8//! performance optimizations, or support for high-throughput scenarios.
9//!
10//! ## Diagram
11//!
12//! ```ascii
13//!   +-----+---------------+   +-----------------------+   +-------------------+
14//!   |     |               |   |                       |   |                   |
15//!   | SDK | Logger.emit() +---> (Simple)LogProcessor  +--->  LogExporter      |
16//!   +-----+---------------+   +-----------------------+   +-------------------+
17//! ```
18
19use crate::error::{OTelSdkError, OTelSdkResult};
20use crate::logs::log_processor::LogProcessor;
21use crate::{
22    logs::{LogBatch, LogExporter, SdkLogRecord},
23    Resource,
24};
25
26use opentelemetry::{otel_debug, otel_error, otel_warn, Context, InstrumentationScope};
27
28use std::fmt::Debug;
29use std::sync::atomic::AtomicBool;
30use std::sync::Mutex;
31
32/// A [`LogProcessor`] designed for testing and debugging purpose, that immediately
33/// exports log records as they are emitted. Log records are exported synchronously
34/// in the same thread that emits the log record.
35/// When using this processor with the OTLP Exporter, the following exporter
36/// features are supported:
37/// - `grpc-tonic`: This requires LoggerProvider to be created within a tokio
38///   runtime. Logs can be emitted from any thread, including tokio runtime
39///   threads.
40/// - `reqwest-blocking-client`: LoggerProvider may be created anywhere, but
41///   logs must be emitted from a non-tokio runtime thread.
42/// - `reqwest-client`: LoggerProvider may be created anywhere, but logs must be
43///   emitted from a tokio runtime thread.
44///
45/// ## Example
46///
47/// ### Using a SimpleLogProcessor
48///
49/// ```rust
50/// use opentelemetry_sdk::logs::{SimpleLogProcessor, SdkLoggerProvider, LogExporter};
51/// use opentelemetry::global;
52/// use opentelemetry_sdk::logs::InMemoryLogExporter;
53///
54/// let exporter = InMemoryLogExporter::default(); // Replace with an actual exporter
55/// let provider = SdkLoggerProvider::builder()
56///     .with_simple_exporter(exporter)
57///     .build();
58///
59/// ```
60///
61#[derive(Debug)]
62pub struct SimpleLogProcessor<T: LogExporter> {
63    exporter: Mutex<T>,
64    is_shutdown: AtomicBool,
65}
66
67impl<T: LogExporter> SimpleLogProcessor<T> {
68    /// Creates a new instance of `SimpleLogProcessor`.
69    pub fn new(exporter: T) -> Self {
70        SimpleLogProcessor {
71            exporter: Mutex::new(exporter),
72            is_shutdown: AtomicBool::new(false),
73        }
74    }
75}
76
77impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
78    fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
79        let _suppress_guard = Context::enter_telemetry_suppressed_scope();
80        // noop after shutdown
81        if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
82            // this is a warning, as the user is trying to log after the processor has been shutdown
83            otel_warn!(
84                name: "SimpleLogProcessor.Emit.ProcessorShutdown",
85            );
86            return;
87        }
88
89        let result = self
90            .exporter
91            .lock()
92            .map_err(|_| OTelSdkError::InternalFailure("SimpleLogProcessor mutex poison".into()))
93            .and_then(|exporter| {
94                let log_tuple = &[(record as &SdkLogRecord, instrumentation)];
95                futures_executor::block_on(exporter.export(LogBatch::new(log_tuple)))
96            });
97        // Handle errors with specific static names
98        match result {
99            Err(OTelSdkError::InternalFailure(_)) => {
100                // logging as debug as this is not a user error
101                otel_debug!(
102                    name: "SimpleLogProcessor.Emit.MutexPoisoning",
103                );
104            }
105            Err(err) => {
106                otel_error!(
107                    name: "SimpleLogProcessor.Emit.ExportError",
108                    error = format!("{}",err)
109                );
110            }
111            _ => {}
112        }
113    }
114
115    fn force_flush(&self) -> OTelSdkResult {
116        Ok(())
117    }
118
119    fn shutdown(&self) -> OTelSdkResult {
120        self.is_shutdown
121            .store(true, std::sync::atomic::Ordering::Relaxed);
122        if let Ok(exporter) = self.exporter.lock() {
123            exporter.shutdown()
124        } else {
125            Err(OTelSdkError::InternalFailure(
126                "SimpleLogProcessor mutex poison at shutdown".into(),
127            ))
128        }
129    }
130
131    fn set_resource(&mut self, resource: &Resource) {
132        if let Ok(mut exporter) = self.exporter.lock() {
133            exporter.set_resource(resource);
134        }
135    }
136
137    #[cfg(feature = "spec_unstable_logs_enabled")]
138    #[inline]
139    fn event_enabled(
140        &self,
141        level: opentelemetry::logs::Severity,
142        target: &str,
143        name: Option<&str>,
144    ) -> bool {
145        if let Ok(exporter) = self.exporter.lock() {
146            exporter.event_enabled(level, target, name)
147        } else {
148            true
149        }
150    }
151}
152
153#[cfg(all(test, feature = "testing", feature = "logs"))]
154mod tests {
155    use crate::logs::log_processor::tests::MockLogExporter;
156    use crate::logs::{LogBatch, LogExporter, SdkLogRecord, SdkLogger};
157    use crate::{
158        error::OTelSdkResult,
159        logs::{InMemoryLogExporterBuilder, LogProcessor, SdkLoggerProvider, SimpleLogProcessor},
160        Resource,
161    };
162    use opentelemetry::logs::{LogRecord, Logger, LoggerProvider};
163    use opentelemetry::InstrumentationScope;
164    use opentelemetry::KeyValue;
165    use std::sync::atomic::{AtomicUsize, Ordering};
166    use std::sync::{Arc, Mutex};
167    use std::time;
168    use std::time::Duration;
169
170    #[derive(Debug, Clone)]
171    struct LogExporterThatRequiresTokio {
172        export_count: Arc<AtomicUsize>,
173    }
174
175    impl LogExporterThatRequiresTokio {
176        /// Creates a new instance of `LogExporterThatRequiresTokio`.
177        fn new() -> Self {
178            LogExporterThatRequiresTokio {
179                export_count: Arc::new(AtomicUsize::new(0)),
180            }
181        }
182
183        /// Returns the number of logs stored in the exporter.
184        fn len(&self) -> usize {
185            self.export_count.load(Ordering::Acquire)
186        }
187    }
188
189    impl LogExporter for LogExporterThatRequiresTokio {
190        async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
191            // Simulate minimal dependency on tokio by sleeping asynchronously for a short duration
192            tokio::time::sleep(Duration::from_millis(50)).await;
193
194            for _ in batch.iter() {
195                self.export_count.fetch_add(1, Ordering::Acquire);
196            }
197            Ok(())
198        }
199        fn shutdown_with_timeout(&self, _timeout: time::Duration) -> OTelSdkResult {
200            Ok(())
201        }
202    }
203
204    #[test]
205    fn test_set_resource_simple_processor() {
206        let exporter = MockLogExporter {
207            resource: Arc::new(Mutex::new(None)),
208        };
209        let processor = SimpleLogProcessor::new(exporter.clone());
210        let _ = SdkLoggerProvider::builder()
211            .with_log_processor(processor)
212            .with_resource(
213                Resource::builder_empty()
214                    .with_attributes([
215                        KeyValue::new("k1", "v1"),
216                        KeyValue::new("k2", "v3"),
217                        KeyValue::new("k3", "v3"),
218                        KeyValue::new("k4", "v4"),
219                        KeyValue::new("k5", "v5"),
220                    ])
221                    .build(),
222            )
223            .build();
224        assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5);
225    }
226
227    #[test]
228    fn test_simple_shutdown() {
229        let exporter = InMemoryLogExporterBuilder::default()
230            .keep_records_on_shutdown()
231            .build();
232        let processor = SimpleLogProcessor::new(exporter.clone());
233
234        let mut record: SdkLogRecord = SdkLogRecord::new();
235        let instrumentation: InstrumentationScope = Default::default();
236
237        processor.emit(&mut record, &instrumentation);
238
239        processor.shutdown().unwrap();
240
241        let is_shutdown = processor
242            .is_shutdown
243            .load(std::sync::atomic::Ordering::Relaxed);
244        assert!(is_shutdown);
245
246        processor.emit(&mut record, &instrumentation);
247
248        assert_eq!(1, exporter.get_emitted_logs().unwrap().len());
249        assert!(exporter.is_shutdown_called());
250    }
251
252    #[test]
253    fn test_simple_processor_sync_exporter_without_runtime() {
254        let exporter = InMemoryLogExporterBuilder::default().build();
255        let processor = SimpleLogProcessor::new(exporter.clone());
256
257        let mut record: SdkLogRecord = SdkLogRecord::new();
258        let instrumentation: InstrumentationScope = Default::default();
259
260        processor.emit(&mut record, &instrumentation);
261
262        assert_eq!(exporter.get_emitted_logs().unwrap().len(), 1);
263    }
264
265    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
266    async fn test_simple_processor_sync_exporter_with_runtime() {
267        let exporter = InMemoryLogExporterBuilder::default().build();
268        let processor = SimpleLogProcessor::new(exporter.clone());
269
270        let mut record: SdkLogRecord = SdkLogRecord::new();
271        let instrumentation: InstrumentationScope = Default::default();
272
273        processor.emit(&mut record, &instrumentation);
274
275        assert_eq!(exporter.get_emitted_logs().unwrap().len(), 1);
276    }
277
278    #[tokio::test(flavor = "multi_thread")]
279    async fn test_simple_processor_sync_exporter_with_multi_thread_runtime() {
280        let exporter = InMemoryLogExporterBuilder::default().build();
281        let processor = Arc::new(SimpleLogProcessor::new(exporter.clone()));
282
283        let mut handles = vec![];
284        for _ in 0..10 {
285            let processor_clone = Arc::clone(&processor);
286            let handle = tokio::spawn(async move {
287                let mut record: SdkLogRecord = SdkLogRecord::new();
288                let instrumentation: InstrumentationScope = Default::default();
289                processor_clone.emit(&mut record, &instrumentation);
290            });
291            handles.push(handle);
292        }
293
294        for handle in handles {
295            handle.await.unwrap();
296        }
297
298        assert_eq!(exporter.get_emitted_logs().unwrap().len(), 10);
299    }
300
301    #[tokio::test(flavor = "current_thread")]
302    async fn test_simple_processor_sync_exporter_with_current_thread_runtime() {
303        let exporter = InMemoryLogExporterBuilder::default().build();
304        let processor = SimpleLogProcessor::new(exporter.clone());
305
306        let mut record: SdkLogRecord = SdkLogRecord::new();
307        let instrumentation: InstrumentationScope = Default::default();
308
309        processor.emit(&mut record, &instrumentation);
310
311        assert_eq!(exporter.get_emitted_logs().unwrap().len(), 1);
312    }
313
314    #[test]
315    fn test_simple_processor_async_exporter_without_runtime() {
316        // Use `catch_unwind` to catch the panic caused by missing Tokio runtime
317        let result = std::panic::catch_unwind(|| {
318            let exporter = LogExporterThatRequiresTokio::new();
319            let processor = SimpleLogProcessor::new(exporter.clone());
320
321            let mut record: SdkLogRecord = SdkLogRecord::new();
322            let instrumentation: InstrumentationScope = Default::default();
323
324            // This will panic because an tokio async operation within exporter without a runtime.
325            processor.emit(&mut record, &instrumentation);
326        });
327
328        // Verify that the panic occurred and check the panic message for the absence of a Tokio runtime
329        assert!(
330            result.is_err(),
331            "The test should fail due to missing Tokio runtime, but it did not."
332        );
333        let panic_payload = result.unwrap_err();
334        let panic_message = panic_payload
335            .downcast_ref::<String>()
336            .map(|s| s.as_str())
337            .or_else(|| panic_payload.downcast_ref::<&str>().copied())
338            .unwrap_or("No panic message");
339
340        assert!(
341            panic_message.contains("no reactor running")
342                || panic_message.contains("must be called from the context of a Tokio 1.x runtime"),
343            "Expected panic message about missing Tokio runtime, but got: {panic_message}"
344        );
345    }
346
347    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
348    #[ignore]
349    // This test demonstrates a potential deadlock scenario in a multi-threaded Tokio runtime.
350    // It spawns Tokio tasks equal to the number of runtime worker threads (4) to emit log events.
351    // Each task attempts to acquire a mutex on the exporter in `SimpleLogProcessor::emit`.
352    // Only one task obtains the lock, while the others are blocked, waiting for its release.
353    //
354    // The task holding the lock invokes the LogExporterThatRequiresTokio, which performs an
355    // asynchronous operation (e.g., network I/O simulated by `tokio::sleep`). This operation
356    // requires yielding control back to the Tokio runtime to make progress.
357    //
358    // However, all worker threads are occupied:
359    // - One thread is executing the async exporter operation
360    // - Three threads are blocked waiting for the mutex
361    //
362    // This leads to a deadlock as there are no available threads to drive the async operation
363    // to completion, preventing the mutex from being released. Consequently, neither the blocked
364    // tasks nor the exporter can proceed.
365    async fn test_simple_processor_async_exporter_with_all_runtime_worker_threads_blocked() {
366        let exporter = LogExporterThatRequiresTokio::new();
367        let processor = Arc::new(SimpleLogProcessor::new(exporter.clone()));
368
369        let concurrent_emit = 4; // number of worker threads
370
371        let mut handles = vec![];
372        // try send `concurrent_emit` events concurrently
373        for _ in 0..concurrent_emit {
374            let processor_clone = Arc::clone(&processor);
375            let handle = tokio::spawn(async move {
376                let mut record: SdkLogRecord = SdkLogRecord::new();
377                let instrumentation: InstrumentationScope = Default::default();
378                processor_clone.emit(&mut record, &instrumentation);
379            });
380            handles.push(handle);
381        }
382
383        // below code won't get executed
384        for handle in handles {
385            handle.await.unwrap();
386        }
387        assert_eq!(exporter.len(), concurrent_emit);
388    }
389
390    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
391    // This test uses a multi-threaded runtime setup with a single worker thread. Note that even
392    // though only one worker thread is created, it is distinct from the main thread. The processor
393    // emits a log event, and the exporter performs an async operation that requires the runtime.
394    // The single worker thread handles this operation without deadlocking, as long as no other
395    // tasks occupy the runtime.
396    async fn test_simple_processor_async_exporter_with_runtime() {
397        let exporter = LogExporterThatRequiresTokio::new();
398        let processor = SimpleLogProcessor::new(exporter.clone());
399
400        let mut record: SdkLogRecord = SdkLogRecord::new();
401        let instrumentation: InstrumentationScope = Default::default();
402
403        processor.emit(&mut record, &instrumentation);
404
405        assert_eq!(exporter.len(), 1);
406    }
407
408    #[tokio::test(flavor = "multi_thread")]
409    // This test uses a multi-threaded runtime setup with the default number of worker threads.
410    // The processor emits a log event, and the exporter, which requires the runtime for its async
411    // operations, can access one of the available worker threads to complete its task. As there
412    // are multiple threads, the exporter can proceed without blocking other tasks, ensuring the
413    // test completes successfully.
414    async fn test_simple_processor_async_exporter_with_multi_thread_runtime() {
415        let exporter = LogExporterThatRequiresTokio::new();
416
417        let processor = SimpleLogProcessor::new(exporter.clone());
418
419        let mut record: SdkLogRecord = SdkLogRecord::new();
420        let instrumentation: InstrumentationScope = Default::default();
421
422        processor.emit(&mut record, &instrumentation);
423
424        assert_eq!(exporter.len(), 1);
425    }
426
427    #[tokio::test(flavor = "current_thread")]
428    #[ignore]
429    // This test uses a current-thread runtime, where all operations run on the main thread.
430    // The processor emits a log event while the runtime is blocked using `futures::block_on`
431    // to complete the export operation. The exporter, which performs an async operation and
432    // requires the runtime, cannot progress because the main thread is already blocked.
433    // This results in a deadlock, as the runtime cannot move forward.
434    async fn test_simple_processor_async_exporter_with_current_thread_runtime() {
435        let exporter = LogExporterThatRequiresTokio::new();
436
437        let processor = SimpleLogProcessor::new(exporter.clone());
438
439        let mut record: SdkLogRecord = SdkLogRecord::new();
440        let instrumentation: InstrumentationScope = Default::default();
441
442        processor.emit(&mut record, &instrumentation);
443
444        assert_eq!(exporter.len(), 1);
445    }
446
447    #[derive(Debug, Clone)]
448    struct ReentrantLogExporter {
449        logger: Arc<Mutex<Option<SdkLogger>>>,
450    }
451
452    impl ReentrantLogExporter {
453        fn new() -> Self {
454            Self {
455                logger: Arc::new(Mutex::new(None)),
456            }
457        }
458
459        fn set_logger(&self, logger: SdkLogger) {
460            let mut guard = self.logger.lock().unwrap();
461            *guard = Some(logger);
462        }
463    }
464
465    impl LogExporter for ReentrantLogExporter {
466        async fn export(&self, _batch: LogBatch<'_>) -> OTelSdkResult {
467            let logger = self.logger.lock().unwrap();
468            if let Some(logger) = logger.as_ref() {
469                let mut log_record = logger.create_log_record();
470                log_record.set_severity_number(opentelemetry::logs::Severity::Error);
471                logger.emit(log_record);
472            }
473
474            Ok(())
475        }
476    }
477
478    #[test]
479    fn exporter_internal_log_does_not_deadlock_with_simple_processor() {
480        // This tests that even when exporter produces logs while
481        // exporting, it does not deadlock, as SimpleLogProcessor
482        // activates SuppressGuard before calling the exporter.
483        let exporter: ReentrantLogExporter = ReentrantLogExporter::new();
484        let logger_provider = SdkLoggerProvider::builder()
485            .with_simple_exporter(exporter.clone())
486            .build();
487        exporter.set_logger(logger_provider.logger("processor-logger"));
488
489        let logger = logger_provider.logger("test-logger");
490        let mut log_record = logger.create_log_record();
491        log_record.set_severity_number(opentelemetry::logs::Severity::Error);
492        logger.emit(log_record);
493    }
494}