opentelemetry_otlp/exporter/
mod.rs

1//! OTLP exporter builder and configurations.
2//!
3//! OTLP supports sending data via different protocols and formats.
4
5#[cfg(any(feature = "http-proto", feature = "http-json"))]
6use crate::exporter::http::HttpExporterBuilder;
7#[cfg(feature = "grpc-tonic")]
8use crate::exporter::tonic::TonicExporterBuilder;
9use crate::Protocol;
10#[cfg(feature = "serialize")]
11use serde::{Deserialize, Serialize};
12use std::fmt::{Display, Formatter};
13use std::str::FromStr;
14use std::time::Duration;
15use thiserror::Error;
16
/// Name of the environment variable that selects the target to which the
/// exporter sends signals.
///
/// The fallbacks declared in this module are `http://localhost:4317` for gRPC
/// and `http://localhost:4318` for HTTP.
/// Learn about the relationship between this variable and metrics/spans/logs at
/// <https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/exporter.md#endpoint-urls-for-otlphttp>
pub const OTEL_EXPORTER_OTLP_ENDPOINT: &str = "OTEL_EXPORTER_OTLP_ENDPOINT";
/// Default target to which the exporter is going to send signals; this is the
/// HTTP default, `http://localhost:4318`.
pub const OTEL_EXPORTER_OTLP_ENDPOINT_DEFAULT: &str = OTEL_EXPORTER_OTLP_HTTP_ENDPOINT_DEFAULT;
/// Name of the environment variable holding key-value pairs to be used as
/// headers associated with gRPC or HTTP requests.
/// Example: `k1=v1,k2=v2`
/// Note: as of now, this is only supported for HTTP requests.
pub const OTEL_EXPORTER_OTLP_HEADERS: &str = "OTEL_EXPORTER_OTLP_HEADERS";
/// Name of the environment variable selecting the protocol the exporter will
/// use, e.g. `http/protobuf` or `grpc`.
pub const OTEL_EXPORTER_OTLP_PROTOCOL: &str = "OTEL_EXPORTER_OTLP_PROTOCOL";
/// Name of the environment variable selecting the compression algorithm to
/// use, defaults to none.
pub const OTEL_EXPORTER_OTLP_COMPRESSION: &str = "OTEL_EXPORTER_OTLP_COMPRESSION";

// Protocol identifiers the feature-selected default below is chosen from.
const OTEL_EXPORTER_OTLP_PROTOCOL_HTTP_PROTOBUF: &str = "http/protobuf";
const OTEL_EXPORTER_OTLP_PROTOCOL_GRPC: &str = "grpc";
const OTEL_EXPORTER_OTLP_PROTOCOL_HTTP_JSON: &str = "http/json";

#[cfg(feature = "http-json")]
/// Default protocol, using http-json.
pub const OTEL_EXPORTER_OTLP_PROTOCOL_DEFAULT: &str = OTEL_EXPORTER_OTLP_PROTOCOL_HTTP_JSON;
#[cfg(all(feature = "http-proto", not(feature = "http-json")))]
/// Default protocol, using http-proto.
pub const OTEL_EXPORTER_OTLP_PROTOCOL_DEFAULT: &str = OTEL_EXPORTER_OTLP_PROTOCOL_HTTP_PROTOBUF;
#[cfg(all(
    feature = "grpc-tonic",
    not(any(feature = "http-proto", feature = "http-json"))
))]
/// Default protocol, using grpc
pub const OTEL_EXPORTER_OTLP_PROTOCOL_DEFAULT: &str = OTEL_EXPORTER_OTLP_PROTOCOL_GRPC;

#[cfg(not(any(feature = "grpc-tonic", feature = "http-proto", feature = "http-json")))]
/// Default protocol if no features are enabled.
pub const OTEL_EXPORTER_OTLP_PROTOCOL_DEFAULT: &str = "";

/// Name of the environment variable holding the max waiting time (in
/// milliseconds) for the backend to process each signal batch, defaults to
/// 10 seconds.
pub const OTEL_EXPORTER_OTLP_TIMEOUT: &str = "OTEL_EXPORTER_OTLP_TIMEOUT";
/// Default max waiting time for the backend to process each signal batch.
pub const OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);

