opentelemetry_instrumentation_tokio/
runtime.rs1use std::sync::{Once, RwLock};
8
9use opentelemetry::metrics::Meter;
10use opentelemetry::{InstrumentationScope, Key, KeyValue};
11
12static INSTRUMENTS_INITIALIZED: Once = Once::new();
14
15static RUNTIMES: RwLock<Vec<TrackedRuntime>> = RwLock::new(Vec::new());
17
18struct TrackedRuntime {
20 metrics: tokio::runtime::RuntimeMetrics,
21 labels: Vec<KeyValue>,
22
23 workers_labels: Vec<Vec<KeyValue>>,
26
27 #[cfg(tokio_unstable)]
29 histogram_bucket_labels: Vec<Vec<Vec<KeyValue>>>,
30}
31
32pub(crate) fn track_runtime(handle: &tokio::runtime::Handle, labels: &[KeyValue]) {
36 INSTRUMENTS_INITIALIZED.call_once(|| {
38 register_all_instruments();
39 });
40
41 let labels = build_runtime_labels(handle, labels);
42
43 let workers_labels: Vec<Vec<_>> = (0..handle.metrics().num_workers())
44 .map(|i| {
45 let mut worker_labels = labels.clone();
46 worker_labels.push(worker_idx_attribute(i));
47 worker_labels
48 })
49 .collect();
50
51 #[cfg(tokio_unstable)]
52 let histogram_bucket_labels = 'result: {
53 if !handle.metrics().poll_time_histogram_enabled() {
54 break 'result Vec::new();
57 }
58
59 let num_buckets = handle.metrics().poll_time_histogram_num_buckets();
60 let mut buckets_label: Vec<_> = (0..num_buckets)
61 .map(|bucket_idx| {
62 let range = handle
63 .metrics()
64 .poll_time_histogram_bucket_range(bucket_idx);
65 let value = range.end.as_nanos().try_into().unwrap_or(i64::MAX);
66 KeyValue::new("le", value)
67 })
68 .collect();
69
70 if let Some(last) = buckets_label.last_mut() {
72 *last = KeyValue::new("le", "+Inf");
73 }
74
75 workers_labels
76 .iter()
77 .map(|worker_labels| {
78 buckets_label
79 .iter()
80 .map(|bucket_label| {
81 let mut labels = worker_labels.clone();
82 labels.push(bucket_label.clone());
83 labels
84 })
85 .collect()
86 })
87 .collect()
88 };
89
90 let tracked_runtime = TrackedRuntime {
91 metrics: handle.metrics().clone(),
92 labels,
93 workers_labels,
94 #[cfg(tokio_unstable)]
95 histogram_bucket_labels,
96 };
97
98 let mut runtimes = RUNTIMES.write().unwrap();
99 runtimes.push(tracked_runtime);
100}
101
102fn build_runtime_labels(handle: &tokio::runtime::Handle, labels: &[KeyValue]) -> Vec<KeyValue> {
104 #[cfg_attr(not(tokio_unstable), expect(unused_mut))]
105 let mut labels = labels.to_vec();
106
107 #[cfg(tokio_unstable)]
109 {
110 labels.push(KeyValue::new(
111 Key::from_static_str("tokio.runtime.id"),
112 handle.id().to_string(),
113 ));
114 }
115
116 #[cfg(not(tokio_unstable))]
118 let _ = handle;
119
120 labels
121}
122
123fn worker_idx_attribute(i: usize) -> KeyValue {
125 KeyValue::new(
126 Key::from_static_str("tokio.worker.index"),
127 i.try_into().unwrap_or(i64::MAX),
128 )
129}
130
131fn register_all_instruments() {
133 let scope = InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
134 .with_version(env!("CARGO_PKG_VERSION"))
135 .build();
136
137 let meter = opentelemetry::global::meter_with_scope(scope);
138
139 register_workers_gauge(&meter);
141 register_global_queue_depth_gauge(&meter);
142 register_alive_tasks_gauge(&meter);
143
144 #[cfg(target_has_atomic = "64")]
146 {
147 register_worker_park_count_counter(&meter);
148 register_worker_busy_duration_counter(&meter);
149 }
150
151 #[cfg(tokio_unstable)]
153 {
154 register_blocking_threads_gauge(&meter);
155 register_idle_blocking_threads_gauge(&meter);
156 register_remote_schedules_counter(&meter);
157 register_budget_forced_yields_counter(&meter);
158
159 #[cfg(all(not(target_family = "wasm"), target_has_atomic = "64", feature = "net"))]
161 {
162 register_io_driver_fd_registrations_counter(&meter);
163 register_io_driver_fd_deregistrations_counter(&meter);
164 register_io_driver_fd_readies_counter(&meter);
165 }
166
167 register_spawned_tasks_count_counter(&meter);
168 register_blocking_queue_depth_gauge(&meter);
169 register_worker_noops_counter(&meter);
170 register_worker_task_steals_counter(&meter);
171 register_worker_steal_operations_counter(&meter);
172 register_worker_polls_counter(&meter);
173 register_worker_local_schedules_counter(&meter);
174 register_worker_overflows_counter(&meter);
175 register_worker_local_queue_depth_gauge(&meter);
176 register_worker_mean_poll_time_gauge(&meter);
177 register_poll_time_histogram(&meter);
178 }
179}
180
181fn register_workers_gauge(meter: &Meter) {
186 meter
187 .u64_observable_gauge("tokio.workers")
188 .with_description("The number of worker threads used by the runtime")
189 .with_unit("{worker}")
190 .with_callback(|instrument| {
191 let runtimes = RUNTIMES.read().unwrap();
192 for runtime in runtimes.iter() {
193 instrument.observe(
194 runtime.metrics.num_workers().try_into().unwrap_or(u64::MAX),
195 &runtime.labels,
196 );
197 }
198 })
199 .build();
200}
201
202fn register_global_queue_depth_gauge(meter: &Meter) {
203 meter
204 .u64_observable_gauge("tokio.global_queue_depth")
205 .with_description("The number of tasks currently scheduled in the runtime's global queue")
206 .with_unit("{task}")
207 .with_callback(|instrument| {
208 let runtimes = RUNTIMES.read().unwrap();
209 for runtime in runtimes.iter() {
210 instrument.observe(
211 runtime
212 .metrics
213 .global_queue_depth()
214 .try_into()
215 .unwrap_or(u64::MAX),
216 &runtime.labels,
217 );
218 }
219 })
220 .build();
221}
222
223#[cfg(target_has_atomic = "64")]
224fn register_worker_park_count_counter(meter: &Meter) {
225 meter
226 .u64_observable_counter("tokio.worker.park_count")
227 .with_description("The total number of times the given worker thread has parked")
228 .with_callback(|instrument| {
229 let runtimes = RUNTIMES.read().unwrap();
230 for runtime in runtimes.iter() {
231 for (worker_idx, labels) in runtime.workers_labels.iter().enumerate() {
232 instrument.observe(runtime.metrics.worker_park_count(worker_idx), &labels[..]);
233 }
234 }
235 })
236 .build();
237}
238
239#[cfg(target_has_atomic = "64")]
240fn register_worker_busy_duration_counter(meter: &Meter) {
241 meter
242 .u64_observable_counter("tokio.worker.busy_duration")
243 .with_description("The amount of time the given worker thread has been busy")
244 .with_unit("ms")
245 .with_callback(|instrument| {
246 let runtimes = RUNTIMES.read().unwrap();
247 for runtime in runtimes.iter() {
248 for (worker_idx, labels) in runtime.workers_labels.iter().enumerate() {
249 instrument.observe(
250 runtime
251 .metrics
252 .worker_total_busy_duration(worker_idx)
253 .as_millis()
254 .try_into()
255 .unwrap_or(u64::MAX),
256 &labels[..],
257 );
258 }
259 }
260 })
261 .build();
262}
263
264fn register_alive_tasks_gauge(meter: &Meter) {
265 meter
266 .u64_observable_gauge("tokio.alive_tasks")
267 .with_description("The number of active tasks in the runtime")
268 .with_unit("{task}")
269 .with_callback(|instrument| {
270 let runtimes = RUNTIMES.read().unwrap();
271 for runtime in runtimes.iter() {
272 instrument.observe(
273 runtime
274 .metrics
275 .num_alive_tasks()
276 .try_into()
277 .unwrap_or(u64::MAX),
278 &runtime.labels,
279 );
280 }
281 })
282 .build();
283}
284
/// Registers `tokio.blocking_threads`, which reports `num_blocking_threads` for each tracked
/// runtime (saturated at `u64::MAX`) with its runtime-level attributes.
#[cfg(tokio_unstable)]
fn register_blocking_threads_gauge(meter: &Meter) {
    meter
        .u64_observable_gauge("tokio.blocking_threads")
        .with_description("The number of additional threads spawned by the runtime")
        .with_unit("{thread}")
        .with_callback(|observer| {
            for tracked in RUNTIMES.read().unwrap().iter() {
                let threads = tracked.metrics.num_blocking_threads();
                let threads = u64::try_from(threads).unwrap_or(u64::MAX);
                observer.observe(threads, &tracked.labels);
            }
        })
        .build();
}
310
/// Registers `tokio.idle_blocking_threads`, which reports `num_idle_blocking_threads` for each
/// tracked runtime (saturated at `u64::MAX`) with its runtime-level attributes.
///
/// The exported description previously read "which have spawned by the runtime". It is
/// corrected to "which have been spawned by the runtime".
#[cfg(tokio_unstable)]
fn register_idle_blocking_threads_gauge(meter: &Meter) {
    meter
        .u64_observable_gauge("tokio.idle_blocking_threads")
        .with_description(
            "The number of idle threads, which have been spawned by the runtime for `spawn_blocking` calls",
        )
        .with_unit("{thread}")
        .with_callback(|instrument| {
            let runtimes = RUNTIMES.read().unwrap();
            for runtime in runtimes.iter() {
                instrument.observe(
                    runtime
                        .metrics
                        .num_idle_blocking_threads()
                        .try_into()
                        .unwrap_or(u64::MAX),
                    &runtime.labels,
                );
            }
        })
        .build();
}
333
/// Registers `tokio.remote_schedules`, which reports each tracked runtime's
/// `remote_schedule_count` with its runtime-level attributes.
#[cfg(tokio_unstable)]
fn register_remote_schedules_counter(meter: &Meter) {
    meter
        .u64_observable_counter("tokio.remote_schedules")
        .with_description("The number of tasks scheduled from outside the runtime")
        .with_unit("{task}")
        .with_callback(|observer| {
            let tracked_runtimes = RUNTIMES.read().unwrap();
            tracked_runtimes.iter().for_each(|tracked| {
                let scheduled = tracked.metrics.remote_schedule_count();
                observer.observe(scheduled, &tracked.labels);
            });
        })
        .build();
}
348
/// Registers `tokio.budget_forced_yields`, which reports each tracked runtime's
/// `budget_forced_yield_count` with its runtime-level attributes.
#[cfg(tokio_unstable)]
fn register_budget_forced_yields_counter(meter: &Meter) {
    meter
        .u64_observable_counter("tokio.budget_forced_yields")
        .with_description(
            "The number of times that tasks have been forced to yield back to the scheduler after exhausting their task budgets",
        )
        .with_unit("{yield}")
        .with_callback(|observer| {
            for tracked in RUNTIMES.read().unwrap().iter() {
                let forced = tracked.metrics.budget_forced_yield_count();
                observer.observe(forced, &tracked.labels);
            }
        })
        .build();
}
365
/// Registers `tokio.io_driver.fd_registrations`, which reports each tracked runtime's
/// `io_driver_fd_registered_count` with its runtime-level attributes.
#[cfg(all(
    tokio_unstable,
    not(target_family = "wasm"),
    target_has_atomic = "64",
    feature = "net"
))]
fn register_io_driver_fd_registrations_counter(meter: &Meter) {
    meter
        .u64_observable_counter("tokio.io_driver.fd_registrations")
        .with_description(
            "The number of file descriptors that have been registered with the runtime's I/O driver",
        )
        .with_unit("{fd}")
        .with_callback(|observer| {
            let tracked_runtimes = RUNTIMES.read().unwrap();
            tracked_runtimes.iter().for_each(|tracked| {
                let registered = tracked.metrics.io_driver_fd_registered_count();
                observer.observe(registered, &tracked.labels);
            });
        })
        .build();
}
387
/// Registers `tokio.io_driver.fd_deregistrations`, which reports each tracked runtime's
/// `io_driver_fd_deregistered_count` with its runtime-level attributes.
#[cfg(all(
    tokio_unstable,
    not(target_family = "wasm"),
    target_has_atomic = "64",
    feature = "net"
))]
fn register_io_driver_fd_deregistrations_counter(meter: &Meter) {
    meter
        .u64_observable_counter("tokio.io_driver.fd_deregistrations")
        .with_description(
            "The number of file descriptors that have been deregistered by the runtime's I/O driver",
        )
        .with_unit("{fd}")
        .with_callback(|observer| {
            for tracked in RUNTIMES.read().unwrap().iter() {
                let deregistered = tracked.metrics.io_driver_fd_deregistered_count();
                observer.observe(deregistered, &tracked.labels);
            }
        })
        .build();
}
409
/// Registers `tokio.io_driver.fd_readies`, which reports each tracked runtime's
/// `io_driver_ready_count` with its runtime-level attributes.
#[cfg(all(
    tokio_unstable,
    not(target_family = "wasm"),
    target_has_atomic = "64",
    feature = "net"
))]
fn register_io_driver_fd_readies_counter(meter: &Meter) {
    meter
        .u64_observable_counter("tokio.io_driver.fd_readies")
        .with_description("The number of ready events processed by the runtime's I/O driver")
        .with_unit("{event}")
        .with_callback(|observer| {
            let tracked_runtimes = RUNTIMES.read().unwrap();
            tracked_runtimes.iter().for_each(|tracked| {
                let readies = tracked.metrics.io_driver_ready_count();
                observer.observe(readies, &tracked.labels);
            });
        })
        .build();
}
429
/// Registers `tokio.spawned_tasks_count`, which reports each tracked runtime's
/// `spawned_tasks_count` with its runtime-level attributes.
#[cfg(tokio_unstable)]
fn register_spawned_tasks_count_counter(meter: &Meter) {
    meter
        .u64_observable_counter("tokio.spawned_tasks_count")
        .with_description("The number of tasks spawned in this runtime since it was created")
        .with_unit("{task}")
        .with_callback(|observer| {
            for tracked in RUNTIMES.read().unwrap().iter() {
                let spawned = tracked.metrics.spawned_tasks_count();
                observer.observe(spawned, &tracked.labels);
            }
        })
        .build();
}
444
/// Registers `tokio.blocking_queue_depth`, which reports each tracked runtime's
/// `blocking_queue_depth` (saturated at `u64::MAX`) with its runtime-level attributes.
#[cfg(tokio_unstable)]
fn register_blocking_queue_depth_gauge(meter: &Meter) {
    meter
        .u64_observable_gauge("tokio.blocking_queue_depth")
        .with_description(
            "The number of tasks currently scheduled in the blocking thread pool, spawned using `spawn_blocking`",
        )
        .with_unit("{task}")
        .with_callback(|observer| {
            let tracked_runtimes = RUNTIMES.read().unwrap();
            tracked_runtimes.iter().for_each(|tracked| {
                let depth =
                    u64::try_from(tracked.metrics.blocking_queue_depth()).unwrap_or(u64::MAX);
                observer.observe(depth, &tracked.labels);
            });
        })
        .build();
}
467
/// Registers `tokio.worker.noops`, which reports every worker's `worker_noop_count`
/// with that worker's attributes.
#[cfg(tokio_unstable)]
fn register_worker_noops_counter(meter: &Meter) {
    meter
        .u64_observable_counter("tokio.worker.noops")
        .with_description(
            "The number of times the given worker thread unparked but performed no work before parking again",
        )
        .with_unit("{operation}")
        .with_callback(|observer| {
            let tracked_runtimes = RUNTIMES.read().unwrap();
            let per_worker = tracked_runtimes.iter().flat_map(|tracked| {
                tracked
                    .workers_labels
                    .iter()
                    .enumerate()
                    .map(move |(worker, attributes)| (tracked, worker, attributes))
            });
            for (tracked, worker, attributes) in per_worker {
                observer.observe(tracked.metrics.worker_noop_count(worker), attributes);
            }
        })
        .build();
}
486
/// Registers `tokio.worker.task_steals`, which reports every worker's `worker_steal_count`
/// with that worker's attributes.
#[cfg(tokio_unstable)]
fn register_worker_task_steals_counter(meter: &Meter) {
    meter
        .u64_observable_counter("tokio.worker.task_steals")
        .with_description(
            "The number of tasks the given worker thread stole from another worker thread",
        )
        .with_callback(|observer| {
            let tracked_runtimes = RUNTIMES.read().unwrap();
            for tracked in tracked_runtimes.iter() {
                tracked
                    .workers_labels
                    .iter()
                    .enumerate()
                    .for_each(|(worker, attributes)| {
                        observer.observe(tracked.metrics.worker_steal_count(worker), attributes);
                    });
            }
        })
        .build();
}
504
/// Registers `tokio.worker.steal_operations`, which reports every worker's
/// `worker_steal_operations` with that worker's attributes.
#[cfg(tokio_unstable)]
fn register_worker_steal_operations_counter(meter: &Meter) {
    meter
        .u64_observable_counter("tokio.worker.steal_operations")
        .with_description(
            "The number of times the given worker thread stole tasks from another worker thread",
        )
        .with_callback(|observer| {
            let tracked_runtimes = RUNTIMES.read().unwrap();
            for tracked in tracked_runtimes.iter() {
                for (worker, attributes) in tracked.workers_labels.iter().enumerate() {
                    let operations = tracked.metrics.worker_steal_operations(worker);
                    observer.observe(operations, attributes);
                }
            }
        })
        .build();
}
525
/// Registers `tokio.worker.polls`, which reports every worker's `worker_poll_count`
/// with that worker's attributes.
#[cfg(tokio_unstable)]
fn register_worker_polls_counter(meter: &Meter) {
    meter
        .u64_observable_counter("tokio.worker.polls")
        .with_description("The number of tasks the given worker thread has polled")
        .with_unit("{task}")
        .with_callback(|observer| {
            let tracked_runtimes = RUNTIMES.read().unwrap();
            let per_worker = tracked_runtimes.iter().flat_map(|tracked| {
                tracked
                    .workers_labels
                    .iter()
                    .enumerate()
                    .map(move |(worker, attributes)| (tracked, worker, attributes))
            });
            for (tracked, worker, attributes) in per_worker {
                observer.observe(tracked.metrics.worker_poll_count(worker), attributes);
            }
        })
        .build();
}
542
/// Registers `tokio.worker.local_schedules`, which reports every worker's
/// `worker_local_schedule_count` with that worker's attributes.
#[cfg(tokio_unstable)]
fn register_worker_local_schedules_counter(meter: &Meter) {
    meter
        .u64_observable_counter("tokio.worker.local_schedules")
        .with_description(
            "The number of tasks scheduled from **within** the runtime on the given worker's local queue",
        )
        .with_unit("{task}")
        .with_callback(|observer| {
            let tracked_runtimes = RUNTIMES.read().unwrap();
            for tracked in tracked_runtimes.iter() {
                for (worker, attributes) in tracked.workers_labels.iter().enumerate() {
                    let scheduled = tracked.metrics.worker_local_schedule_count(worker);
                    observer.observe(scheduled, attributes);
                }
            }
        })
        .build();
}
561
/// Registers `tokio.worker.overflows`, which reports every worker's `worker_overflow_count`
/// with that worker's attributes.
#[cfg(tokio_unstable)]
fn register_worker_overflows_counter(meter: &Meter) {
    meter
        .u64_observable_counter("tokio.worker.overflows")
        .with_description("The number of times the given worker thread saturated its local queue")
        .with_callback(|observer| {
            let tracked_runtimes = RUNTIMES.read().unwrap();
            for tracked in tracked_runtimes.iter() {
                tracked
                    .workers_labels
                    .iter()
                    .enumerate()
                    .for_each(|(worker, attributes)| {
                        let overflows = tracked.metrics.worker_overflow_count(worker);
                        observer.observe(overflows, attributes);
                    });
            }
        })
        .build();
}
580
/// Registers `tokio.worker.local_queue_depth`, which reports every worker's local-queue depth
/// (saturated at `u64::MAX`) with that worker's attributes.
#[cfg(tokio_unstable)]
fn register_worker_local_queue_depth_gauge(meter: &Meter) {
    meter
        .u64_observable_gauge("tokio.worker.local_queue_depth")
        .with_description(
            "The number of tasks currently scheduled in the given worker's local queue",
        )
        .with_unit("{task}")
        .with_callback(|observer| {
            let tracked_runtimes = RUNTIMES.read().unwrap();
            for tracked in tracked_runtimes.iter() {
                for (worker, attributes) in tracked.workers_labels.iter().enumerate() {
                    let depth = tracked.metrics.worker_local_queue_depth(worker);
                    let depth = u64::try_from(depth).unwrap_or(u64::MAX);
                    observer.observe(depth, attributes);
                }
            }
        })
        .build();
}
606
/// Registers `tokio.worker.mean_poll_time`, which reports every worker's mean poll time in
/// nanoseconds (saturated at `u64::MAX`) with that worker's attributes.
#[cfg(tokio_unstable)]
fn register_worker_mean_poll_time_gauge(meter: &Meter) {
    meter
        .u64_observable_gauge("tokio.worker.mean_poll_time")
        .with_description("The mean duration of task polls, in nanoseconds")
        .with_unit("ns")
        .with_callback(|observer| {
            let tracked_runtimes = RUNTIMES.read().unwrap();
            for tracked in tracked_runtimes.iter() {
                for (worker, attributes) in tracked.workers_labels.iter().enumerate() {
                    let mean = tracked.metrics.worker_mean_poll_time(worker);
                    let mean_ns = u64::try_from(mean.as_nanos()).unwrap_or(u64::MAX);
                    observer.observe(mean_ns, attributes);
                }
            }
        })
        .build();
}
631
/// Registers `tokio.worker.poll_time_bucket`, a cumulative poll-time histogram exported as one
/// gauge series per worker and bucket.
///
/// For each worker, the callback sums the bucket counts in bucket order. The value reported
/// for a bucket is therefore its count plus the counts of all lower buckets, matching the
/// `le` (upper-bound) attribute in the precomputed `histogram_bucket_labels`. A runtime with
/// no bucket attributes (histogram disabled when it was tracked) reports nothing.
#[cfg(tokio_unstable)]
fn register_poll_time_histogram(meter: &Meter) {
    meter
        .u64_observable_gauge("tokio.worker.poll_time_bucket")
        .with_description("A histogram of the poll time of tasks, in nanoseconds")
        .with_callback(|instrument| {
            let runtimes = RUNTIMES.read().unwrap();
            for runtime in runtimes.iter() {
                for (worker_idx, worker_buckets) in
                    runtime.histogram_bucket_labels.iter().enumerate()
                {
                    let mut cumulative = 0u64;
                    for (bucket_idx, bucket_labels) in worker_buckets.iter().enumerate() {
                        let count = runtime
                            .metrics
                            .poll_time_histogram_bucket_count(worker_idx, bucket_idx);
                        // Saturate rather than overflow: `+=` would panic in debug builds, and
                        // a panic inside a metrics callback is never wanted.
                        cumulative = cumulative.saturating_add(count);
                        instrument.observe(cumulative, bucket_labels);
                    }
                }
            }
        })
        .build();
}