opentelemetry_instrumentation_actix_web/middleware/
metrics.rs

1//! # Metrics Middleware
2
3use actix_http::{
4    body::{BodySize, MessageBody},
5    header::CONTENT_LENGTH,
6};
7use actix_web::dev;
8use futures_util::future::{self, FutureExt as _, LocalBoxFuture};
9use opentelemetry::{
10    global,
11    metrics::{Histogram, Meter, MeterProvider, UpDownCounter},
12    KeyValue,
13};
14use std::borrow::Cow;
15use std::{sync::Arc, time::SystemTime};
16
17use super::get_scope;
18use crate::util::metrics_attributes_from_request;
19use crate::RouteFormatter;
20
21// Follows the experimental semantic conventions for HTTP metrics:
22// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/semantic_conventions/http-metrics.md
23use opentelemetry_semantic_conventions::trace::HTTP_RESPONSE_STATUS_CODE;
24
25const HTTP_SERVER_DURATION: &str = "http.server.duration";
26const HTTP_SERVER_ACTIVE_REQUESTS: &str = "http.server.active_requests";
27const HTTP_SERVER_REQUEST_SIZE: &str = "http.server.request.size";
28const HTTP_SERVER_RESPONSE_SIZE: &str = "http.server.response.size";
29
30/// Records http server metrics
31///
32/// See the [spec] for details.
33///
34/// [spec]: https://github.com/open-telemetry/semantic-conventions/blob/v1.21.0/docs/http/http-metrics.md#http-server
35#[derive(Clone, Debug)]
36struct Metrics {
37    http_server_duration: Histogram<f64>,
38    http_server_active_requests: UpDownCounter<i64>,
39    http_server_request_size: Histogram<u64>,
40    http_server_response_size: Histogram<u64>,
41}
42
43impl Metrics {
44    /// Create a new [`RequestMetrics`]
45    fn new(meter: Meter) -> Self {
46        let http_server_duration = meter
47            .f64_histogram(HTTP_SERVER_DURATION)
48            .with_description("Measures the duration of inbound HTTP requests.")
49            .with_unit("s")
50            .build();
51
52        let http_server_active_requests = meter
53            .i64_up_down_counter(HTTP_SERVER_ACTIVE_REQUESTS)
54            .with_description(
55                "Measures the number of concurrent HTTP requests that are currently in-flight.",
56            )
57            .build();
58
59        let http_server_request_size = meter
60            .u64_histogram(HTTP_SERVER_REQUEST_SIZE)
61            .with_description("Measures the size of HTTP request messages (compressed).")
62            .with_unit("By")
63            .build();
64
65        let http_server_response_size = meter
66            .u64_histogram(HTTP_SERVER_RESPONSE_SIZE)
67            .with_description("Measures the size of HTTP response messages (compressed).")
68            .with_unit("By")
69            .build();
70
71        Metrics {
72            http_server_active_requests,
73            http_server_duration,
74            http_server_request_size,
75            http_server_response_size,
76        }
77    }
78}
79
80type MetricsAttrsFromReqFn = fn(&dev::ServiceRequest, Cow<'static, str>) -> Vec<KeyValue>;
81
82/// Builder for [RequestMetrics]
83#[derive(Clone, Debug, Default)]
84pub struct RequestMetricsBuilder {
85    route_formatter: Option<Arc<dyn RouteFormatter + Send + Sync + 'static>>,
86    meter: Option<Meter>,
87    metric_attrs_from_req: Option<MetricsAttrsFromReqFn>,
88}
89
90impl RequestMetricsBuilder {
91    /// Create a new `RequestMetricsBuilder`
92    pub fn new() -> Self {
93        Self::default()
94    }
95
96    /// Add a route formatter to customize metrics match patterns
97    pub fn with_route_formatter<R>(mut self, route_formatter: R) -> Self
98    where
99        R: RouteFormatter + Send + Sync + 'static,
100    {
101        self.route_formatter = Some(Arc::new(route_formatter));
102        self
103    }
104
105    /// Set the meter provider this middleware should use to construct meters
106    pub fn with_meter_provider(mut self, meter_provider: impl MeterProvider) -> Self {
107        self.meter = Some(meter_provider.meter_with_scope(get_scope()));
108        self
109    }
110
111    /// Set a metric attrs function that the middleware will use to create metric attributes
112    pub fn with_metric_attrs_from_req(
113        mut self,
114        metric_attrs_from_req: fn(&dev::ServiceRequest, Cow<'static, str>) -> Vec<KeyValue>,
115    ) -> Self {
116        self.metric_attrs_from_req = Some(metric_attrs_from_req);
117        self
118    }
119
120    /// Build the `RequestMetrics` middleware
121    pub fn build(self) -> RequestMetrics {
122        let meter = self
123            .meter
124            .unwrap_or_else(|| global::meter_provider().meter_with_scope(get_scope()));
125
126        RequestMetrics {
127            route_formatter: self.route_formatter,
128            metrics: Arc::new(Metrics::new(meter)),
129            metric_attrs_from_req: self
130                .metric_attrs_from_req
131                .unwrap_or(metrics_attributes_from_request),
132        }
133    }
134}
135
136/// Request metrics tracking
137///
138/// For more information on how to configure Prometheus with [OTLP](https://prometheus.io/docs/guides/opentelemetry)
139///
140/// # Examples
141///
142/// ```no_run
143/// use actix_web::{http, web, App, HttpRequest, HttpResponse, HttpServer, Responder};
144/// use opentelemetry::{global, KeyValue};
145/// use opentelemetry_instrumentation_actix_web::{RequestMetrics, RequestTracing};
146/// use opentelemetry_sdk::{metrics::SdkMeterProvider, Resource};
147///
148/// async fn manual_hello() -> impl Responder {
149///     HttpResponse::Ok().body("Hey there!")
150/// }
151///
152/// #[actix_web::main]
153/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
154///     // Initialize STDOUT exporter
155///     let exporter = opentelemetry_stdout::MetricExporter::default();
156///
157///     // set up your meter provider with your exporter(s)
158///     let provider = SdkMeterProvider::builder()
159///         .with_periodic_exporter(exporter)
160///         .with_resource(
161///             // recommended attributes
162///             Resource::builder_empty()
163///                 .with_attribute(KeyValue::new("service.name", "my_app"))
164///                 .build(),
165///         )
166///         .build();
167///     global::set_meter_provider(provider.clone());
168///
169///     // Run actix server, metrics will be exported periodically
170///     HttpServer::new(move || {
171///         App::new()
172///             .wrap(RequestTracing::new())
173///             .wrap(RequestMetrics::default())
174///             .route("/hey", web::get().to(manual_hello))
175///         })
176///         .bind("localhost:8080")?
177///         .run()
178///         .await?;
179///
180///     //Shutdown the meter provider. This will trigger an export of all metrics.
181///     provider.shutdown()?;
182///
183///     Ok(())
184/// }
185/// ```
186#[derive(Clone, Debug)]
187pub struct RequestMetrics {
188    route_formatter: Option<Arc<dyn RouteFormatter + Send + Sync + 'static>>,
189    metrics: Arc<Metrics>,
190    metric_attrs_from_req: fn(&dev::ServiceRequest, Cow<'static, str>) -> Vec<KeyValue>,
191}
192
193impl RequestMetrics {
194    /// Create a builder to configure this middleware
195    pub fn builder() -> RequestMetricsBuilder {
196        RequestMetricsBuilder::new()
197    }
198}
199
200impl Default for RequestMetrics {
201    fn default() -> Self {
202        RequestMetrics::builder().build()
203    }
204}
205
206impl<S, B> dev::Transform<S, dev::ServiceRequest> for RequestMetrics
207where
208    S: dev::Service<
209        dev::ServiceRequest,
210        Response = dev::ServiceResponse<B>,
211        Error = actix_web::Error,
212    >,
213    S::Future: 'static,
214    B: MessageBody + 'static,
215{
216    type Response = dev::ServiceResponse<B>;
217    type Error = actix_web::Error;
218    type Transform = RequestMetricsMiddleware<S>;
219    type InitError = ();
220    type Future = future::Ready<Result<Self::Transform, Self::InitError>>;
221
222    fn new_transform(&self, service: S) -> Self::Future {
223        let service = RequestMetricsMiddleware {
224            service,
225            metrics: self.metrics.clone(),
226            route_formatter: self.route_formatter.clone(),
227            metric_attrs_from_req: self.metric_attrs_from_req,
228        };
229
230        future::ok(service)
231    }
232}
233
234/// Request metrics middleware
235#[allow(missing_debug_implementations)]
236pub struct RequestMetricsMiddleware<S> {
237    service: S,
238    metrics: Arc<Metrics>,
239    route_formatter: Option<Arc<dyn RouteFormatter + Send + Sync + 'static>>,
240    metric_attrs_from_req: fn(&dev::ServiceRequest, Cow<'static, str>) -> Vec<KeyValue>,
241}
242
243impl<S, B> dev::Service<dev::ServiceRequest> for RequestMetricsMiddleware<S>
244where
245    S: dev::Service<
246        dev::ServiceRequest,
247        Response = dev::ServiceResponse<B>,
248        Error = actix_web::Error,
249    >,
250    S::Future: 'static,
251    B: MessageBody + 'static,
252{
253    type Response = dev::ServiceResponse<B>;
254    type Error = actix_web::Error;
255    type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
256
257    dev::forward_ready!(service);
258
259    fn call(&self, req: dev::ServiceRequest) -> Self::Future {
260        let timer = SystemTime::now();
261
262        let mut http_target = req
263            .match_pattern()
264            .map(Cow::Owned)
265            .unwrap_or(Cow::Borrowed("default"));
266
267        if let Some(formatter) = &self.route_formatter {
268            http_target = Cow::Owned(formatter.format(&http_target));
269        }
270
271        let mut attributes = (self.metric_attrs_from_req)(&req, http_target);
272        self.metrics.http_server_active_requests.add(1, &attributes);
273
274        let content_length = req
275            .headers()
276            .get(CONTENT_LENGTH)
277            .and_then(|len| len.to_str().ok().and_then(|s| s.parse().ok()))
278            .unwrap_or(0);
279        self.metrics
280            .http_server_request_size
281            .record(content_length, &attributes);
282
283        let request_metrics = self.metrics.clone();
284        Box::pin(self.service.call(req).map(move |res| {
285            request_metrics
286                .http_server_active_requests
287                .add(-1, &attributes);
288
289            // Ignore actix errors for metrics
290            if let Ok(res) = res {
291                attributes.push(KeyValue::new(
292                    HTTP_RESPONSE_STATUS_CODE,
293                    res.status().as_u16() as i64,
294                ));
295                let response_size = match res.response().body().size() {
296                    BodySize::Sized(size) => size,
297                    _ => 0,
298                };
299                request_metrics
300                    .http_server_response_size
301                    .record(response_size, &attributes);
302
303                request_metrics.http_server_duration.record(
304                    timer.elapsed().map(|t| t.as_secs_f64()).unwrap_or_default(),
305                    &attributes,
306                );
307
308                Ok(res)
309            } else {
310                res
311            }
312        }))
313    }
314}