opentelemetry_sdk/logs/
batch_log_processor.rs

1//! # OpenTelemetry Batch Log Processor
2//! The `BatchLogProcessor` is one implementation of the `LogProcessor` interface.
3//!
4//! It buffers log records and sends them to the exporter
5//! in batches. This processor is designed for **production use** in high-throughput
6//! applications and reduces the overhead of frequent exports by using a background
7//! thread for batch processing.
8//!
9//! ## Diagram
10//!
11//! ```ascii
12//!   +-----+---------------+   +-----------------------+   +-------------------+
13//!   |     |               |   |                       |   |                   |
14//!   | SDK | Logger.emit() +---> (Batch)LogProcessor   +--->  (OTLPExporter)   |
15//!   +-----+---------------+   +-----------------------+   +-------------------+
16//! ```
17
18use crate::error::{OTelSdkError, OTelSdkResult};
19use crate::logs::log_processor::LogProcessor;
20use crate::{
21    logs::{LogBatch, LogExporter, SdkLogRecord},
22    Resource,
23};
24use std::sync::mpsc::{self, RecvTimeoutError, SyncSender};
25
26use opentelemetry::{otel_debug, otel_error, otel_warn, Context, InstrumentationScope};
27
28use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
29use std::{cmp::min, env, sync::Mutex};
30use std::{
31    fmt::{self, Debug, Formatter},
32    str::FromStr,
33    sync::Arc,
34    thread,
35    time::Duration,
36    time::Instant,
37};
38
/// Name of the environment variable holding the delay between two consecutive exports.
pub(crate) const OTEL_BLRP_SCHEDULE_DELAY: &str = "OTEL_BLRP_SCHEDULE_DELAY";
/// Delay between two consecutive exports used when nothing else is configured (one second).
pub(crate) const OTEL_BLRP_SCHEDULE_DELAY_DEFAULT: Duration = Duration::from_secs(1);
/// Name of the environment variable holding the maximum time allowed for an export.
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
pub(crate) const OTEL_BLRP_EXPORT_TIMEOUT: &str = "OTEL_BLRP_EXPORT_TIMEOUT";
/// Maximum time allowed for an export when nothing else is configured (thirty seconds).
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
pub(crate) const OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT: Duration = Duration::from_secs(30);
/// Name of the environment variable holding the maximum queue size.
pub(crate) const OTEL_BLRP_MAX_QUEUE_SIZE: &str = "OTEL_BLRP_MAX_QUEUE_SIZE";
/// Queue capacity used when nothing else is configured.
pub(crate) const OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT: usize = 2048;
/// Name of the environment variable holding the maximum batch size. The batch
/// size must be less than or equal to `OTEL_BLRP_MAX_QUEUE_SIZE`.
pub(crate) const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE";
/// Batch size used when nothing else is configured.
pub(crate) const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;
57
58/// Messages sent between application thread and batch log processor's work thread.
59#[allow(clippy::large_enum_variant)]
60#[derive(Debug)]
61enum BatchMessage {
62    /// This is ONLY sent when the number of logs records in the data channel has reached `max_export_batch_size`.
63    ExportLog(Arc<AtomicBool>),
64    /// ForceFlush flushes the current buffer to the exporter.
65    ForceFlush(mpsc::SyncSender<OTelSdkResult>),
66    /// Shut down the worker thread, push all logs in buffer to the exporter.
67    Shutdown(mpsc::SyncSender<OTelSdkResult>),
68    /// Set the resource for the exporter.
69    SetResource(Arc<Resource>),
70}
71
/// Item carried on the data channel: a boxed log record together with the
/// instrumentation scope it was emitted under.
type LogsData = Box<(SdkLogRecord, InstrumentationScope)>;
73
74/// The `BatchLogProcessor` collects finished logs in a buffer and exports them
75/// in batches to the configured `LogExporter`. This processor is ideal for
76/// high-throughput environments, as it minimizes the overhead of exporting logs
77/// individually. It uses a **dedicated background thread** to manage and export logs
78/// asynchronously, ensuring that the application's main execution flow is not blocked.
79///
80/// This processor supports the following configurations:
81/// - **Queue size**: Maximum number of log records that can be buffered.
82/// - **Batch size**: Maximum number of log records to include in a single export.
83/// - **Scheduled delay**: Frequency at which the batch is exported.
84///
85/// When using this processor with the OTLP Exporter, the following exporter
86/// features are supported:
87/// - `grpc-tonic`: Requires `LoggerProvider` to be created within a tokio runtime.
88/// - `reqwest-blocking-client`: Works with a regular `main` or `tokio::main`.
89///
90/// In other words, other clients like `reqwest` and `hyper` are not supported.
91///
92/// `BatchLogProcessor` buffers logs in memory and exports them in batches. An
93/// export is triggered when `max_export_batch_size` is reached or every
94/// `scheduled_delay` milliseconds. Users can explicitly trigger an export using
95/// the `force_flush` method. Shutdown also triggers an export of all buffered
96/// logs and is recommended to be called before the application exits to ensure
97/// all buffered logs are exported.
98///
99/// **Warning**: When using tokio's current-thread runtime, `shutdown()`, which
100/// is a blocking call ,should not be called from your main thread. This can
101/// cause deadlock. Instead, call `shutdown()` from a separate thread or use
102/// tokio's `spawn_blocking`.
103///
104///
105/// ### Using a BatchLogProcessor:
106///
107/// ```rust
108/// use opentelemetry_sdk::logs::{BatchLogProcessor, BatchConfigBuilder, SdkLoggerProvider};
109/// use opentelemetry::global;
110/// use std::time::Duration;
111/// use opentelemetry_sdk::logs::InMemoryLogExporter;
112///
113/// let exporter = InMemoryLogExporter::default(); // Replace with an actual exporter
114/// let processor = BatchLogProcessor::builder(exporter)
115///     .with_batch_config(
116///         BatchConfigBuilder::default()
117///             .with_max_queue_size(2048)
118///             .with_max_export_batch_size(512)
119///             .with_scheduled_delay(Duration::from_secs(5))
120///             .build(),
121///     )
122///     .build();
123///
124/// let provider = SdkLoggerProvider::builder()
125///     .with_log_processor(processor)
126///     .build();
127///
128pub struct BatchLogProcessor {
129    logs_sender: SyncSender<LogsData>, // Data channel to store log records and instrumentation scopes
130    message_sender: SyncSender<BatchMessage>, // Control channel to store control messages for the worker thread
131    handle: Mutex<Option<thread::JoinHandle<()>>>,
132    forceflush_timeout: Duration,
133    export_log_message_sent: Arc<AtomicBool>,
134    current_batch_size: Arc<AtomicUsize>,
135    max_export_batch_size: usize,
136
137    // Track dropped logs - we'll log this at shutdown
138    dropped_logs_count: AtomicUsize,
139
140    // Track the maximum queue size that was configured for this processor
141    max_queue_size: usize,
142}
143
144impl Debug for BatchLogProcessor {
145    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
146        f.debug_struct("BatchLogProcessor")
147            .field("message_sender", &self.message_sender)
148            .finish()
149    }
150}
151
152impl LogProcessor for BatchLogProcessor {
153    fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
154        let result = self
155            .logs_sender
156            .try_send(Box::new((record.clone(), instrumentation.clone())));
157
158        // match for result and handle each separately
159        match result {
160            Ok(_) => {
161                // Successfully sent the log record to the data channel.
162                // Increment the current batch size and check if it has reached
163                // the max export batch size.
164                if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1
165                    >= self.max_export_batch_size
166                {
167                    // Check if the a control message for exporting logs is
168                    // already sent to the worker thread. If not, send a control
169                    // message to export logs. `export_log_message_sent` is set
170                    // to false ONLY when the worker thread has processed the
171                    // control message.
172
173                    if !self.export_log_message_sent.load(Ordering::Relaxed) {
174                        // This is a cost-efficient check as atomic load
175                        // operations do not require exclusive access to cache
176                        // line. Perform atomic swap to
177                        // `export_log_message_sent` ONLY when the atomic load
178                        // operation above returns false. Atomic
179                        // swap/compare_exchange operations require exclusive
180                        // access to cache line on most processor architectures.
181                        // We could have used compare_exchange as well here, but
182                        // it's more verbose than swap.
183                        if !self.export_log_message_sent.swap(true, Ordering::Relaxed) {
184                            match self.message_sender.try_send(BatchMessage::ExportLog(
185                                self.export_log_message_sent.clone(),
186                            )) {
187                                Ok(_) => {
188                                    // Control message sent successfully.
189                                }
190                                Err(_err) => {
191                                    // TODO: Log error If the control message
192                                    // could not be sent, reset the
193                                    // `export_log_message_sent` flag.
194                                    self.export_log_message_sent.store(false, Ordering::Relaxed);
195                                }
196                            }
197                        }
198                    }
199                }
200            }
201            Err(mpsc::TrySendError::Full(_)) => {
202                // Increment dropped logs count. The first time we have to drop
203                // a log, emit a warning.
204                if self.dropped_logs_count.fetch_add(1, Ordering::Relaxed) == 0 {
205                    otel_warn!(name: "BatchLogProcessor.LogDroppingStarted",
206                        message = "BatchLogProcessor dropped a LogRecord due to queue full. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped.");
207                }
208            }
209            Err(mpsc::TrySendError::Disconnected(_)) => {
210                // The following `otel_warn!` may cause an infinite feedback loop of
211                // 'telemetry-induced-telemetry', potentially causing a stack overflow
212                let _guard = Context::enter_telemetry_suppressed_scope();
213
214                // Given background thread is the only receiver, and it's
215                // disconnected, it indicates the thread is shutdown
216                otel_warn!(
217                    name: "BatchLogProcessor.Emit.AfterShutdown",
218                    message = "Logs are being emitted even after Shutdown. This indicates incorrect lifecycle management of OTelLoggerProvider in application. Logs will not be exported."
219                );
220            }
221        }
222    }
223
224    fn force_flush(&self) -> OTelSdkResult {
225        let (sender, receiver) = mpsc::sync_channel(1);
226        match self
227            .message_sender
228            .try_send(BatchMessage::ForceFlush(sender))
229        {
230            Ok(_) => receiver
231                .recv_timeout(self.forceflush_timeout)
232                .map_err(|err| {
233                    if err == RecvTimeoutError::Timeout {
234                        OTelSdkError::Timeout(self.forceflush_timeout)
235                    } else {
236                        OTelSdkError::InternalFailure(format!("{err}"))
237                    }
238                })?,
239            Err(mpsc::TrySendError::Full(_)) => {
240                // If the control message could not be sent, emit a warning.
241                otel_debug!(
242                    name: "BatchLogProcessor.ForceFlush.ControlChannelFull",
243                    message = "Control message to flush the worker thread could not be sent as the control channel is full. This can occur if user repeatedily calls force_flush/shutdown without finishing the previous call."
244                );
245                Err(OTelSdkError::InternalFailure("ForceFlush cannot be performed as Control channel is full. This can occur if user repeatedily calls force_flush/shutdown without finishing the previous call.".into()))
246            }
247            Err(mpsc::TrySendError::Disconnected(_)) => {
248                // Given background thread is the only receiver, and it's
249                // disconnected, it indicates the thread is shutdown
250                otel_debug!(
251                    name: "BatchLogProcessor.ForceFlush.AlreadyShutdown",
252                    message = "ForceFlush invoked after Shutdown. This will not perform Flush and indicates a incorrect lifecycle management in Application."
253                );
254
255                Err(OTelSdkError::AlreadyShutdown)
256            }
257        }
258    }
259
260    fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
261        let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed);
262        let max_queue_size = self.max_queue_size;
263        if dropped_logs > 0 {
264            otel_warn!(
265                name: "BatchLogProcessor.LogsDropped",
266                dropped_logs_count = dropped_logs,
267                max_queue_size = max_queue_size,
268                message = "Logs were dropped due to a queue being full. The count represents the total count of log records dropped in the lifetime of this BatchLogProcessor. Consider increasing the queue size and/or decrease delay between intervals."
269            );
270        }
271
272        let (sender, receiver) = mpsc::sync_channel(1);
273        match self.message_sender.try_send(BatchMessage::Shutdown(sender)) {
274            Ok(_) => {
275                receiver
276                    .recv_timeout(timeout)
277                    .map(|_| {
278                        // join the background thread after receiving back the
279                        // shutdown signal
280                        if let Some(handle) = self.handle.lock().unwrap().take() {
281                            handle.join().unwrap();
282                        }
283                        OTelSdkResult::Ok(())
284                    })
285                    .map_err(|err| match err {
286                        RecvTimeoutError::Timeout => {
287                            otel_error!(
288                                name: "BatchLogProcessor.Shutdown.Timeout",
289                                message = "BatchLogProcessor shutdown timing out."
290                            );
291                            OTelSdkError::Timeout(timeout)
292                        }
293                        _ => {
294                            otel_error!(
295                                name: "BatchLogProcessor.Shutdown.Error",
296                                error = format!("{}", err)
297                            );
298                            OTelSdkError::InternalFailure(format!("{err}"))
299                        }
300                    })?
301            }
302            Err(mpsc::TrySendError::Full(_)) => {
303                // If the control message could not be sent, emit a warning.
304                otel_debug!(
305                    name: "BatchLogProcessor.Shutdown.ControlChannelFull",
306                    message = "Control message to shutdown the worker thread could not be sent as the control channel is full. This can occur if user repeatedily calls force_flush/shutdown without finishing the previous call."
307                );
308                Err(OTelSdkError::InternalFailure("Shutdown cannot be performed as Control channel is full. This can occur if user repeatedily calls force_flush/shutdown without finishing the previous call.".into()))
309            }
310            Err(mpsc::TrySendError::Disconnected(_)) => {
311                // Given background thread is the only receiver, and it's
312                // disconnected, it indicates the thread is shutdown
313                otel_debug!(
314                    name: "BatchLogProcessor.Shutdown.AlreadyShutdown",
315                    message = "Shutdown is being invoked more than once. This is noop, but indicates a potential issue in the application's lifecycle management."
316                );
317
318                Err(OTelSdkError::AlreadyShutdown)
319            }
320        }
321    }
322
323    fn set_resource(&mut self, resource: &Resource) {
324        let resource = Arc::new(resource.clone());
325        let _ = self
326            .message_sender
327            .try_send(BatchMessage::SetResource(resource));
328    }
329}
330
331impl BatchLogProcessor {
332    pub(crate) fn new<E>(mut exporter: E, config: BatchConfig) -> Self
333    where
334        E: LogExporter + Send + Sync + 'static,
335    {
336        let (logs_sender, logs_receiver) = mpsc::sync_channel::<LogsData>(config.max_queue_size);
337        let (message_sender, message_receiver) = mpsc::sync_channel::<BatchMessage>(64); // Is this a reasonable bound?
338        let max_queue_size = config.max_queue_size;
339        let max_export_batch_size = config.max_export_batch_size;
340        let current_batch_size = Arc::new(AtomicUsize::new(0));
341        let current_batch_size_for_thread = current_batch_size.clone();
342
343        let handle = thread::Builder::new()
344            .name("OpenTelemetry.Logs.BatchProcessor".to_string())
345            .spawn(move || {
346                let _suppress_guard = Context::enter_telemetry_suppressed_scope();
347                otel_debug!(
348                    name: "BatchLogProcessor.ThreadStarted",
349                    interval_in_millisecs = config.scheduled_delay.as_millis(),
350                    max_export_batch_size = config.max_export_batch_size,
351                    max_queue_size = max_queue_size,
352                );
353                let mut last_export_time = Instant::now();
354                let mut logs = Vec::with_capacity(config.max_export_batch_size);
355                let current_batch_size = current_batch_size_for_thread;
356
357                // This method gets up to `max_export_batch_size` amount of logs from the channel and exports them.
358                // It returns the result of the export operation.
359                // It expects the logs vec to be empty when it's called.
360                #[inline]
361                fn get_logs_and_export<E>(
362                    logs_receiver: &mpsc::Receiver<LogsData>,
363                    exporter: &E,
364                    logs: &mut Vec<LogsData>,
365                    last_export_time: &mut Instant,
366                    current_batch_size: &AtomicUsize,
367                    max_export_size: usize,
368                ) -> OTelSdkResult
369                where
370                    E: LogExporter + Send + Sync + 'static,
371                {
372                    let target = current_batch_size.load(Ordering::Relaxed); // `target` is used to determine the stopping criteria for exporting logs.
373                    let mut result = OTelSdkResult::Ok(());
374                    let mut total_exported_logs: usize = 0;
375
376                    while target > 0 && total_exported_logs < target {
377                        // Get upto `max_export_batch_size` amount of logs log records from the channel and push them to the logs vec
378                        while let Ok(log) = logs_receiver.try_recv() {
379                            logs.push(log);
380                            if logs.len() == max_export_size {
381                                break;
382                            }
383                        }
384
385                        let count_of_logs = logs.len(); // Count of logs that will be exported
386                        total_exported_logs += count_of_logs;
387
388                        result = export_batch_sync(exporter, logs, last_export_time); // This method clears the logs vec after exporting
389
390                        current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed);
391                    }
392                    result
393                }
394
395                loop {
396                    let remaining_time = config
397                        .scheduled_delay
398                        .checked_sub(last_export_time.elapsed())
399                        .unwrap_or(config.scheduled_delay);
400
401                    match message_receiver.recv_timeout(remaining_time) {
402                        Ok(BatchMessage::ExportLog(export_log_message_sent)) => {
403                            // Reset the export log message sent flag now it has has been processed.
404                            export_log_message_sent.store(false, Ordering::Relaxed);
405
406                            otel_debug!(
407                                name: "BatchLogProcessor.ExportingDueToBatchSize",
408                            );
409
410                            let _ = get_logs_and_export(
411                                &logs_receiver,
412                                &exporter,
413                                &mut logs,
414                                &mut last_export_time,
415                                &current_batch_size,
416                                max_export_batch_size,
417                            );
418                        }
419                        Ok(BatchMessage::ForceFlush(sender)) => {
420                            otel_debug!(name: "BatchLogProcessor.ExportingDueToForceFlush");
421                            let result = get_logs_and_export(
422                                &logs_receiver,
423                                &exporter,
424                                &mut logs,
425                                &mut last_export_time,
426                                &current_batch_size,
427                                max_export_batch_size,
428                            );
429                            let _ = sender.send(result);
430                        }
431                        Ok(BatchMessage::Shutdown(sender)) => {
432                            otel_debug!(name: "BatchLogProcessor.ExportingDueToShutdown");
433                            let result = get_logs_and_export(
434                                &logs_receiver,
435                                &exporter,
436                                &mut logs,
437                                &mut last_export_time,
438                                &current_batch_size,
439                                max_export_batch_size,
440                            );
441                            let _ = exporter.shutdown();
442                            let _ = sender.send(result);
443
444                            otel_debug!(
445                                name: "BatchLogProcessor.ThreadExiting",
446                                reason = "ShutdownRequested"
447                            );
448                            //
449                            // break out the loop and return from the current background thread.
450                            //
451                            break;
452                        }
453                        Ok(BatchMessage::SetResource(resource)) => {
454                            exporter.set_resource(&resource);
455                        }
456                        Err(RecvTimeoutError::Timeout) => {
457                            otel_debug!(
458                                name: "BatchLogProcessor.ExportingDueToTimer",
459                            );
460
461                            let _ = get_logs_and_export(
462                                &logs_receiver,
463                                &exporter,
464                                &mut logs,
465                                &mut last_export_time,
466                                &current_batch_size,
467                                max_export_batch_size,
468                            );
469                        }
470                        Err(RecvTimeoutError::Disconnected) => {
471                            // Channel disconnected, only thing to do is break
472                            // out (i.e exit the thread)
473                            otel_debug!(
474                                name: "BatchLogProcessor.ThreadExiting",
475                                reason = "MessageSenderDisconnected"
476                            );
477                            break;
478                        }
479                    }
480                }
481                otel_debug!(
482                    name: "BatchLogProcessor.ThreadStopped"
483                );
484            })
485            .expect("Thread spawn failed."); //TODO: Handle thread spawn failure
486
487        // Return batch processor with link to worker
488        BatchLogProcessor {
489            logs_sender,
490            message_sender,
491            handle: Mutex::new(Some(handle)),
492            forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
493            dropped_logs_count: AtomicUsize::new(0),
494            max_queue_size,
495            export_log_message_sent: Arc::new(AtomicBool::new(false)),
496            current_batch_size,
497            max_export_batch_size,
498        }
499    }
500
501    /// Create a new batch processor builder
502    pub fn builder<E>(exporter: E) -> BatchLogProcessorBuilder<E>
503    where
504        E: LogExporter,
505    {
506        BatchLogProcessorBuilder {
507            exporter,
508            config: Default::default(),
509        }
510    }
511}
512
513#[allow(clippy::vec_box)]
514fn export_batch_sync<E>(
515    exporter: &E,
516    batch: &mut Vec<Box<(SdkLogRecord, InstrumentationScope)>>,
517    last_export_time: &mut Instant,
518) -> OTelSdkResult
519where
520    E: LogExporter + ?Sized,
521{
522    *last_export_time = Instant::now();
523
524    if batch.is_empty() {
525        return OTelSdkResult::Ok(());
526    }
527
528    let export = exporter.export(LogBatch::new_with_owned_data(batch.as_slice()));
529    let export_result = futures_executor::block_on(export);
530
531    // Clear the batch vec after exporting
532    batch.clear();
533
534    match export_result {
535        Ok(_) => OTelSdkResult::Ok(()),
536        Err(err) => {
537            otel_error!(
538                name: "BatchLogProcessor.ExportError",
539                error = format!("{}", err)
540            );
541            OTelSdkResult::Err(err)
542        }
543    }
544}
545
546///
547/// A builder for creating [`BatchLogProcessor`] instances.
548///
549#[derive(Debug)]
550pub struct BatchLogProcessorBuilder<E> {
551    exporter: E,
552    config: BatchConfig,
553}
554
555impl<E> BatchLogProcessorBuilder<E>
556where
557    E: LogExporter + 'static,
558{
559    /// Set the BatchConfig for [`BatchLogProcessorBuilder`]
560    pub fn with_batch_config(self, config: BatchConfig) -> Self {
561        BatchLogProcessorBuilder { config, ..self }
562    }
563
564    /// Build a batch processor
565    pub fn build(self) -> BatchLogProcessor {
566        BatchLogProcessor::new(self.exporter, self.config)
567    }
568}
569
/// Configuration of a batch log processor.
/// Instances are created through [`BatchConfigBuilder`].
#[derive(Debug)]
#[allow(dead_code)]
pub struct BatchConfig {
    /// Capacity of the queue that buffers logs for delayed processing. Logs
    /// arriving while the queue is full are dropped. Defaults to 2048.
    pub(crate) max_queue_size: usize,