// Endpoints per protocol https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/exporter.md
#[cfg(feature = "grpc-tonic")]
const OTEL_EXPORTER_OTLP_GRPC_ENDPOINT_DEFAULT: &str = "http://localhost:4317";
const OTEL_EXPORTER_OTLP_HTTP_ENDPOINT_DEFAULT: &str = "http://localhost:4318";
62
63#[cfg(any(feature = "http-proto", feature = "http-json"))]
64pub(crate) mod http;
65#[cfg(feature = "grpc-tonic")]
66pub(crate) mod tonic;
67
68/// Configuration for the OTLP exporter.
69#[derive(Debug)]
70pub struct ExportConfig {
71    /// The address of the OTLP collector.
72    /// Default address will be used based on the protocol.
73    ///
74    /// Note: Programmatically setting this will override any value set via the environment variable.
75    pub endpoint: Option<String>,
76
77    /// The protocol to use when communicating with the collector.
78    pub protocol: Protocol,
79
80    /// The timeout to the collector.
81    /// The default value is 10 seconds.
82    ///
83    /// Note: Programmatically setting this will override any value set via the environment variable.
84    pub timeout: Option<Duration>,
85}
86
87impl Default for ExportConfig {
88    fn default() -> Self {
89        let protocol = default_protocol();
90
91        Self {
92            endpoint: None,
93            // don't use default_endpoint(protocol) here otherwise we
94            // won't know if user provided a value
95            protocol,
96            timeout: None,
97        }
98    }
99}
100
101#[derive(Error, Debug)]
102/// Errors that can occur while building an exporter.
103// TODO: Refine and polish this.
104// Non-exhaustive to allow for future expansion without breaking changes.
105// This could be refined after polishing and finalizing the errors.
106#[non_exhaustive]
107pub enum ExporterBuildError {
108    /// Spawning a new thread failed.
109    #[error("Spawning a new thread failed. Unable to create Reqwest-Blocking client.")]
110    ThreadSpawnFailed,
111
112    /// Feature required to use the specified compression algorithm.
113    #[cfg(any(not(feature = "gzip-tonic"), not(feature = "zstd-tonic")))]
114    #[error("feature '{0}' is required to use the compression algorithm '{1}'")]
115    FeatureRequiredForCompressionAlgorithm(&'static str, Compression),
116
117    /// No Http client specified.
118    #[error("no http client specified")]
119    NoHttpClient,
120
121    /// Unsupported compression algorithm.
122    #[error("unsupported compression algorithm '{0}'")]
123    UnsupportedCompressionAlgorithm(String),
124
125    /// Invalid URI.
126    #[cfg(any(feature = "grpc-tonic", feature = "http-proto", feature = "http-json"))]
127    #[error("invalid URI {0}. Reason {1}")]
128    InvalidUri(String, String),
129
130    /// Failed due to an internal error.
131    /// The error message is intended for logging purposes only and should not
132    /// be used to make programmatic decisions. It is implementation-specific
133    /// and subject to change without notice. Consumers of this error should not
134    /// rely on its content beyond logging.
135    #[error("Reason: {0}")]
136    InternalFailure(String),
137}
138
/// Compression algorithm applied to data before it is sent.
#[cfg_attr(feature = "serialize", derive(Deserialize, Serialize))]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Compression {
    /// gzip compression.
    Gzip,
    /// zstd compression.
    Zstd,
}

