//! Inject queue used to send wakeups to a work-stealing scheduler.
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::runtime::task;
use std::marker::PhantomData;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::{Acquire, Release};
/// Growable, MPMC queue used to inject new tasks into the scheduler and as an
/// overflow queue when the local, fixed-size, array queue overflows.
///
/// Tasks are kept in a singly linked list threaded through the task headers
/// themselves; the list ends are stored behind a mutex, while the element
/// count is mirrored in an atomic so emptiness can be checked without locking.
pub(crate) struct Inject<T: 'static> {
/// Pointers to the head and tail of the queue, plus the closed flag.
/// Every structural change to the list happens with this lock held.
pointers: Mutex<Pointers>,
/// Number of pending tasks in the queue. This helps prevent unnecessary
/// locking in the hot path.
///
/// Written only while `pointers` is locked; read without the lock by
/// `len` / `is_empty`.
len: AtomicUsize,
/// Ties the queue to the `T` parameter of the `task::Notified<T>` values it
/// accepts and returns, without storing a `T`.
_p: PhantomData<T>,
}
/// Lock-protected state of an `Inject` queue: the closed flag and both ends of
/// the linked list of task headers.
struct Pointers {
/// True if the queue is closed. Once set it is never cleared by this file;
/// `push` refuses new tasks while it is set.
is_closed: bool,
/// Linked-list head: the task `pop` hands out next, or `None` when the
/// list is empty.
head: Option<NonNull<task::Header>>,
/// Linked-list tail: the task new entries are linked after, or `None` when
/// the list is empty.
tail: Option<NonNull<task::Header>>,
}
// `Inject` holds raw `NonNull<task::Header>` pointers (inside `Pointers`), so
// `Send`/`Sync` are asserted by hand here instead of being derived.
// NOTE(review): presumably sound because `head`/`tail` are only touched with
// the `pointers` mutex held and the queued tasks may be handed to any thread —
// confirm against the guarantees of the task module.
unsafe impl<T> Send for Inject<T> {}
unsafe impl<T> Sync for Inject<T> {}
impl<T: 'static> Inject<T> {
pub(crate) fn new() -> Inject<T> {
Inject {
pointers: Mutex::new(Pointers {
is_closed: false,
head: None,
tail: None,
}),
len: AtomicUsize::new(0),
_p: PhantomData,
}
}
pub(crate) fn is_empty(&self) -> bool {
self.len() == 0
}
/// Marks the queue as closed.
///
/// Returns `true` if this call performed the open -> closed transition, and
/// `false` if the queue had already been closed.
pub(crate) fn close(&self) -> bool {
    let mut guard = self.pointers.lock();

    // Set the flag unconditionally and report whether it was previously
    // clear; re-setting an already-set flag changes nothing.
    let was_closed = std::mem::replace(&mut guard.is_closed, true);
    !was_closed
}
/// Returns `true` if the queue has been closed.
///
/// Takes the queue lock to read the flag.
pub(crate) fn is_closed(&self) -> bool {
    let guard = self.pointers.lock();
    guard.is_closed
}
/// Returns the most recently published number of queued tasks.
///
/// Reads the atomic counter without taking the queue lock.
pub(crate) fn len(&self) -> usize {
    let pending = self.len.load(Acquire);
    pending
}
/// Appends a single task to the back of the queue.
///
/// If the queue has been closed the task is not enqueued; it is simply
/// dropped when this function returns.
pub(crate) fn push(&self, task: task::Notified<T>) {
    // All list manipulation below happens under the queue lock.
    let mut guard = self.pointers.lock();

    if guard.is_closed {
        return;
    }

    // safety: the counter is only ever written with the lock held, so a
    // plain (unsynchronized) read cannot race with another writer.
    let current = unsafe { self.len.unsync_load() };
    let raw = task.into_raw();

    // A task that is not in a queue is expected to have no successor.
    debug_assert!(get_next(raw).is_none());

    match guard.tail {
        // safety: Holding the Notified for a task guarantees exclusive
        // access to the `queue_next` field.
        Some(old_tail) => set_next(old_tail, Some(raw)),
        // Empty list: the new task is also the head.
        None => guard.head = Some(raw),
    }
    guard.tail = Some(raw);

    // Publish the new length for lock-free readers.
    self.len.store(current + 1, Release);
}
/// Appends every task yielded by `iter` to the back of the queue.
///
/// The tasks are first chained together without holding the queue lock and
/// are then spliced into the list in one step. An empty iterator is a no-op.
#[inline]
pub(crate) fn push_batch<I>(&self, mut iter: I)
where
    I: Iterator<Item = task::Notified<T>>,
{
    // Nothing to do for an empty batch.
    let first = match iter.next() {
        None => return,
        Some(notified) => notified.into_raw(),
    };

    // Chain the remaining tasks behind `first`, tracking the current end of
    // the chain and how many tasks it contains.
    //
    // Internal iteration is used on purpose: callers are expected to pass an
    // `std::iter::Chain`, for which this is easier for the compiler to
    // optimize than an external loop.
    let (last, count) = iter.fold((first, 1), |(prev, linked), notified| {
        let raw = notified.into_raw();
        // safety: Holding the Notified for a task guarantees exclusive
        // access to the `queue_next` field.
        set_next(prev, Some(raw));
        (raw, linked + 1)
    });

    // The chain is complete; splice it into the shared list.
    self.push_batch_inner(first, last, count);
}
/// Inserts several tasks that have been linked together into the queue.
///
/// The provided head and tail may be be the same task. In this case, a
/// single task is inserted.
#[inline]
fn push_batch_inner(
&self,
batch_head: NonNull<task::Header>,
batch_tail: NonNull<task::Header>,
num: usize,
) {
debug_assert!(get_next(batch_tail).is_none());
let mut p = self.pointers.lock();
if let Some(tail) = p.tail {
set_next(tail, Some(batch_head));
} else {
p.head = Some(batch_head);
}
p.tail = Some(batch_tail);
// Increment the count.
//
// safety: All updates to the len atomic are guarded by the mutex. As
// such, a non-atomic load followed by a store is safe.
let len = unsafe { self.len.unsync_load() };
self.len.store(len + num, Release);
}
/// Removes and returns the task at the front of the queue, or `None` if the
/// queue is empty.
pub(crate) fn pop(&self) -> Option<task::Notified<T>> {
    // Fast path: skip the lock entirely when the counter reads zero.
    if self.is_empty() {
        return None;
    }

    let mut guard = self.pointers.lock();

    // The list may have been emptied by another thread between the counter
    // check above and acquiring the lock.
    let popped = match guard.head {
        Some(head) => head,
        None => return None,
    };

    // Advance the head; if that empties the list the tail must go too.
    let successor = get_next(popped);
    guard.head = successor;
    if successor.is_none() {
        guard.tail = None;
    }

    // Detach the popped task from the rest of the list.
    set_next(popped, None);

    // Decrement the count.
    //
    // safety: All updates to the len atomic are guarded by the mutex. As
    // such, a non-atomic load followed by a store is safe.
    let remaining = unsafe { self.len.unsync_load() } - 1;
    self.len.store(remaining, Release);

    // safety: a `Notified` is pushed into the queue and now it is popped!
    Some(unsafe { task::Notified::from_raw(popped) })
}
}
impl<T: 'static> Drop for Inject<T> {
    /// Asserts that the queue has been fully drained before it is destroyed.
    ///
    /// The check is skipped while the thread is already unwinding, so a
    /// non-empty queue does not turn an existing panic into a double panic.
    fn drop(&mut self) {
        if std::thread::panicking() {
            return;
        }

        assert!(self.pop().is_none(), "queue not empty");
    }
}
/// Reads the `queue_next` link stored in the given task header.
fn get_next(header: NonNull<task::Header>) -> Option<NonNull<task::Header>> {
    // NOTE(review): relies on `header` pointing at a live task header and on
    // the caller having exclusive access to its `queue_next` field — confirm
    // at call sites.
    unsafe {
        let header_ref = header.as_ref();
        header_ref.queue_next.with(|next| *next)
    }
}
/// Sets the next-task link of the given task header to `val` by delegating to
/// the header's own `set_next`.
fn set_next(header: NonNull<task::Header>, val: Option<NonNull<task::Header>>) {
    // NOTE(review): relies on `header` pointing at a live task header and on
    // the caller having exclusive access to its link field — confirm at call
    // sites.
    unsafe {
        let target = header.as_ref();
        target.set_next(val);
    }
}