    /// Delay between two consecutive processing runs of batches.
    /// Defaults to 1 second.
    pub(crate) scheduled_delay: Duration,

    /// Upper bound on the number of logs processed in a single batch. When more
    /// than one batch worth of logs is queued, the batches are processed one
    /// after the other without any delay. Defaults to 512.
    pub(crate) max_export_batch_size: usize,

    /// Longest time an export of one batch may take.
    #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
    pub(crate) max_export_timeout: Duration,
}
593
594impl Default for BatchConfig {
595    fn default() -> Self {
596        BatchConfigBuilder::default().build()
597    }
598}
599
/// Builder that assembles [`BatchConfig`] instances.
#[derive(Debug)]
pub struct BatchConfigBuilder {
    // Pending value for `BatchConfig::max_queue_size`.
    max_queue_size: usize,
    // Pending value for `BatchConfig::scheduled_delay`.
    scheduled_delay: Duration,
    // Pending value for `BatchConfig::max_export_batch_size` (capped in `build`).
    max_export_batch_size: usize,
    // Pending value for `BatchConfig::max_export_timeout`.
    #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
    max_export_timeout: Duration,
}
609
610impl Default for BatchConfigBuilder {
611    /// Create a new [`BatchConfigBuilder`] initialized with default batch config values as per the specs.
612    /// The values are overridden by environment variables if set.
613    /// The supported environment variables are:
614    /// * `OTEL_BLRP_MAX_QUEUE_SIZE`
615    /// * `OTEL_BLRP_SCHEDULE_DELAY`
616    /// * `OTEL_BLRP_MAX_EXPORT_BATCH_SIZE`
617    /// * `OTEL_BLRP_EXPORT_TIMEOUT`
618    ///
619    /// Note: Programmatic configuration overrides any value set via the environment variable.
620    fn default() -> Self {
621        BatchConfigBuilder {
622            max_queue_size: OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT,
623            scheduled_delay: OTEL_BLRP_SCHEDULE_DELAY_DEFAULT,
624            max_export_batch_size: OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
625            #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
626            max_export_timeout: OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT,
627        }
628        .init_from_env_vars()
629    }
630}
631
632impl BatchConfigBuilder {
633    /// Set max_queue_size for [`BatchConfigBuilder`].
634    /// It's the maximum queue size to buffer logs for delayed processing.
635    /// If the queue gets full it will drop the logs.
636    /// The default value is 2048.
637    ///
638    /// Corresponding environment variable: `OTEL_BLRP_MAX_QUEUE_SIZE`.
639    ///
640    /// Note: Programmatically setting this will override any value set via the environment variable.
641    pub fn with_max_queue_size(mut self, max_queue_size: usize) -> Self {
642        self.max_queue_size = max_queue_size;
643        self
644    }
645
646    /// Set scheduled_delay for [`BatchConfigBuilder`].
647    /// It's the delay interval in milliseconds between two consecutive processing of batches.
648    /// The default value is 1000 milliseconds.
649    ///
650    /// Corresponding environment variable: `OTEL_BLRP_SCHEDULE_DELAY`.
651    ///
652    /// Note: Programmatically setting this will override any value set via the environment variable.
653    pub fn with_scheduled_delay(mut self, scheduled_delay: Duration) -> Self {
654        self.scheduled_delay = scheduled_delay;
655        self
656    }
657
658    /// Set max_export_timeout for [`BatchConfigBuilder`].
659    /// It's the maximum duration to export a batch of data.
660    /// The default value is 30000 milliseconds.
661    ///
662    /// Corresponding environment variable: `OTEL_BLRP_EXPORT_TIMEOUT`.
663    ///
664    /// Note: Programmatically setting this will override any value set via the environment variable.
665    #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
666    pub fn with_max_export_timeout(mut self, max_export_timeout: Duration) -> Self {
667        self.max_export_timeout = max_export_timeout;
668        self
669    }
670
671    /// Set max_export_batch_size for [`BatchConfigBuilder`].
672    /// It's the maximum number of logs to process in a single batch. If there are
673    /// more than one batch worth of logs then it processes multiple batches
674    /// of logs one batch after the other without any delay.
675    /// The default value is 512.
676    ///
677    /// Corresponding environment variable: `OTEL_BLRP_MAX_EXPORT_BATCH_SIZE`.
678    ///
679    /// Note: Programmatically setting this will override any value set via the environment variable.
680    pub fn with_max_export_batch_size(mut self, max_export_batch_size: usize) -> Self {
681        self.max_export_batch_size = max_export_batch_size;
682        self
683    }
684
685    /// Builds a `BatchConfig` enforcing the following invariants:
686    /// * `max_export_batch_size` must be less than or equal to `max_queue_size`.
687    pub fn build(self) -> BatchConfig {
688        // max export batch size must be less or equal to max queue size.
689        // we set max export batch size to max queue size if it's larger than max queue size.
690        let max_export_batch_size = min(self.max_export_batch_size, self.max_queue_size);
691
692        BatchConfig {
693            max_queue_size: self.max_queue_size,
694            scheduled_delay: self.scheduled_delay,
695            #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
696            max_export_timeout: self.max_export_timeout,
697            max_export_batch_size,
698        }
699    }
700
701    fn init_from_env_vars(mut self) -> Self {
702        if let Some(max_queue_size) = env::var(OTEL_BLRP_MAX_QUEUE_SIZE)
703            .ok()
704            .and_then(|queue_size| usize::from_str(&queue_size).ok())
705        {
706            self.max_queue_size = max_queue_size;
707        }
708
709        if let Some(max_export_batch_size) = env::var(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE)
710            .ok()
711            .and_then(|batch_size| usize::from_str(&batch_size).ok())
712        {
713            self.max_export_batch_size = max_export_batch_size;
714        }
715
716        if let Some(scheduled_delay) = env::var(OTEL_BLRP_SCHEDULE_DELAY)
717            .ok()
718            .and_then(|delay| u64::from_str(&delay).ok())
719        {
720            self.scheduled_delay = Duration::from_millis(scheduled_delay);
721        }
722
723        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
724        if let Some(max_export_timeout) = env::var(OTEL_BLRP_EXPORT_TIMEOUT)
725            .ok()
726            .and_then(|s| u64::from_str(&s).ok())
727        {
728            self.max_export_timeout = Duration::from_millis(max_export_timeout);
729        }
730
731        self
732    }
733}
734
735#[cfg(all(test, feature = "testing", feature = "logs"))]
736mod tests {
737    use super::{
738        BatchConfig, BatchConfigBuilder, BatchLogProcessor, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
739        OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, OTEL_BLRP_MAX_QUEUE_SIZE,
740        OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BLRP_SCHEDULE_DELAY,
741        OTEL_BLRP_SCHEDULE_DELAY_DEFAULT,
742    };
743    #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
744    use super::{OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT};
745    use crate::logs::log_processor::tests::MockLogExporter;
746    use crate::logs::SdkLogRecord;
747    use crate::{
748        logs::{InMemoryLogExporter, InMemoryLogExporterBuilder, LogProcessor, SdkLoggerProvider},
749        Resource,
750    };
751    use opentelemetry::InstrumentationScope;
752    use opentelemetry::KeyValue;
753    use std::sync::{Arc, Mutex};
754    use std::time::Duration;
755
756    #[test]
757    fn test_default_const_values() {
758        assert_eq!(OTEL_BLRP_SCHEDULE_DELAY, "OTEL_BLRP_SCHEDULE_DELAY");
759        assert_eq!(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT.as_millis(), 1_000);
760        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
761        assert_eq!(OTEL_BLRP_EXPORT_TIMEOUT, "OTEL_BLRP_EXPORT_TIMEOUT");
762        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
763        assert_eq!(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT.as_millis(), 30_000);
764        assert_eq!(OTEL_BLRP_MAX_QUEUE_SIZE, "OTEL_BLRP_MAX_QUEUE_SIZE");
765        assert_eq!(OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, 2_048);
766        assert_eq!(
767            OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
768            "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE"
769        );
770        assert_eq!(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, 512);
771    }
772
773    #[test]
774    fn test_default_batch_config_adheres_to_specification() {
775        // The following environment variables are expected to be unset so that their default values are used.
776        let env_vars = vec![
777            OTEL_BLRP_SCHEDULE_DELAY,
778            #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
779            OTEL_BLRP_EXPORT_TIMEOUT,
780            OTEL_BLRP_MAX_QUEUE_SIZE,
781            OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
782        ];
783
784        let config = temp_env::with_vars_unset(env_vars, BatchConfig::default);
785
786        assert_eq!(config.scheduled_delay, OTEL_BLRP_SCHEDULE_DELAY_DEFAULT);
787        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
788        assert_eq!(config.max_export_timeout, OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT);
789        assert_eq!(config.max_queue_size, OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT);
790        assert_eq!(
791            config.max_export_batch_size,
792            OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT
793        );
794    }
795
796    #[test]
797    fn test_code_based_config_overrides_env_vars() {
798        let env_vars = vec![
799            (OTEL_BLRP_SCHEDULE_DELAY, Some("2000")),
800            (OTEL_BLRP_MAX_QUEUE_SIZE, Some("4096")),
801            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
802        ];
803
804        temp_env::with_vars(env_vars, || {
805            let config = BatchConfigBuilder::default()
806                .with_max_queue_size(2048)
807                .with_scheduled_delay(Duration::from_millis(1000))
808                .with_max_export_batch_size(512)
809                .build();
810
811            assert_eq!(config.scheduled_delay, Duration::from_millis(1000));
812            assert_eq!(config.max_queue_size, 2048);
813            assert_eq!(config.max_export_batch_size, 512);
814        });
815    }
816
817    #[test]
818    fn test_batch_config_configurable_by_env_vars() {
819        let env_vars = vec![
820            (OTEL_BLRP_SCHEDULE_DELAY, Some("2000")),
821            #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
822            (OTEL_BLRP_EXPORT_TIMEOUT, Some("60000")),
823            (OTEL_BLRP_MAX_QUEUE_SIZE, Some("4096")),
824            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
825        ];
826
827        let config = temp_env::with_vars(env_vars, BatchConfig::default);
828
829        assert_eq!(config.scheduled_delay, Duration::from_millis(2000));
830        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
831        assert_eq!(config.max_export_timeout, Duration::from_millis(60000));
832        assert_eq!(config.max_queue_size, 4096);
833        assert_eq!(config.max_export_batch_size, 1024);
834    }
835
836    #[test]
837    fn test_batch_config_max_export_batch_size_validation() {
838        let env_vars = vec![
839            (OTEL_BLRP_MAX_QUEUE_SIZE, Some("256")),
840            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
841        ];
842
843        let config = temp_env::with_vars(env_vars, BatchConfig::default);
844
845        assert_eq!(config.max_queue_size, 256);
846        assert_eq!(config.max_export_batch_size, 256);
847        assert_eq!(config.scheduled_delay, OTEL_BLRP_SCHEDULE_DELAY_DEFAULT);
848        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
849        assert_eq!(config.max_export_timeout, OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT);
850    }
851
852    #[test]
853    fn test_batch_config_with_fields() {
854        let batch_builder = BatchConfigBuilder::default()
855            .with_max_export_batch_size(1)
856            .with_scheduled_delay(Duration::from_millis(2))
857            .with_max_queue_size(4);
858
859        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
860        let batch_builder = batch_builder.with_max_export_timeout(Duration::from_millis(3));
861        let batch = batch_builder.build();
862
863        assert_eq!(batch.max_export_batch_size, 1);
864        assert_eq!(batch.scheduled_delay, Duration::from_millis(2));
865        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
866        assert_eq!(batch.max_export_timeout, Duration::from_millis(3));
867        assert_eq!(batch.max_queue_size, 4);
868    }
869
870    #[test]
871    fn test_build_batch_log_processor_builder() {
872        let mut env_vars = vec![
873            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("500")),
874            (OTEL_BLRP_SCHEDULE_DELAY, Some("I am not number")),
875            #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
876            (OTEL_BLRP_EXPORT_TIMEOUT, Some("2046")),
877        ];
878        temp_env::with_vars(env_vars.clone(), || {
879            let builder = BatchLogProcessor::builder(InMemoryLogExporter::default());
880
881            assert_eq!(builder.config.max_export_batch_size, 500);
882            assert_eq!(
883                builder.config.scheduled_delay,
884                OTEL_BLRP_SCHEDULE_DELAY_DEFAULT
885            );
886            assert_eq!(
887                builder.config.max_queue_size,
888                OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT
889            );
890
891            #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
892            assert_eq!(
893                builder.config.max_export_timeout,
894                Duration::from_millis(2046)
895            );
896        });
897
898        env_vars.push((OTEL_BLRP_MAX_QUEUE_SIZE, Some("120")));
899
900        temp_env::with_vars(env_vars, || {
901            let builder = BatchLogProcessor::builder(InMemoryLogExporter::default());
902            assert_eq!(builder.config.max_export_batch_size, 120);
903            assert_eq!(builder.config.max_queue_size, 120);
904        });
905    }
906
907    #[test]
908    fn test_build_batch_log_processor_builder_with_custom_config() {
909        let expected = BatchConfigBuilder::default()
910            .with_max_export_batch_size(1)
911            .with_scheduled_delay(Duration::from_millis(2))
912            .with_max_queue_size(4)
913            .build();
914
915        let builder =
916            BatchLogProcessor::builder(InMemoryLogExporter::default()).with_batch_config(expected);
917
918        let actual = &builder.config;
919        assert_eq!(actual.max_export_batch_size, 1);
920        assert_eq!(actual.scheduled_delay, Duration::from_millis(2));
921        assert_eq!(actual.max_queue_size, 4);
922    }
923
924    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
925    async fn test_set_resource_batch_processor() {
926        let exporter = MockLogExporter {
927            resource: Arc::new(Mutex::new(None)),
928        };
929        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
930        let provider = SdkLoggerProvider::builder()
931            .with_log_processor(processor)
932            .with_resource(
933                Resource::builder_empty()
934                    .with_attributes([
935                        KeyValue::new("k1", "v1"),
936                        KeyValue::new("k2", "v3"),
937                        KeyValue::new("k3", "v3"),
938                        KeyValue::new("k4", "v4"),
939                        KeyValue::new("k5", "v5"),
940                    ])
941                    .build(),
942            )
943            .build();
944
945        provider.force_flush().unwrap();
946
947        assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5);
948        let _ = provider.shutdown();
949    }
950
951    #[tokio::test(flavor = "multi_thread")]
952    async fn test_batch_shutdown() {
953        // assert we will receive an error
954        // setup
955        let exporter = InMemoryLogExporterBuilder::default()
956            .keep_records_on_shutdown()
957            .build();
958        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
959
960        let mut record = SdkLogRecord::new();
961        let instrumentation = InstrumentationScope::default();
962
963        processor.emit(&mut record, &instrumentation);
964        processor.force_flush().unwrap();
965        processor.shutdown().unwrap();
966        // todo: expect to see errors here. How should we assert this?
967        processor.emit(&mut record, &instrumentation);
968        assert_eq!(1, exporter.get_emitted_logs().unwrap().len());
969        assert!(exporter.is_shutdown_called());
970    }
971
972    #[tokio::test(flavor = "current_thread")]
973    async fn test_batch_log_processor_shutdown_under_async_runtime_current_flavor_multi_thread() {
974        let exporter = InMemoryLogExporterBuilder::default().build();
975        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
976
977        processor.shutdown().unwrap();
978    }
979
980    #[tokio::test(flavor = "current_thread")]
981    async fn test_batch_log_processor_shutdown_with_async_runtime_current_flavor_current_thread() {
982        let exporter = InMemoryLogExporterBuilder::default().build();
983        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
984        processor.shutdown().unwrap();
985    }
986
987    #[tokio::test(flavor = "multi_thread")]
988    async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_multi_thread() {
989        let exporter = InMemoryLogExporterBuilder::default().build();
990        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
991        processor.shutdown().unwrap();
992    }
993
994    #[tokio::test(flavor = "multi_thread")]
995    async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_current_thread() {
996        let exporter = InMemoryLogExporterBuilder::default().build();
997        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
998        processor.shutdown().unwrap();
999    }
1000}