//! Slots and global/thread local data for the Helping strategy.
//!
//! This is inspired by (but not an exact copy of)
//! <https://pvk.ca/Blog/2020/07/07/flatter-wait-free-hazard-pointers/>. The debts are mostly
//! copies of the ones used by the hybrid strategy, but modified a bit. Just like in the hybrid
//! strategy, in case the slots run out or when the writer updates the value, the debts are paid by
//! incrementing the ref count (which is a little slower, but still wait-free/lock-free and still
//! on the order of nanoseconds).
//!
//! ## Reader, the fast path
//!
//! * Publishes an active address ‒ the address we'll be loading stuff from.
//! * Puts a generation into the control.
//! * Loads the pointer and puts it into the debt slot.
//! * Confirms by CaS-replacing the generation back to the idle state.
//!
//! * Later, we pay the debt back by CaS-replacing it with NO_DEPT (like any other slot).
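//!
//! A minimal sketch of that sequence (simplified, with free-standing variables standing in for
//! the real fields; the actual implementation is split between `get_debt` and `confirm` below):
//!
//! ```ignore
//! slots.active_addr.store(storage_addr, SeqCst);  // publish where we are going to load from
//! let gen = fresh_generation() | GEN_TAG;         // hypothetical helper bumping the thread-local counter
//! slots.control.swap(gen, SeqCst);                // reserve the slot (the previous value must be IDLE)
//! let ptr = storage.load(SeqCst);                 // the actual load of the shared pointer
//! slots.slot.0.swap(ptr as usize, SeqCst);        // record the debt
//! if slots.control.swap(IDLE, SeqCst) != gen {
//!     // A writer has helped out in the meantime; abort our own debt and use the pre-paid
//!     // pointer it left for us in the handover envelope.
//! }
//! ```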
//!
//! ## Writer, the non-colliding path
//!
//! * Replaces the pointer in the storage.
//! * The writer walks over all debts. It pays each debt that it is concerned with by bumping the
//! reference and replacing the debt with NO_DEPT. The relevant reader will fail in the CaS
//! (either because it finds NO_DEPT or another pointer in there) and knows the reference was
//! bumped, so it needs to decrement it instead. Note that it is possible that someone reuses the
//! slot for the _same_ pointer in the meantime. In that case the original reader's pay-back CaS
//! will succeed and set it to NO_DEPT, and the newer reader ends up with a pre-paid debt, which
//! is fine.
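//!
//! A minimal sketch of that walk (with hypothetical helpers `all_debt_slots`, `bump_ref_count`
//! and `unbump_ref_count`; the traversal over the nodes themselves is provided elsewhere, this
//! module only contributes the collision handling in `help`):
//!
//! ```ignore
//! for debt in all_debt_slots() {
//!     bump_ref_count(old_ptr); // pre-pay the debt
//!     match debt.0.compare_exchange(old_ptr, Debt::NONE, SeqCst, Relaxed) {
//!         // The reader's own pay-back CaS now fails and it decrements instead.
//!         Ok(_) => (),
//!         // Nothing was owed for this pointer here, take the bump back.
//!         Err(_) => unbump_ref_count(old_ptr),
//!     }
//! }
//! ```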
//!
//! ## The collision path
//!
//! The reservation of a slot is not atomic, therefore a writer can observe the reservation in
//! progress. But it doesn't want to wait for it to complete (it wants to be lock-free, which means
//! it needs to be able to resolve the situation on its own).
//!
//! The way it knows a reservation is in progress is by seeing a generation in there (it has a
//! distinct tag). In that case it'll try to:
//!
//! * First verify that the reservation is being done for the same address it modified, by reading
//! and re-confirming the active_addr slot corresponding to the currently handled node. If it is
//! for some other address, the writer doesn't have to be concerned and proceeds to the next slot.
//! * It does a full load. That is fine, because the writer must be on a different thread than the
//! reader and therefore there is at least one free slot. Full load means paying the debt right
//! away by incrementing the reference count.
//! * Then it tries to pass the already fully protected/paid pointer to the reader. It writes it to
//! an envelope and CaS-replaces the generation in the control with a pointer to that envelope (if
//! it fails, someone has been faster and it rolls back). We need the envelope because the pointer
//! itself doesn't have to be aligned to 4 bytes and we need space for the tags that distinguish
//! the types of info in the control; the envelope's alignment we can ensure.
//! * The reader then finds the generation got replaced by a pointer to the envelope and uses the
//! pointer stored inside the envelope. It aborts its own debt. This effectively exchanges the
//! envelopes between the threads so each one has an envelope ready for the future.
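//!
//! In other words, the low two bits of the control word work as a tag. A sketch of the decoding
//! (the real dispatch is the `match` in `help` below):
//!
//! ```ignore
//! if control == IDLE {
//!     // Nothing in flight (there may still be a debt in the slot, paid or unpaid).
//! } else if control & TAG_MASK == GEN_TAG {
//!     let generation = control & !TAG_MASK;  // a reader has pre-reserved the slot
//! } else if control & TAG_MASK == REPLACEMENT_TAG {
//!     let envelope = (control & !TAG_MASK) as *mut Handover;  // a pre-paid replacement awaits
//! }
//! ```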
//!
//! ## ABA protection
//!
//! Using the generation to pre-reserve the slot allows the writer to make sure it is offering the
//! loaded pointer to the same reader and that the read value is new enough (and of the same type).
//!
//! This solves the general case, but there's also a much less frequent, though theoretically
//! possible, ABA problem that could lead to UB if left unsolved:
//!
//! * There is a collision on generation G.
//! * The writer loads a pointer and bumps its reference count.
//! * In the meantime, all the 2^30 or 2^62 generations (depending on the usize width) wrap
//! around.
//! * The writer stores the outdated and possibly different-typed pointer in there and the reader
//! uses it.
//!
//! To mitigate that, every time the counter overflows we take the current node and un-assign it
//! from our current thread. We mark it as being in "cooldown" and leave it there until there are
//! no writers messing with that node any more (if they are not on the node, they can't experience
//! the ABA problem on it). After that, we are allowed to use it again.
//!
//! This doesn't block the reader, it'll simply find *a* node next time ‒ this one, or possibly a
//! different (or new) one.
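//!
//! The overflow detection itself is cheap ‒ this is what `get_debt` below does with the counter
//! (a direct excerpt of its logic):
//!
//! ```ignore
//! let gen = local.generation.get().wrapping_add(4); // stepping by 4 keeps the two tag bits free
//! local.generation.set(gen);
//! let discard = gen == 0;                           // wrapped around ‒ send the node to cooldown
//! let gen = gen | GEN_TAG;                          // tag it before it goes into the control
//! ```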
//!
//! # Orderings
//!
//! The linked lists/nodes are already provided for us. So we just need to make sure the debt
//! juggling is done right. We assume that the local node is ours to write to (others have only
//! limited right to write to certain fields under certain conditions) and that we are counted into
//! active writers while we dig through it on the writer end.
//!
//! We use SeqCst on a read-write operation both here at the very start of the sequence (storing
//! the generation into the control) and in the writer on the actual pointer. That establishes
//! which of the two has happened first.
//!
//! After that we split the time into segments by read-write operations with AcqRel read-write
//! operations on the control. There's just one control in play for both threads so we don't need
//! SeqCst and the segments are understood by both the same way. The writer can sometimes use only
//! load-Acquire on that, because it needs to only read from data written by the reader. It'll
//! always see data from at least the segment before the observed control value and uses CaS to
//! send the results back, so it can't go into the past.
//!
//! There are two little gotchas:
//!
//! * When we read the address we should be loading from, we need to give up if the address does
//! not match (we can't simply load from there, because it can be dangling by that point and we
//! don't know its type, so we need to use our address for all loading ‒ and we just check they
//! match). If we give up, we don't do that CaS into the control, therefore we could have given up
//! on a newer address than the control we have read. For that reason, the address is also stored
//! by the reader with Release and we read it with Acquire, which brings an up-to-date version of
//! the control into our thread ‒ and we re-read that one to confirm the address is indeed between
//! two same values holding the generation, therefore corresponding to it (see the sketch at the
//! end of this section).
//! * The destructor doesn't have a SeqCst in the writer, because there was no write in there.
//! That's OK. We need to ensure there are no new readers after the "change" we confirm in the
//! writer and that change is the destruction ‒ by that time, the destroying thread has exclusive
//! ownership and therefore there can be no new readers.
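//!
//! A sketch of that re-confirmation as the writer does it in `help` below:
//!
//! ```ignore
//! let control = who.control.load(SeqCst);          // observe the reader's reservation
//! let active_addr = who.active_addr.load(SeqCst);  // which address is it loading from?
//! if active_addr != storage_addr {
//!     // Only if the control still holds the same generation do we know the address really
//!     // belongs to that reservation and we may safely give up on this slot.
//!     if who.control.load(SeqCst) != control {
//!         // It changed under our hands ‒ retry with the fresh value.
//!     }
//! }
//! ```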
use std::cell::Cell;
use std::ptr;
use std::sync::atomic::Ordering::*;
use std::sync::atomic::{AtomicPtr, AtomicUsize};
use super::Debt;
use crate::RefCnt;
pub const REPLACEMENT_TAG: usize = 0b01;
pub const GEN_TAG: usize = 0b10;
pub const TAG_MASK: usize = 0b11;
pub const IDLE: usize = 0;
/// Thread local data for the helping strategy.
#[derive(Default)]
pub(super) struct Local {
// The generation counter.
generation: Cell<usize>,
}
// Make sure the pointers have 2 empty bits. Always.
#[derive(Default)]
#[repr(align(4))]
struct Handover(AtomicUsize);
/// The slots for the helping strategy.
pub(super) struct Slots {
/// The control structure of the slot.
///
/// Different threads signal in here what stage they are in. It can contain:
///
/// * `IDLE` (nothing is happening, and there may or may not be an active debt).
/// * A generation, tagged with GEN_TAG. The reader is trying to acquire a slot right now and a
/// writer might try to help out.
/// * A replacement pointer, tagged with REPLACEMENT_TAG. This pointer points to a Handover,
/// containing an already protected value, provided by the writer for the benefit of the
/// reader. The reader should abort its own debt and use this instead. This indirection
/// (storing a pointer to the envelope with the actual pointer) is to make sure there's space
/// for the tag ‒ there is no guarantee the real pointer is aligned to at least 4 bytes, but
/// we can force that for the Handover type.
control: AtomicUsize,
/// A possibly active debt.
slot: Debt,
/// If there's a generation in control, this signifies what address the reader is trying to
/// load from.
active_addr: AtomicUsize,
/// A place where a writer can put a replacement value.
///
/// Note that this is simply an allocation, and every participating slot contributes one, but
/// they may be passed around through the lifetime of the program. It is not accessed directly,
/// but through the space_offer thing.
///
handover: Handover,
/// A pointer to a handover envelope this node currently owns.
///
/// A writer swaps its own and the reader's handover when it successfully stores a replacement
/// in the control.
space_offer: AtomicPtr<Handover>,
}
impl Default for Slots {
fn default() -> Self {
Slots {
control: AtomicUsize::new(IDLE),
slot: Debt::default(),
// Doesn't matter yet
active_addr: AtomicUsize::new(0),
// Also doesn't matter
handover: Handover::default(),
// Here we would like it to point to our handover. But for that we need to be in place
// in RAM (effectively pinned, even though we may be building with a Rust older than Pin),
// so not yet. See init().
space_offer: AtomicPtr::new(ptr::null_mut()),
}
}
}
impl Slots {
pub(super) fn slot(&self) -> &Debt {
&self.slot
}
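/// Reserves the slot for a new load by publishing the address we are going to load from and a
/// fresh generation.
///
/// Returns the generation (to be checked again in `confirm`) and a flag telling the caller the
/// generation counter has wrapped around, so the node should be sent into a cooldown.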
pub(super) fn get_debt(&self, ptr: usize, local: &Local) -> (usize, bool) {
// Incrementing by 4 ensures we always have enough space for 2 bits of tags.
let gen = local.generation.get().wrapping_add(4);
debug_assert_eq!(gen & GEN_TAG, 0);
local.generation.set(gen);
// Signal the caller that the node should be sent to a cooldown.
let discard = gen == 0;
let gen = gen | GEN_TAG;
// We will sync by the write to the control. But we also sync the value of the previous
// generation/released slot. That way we may re-confirm in the writer that the reader is
// not in between here and the swap below with a stale gen (e.g. if we are in here, the
// re-confirm there will load the NO_DEPT and we are fine).
self.active_addr.store(ptr, SeqCst);
// We are the only ones allowed to do the IDLE -> * transition and we never leave it in
// anything else after a transaction, so this is OK. But we still need a load-store SeqCst
// operation here to form a relation between this and the store of the actual pointer in
// the writer thread :-(.
let prev = self.control.swap(gen, SeqCst);
debug_assert_eq!(IDLE, prev, "Left control in wrong state");
(gen, discard)
}
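/// Helps another thread (`who`) that may be in the middle of reserving its slot.
///
/// If the reservation is for the same `storage_addr` we have just written to, a full load is
/// done through `replacement` and the already ref-counted result is offered to the reader
/// through the handover envelope.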
pub(super) fn help<R, T>(&self, who: &Self, storage_addr: usize, replacement: &R)
where
T: RefCnt,
R: Fn() -> T,
{
debug_assert_eq!(IDLE, self.control.load(Relaxed));
// Also acquires the auxiliary data in other variables.
let mut control = who.control.load(SeqCst);
loop {
match control & TAG_MASK {
// Nothing to help with
IDLE if control == IDLE => break,
// Someone has already helped out with that, so we have nothing to do here
REPLACEMENT_TAG => break,
// Something is going on, let's have a better look.
GEN_TAG => {
debug_assert!(
!ptr::eq(self, who),
"Refusing to help myself, makes no sense"
);
// Get the address the other thread is trying to load from. This load also
// syncs the control into our thread once more; re-reading the control below
// confirms the active_addr value sits between two identical control values
// and is therefore up to date with it.
let active_addr = who.active_addr.load(SeqCst);
if active_addr != storage_addr {
// Acquire for the same reason as on the top.
let new_control = who.control.load(SeqCst);
if new_control == control {
// The other thread is doing something, but to some other ArcSwap, so
// we don't care. Cool, done.
break;
} else {
// The control just changed under our hands, we don't know what to
// trust, so retry.
control = new_control;
continue;
}
}
// Now we know this work is for us. Try to create a replacement and offer it.
// This actually does a full-featured load under the hood, but we are currently
// idle and the load doesn't re-enter write, so that's all fine.
let replacement = replacement();
let replace_addr = T::as_ptr(&replacement) as usize;
// If we succeed in helping the other thread, we take their empty space in
// return for ours, which we pass to them. It's already there, the value is
// synced to us by the Acquire part of the control load above.
let their_space = who.space_offer.load(SeqCst);
// Nobody but our own thread writes in here, so even Relaxed would do; we keep
// SeqCst to stay on the conservative side.
let my_space = self.space_offer.load(SeqCst);
// This will be synced by the compare-exchange below. If that fails, the
// value won't ever be read anyway.
unsafe {
(*my_space).0.store(replace_addr, SeqCst);
}
// Ensured by the align annotation at the type.
assert_eq!(my_space as usize & TAG_MASK, 0);
let space_addr = (my_space as usize) | REPLACEMENT_TAG;
// On failure we need at least Acquire ‒ same reason as at the top, we read the
// value. On success we need at least Release ‒ we send data to that thread
// through here ‒ so at least AcqRel, because success must be a superset of
// failure; we use SeqCst on both sides. It is also a load, to get their space
// (it won't have changed; it changes only when the control is set to IDLE).
match who
.control
.compare_exchange(control, space_addr, SeqCst, SeqCst)
{
Ok(_) => {
// We have successfully sent our replacement out (Release) and got
// their space in return (Acquire on that load above).
self.space_offer.store(their_space, SeqCst);
// The ref count went with it, so forget about it here.
T::into_ptr(replacement);
// We have successfully helped out, so we are done.
break;
}
Err(new_control) => {
// Something has changed in between. Let's try again; there's nothing to
// roll back (the replacement will get dropped at the end of the scope and
// we didn't do anything with the spaces).
control = new_control;
}
}
}
_ => unreachable!("Invalid control value {:X}", control),
}
}
}
pub(super) fn init(&mut self) {
*self.space_offer.get_mut() = &mut self.handover;
}
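/// Confirms the reservation made by `get_debt`, storing the loaded pointer as the debt.
///
/// Returns `Ok(())` if the debt is in place, or `Err(replacement)` if a writer has helped out
/// in the meantime; the replacement is already ref-counted and the leftover debt must be paid
/// back by the caller.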
pub(super) fn confirm(&self, gen: usize, ptr: usize) -> Result<(), usize> {
// Put the debt into the slot and consider it the acquiring of a „lock“. For that we need a
// swap, not just a store (we need at least Acquire semantics, and Acquire works only on
// operations that read). The Release part makes sure the control is observable by the other
// thread (though that's probably not necessary anyway?).
let prev = self.slot.0.swap(ptr, SeqCst);
debug_assert_eq!(Debt::NONE, prev);
// Confirm by writing to the control (or discover that we got helped). We stop anyone else
// from helping by setting it to IDLE.
let control = self.control.swap(IDLE, SeqCst);
if control == gen {
// Nobody interfered, we have our debt in place and can proceed.
Ok(())
} else {
// Someone put a replacement in there.
debug_assert_eq!(control & TAG_MASK, REPLACEMENT_TAG);
let handover = (control & !TAG_MASK) as *mut Handover;
let replacement = unsafe { &*handover }.0.load(SeqCst);
// Make sure we advertise the right envelope when we set it to a generation next time.
self.space_offer.store(handover, SeqCst);
// Note we've left the debt in place. The caller should pay it back (without ever
// taking advantage of it) to make sure any extra reference is actually dropped (it is
// possible someone provided the replacement *and* paid the debt and we need just one of
// them).
Err(replacement)
}
}
}