opentelemetry_instrumentation_actix_web/middleware/
metrics.rs1use actix_http::{
4 body::{BodySize, MessageBody},
5 header::CONTENT_LENGTH,
6};
7use actix_web::dev;
8use futures_util::future::{self, FutureExt as _, LocalBoxFuture};
9use opentelemetry::{
10 global,
11 metrics::{Histogram, Meter, MeterProvider, UpDownCounter},
12 KeyValue,
13};
14use std::borrow::Cow;
15use std::{sync::Arc, time::SystemTime};
16
17use super::get_scope;
18use crate::util::metrics_attributes_from_request;
19use crate::RouteFormatter;
20
21use opentelemetry_semantic_conventions::trace::HTTP_RESPONSE_STATUS_CODE;
24
/// Instrument name of the histogram that records inbound request durations.
const HTTP_SERVER_DURATION: &str = "http.server.duration";

/// Instrument name of the up-down counter that tracks in-flight requests.
const HTTP_SERVER_ACTIVE_REQUESTS: &str = "http.server.active_requests";

/// Instrument name of the histogram that records request message sizes.
const HTTP_SERVER_REQUEST_SIZE: &str = "http.server.request.size";

/// Instrument name of the histogram that records response message sizes.
const HTTP_SERVER_RESPONSE_SIZE: &str = "http.server.response.size";
29
30#[derive(Clone, Debug)]
36struct Metrics {
37 http_server_duration: Histogram<f64>,
38 http_server_active_requests: UpDownCounter<i64>,
39 http_server_request_size: Histogram<u64>,
40 http_server_response_size: Histogram<u64>,
41}
42
43impl Metrics {
44 fn new(meter: Meter) -> Self {
46 let http_server_duration = meter
47 .f64_histogram(HTTP_SERVER_DURATION)
48 .with_description("Measures the duration of inbound HTTP requests.")
49 .with_unit("s")
50 .build();
51
52 let http_server_active_requests = meter
53 .i64_up_down_counter(HTTP_SERVER_ACTIVE_REQUESTS)
54 .with_description(
55 "Measures the number of concurrent HTTP requests that are currently in-flight.",
56 )
57 .build();
58
59 let http_server_request_size = meter
60 .u64_histogram(HTTP_SERVER_REQUEST_SIZE)
61 .with_description("Measures the size of HTTP request messages (compressed).")
62 .with_unit("By")
63 .build();
64
65 let http_server_response_size = meter
66 .u64_histogram(HTTP_SERVER_RESPONSE_SIZE)
67 .with_description("Measures the size of HTTP response messages (compressed).")
68 .with_unit("By")
69 .build();
70
71 Metrics {
72 http_server_active_requests,
73 http_server_duration,
74 http_server_request_size,
75 http_server_response_size,
76 }
77 }
78}
79
80type MetricsAttrsFromReqFn = fn(&dev::ServiceRequest, Cow<'static, str>) -> Vec<KeyValue>;
81
82#[derive(Clone, Debug, Default)]
84pub struct RequestMetricsBuilder {
85 route_formatter: Option<Arc<dyn RouteFormatter + Send + Sync + 'static>>,
86 meter: Option<Meter>,
87 metric_attrs_from_req: Option<MetricsAttrsFromReqFn>,
88}
89
90impl RequestMetricsBuilder {
91 pub fn new() -> Self {
93 Self::default()
94 }
95
96 pub fn with_route_formatter<R>(mut self, route_formatter: R) -> Self
98 where
99 R: RouteFormatter + Send + Sync + 'static,
100 {
101 self.route_formatter = Some(Arc::new(route_formatter));
102 self
103 }
104
105 pub fn with_meter_provider(mut self, meter_provider: impl MeterProvider) -> Self {
107 self.meter = Some(meter_provider.meter_with_scope(get_scope()));
108 self
109 }
110
111 pub fn with_metric_attrs_from_req(
113 mut self,
114 metric_attrs_from_req: fn(&dev::ServiceRequest, Cow<'static, str>) -> Vec<KeyValue>,
115 ) -> Self {
116 self.metric_attrs_from_req = Some(metric_attrs_from_req);
117 self
118 }
119
120 pub fn build(self) -> RequestMetrics {
122 let meter = self
123 .meter
124 .unwrap_or_else(|| global::meter_provider().meter_with_scope(get_scope()));
125
126 RequestMetrics {
127 route_formatter: self.route_formatter,
128 metrics: Arc::new(Metrics::new(meter)),
129 metric_attrs_from_req: self
130 .metric_attrs_from_req
131 .unwrap_or(metrics_attributes_from_request),
132 }
133 }
134}
135
136#[derive(Clone, Debug)]
187pub struct RequestMetrics {
188 route_formatter: Option<Arc<dyn RouteFormatter + Send + Sync + 'static>>,
189 metrics: Arc<Metrics>,
190 metric_attrs_from_req: fn(&dev::ServiceRequest, Cow<'static, str>) -> Vec<KeyValue>,
191}
192
193impl RequestMetrics {
194 pub fn builder() -> RequestMetricsBuilder {
196 RequestMetricsBuilder::new()
197 }
198}
199
200impl Default for RequestMetrics {
201 fn default() -> Self {
202 RequestMetrics::builder().build()
203 }
204}
205
206impl<S, B> dev::Transform<S, dev::ServiceRequest> for RequestMetrics
207where
208 S: dev::Service<
209 dev::ServiceRequest,
210 Response = dev::ServiceResponse<B>,
211 Error = actix_web::Error,
212 >,
213 S::Future: 'static,
214 B: MessageBody + 'static,
215{
216 type Response = dev::ServiceResponse<B>;
217 type Error = actix_web::Error;
218 type Transform = RequestMetricsMiddleware<S>;
219 type InitError = ();
220 type Future = future::Ready<Result<Self::Transform, Self::InitError>>;
221
222 fn new_transform(&self, service: S) -> Self::Future {
223 let service = RequestMetricsMiddleware {
224 service,
225 metrics: self.metrics.clone(),
226 route_formatter: self.route_formatter.clone(),
227 metric_attrs_from_req: self.metric_attrs_from_req,
228 };
229
230 future::ok(service)
231 }
232}
233
234#[allow(missing_debug_implementations)]
236pub struct RequestMetricsMiddleware<S> {
237 service: S,
238 metrics: Arc<Metrics>,
239 route_formatter: Option<Arc<dyn RouteFormatter + Send + Sync + 'static>>,
240 metric_attrs_from_req: fn(&dev::ServiceRequest, Cow<'static, str>) -> Vec<KeyValue>,
241}
242
243impl<S, B> dev::Service<dev::ServiceRequest> for RequestMetricsMiddleware<S>
244where
245 S: dev::Service<
246 dev::ServiceRequest,
247 Response = dev::ServiceResponse<B>,
248 Error = actix_web::Error,
249 >,
250 S::Future: 'static,
251 B: MessageBody + 'static,
252{
253 type Response = dev::ServiceResponse<B>;
254 type Error = actix_web::Error;
255 type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
256
257 dev::forward_ready!(service);
258
259 fn call(&self, req: dev::ServiceRequest) -> Self::Future {
260 let timer = SystemTime::now();
261
262 let mut http_target = req
263 .match_pattern()
264 .map(Cow::Owned)
265 .unwrap_or(Cow::Borrowed("default"));
266
267 if let Some(formatter) = &self.route_formatter {
268 http_target = Cow::Owned(formatter.format(&http_target));
269 }
270
271 let mut attributes = (self.metric_attrs_from_req)(&req, http_target);
272 self.metrics.http_server_active_requests.add(1, &attributes);
273
274 let content_length = req
275 .headers()
276 .get(CONTENT_LENGTH)
277 .and_then(|len| len.to_str().ok().and_then(|s| s.parse().ok()))
278 .unwrap_or(0);
279 self.metrics
280 .http_server_request_size
281 .record(content_length, &attributes);
282
283 let request_metrics = self.metrics.clone();
284 Box::pin(self.service.call(req).map(move |res| {
285 request_metrics
286 .http_server_active_requests
287 .add(-1, &attributes);
288
289 if let Ok(res) = res {
291 attributes.push(KeyValue::new(
292 HTTP_RESPONSE_STATUS_CODE,
293 res.status().as_u16() as i64,
294 ));
295 let response_size = match res.response().body().size() {
296 BodySize::Sized(size) => size,
297 _ => 0,
298 };
299 request_metrics
300 .http_server_response_size
301 .record(response_size, &attributes);
302
303 request_metrics.http_server_duration.record(
304 timer.elapsed().map(|t| t.as_secs_f64()).unwrap_or_default(),
305 &attributes,
306 );
307
308 Ok(res)
309 } else {
310 res
311 }
312 }))
313 }
314}