opentelemetry_sdk/logs/
log_processor.rs1use crate::error::OTelSdkResult;
30use crate::{logs::SdkLogRecord, Resource};
31
32#[cfg(feature = "spec_unstable_logs_enabled")]
33use opentelemetry::logs::Severity;
34use opentelemetry::InstrumentationScope;
35
36use std::fmt::Debug;
37use std::time::Duration;
38
39pub trait LogProcessor: Send + Sync + Debug {
43 fn emit(&self, data: &mut SdkLogRecord, instrumentation: &InstrumentationScope);
55 fn force_flush(&self) -> OTelSdkResult;
57 fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
61 Ok(())
62 }
63 fn shutdown(&self) -> OTelSdkResult {
65 self.shutdown_with_timeout(Duration::from_secs(5))
66 }
67 #[cfg(feature = "spec_unstable_logs_enabled")]
68 fn event_enabled(&self, _level: Severity, _target: &str, _name: Option<&str>) -> bool {
70 true
72 }
73
74 fn set_resource(&mut self, _resource: &Resource) {}
76}
77
78#[cfg(all(test, feature = "testing", feature = "logs"))]
79pub(crate) mod tests {
80 use crate::logs::{LogBatch, LogExporter, SdkLogRecord};
81 use crate::Resource;
82 use crate::{
83 error::OTelSdkResult,
84 logs::{LogProcessor, SdkLoggerProvider},
85 };
86 use opentelemetry::logs::AnyValue;
87 use opentelemetry::logs::LogRecord as _;
88 use opentelemetry::logs::{Logger, LoggerProvider};
89 use opentelemetry::{InstrumentationScope, Key};
90 use std::sync::{Arc, Mutex};
91
92 #[derive(Debug, Clone)]
93 pub(crate) struct MockLogExporter {
94 pub resource: Arc<Mutex<Option<Resource>>>,
95 }
96
97 impl LogExporter for MockLogExporter {
98 async fn export(&self, _batch: LogBatch<'_>) -> OTelSdkResult {
99 Ok(())
100 }
101
102 fn set_resource(&mut self, resource: &Resource) {
103 self.resource
104 .lock()
105 .map(|mut res_opt| {
106 res_opt.replace(resource.clone());
107 })
108 .expect("mock log exporter shouldn't error when setting resource");
109 }
110 }
111
112 impl MockLogExporter {
114 pub(crate) fn get_resource(&self) -> Option<Resource> {
115 (*self.resource).lock().unwrap().clone()
116 }
117 }
118
119 #[derive(Debug)]
120 struct FirstProcessor {
121 pub(crate) logs: Arc<Mutex<Vec<(SdkLogRecord, InstrumentationScope)>>>,
122 }
123
124 impl LogProcessor for FirstProcessor {
125 fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
126 record.add_attribute(
128 Key::from_static_str("processed_by"),
129 AnyValue::String("FirstProcessor".into()),
130 );
131 record.body = Some("Updated by FirstProcessor".into());
133
134 self.logs
135 .lock()
136 .unwrap()
137 .push((record.clone(), instrumentation.clone())); }
139
140 fn force_flush(&self) -> OTelSdkResult {
141 Ok(())
142 }
143 }
144
145 #[derive(Debug)]
146 struct SecondProcessor {
147 pub(crate) logs: Arc<Mutex<Vec<(SdkLogRecord, InstrumentationScope)>>>,
148 }
149
150 impl LogProcessor for SecondProcessor {
151 fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
152 assert!(record.attributes_contains(
153 &Key::from_static_str("processed_by"),
154 &AnyValue::String("FirstProcessor".into())
155 ));
156 assert!(
157 record.body.clone().unwrap()
158 == AnyValue::String("Updated by FirstProcessor".into())
159 );
160 self.logs
161 .lock()
162 .unwrap()
163 .push((record.clone(), instrumentation.clone()));
164 }
165
166 fn force_flush(&self) -> OTelSdkResult {
167 Ok(())
168 }
169 }
170
171 #[test]
172 fn test_log_data_modification_by_multiple_processors() {
173 let first_processor_logs = Arc::new(Mutex::new(Vec::new()));
174 let second_processor_logs = Arc::new(Mutex::new(Vec::new()));
175
176 let first_processor = FirstProcessor {
177 logs: Arc::clone(&first_processor_logs),
178 };
179 let second_processor = SecondProcessor {
180 logs: Arc::clone(&second_processor_logs),
181 };
182
183 let logger_provider = SdkLoggerProvider::builder()
184 .with_log_processor(first_processor)
185 .with_log_processor(second_processor)
186 .build();
187
188 let logger = logger_provider.logger("test-logger");
189 let mut log_record = logger.create_log_record();
190 log_record.body = Some(AnyValue::String("Test log".into()));
191
192 logger.emit(log_record);
193
194 assert_eq!(first_processor_logs.lock().unwrap().len(), 1);
195 assert_eq!(second_processor_logs.lock().unwrap().len(), 1);
196
197 let first_log = &first_processor_logs.lock().unwrap()[0];
198 let second_log = &second_processor_logs.lock().unwrap()[0];
199
200 assert!(first_log.0.attributes_contains(
201 &Key::from_static_str("processed_by"),
202 &AnyValue::String("FirstProcessor".into())
203 ));
204 assert!(second_log.0.attributes_contains(
205 &Key::from_static_str("processed_by"),
206 &AnyValue::String("FirstProcessor".into())
207 ));
208
209 assert!(
210 first_log.0.body.clone().unwrap()
211 == AnyValue::String("Updated by FirstProcessor".into())
212 );
213 assert!(
214 second_log.0.body.clone().unwrap()
215 == AnyValue::String("Updated by FirstProcessor".into())
216 );
217 }
218}