impl Display for Compression {
    /// Writes the lowercase algorithm name (`gzip` or `zstd`).
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let name = match *self {
            Compression::Gzip => "gzip",
            Compression::Zstd => "zstd",
        };
        f.write_str(name)
    }
}
157
158impl FromStr for Compression {
159    type Err = ExporterBuildError;
160
161    fn from_str(s: &str) -> Result<Self, Self::Err> {
162        match s {
163            "gzip" => Ok(Compression::Gzip),
164            "zstd" => Ok(Compression::Zstd),
165            _ => Err(ExporterBuildError::UnsupportedCompressionAlgorithm(
166                s.to_string(),
167            )),
168        }
169    }
170}
171
/// Picks the compression setting, trying each source in order:
/// 1. the value provided in code,
/// 2. the signal-specific environment variable,
/// 3. the generic `OTEL_EXPORTER_OTLP_COMPRESSION` variable,
/// 4. otherwise `None`.
///
/// A variable that is present but does not parse as a [`Compression`] is an
/// error; it does not fall through to the next source.
#[cfg(any(feature = "http-proto", feature = "http-json", feature = "grpc-tonic"))]
fn resolve_compression_from_env(
    config_compression: Option<Compression>,
    signal_env_var: &str,
) -> Result<Option<Compression>, ExporterBuildError> {
    if config_compression.is_some() {
        return Ok(config_compression);
    }

    std::env::var(signal_env_var)
        .or_else(|_| std::env::var(OTEL_EXPORTER_OTLP_COMPRESSION))
        .ok()
        .map(|raw| raw.parse::<Compression>())
        .transpose()
}
192
193/// default protocol based on enabled features
194fn default_protocol() -> Protocol {
195    match OTEL_EXPORTER_OTLP_PROTOCOL_DEFAULT {
196        OTEL_EXPORTER_OTLP_PROTOCOL_HTTP_PROTOBUF => Protocol::HttpBinary,
197        OTEL_EXPORTER_OTLP_PROTOCOL_GRPC => Protocol::Grpc,
198        OTEL_EXPORTER_OTLP_PROTOCOL_HTTP_JSON => Protocol::HttpJson,
199        _ => Protocol::HttpBinary,
200    }
201}
202
/// Returns the default header map: a single `User-Agent` entry of the form
/// `OTel-OTLP-Exporter-Rust/<crate version>`.
#[cfg(any(feature = "grpc-tonic", feature = "http-proto", feature = "http-json"))]
fn default_headers() -> std::collections::HashMap<String, String> {
    let user_agent = format!("OTel-OTLP-Exporter-Rust/{}", env!("CARGO_PKG_VERSION"));
    std::collections::HashMap::from([("User-Agent".to_string(), user_agent)])
}
213
214/// Provide access to the [ExportConfig] field within the exporter builders.
215pub trait HasExportConfig {
216    /// Return a mutable reference to the [ExportConfig] within the exporter builders.
217    fn export_config(&mut self) -> &mut ExportConfig;
218}
219
/// Gives access to the [ExportConfig] stored in a [TonicExporterBuilder].
#[cfg(feature = "grpc-tonic")]
impl HasExportConfig for TonicExporterBuilder {
    fn export_config(&mut self) -> &mut ExportConfig {
        let Self {
            exporter_config, ..
        } = self;
        exporter_config
    }
}
227
/// Gives access to the [ExportConfig] stored in an [HttpExporterBuilder].
#[cfg(any(feature = "http-proto", feature = "http-json"))]
impl HasExportConfig for HttpExporterBuilder {
    fn export_config(&mut self) -> &mut ExportConfig {
        let Self {
            exporter_config, ..
        } = self;
        exporter_config
    }
}
235
236/// Expose methods to override [ExportConfig].
237///
238/// This trait will be implemented for every struct that implemented [`HasExportConfig`] trait.
239///
240/// ## Examples
241/// ```
242/// # #[cfg(all(feature = "trace", feature = "grpc-tonic"))]
243/// # {
244/// use crate::opentelemetry_otlp::WithExportConfig;
245/// let exporter_builder = opentelemetry_otlp::SpanExporter::builder()
246///     .with_tonic()
247///     .with_endpoint("http://localhost:7201");
248/// # }
249/// ```
250pub trait WithExportConfig {
251    /// Set the address of the OTLP collector. If not set or set to empty string, the default address is used.
252    ///
253    /// Note: Programmatically setting this will override any value set via the environment variable.
254    fn with_endpoint<T: Into<String>>(self, endpoint: T) -> Self;
255    /// Set the protocol to use when communicating with the collector.
256    ///
257    /// Note that protocols that are not supported by exporters will be ignored. The exporter
258    /// will use default protocol in this case.
259    ///
260    /// ## Note
261    /// All exporters in this crate only support one protocol, thus choosing the protocol is a no-op at the moment.
262    fn with_protocol(self, protocol: Protocol) -> Self;
263    /// Set the timeout to the collector.
264    ///
265    /// Note: Programmatically setting this will override any value set via the environment variable.
266    fn with_timeout(self, timeout: Duration) -> Self;
267    /// Set export config. This will override all previous configurations.
268    ///
269    /// Note: Programmatically setting this will override any value set via environment variables.
270    fn with_export_config(self, export_config: ExportConfig) -> Self;
271}
272
273impl<B: HasExportConfig> WithExportConfig for B {
274    fn with_endpoint<T: Into<String>>(mut self, endpoint: T) -> Self {
275        self.export_config().endpoint = Some(endpoint.into());
276        self
277    }
278
279    fn with_protocol(mut self, protocol: Protocol) -> Self {
280        self.export_config().protocol = protocol;
281        self
282    }
283
284    fn with_timeout(mut self, timeout: Duration) -> Self {
285        self.export_config().timeout = Some(timeout);
286        self
287    }
288
289    fn with_export_config(mut self, exporter_config: ExportConfig) -> Self {
290        self.export_config().endpoint = exporter_config.endpoint;
291        self.export_config().protocol = exporter_config.protocol;
292        self.export_config().timeout = exporter_config.timeout;
293        self
294    }
295}
296
/// Picks the export timeout, trying each source in order:
/// 1. `provided_timeout` (programmatic configuration wins over the
///    environment),
/// 2. the signal-specific variable named by `signal_timeout_var`,
/// 3. the generic `OTEL_EXPORTER_OTLP_TIMEOUT` variable,
/// 4. `OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT`.
///
/// Environment values are read as whole milliseconds. A variable that is
/// missing or does not parse is skipped in favour of the next source.
#[cfg(any(feature = "grpc-tonic", feature = "http-proto", feature = "http-json"))]
fn resolve_timeout(signal_timeout_var: &str, provided_timeout: Option<&Duration>) -> Duration {
    let millis_from_env =
        |name: &str| -> Option<u64> { std::env::var(name).ok()?.parse().ok() };

    provided_timeout
        .copied()
        .or_else(|| millis_from_env(signal_timeout_var).map(Duration::from_millis))
        .or_else(|| millis_from_env(OTEL_EXPORTER_OTLP_TIMEOUT).map(Duration::from_millis))
        .unwrap_or(OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT)
}
318
/// Splits a comma-separated `k1=v1,k2=v2` string into `(key, value)` pairs.
///
/// Each comma-separated segment is trimmed and handed to
/// `parse_header_key_value_string`; segments it rejects are skipped.
#[cfg(any(feature = "grpc-tonic", feature = "http-proto", feature = "http-json"))]
fn parse_header_string(value: &str) -> impl Iterator<Item = (&str, String)> {
    value
        .split_terminator(',')
        .filter_map(|segment| parse_header_key_value_string(segment.trim()))
}
326
/// Percent-decodes `value`.
///
/// Every `%` must be followed by exactly two hexadecimal digits. Runs of
/// consecutive escapes are collected as bytes and must form valid UTF-8.
/// All other characters are copied through unchanged.
///
/// Returns `None` when an escape is truncated, contains a non-hex character
/// (including a sign such as `+`), or when the decoded bytes are not valid
/// UTF-8.
#[cfg(any(feature = "grpc-tonic", feature = "http-proto", feature = "http-json"))]
fn url_decode(value: &str) -> Option<String> {
    /// Appends the buffered escape bytes to `out` as UTF-8 and clears the buffer.
    fn flush(pending: &mut Vec<u8>, out: &mut String) -> Option<()> {
        if !pending.is_empty() {
            out.push_str(std::str::from_utf8(pending).ok()?);
            pending.clear();
        }
        Some(())
    }

    let mut decoded = String::with_capacity(value.len());
    let mut pending = Vec::<u8>::new();
    let mut chars = value.chars();

    while let Some(ch) = chars.next() {
        if ch == '%' {
            // Decode digit by digit: `u8::from_str_radix` would also accept a
            // leading `+`, letting malformed escapes such as `%+A` through.
            let high = chars.next()?.to_digit(16)?;
            let low = chars.next()?.to_digit(16)?;
            pending.push(u8::try_from((high << 4) | low).ok()?);
            continue;
        }

        // A literal character ends the current run of escaped bytes.
        flush(&mut pending, &mut decoded)?;
        decoded.push(ch);
    }

    // The input may end in the middle of a run of escapes.
    flush(&mut pending, &mut decoded)?;
    Some(decoded)
}
356
/// Parses one `key=value` header entry.
///
/// The input is split at the first `=`, and both sides are trimmed. The value
/// is percent-decoded with `url_decode`; if decoding fails, the trimmed raw
/// value is kept as-is.
///
/// Returns `None` when there is no `=`, or when the key or the resulting
/// value is empty.
#[cfg(any(feature = "grpc-tonic", feature = "http-proto", feature = "http-json"))]
fn parse_header_key_value_string(key_value_string: &str) -> Option<(&str, String)> {
    let (key, value) = key_value_string.split_once('=')?;
    let (key, value) = (key.trim(), value.trim());

    // Fall back to the same trimmed text that was offered to the decoder, so
    // both paths treat surrounding whitespace identically. The closure keeps
    // the fallback allocation off the success path.
    let value = url_decode(value).unwrap_or_else(|| value.to_string());

    (!key.is_empty() && !value.is_empty()).then_some((key, value))
}
369
#[cfg(test)]
#[cfg(any(feature = "grpc-tonic", feature = "http-proto", feature = "http-json"))]
mod tests {
    /// Runs `f` with every `(name, value)` pair in `env_vars` set as an
    /// environment variable, using `temp_env::with_vars`.
    pub(crate) fn run_env_test<T, F>(env_vars: T, f: F)
    where
        F: FnOnce(),
        T: Into<Vec<(&'static str, &'static str)>>,
    {
        let scoped_vars: Vec<(&'static str, Option<&'static str>)> = env_vars
            .into()
            .into_iter()
            .map(|(name, value)| (name, Some(value)))
            .collect();

        temp_env::with_vars(scoped_vars, f)
    }

