1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
//! Slots and global/thread local data for the Helping strategy.
//!
//! This is inspired (but not an exact copy) of
//! <https://pvk.ca/Blog/2020/07/07/flatter-wait-free-hazard-pointers/>. The debts are mostly
//! copies of the ones used by the hybrid strategy, but modified a bit. Just like in the hybrid
//! strategy, in case the slots run out or when the writer updates the value, the debts are paid by
//! incrementing the ref count (which is a little slower, but still wait-free/lock-free and still
//! in order of nanoseconds).
//!
//! ## Reader, the fast path
//!
//! * Publish an active address ‒ the address we'll be loading stuff from.
//! * Puts a generation into the control.
//! * Loads the pointer and puts it to the debt slot.
//! * Confirms by CaS-replacing the generation back to idle state.
//!
//! * Later, we pay it back by CaS-replacing it with the NO_DEPT (like any other slot).
//!
//! ## Writer, the non-colliding path
//!
//! * Replaces the pointer in the storage.
//! * The writer walks over all debts. It pays each debt that it is concerned with by bumping the
//!   reference and replacing the dept with NO_DEPT. The relevant reader will fail in the CaS
//!   (either because it finds NO_DEPT or other pointer in there) and knows the reference was
//!   bumped, so it needs to decrement it. Note that it is possible that someone also reuses the
//!   slot for the _same_ pointer. In that case that reader will set it to NO_DEPT and the newer
//!   reader will have a pre-paid debt, which is fine.
//!
//! ## The collision path
//!
//! The reservation of a slot is not atomic, therefore a writer can observe the reservation in
//! progress. But it doesn't want to wait for it to complete (it wants to be lock-free, which means
//! it needs to be able to resolve the situation on its own).
//!
//! The way it knows it is in progress of the reservation is by seeing a generation in there (it has
//! a distinct tag). In that case it'll try to:
//!
//! * First verify that the reservation is being done for the same address it modified, by reading
//!   and re-confirming the active_addr slot corresponding to the currently handled node. If it is
//!   for some other address, the writer doesn't have to be concerned and proceeds to the next slot.
//! * It does a full load. That is fine, because the writer must be on a different thread than the
//!   reader and therefore there is at least one free slot. Full load means paying the debt right
//!   away by incrementing the reference count.
//! * Then it tries to pass the already fully protected/paid pointer to the reader. It writes it to
//!   an envelope and CaS-replaces it into the control, instead of the generation (if it fails,
//!   someone has been faster and it rolls back). We need the envelope because the pointer itself
//!   doesn't have to be aligned to 4 bytes and we need the space for tags to distinguish the types
//!   of info in control; we can ensure the envelope is).
//! * The reader then finds the generation got replaced by a pointer to the envelope and uses that
//!   pointer inside the envelope. It aborts its own debt. This effectively exchanges the envelopes
//!   between the threads so each one has an envelope ready for future.
//!
//! ## ABA protection
//!
//! The generation as pre-reserving the slot allows the writer to make sure it is offering the
//! loaded pointer to the same reader and that the read value is new enough (and of the same type).
//!
//! This solves the general case, but there's also much less frequent but theoretical ABA problem
//! that could lead to UB, if left unsolved:
//!
//! * There is a collision on generation G.
//! * The writer loads a pointer, bumps it.
//! * In the meantime, all the 2^30 or 2^62 generations (depending on the usize width) generations
//!   wrap around.
//! * The writer stores the outdated and possibly different-typed pointer in there and the reader
//!   uses it.
//!
//! To mitigate that, every time the counter overflows we take the current node and un-assign it
//! from our current thread. We mark it as in "cooldown" and let it in there until there are no
//! writers messing with that node any more (if they are not on the node, they can't experience the
//! ABA problem on it). After that, we are allowed to use it again.
//!
//! This doesn't block the reader, it'll simply find *a* node next time ‒ this one, or possibly a
//! different (or new) one.
//!
//! # Orderings
//!
//! The linked lists/nodes are already provided for us. So we just need to make sure the debt
//! juggling is done right. We assume that the local node is ours to write to (others have only
//! limited right to write to certain fields under certain conditions) and that we are counted into
//! active writers while we dig through it on the writer end.
//!
//! We use SeqCst on a read-write operation both here at the very start of the sequence (storing
//! the generation into the control) and in the writer on the actual pointer. That establishes a
//! relation of what has happened first.
//!
//! After that we split the time into segments by read-write operations with AcqRel read-write
//! operations on the control. There's just one control in play for both threads so we don't need
//! SeqCst and the segments are understood by both the same way. The writer can sometimes use only
//! load-Acquire on that, because it needs to only read from data written by the reader. It'll
//! always see data from at least the segment before the observed control value and uses CaS to
//! send the results back, so it can't go into the past.
//!
//! There are two little gotchas:
//!
//! * When we read the address we should be loading from, we need to give up if the address does
//!   not match (we can't simply load from there, because it can be dangling by that point and we
//!   don't know its type, so we need to use our address for all loading ‒ and we just check they
//!   match). If we give up, we don't do that CaS into control, therefore we could have given up on
//!   newer address than the control we have read. For that reason, the address is also stored by
//!   reader with Release and we read it with Acquire, which'll bring an up to date version of
//!   control into our thread ‒ and we re-read that one to confirm the address is indeed between
//!   two same values holding the generation, therefore corresponding to it.
//! * The destructor doesn't have a SeqCst in the writer, because there was no write in there.
//!   That's OK. We need to ensure there are no new readers after the "change" we confirm in the
//!   writer and that change is the destruction ‒ by that time, the destroying thread has exclusive
//!   ownership and therefore there can be no new readers.

