1use crate::error::{OTelSdkError, OTelSdkResult};
20use crate::logs::log_processor::LogProcessor;
21use crate::{
22 logs::{LogBatch, LogExporter, SdkLogRecord},
23 Resource,
24};
25
26use opentelemetry::{otel_debug, otel_error, otel_warn, Context, InstrumentationScope};
27
28use std::fmt::Debug;
29use std::sync::atomic::AtomicBool;
30use std::sync::Mutex;
31
32#[derive(Debug)]
62pub struct SimpleLogProcessor<T: LogExporter> {
63 exporter: Mutex<T>,
64 is_shutdown: AtomicBool,
65}
66
67impl<T: LogExporter> SimpleLogProcessor<T> {
68 pub fn new(exporter: T) -> Self {
70 SimpleLogProcessor {
71 exporter: Mutex::new(exporter),
72 is_shutdown: AtomicBool::new(false),
73 }
74 }
75}
76
77impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
78 fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
79 let _suppress_guard = Context::enter_telemetry_suppressed_scope();
80 if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
82 otel_warn!(
84 name: "SimpleLogProcessor.Emit.ProcessorShutdown",
85 );
86 return;
87 }
88
89 let result = self
90 .exporter
91 .lock()
92 .map_err(|_| OTelSdkError::InternalFailure("SimpleLogProcessor mutex poison".into()))
93 .and_then(|exporter| {
94 let log_tuple = &[(record as &SdkLogRecord, instrumentation)];
95 futures_executor::block_on(exporter.export(LogBatch::new(log_tuple)))
96 });
97 match result {
99 Err(OTelSdkError::InternalFailure(_)) => {
100 otel_debug!(
102 name: "SimpleLogProcessor.Emit.MutexPoisoning",
103 );
104 }
105 Err(err) => {
106 otel_error!(
107 name: "SimpleLogProcessor.Emit.ExportError",
108 error = format!("{}",err)
109 );
110 }
111 _ => {}
112 }
113 }
114
115 fn force_flush(&self) -> OTelSdkResult {
116 Ok(())
117 }
118
119 fn shutdown(&self) -> OTelSdkResult {
120 self.is_shutdown
121 .store(true, std::sync::atomic::Ordering::Relaxed);
122 if let Ok(exporter) = self.exporter.lock() {
123 exporter.shutdown()
124 } else {
125 Err(OTelSdkError::InternalFailure(
126 "SimpleLogProcessor mutex poison at shutdown".into(),
127 ))
128 }
129 }
130
131 fn set_resource(&mut self, resource: &Resource) {
132 if let Ok(mut exporter) = self.exporter.lock() {
133 exporter.set_resource(resource);
134 }
135 }
136
137 #[cfg(feature = "spec_unstable_logs_enabled")]
138 #[inline]
139 fn event_enabled(
140 &self,
141 level: opentelemetry::logs::Severity,
142 target: &str,
143 name: Option<&str>,
144 ) -> bool {
145 if let Ok(exporter) = self.exporter.lock() {
146 exporter.event_enabled(level, target, name)
147 } else {
148 true
149 }
150 }
151}
152
#[cfg(all(test, feature = "testing", feature = "logs"))]
mod tests {
    use crate::logs::log_processor::tests::MockLogExporter;
    use crate::logs::{LogBatch, LogExporter, SdkLogRecord, SdkLogger};
    use crate::{
        error::OTelSdkResult,
        logs::{InMemoryLogExporterBuilder, LogProcessor, SdkLoggerProvider, SimpleLogProcessor},
        Resource,
    };
    use opentelemetry::logs::{LogRecord, Logger, LoggerProvider};
    use opentelemetry::InstrumentationScope;
    use opentelemetry::KeyValue;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};
    use std::time;
    use std::time::Duration;

    /// Emits one freshly created default record, with a default
    /// instrumentation scope, through `processor`.
    fn emit_default_record<T: LogExporter>(processor: &SimpleLogProcessor<T>) {
        let mut record = SdkLogRecord::new();
        let scope = InstrumentationScope::default();
        processor.emit(&mut record, &scope);
    }

    /// Test exporter whose `export` awaits a Tokio timer before counting the
    /// records it was given, so it can only complete where a Tokio runtime is
    /// available to drive that timer.
    #[derive(Debug, Clone)]
    struct LogExporterThatRequiresTokio {
        /// Number of records seen by `export`; shared between clones.
        export_count: Arc<AtomicUsize>,
    }

    impl LogExporterThatRequiresTokio {
        /// Creates an exporter with a zeroed export counter.
        fn new() -> Self {
            Self {
                export_count: Arc::new(AtomicUsize::new(0)),
            }
        }

        /// Returns how many records have been exported so far.
        fn len(&self) -> usize {
            self.export_count.load(Ordering::Acquire)
        }
    }

    impl LogExporter for LogExporterThatRequiresTokio {
        /// Sleeps 50 ms on the Tokio timer, then bumps the counter once per
        /// record in `batch`.
        async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
            tokio::time::sleep(Duration::from_millis(50)).await;

            batch.iter().for_each(|_| {
                self.export_count.fetch_add(1, Ordering::Acquire);
            });
            Ok(())
        }

        /// Nothing to release; always succeeds.
        fn shutdown_with_timeout(&self, _timeout: time::Duration) -> OTelSdkResult {
            Ok(())
        }
    }

    /// The resource configured on the provider builder must reach the exporter
    /// wrapped by the simple processor.
    #[test]
    fn test_set_resource_simple_processor() {
        let exporter = MockLogExporter {
            resource: Arc::new(Mutex::new(None)),
        };
        let processor = SimpleLogProcessor::new(exporter.clone());
        let resource = Resource::builder_empty()
            .with_attributes([
                KeyValue::new("k1", "v1"),
                KeyValue::new("k2", "v3"),
                KeyValue::new("k3", "v3"),
                KeyValue::new("k4", "v4"),
                KeyValue::new("k5", "v5"),
            ])
            .build();

        // The provider is only needed for its construction side effects, so it
        // is dropped right away.
        drop(
            SdkLoggerProvider::builder()
                .with_log_processor(processor)
                .with_resource(resource)
                .build(),
        );

        assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5);
    }

    /// After `shutdown`, the flag is set, the exporter was shut down, and
    /// further emits are not exported.
    #[test]
    fn test_simple_shutdown() {
        let exporter = InMemoryLogExporterBuilder::default()
            .keep_records_on_shutdown()
            .build();
        let processor = SimpleLogProcessor::new(exporter.clone());

        let mut record = SdkLogRecord::new();
        let scope = InstrumentationScope::default();

        // First emit happens while the processor is still live.
        processor.emit(&mut record, &scope);

        processor.shutdown().unwrap();
        assert!(processor.is_shutdown.load(Ordering::Relaxed));

        // Second emit happens after shutdown and must not add a record.
        processor.emit(&mut record, &scope);

        assert_eq!(1, exporter.get_emitted_logs().unwrap().len());
        assert!(exporter.is_shutdown_called());
    }

    /// A synchronous exporter works from a plain thread with no async runtime.
    #[test]
    fn test_simple_processor_sync_exporter_without_runtime() {
        let exporter = InMemoryLogExporterBuilder::default().build();
        let processor = SimpleLogProcessor::new(exporter.clone());

        emit_default_record(&processor);

        assert_eq!(exporter.get_emitted_logs().unwrap().len(), 1);
    }

    /// A synchronous exporter works inside a multi-thread runtime that has a
    /// single worker thread.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn test_simple_processor_sync_exporter_with_runtime() {
        let exporter = InMemoryLogExporterBuilder::default().build();
        let processor = SimpleLogProcessor::new(exporter.clone());

        emit_default_record(&processor);

        assert_eq!(exporter.get_emitted_logs().unwrap().len(), 1);
    }

    /// Ten tasks emitting concurrently through one shared processor all get
    /// their record exported.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_simple_processor_sync_exporter_with_multi_thread_runtime() {
        let exporter = InMemoryLogExporterBuilder::default().build();
        let processor = Arc::new(SimpleLogProcessor::new(exporter.clone()));

        // `collect` spawns every task before any of them is awaited.
        let tasks: Vec<_> = (0..10)
            .map(|_| {
                let shared = Arc::clone(&processor);
                tokio::spawn(async move {
                    emit_default_record(&*shared);
                })
            })
            .collect();

        for task in tasks {
            task.await.unwrap();
        }

        assert_eq!(exporter.get_emitted_logs().unwrap().len(), 10);
    }

    /// A synchronous exporter works inside a current-thread runtime.
    #[tokio::test(flavor = "current_thread")]
    async fn test_simple_processor_sync_exporter_with_current_thread_runtime() {
        let exporter = InMemoryLogExporterBuilder::default().build();
        let processor = SimpleLogProcessor::new(exporter.clone());

        emit_default_record(&processor);

        assert_eq!(exporter.get_emitted_logs().unwrap().len(), 1);
    }

    /// An exporter that needs Tokio must panic, with Tokio's "missing runtime"
    /// message, when `emit` is called outside any runtime.
    #[test]
    fn test_simple_processor_async_exporter_without_runtime() {
        let outcome = std::panic::catch_unwind(|| {
            let exporter = LogExporterThatRequiresTokio::new();
            let processor = SimpleLogProcessor::new(exporter.clone());

            emit_default_record(&processor);
        });

        assert!(
            outcome.is_err(),
            "The test should fail due to missing Tokio runtime, but it did not."
        );

        // A panic payload is usually a `String` or a `&'static str`; fall back
        // to a placeholder for anything else.
        let payload = outcome.unwrap_err();
        let panic_message = if let Some(owned) = payload.downcast_ref::<String>() {
            owned.as_str()
        } else if let Some(literal) = payload.downcast_ref::<&str>() {
            literal
        } else {
            "No panic message"
        };

        assert!(
            panic_message.contains("no reactor running")
                || panic_message.contains("must be called from the context of a Tokio 1.x runtime"),
            "Expected panic message about missing Tokio runtime, but got: {panic_message}"
        );
    }

    /// Emits from as many tasks as there are worker threads, using the
    /// Tokio-dependent exporter.
    ///
    /// NOTE(review): ignored by default — presumably because blocking every
    /// worker thread inside `emit` leaves nothing to drive the exporter's
    /// timer, so the test would hang; confirm before enabling.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    #[ignore]
    async fn test_simple_processor_async_exporter_with_all_runtime_worker_threads_blocked() {
        let exporter = LogExporterThatRequiresTokio::new();
        let processor = Arc::new(SimpleLogProcessor::new(exporter.clone()));

        // Matches `worker_threads` in the attribute above.
        let concurrent_emit = 4;
        let tasks: Vec<_> = (0..concurrent_emit)
            .map(|_| {
                let shared = Arc::clone(&processor);
                tokio::spawn(async move {
                    emit_default_record(&*shared);
                })
            })
            .collect();

        for task in tasks {
            task.await.unwrap();
        }
        assert_eq!(exporter.len(), concurrent_emit);
    }

    /// The Tokio-dependent exporter completes when `emit` is called from the
    /// test task of a multi-thread runtime with one worker thread.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn test_simple_processor_async_exporter_with_runtime() {
        let exporter = LogExporterThatRequiresTokio::new();
        let processor = SimpleLogProcessor::new(exporter.clone());

        emit_default_record(&processor);

        assert_eq!(exporter.len(), 1);
    }

    /// The Tokio-dependent exporter completes inside a default multi-thread
    /// runtime.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_simple_processor_async_exporter_with_multi_thread_runtime() {
        let exporter = LogExporterThatRequiresTokio::new();
        let processor = SimpleLogProcessor::new(exporter.clone());

        emit_default_record(&processor);

        assert_eq!(exporter.len(), 1);
    }

    /// Same scenario on a current-thread runtime.
    ///
    /// NOTE(review): ignored by default — presumably because `emit` blocks the
    /// runtime's only thread, so the exporter's timer can never fire and the
    /// test would hang; confirm before enabling.
    #[tokio::test(flavor = "current_thread")]
    #[ignore]
    async fn test_simple_processor_async_exporter_with_current_thread_runtime() {
        let exporter = LogExporterThatRequiresTokio::new();
        let processor = SimpleLogProcessor::new(exporter.clone());

        emit_default_record(&processor);

        assert_eq!(exporter.len(), 1);
    }

    /// Test exporter that emits a log record of its own, through a logger
    /// installed after construction, every time `export` is called.
    #[derive(Debug, Clone)]
    struct ReentrantLogExporter {
        /// Logger used from inside `export`; `None` until `set_logger` runs.
        logger: Arc<Mutex<Option<SdkLogger>>>,
    }

    impl ReentrantLogExporter {
        /// Creates an exporter with no logger installed yet.
        fn new() -> Self {
            Self {
                logger: Arc::new(Mutex::new(None)),
            }
        }

        /// Installs the logger that `export` will emit through.
        fn set_logger(&self, logger: SdkLogger) {
            *self.logger.lock().unwrap() = Some(logger);
        }
    }

    impl LogExporter for ReentrantLogExporter {
        /// Ignores the batch and, if a logger is installed, emits one
        /// error-severity record through it.
        async fn export(&self, _batch: LogBatch<'_>) -> OTelSdkResult {
            let installed = self.logger.lock().unwrap();
            if let Some(inner_logger) = installed.as_ref() {
                let mut inner_record = inner_logger.create_log_record();
                inner_record.set_severity_number(opentelemetry::logs::Severity::Error);
                inner_logger.emit(inner_record);
            }

            Ok(())
        }
    }

    /// An exporter that logs through the same provider from inside `export`
    /// must not deadlock the simple processor. The test has no assertions; it
    /// passes by returning.
    #[test]
    fn exporter_internal_log_does_not_deadlock_with_simple_processor() {
        let exporter = ReentrantLogExporter::new();
        let provider = SdkLoggerProvider::builder()
            .with_simple_exporter(exporter.clone())
            .build();
        exporter.set_logger(provider.logger("processor-logger"));

        let outer_logger = provider.logger("test-logger");
        let mut outer_record = outer_logger.create_log_record();
        outer_record.set_severity_number(opentelemetry::logs::Severity::Error);
        outer_logger.emit(outer_record);
    }
}