opentelemetry_instrumentation_tokio/
runtime.rs

//! Runtime metrics implementation.
//!
//! This module contains all the metric registration logic for Tokio runtime
//! metrics. Each metric is implemented as a separate function for clarity and
//! maintainability.
6
7use std::sync::{Once, RwLock};
8
9use opentelemetry::metrics::Meter;
10use opentelemetry::{InstrumentationScope, Key, KeyValue};
11
12/// One-time instrument initialization.
13static INSTRUMENTS_INITIALIZED: Once = Once::new();
14
15/// Registry of all observed runtimes.
16static RUNTIMES: RwLock<Vec<TrackedRuntime>> = RwLock::new(Vec::new());
17
18/// A tracked runtime with its metrics and labels.
19struct TrackedRuntime {
20    metrics: tokio::runtime::RuntimeMetrics,
21    labels: Vec<KeyValue>,
22
23    // Pre-computed labels for each worker. This assumes the # of workers never change in Tokio,
24    // which I think is the case?
25    workers_labels: Vec<Vec<KeyValue>>,
26
27    // Pre-computed labels for each bucket in the poll time histogram, for each worker
28    #[cfg(tokio_unstable)]
29    histogram_bucket_labels: Vec<Vec<Vec<KeyValue>>>,
30}
31
32/// Track a Tokio runtime for metrics collection.
33///
34/// This also initializes the instruments on the first call.
35pub(crate) fn track_runtime(handle: &tokio::runtime::Handle, labels: &[KeyValue]) {
36    // Ensure instruments are initialized (one-time, thread-safe).
37    INSTRUMENTS_INITIALIZED.call_once(|| {
38        register_all_instruments();
39    });
40
41    let labels = build_runtime_labels(handle, labels);
42
43    let workers_labels: Vec<Vec<_>> = (0..handle.metrics().num_workers())
44        .map(|i| {
45            let mut worker_labels = labels.clone();
46            worker_labels.push(worker_idx_attribute(i));
47            worker_labels
48        })
49        .collect();
50
51    #[cfg(tokio_unstable)]
52    let histogram_bucket_labels = 'result: {
53        if !handle.metrics().poll_time_histogram_enabled() {
54            // Don't collect histogram if not enabled
55            //
56            break 'result Vec::new();
57        }
58
59        let num_buckets = handle.metrics().poll_time_histogram_num_buckets();
60        let mut buckets_label: Vec<_> = (0..num_buckets)
61            .map(|bucket_idx| {
62                let range = handle
63                    .metrics()
64                    .poll_time_histogram_bucket_range(bucket_idx);
65                let value = range.end.as_nanos().try_into().unwrap_or(i64::MAX);
66                KeyValue::new("le", value)
67            })
68            .collect();
69
70        // Change the last bucket to +Inf
71        if let Some(last) = buckets_label.last_mut() {
72            *last = KeyValue::new("le", "+Inf");
73        }
74
75        workers_labels
76            .iter()
77            .map(|worker_labels| {
78                buckets_label
79                    .iter()
80                    .map(|bucket_label| {
81                        let mut labels = worker_labels.clone();
82                        labels.push(bucket_label.clone());
83                        labels
84                    })
85                    .collect()
86            })
87            .collect()
88    };
89
90    let tracked_runtime = TrackedRuntime {
91        metrics: handle.metrics().clone(),
92        labels,
93        workers_labels,
94        #[cfg(tokio_unstable)]
95        histogram_bucket_labels,
96    };
97
98    let mut runtimes = RUNTIMES.write().unwrap();
99    runtimes.push(tracked_runtime);
100}
101
102/// Build labels for a runtime (user labels + tokio.runtime.id if available).
103fn build_runtime_labels(handle: &tokio::runtime::Handle, labels: &[KeyValue]) -> Vec<KeyValue> {
104    #[cfg_attr(not(tokio_unstable), expect(unused_mut))]
105    let mut labels = labels.to_vec();
106
107    // Auto-add tokio.runtime.id when tokio_unstable is available
108    #[cfg(tokio_unstable)]
109    {
110        labels.push(KeyValue::new(
111            Key::from_static_str("tokio.runtime.id"),
112            handle.id().to_string(),
113        ));
114    }
115
116    // Silence unused parameter warning when tokio_unstable is not set
117    #[cfg(not(tokio_unstable))]
118    let _ = handle;
119
120    labels
121}
122
123/// Helper to construct a [`KeyValue`] with the worker index.
124fn worker_idx_attribute(i: usize) -> KeyValue {
125    KeyValue::new(
126        Key::from_static_str("tokio.worker.index"),
127        i.try_into().unwrap_or(i64::MAX),
128    )
129}
130
131/// Register all instruments (one-time, called via `Once`).
132fn register_all_instruments() {
133    let scope = InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
134        .with_version(env!("CARGO_PKG_VERSION"))
135        .build();
136
137    let meter = opentelemetry::global::meter_with_scope(scope);
138
139    // Always-available metrics
140    register_workers_gauge(&meter);
141    register_global_queue_depth_gauge(&meter);
142    register_alive_tasks_gauge(&meter);
143
144    // Metrics requiring 64-bit atomics
145    #[cfg(target_has_atomic = "64")]
146    {
147        register_worker_park_count_counter(&meter);
148        register_worker_busy_duration_counter(&meter);
149    }
150
151    // Metrics requiring `--cfg tokio_unstable`
152    #[cfg(tokio_unstable)]
153    {
154        register_blocking_threads_gauge(&meter);
155        register_idle_blocking_threads_gauge(&meter);
156        register_remote_schedules_counter(&meter);
157        register_budget_forced_yields_counter(&meter);
158
159        // I/O driver metrics require net feature
160        #[cfg(all(not(target_family = "wasm"), target_has_atomic = "64", feature = "net"))]
161        {
162            register_io_driver_fd_registrations_counter(&meter);
163            register_io_driver_fd_deregistrations_counter(&meter);
164            register_io_driver_fd_readies_counter(&meter);
165        }
166
167        register_spawned_tasks_count_counter(&meter);
168        register_blocking_queue_depth_gauge(&meter);
169        register_worker_noops_counter(&meter);
170        register_worker_task_steals_counter(&meter);
171        register_worker_steal_operations_counter(&meter);
172        register_worker_polls_counter(&meter);
173        register_worker_local_schedules_counter(&meter);
174        register_worker_overflows_counter(&meter);
175        register_worker_local_queue_depth_gauge(&meter);
176        register_worker_mean_poll_time_gauge(&meter);
177        register_poll_time_histogram(&meter);
178    }
179}
180
181// ============================================================================
182// Always-available metrics
183// ============================================================================
184
185fn register_workers_gauge(meter: &Meter) {
186    meter
187        .u64_observable_gauge("tokio.workers")
188        .with_description("The number of worker threads used by the runtime")
189        .with_unit("{worker}")
190        .with_callback(|instrument| {
191            let runtimes = RUNTIMES.read().unwrap();
192            for runtime in runtimes.iter() {
193                instrument.observe(
194                    runtime.metrics.num_workers().try_into().unwrap_or(u64::MAX),
195                    &runtime.labels,
196                );
197            }
198        })
199        .build();
200}
201
202fn register_global_queue_depth_gauge(meter: &Meter) {
203    meter
204        .u64_observable_gauge("tokio.global_queue_depth")
205        .with_description("The number of tasks currently scheduled in the runtime's global queue")
206        .with_unit("{task}")
207        .with_callback(|instrument| {
208            let runtimes = RUNTIMES.read().unwrap();
209            for runtime in runtimes.iter() {
210                instrument.observe(
211                    runtime
212                        .metrics
213                        .global_queue_depth()
214                        .try_into()
215                        .unwrap_or(u64::MAX),
216                    &runtime.labels,
217                );
218            }
219        })
220        .build();
221}
222
223#[cfg(target_has_atomic = "64")]
224fn register_worker_park_count_counter(meter: &Meter) {
225    meter
226        .u64_observable_counter("tokio.worker.park_count")
227        .with_description("The total number of times the given worker thread has parked")
228        .with_callback(|instrument| {
229            let runtimes = RUNTIMES.read().unwrap();
230            for runtime in runtimes.iter() {
231                for (worker_idx, labels) in runtime.workers_labels.iter().enumerate() {
232                    instrument.observe(runtime.metrics.worker_park_count(worker_idx), &labels[..]);
233                }
234            }
235        })
236        .build();
237}
238
239#[cfg(target_has_atomic = "64")]
240fn register_worker_busy_duration_counter(meter: &Meter) {
241    meter
242        .u64_observable_counter("tokio.worker.busy_duration")
243        .with_description("The amount of time the given worker thread has been busy")
244        .with_unit("ms")
245        .with_callback(|instrument| {
246            let runtimes = RUNTIMES.read().unwrap();
247            for runtime in runtimes.iter() {
248                for (worker_idx, labels) in runtime.workers_labels.iter().enumerate() {
249                    instrument.observe(
250                        runtime
251                            .metrics
252                            .worker_total_busy_duration(worker_idx)
253                            .as_millis()
254                            .try_into()
255                            .unwrap_or(u64::MAX),
256                        &labels[..],
257                    );
258                }
259            }
260        })
261        .build();
262}
263
264fn register_alive_tasks_gauge(meter: &Meter) {
265    meter
266        .u64_observable_gauge("tokio.alive_tasks")
267        .with_description("The number of active tasks in the runtime")
268        .with_unit("{task}")
269        .with_callback(|instrument| {
270            let runtimes = RUNTIMES.read().unwrap();
271            for runtime in runtimes.iter() {
272                instrument.observe(
273                    runtime
274                        .metrics
275                        .num_alive_tasks()
276                        .try_into()
277                        .unwrap_or(u64::MAX),
278                    &runtime.labels,
279                );
280            }
281        })
282        .build();
283}
284
285// ============================================================================
286// Metrics requiring tokio_unstable
287// ============================================================================
288
/// Registers `tokio.blocking_threads`: the number of additional (blocking)
/// threads each tracked runtime has spawned.
#[cfg(tokio_unstable)]
fn register_blocking_threads_gauge(meter: &Meter) {
    meter
        .u64_observable_gauge("tokio.blocking_threads")
        .with_description("The number of additional threads spawned by the runtime")
        .with_unit("{thread}")
        .with_callback(|instrument| {
            RUNTIMES.read().unwrap().iter().for_each(|tracked| {
                let threads = u64::try_from(tracked.metrics.num_blocking_threads()).unwrap_or(u64::MAX);
                instrument.observe(threads, &tracked.labels);
            });
        })
        .build();
}
310
/// Registers `tokio.idle_blocking_threads`: how many of each tracked runtime's
/// `spawn_blocking` threads are currently idle.
#[cfg(tokio_unstable)]
fn register_idle_blocking_threads_gauge(meter: &Meter) {
    meter
        .u64_observable_gauge("tokio.idle_blocking_threads")
        .with_description(
            "The number of idle threads, which have been spawned by the runtime for `spawn_blocking` calls",
        )
        .with_unit("{thread}")
        .with_callback(|instrument| {
            let runtimes = RUNTIMES.read().unwrap();
            for runtime in runtimes.iter() {
                instrument.observe(
                    runtime
                        .metrics
                        .num_idle_blocking_threads()
                        .try_into()
                        .unwrap_or(u64::MAX),
                    &runtime.labels,
                );
            }
        })
        .build();
}
333
/// Registers `tokio.remote_schedules`: the cumulative number of tasks scheduled
/// onto each tracked runtime from outside of it.
#[cfg(tokio_unstable)]
fn register_remote_schedules_counter(meter: &Meter) {
    meter
        .u64_observable_counter("tokio.remote_schedules")
        .with_description("The number of tasks scheduled from outside the runtime")
        .with_unit("{task}")
        .with_callback(|instrument| {
            let guard = RUNTIMES.read().unwrap();
            for tracked in guard.iter() {
                let remote = tracked.metrics.remote_schedule_count();
                instrument.observe(remote, tracked.labels.as_slice());
            }
        })
        .build();
}
348
/// Registers `tokio.budget_forced_yields`: how often tasks in each tracked
/// runtime were forced to yield after exhausting their cooperative budget.
#[cfg(tokio_unstable)]
fn register_budget_forced_yields_counter(meter: &Meter) {
    meter
        .u64_observable_counter("tokio.budget_forced_yields")
        .with_description(
            "The number of times that tasks have been forced to yield back to the scheduler after exhausting their task budgets",
        )
        .with_unit("{yield}")
        .with_callback(|instrument| {
            for tracked in RUNTIMES.read().unwrap().iter() {
                let forced = tracked.metrics.budget_forced_yield_count();
                instrument.observe(forced, tracked.labels.as_slice());
            }
        })
        .build();
}
365
/// Registers `tokio.io_driver.fd_registrations`: file descriptors registered
/// with each tracked runtime's I/O driver so far.
#[cfg(all(
    tokio_unstable,
    not(target_family = "wasm"),
    target_has_atomic = "64",
    feature = "net"
))]
fn register_io_driver_fd_registrations_counter(meter: &Meter) {
    meter
        .u64_observable_counter("tokio.io_driver.fd_registrations")
        .with_description(
            "The number of file descriptors that have been registered with the runtime's I/O driver",
        )
        .with_unit("{fd}")
        .with_callback(|instrument| {
            let runtimes = RUNTIMES.read().unwrap();
            runtimes.iter().for_each(|tracked| {
                let registered = tracked.metrics.io_driver_fd_registered_count();
                instrument.observe(registered, &tracked.labels);
            });
        })
        .build();
}
387
/// Registers `tokio.io_driver.fd_deregistrations`: file descriptors
/// deregistered by each tracked runtime's I/O driver so far.
#[cfg(all(
    tokio_unstable,
    not(target_family = "wasm"),
    target_has_atomic = "64",
    feature = "net"
))]
fn register_io_driver_fd_deregistrations_counter(meter: &Meter) {
    meter
        .u64_observable_counter("tokio.io_driver.fd_deregistrations")
        .with_description(
            "The number of file descriptors that have been deregistered by the runtime's I/O driver",
        )
        .with_unit("{fd}")
        .with_callback(|instrument| {
            for tracked in RUNTIMES.read().unwrap().iter() {
                let deregistered = tracked.metrics.io_driver_fd_deregistered_count();
                instrument.observe(deregistered, tracked.labels.as_slice());
            }
        })
        .build();
}
409
/// Registers `tokio.io_driver.fd_readies`: ready events processed by each
/// tracked runtime's I/O driver so far.
#[cfg(all(
    tokio_unstable,
    not(target_family = "wasm"),
    target_has_atomic = "64",
    feature = "net"
))]
fn register_io_driver_fd_readies_counter(meter: &Meter) {
    meter
        .u64_observable_counter("tokio.io_driver.fd_readies")
        .with_description("The number of ready events processed by the runtime's I/O driver")
        .with_unit("{event}")
        .with_callback(|instrument| {
            let guard = RUNTIMES.read().unwrap();
            for tracked in guard.iter() {
                let readies = tracked.metrics.io_driver_ready_count();
                instrument.observe(readies, tracked.labels.as_slice());
            }
        })
        .build();
}
429
/// Registers `tokio.spawned_tasks_count`: tasks spawned on each tracked runtime
/// since it was created.
#[cfg(tokio_unstable)]
fn register_spawned_tasks_count_counter(meter: &Meter) {
    meter
        .u64_observable_counter("tokio.spawned_tasks_count")
        .with_description("The number of tasks spawned in this runtime since it was created")
        .with_unit("{task}")
        .with_callback(|instrument| {
            RUNTIMES.read().unwrap().iter().for_each(|tracked| {
                instrument.observe(tracked.metrics.spawned_tasks_count(), &tracked.labels);
            });
        })
        .build();
}
444
/// Registers `tokio.blocking_queue_depth`: tasks waiting in each tracked
/// runtime's blocking pool queue (`spawn_blocking`) at collection time.
#[cfg(tokio_unstable)]
fn register_blocking_queue_depth_gauge(meter: &Meter) {
    meter
        .u64_observable_gauge("tokio.blocking_queue_depth")
        .with_description(
            "The number of tasks currently scheduled in the blocking thread pool, spawned using `spawn_blocking`",
        )
        .with_unit("{task}")
        .with_callback(|instrument| {
            for tracked in RUNTIMES.read().unwrap().iter() {
                let queued = u64::try_from(tracked.metrics.blocking_queue_depth()).unwrap_or(u64::MAX);
                instrument.observe(queued, tracked.labels.as_slice());
            }
        })
        .build();
}
467
/// Registers `tokio.worker.noops`: per worker, how many times it unparked but
/// did no work before parking again.
#[cfg(tokio_unstable)]
fn register_worker_noops_counter(meter: &Meter) {
    meter
        .u64_observable_counter("tokio.worker.noops")
        .with_description(
            "The number of times the given worker thread unparked but performed no work before parking again",
        )
        .with_unit("{operation}")
        .with_callback(|instrument| {
            let runtimes = RUNTIMES.read().unwrap();
            for tracked in runtimes.iter() {
                for (idx, worker_labels) in (0..).zip(&tracked.workers_labels) {
                    let noops = tracked.metrics.worker_noop_count(idx);
                    instrument.observe(noops, worker_labels.as_slice());
                }
            }
        })
        .build();
}
486
/// Registers `tokio.worker.task_steals`: per worker, the number of tasks it
/// has stolen from other workers.
#[cfg(tokio_unstable)]
fn register_worker_task_steals_counter(meter: &Meter) {
    meter
        .u64_observable_counter("tokio.worker.task_steals")
        .with_description(
            "The number of tasks the given worker thread stole from another worker thread",
        )
        .with_callback(|instrument| {
            for tracked in RUNTIMES.read().unwrap().iter() {
                for (idx, worker_labels) in tracked.workers_labels.iter().enumerate() {
                    let stolen = tracked.metrics.worker_steal_count(idx);
                    instrument.observe(stolen, worker_labels);
                }
            }
        })
        .build();
}
504
/// Registers `tokio.worker.steal_operations`: per worker, the number of steal
/// operations it performed (each operation may take several tasks).
#[cfg(tokio_unstable)]
fn register_worker_steal_operations_counter(meter: &Meter) {
    meter
        .u64_observable_counter("tokio.worker.steal_operations")
        .with_description(
            "The number of times the given worker thread stole tasks from another worker thread",
        )
        .with_callback(|instrument| {
            let guard = RUNTIMES.read().unwrap();
            for tracked in guard.iter() {
                for (idx, worker_labels) in (0..).zip(&tracked.workers_labels) {
                    let operations = tracked.metrics.worker_steal_operations(idx);
                    instrument.observe(operations, worker_labels.as_slice());
                }
            }
        })
        .build();
}
525
/// Registers `tokio.worker.polls`: per worker, the number of task polls it has
/// performed.
#[cfg(tokio_unstable)]
fn register_worker_polls_counter(meter: &Meter) {
    meter
        .u64_observable_counter("tokio.worker.polls")
        .with_description("The number of tasks the given worker thread has polled")
        .with_unit("{task}")
        .with_callback(|instrument| {
            for tracked in RUNTIMES.read().unwrap().iter() {
                for (idx, worker_labels) in tracked.workers_labels.iter().enumerate() {
                    let polls = tracked.metrics.worker_poll_count(idx);
                    instrument.observe(polls, worker_labels.as_slice());
                }
            }
        })
        .build();
}
542
/// Registers `tokio.worker.local_schedules`: per worker, tasks scheduled onto
/// its local queue from within the runtime.
#[cfg(tokio_unstable)]
fn register_worker_local_schedules_counter(meter: &Meter) {
    meter
        .u64_observable_counter("tokio.worker.local_schedules")
        .with_description(
            "The number of tasks scheduled from **within** the runtime on the given worker's local queue",
        )
        .with_unit("{task}")
        .with_callback(|instrument| {
            let runtimes = RUNTIMES.read().unwrap();
            for tracked in runtimes.iter() {
                for (idx, worker_labels) in (0..).zip(&tracked.workers_labels) {
                    let scheduled = tracked.metrics.worker_local_schedule_count(idx);
                    instrument.observe(scheduled, worker_labels);
                }
            }
        })
        .build();
}
561
/// Registers `tokio.worker.overflows`: per worker, how many times its local
/// queue was saturated.
#[cfg(tokio_unstable)]
fn register_worker_overflows_counter(meter: &Meter) {
    meter
        .u64_observable_counter("tokio.worker.overflows")
        .with_description("The number of times the given worker thread saturated its local queue")
        .with_callback(|instrument| {
            for tracked in RUNTIMES.read().unwrap().iter() {
                for (idx, worker_labels) in tracked.workers_labels.iter().enumerate() {
                    let overflows = tracked.metrics.worker_overflow_count(idx);
                    instrument.observe(overflows, worker_labels.as_slice());
                }
            }
        })
        .build();
}
580
/// Registers `tokio.worker.local_queue_depth`: per worker, tasks currently
/// waiting in its local queue.
#[cfg(tokio_unstable)]
fn register_worker_local_queue_depth_gauge(meter: &Meter) {
    meter
        .u64_observable_gauge("tokio.worker.local_queue_depth")
        .with_description(
            "The number of tasks currently scheduled in the given worker's local queue",
        )
        .with_unit("{task}")
        .with_callback(|instrument| {
            let guard = RUNTIMES.read().unwrap();
            for tracked in guard.iter() {
                for (idx, worker_labels) in (0..).zip(&tracked.workers_labels) {
                    let depth = tracked.metrics.worker_local_queue_depth(idx);
                    instrument.observe(u64::try_from(depth).unwrap_or(u64::MAX), worker_labels.as_slice());
                }
            }
        })
        .build();
}
606
/// Registers `tokio.worker.mean_poll_time`: per worker, the mean task poll
/// duration in nanoseconds, saturated at `u64::MAX`.
#[cfg(tokio_unstable)]
fn register_worker_mean_poll_time_gauge(meter: &Meter) {
    meter
        .u64_observable_gauge("tokio.worker.mean_poll_time")
        .with_description("The mean duration of task polls, in nanoseconds")
        .with_unit("ns")
        .with_callback(|instrument| {
            for tracked in RUNTIMES.read().unwrap().iter() {
                for (idx, worker_labels) in tracked.workers_labels.iter().enumerate() {
                    let mean_ns = tracked.metrics.worker_mean_poll_time(idx).as_nanos();
                    instrument.observe(u64::try_from(mean_ns).unwrap_or(u64::MAX), worker_labels);
                }
            }
        })
        .build();
}
631
/// Registers `tokio.worker.poll_time_bucket`: each worker's task poll time
/// histogram, exported as cumulative bucket counts.
///
/// For every worker, bucket `i` is reported with that worker's labels plus its
/// `le` bound. The reported value is the sum of that worker's counts for buckets
/// `0..=i`. Because the last bucket is labelled `le="+Inf"` when the labels are
/// built, it carries the worker's total. Runtimes whose histogram is disabled
/// have no bucket labels, so they report nothing here.
///
/// NOTE(review): the values are monotonic but are reported through an
/// observable gauge. Confirm this is intentional before switching the
/// instrument kind, because that changes the exported metric type and name.
#[cfg(tokio_unstable)]
fn register_poll_time_histogram(meter: &Meter) {
    meter
        .u64_observable_gauge("tokio.worker.poll_time_bucket")
        .with_description("A histogram of the poll time of tasks, in nanoseconds")
        // We don't set a unit here, as it would add it as a suffix to the metric name
        .with_callback(|instrument| {
            let runtimes = RUNTIMES.read().unwrap();
            for runtime in runtimes.iter() {
                for (worker_idx, worker_buckets) in
                    runtime.histogram_bucket_labels.iter().enumerate()
                {
                    // Running total that turns per-bucket counts into cumulative ones.
                    let mut cumulative = 0u64;
                    for (bucket_idx, bucket_labels) in worker_buckets.iter().enumerate() {
                        let count = runtime
                            .metrics
                            .poll_time_histogram_bucket_count(worker_idx, bucket_idx);
                        // Saturate instead of panicking (debug) or wrapping (release).
                        cumulative = cumulative.saturating_add(count);
                        instrument.observe(cumulative, bucket_labels.as_slice());
                    }
                }
            }
        })
        .build();
}