    /// A fresh HTTP builder carries no programmatic endpoint.
    #[cfg(any(feature = "http-proto", feature = "http-json"))]
    #[test]
    fn test_default_http_endpoint() {
        let exporter_builder = crate::HttpExporterBuilder::default();

        assert_eq!(exporter_builder.exporter_config.endpoint, None);
    }

    /// Building an HTTP log exporter with a malformed endpoint reports
    /// `InvalidUri`.
    #[cfg(feature = "logs")]
    #[cfg(any(feature = "http-proto", feature = "http-json"))]
    #[test]
    fn export_builder_error_invalid_http_endpoint() {
        use std::time::Duration;

        use crate::{ExportConfig, LogExporter, Protocol, WithExportConfig};

        let ex_config = ExportConfig {
            endpoint: Some("invalid_uri/something".to_string()),
            protocol: Protocol::HttpBinary,
            timeout: Some(Duration::from_secs(10)),
        };

        let exporter_result = LogExporter::builder()
            .with_http()
            .with_export_config(ex_config)
            .build();

        assert!(
            matches!(
                exporter_result,
                Err(crate::exporter::ExporterBuildError::InvalidUri(_, _))
            ),
            "Expected InvalidUri error, but got {exporter_result:?}"
        );
    }

    /// Building a gRPC log exporter with a malformed endpoint reports
    /// `InvalidUri`.
    // Gated on `logs` like the HTTP variant above, because it also uses
    // `LogExporter`.
    #[cfg(feature = "logs")]
    #[cfg(feature = "grpc-tonic")]
    #[tokio::test]
    async fn export_builder_error_invalid_grpc_endpoint() {
        use std::time::Duration;

        use crate::{ExportConfig, LogExporter, Protocol, WithExportConfig};

        let ex_config = ExportConfig {
            endpoint: Some("invalid_uri/something".to_string()),
            protocol: Protocol::Grpc,
            timeout: Some(Duration::from_secs(10)),
        };

        let exporter_result = LogExporter::builder()
            .with_tonic()
            .with_export_config(ex_config)
            .build();

        assert!(
            matches!(
                exporter_result,
                Err(crate::exporter::ExporterBuildError::InvalidUri(_, _))
            ),
            "Expected InvalidUri error, but got {exporter_result:?}"
        );
    }

