opentelemetry_sdk/logs/
log_processor.rs

1//! # OpenTelemetry Log Processor Interface
2//!
3//! The `LogProcessor` interface provides hooks for log record processing and
4//! exporting. Log processors receive `LogRecord`s emitted by the SDK's
5//! `Logger` and determine how these records are handled.
6//!
7//! Built-in log processors are responsible for converting logs to exportable
8//! representations and passing them to configured exporters. They can be
9//! registered directly with a `LoggerProvider`.
10//!
11//! ## Types of Log Processors
12//!
13//! There are currently two types of log processors available in the SDK:
14//! - **SimpleLogProcessor**: Forwards log records to the exporter immediately.
15//! - **BatchLogProcessor**: Buffers log records and sends them to the exporter in batches.
16//!
17//! For more information, see simple_log_processor.rs and batch_log_processor.rs.
18//!
19//! ## Diagram
20//!
21//! ```ascii
22//!   +-----+---------------+   +-----------------------+   +-------------------+
23//!   |     |               |   |                       |   |                   |
24//!   | SDK | Logger.emit() +---> (Simple)LogProcessor  +--->  LogExporter      |
25//!   |     |               |   | (Batch)LogProcessor   +--->  (OTLPExporter)   |
26//!   +-----+---------------+   +-----------------------+   +-------------------+
27//! ```
28
29use crate::error::OTelSdkResult;
30use crate::{logs::SdkLogRecord, Resource};
31
32#[cfg(feature = "spec_unstable_logs_enabled")]
33use opentelemetry::logs::Severity;
34use opentelemetry::InstrumentationScope;
35
36use std::fmt::Debug;
37use std::time::Duration;
38
39/// The interface for plugging into a [`SdkLogger`].
40///
41/// [`SdkLogger`]: crate::logs::SdkLogger
42pub trait LogProcessor: Send + Sync + Debug {
43    /// Called when a log record is ready to processed and exported.
44    ///
45    /// This method receives a mutable reference to `LogRecord`. If the processor
46    /// needs to handle the export asynchronously, it should clone the data to
47    /// ensure it can be safely processed without lifetime issues. Any changes
48    /// made to the log data in this method will be reflected in the next log
49    /// processor in the chain.
50    ///
51    /// # Parameters
52    /// - `record`: A mutable reference to `LogRecord` representing the log record.
53    /// - `instrumentation`: The instrumentation scope associated with the log record.
54    fn emit(&self, data: &mut SdkLogRecord, instrumentation: &InstrumentationScope);
55    /// Force the logs lying in the cache to be exported.
56    fn force_flush(&self) -> OTelSdkResult;
57    /// Shuts down the processor.
58    /// After shutdown returns the log processor should stop processing any logs.
59    /// It's up to the implementation on when to drop the LogProcessor.
60    fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
61        Ok(())
62    }
63    /// Shuts down the processor with default timeout.
64    fn shutdown(&self) -> OTelSdkResult {
65        self.shutdown_with_timeout(Duration::from_secs(5))
66    }
67    #[cfg(feature = "spec_unstable_logs_enabled")]
68    /// Check if logging is enabled
69    fn event_enabled(&self, _level: Severity, _target: &str, _name: Option<&str>) -> bool {
70        // By default, all logs are enabled
71        true
72    }
73
74    /// Set the resource for the log processor.
75    fn set_resource(&mut self, _resource: &Resource) {}
76}
77
78#[cfg(all(test, feature = "testing", feature = "logs"))]
79pub(crate) mod tests {
80    use crate::logs::{LogBatch, LogExporter, SdkLogRecord};
81    use crate::Resource;
82    use crate::{
83        error::OTelSdkResult,
84        logs::{LogProcessor, SdkLoggerProvider},
85    };
86    use opentelemetry::logs::AnyValue;
87    use opentelemetry::logs::LogRecord as _;
88    use opentelemetry::logs::{Logger, LoggerProvider};
89    use opentelemetry::{InstrumentationScope, Key};
90    use std::sync::{Arc, Mutex};
91
92    #[derive(Debug, Clone)]
93    pub(crate) struct MockLogExporter {
94        pub resource: Arc<Mutex<Option<Resource>>>,
95    }
96
97    impl LogExporter for MockLogExporter {
98        async fn export(&self, _batch: LogBatch<'_>) -> OTelSdkResult {
99            Ok(())
100        }
101
102        fn set_resource(&mut self, resource: &Resource) {
103            self.resource
104                .lock()
105                .map(|mut res_opt| {
106                    res_opt.replace(resource.clone());
107                })
108                .expect("mock log exporter shouldn't error when setting resource");
109        }
110    }
111
112    // Implementation specific to the MockLogExporter, not part of the LogExporter trait
113    impl MockLogExporter {
114        pub(crate) fn get_resource(&self) -> Option<Resource> {
115            (*self.resource).lock().unwrap().clone()
116        }
117    }
118
119    #[derive(Debug)]
120    struct FirstProcessor {
121        pub(crate) logs: Arc<Mutex<Vec<(SdkLogRecord, InstrumentationScope)>>>,
122    }
123
124    impl LogProcessor for FirstProcessor {
125        fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
126            // add attribute
127            record.add_attribute(
128                Key::from_static_str("processed_by"),
129                AnyValue::String("FirstProcessor".into()),
130            );
131            // update body
132            record.body = Some("Updated by FirstProcessor".into());
133
134            self.logs
135                .lock()
136                .unwrap()
137                .push((record.clone(), instrumentation.clone())); //clone as the LogProcessor is storing the data.
138        }
139
140        fn force_flush(&self) -> OTelSdkResult {
141            Ok(())
142        }
143    }
144
145    #[derive(Debug)]
146    struct SecondProcessor {
147        pub(crate) logs: Arc<Mutex<Vec<(SdkLogRecord, InstrumentationScope)>>>,
148    }
149
150    impl LogProcessor for SecondProcessor {
151        fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
152            assert!(record.attributes_contains(
153                &Key::from_static_str("processed_by"),
154                &AnyValue::String("FirstProcessor".into())
155            ));
156            assert!(
157                record.body.clone().unwrap()
158                    == AnyValue::String("Updated by FirstProcessor".into())
159            );
160            self.logs
161                .lock()
162                .unwrap()
163                .push((record.clone(), instrumentation.clone()));
164        }
165
166        fn force_flush(&self) -> OTelSdkResult {
167            Ok(())
168        }
169    }
170
171    #[test]
172    fn test_log_data_modification_by_multiple_processors() {
173        let first_processor_logs = Arc::new(Mutex::new(Vec::new()));
174        let second_processor_logs = Arc::new(Mutex::new(Vec::new()));
175
176        let first_processor = FirstProcessor {
177            logs: Arc::clone(&first_processor_logs),
178        };
179        let second_processor = SecondProcessor {
180            logs: Arc::clone(&second_processor_logs),
181        };
182
183        let logger_provider = SdkLoggerProvider::builder()
184            .with_log_processor(first_processor)
185            .with_log_processor(second_processor)
186            .build();
187
188        let logger = logger_provider.logger("test-logger");
189        let mut log_record = logger.create_log_record();
190        log_record.body = Some(AnyValue::String("Test log".into()));
191
192        logger.emit(log_record);
193
194        assert_eq!(first_processor_logs.lock().unwrap().len(), 1);
195        assert_eq!(second_processor_logs.lock().unwrap().len(), 1);
196
197        let first_log = &first_processor_logs.lock().unwrap()[0];
198        let second_log = &second_processor_logs.lock().unwrap()[0];
199
200        assert!(first_log.0.attributes_contains(
201            &Key::from_static_str("processed_by"),
202            &AnyValue::String("FirstProcessor".into())
203        ));
204        assert!(second_log.0.attributes_contains(
205            &Key::from_static_str("processed_by"),
206            &AnyValue::String("FirstProcessor".into())
207        ));
208
209        assert!(
210            first_log.0.body.clone().unwrap()
211                == AnyValue::String("Updated by FirstProcessor".into())
212        );
213        assert!(
214            second_log.0.body.clone().unwrap()
215                == AnyValue::String("Updated by FirstProcessor".into())
216        );
217    }
218}