use std::cell::Cell;
use std::ptr;
use std::sync::atomic::Ordering::*;
use std::sync::atomic::{AtomicPtr, AtomicUsize};

use super::Debt;
use crate::RefCnt;

pub const REPLACEMENT_TAG: usize = 0b01;
pub const GEN_TAG: usize = 0b10;
pub const TAG_MASK: usize = 0b11;
pub const IDLE: usize = 0;

/// Thread local data for the helping strategy.
#[derive(Default)]
pub(super) struct Local {
    // The generation counter.
    generation: Cell<usize>,
}

// Make sure the pointers have 2 empty bits. Always.
#[derive(Default)]
#[repr(align(4))]
struct Handover(AtomicUsize);

/// The slots for the helping strategy.
pub(super) struct Slots {
    /// The control structure of the slot.
    ///
    /// Different threads signal what stage they are in in there. It can contain:
    ///
    /// * `IDLE` (nothing is happening, and there may or may not be an active debt).
    /// * a generation, tagged with GEN_TAG. The reader is trying to acquire a slot right now and a
    ///   writer might try to help out.
    /// * A replacement pointer, tagged with REPLACEMENT_TAG. This pointer points to an Handover,
    ///   containing an already protected value, provided by the writer for the benefit of the
    ///   reader. The reader should abort its own debt and use this instead. This indirection
    ///   (storing pointer to the envelope with the actual pointer) is to make sure there's a space
    ///   for the tag ‒ there is no guarantee the real pointer is aligned to at least 4 bytes, we
    ///   can however force that for the Handover type.
    control: AtomicUsize,
    /// A possibly active debt.
    slot: Debt,
    /// If there's a generation in control, this signifies what address the reader is trying to
    /// load from.
    active_addr: AtomicUsize,
    /// A place where a writer can put a replacement value.
    ///
    /// Note that this is simply an allocation, and every participating slot contributes one, but
    /// they may be passed around through the lifetime of the program. It is not accessed directly,
    /// but through the space_offer thing.
    ///
    handover: Handover,
    /// A pointer to a handover envelope this node currently owns.
    ///
    /// A writer makes a switch of its and readers handover when successfully storing a replacement
    /// in the control.
    space_offer: AtomicPtr<Handover>,
}

impl Default for Slots {
    fn default() -> Self {
        Slots {
            control: AtomicUsize::new(IDLE),
            slot: Debt::default(),
            // Doesn't matter yet
            active_addr: AtomicUsize::new(0),
            // Also doesn't matter
            handover: Handover::default(),
            // Here we would like it to point to our handover. But for that we need to be in place
            // in RAM (effectively pinned, though we use older Rust than Pin, possibly?), so not
            // yet. See init().
            space_offer: AtomicPtr::new(ptr::null_mut()),
        }
    }
}

impl Slots {
    pub(super) fn slot(&self) -> &Debt {
        &self.slot
    }

    pub(super) fn get_debt(&self, ptr: usize, local: &Local) -> (usize, bool) {
        // Incrementing by 4 ensures we always have enough space for 2 bit of tags.
        let gen = local.generation.get().wrapping_add(4);
        debug_assert_eq!(gen & GEN_TAG, 0);
        local.generation.set(gen);
        // Signal the caller that the node should be sent to a cooldown.
        let discard = gen == 0;
        let gen = gen | GEN_TAG;
        // We will sync by the write to the control. But we also sync the value of the previous
        // generation/released slot. That way we may re-confirm in the writer that the reader is
        // not in between here and the compare_exchange below with a stale gen (eg. if we are in
        // here, the re-confirm there will load the NO_DEPT and we are fine).
        self.active_addr.store(ptr, SeqCst);

        // We are the only ones allowed to do the IDLE -> * transition and we never leave it in
        // anything else after an transaction, so this is OK. But we still need a load-store SeqCst
        // operation here to form a relation between this and the store of the actual pointer in
        // the writer thread :-(.
        let prev = self.control.swap(gen, SeqCst);
        debug_assert_eq!(IDLE, prev, "Left control in wrong state");

        (gen, discard)
    }

