tokio/util/
sharded_list.rs

1use std::ptr::NonNull;
2use std::sync::atomic::Ordering;
3
4use crate::loom::sync::{Mutex, MutexGuard};
5use crate::util::metric_atomics::{MetricAtomicU64, MetricAtomicUsize};
6
7use super::linked_list::{Link, LinkedList};
8
9/// An intrusive linked list supporting highly concurrent updates.
10///
11/// It currently relies on `LinkedList`, so it is the caller's
12/// responsibility to ensure the list is empty before dropping it.
13///
14/// Note: Due to its inner sharded design, the order of nodes cannot be guaranteed.
15pub(crate) struct ShardedList<L, T> {
16    lists: Box<[Mutex<LinkedList<L, T>>]>,
17    added: MetricAtomicU64,
18    count: MetricAtomicUsize,
19    shard_mask: usize,
20}
21
22/// Determines which linked list an item should be stored in.
23///
24/// # Safety
25///
26/// Implementations must guarantee that the id of an item does not change from
27/// call to call.
28pub(crate) unsafe trait ShardedListItem: Link {
29    /// # Safety
30    ///
31    /// The provided pointer must point at a valid list item.
32    unsafe fn get_shard_id(target: NonNull<Self::Target>) -> usize;
33}
34
35impl<L, T> ShardedList<L, T> {
36    /// Creates a new and empty sharded linked list with the specified size.
37    pub(crate) fn new(sharded_size: usize) -> Self {
38        assert!(sharded_size.is_power_of_two());
39
40        let shard_mask = sharded_size - 1;
41        let mut lists = Vec::with_capacity(sharded_size);
42        for _ in 0..sharded_size {
43            lists.push(Mutex::new(LinkedList::<L, T>::new()))
44        }
45        Self {
46            lists: lists.into_boxed_slice(),
47            added: MetricAtomicU64::new(0),
48            count: MetricAtomicUsize::new(0),
49            shard_mask,
50        }
51    }
52}
53
54/// Used to get the lock of shard.
55pub(crate) struct ShardGuard<'a, L, T> {
56    lock: MutexGuard<'a, LinkedList<L, T>>,
57    added: &'a MetricAtomicU64,
58    count: &'a MetricAtomicUsize,
59    id: usize,
60}
61
62impl<L: ShardedListItem> ShardedList<L, L::Target> {
63    /// Removes the last element from a list specified by `shard_id` and returns it, or None if it is
64    /// empty.
65    pub(crate) fn pop_back(&self, shard_id: usize) -> Option<L::Handle> {
66        let mut lock = self.shard_inner(shard_id);
67        let node = lock.pop_back();
68        if node.is_some() {
69            self.count.decrement();
70        }
71        node
72    }
73
74    /// Removes the specified node from the list.
75    ///
76    /// # Safety
77    ///
78    /// The caller **must** ensure that exactly one of the following is true:
79    /// - `node` is currently contained by `self`,
80    /// - `node` is not contained by any list,
81    /// - `node` is currently contained by some other `GuardedLinkedList`.
82    pub(crate) unsafe fn remove(&self, node: NonNull<L::Target>) -> Option<L::Handle> {
83        let id = unsafe { L::get_shard_id(node) };
84        let mut lock = self.shard_inner(id);
85        // SAFETY: Since the shard id cannot change, it's not possible for this node
86        // to be in any other list of the same sharded list.
87        let node = unsafe { lock.remove(node) };
88        if node.is_some() {
89            self.count.decrement();
90        }
91        node
92    }
93
94    /// Gets the lock of `ShardedList`, makes us have the write permission.
95    pub(crate) fn lock_shard(&self, val: &L::Handle) -> ShardGuard<'_, L, L::Target> {
96        let id = unsafe { L::get_shard_id(L::as_raw(val)) };
97        ShardGuard {
98            lock: self.shard_inner(id),
99            added: &self.added,
100            count: &self.count,
101            id,
102        }
103    }
104
105    /// Gets the count of elements in this list.
106    pub(crate) fn len(&self) -> usize {
107        self.count.load(Ordering::Relaxed)
108    }
109
110    cfg_unstable_metrics! {
111        cfg_64bit_metrics! {
112            /// Gets the total number of elements added to this list.
113            pub(crate) fn added(&self) -> u64 {
114                self.added.load(Ordering::Relaxed)
115            }
116        }
117    }
118
119    /// Returns whether the linked list does not contain any node.
120    pub(crate) fn is_empty(&self) -> bool {
121        self.len() == 0
122    }
123
124    /// Gets the shard size of this `SharedList`.
125    ///
126    /// Used to help us to decide the parameter `shard_id` of the `pop_back` method.
127    pub(crate) fn shard_size(&self) -> usize {
128        self.shard_mask + 1
129    }
130
131    #[inline]
132    fn shard_inner(&self, id: usize) -> MutexGuard<'_, LinkedList<L, <L as Link>::Target>> {
133        // Safety: This modulo operation ensures that the index is not out of bounds.
134        unsafe { self.lists.get_unchecked(id & self.shard_mask).lock() }
135    }
136}
137
138impl<'a, L: ShardedListItem> ShardGuard<'a, L, L::Target> {
139    /// Push a value to this shard.
140    pub(crate) fn push(mut self, val: L::Handle) {
141        let id = unsafe { L::get_shard_id(L::as_raw(&val)) };
142        assert_eq!(id, self.id);
143        self.lock.push_front(val);
144        self.added.add(1, Ordering::Relaxed);
145        self.count.increment();
146    }
147}
148
149cfg_taskdump! {
150    impl<L: ShardedListItem> ShardedList<L, L::Target> {
151        pub(crate) fn for_each<F>(&self, mut f: F)
152        where
153            F: FnMut(&L::Handle),
154        {
155            let mut guards = Vec::with_capacity(self.lists.len());
156            for list in self.lists.iter() {
157                guards.push(list.lock());
158            }
159            for g in &mut guards {
160                g.for_each(&mut f);
161            }
162        }
163    }
164}