use crate::error::{OTelSdkError, OTelSdkResult};
use crate::logs::log_processor::LogProcessor;
use crate::{
    logs::{LogBatch, LogExporter, SdkLogRecord},
    Resource,
};
use std::sync::mpsc::{self, RecvTimeoutError, SyncSender};

use opentelemetry::{otel_debug, otel_error, otel_warn, Context, InstrumentationScope};

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::{cmp::min, env, sync::Mutex};
use std::{
    fmt::{self, Debug, Formatter},
    str::FromStr,
    sync::Arc,
    thread,
    time::Duration,
    time::Instant,
};

/// Environment variable for the delay interval (in milliseconds) between two consecutive exports.
pub(crate) const OTEL_BLRP_SCHEDULE_DELAY: &str = "OTEL_BLRP_SCHEDULE_DELAY";
/// Default delay interval between two consecutive exports.
pub(crate) const OTEL_BLRP_SCHEDULE_DELAY_DEFAULT: Duration = Duration::from_millis(1_000);
/// Environment variable for the maximum allowed time (in milliseconds) to export data.
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
pub(crate) const OTEL_BLRP_EXPORT_TIMEOUT: &str = "OTEL_BLRP_EXPORT_TIMEOUT";
/// Default maximum allowed time to export data.
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
pub(crate) const OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT: Duration = Duration::from_millis(30_000);
/// Environment variable for the maximum queue size.
pub(crate) const OTEL_BLRP_MAX_QUEUE_SIZE: &str = "OTEL_BLRP_MAX_QUEUE_SIZE";
/// Default maximum queue size.
pub(crate) const OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048;
/// Environment variable for the maximum export batch size, which is clamped to the maximum queue size.
pub(crate) const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE";
/// Default maximum export batch size.
pub(crate) const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;
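
// Example (illustrative values): with `OTEL_BLRP_SCHEDULE_DELAY=5000` and
// `OTEL_BLRP_MAX_EXPORT_BATCH_SIZE=1024` set in the environment,
// `BatchConfig::default()` yields a configuration that exports batches of up
// to 1024 records at least every 5 seconds (see `init_from_env_vars` below).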

#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum BatchMessage {
    /// Export a batch now; sent when the queue reaches `max_export_batch_size`.
    ExportLog(Arc<AtomicBool>),
    /// Flush the current buffer to the exporter and report the result.
    ForceFlush(mpsc::SyncSender<OTelSdkResult>),
    /// Flush remaining logs, shut down the exporter, and stop the worker thread.
    Shutdown(mpsc::SyncSender<OTelSdkResult>),
    /// Forward the SDK resource to the exporter.
    SetResource(Arc<Resource>),
}

type LogsData = Box<(SdkLogRecord, InstrumentationScope)>;

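/// The `BatchLogProcessor` buffers finished log records in a bounded queue and
/// exports them in batches from a dedicated background thread, either when the
/// pending count reaches `max_export_batch_size` or when `scheduled_delay`
/// elapses, whichever comes first. Records are dropped (and counted) once the
/// queue is full.
///
/// A minimal usage sketch; the exporter and module paths shown here are
/// illustrative (any `LogExporter` implementation can be plugged in the same
/// way):
///
/// ```ignore
/// use opentelemetry_sdk::logs::{BatchLogProcessor, SdkLoggerProvider};
///
/// let processor = BatchLogProcessor::builder(my_exporter).build();
/// let provider = SdkLoggerProvider::builder()
///     .with_log_processor(processor)
///     .build();
/// // ... emit logs through loggers obtained from `provider` ...
/// let _ = provider.shutdown(); // flushes remaining records and joins the worker
/// ```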
pub struct BatchLogProcessor {
    /// Channel used to send log records to the background worker thread.
    logs_sender: SyncSender<LogsData>,
    /// Channel used to send control messages to the background worker thread.
    message_sender: SyncSender<BatchMessage>,
    /// Handle of the background worker thread, joined on shutdown.
    handle: Mutex<Option<thread::JoinHandle<()>>>,
    forceflush_timeout: Duration,
    export_log_message_sent: Arc<AtomicBool>,
    current_batch_size: Arc<AtomicUsize>,
    max_export_batch_size: usize,

    // Number of log records dropped because the queue was full; reported at shutdown.
    dropped_logs_count: AtomicUsize,

    // Configured maximum queue size, used for diagnostics.
    max_queue_size: usize,
}

impl Debug for BatchLogProcessor {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("BatchLogProcessor")
            .field("message_sender", &self.message_sender)
            .finish()
    }
}

impl LogProcessor for BatchLogProcessor {
    fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
        let result = self
            .logs_sender
            .try_send(Box::new((record.clone(), instrumentation.clone())));

        match result {
            Ok(_) => {
                // Successfully enqueued. If the pending batch has reached the
                // configured size, nudge the worker thread to export now instead
                // of waiting for the scheduled delay.
                if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1
                    >= self.max_export_batch_size
                {
                    // Check with a cheap load first, then swap to claim the right
                    // to send the control message, so only one thread sends it.
                    if !self.export_log_message_sent.load(Ordering::Relaxed) {
                        if !self.export_log_message_sent.swap(true, Ordering::Relaxed) {
                            match self.message_sender.try_send(BatchMessage::ExportLog(
                                self.export_log_message_sent.clone(),
                            )) {
                                Ok(_) => {
                                    // Control message sent successfully.
                                }
                                Err(_err) => {
                                    // Control channel is full; reset the flag so a
                                    // later emit can retry sending the message.
                                    self.export_log_message_sent.store(false, Ordering::Relaxed);
                                }
                            }
                        }
                    }
                }
            }
            Err(mpsc::TrySendError::Full(_)) => {
                // Queue is full: drop the record and warn once, on the first drop.
                if self.dropped_logs_count.fetch_add(1, Ordering::Relaxed) == 0 {
                    otel_warn!(name: "BatchLogProcessor.LogDroppingStarted",
                        message = "BatchLogProcessor dropped a LogRecord because the queue is full. No further warning will be emitted for subsequent drops until shutdown, at which point the total count of dropped log records will be reported.");
                }
            }
            Err(mpsc::TrySendError::Disconnected(_)) => {
                // The worker thread is the only receiver; a disconnect means the
                // processor has already been shut down. Suppress nested telemetry
                // while warning to avoid feeding back into the pipeline.
                let _guard = Context::enter_telemetry_suppressed_scope();

                otel_warn!(
                    name: "BatchLogProcessor.Emit.AfterShutdown",
                    message = "Logs are being emitted even after shutdown. This indicates incorrect lifecycle management of the LoggerProvider in the application. Logs will not be exported."
                );
            }
        }
    }

    fn force_flush(&self) -> OTelSdkResult {
        let (sender, receiver) = mpsc::sync_channel(1);
        match self
            .message_sender
            .try_send(BatchMessage::ForceFlush(sender))
        {
            Ok(_) => receiver
                .recv_timeout(self.forceflush_timeout)
                .map_err(|err| {
                    if err == RecvTimeoutError::Timeout {
                        OTelSdkError::Timeout(self.forceflush_timeout)
                    } else {
                        OTelSdkError::InternalFailure(format!("{err}"))
                    }
                })?,
            Err(mpsc::TrySendError::Full(_)) => {
                otel_debug!(
                    name: "BatchLogProcessor.ForceFlush.ControlChannelFull",
                    message = "Control message to flush the worker thread could not be sent as the control channel is full. This can occur if the user repeatedly calls force_flush/shutdown without waiting for the previous call to finish."
                );
                Err(OTelSdkError::InternalFailure("ForceFlush cannot be performed as the control channel is full. This can occur if the user repeatedly calls force_flush/shutdown without waiting for the previous call to finish.".into()))
            }
            Err(mpsc::TrySendError::Disconnected(_)) => {
                otel_debug!(
                    name: "BatchLogProcessor.ForceFlush.AlreadyShutdown",
                    message = "ForceFlush invoked after shutdown. No flush will be performed; this indicates incorrect lifecycle management in the application."
                );

                Err(OTelSdkError::AlreadyShutdown)
            }
        }
    }

    fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
        let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed);
        let max_queue_size = self.max_queue_size;
        if dropped_logs > 0 {
            otel_warn!(
                name: "BatchLogProcessor.LogsDropped",
                dropped_logs_count = dropped_logs,
                max_queue_size = max_queue_size,
                message = "Logs were dropped due to the queue being full. The count represents the total number of log records dropped in the lifetime of this BatchLogProcessor. Consider increasing the queue size and/or decreasing the delay between exports."
            );
        }

        let (sender, receiver) = mpsc::sync_channel(1);
        match self.message_sender.try_send(BatchMessage::Shutdown(sender)) {
            Ok(_) => {
                receiver
                    .recv_timeout(timeout)
                    .map(|_| {
                        // Join the worker thread so all resources are released
                        // before shutdown returns.
                        if let Some(handle) = self.handle.lock().unwrap().take() {
                            handle.join().unwrap();
                        }
                        OTelSdkResult::Ok(())
                    })
                    .map_err(|err| match err {
                        RecvTimeoutError::Timeout => {
                            otel_error!(
                                name: "BatchLogProcessor.Shutdown.Timeout",
                                message = "BatchLogProcessor shutdown timed out."
                            );
                            OTelSdkError::Timeout(timeout)
                        }
                        _ => {
                            otel_error!(
                                name: "BatchLogProcessor.Shutdown.Error",
                                error = format!("{}", err)
                            );
                            OTelSdkError::InternalFailure(format!("{err}"))
                        }
                    })?
            }
            Err(mpsc::TrySendError::Full(_)) => {
                otel_debug!(
                    name: "BatchLogProcessor.Shutdown.ControlChannelFull",
                    message = "Control message to shut down the worker thread could not be sent as the control channel is full. This can occur if the user repeatedly calls force_flush/shutdown without waiting for the previous call to finish."
                );
                Err(OTelSdkError::InternalFailure("Shutdown cannot be performed as the control channel is full. This can occur if the user repeatedly calls force_flush/shutdown without waiting for the previous call to finish.".into()))
            }
            Err(mpsc::TrySendError::Disconnected(_)) => {
                otel_debug!(
                    name: "BatchLogProcessor.Shutdown.AlreadyShutdown",
                    message = "Shutdown is being invoked more than once. This is a no-op, but indicates a potential issue in the application's lifecycle management."
                );

                Err(OTelSdkError::AlreadyShutdown)
            }
        }
    }

    fn set_resource(&mut self, resource: &Resource) {
        let resource = Arc::new(resource.clone());
        let _ = self
            .message_sender
            .try_send(BatchMessage::SetResource(resource));
    }
}

impl BatchLogProcessor {
    pub(crate) fn new<E>(mut exporter: E, config: BatchConfig) -> Self
    where
        E: LogExporter + Send + Sync + 'static,
    {
        // Data channel for log records, bounded by the configured queue size.
        let (logs_sender, logs_receiver) = mpsc::sync_channel::<LogsData>(config.max_queue_size);
        // Control channel used by emit/force_flush/shutdown to signal the worker thread.
        let (message_sender, message_receiver) = mpsc::sync_channel::<BatchMessage>(64);
        let max_queue_size = config.max_queue_size;
        let max_export_batch_size = config.max_export_batch_size;
        let current_batch_size = Arc::new(AtomicUsize::new(0));
        let current_batch_size_for_thread = current_batch_size.clone();

        let handle = thread::Builder::new()
            .name("OpenTelemetry.Logs.BatchProcessor".to_string())
            .spawn(move || {
                // Suppress telemetry generated inside the worker thread itself so
                // the processor does not feed back into its own pipeline.
                let _suppress_guard = Context::enter_telemetry_suppressed_scope();
                otel_debug!(
                    name: "BatchLogProcessor.ThreadStarted",
                    interval_in_millisecs = config.scheduled_delay.as_millis(),
                    max_export_batch_size = config.max_export_batch_size,
                    max_queue_size = max_queue_size,
                );
                let mut last_export_time = Instant::now();
                let mut logs = Vec::with_capacity(config.max_export_batch_size);
                let current_batch_size = current_batch_size_for_thread;

                // Helper that drains the data channel and exports what was received.
                // It exports in chunks of at most `max_export_size` until the number
                // of records observed on entry has been processed, so a steady
                // stream of new records cannot keep the worker exporting forever.
                #[inline]
                fn get_logs_and_export<E>(
                    logs_receiver: &mpsc::Receiver<LogsData>,
                    exporter: &E,
                    logs: &mut Vec<LogsData>,
                    last_export_time: &mut Instant,
                    current_batch_size: &AtomicUsize,
                    max_export_size: usize,
                ) -> OTelSdkResult
                where
                    E: LogExporter + Send + Sync + 'static,
                {
                    // Snapshot the number of records known to be queued right now;
                    // only export up to that many in this invocation.
                    let target = current_batch_size.load(Ordering::Relaxed);
                    let mut result = OTelSdkResult::Ok(());
                    let mut total_exported_logs: usize = 0;

                    while target > 0 && total_exported_logs < target {
                        // Fill the batch with up to `max_export_size` records.
                        while let Ok(log) = logs_receiver.try_recv() {
                            logs.push(log);
                            if logs.len() == max_export_size {
                                break;
                            }
                        }

                        let count_of_logs = logs.len();
                        total_exported_logs += count_of_logs;

                        // Export the batch synchronously, then subtract the exported
                        // records from the shared counter.
                        result = export_batch_sync(exporter, logs, last_export_time);
                        current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed);
                    }
                    result
                }

                loop {
                    // Wait for a control message, but never longer than the time
                    // remaining until the next scheduled export.
                    let remaining_time = config
                        .scheduled_delay
                        .checked_sub(last_export_time.elapsed())
                        .unwrap_or(config.scheduled_delay);

                    match message_receiver.recv_timeout(remaining_time) {
                        Ok(BatchMessage::ExportLog(export_log_message_sent)) => {
                            // Reset the flag so emit() can send another ExportLog
                            // message once the batch fills up again.
                            export_log_message_sent.store(false, Ordering::Relaxed);

                            otel_debug!(
                                name: "BatchLogProcessor.ExportingDueToBatchSize",
                            );

                            let _ = get_logs_and_export(
                                &logs_receiver,
                                &exporter,
                                &mut logs,
                                &mut last_export_time,
                                &current_batch_size,
                                max_export_batch_size,
                            );
                        }
                        Ok(BatchMessage::ForceFlush(sender)) => {
                            otel_debug!(name: "BatchLogProcessor.ExportingDueToForceFlush");
                            let result = get_logs_and_export(
                                &logs_receiver,
                                &exporter,
                                &mut logs,
                                &mut last_export_time,
                                &current_batch_size,
                                max_export_batch_size,
                            );
                            let _ = sender.send(result);
                        }
                        Ok(BatchMessage::Shutdown(sender)) => {
                            otel_debug!(name: "BatchLogProcessor.ExportingDueToShutdown");
                            let result = get_logs_and_export(
                                &logs_receiver,
                                &exporter,
                                &mut logs,
                                &mut last_export_time,
                                &current_batch_size,
                                max_export_batch_size,
                            );
                            let _ = exporter.shutdown();
                            let _ = sender.send(result);

                            otel_debug!(
                                name: "BatchLogProcessor.ThreadExiting",
                                reason = "ShutdownRequested"
                            );
                            break;
                        }
                        Ok(BatchMessage::SetResource(resource)) => {
                            exporter.set_resource(&resource);
                        }
                        Err(RecvTimeoutError::Timeout) => {
                            otel_debug!(
                                name: "BatchLogProcessor.ExportingDueToTimer",
                            );

                            let _ = get_logs_and_export(
                                &logs_receiver,
                                &exporter,
                                &mut logs,
                                &mut last_export_time,
                                &current_batch_size,
                                max_export_batch_size,
                            );
                        }
                        Err(RecvTimeoutError::Disconnected) => {
                            // The sender half was dropped, which means the processor
                            // itself has been dropped; stop the worker thread.
                            otel_debug!(
                                name: "BatchLogProcessor.ThreadExiting",
                                reason = "MessageSenderDisconnected"
                            );
                            break;
                        }
                    }
                }
                otel_debug!(
                    name: "BatchLogProcessor.ThreadStopped"
                );
            })
            .expect("Thread spawn failed.");

        // Return a processor that shares the channels and the batch counter with
        // the worker thread spawned above.
        BatchLogProcessor {
            logs_sender,
            message_sender,
            handle: Mutex::new(Some(handle)),
            forceflush_timeout: Duration::from_secs(5), // Hard-coded flush wait; not currently configurable.
            dropped_logs_count: AtomicUsize::new(0),
            max_queue_size,
            export_log_message_sent: Arc::new(AtomicBool::new(false)),
            current_batch_size,
            max_export_batch_size,
        }
    }

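    /// Create a [`BatchLogProcessorBuilder`] that wraps the given exporter.
    ///
    /// A minimal sketch of overriding the batch settings; the values are
    /// illustrative, not recommendations:
    ///
    /// ```ignore
    /// let processor = BatchLogProcessor::builder(my_exporter)
    ///     .with_batch_config(
    ///         BatchConfigBuilder::default()
    ///             .with_max_queue_size(4096)
    ///             .with_scheduled_delay(std::time::Duration::from_millis(500))
    ///             .build(),
    ///     )
    ///     .build();
    /// ```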
    pub fn builder<E>(exporter: E) -> BatchLogProcessorBuilder<E>
    where
        E: LogExporter,
    {
        BatchLogProcessorBuilder {
            exporter,
            config: Default::default(),
        }
    }
}

#[allow(clippy::vec_box)]
fn export_batch_sync<E>(
    exporter: &E,
    batch: &mut Vec<Box<(SdkLogRecord, InstrumentationScope)>>,
    last_export_time: &mut Instant,
) -> OTelSdkResult
where
    E: LogExporter + ?Sized,
{
    *last_export_time = Instant::now();

    if batch.is_empty() {
        return OTelSdkResult::Ok(());
    }

    // Block the worker thread on the exporter's async export call.
    let export = exporter.export(LogBatch::new_with_owned_data(batch.as_slice()));
    let export_result = futures_executor::block_on(export);

    // Clear the batch so the buffer can be reused for the next export.
    batch.clear();

    match export_result {
        Ok(_) => OTelSdkResult::Ok(()),
        Err(err) => {
            otel_error!(
                name: "BatchLogProcessor.ExportError",
                error = format!("{}", err)
            );
            OTelSdkResult::Err(err)
        }
    }
}

/// A builder for creating a [`BatchLogProcessor`] with a custom [`BatchConfig`].
#[derive(Debug)]
pub struct BatchLogProcessorBuilder<E> {
    exporter: E,
    config: BatchConfig,
}

impl<E> BatchLogProcessorBuilder<E>
where
    E: LogExporter + 'static,
{
    /// Set the `BatchConfig` for the processor, replacing the default configuration.
    pub fn with_batch_config(self, config: BatchConfig) -> Self {
        BatchLogProcessorBuilder { config, ..self }
    }

    /// Build the `BatchLogProcessor`, which starts the background worker thread.
    pub fn build(self) -> BatchLogProcessor {
        BatchLogProcessor::new(self.exporter, self.config)
    }
}

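/// Batching configuration for the [`BatchLogProcessor`].
///
/// Defaults follow the OpenTelemetry specification and can be overridden
/// either through the `OTEL_BLRP_*` environment variables or through
/// [`BatchConfigBuilder`]; explicit builder calls take precedence over the
/// environment.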
#[derive(Debug)]
#[allow(dead_code)]
pub struct BatchConfig {
    /// The maximum number of log records that can be queued before records are dropped.
    /// The default value is 2048.
    pub(crate) max_queue_size: usize,

    /// The delay between two consecutive exports.
    /// The default value is 1 second.
    pub(crate) scheduled_delay: Duration,

    /// The maximum number of log records to include in a single export.
    /// Clamped to `max_queue_size`; the default value is 512.
    pub(crate) max_export_batch_size: usize,

    /// The maximum duration allowed for a single export call.
    #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
    pub(crate) max_export_timeout: Duration,
}

impl Default for BatchConfig {
    fn default() -> Self {
        BatchConfigBuilder::default().build()
    }
}

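/// A builder for [`BatchConfig`], seeded from the `OTEL_BLRP_*` environment
/// variables and overridable through its `with_*` methods.
///
/// A minimal sketch (values are illustrative):
///
/// ```ignore
/// let config = BatchConfigBuilder::default()
///     .with_max_queue_size(4096)
///     .with_max_export_batch_size(1024)
///     .with_scheduled_delay(std::time::Duration::from_secs(2))
///     .build();
/// ```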
#[derive(Debug)]
pub struct BatchConfigBuilder {
    max_queue_size: usize,
    scheduled_delay: Duration,
    max_export_batch_size: usize,
    #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
    max_export_timeout: Duration,
}

impl Default for BatchConfigBuilder {
    /// Create a `BatchConfigBuilder` with the spec defaults, then override them
    /// from the `OTEL_BLRP_*` environment variables where they are set.
    fn default() -> Self {
        BatchConfigBuilder {
            max_queue_size: OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT,
            scheduled_delay: OTEL_BLRP_SCHEDULE_DELAY_DEFAULT,
            max_export_batch_size: OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
            #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
            max_export_timeout: OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT,
        }
        .init_from_env_vars()
    }
}

impl BatchConfigBuilder {
    /// Set the maximum number of log records that can be queued before dropping.
    pub fn with_max_queue_size(mut self, max_queue_size: usize) -> Self {
        self.max_queue_size = max_queue_size;
        self
    }

    /// Set the delay between two consecutive exports.
    pub fn with_scheduled_delay(mut self, scheduled_delay: Duration) -> Self {
        self.scheduled_delay = scheduled_delay;
        self
    }

    /// Set the maximum duration allowed for a single export call.
    #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
    pub fn with_max_export_timeout(mut self, max_export_timeout: Duration) -> Self {
        self.max_export_timeout = max_export_timeout;
        self
    }

    /// Set the maximum number of log records to include in a single export.
    pub fn with_max_export_batch_size(mut self, max_export_batch_size: usize) -> Self {
        self.max_export_batch_size = max_export_batch_size;
        self
    }

    /// Build a [`BatchConfig`] from the current settings.
    pub fn build(self) -> BatchConfig {
        // The export batch size can never exceed the queue size.
        let max_export_batch_size = min(self.max_export_batch_size, self.max_queue_size);

        BatchConfig {
            max_queue_size: self.max_queue_size,
            scheduled_delay: self.scheduled_delay,
            #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
            max_export_timeout: self.max_export_timeout,
            max_export_batch_size,
        }
    }

    fn init_from_env_vars(mut self) -> Self {
        if let Some(max_queue_size) = env::var(OTEL_BLRP_MAX_QUEUE_SIZE)
            .ok()
            .and_then(|queue_size| usize::from_str(&queue_size).ok())
        {
            self.max_queue_size = max_queue_size;
        }

        if let Some(max_export_batch_size) = env::var(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE)
            .ok()
            .and_then(|batch_size| usize::from_str(&batch_size).ok())
        {
            self.max_export_batch_size = max_export_batch_size;
        }

        if let Some(scheduled_delay) = env::var(OTEL_BLRP_SCHEDULE_DELAY)
            .ok()
            .and_then(|delay| u64::from_str(&delay).ok())
        {
            self.scheduled_delay = Duration::from_millis(scheduled_delay);
        }

        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
        if let Some(max_export_timeout) = env::var(OTEL_BLRP_EXPORT_TIMEOUT)
            .ok()
            .and_then(|s| u64::from_str(&s).ok())
        {
            self.max_export_timeout = Duration::from_millis(max_export_timeout);
        }

        self
    }
}

#[cfg(all(test, feature = "testing", feature = "logs"))]
mod tests {
    use super::{
        BatchConfig, BatchConfigBuilder, BatchLogProcessor, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
        OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, OTEL_BLRP_MAX_QUEUE_SIZE,
        OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BLRP_SCHEDULE_DELAY,
        OTEL_BLRP_SCHEDULE_DELAY_DEFAULT,
    };
    #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
    use super::{OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT};
    use crate::logs::log_processor::tests::MockLogExporter;
    use crate::logs::SdkLogRecord;
    use crate::{
        logs::{InMemoryLogExporter, InMemoryLogExporterBuilder, LogProcessor, SdkLoggerProvider},
        Resource,
    };
    use opentelemetry::InstrumentationScope;
    use opentelemetry::KeyValue;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    #[test]
    fn test_default_const_values() {
        assert_eq!(OTEL_BLRP_SCHEDULE_DELAY, "OTEL_BLRP_SCHEDULE_DELAY");
        assert_eq!(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT.as_millis(), 1_000);
        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
        assert_eq!(OTEL_BLRP_EXPORT_TIMEOUT, "OTEL_BLRP_EXPORT_TIMEOUT");
        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
        assert_eq!(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT.as_millis(), 30_000);
        assert_eq!(OTEL_BLRP_MAX_QUEUE_SIZE, "OTEL_BLRP_MAX_QUEUE_SIZE");
        assert_eq!(OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, 2_048);
        assert_eq!(
            OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
            "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE"
        );
        assert_eq!(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, 512);
    }

    #[test]
    fn test_default_batch_config_adheres_to_specification() {
        let env_vars = vec![
            OTEL_BLRP_SCHEDULE_DELAY,
            #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
            OTEL_BLRP_EXPORT_TIMEOUT,
            OTEL_BLRP_MAX_QUEUE_SIZE,
            OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
        ];

        let config = temp_env::with_vars_unset(env_vars, BatchConfig::default);

        assert_eq!(config.scheduled_delay, OTEL_BLRP_SCHEDULE_DELAY_DEFAULT);
        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
        assert_eq!(config.max_export_timeout, OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT);
        assert_eq!(config.max_queue_size, OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT);
        assert_eq!(
            config.max_export_batch_size,
            OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT
        );
    }

    #[test]
    fn test_code_based_config_overrides_env_vars() {
        let env_vars = vec![
            (OTEL_BLRP_SCHEDULE_DELAY, Some("2000")),
            (OTEL_BLRP_MAX_QUEUE_SIZE, Some("4096")),
            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
        ];

        temp_env::with_vars(env_vars, || {
            let config = BatchConfigBuilder::default()
                .with_max_queue_size(2048)
                .with_scheduled_delay(Duration::from_millis(1000))
                .with_max_export_batch_size(512)
                .build();

            assert_eq!(config.scheduled_delay, Duration::from_millis(1000));
            assert_eq!(config.max_queue_size, 2048);
            assert_eq!(config.max_export_batch_size, 512);
        });
    }

    #[test]
    fn test_batch_config_configurable_by_env_vars() {
        let env_vars = vec![
            (OTEL_BLRP_SCHEDULE_DELAY, Some("2000")),
            #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
            (OTEL_BLRP_EXPORT_TIMEOUT, Some("60000")),
            (OTEL_BLRP_MAX_QUEUE_SIZE, Some("4096")),
            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
        ];

        let config = temp_env::with_vars(env_vars, BatchConfig::default);

        assert_eq!(config.scheduled_delay, Duration::from_millis(2000));
        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
        assert_eq!(config.max_export_timeout, Duration::from_millis(60000));
        assert_eq!(config.max_queue_size, 4096);
        assert_eq!(config.max_export_batch_size, 1024);
    }

    #[test]
    fn test_batch_config_max_export_batch_size_validation() {
        let env_vars = vec![
            (OTEL_BLRP_MAX_QUEUE_SIZE, Some("256")),
            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
        ];

        let config = temp_env::with_vars(env_vars, BatchConfig::default);

        assert_eq!(config.max_queue_size, 256);
        assert_eq!(config.max_export_batch_size, 256);
        assert_eq!(config.scheduled_delay, OTEL_BLRP_SCHEDULE_DELAY_DEFAULT);
        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
        assert_eq!(config.max_export_timeout, OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT);
    }

    #[test]
    fn test_batch_config_with_fields() {
        let batch_builder = BatchConfigBuilder::default()
            .with_max_export_batch_size(1)
            .with_scheduled_delay(Duration::from_millis(2))
            .with_max_queue_size(4);

        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
        let batch_builder = batch_builder.with_max_export_timeout(Duration::from_millis(3));
        let batch = batch_builder.build();

        assert_eq!(batch.max_export_batch_size, 1);
        assert_eq!(batch.scheduled_delay, Duration::from_millis(2));
        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
        assert_eq!(batch.max_export_timeout, Duration::from_millis(3));
        assert_eq!(batch.max_queue_size, 4);
    }

    #[test]
    fn test_build_batch_log_processor_builder() {
        let mut env_vars = vec![
            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("500")),
            // Invalid value: the scheduled delay should fall back to its default.
            (OTEL_BLRP_SCHEDULE_DELAY, Some("I am not number")),
            #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
            (OTEL_BLRP_EXPORT_TIMEOUT, Some("2046")),
        ];
        temp_env::with_vars(env_vars.clone(), || {
            let builder = BatchLogProcessor::builder(InMemoryLogExporter::default());

            assert_eq!(builder.config.max_export_batch_size, 500);
            assert_eq!(
                builder.config.scheduled_delay,
                OTEL_BLRP_SCHEDULE_DELAY_DEFAULT
            );
            assert_eq!(
                builder.config.max_queue_size,
                OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT
            );

            #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
            assert_eq!(
                builder.config.max_export_timeout,
                Duration::from_millis(2046)
            );
        });

        env_vars.push((OTEL_BLRP_MAX_QUEUE_SIZE, Some("120")));

        temp_env::with_vars(env_vars, || {
            let builder = BatchLogProcessor::builder(InMemoryLogExporter::default());
            // The export batch size is clamped to the smaller queue size.
            assert_eq!(builder.config.max_export_batch_size, 120);
            assert_eq!(builder.config.max_queue_size, 120);
        });
    }

    #[test]
    fn test_build_batch_log_processor_builder_with_custom_config() {
        let expected = BatchConfigBuilder::default()
            .with_max_export_batch_size(1)
            .with_scheduled_delay(Duration::from_millis(2))
            .with_max_queue_size(4)
            .build();

        let builder =
            BatchLogProcessor::builder(InMemoryLogExporter::default()).with_batch_config(expected);

        let actual = &builder.config;
        assert_eq!(actual.max_export_batch_size, 1);
        assert_eq!(actual.scheduled_delay, Duration::from_millis(2));
        assert_eq!(actual.max_queue_size, 4);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn test_set_resource_batch_processor() {
        let exporter = MockLogExporter {
            resource: Arc::new(Mutex::new(None)),
        };
        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
        let provider = SdkLoggerProvider::builder()
            .with_log_processor(processor)
            .with_resource(
                Resource::builder_empty()
                    .with_attributes([
                        KeyValue::new("k1", "v1"),
                        KeyValue::new("k2", "v3"),
                        KeyValue::new("k3", "v3"),
                        KeyValue::new("k4", "v4"),
                        KeyValue::new("k5", "v5"),
                    ])
                    .build(),
            )
            .build();

        provider.force_flush().unwrap();

        assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5);
        let _ = provider.shutdown();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_shutdown() {
        let exporter = InMemoryLogExporterBuilder::default()
            .keep_records_on_shutdown()
            .build();
        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());

        let mut record = SdkLogRecord::new();
        let instrumentation = InstrumentationScope::default();

        processor.emit(&mut record, &instrumentation);
        processor.force_flush().unwrap();
        processor.shutdown().unwrap();
        // Emitting after shutdown should not panic; the record is simply dropped.
        processor.emit(&mut record, &instrumentation);
        assert_eq!(1, exporter.get_emitted_logs().unwrap().len());
        assert!(exporter.is_shutdown_called());
    }

    #[tokio::test(flavor = "current_thread")]
    async fn test_batch_log_processor_shutdown_under_async_runtime_current_flavor_multi_thread() {
        let exporter = InMemoryLogExporterBuilder::default().build();
        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());

        processor.shutdown().unwrap();
    }

    #[tokio::test(flavor = "current_thread")]
    async fn test_batch_log_processor_shutdown_with_async_runtime_current_flavor_current_thread() {
        let exporter = InMemoryLogExporterBuilder::default().build();
        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
        processor.shutdown().unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_multi_thread() {
        let exporter = InMemoryLogExporterBuilder::default().build();
        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
        processor.shutdown().unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_current_thread() {
        let exporter = InMemoryLogExporterBuilder::default().build();
        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
        processor.shutdown().unwrap();
    }
}