    /// A fresh tonic builder carries no programmatic endpoint.
    #[cfg(feature = "grpc-tonic")]
    #[test]
    fn test_default_tonic_endpoint() {
        let exporter_builder = crate::TonicExporterBuilder::default();

        assert_eq!(exporter_builder.exporter_config.endpoint, None);
    }

    /// With exactly one transport feature enabled, the default protocol
    /// matches that feature. Other feature combinations assert nothing.
    #[test]
    fn test_default_protocol() {
        #[cfg(all(
            feature = "http-json",
            not(any(feature = "grpc-tonic", feature = "http-proto"))
        ))]
        {
            assert_eq!(
                crate::exporter::default_protocol(),
                crate::Protocol::HttpJson
            );
        }

        #[cfg(all(
            feature = "http-proto",
            not(any(feature = "grpc-tonic", feature = "http-json"))
        ))]
        {
            assert_eq!(
                crate::exporter::default_protocol(),
                crate::Protocol::HttpBinary
            );
        }

        #[cfg(all(
            feature = "grpc-tonic",
            not(any(feature = "http-proto", feature = "http-json"))
        ))]
        {
            assert_eq!(crate::exporter::default_protocol(), crate::Protocol::Grpc);
        }
    }

    #[test]
    fn test_url_decode() {
        // Format: (encoded, expected_decoded)
        let test_cases = [
            ("v%201", Some("v 1")),
            ("v 1", Some("v 1")),
            ("%C3%B6%C3%A0%C2%A7%C3%96abcd%C3%84", Some("öà§ÖabcdÄ")),
            ("v%XX1", None),
        ];

        for (encoded, expected_decoded) in test_cases {
            assert_eq!(
                super::url_decode(encoded),
                expected_decoded.map(str::to_string),
                "decoding {encoded:?}"
            );
        }
    }

    #[test]
    fn test_parse_header_string() {
        // Format: (input_str, expected_headers)
        let test_cases = [
            ("k1=v1", vec![("k1", "v1")]),
            ("k1=v1,k2=v2", vec![("k1", "v1"), ("k2", "v2")]),
            ("k1=v1=10,k2,k3", vec![("k1", "v1=10")]),
            ("k1=v1,,,k2,k3=10", vec![("k1", "v1"), ("k3", "10")]),
        ];

        for (input_str, expected_headers) in test_cases {
            let parsed: Vec<_> = super::parse_header_string(input_str).collect();
            let expected: Vec<_> = expected_headers
                .into_iter()
                .map(|(key, value)| (key, value.to_string()))
                .collect();

            assert_eq!(parsed, expected, "parsing {input_str:?}");
        }
    }

    #[test]
    fn test_parse_header_key_value_string() {
        // Format: (input_str, expected_header)
        let test_cases = [
            ("k1=v1", Some(("k1", "v1"))),
            (
                "Authentication=Basic AAA",
                Some(("Authentication", "Basic AAA")),
            ),
            (
                "Authentication=Basic%20AAA",
                Some(("Authentication", "Basic AAA")),
            ),
            ("k1=%XX", Some(("k1", "%XX"))),
            ("", None),
            ("=v1", None),
            ("k1=", None),
        ];

        for (input_str, expected_header) in test_cases {
            assert_eq!(
                super::parse_header_key_value_string(input_str),
                expected_header.map(|(key, value)| (key, value.to_string())),
                "parsing {input_str:?}"
            );
        }
    }

    #[test]
    fn test_priority_of_signal_env_over_generic_env_for_timeout() {
        let env_vars = vec![
            (crate::OTEL_EXPORTER_OTLP_TRACES_TIMEOUT, "3000"),
            (super::OTEL_EXPORTER_OTLP_TIMEOUT, "2000"),
        ];

        run_env_test(env_vars, || {
            let timeout = super::resolve_timeout(crate::OTEL_EXPORTER_OTLP_TRACES_TIMEOUT, None);
            assert_eq!(timeout.as_millis(), 3000);
        });
    }

    #[test]
    fn test_priority_of_code_based_config_over_envs_for_timeout() {
        let env_vars = vec![
            (crate::OTEL_EXPORTER_OTLP_TRACES_TIMEOUT, "3000"),
            (super::OTEL_EXPORTER_OTLP_TIMEOUT, "2000"),
        ];

        run_env_test(env_vars, || {
            let provided = std::time::Duration::from_millis(1000);
            let timeout =
                super::resolve_timeout(crate::OTEL_EXPORTER_OTLP_TRACES_TIMEOUT, Some(&provided));
            assert_eq!(timeout.as_millis(), 1000);
        });
    }

    #[test]
    fn test_use_default_when_others_missing_for_timeout() {
        // Explicitly unset both variables for the duration of the closure so
        // the outcome does not depend on the environment the test runner
        // happens to inherit.
        let unset_vars = vec![
            (crate::OTEL_EXPORTER_OTLP_TRACES_TIMEOUT, None::<&str>),
            (super::OTEL_EXPORTER_OTLP_TIMEOUT, None::<&str>),
        ];

        temp_env::with_vars(unset_vars, || {
            let timeout = super::resolve_timeout(crate::OTEL_EXPORTER_OTLP_TRACES_TIMEOUT, None);
            assert_eq!(timeout.as_millis(), 10_000);
        });
    }
}