// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

use crate::spinwait::SpinWait;
use crate::thread_parker::{ThreadParker, ThreadParkerT, UnparkHandleT};
use core::{
cell::Cell,
mem, ptr,
sync::atomic::{fence, AtomicUsize, Ordering},
};

struct ThreadData {
parker: ThreadParker,
// Linked list of threads in the queue. The queue is split into two parts:
// the processed part and the unprocessed part. When new nodes are added to
// the list, they only have the next pointer set, and queue_tail is null.
//
// Nodes are processed with the queue lock held, which consists of setting
// the prev pointer for each node and setting the queue_tail pointer on the
// first processed node of the list.
//
// This setup allows nodes to be added to the queue without a lock, while
// still allowing O(1) removal of nodes from the processed part of the list.
// The only cost is the O(n) processing, but this only needs to be done
// once for each node, and therefore isn't too expensive.
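    //
    // For example, if threads parked in the order C, B, A (so A was pushed
    // most recently and is the queue head stored in `state`), the links look
    // like this once the whole list has been processed:
    //
    //   head                          tail
    //    A --next--> B --next--> C
    //    A <--prev-- B <--prev-- C
    //
    // with `A.queue_tail == C`. Threads are woken starting from the tail, so
    // wake-ups happen in FIFO order.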
queue_tail: Cell<*const ThreadData>,
prev: Cell<*const ThreadData>,
next: Cell<*const ThreadData>,
}

impl ThreadData {
#[inline]
fn new() -> ThreadData {
assert!(mem::align_of::<ThreadData>() > !QUEUE_MASK);
ThreadData {
parker: ThreadParker::new(),
queue_tail: Cell::new(ptr::null()),
prev: Cell::new(ptr::null()),
next: Cell::new(ptr::null()),
}
}
}

// Invokes the given closure with a reference to the current thread's `ThreadData`.
#[inline]
fn with_thread_data<T>(f: impl FnOnce(&ThreadData) -> T) -> T {
let mut thread_data_ptr = ptr::null();
// If ThreadData is expensive to construct, then we want to use a cached
// version in thread-local storage if possible.
if !ThreadParker::IS_CHEAP_TO_CONSTRUCT {
thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());
if let Ok(tls_thread_data) = THREAD_DATA.try_with(|x| x as *const ThreadData) {
thread_data_ptr = tls_thread_data;
}
}
// Otherwise just create a ThreadData on the stack
let mut thread_data_storage = None;
if thread_data_ptr.is_null() {
thread_data_ptr = thread_data_storage.get_or_insert_with(ThreadData::new);
}
f(unsafe { &*thread_data_ptr })
}
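
// Layout of the `state` word in `WordLock`:
//
// - bit 0 (`LOCKED_BIT`): set while the lock itself is held.
// - bit 1 (`QUEUE_LOCKED_BIT`): set while a thread holds the queue lock and is
//   processing the queue of parked threads.
// - remaining bits (`QUEUE_MASK`): the address of the `ThreadData` at the head
//   of the queue, or zero if the queue is empty. This relies on `ThreadData`
//   being aligned to at least 4 bytes, which is checked in `ThreadData::new`.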
const LOCKED_BIT: usize = 1;
const QUEUE_LOCKED_BIT: usize = 2;
const QUEUE_MASK: usize = !3;

// Word-sized lock that is used to implement the parking_lot API. Since this
// can't use parking_lot, it instead manages its own queue of waiting threads.
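//
// A sketch of the intended usage pattern (illustrative only; the callers in
// this crate are the real reference):
//
//     let lock = WordLock::new();
//     lock.lock();
//     // ... critical section ...
//     unsafe { lock.unlock() }; // safety: we currently hold the lock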
pub struct WordLock {
state: AtomicUsize,
}

impl WordLock {
    /// Returns a new, unlocked `WordLock`.
pub const fn new() -> Self {
WordLock {
state: AtomicUsize::new(0),
}
}
#[inline]
pub fn lock(&self) {
if self
.state
.compare_exchange_weak(0, LOCKED_BIT, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
return;
}
self.lock_slow();
}
/// Must not be called on an already unlocked `WordLock`!
#[inline]
pub unsafe fn unlock(&self) {
let state = self.state.fetch_sub(LOCKED_BIT, Ordering::Release);
if state.is_queue_locked() || state.queue_head().is_null() {
return;
}
self.unlock_slow();
}
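
    /// Slow path of `lock`: spins for a while if the queue is empty, then
    /// pushes the current thread onto the front of the queue and parks it.
    /// Each time the thread is woken it retries acquiring the lock.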
#[cold]
fn lock_slow(&self) {
let mut spinwait = SpinWait::new();
let mut state = self.state.load(Ordering::Relaxed);
loop {
// Grab the lock if it isn't locked, even if there is a queue on it
if !state.is_locked() {
match self.state.compare_exchange_weak(
state,
state | LOCKED_BIT,
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => return,
Err(x) => state = x,
}
continue;
}
// If there is no queue, try spinning a few times
if state.queue_head().is_null() && spinwait.spin() {
state = self.state.load(Ordering::Relaxed);
continue;
}
// Get our thread data and prepare it for parking
state = with_thread_data(|thread_data| {
// The pthread implementation is still unsafe, so we need to surround `prepare_park`
// with `unsafe {}`.
#[allow(unused_unsafe)]
unsafe {
thread_data.parker.prepare_park();
}
// Add our thread to the front of the queue
let queue_head = state.queue_head();
if queue_head.is_null() {
thread_data.queue_tail.set(thread_data);
thread_data.prev.set(ptr::null());
} else {
thread_data.queue_tail.set(ptr::null());
thread_data.prev.set(ptr::null());
thread_data.next.set(queue_head);
}
if let Err(x) = self.state.compare_exchange_weak(
state,
state.with_queue_head(thread_data),
Ordering::AcqRel,
Ordering::Relaxed,
) {
return x;
}
// Sleep until we are woken up by an unlock
// Ignoring unused unsafe, since it's only a few platforms where this is unsafe.
#[allow(unused_unsafe)]
unsafe {
thread_data.parker.park();
}
// Loop back and try locking again
spinwait.reset();
self.state.load(Ordering::Relaxed)
});
}
}
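
    /// Slow path of `unlock`: takes the queue lock, fills in any missing
    /// `prev` pointers, and wakes the thread at the tail of the queue. If the
    /// lock was re-acquired in the meantime, waking is left to the next
    /// unlocker instead.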
#[cold]
fn unlock_slow(&self) {
let mut state = self.state.load(Ordering::Relaxed);
loop {
// We just unlocked the WordLock. Just check if there is a thread
// to wake up. If the queue is locked then another thread is already
// taking care of waking up a thread.
if state.is_queue_locked() || state.queue_head().is_null() {
return;
}
// Try to grab the queue lock
match self.state.compare_exchange_weak(
state,
state | QUEUE_LOCKED_BIT,
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(x) => state = x,
}
}
// Now we have the queue lock and the queue is non-empty
'outer: loop {
// First, we need to fill in the prev pointers for any newly added
// threads. We do this until we reach a node that we previously
// processed, which has a non-null queue_tail pointer.
let queue_head = state.queue_head();
let mut queue_tail;
let mut current = queue_head;
loop {
queue_tail = unsafe { (*current).queue_tail.get() };
if !queue_tail.is_null() {
break;
}
unsafe {
let next = (*current).next.get();
(*next).prev.set(current);
current = next;
}
}
// Set queue_tail on the queue head to indicate that the whole list
// has prev pointers set correctly.
unsafe {
(*queue_head).queue_tail.set(queue_tail);
}
// If the WordLock is locked, then there is no point waking up a
// thread now. Instead we let the next unlocker take care of waking
// up a thread.
if state.is_locked() {
match self.state.compare_exchange_weak(
state,
state & !QUEUE_LOCKED_BIT,
Ordering::Release,
Ordering::Relaxed,
) {
Ok(_) => return,
Err(x) => state = x,
}
// Need an acquire fence before reading the new queue
fence_acquire(&self.state);
continue;
}
// Remove the last thread from the queue and unlock the queue
let new_tail = unsafe { (*queue_tail).prev.get() };
if new_tail.is_null() {
loop {
match self.state.compare_exchange_weak(
state,
state & LOCKED_BIT,
Ordering::Release,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(x) => state = x,
}
// If the compare_exchange failed because a new thread was
// added to the queue then we need to re-scan the queue to
// find the previous element.
if state.queue_head().is_null() {
continue;
} else {
// Need an acquire fence before reading the new queue
fence_acquire(&self.state);
continue 'outer;
}
}
} else {
unsafe {
(*queue_head).queue_tail.set(new_tail);
}
self.state.fetch_and(!QUEUE_LOCKED_BIT, Ordering::Release);
}
// Finally, wake up the thread we removed from the queue. Note that
// we don't need to worry about any races here since the thread is
// guaranteed to be sleeping right now and we are the only one who
// can wake it up.
unsafe {
(*queue_tail).parker.unpark_lock().unpark();
}
break;
}
}
}

// Thread-Sanitizer only has partial fence support, so when running under it, we
// try to avoid false positives by using a discarded acquire load instead.
#[inline]
fn fence_acquire(a: &AtomicUsize) {
if cfg!(tsan_enabled) {
let _ = a.load(Ordering::Acquire);
} else {
fence(Ordering::Acquire);
}
}
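
// Helper trait for interpreting a raw `state` word without sprinkling bit
// manipulation throughout the code above.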
trait LockState {
fn is_locked(self) -> bool;
fn is_queue_locked(self) -> bool;
fn queue_head(self) -> *const ThreadData;
fn with_queue_head(self, thread_data: *const ThreadData) -> Self;
}

impl LockState for usize {
#[inline]
fn is_locked(self) -> bool {
self & LOCKED_BIT != 0
}
#[inline]
fn is_queue_locked(self) -> bool {
self & QUEUE_LOCKED_BIT != 0
}
#[inline]
fn queue_head(self) -> *const ThreadData {
(self & QUEUE_MASK) as *const ThreadData
}
#[inline]
fn with_queue_head(self, thread_data: *const ThreadData) -> Self {
(self & !QUEUE_MASK) | thread_data as *const _ as usize
}
}
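
// A minimal usage sketch, written as a test module. The `Counter` wrapper, the
// test name and the thread/iteration counts are illustrative choices, not part
// of the original crate; the sketch only assumes the `WordLock` API defined
// above (`new`, `lock` and the unsafe `unlock`).
#[cfg(test)]
mod tests {
    use super::WordLock;
    use core::cell::UnsafeCell;
    use std::sync::Arc;
    use std::thread;

    // A counter whose interior is only touched while `lock` is held.
    struct Counter {
        lock: WordLock,
        value: UnsafeCell<usize>,
    }

    // Safety: all accesses to `value` happen with `lock` held, which
    // serializes them.
    unsafe impl Sync for Counter {}

    #[test]
    fn word_lock_provides_mutual_exclusion() {
        const THREADS: usize = 4;
        const ITERS: usize = 10_000;

        let counter = Arc::new(Counter {
            lock: WordLock::new(),
            value: UnsafeCell::new(0),
        });

        let handles: Vec<_> = (0..THREADS)
            .map(|_| {
                let counter = Arc::clone(&counter);
                thread::spawn(move || {
                    for _ in 0..ITERS {
                        counter.lock.lock();
                        // Safety: we hold the lock, so this access is exclusive,
                        // and unlocking a lock we hold is allowed.
                        unsafe {
                            *counter.value.get() += 1;
                            counter.lock.unlock();
                        }
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        // Every increment happened under the lock, so none were lost.
        assert_eq!(unsafe { *counter.value.get() }, THREADS * ITERS);
    }
}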