//! A linked list of debt nodes.
//!
//! A node may or may not be owned by a thread. Reader debts are allocated in the reader's owned
//! node; the writer walks everything (but may also use some owned values).
//!
//! The list is prepend-only ‒ if a thread dies, the node lives on (and can be claimed by another
//! thread later on). This makes the implementation much simpler, since everything here is
//! `'static` and we don't have to care about knowing when to free stuff.
//!
//! The nodes contain both the fast primary slots and the secondary fallback ones.
//!
//! # Synchronization
//!
//! We synchronize several things here.
//!
//! The addition of nodes is synchronized through the head (a load on each read, a SeqCst
//! compare-exchange on each attempt to add another node). Note that certain parts of a node never
//! change after it is linked in (they aren't even atomic) and the things that do change take care
//! of themselves (the debt slots have their own synchronization, etc).
//!
//! The ownership follows an acquire-release lock pattern.
//!
//! Similarly, the counting of active writers follows an acquire-release lock pattern.
//!
//! We also do a release-acquire "send" from start-cooldown to check-cooldown to make sure we see
//! a value of the writer count at least as up to date as when the cooldown started. That way, if
//! we see 0, we know the writers must have left since then.
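//!
//! A minimal sketch of the ownership lock pattern (simplified ‒ in the real code the claim
//! happens in Node::get and the release in Node::start_cooldown):
//!
//! ```ignore
//! // Claim: gain unique access to the node's slots.
//! if node.in_use.compare_exchange(NODE_UNUSED, NODE_USED, SeqCst, Relaxed).is_ok() {
//!     // ... the owning thread uses the slots ...
//!     // Release: publish our writes and enter the cooldown protocol.
//!     node.in_use.swap(NODE_COOLDOWN, Release);
//! }
//! ```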
use std::cell::Cell;
use std::ptr;
use std::slice::Iter;
use std::sync::atomic::Ordering::*;
use std::sync::atomic::{AtomicPtr, AtomicUsize};
use super::fast::{Local as FastLocal, Slots as FastSlots};
use super::helping::{Local as HelpingLocal, Slots as HelpingSlots};
use super::Debt;
use crate::RefCnt;
/// Nobody owns the node; it can be claimed by any thread.
const NODE_UNUSED: usize = 0;
/// The node is owned by a thread.
const NODE_USED: usize = 1;
/// The node waits for active writers to leave before it can be reused (ABA protection).
const NODE_COOLDOWN: usize = 2;
/// The head of the debt linked list.
static LIST_HEAD: AtomicPtr<Node> = AtomicPtr::new(ptr::null_mut());
/// A RAII guard marking a node as having an active writer.
///
/// Decrements the node's writer count on drop.
pub struct NodeReservation<'a>(&'a Node);
impl Drop for NodeReservation<'_> {
fn drop(&mut self) {
self.0.active_writers.fetch_sub(1, Release);
}
}
/// One thread-local node for debts.
#[repr(C, align(64))]
pub(crate) struct Node {
fast: FastSlots,
helping: HelpingSlots,
in_use: AtomicUsize,
// Next node in the list.
//
// It is a pointer because we touch it before synchronization (we don't _dereference_ it before
// synchronization, only manipulate the pointer itself). That is illegal according to strict
// interpretation of the rules by MIRI on references.
next: *const Node,
active_writers: AtomicUsize,
}
impl Default for Node {
fn default() -> Self {
Node {
fast: FastSlots::default(),
helping: HelpingSlots::default(),
in_use: AtomicUsize::new(NODE_USED),
next: ptr::null(),
active_writers: AtomicUsize::new(0),
}
}
}
impl Node {
/// Goes through the debt linked list.
///
/// This traverses the linked list, calling the closure on each node. If the closure returns
/// `Some`, it terminates with that value early, otherwise it runs to the end.
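///
/// A usage sketch (hypothetical writer-side scan, simplified):
///
/// ```ignore
/// Node::traverse::<(), _>(|node| {
///     let _reservation = node.reserve_writer();
///     for slot in node.fast_slots() {
///         // ... inspect the debt slot ...
///     }
///     None // Keep going through the whole list.
/// });
/// ```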
pub(crate) fn traverse<R, F: FnMut(&'static Node) -> Option<R>>(mut f: F) -> Option<R> {
// Acquire ‒ we want to make sure we read the correct version of data at the end of the
// pointer. Any write to LIST_HEAD is with Release (or stronger).
//
// Furthermore, we need to see the newest version of the list in case we examine the debts
// ‒ if a node was added recently, we don't want a stale read -> SeqCst.
//
// Note that the other pointers in the chain never change and are *ordinary* pointers. The
// whole linked list is synchronized through the head.
let mut current = unsafe { LIST_HEAD.load(SeqCst).as_ref() };
while let Some(node) = current {
let result = f(node);
if result.is_some() {
return result;
}
current = unsafe { node.next.as_ref() };
}
None
}
/// Put the current thread's node into cooldown.
fn start_cooldown(&self) {
// Trick: Make sure we have an up to date value of the active_writers in this thread, so we
// can properly release it below.
let _reservation = self.reserve_writer();
assert_eq!(NODE_USED, self.in_use.swap(NODE_COOLDOWN, Release));
}
/// Perform a cooldown if the node is ready.
///
/// See the ABA protection in the [helping] module.
fn check_cooldown(&self) {
// Check if the node is in cooldown, for two reasons:
// * Skip most of the nodes fast, without dealing with them.
// * More importantly, sync the value of active_writers to be at least the value when the
// cooldown started. That way we know the 0 we observe happened some time after
// start_cooldown.
if self.in_use.load(Acquire) == NODE_COOLDOWN {
// The rest can be nicely relaxed ‒ no memory is being synchronized by these
// operations. We just see an up to date 0 and allow someone (possibly us) to claim the
// node later on.
if self.active_writers.load(Relaxed) == 0 {
let _ = self
.in_use
.compare_exchange(NODE_COOLDOWN, NODE_UNUSED, Relaxed, Relaxed);
}
}
}
/// Mark this node as having a writer currently playing with it.
pub fn reserve_writer(&self) -> NodeReservation {
self.active_writers.fetch_add(1, Acquire);
NodeReservation(self)
}
/// "Allocate" a node.
///
/// Either a new one is created, or a previous one is reused. The node is claimed and becomes
/// in_use.
fn get() -> &'static Self {
// Try to find an unused one in the chain and reuse it.
Self::traverse(|node| {
node.check_cooldown();
if node
.in_use
// We claim unique control over the generation and the right to write to slots if
// they are NO_DEBT.
.compare_exchange(NODE_UNUSED, NODE_USED, SeqCst, Relaxed)
.is_ok()
{
Some(node)
} else {
None
}
})
// If that didn't work, create a new one and prepend to the list.
.unwrap_or_else(|| {
let node = Box::leak(Box::<Node>::default());
node.helping.init();
// We don't want to read any data in addition to the head, Relaxed is fine
// here.
//
// We do need to release the data to others, but for that, we acquire in the
// compare_exchange below.
let mut head = LIST_HEAD.load(Relaxed);
loop {
node.next = head;
if let Err(old) = LIST_HEAD.compare_exchange_weak(
head, node,
// We need to release *the whole chain* here. For that, we need to
// acquire it first.
//
// SeqCst because we need to make sure it is properly set "before" we do
// anything to the debts.
SeqCst,
Relaxed, // Nothing changed, go for the next round of the loop.
) {
head = old;
} else {
return node;
}
}
})
}
/// Iterate over the fast slots.
pub(crate) fn fast_slots(&self) -> Iter<Debt> {
self.fast.into_iter()
}
/// Access the helping slot.
pub(crate) fn helping_slot(&self) -> &Debt {
self.helping.slot()
}
}
/// A wrapper around a node pointer, to un-claim the node on thread shutdown.
pub(crate) struct LocalNode {
/// Node for this thread, if any.
///
/// We don't necessarily have to own one, but if we don't, we'll get one before the first use.
node: Cell<Option<&'static Node>>,
/// Thread-local data for the fast slots.
fast: FastLocal,
/// Thread local data for the helping strategy.
helping: HelpingLocal,
}
impl LocalNode {
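/// Runs `f` with this thread's `LocalNode`, claiming a `Node` for the thread first if needed.
///
/// A usage sketch (hypothetical ‒ `ptr` stands in for an address being loaded):
///
/// ```ignore
/// let debt = LocalNode::with(|local| local.new_fast(ptr));
/// ```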
pub(crate) fn with<R, F: FnOnce(&LocalNode) -> R>(f: F) -> R {
let f = Cell::new(Some(f));
THREAD_HEAD
.try_with(|head| {
if head.node.get().is_none() {
head.node.set(Some(Node::get()));
}
let f = f.take().unwrap();
f(head)
})
// During application shutdown, the thread local storage may already be deallocated.
// In that case, the above fails, but we still need something. So we just find or
// allocate a node and use it just once.
//
// Note that this situation should be very rare, so the slower performance doesn't
// matter that much.
.unwrap_or_else(|_| {
let tmp_node = LocalNode {
node: Cell::new(Some(Node::get())),
fast: FastLocal::default(),
helping: HelpingLocal::default(),
};
let f = f.take().unwrap();
f(&tmp_node)
// Drop of tmp_node -> sends the node we just used into cooldown.
})
}
/// Creates a new debt.
///
/// This stores the debt of the given pointer (untyped, cast to a usize) and returns a
/// reference to that slot, or gives up with `None` if all the slots are currently full.
#[inline]
pub(crate) fn new_fast(&self, ptr: usize) -> Option<&'static Debt> {
let node = &self.node.get().expect("LocalNode::with ensures it is set");
debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
node.fast.get_debt(ptr, &self.fast)
}
/// Initializes a helping slot transaction.
///
/// Returns the generation (with tag).
pub(crate) fn new_helping(&self, ptr: usize) -> usize {
let node = &self.node.get().expect("LocalNode::with ensures it is set");
debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
let (gen, discard) = node.helping.get_debt(ptr, &self.helping);
if discard {
// Too many generations happened, make sure the writers give the poor node a break for
// a while so they don't observe the generation wrapping around.
node.start_cooldown();
self.node.take();
}
gen
}
/// Confirm the helping transaction.
///
/// The generation comes from a previous new_helping.
///
/// Will either return a debt with the pointer, or a debt to pay and a replacement (already
/// protected) address.
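///
/// A caller-side sketch (hypothetical ‒ `local` and `ptr` are stand-ins; see new_helping):
///
/// ```ignore
/// let gen = local.new_helping(ptr);
/// // ... re-read the storage, then confirm the transaction ...
/// match local.confirm_helping(gen, ptr) {
///     Ok(debt) => { /* `ptr` is protected by `debt` */ }
///     Err((debt, replacement)) => { /* a writer helped; use `replacement`, pay `debt` */ }
/// }
/// ```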
pub(crate) fn confirm_helping(
&self,
gen: usize,
ptr: usize,
) -> Result<&'static Debt, (&'static Debt, usize)> {
let node = &self.node.get().expect("LocalNode::with ensures it is set");
debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
let slot = node.helping_slot();
node.helping
.confirm(gen, ptr)
.map(|()| slot)
.map_err(|repl| (slot, repl))
}
/// The writer side of a helping slot.
///
/// This potentially helps the `who` node (using self as the local node, which must be
/// different) by loading the address that node is trying to load.
pub(super) fn help<R, T>(&self, who: &Node, storage_addr: usize, replacement: &R)
where
T: RefCnt,
R: Fn() -> T,
{
let node = &self.node.get().expect("LocalNode::with ensures it is set");
debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
node.helping.help(&who.helping, storage_addr, replacement)
}
}
impl Drop for LocalNode {
fn drop(&mut self) {
if let Some(node) = self.node.get() {
// Release - syncing writes/ownership of this Node
node.start_cooldown();
}
}
}
thread_local! {
/// A debt node assigned to this thread.
static THREAD_HEAD: LocalNode = LocalNode {
node: Cell::new(None),
fast: FastLocal::default(),
helping: HelpingLocal::default(),
};
}
#[cfg(test)]
mod tests {
use super::*;
impl Node {
fn is_empty(&self) -> bool {
self.fast_slots()
.chain(std::iter::once(self.helping_slot()))
.all(|d| d.0.load(Relaxed) == Debt::NONE)
}
fn get_thread() -> &'static Self {
LocalNode::with(|h| h.node.get().unwrap())
}
}
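/// A sketch of the cooldown life-cycle, on a node that is deliberately leaked and never
/// published to LIST_HEAD, so nothing else can touch it: an active writer reservation keeps
/// the node in cooldown, and once it is gone, check_cooldown releases the node for reuse.
#[test]
fn cooldown_blocked_by_writer() {
    let node: &'static Node = Box::leak(Box::default());
    node.start_cooldown();
    assert_eq!(NODE_COOLDOWN, node.in_use.load(Relaxed));
    // An active writer reservation must keep the node in cooldown.
    let reservation = node.reserve_writer();
    node.check_cooldown();
    assert_eq!(NODE_COOLDOWN, node.in_use.load(Relaxed));
    // Once the writer is gone, the node can be reclaimed.
    drop(reservation);
    node.check_cooldown();
    assert_eq!(NODE_UNUSED, node.in_use.load(Relaxed));
}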
/// A freshly acquired thread local node is empty.
#[test]
fn new_empty() {
assert!(Node::get_thread().is_empty());
}
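/// The node claimed by this thread is reachable through Node::traverse ‒ it was prepended to
/// the global list when first claimed.
#[test]
fn traverse_finds_thread_node() {
    let mine = Node::get_thread();
    let found = Node::traverse(|node| if ptr::eq(node, mine) { Some(()) } else { None });
    assert!(found.is_some());
}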
}