    pub(super) fn help<R, T>(&self, who: &Self, storage_addr: usize, replacement: &R)
    where
        T: RefCnt,
        R: Fn() -> T,
    {
        debug_assert_eq!(IDLE, self.control.load(Relaxed));
        // Also acquires the auxiliary data in other variables.
        let mut control = who.control.load(SeqCst);
        loop {
            match control & TAG_MASK {
                // Nothing to help with
                IDLE if control == IDLE => break,
                // Someone has already helped out with that, so we have nothing to do here
                REPLACEMENT_TAG => break,
                // Something is going on, let's have a better look.
                GEN_TAG => {
                    debug_assert!(
                        !ptr::eq(self, who),
                        "Refusing to help myself, makes no sense"
                    );
                    // Get the address that other thread is trying to load from. By that acquire,
                    // we also sync the control into our thread once more and reconfirm that the
                    // value of the active_addr is in between two same instances, therefore up to
                    // date to it.
                    let active_addr = who.active_addr.load(SeqCst);
                    if active_addr != storage_addr {
                        // Acquire for the same reason as on the top.
                        let new_control = who.control.load(SeqCst);
                        if new_control == control {
                            // The other thread is doing something, but to some other ArcSwap, so
                            // we don't care. Cool, done.
                            break;
                        } else {
                            // The control just changed under our hands, we don't know what to
                            // trust, so retry.
                            control = new_control;
                            continue;
                        }
                    }

                    // Now we know this work is for us. Try to create a replacement and offer it.
                    // This actually does a full-featured load under the hood, but we are currently
                    // idle and the load doesn't re-enter write, so that's all fine.
                    let replacement = replacement();
                    let replace_addr = T::as_ptr(&replacement) as usize;
                    // If we succeed in helping the other thread, we take their empty space in
                    // return for us that we pass to them. It's already there, the value is synced
                    // to us by Acquire on control.
                    let their_space = who.space_offer.load(SeqCst);
                    // Relaxed is fine, our own thread and nobody but us writes in here.
                    let my_space = self.space_offer.load(SeqCst);
                    // Relaxed is fine, we'll sync by the next compare-exchange. If we don't, the
                    // value won't ever be read anyway.
                    unsafe {
                        (*my_space).0.store(replace_addr, SeqCst);
                    }
                    // Ensured by the align annotation at the type.
                    assert_eq!(my_space as usize & TAG_MASK, 0);
                    let space_addr = (my_space as usize) | REPLACEMENT_TAG;
                    // Acquire on failure -> same reason as at the top, reading the value.
                    // Release on success -> we send data to that thread through here. Must be
                    // AcqRel, because success must be superset of failure. Also, load to get their
                    // space (it won't have changed, it does when the control is set to IDLE).
                    match who
                        .control
                        .compare_exchange(control, space_addr, SeqCst, SeqCst)
                    {
                        Ok(_) => {
                            // We have successfully sent our replacement out (Release) and got
                            // their space in return (Acquire on that load above).
                            self.space_offer.store(their_space, SeqCst);
                            // The ref count went with it, so forget about it here.
                            T::into_ptr(replacement);
                            // We have successfully helped out, so we are done.
                            break;
                        }
                        Err(new_control) => {
                            // Something has changed in between. Let's try again, nothing changed
                            // (the replacement will get dropped at the end of scope, we didn't do
                            // anything with the spaces, etc.
                            control = new_control;
                        }
                    }
                }
                _ => unreachable!("Invalid control value {:X}", control),
            }
        }
    }

    pub(super) fn init(&mut self) {
        *self.space_offer.get_mut() = &mut self.handover;
    }

    pub(super) fn confirm(&self, gen: usize, ptr: usize) -> Result<(), usize> {
        // Put the slot there and consider it acquire of a „lock“. For that we need swap, not store
        // only (we need Acquire and Acquire works only on loads). Release is to make sure control
        // is observable by the other thread (but that's probably not necessary anyway?)
        let prev = self.slot.0.swap(ptr, SeqCst);
        debug_assert_eq!(Debt::NONE, prev);

        // Confirm by writing to the control (or discover that we got helped). We stop anyone else
        // from helping by setting it to IDLE.
        let control = self.control.swap(IDLE, SeqCst);
        if control == gen {
            // Nobody interfered, we have our debt in place and can proceed.
            Ok(())
        } else {
            // Someone put a replacement in there.
            debug_assert_eq!(control & TAG_MASK, REPLACEMENT_TAG);
            let handover = (control & !TAG_MASK) as *mut Handover;
            let replacement = unsafe { &*handover }.0.load(SeqCst);
            // Make sure we advertise the right envelope when we set it to generation next time.
            self.space_offer.store(handover, SeqCst);
            // Note we've left the debt in place. The caller should pay it back (without ever
            // taking advantage of it) to make sure any extra is actually dropped (it is possible
            // someone provided the replacement *and* paid the debt and we need just one of them).
            Err(replacement)
        }
    }
}